1 /*
2 * Copyright 2004-2026 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
10 #include <crm_internal.h>
11
12 #include <stdbool.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <signal.h>
16 #include <errno.h>
17
18 #include <sys/wait.h>
19
20 #include <crm/crm.h>
21 #include <crm/common/xml.h>
22 #include <crm/common/mainloop.h>
23
24 #include <qb/qbarray.h>
25
/* A main loop trigger: a custom GSource that is dispatched when its trigger
 * flag has been set (see mainloop_set_trigger()).
 *
 * The GSource member must come first, because pointers to this struct are cast
 * to and from GSource * throughout this file.
 */
struct trigger_s {
    GSource source;     // GLib source base (must be first)
    gboolean running;   // TRUE while a dispatched job has not yet completed
    gboolean trigger;   // TRUE when the source should be dispatched
    void *user_data;    // argument passed to the dispatch callback
    guint id;           // source ID returned by g_source_attach()

};

// Main loop timer
struct mainloop_timer_s {
    guint id;           // presumably GLib source ID while active -- confirm
    guint period_ms;    // timer period in milliseconds
    bool repeat;        // presumably whether the timer re-arms after firing
    char *name;         // presumably a name used in log messages
    GSourceFunc cb;     // presumably callback to call when the timer fires
    void *userdata;     // presumably data to pass to cb
};
43
44 static gboolean
45 crm_trigger_prepare(GSource * source, gint * timeout)
46 {
47 crm_trigger_t *trig = (crm_trigger_t *) source;
48
49 /* cluster-glue's FD and IPC related sources make use of
50 * g_source_add_poll() but do not set a timeout in their prepare
51 * functions
52 *
53 * This means mainloop's poll() will block until an event for one
54 * of these sources occurs - any /other/ type of source, such as
55 * this one or g_idle_*, that doesn't use g_source_add_poll() is
56 * S-O-L and won't be processed until there is something fd-based
57 * happens.
58 *
59 * Luckily the timeout we can set here affects all sources and
60 * puts an upper limit on how long poll() can take.
61 *
62 * So unconditionally set a small-ish timeout, not too small that
63 * we're in constant motion, which will act as an upper bound on
64 * how long the signal handling might be delayed for.
65 */
66 *timeout = 500; /* Timeout in ms */
67
68 return trig->trigger;
69 }
70
71 static gboolean
72 crm_trigger_check(GSource * source)
73 {
74 crm_trigger_t *trig = (crm_trigger_t *) source;
75
76 return trig->trigger;
77 }
78
79 /*!
80 * \internal
81 * \brief GSource dispatch function for crm_trigger_t
82 *
83 * \param[in] source crm_trigger_t being dispatched
84 * \param[in] callback Callback passed at source creation
85 * \param[in,out] userdata User data passed at source creation
86 *
87 * \return G_SOURCE_REMOVE to remove source, G_SOURCE_CONTINUE to keep it
88 */
89 static gboolean
90 crm_trigger_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
91 {
92 gboolean rc = G_SOURCE_CONTINUE;
93 crm_trigger_t *trig = (crm_trigger_t *) source;
94
95 if (trig->running) {
96 /* Wait until the existing job is complete before starting the next one */
97 return G_SOURCE_CONTINUE;
98 }
99 trig->trigger = FALSE;
100
101 if (callback) {
102 int callback_rc = callback(trig->user_data);
103
104 if (callback_rc < 0) {
105 pcmk__trace("Trigger handler %p not yet complete", trig);
106 trig->running = TRUE;
107 } else if (callback_rc == 0) {
108 rc = G_SOURCE_REMOVE;
109 }
110 }
111 return rc;
112 }
113
114 static void
115 crm_trigger_finalize(GSource * source)
116 {
117 pcmk__trace("Trigger %p destroyed", source);
118 }
119
120 static GSourceFuncs crm_trigger_funcs = {
121 crm_trigger_prepare,
122 crm_trigger_check,
123 crm_trigger_dispatch,
124 crm_trigger_finalize,
125 };
126
127 static crm_trigger_t *
128 mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
129 gpointer userdata)
130 {
131 crm_trigger_t *trigger = NULL;
132
133 trigger = (crm_trigger_t *) source;
134
135 trigger->id = 0;
136 trigger->trigger = FALSE;
137 trigger->user_data = userdata;
138
139 if (dispatch) {
140 g_source_set_callback(source, dispatch, trigger, NULL);
141 }
142
143 g_source_set_priority(source, priority);
144 g_source_set_can_recurse(source, FALSE);
145
146 trigger->id = g_source_attach(source, NULL);
147 return trigger;
148 }
149
150 void
151 mainloop_trigger_complete(crm_trigger_t * trig)
152 {
153 pcmk__trace("Trigger handler %p complete", trig);
154 trig->running = FALSE;
155 }
156
157 /*!
158 * \brief Create a trigger to be used as a mainloop source
159 *
160 * \param[in] priority Relative priority of source (lower number is higher priority)
161 * \param[in] dispatch Trigger dispatch function (should return 0 to remove the
162 * trigger from the mainloop, -1 if the trigger should be
163 * kept but the job is still running and not complete, and
164 * 1 if the trigger should be kept and the job is complete)
165 * \param[in] userdata Pointer to pass to \p dispatch
166 *
167 * \return Newly allocated mainloop source for trigger
168 */
169 crm_trigger_t *
170 mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data),
171 gpointer userdata)
172 {
173 GSource *source = NULL;
174
175 pcmk__assert(sizeof(crm_trigger_t) > sizeof(GSource));
176 source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
177
178 return mainloop_setup_trigger(source, priority, dispatch, userdata);
179 }
180
181 void
182 mainloop_set_trigger(crm_trigger_t * source)
183 {
184 if(source) {
185 source->trigger = TRUE;
186 }
187 }
188
189 gboolean
190 mainloop_destroy_trigger(crm_trigger_t * source)
191 {
192 GSource *gs = NULL;
193
194 if(source == NULL) {
195 return TRUE;
196 }
197
198 gs = (GSource *)source;
199
200 g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
201 g_source_unref(gs); /* The caller no longer carries a reference to source
202 *
203 * At this point the source should be free'd,
204 * unless we're currently processing said
205 * source, in which case mainloop holds an
206 * additional reference and it will be free'd
207 * once our processing completes
208 */
209 return TRUE;
210 }
211
// Define a custom glib source for signal handling

