1 /*
2 * Copyright 2004-2024 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
10 #include <crm_internal.h>
11
12 #ifndef _GNU_SOURCE
13 # define _GNU_SOURCE
14 #endif
15
16 #include <stdlib.h>
17 #include <string.h>
18 #include <signal.h>
19 #include <errno.h>
20
21 #include <sys/wait.h>
22
23 #include <crm/crm.h>
24 #include <crm/common/xml.h>
25 #include <crm/common/mainloop.h>
26 #include <crm/common/ipc_internal.h>
27
28 #include <qb/qbarray.h>
29
/* Bookkeeping for a child process tracked by the main loop */
struct mainloop_child_s {
    pid_t pid;              // process ID of the child
    char *desc;             // description used in log messages (freed by child_free())
    unsigned timerid;       // GLib source ID of the pending kill timer, or 0 if none
    gboolean timeout;       // TRUE once a kill has been attempted after a timeout
    void *privatedata;      // caller data, see mainloop_child_userdata()

    // mainloop_leave_pid_group here means kill only the child, not its group
    enum mainloop_child_flags flags;

    /* Called when a process dies (from child_waitpid(), if non-NULL) */
    void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
};
42
/* Custom GSource for main loop jobs that are run on request. It is also
 * embedded as the first member of crm_signal_t for signal delivery.
 */
struct trigger_s {
    GSource source;         // must be first, so a GSource * can be cast to this type
    gboolean running;       // TRUE while a job reported "not yet complete" (< 0)
    gboolean trigger;       // TRUE when the source is ready to be dispatched
    void *user_data;        // argument passed to the dispatch callback
    guint id;               // ID returned by g_source_attach()
};
51
/* Main loop timer. Only period_ms is used in this part of the file; the other
 * field descriptions are assumptions to confirm against the timer functions.
 */
struct mainloop_timer_s {
    guint id;               // presumably the GLib source ID while running (0 otherwise) -- confirm
    guint period_ms;        // timer period in milliseconds
    bool repeat;            // presumably whether the timer re-arms after firing -- confirm
    char *name;             // presumably used for logging -- confirm
    GSourceFunc cb;         // callback to run when the timer fires
    void *userdata;         // presumably passed to cb -- confirm
};
60
61 static gboolean
62 crm_trigger_prepare(GSource * source, gint * timeout)
63 {
64 crm_trigger_t *trig = (crm_trigger_t *) source;
65
66 /* cluster-glue's FD and IPC related sources make use of
67 * g_source_add_poll() but do not set a timeout in their prepare
68 * functions
69 *
70 * This means mainloop's poll() will block until an event for one
71 * of these sources occurs - any /other/ type of source, such as
72 * this one or g_idle_*, that doesn't use g_source_add_poll() is
73 * S-O-L and won't be processed until there is something fd-based
74 * happens.
75 *
76 * Luckily the timeout we can set here affects all sources and
77 * puts an upper limit on how long poll() can take.
78 *
79 * So unconditionally set a small-ish timeout, not too small that
80 * we're in constant motion, which will act as an upper bound on
81 * how long the signal handling might be delayed for.
82 */
83 *timeout = 500; /* Timeout in ms */
84
85 return trig->trigger;
86 }
87
88 static gboolean
89 crm_trigger_check(GSource * source)
90 {
91 crm_trigger_t *trig = (crm_trigger_t *) source;
92
93 return trig->trigger;
94 }
95
96 /*!
97 * \internal
98 * \brief GSource dispatch function for crm_trigger_t
99 *
100 * \param[in] source crm_trigger_t being dispatched
101 * \param[in] callback Callback passed at source creation
102 * \param[in,out] userdata User data passed at source creation
103 *
104 * \return G_SOURCE_REMOVE to remove source, G_SOURCE_CONTINUE to keep it
105 */
106 static gboolean
107 crm_trigger_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
108 {
109 gboolean rc = G_SOURCE_CONTINUE;
110 crm_trigger_t *trig = (crm_trigger_t *) source;
111
112 if (trig->running) {
113 /* Wait until the existing job is complete before starting the next one */
114 return G_SOURCE_CONTINUE;
115 }
116 trig->trigger = FALSE;
117
118 if (callback) {
119 int callback_rc = callback(trig->user_data);
120
121 if (callback_rc < 0) {
122 crm_trace("Trigger handler %p not yet complete", trig);
123 trig->running = TRUE;
124 } else if (callback_rc == 0) {
125 rc = G_SOURCE_REMOVE;
126 }
127 }
128 return rc;
129 }
130
131 static void
132 crm_trigger_finalize(GSource * source)
133 {
134 crm_trace("Trigger %p destroyed", source);
135 }
136
137 static GSourceFuncs crm_trigger_funcs = {
138 crm_trigger_prepare,
139 crm_trigger_check,
140 crm_trigger_dispatch,
141 crm_trigger_finalize,
142 };
143
144 static crm_trigger_t *
145 mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
146 gpointer userdata)
147 {
148 crm_trigger_t *trigger = NULL;
149
150 trigger = (crm_trigger_t *) source;
151
152 trigger->id = 0;
153 trigger->trigger = FALSE;
154 trigger->user_data = userdata;
155
156 if (dispatch) {
157 g_source_set_callback(source, dispatch, trigger, NULL);
158 }
159
160 g_source_set_priority(source, priority);
161 g_source_set_can_recurse(source, FALSE);
162
163 trigger->id = g_source_attach(source, NULL);
164 return trigger;
165 }
166
167 void
168 mainloop_trigger_complete(crm_trigger_t * trig)
169 {
170 crm_trace("Trigger handler %p complete", trig);
171 trig->running = FALSE;
172 }
173
174 /*!
175 * \brief Create a trigger to be used as a mainloop source
176 *
177 * \param[in] priority Relative priority of source (lower number is higher priority)
178 * \param[in] dispatch Trigger dispatch function (should return 0 to remove the
179 * trigger from the mainloop, -1 if the trigger should be
180 * kept but the job is still running and not complete, and
181 * 1 if the trigger should be kept and the job is complete)
182 * \param[in] userdata Pointer to pass to \p dispatch
183 *
184 * \return Newly allocated mainloop source for trigger
185 */
186 crm_trigger_t *
187 mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data),
188 gpointer userdata)
189 {
190 GSource *source = NULL;
191
192 pcmk__assert(sizeof(crm_trigger_t) > sizeof(GSource));
193 source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
194
195 return mainloop_setup_trigger(source, priority, dispatch, userdata);
196 }
197
198 void
199 mainloop_set_trigger(crm_trigger_t * source)
200 {
201 if(source) {
202 source->trigger = TRUE;
203 }
204 }
205
206 gboolean
207 mainloop_destroy_trigger(crm_trigger_t * source)
208 {
209 GSource *gs = NULL;
210
211 if(source == NULL) {
212 return TRUE;
213 }
214
215 gs = (GSource *)source;
216
217 g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
218 g_source_unref(gs); /* The caller no longer carries a reference to source
219 *
220 * At this point the source should be free'd,
221 * unless we're currently processing said
222 * source, in which case mainloop holds an
223 * additional reference and it will be free'd
224 * once our processing completes
225 */
226 return TRUE;
227 }
228
// Define a custom glib source for signal handling

