1 /*
2 * Copyright 2004-2026 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
10 #include <crm_internal.h>
11
12 #include <stdbool.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <signal.h>
16 #include <errno.h>
17
18 #include <sys/wait.h>
19
20 #include <crm/crm.h>
21 #include <crm/common/xml.h>
22 #include <crm/common/mainloop.h>
23
24 #include <qb/qbarray.h>
25
// Custom GLib source that is dispatched only after being explicitly triggered
struct trigger_s {
    GSource source;     // Underlying GLib source (code here casts GSource * to this type)
    gboolean running;   // Set when the dispatch callback returns a negative value
    gboolean trigger;   // Whether the source has been triggered and awaits dispatch
    void *user_data;    // Passed to the dispatch callback
    guint id;           // ID returned by g_source_attach()

};
34
// Named mainloop timer (the code that manages these fields is outside this view)
struct mainloop_timer_s {
    guint id;           // presumably the GLib source ID of the timer -- TODO confirm
    guint period_ms;    // Timer period in ms (see pcmk__mainloop_timer_get_period())
    bool repeat;        // presumably whether the timer re-arms after firing -- TODO confirm
    char *name;         // presumably a label used in log messages -- TODO confirm
    GSourceFunc cb;     // presumably the function called when the timer fires -- TODO confirm
    void *userdata;     // presumably passed to cb -- TODO confirm
};
43
44 static gboolean
45 crm_trigger_prepare(GSource * source, gint * timeout)
46 {
47 crm_trigger_t *trig = (crm_trigger_t *) source;
48
49 /* cluster-glue's FD and IPC related sources make use of
50 * g_source_add_poll() but do not set a timeout in their prepare
51 * functions
52 *
53 * This means mainloop's poll() will block until an event for one
54 * of these sources occurs - any /other/ type of source, such as
55 * this one or g_idle_*, that doesn't use g_source_add_poll() is
56 * S-O-L and won't be processed until there is something fd-based
57 * happens.
58 *
59 * Luckily the timeout we can set here affects all sources and
60 * puts an upper limit on how long poll() can take.
61 *
62 * So unconditionally set a small-ish timeout, not too small that
63 * we're in constant motion, which will act as an upper bound on
64 * how long the signal handling might be delayed for.
65 */
66 *timeout = 500; /* Timeout in ms */
67
68 return trig->trigger;
69 }
70
71 static gboolean
72 crm_trigger_check(GSource * source)
73 {
74 crm_trigger_t *trig = (crm_trigger_t *) source;
75
76 return trig->trigger;
77 }
78
79 /*!
80 * \internal
81 * \brief GSource dispatch function for crm_trigger_t
82 *
83 * \param[in] source crm_trigger_t being dispatched
84 * \param[in] callback Callback passed at source creation
85 * \param[in,out] userdata User data passed at source creation
86 *
87 * \return G_SOURCE_REMOVE to remove source, G_SOURCE_CONTINUE to keep it
88 */
89 static gboolean
90 crm_trigger_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
91 {
92 gboolean rc = G_SOURCE_CONTINUE;
93 crm_trigger_t *trig = (crm_trigger_t *) source;
94
95 if (trig->running) {
96 /* Wait until the existing job is complete before starting the next one */
97 return G_SOURCE_CONTINUE;
98 }
99 trig->trigger = FALSE;
100
101 if (callback) {
102 int callback_rc = callback(trig->user_data);
103
104 if (callback_rc < 0) {
105 pcmk__trace("Trigger handler %p not yet complete", trig);
106 trig->running = TRUE;
107 } else if (callback_rc == 0) {
108 rc = G_SOURCE_REMOVE;
109 }
110 }
111 return rc;
112 }
113
114 static void
115 crm_trigger_finalize(GSource * source)
116 {
117 pcmk__trace("Trigger %p destroyed", source);
118 }
119
120 static GSourceFuncs crm_trigger_funcs = {
121 crm_trigger_prepare,
122 crm_trigger_check,
123 crm_trigger_dispatch,
124 crm_trigger_finalize,
125 };
126
127 static crm_trigger_t *
128 mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
129 gpointer userdata)
130 {
131 crm_trigger_t *trigger = NULL;
132
133 trigger = (crm_trigger_t *) source;
134
135 trigger->id = 0;
136 trigger->trigger = FALSE;
137 trigger->user_data = userdata;
138
139 if (dispatch) {
140 g_source_set_callback(source, dispatch, trigger, NULL);
141 }
142
143 g_source_set_priority(source, priority);
144 g_source_set_can_recurse(source, FALSE);
145
146 trigger->id = g_source_attach(source, NULL);
147 return trigger;
148 }
149
150 void
151 mainloop_trigger_complete(crm_trigger_t * trig)
152 {
153 pcmk__trace("Trigger handler %p complete", trig);
154 trig->running = FALSE;
155 }
156
157 /*!
158 * \brief Create a trigger to be used as a mainloop source
159 *
160 * \param[in] priority Relative priority of source (lower number is higher priority)
161 * \param[in] dispatch Trigger dispatch function (should return 0 to remove the
162 * trigger from the mainloop, -1 if the trigger should be
163 * kept but the job is still running and not complete, and
164 * 1 if the trigger should be kept and the job is complete)
165 * \param[in] userdata Pointer to pass to \p dispatch
166 *
167 * \return Newly allocated mainloop source for trigger
168 */
169 crm_trigger_t *
170 mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data),
171 gpointer userdata)
172 {
173 GSource *source = NULL;
174
175 pcmk__assert(sizeof(crm_trigger_t) > sizeof(GSource));
176 source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
177
178 return mainloop_setup_trigger(source, priority, dispatch, userdata);
179 }
180
181 void
182 mainloop_set_trigger(crm_trigger_t * source)
183 {
184 if(source) {
185 source->trigger = TRUE;
186 }
187 }
188
189 gboolean
190 mainloop_destroy_trigger(crm_trigger_t * source)
191 {
192 GSource *gs = NULL;
193
194 if(source == NULL) {
195 return TRUE;
196 }
197
198 gs = (GSource *)source;
199
200 g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
201 g_source_unref(gs); /* The caller no longer carries a reference to source
202 *
203 * At this point the source should be free'd,
204 * unless we're currently processing said
205 * source, in which case mainloop holds an
206 * additional reference and it will be free'd
207 * once our processing completes
208 */
209 return TRUE;
210 }
211
// Custom GLib source for signal handling