/* Data for the custom signal source. Pointers to this struct are cast to and
 * from crm_trigger_t * and GSource *, so the trigger must be the first member.
 */
typedef struct {
    crm_trigger_t trigger;      // trigger that invoked source (must be first)
    void (*handler) (int sig);  // main loop handler for the signal (may be NULL)
    int signal;                 // signal number this source handles
} crm_signal_t;

// Table mapping signal numbers to their signal sources (NULL if none)
static crm_signal_t *crm_signals[NSIG];
223
224 /*!
225 * \internal
226 * \brief Dispatch an event from custom glib source for signals
227 *
228 * Given an signal event, clear the event trigger and call any registered
229 * signal handler.
230 *
231 * \param[in] source glib source that triggered this dispatch
232 * \param[in] callback (ignored)
233 * \param[in] userdata (ignored)
234 */
235 static gboolean
236 crm_signal_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
237 {
238 crm_signal_t *sig = (crm_signal_t *) source;
239
240 if(sig->signal != SIGCHLD) {
241 pcmk__notice("Caught '%s' signal " QB_XS " %d (%s handler)",
242 strsignal(sig->signal), sig->signal,
243 ((sig->handler != NULL)? "invoking" : "no"));
244 }
245
246 sig->trigger.trigger = FALSE;
247 if (sig->handler) {
248 sig->handler(sig->signal);
249 }
250 return TRUE;
251 }
252
253 /*!
254 * \internal
255 * \brief Handle a signal by setting a trigger for signal source
256 *
257 * \param[in] sig Signal number that was received
258 *
259 * \note This is the true signal handler for the mainloop signal source, and
260 * must be async-safe.
261 */
262 static void
263 mainloop_signal_handler(int sig)
264 {
265 if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
266 mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
267 }
268 }
269
270 // Functions implementing our custom glib source for signal handling
271 static GSourceFuncs crm_signal_funcs = {
272 crm_trigger_prepare,
273 crm_trigger_check,
274 crm_signal_dispatch,
275 crm_trigger_finalize,
276 };
277
278 /*!
279 * \internal
280 * \brief Set a true signal handler
281 *
282 * signal()-like interface to sigaction()
283 *
284 * \param[in] sig Signal number to register handler for
285 * \param[in] dispatch Signal handler
286 *
287 * \return The previous value of the signal handler, or SIG_ERR on error
288 * \note The dispatch function must be async-safe.
289 */
290 sighandler_t
291 crm_signal_handler(int sig, sighandler_t dispatch)
292 {
293 sigset_t mask;
294 struct sigaction sa;
295 struct sigaction old;
296
297 if (sigemptyset(&mask) < 0) {
298 pcmk__err("Could not %sset handler for signal %d: %s",
299 ((dispatch == NULL)? "un" : ""), sig, strerror(errno));
300 return SIG_ERR;
301 }
302
303 memset(&sa, 0, sizeof(struct sigaction));
304 sa.sa_handler = dispatch;
305 sa.sa_flags = SA_RESTART;
306 sa.sa_mask = mask;
307
308 if (sigaction(sig, &sa, &old) < 0) {
309 pcmk__err("Could not %sset handler for signal %d: %s",
310 ((dispatch == NULL)? "un" : ""), sig, strerror(errno));
311 return SIG_ERR;
312 }
313 return old.sa_handler;
314 }
315
316 static void
317 mainloop_destroy_signal_entry(int sig)
318 {
319 crm_signal_t *tmp = crm_signals[sig];
320
321 if (tmp != NULL) {
322 crm_signals[sig] = NULL;
323 pcmk__trace("Unregistering mainloop handler for signal %d", sig);
324 mainloop_destroy_trigger((crm_trigger_t *) tmp);
325 }
326 }
327
328 /*!
329 * \internal
330 * \brief Add a signal handler to a mainloop
331 *
332 * \param[in] sig Signal number to handle
333 * \param[in] dispatch Signal handler function (\c NULL to ignore the signal)
334 *
335 * \note The true signal handler merely sets a mainloop trigger to call this
336 * dispatch function via the mainloop. Therefore, the dispatch function
337 * does not need to be async-safe.
338 */
339 gboolean
340 mainloop_add_signal(int sig, void (*dispatch) (int sig))
341 {
342 GSource *source = NULL;
343 int priority = G_PRIORITY_HIGH - 1;
344
345 if (sig == SIGTERM) {
346 /* TERM is higher priority than other signals,
347 * signals are higher priority than other ipc.
348 * Yes, minus: smaller is "higher"
349 */
350 priority--;
351 }
352
353 if (sig >= NSIG || sig < 0) {
354 pcmk__err("Signal %d is out of range", sig);
355 return FALSE;
356
357 } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
358 pcmk__trace("Signal handler for %d is already installed", sig);
359 return TRUE;
360
361 } else if (crm_signals[sig] != NULL) {
362 pcmk__err("Different signal handler for %d is already installed", sig);
363 return FALSE;
364 }
365
366 pcmk__assert(sizeof(crm_signal_t) > sizeof(GSource));
367 source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
368
369 crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
370 pcmk__assert(crm_signals[sig] != NULL);
371
372 crm_signals[sig]->handler = dispatch;
373 crm_signals[sig]->signal = sig;
374
375 if (crm_signal_handler(sig, mainloop_signal_handler) == SIG_ERR) {
376 mainloop_destroy_signal_entry(sig);
377 return FALSE;
378 }
379
380 return TRUE;
381 }
382
383 gboolean
384 mainloop_destroy_signal(int sig)
385 {
386 if (sig >= NSIG || sig < 0) {
387 pcmk__err("Signal %d is out of range", sig);
388 return FALSE;
389
390 } else if (crm_signal_handler(sig, NULL) == SIG_ERR) {
391 // Error already logged
392 return FALSE;
393
394 } else if (crm_signals[sig] == NULL) {
395 return TRUE;
396 }
397 mainloop_destroy_signal_entry(sig);
398 return TRUE;
399 }
400
401 static qb_array_t *gio_map = NULL;
402
403 void
404 mainloop_cleanup(void)
405 {
406 if (gio_map != NULL) {
407 qb_array_free(gio_map);
408 gio_map = NULL;
409 }
410
411 for (int sig = 0; sig < NSIG; ++sig) {
412 mainloop_destroy_signal_entry(sig);
413 }
414 }
415
416 /*
417 * libqb...
418 */
419 struct gio_to_qb_poll {
420 int32_t is_used;
421 guint source;
422 int32_t events;
423 void *data;
424 qb_ipcs_dispatch_fn_t fn;
425 enum qb_loop_priority p;
426 };
427
428 static gboolean
429 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
430 {
431 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
432 gint fd = g_io_channel_unix_get_fd(gio);
433
434 pcmk__trace("%p.%d %d", data, fd, condition);
435
436 /* if this assert get's hit, then there is a race condition between
437 * when we destroy a fd and when mainloop actually gives it up */
438 pcmk__assert(adaptor->is_used > 0);
439
440 return (adaptor->fn(fd, condition, adaptor->data) == 0);
441 }
442
443 static void
444 gio_poll_destroy(gpointer data)
445 {
446 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
447
448 adaptor->is_used--;
449 pcmk__assert(adaptor->is_used >= 0);
450
451 if (adaptor->is_used == 0) {
452 pcmk__trace("Marking adaptor %p unused", adaptor);
453 adaptor->source = 0;
454 }
455 }
456
457 /*!
458 * \internal
459 * \brief Convert libqb's poll priority into GLib's one
460 *
461 * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
462 *
463 * \return best matching GLib's priority
464 */
465 static gint
466 conv_prio_libqb2glib(enum qb_loop_priority prio)
467 {
468 switch (prio) {
469 case QB_LOOP_LOW: return G_PRIORITY_LOW;
470 case QB_LOOP_HIGH: return G_PRIORITY_HIGH;
471 default: return G_PRIORITY_DEFAULT; // QB_LOOP_MED
472 }
473 }
474
475 /*!
476 * \internal
477 * \brief Convert libqb's poll priority to rate limiting spec
478 *
479 * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
480 *
481 * \return best matching rate limiting spec
482 * \note This is the inverse of libqb's qb_ipcs_request_rate_limit().
483 */
484 static enum qb_ipcs_rate_limit
485 conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
486 {
487 switch (prio) {
488 case QB_LOOP_LOW: return QB_IPCS_RATE_SLOW;
489 case QB_LOOP_HIGH: return QB_IPCS_RATE_FAST;
490 default: return QB_IPCS_RATE_NORMAL; // QB_LOOP_MED
491 }
492 }
493
/*!
 * \internal
 * \brief Add or modify a GLib watch for a libqb-managed file descriptor
 *
 * \param[in] p     libqb priority for the watch
 * \param[in] fd    File descriptor to watch
 * \param[in] evts  I/O conditions to watch for (G_IO_HUP, G_IO_NVAL, and
 *                  G_IO_ERR are always added)
 * \param[in] data  Data to pass to \p fn
 * \param[in] fn    libqb dispatch function to call on activity
 * \param[in] add   If true, the descriptor must not already have a watch. If
 *                  false (modification), it must already be in use.
 *
 * \return 0 on success. Otherwise returns a negative errno value or the
 *         negative result of qb_array_index().
 */