/* Data structure for custom glib source. It extends crm_trigger_t, which
 * extends GSource, so a GSource * for this source can be cast to it.
 */
typedef struct signal_s {
    crm_trigger_t trigger; // trigger that invoked source (must be first)
    void (*handler) (int sig); // handler run from the main loop (may be NULL)
    int signal; // signal number this source handles
} crm_signal_t;

/* Table to associate signal handlers with signal numbers, indexed by signal
 * number. An entry is NULL when no main loop handler is registered, and the
 * true signal handler checks for that.
 */
static crm_signal_t *crm_signals[NSIG];
240
241 /*!
242 * \internal
243 * \brief Dispatch an event from custom glib source for signals
244 *
245 * Given an signal event, clear the event trigger and call any registered
246 * signal handler.
247 *
248 * \param[in] source glib source that triggered this dispatch
249 * \param[in] callback (ignored)
250 * \param[in] userdata (ignored)
251 */
252 static gboolean
253 crm_signal_dispatch(GSource *source, GSourceFunc callback, gpointer userdata)
254 {
255 crm_signal_t *sig = (crm_signal_t *) source;
256
257 if(sig->signal != SIGCHLD) {
258 crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)",
259 strsignal(sig->signal), sig->signal,
260 (sig->handler? "invoking" : "no"));
261 }
262
263 sig->trigger.trigger = FALSE;
264 if (sig->handler) {
265 sig->handler(sig->signal);
266 }
267 return TRUE;
268 }
269
270 /*!
271 * \internal
272 * \brief Handle a signal by setting a trigger for signal source
273 *
274 * \param[in] sig Signal number that was received
275 *
276 * \note This is the true signal handler for the mainloop signal source, and
277 * must be async-safe.
278 */
279 static void
280 mainloop_signal_handler(int sig)
281 {
282 if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
283 mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
284 }
285 }
286
287 // Functions implementing our custom glib source for signal handling
288 static GSourceFuncs crm_signal_funcs = {
289 crm_trigger_prepare,
290 crm_trigger_check,
291 crm_signal_dispatch,
292 crm_trigger_finalize,
293 };
294
295 /*!
296 * \internal
297 * \brief Set a true signal handler
298 *
299 * signal()-like interface to sigaction()
300 *
301 * \param[in] sig Signal number to register handler for
302 * \param[in] dispatch Signal handler
303 *
304 * \return The previous value of the signal handler, or SIG_ERR on error
305 * \note The dispatch function must be async-safe.
306 */
307 sighandler_t
308 crm_signal_handler(int sig, sighandler_t dispatch)
309 {
310 sigset_t mask;
311 struct sigaction sa;
312 struct sigaction old;
313
314 if (sigemptyset(&mask) < 0) {
315 crm_err("Could not set handler for signal %d: %s",
316 sig, pcmk_rc_str(errno));
317 return SIG_ERR;
318 }
319
320 memset(&sa, 0, sizeof(struct sigaction));
321 sa.sa_handler = dispatch;
322 sa.sa_flags = SA_RESTART;
323 sa.sa_mask = mask;
324
325 if (sigaction(sig, &sa, &old) < 0) {
326 crm_err("Could not set handler for signal %d: %s",
327 sig, pcmk_rc_str(errno));
328 return SIG_ERR;
329 }
330 return old.sa_handler;
331 }
332
333 static void
334 mainloop_destroy_signal_entry(int sig)
335 {
336 crm_signal_t *tmp = crm_signals[sig];
337
338 if (tmp != NULL) {
339 crm_signals[sig] = NULL;
340 crm_trace("Unregistering mainloop handler for signal %d", sig);
341 mainloop_destroy_trigger((crm_trigger_t *) tmp);
342 }
343 }
344
345 /*!
346 * \internal
347 * \brief Add a signal handler to a mainloop
348 *
349 * \param[in] sig Signal number to handle
350 * \param[in] dispatch Signal handler function
351 *
352 * \note The true signal handler merely sets a mainloop trigger to call this
353 * dispatch function via the mainloop. Therefore, the dispatch function
354 * does not need to be async-safe.
355 */
356 gboolean
357 mainloop_add_signal(int sig, void (*dispatch) (int sig))
358 {
359 GSource *source = NULL;
360 int priority = G_PRIORITY_HIGH - 1;
361
362 if (sig == SIGTERM) {
363 /* TERM is higher priority than other signals,
364 * signals are higher priority than other ipc.
365 * Yes, minus: smaller is "higher"
366 */
367 priority--;
368 }
369
370 if (sig >= NSIG || sig < 0) {
371 crm_err("Signal %d is out of range", sig);
372 return FALSE;
373
374 } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
375 crm_trace("Signal handler for %d is already installed", sig);
376 return TRUE;
377
378 } else if (crm_signals[sig] != NULL) {
379 crm_err("Different signal handler for %d is already installed", sig);
380 return FALSE;
381 }
382
383 pcmk__assert(sizeof(crm_signal_t) > sizeof(GSource));
384 source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
385
386 crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
387 pcmk__assert(crm_signals[sig] != NULL);
388
389 crm_signals[sig]->handler = dispatch;
390 crm_signals[sig]->signal = sig;
391
392 if (crm_signal_handler(sig, mainloop_signal_handler) == SIG_ERR) {
393 mainloop_destroy_signal_entry(sig);
394 return FALSE;
395 }
396
397 return TRUE;
398 }
399
400 gboolean
401 mainloop_destroy_signal(int sig)
402 {
403 if (sig >= NSIG || sig < 0) {
404 crm_err("Signal %d is out of range", sig);
405 return FALSE;
406
407 } else if (crm_signal_handler(sig, NULL) == SIG_ERR) {
408 crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
409 return FALSE;
410
411 } else if (crm_signals[sig] == NULL) {
412 return TRUE;
413 }
414 mainloop_destroy_signal_entry(sig);
415 return TRUE;
416 }
417
418 static qb_array_t *gio_map = NULL;
419
420 void
421 mainloop_cleanup(void)
422 {
423 if (gio_map != NULL) {
424 qb_array_free(gio_map);
425 gio_map = NULL;
426 }
427
428 for (int sig = 0; sig < NSIG; ++sig) {
429 mainloop_destroy_signal_entry(sig);
430 }
431 }
432
/*
 * libqb IPC servers on the GLib main loop
 *
 * libqb asks us to watch server file descriptors through gio_poll_funcs. Each
 * fd has one adaptor (stored in gio_map) that records libqb's callback and
 * the GLib watch currently serving it.
 */