/* Data structure for the custom source: a trigger extended with the signal
 * number and the handler to call from the main loop when that signal has been
 * received
 */
typedef struct {
    crm_trigger_t trigger;      // trigger that invoked source (must be first)
    void (*handler) (int sig);  // signal handler
    int signal;                 // signal that was received
} crm_signal_t;
220
/* Table associating signal handlers with signal numbers, indexed by signal
 * number. An entry is NULL when no mainloop handler is registered for that
 * signal.
 */
static crm_signal_t *crm_signals[NSIG];
223
224 /*!
225 * \internal
226 * \brief Dispatch an event from custom glib source for signals
227 *
228 * Given an signal event, clear the event trigger and call any registered
229 * signal handler.
230 *
231 * \param[in] source glib source that triggered this dispatch
232 * \param[in] callback (ignored)
233 * \param[in] userdata (ignored)
234 */
235 static gboolean
236 crm_signal_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
237 {
238 crm_signal_t *sig = (crm_signal_t *) source;
239
240 if(sig->signal != SIGCHLD) {
241 pcmk__notice("Caught '%s' signal " QB_XS " %d (%s handler)",
242 strsignal(sig->signal), sig->signal,
243 ((sig->handler != NULL)? "invoking" : "no"));
244 }
245
246 sig->trigger.trigger = FALSE;
247 if (sig->handler) {
248 sig->handler(sig->signal);
249 }
250 return TRUE;
251 }
252
253 /*!
254 * \internal
255 * \brief Handle a signal by setting a trigger for signal source
256 *
257 * \param[in] sig Signal number that was received
258 *
259 * \note This is the true signal handler for the mainloop signal source, and
260 * must be async-safe.
261 */
262 static void
263 mainloop_signal_handler(int sig)
264 {
265 if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
266 mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
267 }
268 }
269
270 // Functions implementing our custom glib source for signal handling
271 static GSourceFuncs crm_signal_funcs = {
272 crm_trigger_prepare,
273 crm_trigger_check,
274 crm_signal_dispatch,
275 crm_trigger_finalize,
276 };
277
278 /*!
279 * \internal
280 * \brief Set a true signal handler
281 *
282 * signal()-like interface to sigaction()
283 *
284 * \param[in] sig Signal number to register handler for
285 * \param[in] dispatch Signal handler
286 *
287 * \return The previous value of the signal handler, or SIG_ERR on error
288 * \note The dispatch function must be async-safe.
289 */
290 sighandler_t
291 crm_signal_handler(int sig, sighandler_t dispatch)
292 {
293 sigset_t mask;
294 struct sigaction sa;
295 struct sigaction old;
296
297 if (sigemptyset(&mask) < 0) {
298 pcmk__err("Could not %sset handler for signal %d: %s",
299 ((dispatch == NULL)? "un" : ""), sig, strerror(errno));
300 return SIG_ERR;
301 }
302
303 memset(&sa, 0, sizeof(struct sigaction));
304 sa.sa_handler = dispatch;
305 sa.sa_flags = SA_RESTART;
306 sa.sa_mask = mask;
307
308 if (sigaction(sig, &sa, &old) < 0) {
309 pcmk__err("Could not %sset handler for signal %d: %s",
310 ((dispatch == NULL)? "un" : ""), sig, strerror(errno));
311 return SIG_ERR;
312 }
313 return old.sa_handler;
314 }
315
316 static void
317 mainloop_destroy_signal_entry(int sig)
318 {
319 crm_signal_t *tmp = crm_signals[sig];
320
321 if (tmp != NULL) {
322 crm_signals[sig] = NULL;
323 pcmk__trace("Unregistering mainloop handler for signal %d", sig);
324 mainloop_destroy_trigger((crm_trigger_t *) tmp);
325 }
326 }
327
328 /*!
329 * \internal
330 * \brief Add a signal handler to a mainloop
331 *
332 * \param[in] sig Signal number to handle
333 * \param[in] dispatch Signal handler function (\c NULL to ignore the signal)
334 *
335 * \note The true signal handler merely sets a mainloop trigger to call this
336 * dispatch function via the mainloop. Therefore, the dispatch function
337 * does not need to be async-safe.
338 */
339 gboolean
340 mainloop_add_signal(int sig, void (*dispatch) (int sig))
341 {
342 GSource *source = NULL;
343 int priority = G_PRIORITY_HIGH - 1;
344
345 if (sig == SIGTERM) {
346 /* TERM is higher priority than other signals,
347 * signals are higher priority than other ipc.
348 * Yes, minus: smaller is "higher"
349 */
350 priority--;
351 }
352
353 if (sig >= NSIG || sig < 0) {
354 pcmk__err("Signal %d is out of range", sig);
355 return FALSE;
356
357 } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
358 pcmk__trace("Signal handler for %d is already installed", sig);
359 return TRUE;
360
361 } else if (crm_signals[sig] != NULL) {
362 pcmk__err("Different signal handler for %d is already installed", sig);
363 return FALSE;
364 }
365
366 pcmk__assert(sizeof(crm_signal_t) > sizeof(GSource));
367 source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
368
369 crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
370 pcmk__assert(crm_signals[sig] != NULL);
371
372 crm_signals[sig]->handler = dispatch;
373 crm_signals[sig]->signal = sig;
374
375 if (crm_signal_handler(sig, mainloop_signal_handler) == SIG_ERR) {
376 mainloop_destroy_signal_entry(sig);
377 return FALSE;
378 }
379
380 return TRUE;
381 }
382
383 gboolean
384 mainloop_destroy_signal(int sig)
385 {
386 if (sig >= NSIG || sig < 0) {
387 pcmk__err("Signal %d is out of range", sig);
388 return FALSE;
389
390 } else if (crm_signal_handler(sig, NULL) == SIG_ERR) {
391 // Error already logged
392 return FALSE;
393
394 } else if (crm_signals[sig] == NULL) {
395 return TRUE;
396 }
397 mainloop_destroy_signal_entry(sig);
398 return TRUE;
399 }
400
/* Array of struct gio_to_qb_poll adaptors, looked up by file descriptor.
 * Created on first use by mainloop_add_ipc_server_with_prio() and freed by
 * mainloop_cleanup().
 */