static int32_t
gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
                         void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
{
    struct gio_to_qb_poll *adaptor;
    GIOChannel *channel;
    int32_t res = 0;

    // Look up this descriptor's adaptor entry in gio_map
    res = qb_array_index(gio_map, fd, (void **)&adaptor);
    if (res < 0) {
        pcmk__err("Array lookup failed for fd=%d: %d", fd, res);
        return res;
    }

    // NOTE(review): this message is also logged for modifications
    pcmk__trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);

    if (add && adaptor->source) {
        pcmk__err("Adaptor for descriptor %d is still in-use", fd);
        return -EEXIST;
    }
    if (!add && !adaptor->is_used) {
        pcmk__err("Adaptor for descriptor %d is not in-use", fd);
        return -ENOENT;
    }

    /* channel is created with ref_count = 1 */
    channel = g_io_channel_unix_new(fd);
    if (!channel) {
        pcmk__err("No memory left to add fd=%d", fd);
        return -ENOMEM;
    }

    /* Replace any existing watch. Removing it calls gio_poll_destroy(), which
     * decrements is_used.
     */
    if (adaptor->source) {
        g_source_remove(adaptor->source);
        adaptor->source = 0;
    }

    /* Unlike the poll() API, GLib doesn't report HUPs (or errors) unless
     * asked to
     */
    evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);

    adaptor->fn = fn;
    adaptor->events = evts;
    adaptor->data = data;
    adaptor->p = p;
    adaptor->is_used++;     // balanced by gio_poll_destroy() for this watch
    adaptor->source =
        g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
                            gio_read_socket, adaptor, gio_poll_destroy);

    /* The main loop now holds a reference to channel, thanks to
     * g_io_add_watch_full(), so drop ours from g_io_channel_unix_new().
     *
     * This means that channel will be freed by:
     * g_main_context_dispatch()
     *  -> g_source_destroy_internal()
     *      -> g_source_callback_unref()
     * shortly after gio_poll_destroy() completes
     */
    g_io_channel_unref(channel);

    pcmk__trace("Added to mainloop with gsource id=%d", adaptor->source);
    if (adaptor->source > 0) {
        return 0;
    }

    return -EINVAL;
}
561
562 static int32_t
563 gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
564 void *data, qb_ipcs_dispatch_fn_t fn)
565 {
566 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
567 }
568
569 static int32_t
570 gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
571 void *data, qb_ipcs_dispatch_fn_t fn)
572 {
573 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
574 }
575
576 static int32_t
577 gio_poll_dispatch_del(int32_t fd)
578 {
579 struct gio_to_qb_poll *adaptor;
580
581 pcmk__trace("Looking for fd=%d", fd);
582 if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
583 if (adaptor->source) {
584 g_source_remove(adaptor->source);
585 adaptor->source = 0;
586 }
587 }
588 return 0;
589 }
590
591 struct qb_ipcs_poll_handlers gio_poll_funcs = {
592 .job_add = NULL,
593 .dispatch_add = gio_poll_dispatch_add,
594 .dispatch_mod = gio_poll_dispatch_mod,
595 .dispatch_del = gio_poll_dispatch_del,
596 };
597
598 static enum qb_ipc_type
599 pick_ipc_type(enum qb_ipc_type requested)
600 {
601 const char *env = pcmk__env_option(PCMK__ENV_IPC_TYPE);
602
603 if (env && strcmp("shared-mem", env) == 0) {
604 return QB_IPC_SHM;
605 } else if (env && strcmp("socket", env) == 0) {
606 return QB_IPC_SOCKET;
607 } else if (env && strcmp("posix", env) == 0) {
608 return QB_IPC_POSIX_MQ;
609 } else if (env && strcmp("sysv", env) == 0) {
610 return QB_IPC_SYSV_MQ;
611 } else if (requested == QB_IPC_NATIVE) {
612 /* We prefer shared memory because the server never blocks on
613 * send. If part of a message fits into the socket, libqb
614 * needs to block until the remainder can be sent also.
615 * Otherwise the client will wait forever for the remaining
616 * bytes.
617 */
618 return QB_IPC_SHM;
619 }
620 return requested;
621 }
622
623 qb_ipcs_service_t *
624 mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
625 struct qb_ipcs_service_handlers *callbacks)
626 {
627 return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED);
628 }
629
630 qb_ipcs_service_t *
631 mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type,
632 struct qb_ipcs_service_handlers *callbacks,
633 enum qb_loop_priority prio)
634 {
635 int rc = 0;
636 qb_ipcs_service_t *server = NULL;
637
638 if (gio_map == NULL) {
639 gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
640 }
641
642 server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
643
644 if (server == NULL) {
645 pcmk__err("Could not create %s IPC server: %s (%d)", name,
646 pcmk_rc_str(errno), errno);
647 return NULL;
648 }
649
650 if (prio != QB_LOOP_MED) {
651 qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
652 }
653
654 // Enforce a minimum IPC buffer size on all clients
655 qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
656 qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
657
658 rc = qb_ipcs_run(server);
659 if (rc < 0) {
660 pcmk__err("Could not start %s IPC server: %s (%d)", name,
661 pcmk_strerror(rc), rc);
662 return NULL; // qb_ipcs_run() destroys server on failure
663 }
664
665 return server;
666 }
667
668 void
669 mainloop_del_ipc_server(qb_ipcs_service_t * server)
670 {
671 if (server) {
672 qb_ipcs_destroy(server);
673 }
674 }
675
// Main loop source wrapping a file descriptor (optionally an IPC connection)
struct mainloop_io_s {
    char *name;             // name used in log messages (owned copy)
    void *userdata;         // data passed to the dispatch and destroy callbacks