struct gio_to_qb_poll {
    int32_t is_used;            // GLib watches created minus those destroyed
    guint source;               // GLib source ID of the current watch, or 0
    int32_t events;             // watched events (always includes HUP/NVAL/ERR)
    void *data;                 // argument libqb wants passed to fn
    qb_ipcs_dispatch_fn_t fn;   // libqb function called when the fd is ready
    enum qb_loop_priority p;    // libqb priority requested for the fd
};
444
445 static gboolean
446 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
447 {
448 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
449 gint fd = g_io_channel_unix_get_fd(gio);
450
451 crm_trace("%p.%d %d", data, fd, condition);
452
453 /* if this assert get's hit, then there is a race condition between
454 * when we destroy a fd and when mainloop actually gives it up */
455 pcmk__assert(adaptor->is_used > 0);
456
457 return (adaptor->fn(fd, condition, adaptor->data) == 0);
458 }
459
460 static void
461 gio_poll_destroy(gpointer data)
462 {
463 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
464
465 adaptor->is_used--;
466 pcmk__assert(adaptor->is_used >= 0);
467
468 if (adaptor->is_used == 0) {
469 crm_trace("Marking adaptor %p unused", adaptor);
470 adaptor->source = 0;
471 }
472 }
473
474 /*!
475 * \internal
476 * \brief Convert libqb's poll priority into GLib's one
477 *
478 * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
479 *
480 * \return best matching GLib's priority
481 */
482 static gint
483 conv_prio_libqb2glib(enum qb_loop_priority prio)
484 {
485 switch (prio) {
486 case QB_LOOP_LOW: return G_PRIORITY_LOW;
487 case QB_LOOP_HIGH: return G_PRIORITY_HIGH;
488 default: return G_PRIORITY_DEFAULT; // QB_LOOP_MED
489 }
490 }
491
492 /*!
493 * \internal
494 * \brief Convert libqb's poll priority to rate limiting spec
495 *
496 * \param[in] prio libqb's poll priority (#QB_LOOP_MED assumed as fallback)
497 *
498 * \return best matching rate limiting spec
499 * \note This is the inverse of libqb's qb_ipcs_request_rate_limit().
500 */
501 static enum qb_ipcs_rate_limit
502 conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
503 {
504 switch (prio) {
505 case QB_LOOP_LOW: return QB_IPCS_RATE_SLOW;
506 case QB_LOOP_HIGH: return QB_IPCS_RATE_FAST;
507 default: return QB_IPCS_RATE_NORMAL; // QB_LOOP_MED
508 }
509 }
510
511 static int32_t
512 gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
513 void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
514 {
515 struct gio_to_qb_poll *adaptor;
516 GIOChannel *channel;
517 int32_t res = 0;
518
519 res = qb_array_index(gio_map, fd, (void **)&adaptor);
520 if (res < 0) {
521 crm_err("Array lookup failed for fd=%d: %d", fd, res);
522 return res;
523 }
524
525 crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
526
527 if (add && adaptor->source) {
528 crm_err("Adaptor for descriptor %d is still in-use", fd);
529 return -EEXIST;
530 }
531 if (!add && !adaptor->is_used) {
532 crm_err("Adaptor for descriptor %d is not in-use", fd);
533 return -ENOENT;
534 }
535
536 /* channel is created with ref_count = 1 */
537 channel = g_io_channel_unix_new(fd);
538 if (!channel) {
539 crm_err("No memory left to add fd=%d", fd);
540 return -ENOMEM;
541 }
542
543 if (adaptor->source) {
544 g_source_remove(adaptor->source);
545 adaptor->source = 0;
546 }
547
548 /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
549 evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
550
551 adaptor->fn = fn;
552 adaptor->events = evts;
553 adaptor->data = data;
554 adaptor->p = p;
555 adaptor->is_used++;
556 adaptor->source =
557 g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
558 gio_read_socket, adaptor, gio_poll_destroy);
559
560 /* Now that mainloop now holds a reference to channel,
561 * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
562 *
563 * This means that channel will be free'd by:
564 * g_main_context_dispatch()
565 * -> g_source_destroy_internal()
566 * -> g_source_callback_unref()
567 * shortly after gio_poll_destroy() completes
568 */
569 g_io_channel_unref(channel);
570
571 crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
572 if (adaptor->source > 0) {
573 return 0;
574 }
575
576 return -EINVAL;
577 }
578
579 static int32_t
580 gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
581 void *data, qb_ipcs_dispatch_fn_t fn)
582 {
583 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
584 }
585
586 static int32_t
587 gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
588 void *data, qb_ipcs_dispatch_fn_t fn)
589 {
590 return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
591 }
592
593 static int32_t
594 gio_poll_dispatch_del(int32_t fd)
595 {
596 struct gio_to_qb_poll *adaptor;
597
598 crm_trace("Looking for fd=%d", fd);
599 if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
600 if (adaptor->source) {
601 g_source_remove(adaptor->source);
602 adaptor->source = 0;
603 }
604 }
605 return 0;
606 }
607
608 struct qb_ipcs_poll_handlers gio_poll_funcs = {
609 .job_add = NULL,
610 .dispatch_add = gio_poll_dispatch_add,
611 .dispatch_mod = gio_poll_dispatch_mod,
612 .dispatch_del = gio_poll_dispatch_del,
613 };
614
615 static enum qb_ipc_type
616 pick_ipc_type(enum qb_ipc_type requested)
617 {
618 const char *env = pcmk__env_option(PCMK__ENV_IPC_TYPE);
619
620 if (env && strcmp("shared-mem", env) == 0) {
621 return QB_IPC_SHM;
622 } else if (env && strcmp("socket", env) == 0) {
623 return QB_IPC_SOCKET;
624 } else if (env && strcmp("posix", env) == 0) {
625 return QB_IPC_POSIX_MQ;
626 } else if (env && strcmp("sysv", env) == 0) {
627 return QB_IPC_SYSV_MQ;
628 } else if (requested == QB_IPC_NATIVE) {
629 /* We prefer shared memory because the server never blocks on
630 * send. If part of a message fits into the socket, libqb
631 * needs to block until the remainder can be sent also.
632 * Otherwise the client will wait forever for the remaining
633 * bytes.
634 */
635 return QB_IPC_SHM;
636 }
637 return requested;
638 }
639
640 qb_ipcs_service_t *
641 mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
642 struct qb_ipcs_service_handlers *callbacks)
643 {
644 return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED);
645 }
646
647 qb_ipcs_service_t *
648 mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type,
649 struct qb_ipcs_service_handlers *callbacks,
650 enum qb_loop_priority prio)
651 {
652 int rc = 0;
653 qb_ipcs_service_t *server = NULL;
654
655 if (gio_map == NULL) {
656 gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
657 }
658
659 server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
660
661 if (server == NULL) {
662 crm_err("Could not create %s IPC server: %s (%d)",
663 name, pcmk_rc_str(errno), errno);
664 return NULL;
665 }
666
667 if (prio != QB_LOOP_MED) {
668 qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
669 }
670
671 /* All clients should use at least ipc_buffer_max as their buffer size */
672 qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
673 qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
674
675 rc = qb_ipcs_run(server);
676 if (rc < 0) {
677 crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
678 return NULL; // qb_ipcs_run() destroys server on failure
679 }
680
681 return server;
682 }
683
684 void
685 mainloop_del_ipc_server(qb_ipcs_service_t * server)
686 {
687 if (server) {
688 qb_ipcs_destroy(server);
689 }
690 }
691
/* A file descriptor (optionally backing an IPC connection) watched by the
 * main loop. Freed by mainloop_gio_destroy() when its watch is removed.
 */