static qb_array_t *gio_map = NULL;
402
403 void
404 mainloop_cleanup(void)
405 {
406 g_clear_pointer(&gio_map, qb_array_free);
407
408 for (int sig = 0; sig < NSIG; ++sig) {
409 mainloop_destroy_signal_entry(sig);
410 }
411 }
412
/*
 * libqb...
 *
 * Adaptor tying a file descriptor that libqb asks us to poll to the GLib
 * watch that polls it. Adaptors are stored in gio_map and looked up by file
 * descriptor.
 */
struct gio_to_qb_poll {
    int32_t is_used;            // Count of watches referencing this adaptor
    guint source;               // GLib source ID of the watch (0 if none)
    int32_t events;             // I/O conditions being watched
    void *data;                 // Passed to fn on dispatch
    qb_ipcs_dispatch_fn_t fn;   // libqb dispatch function for the descriptor
    enum qb_loop_priority p;    // libqb priority requested for the descriptor
};
424
425 static gboolean
426 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
427 {
428 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
429 gint fd = g_io_channel_unix_get_fd(gio);
430
431 pcmk__trace("%p.%d %d", data, fd, condition);
432
433 /* if this assert get's hit, then there is a race condition between
434 * when we destroy a fd and when mainloop actually gives it up */
435 pcmk__assert(adaptor->is_used > 0);
436
437 return (adaptor->fn(fd, condition, adaptor->data) == 0);
438 }
439
440 static void
441 gio_poll_destroy(gpointer data)
442 {
443 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
444
445 adaptor->is_used--;
446 pcmk__assert(adaptor->is_used >= 0);
447
448 if (adaptor->is_used == 0) {
449 pcmk__trace("Marking adaptor %p unused", adaptor);
450 adaptor->source = 0;
451 }
452 }
453
454 /*!
455 * \internal
456 * \brief Convert libqb's poll priority into GLib's one
457 *
458 * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
459 *
460 * \return best matching GLib's priority
461 */
462 static gint
463 conv_prio_libqb2glib(enum qb_loop_priority prio)
464 {
465 switch (prio) {
466 case QB_LOOP_LOW: return G_PRIORITY_LOW;
467 case QB_LOOP_HIGH: return G_PRIORITY_HIGH;
468 default: return G_PRIORITY_DEFAULT; // QB_LOOP_MED
469 }
470 }
471
472 /*!
473 * \internal
474 * \brief Convert libqb's poll priority to rate limiting spec
475 *
476 * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
477 *
478 * \return best matching rate limiting spec
479 * \note This is the inverse of libqb's qb_ipcs_request_rate_limit().
480 */
481 static enum qb_ipcs_rate_limit
482 conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
483 {
484 switch (prio) {
485 case QB_LOOP_LOW: return QB_IPCS_RATE_SLOW;
486 case QB_LOOP_HIGH: return QB_IPCS_RATE_FAST;
487 default: return QB_IPCS_RATE_NORMAL; // QB_LOOP_MED
488 }
489 }
490
/*!
 * \internal
 * \brief Add or modify the mainloop watch for a file descriptor on behalf of
 *        libqb
 *
 * \param[in] p     libqb priority for the watch
 * \param[in] fd    File descriptor to watch
 * \param[in] evts  I/O conditions to watch for (hang-up, invalid-descriptor
 *                  and error conditions are always added)
 * \param[in] data  Pointer to pass to \p fn
 * \param[in] fn    Function to call when a watched condition occurs
 * \param[in] add   Nonzero to add a new watch, zero to modify an existing one
 *
 * \return 0 on success; otherwise the negative result of the adaptor lookup,
 *         -EEXIST (adding while a watch exists), -ENOENT (modifying an unused
 *         adaptor), -ENOMEM (channel creation failed), or -EINVAL (no source
 *         ID was obtained)
 */
static int32_t
gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
                         void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
{
    struct gio_to_qb_poll *adaptor;
    GIOChannel *channel;
    int32_t res = 0;

    // Find the adaptor slot for this descriptor
    res = qb_array_index(gio_map, fd, (void **)&adaptor);
    if (res < 0) {
        pcmk__err("Array lookup failed for fd=%d: %d", fd, res);
        return res;
    }

    pcmk__trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);

    // An add requires that no watch exists; a modify requires an adaptor in use
    if (add && adaptor->source) {
        pcmk__err("Adaptor for descriptor %d is still in-use", fd);
        return -EEXIST;
    }
    if (!add && !adaptor->is_used) {
        pcmk__err("Adaptor for descriptor %d is not in-use", fd);
        return -ENOENT;
    }

    /* channel is created with ref_count = 1 */
    channel = g_io_channel_unix_new(fd);
    if (!channel) {
        pcmk__err("No memory left to add fd=%d", fd);
        return -ENOMEM;
    }

    /* When modifying, remove the existing watch before installing the new one
     * (presumably this runs gio_poll_destroy() for the old watch, dropping
     * its use of the adaptor -- confirm against GLib's g_source_remove())
     */
    if (adaptor->source) {
        g_source_remove(adaptor->source);
        adaptor->source = 0;
    }

    /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
    evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);

    adaptor->fn = fn;
    adaptor->events = evts;
    adaptor->data = data;
    adaptor->p = p;
    adaptor->is_used++;     // Decremented again by gio_poll_destroy()
    adaptor->source =
        g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
                            gio_read_socket, adaptor, gio_poll_destroy);

    /* Now that mainloop now holds a reference to channel,
     * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
     *
     * This means that channel will be free'd by:
     * g_main_context_dispatch()
     *  -> g_source_destroy_internal()
     *      -> g_source_callback_unref()
     * shortly after gio_poll_destroy() completes
     */
    g_io_channel_unref(channel);

    pcmk__trace("Added to mainloop with gsource id=%d", adaptor->source);
    if (adaptor->source > 0) {
        return 0;
    }

    return -EINVAL;
}
558
559 static int32_t
560 gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
561 void *data, qb_ipcs_dispatch_fn_t fn)
562 {
563 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
564 }
565
566 static int32_t
567 gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
568 void *data, qb_ipcs_dispatch_fn_t fn)
569 {
570 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
571 }
572
573 static int32_t
574 gio_poll_dispatch_del(int32_t fd)
575 {
576 struct gio_to_qb_poll *adaptor;
577
578 pcmk__trace("Looking for fd=%d", fd);
579 if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
580 if (adaptor->source) {
581 g_source_remove(adaptor->source);
582 adaptor->source = 0;
583 }
584 }
585 return 0;
586 }
587
/* Poll handlers given to libqb IPC servers (see
 * mainloop_add_ipc_server_with_prio()) so that their file descriptors are
 * watched through the GLib main loop. No job_add handler is provided.
 */