    int fd;                 // file descriptor being watched
    guint source;           // GLib source ID of the I/O watch
    crm_ipc_t *ipc;         // IPC connection (NULL for plain descriptors)
    GIOChannel *channel;    // GLib I/O channel for fd

    // Called for each IPC message read (used only when ipc is set)
    int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
    // Called when fd is readable (used only when ipc is NULL)
    int (*dispatch_fn_io) (gpointer userdata);
    // Called (at most once) when the source is destroyed
    void (*destroy_fn) (gpointer userdata);

};
690
691 /*!
692 * \internal
693 * \brief I/O watch callback function (GIOFunc)
694 *
695 * \param[in] gio I/O channel being watched
696 * \param[in] condition I/O condition satisfied
697 * \param[in] data User data passed when source was created
698 *
699 * \return G_SOURCE_REMOVE to remove source, G_SOURCE_CONTINUE to keep it
700 */
701 static gboolean
702 mainloop_gio_callback(GIOChannel *gio, GIOCondition condition, gpointer data)
703 {
704 gboolean rc = G_SOURCE_CONTINUE;
705 mainloop_io_t *client = data;
706
707 pcmk__assert(client->fd == g_io_channel_unix_get_fd(gio));
708
709 if (condition & G_IO_IN) {
710 if (client->ipc) {
711 long read_rc = 0L;
712 int max = 10;
713
714 do {
715 read_rc = crm_ipc_read(client->ipc);
716 if (read_rc <= 0) {
717 pcmk__trace("Could not read IPC message from %s: %s (%ld)",
718 client->name, pcmk_strerror(read_rc), read_rc);
719
720 if (read_rc == -EAGAIN) {
721 continue;
722 }
723
724 } else if (client->dispatch_fn_ipc) {
725 const char *buffer = crm_ipc_buffer(client->ipc);
726
727 pcmk__trace("New %ld-byte IPC message from %s after I/O "
728 "condition %d",
729 read_rc, client->name, (int) condition);
730 if (client->dispatch_fn_ipc(buffer, read_rc, client->userdata) < 0) {
731 pcmk__trace("Connection to %s no longer required",
732 client->name);
733 rc = G_SOURCE_REMOVE;
734 }
735 }
736
737 pcmk__ipc_free_client_buffer(client->ipc);
738
739 } while ((rc == G_SOURCE_CONTINUE) && (--max > 0)
740 && ((read_rc > 0) || (read_rc == -EAGAIN)));
741
742 } else {
743 pcmk__trace("New I/O event for %s after I/O condition %d",
744 client->name, (int) condition);
745 if (client->dispatch_fn_io) {
746 if (client->dispatch_fn_io(client->userdata) < 0) {
747 pcmk__trace("Connection to %s no longer required",
748 client->name);
749 rc = G_SOURCE_REMOVE;
750 }
751 }
752 }
753 }
754
755 if (client->ipc && !crm_ipc_connected(client->ipc)) {
756 pcmk__err("Connection to %s closed " QB_XS " client=%p condition=%d",
757 client->name, client, condition);
758 rc = G_SOURCE_REMOVE;
759
760 } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
761 pcmk__trace("The connection %s[%p] has been closed (I/O condition=%d)",
762 client->name, client, condition);
763 rc = G_SOURCE_REMOVE;
764
765 } else if ((condition & G_IO_IN) == 0) {
766 /*
767 #define GLIB_SYSDEF_POLLIN =1
768 #define GLIB_SYSDEF_POLLPRI =2
769 #define GLIB_SYSDEF_POLLOUT =4
770 #define GLIB_SYSDEF_POLLERR =8
771 #define GLIB_SYSDEF_POLLHUP =16
772 #define GLIB_SYSDEF_POLLNVAL =32
773
774 typedef enum
775 {
776 G_IO_IN GLIB_SYSDEF_POLLIN,
777 G_IO_OUT GLIB_SYSDEF_POLLOUT,
778 G_IO_PRI GLIB_SYSDEF_POLLPRI,
779 G_IO_ERR GLIB_SYSDEF_POLLERR,
780 G_IO_HUP GLIB_SYSDEF_POLLHUP,
781 G_IO_NVAL GLIB_SYSDEF_POLLNVAL
782 } GIOCondition;
783
784 A bitwise combination representing a condition to watch for on an event source.
785
786 G_IO_IN There is data to read.
787 G_IO_OUT Data can be written (without blocking).
788 G_IO_PRI There is urgent data to read.
789 G_IO_ERR Error condition.
790 G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets).
791 G_IO_NVAL Invalid request. The file descriptor is not open.
792 */
793 pcmk__err("Strange condition: %d", condition);
794 }
795
796 /* G_SOURCE_REMOVE results in mainloop_gio_destroy() being called
797 * just before the source is removed from mainloop
798 */
799 return rc;
800 }
801
802 static void
803 mainloop_gio_destroy(gpointer c)
804 {
805 mainloop_io_t *client = c;
806 char *c_name = strdup(client->name);
807
808 /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
809 * client->channel will still have ref_count > 0... should be == 1
810 */
811 pcmk__trace("Destroying client %s[%p]", c_name, c);
812
813 if (client->ipc) {
814 crm_ipc_close(client->ipc);
815 }
816
817 if (client->destroy_fn) {
818 void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
819
820 client->destroy_fn = NULL;
821 destroy_fn(client->userdata);
822 }
823
824 if (client->ipc) {
825 crm_ipc_t *ipc = client->ipc;
826
827 client->ipc = NULL;
828 crm_ipc_destroy(ipc);
829 }
830
831 pcmk__trace("Destroyed client %s[%p]", c_name, c);
832
833 free(client->name); client->name = NULL;
834 free(client);
835
836 free(c_name);
837 }
838
839 /*!
840 * \brief Connect to IPC and add it as a main loop source
841 *
842 * \param[in,out] ipc IPC connection to add
843 * \param[in] priority Event source priority to use for connection
844 * \param[in] userdata Data to register with callbacks
845 * \param[in] callbacks Dispatch and destroy callbacks for connection
846 * \param[out] source Newly allocated event source
847 *
848 * \return Standard Pacemaker return code
849 *
850 * \note On failure, the caller is still responsible for ipc. On success, the
851 * caller should call mainloop_del_ipc_client() when source is no longer
852 * needed, which will lead to the disconnection of the IPC later in the
853 * main loop if it is connected. However the IPC disconnects,
854 * mainloop_gio_destroy() will free ipc and source after calling the
855 * destroy callback.
856 */
857 int
858 pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata,
859 const struct ipc_client_callbacks *callbacks,
860 mainloop_io_t **source)
861 {
862 int rc = pcmk_rc_ok;
863 int fd = -1;
864 const char *ipc_name = NULL;
865
866 CRM_CHECK((ipc != NULL) && (callbacks != NULL), return EINVAL);
867
868 ipc_name = pcmk__s(crm_ipc_name(ipc), "Pacemaker");
869 rc = pcmk__connect_generic_ipc(ipc);
870 if (rc != pcmk_rc_ok) {
871 pcmk__debug("Connection to %s failed: %s", ipc_name, pcmk_rc_str(rc));
872 return rc;
873 }
874
875 rc = pcmk__ipc_fd(ipc, &fd);
876 if (rc != pcmk_rc_ok) {
877 pcmk__debug("Could not obtain file descriptor for %s IPC: %s", ipc_name,
878 pcmk_rc_str(rc));
879 crm_ipc_close(ipc);
880 return rc;
881 }
882
883 *source = mainloop_add_fd(ipc_name, priority, fd, userdata, NULL);
884 if (*source == NULL) {
885 rc = errno;
886 crm_ipc_close(ipc);
887 return rc;
888 }
889
890 (*source)->ipc = ipc;
891 (*source)->destroy_fn = callbacks->destroy;
892 (*source)->dispatch_fn_ipc = callbacks->dispatch;
893 return pcmk_rc_ok;
894 }
895
896 /*!
897 * \brief Get period for mainloop timer
898 *
899 * \param[in] timer Timer
900 *
901 * \return Period in ms
902 */
903 guint
904 pcmk__mainloop_timer_get_period(const mainloop_timer_t *timer)
905 {
906 if (timer) {
907 return timer->period_ms;
908 }
909 return 0;
910 }
911
912 mainloop_io_t *
913 mainloop_add_ipc_client(const char *name, int priority, size_t max_size,
914 void *userdata, struct ipc_client_callbacks *callbacks)
915 {
916 crm_ipc_t *ipc = crm_ipc_new(name, 0);
917 mainloop_io_t *source = NULL;
918 int rc = pcmk__add_mainloop_ipc(ipc, priority, userdata, callbacks,
919 &source);
920
921 if (rc != pcmk_rc_ok) {
922 if (crm_log_level == PCMK__LOG_STDOUT) {
923 fprintf(stderr, "Connection to %s failed: %s",
924 name, pcmk_rc_str(rc));
925 }
926 crm_ipc_destroy(ipc);
927 if (rc > 0) {
928 errno = rc;
929 } else {
930 errno = ENOTCONN;
931 }
932 return NULL;
933 }
934 return source;
935 }
936
937 void
938 mainloop_del_ipc_client(mainloop_io_t * client)
939 {
940 mainloop_del_fd(client);
941 }
942
943 crm_ipc_t *
944 mainloop_get_ipc_client(mainloop_io_t * client)
945 {
946 if (client) {
947 return client->ipc;
948 }
949 return NULL;
950 }
951
952 mainloop_io_t *
953 mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
954 struct mainloop_fd_callbacks * callbacks)
955 {
956 mainloop_io_t *client = NULL;
957
958 if (fd >= 0) {
959 client = calloc(1, sizeof(mainloop_io_t));
960 if (client == NULL) {
961 return NULL;
962 }
963 client->name = strdup(name);
964 client->userdata = userdata;
965
966 if (callbacks) {
967 client->destroy_fn = callbacks->destroy;
968 client->dispatch_fn_io = callbacks->dispatch;
969 }
970
971 client->fd = fd;
972 client->channel = g_io_channel_unix_new(fd);
973 client->source =
974 g_io_add_watch_full(client->channel, priority,
975 (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
976 client, mainloop_gio_destroy);
977
978 /* Now that mainloop now holds a reference to channel,
979 * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
980 *
981 * This means that channel will be free'd by:
982 * g_main_context_dispatch() or g_source_remove()
983 * -> g_source_destroy_internal()
984 * -> g_source_callback_unref()
985 * shortly after mainloop_gio_destroy() completes
986 */
987 g_io_channel_unref(client->channel);
988 pcmk__trace("Added connection %d for %s[%p].%d", client->source,
989 client->name, client, fd);
990 } else {
991 errno = EINVAL;
992 }
993
994 return client;
995 }
996
997 void
998 mainloop_del_fd(mainloop_io_t *client)
999 {
1000 if ((client == NULL) || (client->source == 0)) {
1001 return;
1002 }
1003
1004 pcmk__trace("Removing client %s[%p]", client->name, client);
1005
1006 // mainloop_gio_destroy() gets called during source removal
1007 g_source_remove(client->source);
1008 }
1009
1010 static GList *child_list = NULL;
1011
1012 pid_t
1013 mainloop_child_pid(mainloop_child_t * child)
1014 {
1015 return child->pid;
1016 }
1017
1018 const char *
1019 mainloop_child_name(mainloop_child_t * child)
1020 {
1021 return child->desc;
1022 }
1023
1024 int
1025 mainloop_child_timeout(mainloop_child_t * child)
1026 {
1027 return child->timeout;
1028 }
1029
1030 void *
1031 mainloop_child_userdata(mainloop_child_t * child)
1032 {
1033 return child->privatedata;
1034 }
1035
1036 void
1037 mainloop_clear_child_userdata(mainloop_child_t * child)
1038 {
1039 child->privatedata = NULL;
1040 }
1041
1042 /* good function name */
1043 static void
1044 child_free(mainloop_child_t *child)
1045 {
1046 if (child->timerid != 0) {
1047 pcmk__trace("Removing timer %d", child->timerid);
1048 g_source_remove(child->timerid);
1049 child->timerid = 0;
1050 }
1051 free(child->desc);
1052 free(child);
1053 }
1054
1055 /* terrible function name */
1056 static int
1057 child_kill_helper(mainloop_child_t *child)
1058 {
1059 int rc;
1060 if (child->flags & mainloop_leave_pid_group) {
1061 pcmk__debug("Killing PID %lld only. Leaving its process group intact.",
1062 (long long) child->pid);
1063 rc = kill(child->pid, SIGKILL);
1064 } else {
1065 pcmk__debug("Killing PID %lld's entire process group",
1066 (long long) child->pid);
1067 rc = kill(-child->pid, SIGKILL);
1068 }
1069
1070 if (rc < 0) {
1071 if (errno != ESRCH) {
1072 pcmk__err("kill(%d, KILL) failed: %s", child->pid, strerror(errno));
1073 }
1074 return -errno;
1075 }
1076 return 0;
1077 }
1078
1079 static gboolean
1080 child_timeout_callback(gpointer p)
1081 {
1082 mainloop_child_t *child = p;
1083 int rc = 0;
1084
1085 child->timerid = 0;
1086 if (child->timeout) {
1087 pcmk__warn("%s process (PID %lld) will not die!", child->desc,
1088 (long long) child->pid);
1089 return FALSE;
1090 }
1091
1092 rc = child_kill_helper(child);
1093 if (rc == -ESRCH) {
1094 /* Nothing left to do. pid doesn't exist */
1095 return FALSE;
1096 }
1097
1098 child->timeout = TRUE;
1099 pcmk__debug("%s process (PID %lld) timed out", child->desc,
1100 (long long) child->pid);
1101
1102 child->timerid = pcmk__create_timer(5000, child_timeout_callback, child);
1103 return FALSE;
1104 }
1105
1106 static bool
1107 child_waitpid(mainloop_child_t *child, int flags)
1108 {
1109 int rc = 0;
1110 int core = 0;
1111 int signo = 0;
1112 int status = 0;
1113 int exitcode = 0;
1114 bool callback_needed = true;
1115
1116 rc = waitpid(child->pid, &status, flags);
1117 if (rc == 0) { // WNOHANG in flags, and child status is not available
1118 pcmk__trace("Child process %lld (%s) still active",
1119 (long long) child->pid, child->desc);
1120 callback_needed = false;
1121
1122 } else if (rc != child->pid) {
1123 /* According to POSIX, possible conditions:
1124 * - child->pid was non-positive (process group or any child),
1125 * and rc is specific child
1126 * - errno ECHILD (pid does not exist or is not child)
1127 * - errno EINVAL (invalid flags)
1128 * - errno EINTR (caller interrupted by signal)
1129 *
1130 * @TODO Handle these cases more specifically.
1131 */
1132 signo = SIGCHLD;
1133 exitcode = 1;
1134 pcmk__notice("Wait for child process %d (%s) interrupted: %s",
1135 child->pid, child->desc, pcmk_rc_str(errno));
1136
1137 } else if (WIFEXITED(status)) {
1138 exitcode = WEXITSTATUS(status);
1139 pcmk__trace("Child process %lld (%s) exited with status %d",
1140 (long long) child->pid, child->desc, exitcode);
1141
1142 } else if (WIFSIGNALED(status)) {
1143 signo = WTERMSIG(status);
1144 pcmk__trace("Child process %lld (%s) exited with signal %d (%s)",
1145 (long long) child->pid, child->desc, signo,
1146 strsignal(signo));
1147
1148 #ifdef WCOREDUMP // AIX, SunOS, maybe others
1149 } else if (WCOREDUMP(status)) {
1150 core = 1;
1151 pcmk__err("Child process %d (%s) dumped core", child->pid, child->desc);
1152 #endif
1153
1154 } else { // flags must contain WUNTRACED and/or WCONTINUED to reach this
1155 pcmk__trace("Child process %lld (%s) stopped or continued",
1156 (long long) child->pid, child->desc);
1157 callback_needed = false;
1158 }
1159
1160 if (callback_needed && child->exit_fn) {
1161 child->exit_fn(child, core, signo, exitcode);
1162 }
1163 return callback_needed;
1164 }
1165
1166 static void
1167 child_death_dispatch(int signal)
1168 {
1169 for (GList *iter = child_list; iter; ) {
1170 GList *saved = iter;
1171 mainloop_child_t *child = iter->data;
1172
1173 iter = iter->next;
1174 if (child_waitpid(child, WNOHANG)) {
1175 pcmk__trace("Removing completed process %lld from child list",
1176 (long long) child->pid);
1177 child_list = g_list_remove_link(child_list, saved);
1178 g_list_free(saved);
1179 child_free(child);
1180 }
1181 }
1182 }
1183
1184 static gboolean
1185 child_signal_init(gpointer p)
1186 {
1187 pcmk__trace("Installed SIGCHLD handler");
1188 /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
1189 mainloop_add_signal(SIGCHLD, child_death_dispatch);
1190
1191 /* In case they terminated before the signal handler was installed */
1192 child_death_dispatch(SIGCHLD);
1193 return FALSE;
1194 }
1195
1196 gboolean
1197 mainloop_child_kill(pid_t pid)
1198 {
1199 GList *iter;
1200 mainloop_child_t *child = NULL;
1201 mainloop_child_t *match = NULL;
1202 /* It is impossible to block SIGKILL, this allows us to
1203 * call waitpid without WNOHANG flag.*/
1204 int waitflags = 0, rc = 0;
1205
1206 for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1207 child = iter->data;
1208 if (pid == child->pid) {
1209 match = child;
1210 }
1211 }
1212
1213 if (match == NULL) {
1214 return FALSE;
1215 }
1216
1217 rc = child_kill_helper(match);
1218 if(rc == -ESRCH) {
1219 /* It's gone, but hasn't shown up in waitpid() yet. Wait until we get
1220 * SIGCHLD and let handler clean it up as normal (so we get the correct
1221 * return code/status). The blocking alternative would be to call
1222 * child_waitpid(match, 0).
1223 */
1224 pcmk__trace("Waiting for signal that child process %lld completed",
1225 (long long) match->pid);
1226 return TRUE;
1227
1228 } else if(rc != 0) {
1229 /* If KILL for some other reason set the WNOHANG flag since we
1230 * can't be certain what happened.
1231 */
1232 waitflags = WNOHANG;
1233 }
1234
1235 if (!child_waitpid(match, waitflags)) {
1236 /* not much we can do if this occurs */
1237 return FALSE;
1238 }
1239
1240 child_list = g_list_remove(child_list, match);
1241 child_free(match);
1242 return TRUE;
1243 }
1244
1245 /* Create/Log a new tracked process
1246 * To track a process group, use -pid
1247 *
1248 * @TODO Using a non-positive pid (i.e. any child, or process group) would
1249 * likely not be useful since we will free the child after the first
1250 * completed process.
1251 */
1252 void
1253 mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc,
1254 void *privatedata,
1255 enum mainloop_child_flags flags,
1256 pcmk__mainloop_child_exit_fn_t exit_fn)
1257 {
1258 static bool need_init = TRUE;
1259 mainloop_child_t *child = pcmk__assert_alloc(1, sizeof(mainloop_child_t));
1260
1261 child->pid = pid;
1262 child->timerid = 0;
1263 child->timeout = FALSE;
1264 child->privatedata = privatedata;
1265 child->exit_fn = exit_fn;
1266 child->flags = flags;
1267 child->desc = pcmk__str_copy(desc);
1268
1269 if (timeout) {
1270 child->timerid = pcmk__create_timer(timeout, child_timeout_callback, child);
1271 }
1272
1273 child_list = g_list_append(child_list, child);
1274
1275 if(need_init) {
1276 need_init = FALSE;
1277 /* SIGCHLD processing has to be invoked from mainloop.
1278 * We do not want it to be possible to both add a child pid
1279 * to mainloop, and have the pid's exit callback invoked within
1280 * the same callstack. */
1281 pcmk__create_timer(1, child_signal_init, NULL);
1282 }
1283 }
1284
1285 void
1286 mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
1287 pcmk__mainloop_child_exit_fn_t exit_fn)
1288 {
1289 mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, exit_fn);
1290 }
1291
1292 static gboolean
1293 mainloop_timer_cb(gpointer user_data)
1294 {
1295 int id = 0;
1296 bool repeat = FALSE;
1297 struct mainloop_timer_s *t = user_data;
1298
1299 pcmk__assert(t != NULL);
1300
1301 id = t->id;
1302 t->id = 0; /* Ensure it's unset during callbacks so that
1303 * mainloop_timer_running() works as expected
1304 */
1305
1306 if(t->cb) {
1307 pcmk__trace("Invoking callbacks for timer %s", t->name);
1308 repeat = t->repeat;
1309 if(t->cb(t->userdata) == FALSE) {
1310 pcmk__trace("Timer %s complete", t->name);
1311 repeat = FALSE;
1312 }
1313 }
1314
1315 if(repeat) {
1316 /* Restore if repeating */
1317 t->id = id;
1318 }
1319
1320 return repeat;
1321 }
1322
1323 bool
1324 mainloop_timer_running(mainloop_timer_t *t)
1325 {
1326 if(t && t->id != 0) {
1327 return TRUE;
1328 }
1329 return FALSE;
1330 }
1331
1332 void
1333 mainloop_timer_start(mainloop_timer_t *t)
1334 {
1335 mainloop_timer_stop(t);
1336 if(t && t->period_ms > 0) {
1337 pcmk__trace("Starting timer %s", t->name);
1338 t->id = pcmk__create_timer(t->period_ms, mainloop_timer_cb, t);
1339 }
1340 }
1341
1342 void
1343 mainloop_timer_stop(mainloop_timer_t *t)
1344 {
1345 if(t && t->id != 0) {
1346 pcmk__trace("Stopping timer %s", t->name);
1347 g_source_remove(t->id);
1348 t->id = 0;
1349 }
1350 }
1351
1352 guint
1353 mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
1354 {
1355 guint last = 0;
1356
1357 if(t) {
1358 last = t->period_ms;
1359 t->period_ms = period_ms;
1360 }
1361
1362 if(t && t->id != 0 && last != t->period_ms) {
1363 mainloop_timer_start(t);
1364 }
1365 return last;
1366 }
1367
1368 mainloop_timer_t *
1369 mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1370 {
|
(1) Event alloc_fn: |
Storage is returned from allocation function "pcmk__assert_alloc_as". [details] |
|
(2) Event assign: |
Assigning: "t" = "pcmk__assert_alloc_as("mainloop.c", <anonymous>, 1371U, 1UL, 40UL)". |
| Also see events: |
[return_alloc] |
1371 mainloop_timer_t *t = pcmk__assert_alloc(1, sizeof(mainloop_timer_t));
1372
|
(3) Event path: |
Condition "name != NULL", taking true branch. |
1373 if (name != NULL) {
1374 t->name = pcmk__assert_asprintf("%s-%u-%d", name, period_ms, repeat);
|
(4) Event path: |
Falling through to end of if statement. |
1375 } else {
1376 t->name = pcmk__assert_asprintf("%p-%u-%d", t, period_ms, repeat);
1377 }
1378 t->id = 0;
1379 t->period_ms = period_ms;
1380 t->repeat = repeat;
1381 t->cb = cb;
1382 t->userdata = userdata;
|
(5) Event path: |
Switch case default. |
|
(6) Event path: |
Condition "trace_cs == NULL", taking true branch. |
|
(7) Event path: |
Condition "crm_is_callsite_active(trace_cs, _level, 0)", taking false branch. |
|
(8) Event path: |
Breaking from switch. |
1383 pcmk__trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
|
(9) Event return_alloc: |
Returning allocated memory "t". |
| Also see events: |
[alloc_fn][assign] |
1384 return t;
1385 }
1386
1387 void
1388 mainloop_timer_del(mainloop_timer_t *t)
1389 {
1390 if(t) {
1391 pcmk__trace("Destroying timer %s", t->name);
1392 mainloop_timer_stop(t);
1393 free(t->name);
1394 free(t);
1395 }
1396 }
1397
1398 /*
1399 * Helpers to make sure certain events aren't lost at shutdown
1400 */
1401
1402 static gboolean
1403 drain_timeout_cb(gpointer user_data)
1404 {
1405 bool *timeout_popped = (bool*) user_data;
1406
1407 *timeout_popped = TRUE;
1408 return FALSE;
1409 }
1410
1411 /*!
1412 * \brief Drain some remaining main loop events then quit it
1413 *
1414 * \param[in,out] mloop Main loop to drain and quit
1415 * \param[in] n Drain up to this many pending events
1416 */
1417 void
1418 pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n)
1419 {
1420 if ((mloop != NULL) && g_main_loop_is_running(mloop)) {
1421 GMainContext *ctx = g_main_loop_get_context(mloop);
1422
1423 /* Drain up to n events in case some memory clean-up is pending
1424 * (helpful to reduce noise in valgrind output).
1425 */
1426 for (int i = 0; (i < n) && g_main_context_pending(ctx); ++i) {
1427 g_main_context_dispatch(ctx);
1428 }
1429 g_main_loop_quit(mloop);
1430 }
1431 }
1432
1433 /*!
1434 * \brief Process main loop events while a certain condition is met
1435 *
1436 * \param[in,out] mloop Main loop to process
1437 * \param[in] timer_ms Don't process longer than this amount of time
1438 * \param[in] check Function that returns true if events should be
1439 * processed
1440 *
1441 * \note This function is intended to be called at shutdown if certain important
1442 * events should not be missed. The caller would likely quit the main loop
1443 * or exit after calling this function. The check() function will be
1444 * passed the remaining timeout in milliseconds.
1445 */
1446 void
1447 pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
1448 {
1449 bool timeout_popped = FALSE;
1450 guint timer = 0;
1451 GMainContext *ctx = NULL;
1452
1453 CRM_CHECK(mloop && check, return);
1454
1455 ctx = g_main_loop_get_context(mloop);
1456 if (ctx) {
1457 time_t start_time = time(NULL);
1458
1459 timer = pcmk__create_timer(timer_ms, drain_timeout_cb, &timeout_popped);
1460 while (!timeout_popped
1461 && check(timer_ms - (time(NULL) - start_time) * 1000)) {
1462 g_main_context_iteration(ctx, TRUE);
1463 }
1464 }
1465 if (!timeout_popped && (timer > 0)) {
1466 g_source_remove(timer);
1467 }
1468 }
1469