struct mainloop_io_s {
    char *name;             // name used in log messages (owned)
    void *userdata;         // passed to the dispatch and destroy callbacks

    int fd;                 // file descriptor being watched
    guint source;           // GLib source ID of the watch
    crm_ipc_t *ipc;         // IPC connection, or NULL for a plain fd (owned)
    GIOChannel *channel;    // NOTE(review): never owns a channel reference;
                            // the watch does (see mainloop_add_fd())

    // IPC message handler; a negative return removes the source
    int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
    // Plain fd handler; a negative return removes the source
    int (*dispatch_fn_io) (gpointer userdata);
    // Called from mainloop_gio_destroy() when the source goes away
    void (*destroy_fn) (gpointer userdata);

};
706
707 /*!
708 * \internal
709 * \brief I/O watch callback function (GIOFunc)
710 *
711 * \param[in] gio I/O channel being watched
712 * \param[in] condition I/O condition satisfied
713 * \param[in] data User data passed when source was created
714 *
715 * \return G_SOURCE_REMOVE to remove source, G_SOURCE_CONTINUE to keep it
716 */
717 static gboolean
|
(3) Event deallocator: |
Deallocator for "struct mainloop_io_s". |
| Also see events: |
[allocation][allocation] |
718 mainloop_gio_callback(GIOChannel *gio, GIOCondition condition, gpointer data)
719 {
720 gboolean rc = G_SOURCE_CONTINUE;
721 mainloop_io_t *client = data;
722
723 pcmk__assert(client->fd == g_io_channel_unix_get_fd(gio));
724
725 if (condition & G_IO_IN) {
726 if (client->ipc) {
727 long read_rc = 0L;
728 int max = 10;
729
730 do {
731 read_rc = crm_ipc_read(client->ipc);
732 if (read_rc <= 0) {
733 crm_trace("Could not read IPC message from %s: %s (%ld)",
734 client->name, pcmk_strerror(read_rc), read_rc);
735
736 } else if (client->dispatch_fn_ipc) {
737 const char *buffer = crm_ipc_buffer(client->ipc);
738
739 crm_trace("New %ld-byte IPC message from %s "
740 "after I/O condition %d",
741 read_rc, client->name, (int) condition);
742 if (client->dispatch_fn_ipc(buffer, read_rc, client->userdata) < 0) {
743 crm_trace("Connection to %s no longer required", client->name);
744 rc = G_SOURCE_REMOVE;
745 }
746 }
747
748 } while ((rc == G_SOURCE_CONTINUE) && (read_rc > 0) && --max > 0);
749
750 } else {
751 crm_trace("New I/O event for %s after I/O condition %d",
752 client->name, (int) condition);
753 if (client->dispatch_fn_io) {
754 if (client->dispatch_fn_io(client->userdata) < 0) {
755 crm_trace("Connection to %s no longer required", client->name);
756 rc = G_SOURCE_REMOVE;
757 }
758 }
759 }
760 }
761
762 if (client->ipc && !crm_ipc_connected(client->ipc)) {
763 crm_err("Connection to %s closed " CRM_XS " client=%p condition=%d",
764 client->name, client, condition);
765 rc = G_SOURCE_REMOVE;
766
767 } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
768 crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
769 client->name, client, condition);
770 rc = G_SOURCE_REMOVE;
771
772 } else if ((condition & G_IO_IN) == 0) {
773 /*
774 #define GLIB_SYSDEF_POLLIN =1
775 #define GLIB_SYSDEF_POLLPRI =2
776 #define GLIB_SYSDEF_POLLOUT =4
777 #define GLIB_SYSDEF_POLLERR =8
778 #define GLIB_SYSDEF_POLLHUP =16
779 #define GLIB_SYSDEF_POLLNVAL =32
780
781 typedef enum
782 {
783 G_IO_IN GLIB_SYSDEF_POLLIN,
784 G_IO_OUT GLIB_SYSDEF_POLLOUT,
785 G_IO_PRI GLIB_SYSDEF_POLLPRI,
786 G_IO_ERR GLIB_SYSDEF_POLLERR,
787 G_IO_HUP GLIB_SYSDEF_POLLHUP,
788 G_IO_NVAL GLIB_SYSDEF_POLLNVAL
789 } GIOCondition;
790
791 A bitwise combination representing a condition to watch for on an event source.
792
793 G_IO_IN There is data to read.
794 G_IO_OUT Data can be written (without blocking).
795 G_IO_PRI There is urgent data to read.
796 G_IO_ERR Error condition.
797 G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets).
798 G_IO_NVAL Invalid request. The file descriptor is not open.
799 */
800 crm_err("Strange condition: %d", condition);
801 }
802
803 /* G_SOURCE_REMOVE results in mainloop_gio_destroy() being called
804 * just before the source is removed from mainloop
805 */
806 return rc;
807 }
808
809 static void
810 mainloop_gio_destroy(gpointer c)
811 {
812 mainloop_io_t *client = c;
813 char *c_name = strdup(client->name);
814
815 /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
816 * client->channel will still have ref_count > 0... should be == 1
817 */
818 crm_trace("Destroying client %s[%p]", c_name, c);
819
820 if (client->ipc) {
821 crm_ipc_close(client->ipc);
822 }
823
824 if (client->destroy_fn) {
825 void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
826
827 client->destroy_fn = NULL;
828 destroy_fn(client->userdata);
829 }
830
831 if (client->ipc) {
832 crm_ipc_t *ipc = client->ipc;
833
834 client->ipc = NULL;
835 crm_ipc_destroy(ipc);
836 }
837
838 crm_trace("Destroyed client %s[%p]", c_name, c);
839
840 free(client->name); client->name = NULL;
841 free(client);
842
843 free(c_name);
844 }
845
846 /*!
847 * \brief Connect to IPC and add it as a main loop source
848 *
849 * \param[in,out] ipc IPC connection to add
850 * \param[in] priority Event source priority to use for connection
851 * \param[in] userdata Data to register with callbacks
852 * \param[in] callbacks Dispatch and destroy callbacks for connection
853 * \param[out] source Newly allocated event source
854 *
855 * \return Standard Pacemaker return code
856 *
857 * \note On failure, the caller is still responsible for ipc. On success, the
858 * caller should call mainloop_del_ipc_client() when source is no longer
859 * needed, which will lead to the disconnection of the IPC later in the
860 * main loop if it is connected. However the IPC disconnects,
861 * mainloop_gio_destroy() will free ipc and source after calling the
862 * destroy callback.
863 */
864 int
865 pcmk__add_mainloop_ipc(crm_ipc_t *ipc, int priority, void *userdata,
866 const struct ipc_client_callbacks *callbacks,
867 mainloop_io_t **source)
868 {
869 int rc = pcmk_rc_ok;
870 int fd = -1;
871 const char *ipc_name = NULL;
872
873 CRM_CHECK((ipc != NULL) && (callbacks != NULL), return EINVAL);
874
875 ipc_name = pcmk__s(crm_ipc_name(ipc), "Pacemaker");
876 rc = pcmk__connect_generic_ipc(ipc);
877 if (rc != pcmk_rc_ok) {
878 crm_debug("Connection to %s failed: %s", ipc_name, pcmk_rc_str(rc));
879 return rc;
880 }
881
882 rc = pcmk__ipc_fd(ipc, &fd);
883 if (rc != pcmk_rc_ok) {
884 crm_debug("Could not obtain file descriptor for %s IPC: %s",
885 ipc_name, pcmk_rc_str(rc));
886 crm_ipc_close(ipc);
887 return rc;
888 }
889
890 *source = mainloop_add_fd(ipc_name, priority, fd, userdata, NULL);
891 if (*source == NULL) {
892 rc = errno;
893 crm_ipc_close(ipc);
894 return rc;
895 }
896
897 (*source)->ipc = ipc;
898 (*source)->destroy_fn = callbacks->destroy;
899 (*source)->dispatch_fn_ipc = callbacks->dispatch;
900 return pcmk_rc_ok;
901 }
902
903 /*!
904 * \brief Get period for mainloop timer
905 *
906 * \param[in] timer Timer
907 *
908 * \return Period in ms
909 */
910 guint
911 pcmk__mainloop_timer_get_period(const mainloop_timer_t *timer)
912 {
913 if (timer) {
914 return timer->period_ms;
915 }
916 return 0;
917 }
918
919 mainloop_io_t *
920 mainloop_add_ipc_client(const char *name, int priority, size_t max_size,
921 void *userdata, struct ipc_client_callbacks *callbacks)
922 {
923 crm_ipc_t *ipc = crm_ipc_new(name, max_size);
924 mainloop_io_t *source = NULL;
925 int rc = pcmk__add_mainloop_ipc(ipc, priority, userdata, callbacks,
926 &source);
927
928 if (rc != pcmk_rc_ok) {
929 if (crm_log_level == LOG_STDOUT) {
930 fprintf(stderr, "Connection to %s failed: %s",
931 name, pcmk_rc_str(rc));
932 }
933 crm_ipc_destroy(ipc);
934 if (rc > 0) {
935 errno = rc;
936 } else {
937 errno = ENOTCONN;
938 }
939 return NULL;
940 }
941 return source;
942 }
943
944 void
945 mainloop_del_ipc_client(mainloop_io_t * client)
946 {
947 mainloop_del_fd(client);
948 }
949
950 crm_ipc_t *
951 mainloop_get_ipc_client(mainloop_io_t * client)
952 {
953 if (client) {
954 return client->ipc;
955 }
956 return NULL;
957 }
958
959 mainloop_io_t *
960 mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
961 struct mainloop_fd_callbacks * callbacks)
962 {
963 mainloop_io_t *client = NULL;
964
965 if (fd >= 0) {
966 client = calloc(1, sizeof(mainloop_io_t));
967 if (client == NULL) {
968 return NULL;
969 }
970 client->name = strdup(name);
971 client->userdata = userdata;
972
973 if (callbacks) {
974 client->destroy_fn = callbacks->destroy;
975 client->dispatch_fn_io = callbacks->dispatch;
976 }
977
978 client->fd = fd;
|
CID (unavailable; MK=b13c37cce9c4cfa0c98303204ab57333) (#1 of 1): Resource not released (INCOMPLETE_DEALLOCATOR): |
|
(1) Event allocation: |
Memory is allocated. |
|
(2) Event allocation: |
The field "client->channel" is allocated, but not released in the identified deallocator. |
| Also see events: |
[deallocator] |
979 client->channel = g_io_channel_unix_new(fd);
980 client->source =
981 g_io_add_watch_full(client->channel, priority,
982 (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
983 client, mainloop_gio_destroy);
984
985 /* Now that mainloop now holds a reference to channel,
986 * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
987 *
988 * This means that channel will be free'd by:
989 * g_main_context_dispatch() or g_source_remove()
990 * -> g_source_destroy_internal()
991 * -> g_source_callback_unref()
992 * shortly after mainloop_gio_destroy() completes
993 */
994 g_io_channel_unref(client->channel);
995 crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
996 } else {
997 errno = EINVAL;
998 }
999
1000 return client;
1001 }
1002
1003 void
1004 mainloop_del_fd(mainloop_io_t * client)
1005 {
1006 if (client != NULL) {
1007 crm_trace("Removing client %s[%p]", client->name, client);
1008 if (client->source) {
1009 /* Results in mainloop_gio_destroy() being called just
1010 * before the source is removed from mainloop
1011 */
1012 g_source_remove(client->source);
1013 }
1014 }
1015 }
1016
/* Child processes being tracked (each element is a mainloop_child_t *).
 * Completed children are reaped and removed by child_death_dispatch(), which
 * runs on SIGCHLD.
 */
static GList *child_list = NULL;
1018
1019 pid_t
1020 mainloop_child_pid(mainloop_child_t * child)
1021 {
1022 return child->pid;
1023 }
1024
1025 const char *
1026 mainloop_child_name(mainloop_child_t * child)
1027 {
1028 return child->desc;
1029 }
1030
1031 int
1032 mainloop_child_timeout(mainloop_child_t * child)
1033 {
1034 return child->timeout;
1035 }
1036
1037 void *
1038 mainloop_child_userdata(mainloop_child_t * child)
1039 {
1040 return child->privatedata;
1041 }
1042
1043 void
1044 mainloop_clear_child_userdata(mainloop_child_t * child)
1045 {
1046 child->privatedata = NULL;
1047 }
1048
1049 /* good function name */
1050 static void
1051 child_free(mainloop_child_t *child)
1052 {
1053 if (child->timerid != 0) {
1054 crm_trace("Removing timer %d", child->timerid);
1055 g_source_remove(child->timerid);
1056 child->timerid = 0;
1057 }
1058 free(child->desc);
1059 free(child);
1060 }
1061
1062 /* terrible function name */
1063 static int
1064 child_kill_helper(mainloop_child_t *child)
1065 {
1066 int rc;
1067 if (child->flags & mainloop_leave_pid_group) {
1068 crm_debug("Kill pid %d only. leave group intact.", child->pid);
1069 rc = kill(child->pid, SIGKILL);
1070 } else {
1071 crm_debug("Kill pid %d's group", child->pid);
1072 rc = kill(-child->pid, SIGKILL);
1073 }
1074
1075 if (rc < 0) {
1076 if (errno != ESRCH) {
1077 crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
1078 }
1079 return -errno;
1080 }
1081 return 0;
1082 }
1083
1084 static gboolean
1085 child_timeout_callback(gpointer p)
1086 {
1087 mainloop_child_t *child = p;
1088 int rc = 0;
1089
1090 child->timerid = 0;
1091 if (child->timeout) {
1092 crm_warn("%s process (PID %d) will not die!", child->desc, (int)child->pid);
1093 return FALSE;
1094 }
1095
1096 rc = child_kill_helper(child);
1097 if (rc == -ESRCH) {
1098 /* Nothing left to do. pid doesn't exist */
1099 return FALSE;
1100 }
1101
1102 child->timeout = TRUE;
1103 crm_debug("%s process (PID %d) timed out", child->desc, (int)child->pid);
1104
1105 child->timerid = g_timeout_add(5000, child_timeout_callback, child);
1106 return FALSE;
1107 }
1108
1109 static bool
1110 child_waitpid(mainloop_child_t *child, int flags)
1111 {
1112 int rc = 0;
1113 int core = 0;
1114 int signo = 0;
1115 int status = 0;
1116 int exitcode = 0;
1117 bool callback_needed = true;
1118
1119 rc = waitpid(child->pid, &status, flags);
1120 if (rc == 0) { // WNOHANG in flags, and child status is not available
1121 crm_trace("Child process %d (%s) still active",
1122 child->pid, child->desc);
1123 callback_needed = false;
1124
1125 } else if (rc != child->pid) {
1126 /* According to POSIX, possible conditions:
1127 * - child->pid was non-positive (process group or any child),
1128 * and rc is specific child
1129 * - errno ECHILD (pid does not exist or is not child)
1130 * - errno EINVAL (invalid flags)
1131 * - errno EINTR (caller interrupted by signal)
1132 *
1133 * @TODO Handle these cases more specifically.
1134 */
1135 signo = SIGCHLD;
1136 exitcode = 1;
1137 crm_notice("Wait for child process %d (%s) interrupted: %s",
1138 child->pid, child->desc, pcmk_rc_str(errno));
1139
1140 } else if (WIFEXITED(status)) {
1141 exitcode = WEXITSTATUS(status);
1142 crm_trace("Child process %d (%s) exited with status %d",
1143 child->pid, child->desc, exitcode);
1144
1145 } else if (WIFSIGNALED(status)) {
1146 signo = WTERMSIG(status);
1147 crm_trace("Child process %d (%s) exited with signal %d (%s)",
1148 child->pid, child->desc, signo, strsignal(signo));
1149
1150 #ifdef WCOREDUMP // AIX, SunOS, maybe others
1151 } else if (WCOREDUMP(status)) {
1152 core = 1;
1153 crm_err("Child process %d (%s) dumped core",
1154 child->pid, child->desc);
1155 #endif
1156
1157 } else { // flags must contain WUNTRACED and/or WCONTINUED to reach this
1158 crm_trace("Child process %d (%s) stopped or continued",
1159 child->pid, child->desc);
1160 callback_needed = false;
1161 }
1162
1163 if (callback_needed && child->callback) {
1164 child->callback(child, child->pid, core, signo, exitcode);
1165 }
1166 return callback_needed;
1167 }
1168
1169 static void
1170 child_death_dispatch(int signal)
1171 {
1172 for (GList *iter = child_list; iter; ) {
1173 GList *saved = iter;
1174 mainloop_child_t *child = iter->data;
1175
1176 iter = iter->next;
1177 if (child_waitpid(child, WNOHANG)) {
1178 crm_trace("Removing completed process %d from child list",
1179 child->pid);
1180 child_list = g_list_remove_link(child_list, saved);
1181 g_list_free(saved);
1182 child_free(child);
1183 }
1184 }
1185 }
1186
1187 static gboolean
1188 child_signal_init(gpointer p)
1189 {
1190 crm_trace("Installed SIGCHLD handler");
1191 /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
1192 mainloop_add_signal(SIGCHLD, child_death_dispatch);
1193
1194 /* In case they terminated before the signal handler was installed */
1195 child_death_dispatch(SIGCHLD);
1196 return FALSE;
1197 }
1198
1199 gboolean
1200 mainloop_child_kill(pid_t pid)
1201 {
1202 GList *iter;
1203 mainloop_child_t *child = NULL;
1204 mainloop_child_t *match = NULL;
1205 /* It is impossible to block SIGKILL, this allows us to
1206 * call waitpid without WNOHANG flag.*/
1207 int waitflags = 0, rc = 0;
1208
1209 for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1210 child = iter->data;
1211 if (pid == child->pid) {
1212 match = child;
1213 }
1214 }
1215
1216 if (match == NULL) {
1217 return FALSE;
1218 }
1219
1220 rc = child_kill_helper(match);
1221 if(rc == -ESRCH) {
1222 /* It's gone, but hasn't shown up in waitpid() yet. Wait until we get
1223 * SIGCHLD and let handler clean it up as normal (so we get the correct
1224 * return code/status). The blocking alternative would be to call
1225 * child_waitpid(match, 0).
1226 */
1227 crm_trace("Waiting for signal that child process %d completed",
1228 match->pid);
1229 return TRUE;
1230
1231 } else if(rc != 0) {
1232 /* If KILL for some other reason set the WNOHANG flag since we
1233 * can't be certain what happened.
1234 */
1235 waitflags = WNOHANG;
1236 }
1237
1238 if (!child_waitpid(match, waitflags)) {
1239 /* not much we can do if this occurs */
1240 return FALSE;
1241 }
1242
1243 child_list = g_list_remove(child_list, match);
1244 child_free(match);
1245 return TRUE;
1246 }
1247
1248 /* Create/Log a new tracked process
1249 * To track a process group, use -pid
1250 *
1251 * @TODO Using a non-positive pid (i.e. any child, or process group) would
1252 * likely not be useful since we will free the child after the first
1253 * completed process.
1254 */
1255 void
1256 mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
1257 void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1258 {
1259 static bool need_init = TRUE;
1260 mainloop_child_t *child = pcmk__assert_alloc(1, sizeof(mainloop_child_t));
1261
1262 child->pid = pid;
1263 child->timerid = 0;
1264 child->timeout = FALSE;
1265 child->privatedata = privatedata;
1266 child->callback = callback;
1267 child->flags = flags;
1268 child->desc = pcmk__str_copy(desc);
1269
1270 if (timeout) {
1271 child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
1272 }
1273
1274 child_list = g_list_append(child_list, child);
1275
1276 if(need_init) {
1277 need_init = FALSE;
1278 /* SIGCHLD processing has to be invoked from mainloop.
1279 * We do not want it to be possible to both add a child pid
1280 * to mainloop, and have the pid's exit callback invoked within
1281 * the same callstack. */
1282 g_timeout_add(1, child_signal_init, NULL);
1283 }
1284 }
1285
1286 void
1287 mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
1288 void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1289 {
1290 mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
1291 }
1292
1293 static gboolean
1294 mainloop_timer_cb(gpointer user_data)
1295 {
1296 int id = 0;
1297 bool repeat = FALSE;
1298 struct mainloop_timer_s *t = user_data;
1299
1300 pcmk__assert(t != NULL);
1301
1302 id = t->id;
1303 t->id = 0; /* Ensure it's unset during callbacks so that
1304 * mainloop_timer_running() works as expected
1305 */
1306
1307 if(t->cb) {
1308 crm_trace("Invoking callbacks for timer %s", t->name);
1309 repeat = t->repeat;
1310 if(t->cb(t->userdata) == FALSE) {
1311 crm_trace("Timer %s complete", t->name);
1312 repeat = FALSE;
1313 }
1314 }
1315
1316 if(repeat) {
1317 /* Restore if repeating */
1318 t->id = id;
1319 }
1320
1321 return repeat;
1322 }
1323
1324 bool
1325 mainloop_timer_running(mainloop_timer_t *t)
1326 {
1327 if(t && t->id != 0) {
1328 return TRUE;
1329 }
1330 return FALSE;
1331 }
1332
1333 void
1334 mainloop_timer_start(mainloop_timer_t *t)
1335 {
1336 mainloop_timer_stop(t);
1337 if(t && t->period_ms > 0) {
1338 crm_trace("Starting timer %s", t->name);
1339 t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
1340 }
1341 }
1342
1343 void
1344 mainloop_timer_stop(mainloop_timer_t *t)
1345 {
1346 if(t && t->id != 0) {
1347 crm_trace("Stopping timer %s", t->name);
1348 g_source_remove(t->id);
1349 t->id = 0;
1350 }
1351 }
1352
1353 guint
1354 mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
1355 {
1356 guint last = 0;
1357
1358 if(t) {
1359 last = t->period_ms;
1360 t->period_ms = period_ms;
1361 }
1362
1363 if(t && t->id != 0 && last != t->period_ms) {
1364 mainloop_timer_start(t);
1365 }
1366 return last;
1367 }
1368
1369 mainloop_timer_t *
1370 mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1371 {
1372 mainloop_timer_t *t = pcmk__assert_alloc(1, sizeof(mainloop_timer_t));
1373
1374 if (name != NULL) {
1375 t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
1376 } else {
1377 t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
1378 }
1379 t->id = 0;
1380 t->period_ms = period_ms;
1381 t->repeat = repeat;
1382 t->cb = cb;
1383 t->userdata = userdata;
1384 crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
1385 return t;
1386 }
1387
1388 void
1389 mainloop_timer_del(mainloop_timer_t *t)
1390 {
1391 if(t) {
1392 crm_trace("Destroying timer %s", t->name);
1393 mainloop_timer_stop(t);
1394 free(t->name);
1395 free(t);
1396 }
1397 }
1398
1399 /*
1400 * Helpers to make sure certain events aren't lost at shutdown
1401 */
1402
1403 static gboolean
1404 drain_timeout_cb(gpointer user_data)
1405 {
1406 bool *timeout_popped = (bool*) user_data;
1407
1408 *timeout_popped = TRUE;
1409 return FALSE;
1410 }
1411
1412 /*!
1413 * \brief Drain some remaining main loop events then quit it
1414 *
1415 * \param[in,out] mloop Main loop to drain and quit
1416 * \param[in] n Drain up to this many pending events
1417 */
1418 void
1419 pcmk_quit_main_loop(GMainLoop *mloop, unsigned int n)
1420 {
1421 if ((mloop != NULL) && g_main_loop_is_running(mloop)) {
1422 GMainContext *ctx = g_main_loop_get_context(mloop);
1423
1424 /* Drain up to n events in case some memory clean-up is pending
1425 * (helpful to reduce noise in valgrind output).
1426 */
1427 for (int i = 0; (i < n) && g_main_context_pending(ctx); ++i) {
1428 g_main_context_dispatch(ctx);
1429 }
1430 g_main_loop_quit(mloop);
1431 }
1432 }
1433
1434 /*!
1435 * \brief Process main loop events while a certain condition is met
1436 *
1437 * \param[in,out] mloop Main loop to process
1438 * \param[in] timer_ms Don't process longer than this amount of time
1439 * \param[in] check Function that returns true if events should be
1440 * processed
1441 *
1442 * \note This function is intended to be called at shutdown if certain important
1443 * events should not be missed. The caller would likely quit the main loop
1444 * or exit after calling this function. The check() function will be
1445 * passed the remaining timeout in milliseconds.
1446 */
1447 void
1448 pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
1449 {
1450 bool timeout_popped = FALSE;
1451 guint timer = 0;
1452 GMainContext *ctx = NULL;
1453
1454 CRM_CHECK(mloop && check, return);
1455
1456 ctx = g_main_loop_get_context(mloop);
1457 if (ctx) {
1458 time_t start_time = time(NULL);
1459
1460 timer = g_timeout_add(timer_ms, drain_timeout_cb, &timeout_popped);
1461 while (!timeout_popped
1462 && check(timer_ms - (time(NULL) - start_time) * 1000)) {
1463 g_main_context_iteration(ctx, TRUE);
1464 }
1465 }
1466 if (!timeout_popped && (timer > 0)) {
1467 g_source_remove(timer);
1468 }
1469 }
1470
1471 // Deprecated functions kept only for backward API compatibility
1472 // LCOV_EXCL_START
1473
1474 #include <crm/common/mainloop_compat.h>
1475
1476 gboolean
1477 crm_signal(int sig, void (*dispatch) (int sig))
1478 {
1479 return crm_signal_handler(sig, dispatch) != SIG_ERR;
1480 }
1481
1482 // LCOV_EXCL_STOP
1483 // End deprecated API
1484