struct qb_ipcs_poll_handlers gio_poll_funcs = {
    .job_add = NULL,
    .dispatch_add = gio_poll_dispatch_add,
    .dispatch_mod = gio_poll_dispatch_mod,
    .dispatch_del = gio_poll_dispatch_del,
};
594
595 static enum qb_ipc_type
596 pick_ipc_type(enum qb_ipc_type requested)
597 {
598 const char *env = pcmk__env_option(PCMK__ENV_IPC_TYPE);
599
600 if (env && strcmp("shared-mem", env) == 0) {
601 return QB_IPC_SHM;
602 } else if (env && strcmp("socket", env) == 0) {
603 return QB_IPC_SOCKET;
604 } else if (env && strcmp("posix", env) == 0) {
605 return QB_IPC_POSIX_MQ;
606 } else if (env && strcmp("sysv", env) == 0) {
607 return QB_IPC_SYSV_MQ;
608 } else if (requested == QB_IPC_NATIVE) {
609 /* We prefer shared memory because the server never blocks on
610 * send. If part of a message fits into the socket, libqb
611 * needs to block until the remainder can be sent also.
612 * Otherwise the client will wait forever for the remaining
613 * bytes.
614 */
615 return QB_IPC_SHM;
616 }
617 return requested;
618 }
619
620 qb_ipcs_service_t *
621 mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
622 struct qb_ipcs_service_handlers *callbacks)
623 {
624 return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED);
625 }
626
627 qb_ipcs_service_t *
628 mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type,
629 struct qb_ipcs_service_handlers *callbacks,
630 enum qb_loop_priority prio)
631 {
632 int rc = 0;
633 qb_ipcs_service_t *server = NULL;
634
635 if (gio_map == NULL) {
636 gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
637 }
638
639 server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
640
641 if (server == NULL) {
642 pcmk__err("Could not create %s IPC server: %s (%d)", name,
643 pcmk_rc_str(errno), errno);
644 return NULL;
645 }
646
647 if (prio != QB_LOOP_MED) {
648 qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
649 }
650
651 // Enforce a minimum IPC buffer size on all clients
652 qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
653 qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
654
655 rc = qb_ipcs_run(server);
656 if (rc < 0) {
657 pcmk__err("Could not start %s IPC server: %s (%d)", name,
658 pcmk_strerror(rc), rc);
659 return NULL; // qb_ipcs_run() destroys server on failure
660 }
661
662 return server;
663 }
664
665 void
666 mainloop_del_ipc_server(qb_ipcs_service_t * server)
667 {
668 if (server) {
669 qb_ipcs_destroy(server);
670 }
671 }
672
/* A file descriptor or IPC connection watched by the main loop. Allocated by
 * mainloop_add_fd() and freed by mainloop_gio_destroy().
 */
struct mainloop_io_s {
    char *name;         // Copy of the name given at creation (used in logs)
    void *userdata;     // Passed to the dispatch and destroy callbacks

    int fd;             // File descriptor being watched
    guint source;       // ID of the GLib I/O watch
    crm_ipc_t *ipc;     // IPC connection (NULL for a plain file descriptor)
    GIOChannel *channel;    // I/O channel wrapping fd

    // Called for each message read from ipc; negative return removes source
    int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
    // Called when a plain fd is readable; negative return removes source
    int (*dispatch_fn_io) (gpointer userdata);
    // Called once when the source is destroyed
    void (*destroy_fn) (gpointer userdata);

};
687
/*!
 * \internal
 * \brief I/O watch callback function (GIOFunc)
 *
 * For an IPC client, read and dispatch up to 10 messages per invocation; for
 * a plain file descriptor, call the I/O dispatch function once. The source is
 * removed if a dispatch function returns a negative value, if the IPC
 * connection is no longer connected, or on a hang-up, invalid-descriptor or
 * error condition.
 *
 * \param[in] gio        I/O channel being watched
 * \param[in] condition  I/O condition satisfied
 * \param[in] data       User data passed when source was created
 *
 * \return G_SOURCE_REMOVE to remove source, G_SOURCE_CONTINUE to keep it
 */
static gboolean
mainloop_gio_callback(GIOChannel *gio, GIOCondition condition, gpointer data)
{
    gboolean rc = G_SOURCE_CONTINUE;
    mainloop_io_t *client = data;

    pcmk__assert(client->fd == g_io_channel_unix_get_fd(gio));

    if (condition & G_IO_IN) {
        if (client->ipc) {
            long read_rc = 0L;
            int max = 10;   // Upper bound on loop iterations per invocation

            do {
                read_rc = crm_ipc_read(client->ipc);
                if (read_rc <= 0) {
                    pcmk__trace("Could not read IPC message from %s: %s (%ld)",
                                client->name, pcmk_strerror(read_rc), read_rc);

                    if (read_rc == -EAGAIN) {
                        /* Jumps to the loop condition below (skipping the
                         * buffer free), which retries while max allows
                         */
                        continue;
                    }

                } else if (client->dispatch_fn_ipc) {
                    const char *buffer = crm_ipc_buffer(client->ipc);

                    pcmk__trace("New %ld-byte IPC message from %s after I/O "
                                "condition %d",
                                read_rc, client->name, (int) condition);
                    if (client->dispatch_fn_ipc(buffer, read_rc, client->userdata) < 0) {
                        pcmk__trace("Connection to %s no longer required",
                                    client->name);
                        rc = G_SOURCE_REMOVE;
                    }
                }

                pcmk__ipc_free_client_buffer(client->ipc);

            // Stop on removal, after max iterations, or on any other read error
            } while ((rc == G_SOURCE_CONTINUE) && (--max > 0)
                     && ((read_rc > 0) || (read_rc == -EAGAIN)));

        } else {
            pcmk__trace("New I/O event for %s after I/O condition %d",
                        client->name, (int) condition);
            if (client->dispatch_fn_io) {
                if (client->dispatch_fn_io(client->userdata) < 0) {
                    pcmk__trace("Connection to %s no longer required",
                                client->name);
                    rc = G_SOURCE_REMOVE;
                }
            }
        }
    }

    // These checks run whether or not there was data to read
    if (client->ipc && !crm_ipc_connected(client->ipc)) {
        pcmk__err("Connection to %s closed " QB_XS " client=%p condition=%d",
                  client->name, client, condition);
        rc = G_SOURCE_REMOVE;

    } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
        pcmk__trace("The connection %s[%p] has been closed (I/O condition=%d)",
                    client->name, client, condition);
        rc = G_SOURCE_REMOVE;

    } else if ((condition & G_IO_IN) == 0) {
        /* For reference when decoding the logged condition value:

            #define GLIB_SYSDEF_POLLIN     =1
            #define GLIB_SYSDEF_POLLPRI    =2
            #define GLIB_SYSDEF_POLLOUT    =4
            #define GLIB_SYSDEF_POLLERR    =8
            #define GLIB_SYSDEF_POLLHUP    =16
            #define GLIB_SYSDEF_POLLNVAL   =32

            typedef enum
            {
              G_IO_IN   GLIB_SYSDEF_POLLIN,
              G_IO_OUT  GLIB_SYSDEF_POLLOUT,
              G_IO_PRI  GLIB_SYSDEF_POLLPRI,
              G_IO_ERR  GLIB_SYSDEF_POLLERR,
              G_IO_HUP  GLIB_SYSDEF_POLLHUP,
              G_IO_NVAL GLIB_SYSDEF_POLLNVAL
            } GIOCondition;

            A bitwise combination representing a condition to watch for on an
            event source.

            G_IO_IN     There is data to read.
            G_IO_OUT    Data can be written (without blocking).
            G_IO_PRI    There is urgent data to read.
            G_IO_ERR    Error condition.
            G_IO_HUP    Hung up (the connection has been broken, usually for
                        pipes and sockets).
            G_IO_NVAL   Invalid request. The file descriptor is not open.
         */
        pcmk__err("Strange condition: %d", condition);
    }

    /* G_SOURCE_REMOVE results in mainloop_gio_destroy() being called
     * just before the source is removed from mainloop
     */
    return rc;
}
798
799 static void
|
(3) Event deallocator: |
Deallocator for "struct mainloop_io_s". |
| Also see events: |
[allocation][allocation] |
800 mainloop_gio_destroy(gpointer c)
801 {
802 mainloop_io_t *client = c;
803 char *c_name = strdup(client->name);
804
805 /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
806 * client->channel will still have ref_count > 0... should be == 1
807 */
808 pcmk__trace("Destroying client %s[%p]", c_name, c);
809
810 if (client->ipc) {
811 crm_ipc_close(client->ipc);
812 }
813
814 if (client->destroy_fn) {
815 void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
816
817 client->destroy_fn = NULL;
818 destroy_fn(client->userdata);
819 }
820
821 if (client->ipc) {
822 crm_ipc_t *ipc = client->ipc;
823
824 client->ipc = NULL;
825 crm_ipc_destroy(ipc);
826 }
827
828 pcmk__trace("Destroyed client %s[%p]", c_name, c);
829
830 free(client->name);
831 free(client);
832
833 free(c_name);
834 }
835
836 /*!
837 * \brief Connect to IPC and add it as a main loop source
838 *
839 * \param[in,out] ipc IPC connection to add
840 * \param[in] priority Event source priority to use for connection
841 * \param[in] userdata Data to register with callbacks
842 * \param[in] callbacks Dispatch and destroy callbacks for connection
843 * \param[out] source Newly allocated event source
844 *
845 * \return Standard Pacemaker return code
846 *
847 * \note On failure, the caller is still responsible for ipc. On success, the
848 * caller should call mainloop_del_ipc_client() when source is no longer
849 * needed, which will lead to the disconnection of the IPC later in the
850 * main loop if it is connected. However the IPC disconnects,
851 * mainloop_gio_destroy() will free ipc and source after calling the
852 * destroy callback.
853 */
854 int
855 pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata,
856 const struct ipc_client_callbacks *callbacks,
857 mainloop_io_t **source)
858 {
859 int rc = pcmk_rc_ok;
860 int fd = -1;
861 const char *ipc_name = NULL;
862
863 CRM_CHECK((ipc != NULL) && (callbacks != NULL), return EINVAL);
864
865 ipc_name = pcmk__s(crm_ipc_name(ipc), "Pacemaker");
866 rc = pcmk__connect_generic_ipc(ipc);
867 if (rc != pcmk_rc_ok) {
868 pcmk__debug("Connection to %s failed: %s", ipc_name, pcmk_rc_str(rc));
869 return rc;
870 }
871
872 rc = pcmk__ipc_fd(ipc, &fd);
873 if (rc != pcmk_rc_ok) {
874 pcmk__debug("Could not obtain file descriptor for %s IPC: %s", ipc_name,
875 pcmk_rc_str(rc));
876 crm_ipc_close(ipc);
877 return rc;
878 }
879
880 *source = mainloop_add_fd(ipc_name, priority, fd, userdata, NULL);
881 if (*source == NULL) {
882 rc = errno;
883 crm_ipc_close(ipc);
884 return rc;
885 }
886
887 (*source)->ipc = ipc;
888 (*source)->destroy_fn = callbacks->destroy;
889 (*source)->dispatch_fn_ipc = callbacks->dispatch;
890 return pcmk_rc_ok;
891 }
892
893 /*!
894 * \brief Get period for mainloop timer
895 *
896 * \param[in] timer Timer
897 *
898 * \return Period in ms
899 */
900 guint
901 pcmk__mainloop_timer_get_period(const mainloop_timer_t *timer)
902 {
903 if (timer) {
904 return timer->period_ms;
905 }
906 return 0;
907 }
908
909 mainloop_io_t *
910 mainloop_add_ipc_client(const char *name, int priority, size_t max_size,
911 void *userdata, struct ipc_client_callbacks *callbacks)
912 {
913 crm_ipc_t *ipc = crm_ipc_new(name, 0);
914 mainloop_io_t *source = NULL;
915 int rc = pcmk__add_mainloop_ipc(ipc, priority, userdata, callbacks,
916 &source);
917
918 if (rc != pcmk_rc_ok) {
919 if (crm_log_level == PCMK__LOG_STDOUT) {
920 fprintf(stderr, "Connection to %s failed: %s",
921 name, pcmk_rc_str(rc));
922 }
923 crm_ipc_destroy(ipc);
924 if (rc > 0) {
925 errno = rc;
926 } else {
927 errno = ENOTCONN;
928 }
929 return NULL;
930 }
931 return source;
932 }
933
934 void
935 mainloop_del_ipc_client(mainloop_io_t * client)
936 {
937 mainloop_del_fd(client);
938 }
939
940 crm_ipc_t *
941 mainloop_get_ipc_client(mainloop_io_t * client)
942 {
943 if (client) {
944 return client->ipc;
945 }
946 return NULL;
947 }
948
949 mainloop_io_t *
950 mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
951 struct mainloop_fd_callbacks * callbacks)
952 {
953 mainloop_io_t *client = NULL;
954
955 if (fd >= 0) {
956 client = calloc(1, sizeof(mainloop_io_t));
957 if (client == NULL) {
958 return NULL;
959 }
960 client->name = strdup(name);
961 client->userdata = userdata;
962
963 if (callbacks) {
964 client->destroy_fn = callbacks->destroy;
965 client->dispatch_fn_io = callbacks->dispatch;
966 }
967
968 client->fd = fd;
|
CID (unavailable; MK=b13c37cce9c4cfa0c98303204ab57333) (#1 of 1): Resource not released (INCOMPLETE_DEALLOCATOR): |
|
(1) Event allocation: |
Memory is allocated. |
|
(2) Event allocation: |
The field "client->channel" is allocated, but not released in the identified deallocator. |
| Also see events: |
[deallocator] |
969 client->channel = g_io_channel_unix_new(fd);
970 client->source =
971 g_io_add_watch_full(client->channel, priority,
972 (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
973 client, mainloop_gio_destroy);
974
975 /* Now that mainloop now holds a reference to channel,
976 * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
977 *
978 * This means that channel will be free'd by:
979 * g_main_context_dispatch() or g_source_remove()
980 * -> g_source_destroy_internal()
981 * -> g_source_callback_unref()
982 * shortly after mainloop_gio_destroy() completes
983 */
984 g_io_channel_unref(client->channel);
985 pcmk__trace("Added connection %d for %s[%p].%d", client->source,
986 client->name, client, fd);
987 } else {
988 errno = EINVAL;
989 }
990
991 return client;
992 }
993
994 void
995 mainloop_del_fd(mainloop_io_t *client)
996 {
997 if ((client == NULL) || (client->source == 0)) {
998 return;
999 }
1000
1001 pcmk__trace("Removing client %s[%p]", client->name, client);
1002
1003 // mainloop_gio_destroy() gets called during source removal
1004 g_source_remove(client->source);
1005 }
1006
/* List of child processes (mainloop_child_t) being tracked; entries are
 * removed and freed by child_death_dispatch() once a child has been reaped
 */
static GList *child_list = NULL;
1008
1009 pid_t
1010 mainloop_child_pid(mainloop_child_t * child)
1011 {
1012 return child->pid;
1013 }
1014
1015 const char *
1016 mainloop_child_name(mainloop_child_t * child)
1017 {
1018 return child->desc;
1019 }
1020
1021 int
1022 mainloop_child_timeout(mainloop_child_t * child)
1023 {
1024 return child->timeout;
1025 }
1026
1027 void *
1028 mainloop_child_userdata(mainloop_child_t * child)
1029 {
1030 return child->privatedata;
1031 }
1032
1033 void
1034 mainloop_clear_child_userdata(mainloop_child_t * child)
1035 {
1036 child->privatedata = NULL;
1037 }
1038
1039 /* good function name */
1040 static void
1041 child_free(mainloop_child_t *child)
1042 {
1043 if (child->timerid != 0) {
1044 pcmk__trace("Removing timer %d", child->timerid);
1045 g_source_remove(child->timerid);
1046 child->timerid = 0;
1047 }
1048 free(child->desc);
1049 free(child);
1050 }
1051
1052 /* terrible function name */
1053 static int
1054 child_kill_helper(mainloop_child_t *child)
1055 {
1056 int rc;
1057 if (child->flags & mainloop_leave_pid_group) {
1058 pcmk__debug("Killing PID %lld only. Leaving its process group intact.",
1059 (long long) child->pid);
1060 rc = kill(child->pid, SIGKILL);
1061 } else {
1062 pcmk__debug("Killing PID %lld's entire process group",
1063 (long long) child->pid);
1064 rc = kill(-child->pid, SIGKILL);
1065 }
1066
1067 if (rc < 0) {
1068 if (errno != ESRCH) {
1069 pcmk__err("kill(%d, KILL) failed: %s", child->pid, strerror(errno));
1070 }
1071 return -errno;
1072 }
1073 return 0;
1074 }
1075
1076 static gboolean
1077 child_timeout_callback(gpointer p)
1078 {
1079 mainloop_child_t *child = p;
1080 int rc = 0;
1081
1082 child->timerid = 0;
1083 if (child->timeout) {
1084 pcmk__warn("%s process (PID %lld) will not die!", child->desc,
1085 (long long) child->pid);
1086 return FALSE;
1087 }
1088
1089 rc = child_kill_helper(child);
1090 if (rc == -ESRCH) {
1091 /* Nothing left to do. pid doesn't exist */
1092 return FALSE;
1093 }
1094
1095 child->timeout = TRUE;
1096 pcmk__debug("%s process (PID %lld) timed out", child->desc,
1097 (long long) child->pid);
1098
1099 child->timerid = pcmk__create_timer(5000, child_timeout_callback, child);
1100 return FALSE;
1101 }
1102
1103 static bool
1104 child_waitpid(mainloop_child_t *child, int flags)
1105 {
1106 int rc = 0;
1107 int core = 0;
1108 int signo = 0;
1109 int status = 0;
1110 int exitcode = 0;
1111 bool callback_needed = true;
1112
1113 rc = waitpid(child->pid, &status, flags);
1114 if (rc == 0) { // WNOHANG in flags, and child status is not available
1115 pcmk__trace("Child process %lld (%s) still active",
1116 (long long) child->pid, child->desc);
1117 callback_needed = false;
1118
1119 } else if (rc != child->pid) {
1120 /* According to POSIX, possible conditions:
1121 * - child->pid was non-positive (process group or any child),
1122 * and rc is specific child
1123 * - errno ECHILD (pid does not exist or is not child)
1124 * - errno EINVAL (invalid flags)
1125 * - errno EINTR (caller interrupted by signal)
1126 *
1127 * @TODO Handle these cases more specifically.
1128 */
1129 signo = SIGCHLD;
1130 exitcode = 1;
1131 pcmk__notice("Wait for child process %d (%s) interrupted: %s",
1132 child->pid, child->desc, pcmk_rc_str(errno));
1133
1134 } else if (WIFEXITED(status)) {
1135 exitcode = WEXITSTATUS(status);
1136 pcmk__trace("Child process %lld (%s) exited with status %d",
1137 (long long) child->pid, child->desc, exitcode);
1138
1139 } else if (WIFSIGNALED(status)) {
1140 signo = WTERMSIG(status);
1141 pcmk__trace("Child process %lld (%s) exited with signal %d (%s)",
1142 (long long) child->pid, child->desc, signo,
1143 strsignal(signo));
1144
1145 #ifdef WCOREDUMP // AIX, SunOS, maybe others
1146 } else if (WCOREDUMP(status)) {
1147 core = 1;
1148 pcmk__err("Child process %d (%s) dumped core", child->pid, child->desc);
1149 #endif
1150
1151 } else { // flags must contain WUNTRACED and/or WCONTINUED to reach this
1152 pcmk__trace("Child process %lld (%s) stopped or continued",
1153 (long long) child->pid, child->desc);
1154 callback_needed = false;
1155 }
1156
1157 if (callback_needed && child->exit_fn) {
1158 child->exit_fn(child, core, signo, exitcode);
1159 }
1160 return callback_needed;
1161 }
1162
1163 static void
1164 child_death_dispatch(int signal)
1165 {
1166 for (GList *iter = child_list; iter; ) {
1167 GList *saved = iter;
1168 mainloop_child_t *child = iter->data;
1169
1170 iter = iter->next;
1171 if (child_waitpid(child, WNOHANG)) {
1172 pcmk__trace("Removing completed process %lld from child list",
1173 (long long) child->pid);
1174 child_list = g_list_remove_link(child_list, saved);
1175 g_list_free(saved);
1176 child_free(child);
1177 }
1178 }
1179 }
1180
1181 static gboolean
1182 child_signal_init(gpointer p)
1183 {
1184 pcmk__trace("Installed SIGCHLD handler");
1185 /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
1186 mainloop_add_signal(SIGCHLD, child_death_dispatch);
1187
1188 /* In case they terminated before the signal handler was installed */
1189 child_death_dispatch(SIGCHLD);
1190 return FALSE;
1191 }
1192
1193 gboolean
1194 mainloop_child_kill(pid_t pid)
1195 {
1196 GList *iter;
1197 mainloop_child_t *child = NULL;
1198 mainloop_child_t *match = NULL;
1199 /* It is impossible to block SIGKILL, this allows us to
1200 * call waitpid without WNOHANG flag.*/
1201 int waitflags = 0, rc = 0;
1202
1203 for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1204 child = iter->data;
1205 if (pid == child->pid) {
1206 match = child;
1207 }
1208 }
1209
1210 if (match == NULL) {
1211 return FALSE;
1212 }
1213
1214 rc = child_kill_helper(match);
1215 if(rc == -ESRCH) {
1216 /* It's gone, but hasn't shown up in waitpid() yet. Wait until we get
1217 * SIGCHLD and let handler clean it up as normal (so we get the correct
1218 * return code/status). The blocking alternative would be to call
1219 * child_waitpid(match, 0).
1220 */
1221 pcmk__trace("Waiting for signal that child process %lld completed",
1222 (long long) match->pid);
1223 return TRUE;
1224
1225 } else if(rc != 0) {
1226 /* If KILL for some other reason set the WNOHANG flag since we
1227 * can't be certain what happened.
1228 */
1229 waitflags = WNOHANG;
1230 }
1231
1232 if (!child_waitpid(match, waitflags)) {
1233 /* not much we can do if this occurs */
1234 return FALSE;
1235 }
1236
1237 child_list = g_list_remove(child_list, match);
1238 child_free(match);
1239 return TRUE;
1240 }
1241
1242 /* Create/Log a new tracked process
1243 * To track a process group, use -pid
1244 *
1245 * @TODO Using a non-positive pid (i.e. any child, or process group) would
1246 * likely not be useful since we will free the child after the first
1247 * completed process.
1248 */
1249 void
1250 mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc,
1251 void *privatedata,
1252 enum mainloop_child_flags flags,
1253 pcmk__mainloop_child_exit_fn_t exit_fn)
1254 {
1255 static bool need_init = TRUE;
1256 mainloop_child_t *child = pcmk__assert_alloc(1, sizeof(mainloop_child_t));
1257
1258 child->pid = pid;
1259 child->timerid = 0;
1260 child->timeout = FALSE;
1261 child->privatedata = privatedata;
1262 child->exit_fn = exit_fn;
1263 child->flags = flags;
1264 child->desc = pcmk__str_copy(desc);
1265
1266 if (timeout) {
1267 child->timerid = pcmk__create_timer(timeout, child_timeout_callback, child);
1268 }
1269
1270 child_list = g_list_append(child_list, child);
1271
1272 if(need_init) {
1273 need_init = FALSE;
1274 /* SIGCHLD processing has to be invoked from mainloop.
1275 * We do not want it to be possible to both add a child pid
1276 * to mainloop, and have the pid's exit callback invoked within
1277 * the same callstack. */
1278 pcmk__create_timer(1, child_signal_init, NULL);
1279 }
1280 }
1281
1282 void
1283 mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
1284 pcmk__mainloop_child_exit_fn_t exit_fn)
1285 {
1286 mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, exit_fn);
1287 }
1288
1289 static gboolean
1290 mainloop_timer_cb(gpointer user_data)
1291 {
1292 int id = 0;
1293 bool repeat = FALSE;
1294 struct mainloop_timer_s *t = user_data;
1295
1296 pcmk__assert(t != NULL);
1297
1298 id = t->id;
1299 t->id = 0; /* Ensure it's unset during callbacks so that
1300 * mainloop_timer_running() works as expected
1301 */
1302
1303 if(t->cb) {
1304 pcmk__trace("Invoking callbacks for timer %s", t->name);
1305 repeat = t->repeat;
1306 if(t->cb(t->userdata) == FALSE) {
1307 pcmk__trace("Timer %s complete", t->name);
1308 repeat = FALSE;
1309 }
1310 }
1311
1312 if(repeat) {
1313 /* Restore if repeating */
1314 t->id = id;
1315 }
1316
1317 return repeat;
1318 }
1319
1320 bool
1321 mainloop_timer_running(mainloop_timer_t *t)
1322 {
1323 if(t && t->id != 0) {
1324 return TRUE;
1325 }
1326 return FALSE;
1327 }
1328
1329 void
1330 mainloop_timer_start(mainloop_timer_t *t)
1331 {
1332 mainloop_timer_stop(t);
1333 if(t && t->period_ms > 0) {
1334 pcmk__trace("Starting timer %s", t->name);
1335 t->id = pcmk__create_timer(t->period_ms, mainloop_timer_cb, t);
1336 }
1337 }
1338
1339 void
1340 mainloop_timer_stop(mainloop_timer_t *t)
1341 {
1342 if(t && t->id != 0) {
1343 pcmk__trace("Stopping timer %s", t->name);
1344 g_source_remove(t->id);
1345 t->id = 0;
1346 }
1347 }
1348
1349 guint
1350 mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
1351 {
1352 guint last = 0;
1353
1354 if(t) {
1355 last = t->period_ms;
1356 t->period_ms = period_ms;
1357 }
1358
1359 if(t && t->id != 0 && last != t->period_ms) {
1360 mainloop_timer_start(t);
1361 }
1362 return last;
1363 }
1364
1365 mainloop_timer_t *
1366 mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1367 {
1368 mainloop_timer_t *t = pcmk__assert_alloc(1, sizeof(mainloop_timer_t));
1369
1370 if (name != NULL) {
1371 t->name = pcmk__assert_asprintf("%s-%u-%d", name, period_ms, repeat);
1372 } else {
1373 t->name = pcmk__assert_asprintf("%p-%u-%d", t, period_ms, repeat);
1374 }
1375 t->id = 0;
1376 t->period_ms = period_ms;
1377 t->repeat = repeat;
1378 t->cb = cb;
1379 t->userdata = userdata;
1380 pcmk__trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
1381 return t;
1382 }
1383
1384 void
1385 mainloop_timer_del(mainloop_timer_t *t)
1386 {
1387 if(t) {
1388 pcmk__trace("Destroying timer %s", t->name);
1389 mainloop_timer_stop(t);
1390 free(t->name);
1391 free(t);
1392 }
1393 }
1394
1395 /*
1396 * Helpers to make sure certain events aren't lost at shutdown
1397 */
1398
1399 static gboolean
1400 drain_timeout_cb(gpointer user_data)
1401 {
1402 bool *timeout_popped = (bool*) user_data;
1403
1404 *timeout_popped = TRUE;
1405 return FALSE;
1406 }
1407
1408 /*!
1409 * \brief Drain some remaining main loop events then quit it
1410 *
1411 * \param[in,out] mloop Main loop to drain and quit
1412 * \param[in] n Drain up to this many pending events
1413 */
1414 void
1415 pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n)
1416 {
1417 if ((mloop != NULL) && g_main_loop_is_running(mloop)) {
1418 GMainContext *ctx = g_main_loop_get_context(mloop);
1419
1420 /* Drain up to n events in case some memory clean-up is pending
1421 * (helpful to reduce noise in valgrind output).
1422 */
1423 for (int i = 0; (i < n) && g_main_context_pending(ctx); ++i) {
1424 g_main_context_dispatch(ctx);
1425 }
1426 g_main_loop_quit(mloop);
1427 }
1428 }
1429
1430 /*!
1431 * \brief Process main loop events while a certain condition is met
1432 *
1433 * \param[in,out] mloop Main loop to process
1434 * \param[in] timer_ms Don't process longer than this amount of time
1435 * \param[in] check Function that returns true if events should be
1436 * processed
1437 *
1438 * \note This function is intended to be called at shutdown if certain important
1439 * events should not be missed. The caller would likely quit the main loop
1440 * or exit after calling this function. The check() function will be
1441 * passed the remaining timeout in milliseconds.
1442 */
1443 void
1444 pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
1445 {
1446 bool timeout_popped = FALSE;
1447 guint timer = 0;
1448 GMainContext *ctx = NULL;
1449
1450 CRM_CHECK(mloop && check, return);
1451
1452 ctx = g_main_loop_get_context(mloop);
1453 if (ctx) {
1454 time_t start_time = time(NULL);
1455
1456 timer = pcmk__create_timer(timer_ms, drain_timeout_cb, &timeout_popped);
1457 while (!timeout_popped
1458 && check(timer_ms - (time(NULL) - start_time) * 1000)) {
1459 g_main_context_iteration(ctx, TRUE);
1460 }
1461 }
1462 if (!timeout_popped && (timer > 0)) {
1463 g_source_remove(timer);
1464 }
1465 }
1466