1 /*
2 * Copyright 2004-2026 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
10 #include <crm_internal.h>
11
12 #include <stdbool.h>
13 #include <stdio.h>
14 #include <errno.h>
15 #include <bzlib.h>
16 #include <sys/stat.h>
17 #include <sys/types.h>
18
19 #include <crm/crm.h>
20 #include <crm/common/xml.h>
21 #include <crm/common/ipc.h>
22 #include "crmcommon_private.h"
23
24 /* Evict clients whose event queue grows this large (by default) */
25 #define PCMK_IPC_DEFAULT_QUEUE_MAX 500
26
27 static GHashTable *client_connections = NULL;
28
29 /*!
30 * \internal
31 * \brief Count IPC clients
32 *
33 * \return Number of active IPC client connections
34 */
35 guint
36 pcmk__ipc_client_count(void)
37 {
38 return client_connections? g_hash_table_size(client_connections) : 0;
39 }
40
41 /*!
42 * \internal
43 * \brief Execute a function for each active IPC client connection
44 *
45 * \param[in] func Function to call
46 * \param[in,out] user_data Pointer to pass to function
47 *
48 * \note The parameters are the same as for g_hash_table_foreach().
49 */
50 void
51 pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
52 {
53 pcmk__assert(func != NULL);
54
55 if (client_connections != NULL) {
56 g_hash_table_foreach(client_connections, func, user_data);
57 }
58 }
59
60 pcmk__client_t *
61 pcmk__find_client(const qb_ipcs_connection_t *c)
62 {
63 if (client_connections) {
64 return g_hash_table_lookup(client_connections, c);
65 }
66
67 pcmk__trace("No client found for %p", c);
68 return NULL;
69 }
70
71 pcmk__client_t *
72 pcmk__find_client_by_id(const char *id)
73 {
74 if ((client_connections != NULL) && (id != NULL)) {
75 gpointer key;
76 pcmk__client_t *client = NULL;
77 GHashTableIter iter;
78
79 g_hash_table_iter_init(&iter, client_connections);
80 while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
81 if (strcmp(client->id, id) == 0) {
82 return client;
83 }
84 }
85 }
86 pcmk__trace("No client found with id='%s'", pcmk__s(id, ""));
87 return NULL;
88 }
89
90 /*!
91 * \internal
92 * \brief Get a client identifier for use in log messages
93 *
94 * \param[in] c Client
95 *
96 * \return Client's name, client's ID, or a string literal, as available
97 * \note This is intended to be used in format strings like "client %s".
98 */
99 const char *
100 pcmk__client_name(const pcmk__client_t *c)
101 {
102 if (c == NULL) {
103 return "(unspecified)";
104
105 } else if (c->name != NULL) {
106 return c->name;
107
108 } else if (c->id != NULL) {
109 return c->id;
110
111 } else {
112 return "(unidentified)";
113 }
114 }
115
116 void
117 pcmk__client_cleanup(void)
118 {
|
(1) Event path: |
Condition "client_connections != NULL", taking true branch. |
119 if (client_connections != NULL) {
120 int active = g_hash_table_size(client_connections);
121
|
(2) Event path: |
Condition "active > 0", taking true branch. |
122 if (active > 0) {
|
(3) Event path: |
Condition "active == 1", taking true branch. |
123 pcmk__warn("Exiting with %d active IPC client%s", active,
124 pcmk__plural_s(active));
125 }
|
CID (unavailable; MK=7738de940091a0758ed0ae977b621ce2) (#1 of 1): Inconsistent C union access (INCONSISTENT_UNION_ACCESS): |
|
(4) Event assign_union_field: |
The union field "in" of "_pp" is written. |
|
(5) Event inconsistent_union_field_access: |
In "_pp.out", the union field used: "out" is inconsistent with the field most recently stored: "in". |
126 g_clear_pointer(&client_connections, g_hash_table_destroy);
127 }
128 }
129
130 void
131 pcmk__drop_all_clients(qb_ipcs_service_t *service)
132 {
133 qb_ipcs_connection_t *c = NULL;
134
135 if (service == NULL) {
136 return;
137 }
138
139 c = qb_ipcs_connection_first_get(service);
140
141 while (c != NULL) {
142 qb_ipcs_connection_t *last = c;
143
144 c = qb_ipcs_connection_next_get(service, last);
145
146 /* There really shouldn't be anyone connected at this point */
147 pcmk__notice("Disconnecting client %p, pid=%d...", last,
148 pcmk__client_pid(last));
149 qb_ipcs_disconnect(last);
150 qb_ipcs_connection_unref(last);
151 }
152 }
153
154 /*!
155 * \internal
156 * \brief Allocate a new pcmk__client_t object based on an IPC connection
157 *
158 * \param[in] c IPC connection (NULL to allocate generic client)
159 * \param[in] key Connection table key (NULL to use sane default)
160 * \param[in] uid_client UID corresponding to c (ignored if c is NULL)
161 *
162 * \return Pointer to new pcmk__client_t (guaranteed not to be \c NULL)
163 */
164 static pcmk__client_t *
165 client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client)
166 {
167 pcmk__client_t *client = pcmk__assert_alloc(1, sizeof(pcmk__client_t));
168
169 if (c) {
170 client->user = pcmk__uid2username(uid_client);
171 if (client->user == NULL) {
172 client->user = pcmk__str_copy("#unprivileged");
173 pcmk__err("Unable to enforce ACLs for user ID %d, assuming "
174 "unprivileged",
175 uid_client);
176 }
177 client->ipcs = c;
178 pcmk__set_client_flags(client, pcmk__client_ipc);
179 client->pid = pcmk__client_pid(c);
180 if (key == NULL) {
181 key = c;
182 }
183 }
184
185 client->id = pcmk__generate_uuid();
186 if (key == NULL) {
187 key = client->id;
188 }
189 if (client_connections == NULL) {
190 pcmk__trace("Creating IPC client table");
191 client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
192 }
193 g_hash_table_insert(client_connections, key, client);
194 return client;
195 }
196
197 /*!
198 * \brief Allocate a new pcmk__client_t object and generate its ID
199 *
200 * \param[in] key What to use as connections hash table key (NULL to use ID)
201 *
202 * \return Pointer to new pcmk__client_t (asserts on failure)
203 */
204 pcmk__client_t *
205 pcmk__new_unauth_client(void *key)
206 {
207 return client_from_connection(NULL, key, 0);
208 }
209
210 pcmk__client_t *
211 pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
212 {
213 gid_t uid_cluster = 0;
214 gid_t gid_cluster = 0;
215
216 pcmk__client_t *client = NULL;
217
218 CRM_CHECK(c != NULL, return NULL);
219
220 if (pcmk__daemon_user(&uid_cluster, &gid_cluster) != pcmk_rc_ok) {
221 static bool need_log = true;
222
223 if (need_log) {
224 pcmk__warn("Could not find user and group IDs for user "
225 CRM_DAEMON_USER);
226 need_log = false;
227 }
228 }
229
230 if (uid_client != 0) {
231 pcmk__trace("Giving group %u access to new IPC connection",
232 gid_cluster);
233 /* Passing -1 to chown(2) means don't change */
234 qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
235 }
236
237 /* TODO: Do our own auth checking, return NULL if unauthorized */
238 client = client_from_connection(c, NULL, uid_client);
239
240 if ((uid_client == 0) || (uid_client == uid_cluster)) {
241 /* Remember when a connection came from root or hacluster */
242 pcmk__set_client_flags(client, pcmk__client_privileged);
243 }
244
245 pcmk__debug("New IPC client %s for PID %u with uid %d and gid %d",
246 client->id, client->pid, uid_client, gid_client);
247 return client;
248 }
249
// Allocate a two-entry I/O vector: [0] = IPC header, [1] = payload
static struct iovec *
pcmk__new_ipc_event(void)
{
    return pcmk__assert_alloc(2, sizeof(struct iovec));
}
255
256 /*!
257 * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
258 *
259 * \param[in,out] event I/O vector to free
260 */
261 void
262 pcmk_free_ipc_event(struct iovec *event)
263 {
264 if (event != NULL) {
265 free(event[0].iov_base);
266 free(event[1].iov_base);
267 free(event);
268 }
269 }
270
271 static void
272 free_event(gpointer data)
273 {
274 pcmk_free_ipc_event((struct iovec *) data);
275 }
276
277 static void
278 add_event(pcmk__client_t *c, struct iovec *iov)
279 {
280 if (c->event_queue == NULL) {
281 c->event_queue = g_queue_new();
282 }
283 g_queue_push_tail(c->event_queue, iov);
284 }
285
/*!
 * \internal
 * \brief Deregister and free a client object
 *
 * Removes the client from the connection table, cancels any pending event
 * timer, frees any queued events, and releases all memory owned by the
 * client (including remote-connection state, if any).
 *
 * \param[in,out] c  Client to free (may be NULL)
 */
void
pcmk__free_client(pcmk__client_t *c)
{
    if (c == NULL) {
        return;
    }

    if (client_connections) {
        /* Remove from the table before freeing: local clients are keyed by
         * connection pointer, remote clients by ID string
         */
        if (c->ipcs) {
            pcmk__trace("Destroying %p/%p (%u remaining)", c, c->ipcs,
                        (g_hash_table_size(client_connections) - 1));
            g_hash_table_remove(client_connections, c->ipcs);

        } else {
            pcmk__trace("Destroying remote connection %p (%u remaining)", c,
                        (g_hash_table_size(client_connections) - 1));
            g_hash_table_remove(client_connections, c->id);
        }
    }

    // Cancel any pending event-queue flush timer
    if (c->event_timer) {
        g_source_remove(c->event_timer);
    }

    // Drop any events still queued for this client
    if (c->event_queue) {
        pcmk__debug("Destroying %d events", g_queue_get_length(c->event_queue));
        g_queue_free_full(c->event_queue, free_event);
    }

    free(c->id);
    free(c->name);
    free(c->user);

    if (c->buffer != NULL) {
        g_byte_array_free(c->buffer, TRUE);
        c->buffer = NULL;
    }

    // Remote (Pacemaker Remote) connections carry extra state
    if (c->remote) {
        if (c->remote->auth_timeout) {
            g_source_remove(c->remote->auth_timeout);
        }
        if (c->remote->tls_session != NULL) {
            /* @TODO Reduce duplication at callers. Put here everything
             * necessary to tear down and free tls_session.
             */
            gnutls_deinit(c->remote->tls_session);
        }
        free(c->remote->buffer);
        free(c->remote);
    }
    free(c);
}
339
340 /*!
341 * \internal
342 * \brief Raise IPC eviction threshold for a client, if allowed
343 *
344 * \param[in,out] client Client to modify
345 * \param[in] qmax New threshold
346 */
347 void
348 pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
349 {
350 int rc = pcmk_rc_ok;
351 long long qmax_ll = 0LL;
352 unsigned int orig_value = 0U;
353
354 CRM_CHECK(client != NULL, return);
355
356 orig_value = client->queue_max;
357
358 if (pcmk__is_set(client->flags, pcmk__client_privileged)) {
359 rc = pcmk__scan_ll(qmax, &qmax_ll, 0LL);
360 if (rc == pcmk_rc_ok) {
361 if ((qmax_ll <= 0LL) || (qmax_ll > UINT_MAX)) {
362 rc = ERANGE;
363 } else {
364 client->queue_max = (unsigned int) qmax_ll;
365 }
366 }
367 } else {
368 rc = EACCES;
369 }
370
371 if (rc != pcmk_rc_ok) {
372 pcmk__info("Could not set IPC threshold for client %s[%u] to %s: %s",
373 pcmk__client_name(client), client->pid,
374 pcmk__s(qmax, "default"), pcmk_rc_str(rc));
375
376 } else if (client->queue_max != orig_value) {
377 pcmk__debug("IPC threshold for client %s[%u] is now %u (was %u)",
378 pcmk__client_name(client), client->pid, client->queue_max,
379 orig_value);
380 }
381 }
382
383 int
384 pcmk__client_pid(qb_ipcs_connection_t *c)
385 {
386 struct qb_ipcs_connection_stats stats;
387
388 stats.client_pid = 0;
389 qb_ipcs_connection_stats_get(c, &stats, 0);
390 return stats.client_pid;
391 }
392
393 /*!
394 * \internal
395 * \brief Retrieve message XML from data read from client IPC
396 *
397 * \param[in,out] c IPC client connection
398 * \param[out] id Where to store message ID from libqb header
399 * \param[out] flags Where to store flags from libqb header
400 *
401 * \return Message XML on success, NULL otherwise
402 */
403 xmlNode *
404 pcmk__client_data2xml(pcmk__client_t *c, uint32_t *id, uint32_t *flags)
405 {
406 xmlNode *xml = NULL;
407 pcmk__ipc_header_t *header = (void *) c->buffer->data;
408 char *text = (char *) header + sizeof(pcmk__ipc_header_t);
409
410 if (!pcmk__valid_ipc_header(header)) {
411 return NULL;
412 }
413
414 if (id) {
415 *id = header->qb.id;
416 }
417
418 if (flags) {
419 *flags = header->flags;
420 }
421
422 if (pcmk__is_set(header->flags, crm_ipc_proxied)) {
423 /* Mark this client as being the endpoint of a proxy connection.
424 * Proxy connections responses are sent on the event channel, to avoid
425 * blocking the controller serving as proxy.
426 */
427 pcmk__set_client_flags(c, pcmk__client_proxied);
428 }
429
430 pcmk__assert(text[header->size - 1] == 0);
431
432 xml = pcmk__xml_parse(text);
433 pcmk__log_xml_trace(xml, "[IPC received]");
434 return xml;
435 }
436
437 static int crm_ipcs_flush_events(pcmk__client_t *c);
438
439 static gboolean
440 crm_ipcs_flush_events_cb(gpointer data)
441 {
442 pcmk__client_t *c = data;
443
444 c->event_timer = 0;
445 crm_ipcs_flush_events(c);
446 return FALSE;
447 }
448
449 /*!
450 * \internal
451 * \brief Add progressive delay before next event queue flush
452 *
453 * \param[in,out] c Client connection to add delay to
454 * \param[in] queue_len Current event queue length
455 */
456 static inline void
457 delay_next_flush(pcmk__client_t *c, unsigned int queue_len)
458 {
459 /* Delay a maximum of 1.5 seconds */
460 guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
461
462 c->event_timer = pcmk__create_timer(delay, crm_ipcs_flush_events_cb, c);
463 }
464
465 /*!
466 * \internal
467 * \brief Send client any messages in its queue
468 *
469 * \param[in,out] c Client to flush
470 *
471 * \return Standard Pacemaker return value
472 */
473 static int
474 crm_ipcs_flush_events(pcmk__client_t *c)
475 {
476 int rc = pcmk_rc_ok;
477 ssize_t qb_rc = 0;
478 unsigned int sent = 0;
479 unsigned int queue_len = 0;
480
481 if (c == NULL) {
482 return rc;
483 }
484
485 if (c->event_timer != 0) {
486 /* There is already a timer, wait until it goes off */
487 pcmk__trace("Timer active for %p - %d", c->ipcs, c->event_timer);
488 return rc;
489 }
490
491 if (c->event_queue != NULL) {
492 queue_len = g_queue_get_length(c->event_queue);
493 }
494
495 while (sent < 100) {
496 pcmk__ipc_header_t *header = NULL;
497 struct iovec *event = NULL;
498
499 if ((c->event_queue == NULL) || g_queue_is_empty(c->event_queue)) {
500 break;
501 }
502
503 // We don't pop unless send is successful
504 event = g_queue_peek_head(c->event_queue);
505
506 /* Retry sending the event up to five times. If we get -EAGAIN, sleep
507 * a very short amount of time (too long here is bad) and try again.
508 * If we simply exit the while loop on -EAGAIN, we'll have to wait until
509 * the timer fires off again (up to 1.5 seconds - see delay_next_flush)
510 * to retry sending the message.
511 *
512 * In that case, the queue may just continue to grow faster than we are
513 * processing it, eventually leading to daemons timing out waiting for
514 * replies, which will cause wider failures.
515 */
516 for (unsigned int retries = 5; retries > 0; retries--) {
517 qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2);
518
519 if (qb_rc >= 0) {
520 break;
521 }
522
523 if (retries == 1 || qb_rc != -EAGAIN) {
524 rc = (int) -qb_rc;
525 goto no_more_retries;
526 }
527
528 pcmk__sleep_ms(5);
529 }
530
531 event = g_queue_pop_head(c->event_queue);
532
533 sent++;
534 header = event[0].iov_base;
535
536 pcmk__trace("Event %" PRId32 " to %p[%u] (%zd bytes) sent: %.120s",
537 header->qb.id, c->ipcs, c->pid, qb_rc,
538 (char *) (event[1].iov_base));
539 pcmk_free_ipc_event(event);
540 }
541
542 no_more_retries:
543 queue_len -= sent;
544 if (sent > 0 || queue_len) {
545 pcmk__trace("Sent %u events (%u remaining) for %p[%d]: %s (%zd)", sent,
546 queue_len, c->ipcs, c->pid, pcmk_rc_str(rc), qb_rc);
547 }
548
549 if (queue_len == 0) {
550 /* Event queue is empty, there is no backlog */
551 c->queue_backlog = 0;
552 return rc;
553 }
554
555 /* Allow clients to briefly fall behind on processing incoming messages,
556 * but drop completely unresponsive clients so the connection doesn't
557 * consume resources indefinitely.
558 */
559 if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
560 /* Don't evict:
561 * - Clients with a new backlog.
562 * - Clients with a shrinking backlog (the client is processing
563 * messages faster than the server is sending them).
564 * - Clients that are pacemaker daemons and have had any messages sent
565 * to them in this flush call (the server is sending messages faster
566 * than the client is processing them, but the client is not dead).
567 */
568 if ((c->queue_backlog <= 1)
569 || (queue_len < c->queue_backlog)
570 || ((sent > 0) && (pcmk__parse_server(c->name) != pcmk_ipc_unknown))) {
571 pcmk__warn("Client with process ID %u has a backlog of %u messages "
572 QB_XS " %p", c->pid, queue_len, c->ipcs);
573
574 } else {
575 pcmk__err("Evicting client with process ID %u due to backlog of %u "
576 "messages " QB_XS " %p",
577 c->pid, queue_len, c->ipcs);
578 c->queue_backlog = 0;
579 qb_ipcs_disconnect(c->ipcs);
580 return rc;
581 }
582 }
583
584 c->queue_backlog = queue_len;
585 delay_next_flush(c, queue_len);
586
587 return rc;
588 }
589
590 /*!
591 * \internal
592 * \brief Create an I/O vector for sending an IPC XML message
593 *
594 * If the message is too large to fit into a single buffer, this function will
595 * prepare an I/O vector that only holds as much as fits. The remainder can be
596 * prepared in a separate call by keeping a running count of the number of times
597 * this function has been called and passing that in for \p index.
598 *
599 * \param[in] request Identifier for libqb response header
600 * \param[in] message Message to send
601 * \param[in] index How many times this function has been called - basically,
602 * a count of how many chunks of \p message have already
603 * been sent
604 * \param[out] result Where to store prepared I/O vector - NULL on error
605 * \param[out] bytes Size of prepared data in bytes (includes header)
606 *
607 * \return Standard Pacemaker return code
608 */
609 int
610 pcmk__ipc_prepare_iov(uint32_t request, const GString *message, uint16_t index,
611 struct iovec **result, ssize_t *bytes)
612 {
613 struct iovec *iov = NULL;
614 unsigned int payload_size = 0;
615 unsigned int total = 0;
616 unsigned int max_send_size = crm_ipc_default_buffer_size();
617 unsigned int max_chunk_size = 0;
618 size_t offset = 0;
619 pcmk__ipc_header_t *header = NULL;
620 int rc = pcmk_rc_ok;
621
622 if ((message == NULL) || (result == NULL)) {
623 rc = EINVAL;
624 goto done;
625 }
626
627 header = calloc(1, sizeof(pcmk__ipc_header_t));
628 if (header == NULL) {
629 rc = ENOMEM;
630 goto done;
631 }
632
633 *result = NULL;
634 iov = pcmk__new_ipc_event();
635 iov[0].iov_len = sizeof(pcmk__ipc_header_t);
636 iov[0].iov_base = header;
637
638 header->version = PCMK__IPC_VERSION;
639
640 /* We are passed an index, which is basically how many times this function
641 * has been called. This is how we support multi-part IPC messages. We
642 * need to convert that into an offset into the buffer that we want to start
643 * reading from.
644 *
645 * Each call to this function can send max_send_size, but this also includes
646 * the header and a null terminator character for the end of the payload.
647 * We need to subtract those out here.
648 */
649 max_chunk_size = max_send_size - iov[0].iov_len - 1;
650 offset = index * max_chunk_size;
651
652 /* How much of message is left to send? This does not include the null
653 * terminator character.
654 */
655 payload_size = message->len - offset;
656
657 /* How much would be transmitted, including the header size and null
658 * terminator character for the buffer?
659 */
660 total = iov[0].iov_len + payload_size + 1;
661
662 if (total >= max_send_size) {
663 /* The entire packet is too big to fit in a single buffer. Calculate
664 * how much of it we can send - buffer size, minus header size, minus
665 * one for the null terminator.
666 */
667 payload_size = max_chunk_size;
668
669 header->size = payload_size + 1;
670
671 iov[1].iov_base = strndup(message->str + offset, payload_size);
672 if (iov[1].iov_base == NULL) {
673 rc = ENOMEM;
674 goto done;
675 }
676
677 iov[1].iov_len = header->size;
678 rc = pcmk_rc_ipc_more;
679
680 } else {
681 /* The entire packet fits in a single buffer. We can copy the entirety
682 * of it into the payload.
683 */
684 header->size = payload_size + 1;
685
686 iov[1].iov_base = pcmk__str_copy(message->str + offset);
687 iov[1].iov_len = header->size;
688 }
689
690 header->part_id = index;
691 header->qb.size = iov[0].iov_len + iov[1].iov_len;
692 header->qb.id = (int32_t)request; /* Replying to a specific request */
693
694 if ((rc == pcmk_rc_ok) && (index != 0)) {
695 pcmk__set_ipc_flags(header->flags, "multipart ipc",
696 crm_ipc_multipart | crm_ipc_multipart_end);
697 } else if (rc == pcmk_rc_ipc_more) {
698 pcmk__set_ipc_flags(header->flags, "multipart ipc",
699 crm_ipc_multipart);
700 }
701
702 *result = iov;
703 pcmk__assert(header->qb.size > 0);
704 if (bytes != NULL) {
705 *bytes = header->qb.size;
706 }
707
708 done:
709 if ((rc != pcmk_rc_ok) && (rc != pcmk_rc_ipc_more)) {
710 pcmk_free_ipc_event(iov);
711 }
712
713 return rc;
714 }
715
716 /* Return the next available ID for a server event.
717 *
718 * For the parts of a multipart event, all parts should have the same ID as
719 * the first part.
720 */
721 static uint32_t
722 id_for_server_event(pcmk__ipc_header_t *header)
723 {
724 static uint32_t id = 1;
725
726 if (pcmk__is_set(header->flags, crm_ipc_multipart)
727 && (header->part_id != 0)) {
728 return id;
729 } else {
730 id++;
731 return id;
732 }
733 }
734
735 int
736 pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
737 {
738 int rc = pcmk_rc_ok;
739 pcmk__ipc_header_t *header = iov[0].iov_base;
740
741 /* _ALL_ replies to proxied connections need to be sent as events */
742 if (pcmk__is_set(c->flags, pcmk__client_proxied)
743 && !pcmk__is_set(flags, crm_ipc_server_event)) {
744 /* The proxied flag lets us know this was originally meant to be a
745 * response, even though we're sending it over the event channel.
746 */
747 pcmk__set_ipc_flags(flags, "server event",
748 crm_ipc_server_event|crm_ipc_proxied_relay_response);
749 }
750
751 pcmk__set_ipc_flags(header->flags, "server event", flags);
752 if (pcmk__is_set(flags, crm_ipc_server_event)) {
753 /* Server events don't use an ID, though we do set one in
754 * pcmk__ipc_prepare_iov if the event is in response to a particular
755 * request. In that case, we don't want to set a new ID here that
756 * overwrites that one.
757 *
758 * @TODO: Since server event IDs aren't used anywhere, do we really
759 * need to set this for any reason other than ease of logging?
760 */
761 if (header->qb.id == 0) {
762 header->qb.id = id_for_server_event(header);
763 }
764
765 if (pcmk__is_set(flags, crm_ipc_server_free)) {
766 pcmk__trace("Sending the original to %p[%d]", c->ipcs, c->pid);
767 add_event(c, iov);
768
769 } else {
770 struct iovec *iov_copy = pcmk__new_ipc_event();
771
772 pcmk__trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
773 iov_copy[0].iov_len = iov[0].iov_len;
774 iov_copy[0].iov_base = malloc(iov[0].iov_len);
775 memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
776
777 iov_copy[1].iov_len = iov[1].iov_len;
778 iov_copy[1].iov_base = malloc(iov[1].iov_len);
779 memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
780
781 add_event(c, iov_copy);
782 }
783
784 rc = crm_ipcs_flush_events(c);
785
786 } else {
787 ssize_t qb_rc;
788 char *part_text = NULL;
789
790 CRM_LOG_ASSERT(header->qb.id != 0); /* Replying to a specific request */
791
792 if (pcmk__is_set(header->flags, crm_ipc_multipart_end)) {
793 part_text = pcmk__assert_asprintf(" (final part %d) ",
794 header->part_id);
795 } else if (pcmk__is_set(header->flags, crm_ipc_multipart)) {
796 if (header->part_id == 0) {
797 part_text = pcmk__assert_asprintf(" (initial part %d) ",
798 header->part_id);
799 } else {
800 part_text = pcmk__assert_asprintf(" (part %d) ",
801 header->part_id);
802 }
803 } else {
804 part_text = pcmk__str_copy(" ");
805 }
806
807 qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
808 if (qb_rc < header->qb.size) {
809 if (qb_rc < 0) {
810 rc = (int) -qb_rc;
811 }
812
813 } else {
814 pcmk__trace("Response %" PRId32 "%ssent, %zd bytes to %p[%u]",
815 header->qb.id, part_text, qb_rc, c->ipcs, c->pid);
816 pcmk__trace("Text = %s", (char *) iov[1].iov_base);
817 }
818
819 free(part_text);
820
821 if (pcmk__is_set(flags, crm_ipc_server_free)) {
822 pcmk_free_ipc_event(iov);
823 }
824
825 crm_ipcs_flush_events(c);
826 }
827
828 if ((rc == EPIPE) || (rc == ENOTCONN)) {
829 pcmk__trace("Client %p disconnected", c->ipcs);
830 }
831
832 return rc;
833 }
834
/*!
 * \internal
 * \brief Serialize an XML message and send it to an IPC client, chunking it
 *        into multiple parts if necessary
 *
 * \param[in,out] c        Client to send to
 * \param[in]     request  Identifier for libqb response header
 * \param[in]     message  XML message to send
 * \param[in]     flags    Group of enum crm_ipc_flags
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message,
                   uint32_t flags)
{
    struct iovec *iov = NULL;
    int rc = pcmk_rc_ok;
    GString *iov_buffer = NULL;
    uint16_t index = 0;
    bool event_or_proxied = false;

    if (c == NULL) {
        return EINVAL;
    }

    // Serialize the XML once; chunks are sliced from this buffer by index
    iov_buffer = g_string_sized_new(1024);
    pcmk__xml_string(message, 0, iov_buffer, 0);

    /* Testing crm_ipc_server_event is obvious.  pcmk__client_proxied is less
     * obvious.  According to pcmk__ipc_send_iov, replies to proxied connections
     * need to be sent as events.  However, do_local_notify (which calls this
     * function) will clear all flags so we can't go just by crm_ipc_server_event.
     *
     * Changing do_local_notify to check for a proxied connection first results
     * in processes on the Pacemaker Remote node (like cibadmin or crm_mon)
     * timing out when waiting for a reply.
     */
    event_or_proxied = pcmk__is_set(flags, crm_ipc_server_event)
                       || pcmk__is_set(c->flags, pcmk__client_proxied);

    do {
        // Prepare the next chunk (index tracks how many were already sent)
        rc = pcmk__ipc_prepare_iov(request, iov_buffer, index, &iov, NULL);

        switch (rc) {
            case pcmk_rc_ok:
                /* No more chunks to send after this one */
                pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
                rc = pcmk__ipc_send_iov(c, iov, flags);

                if (event_or_proxied) {
                    if (rc == EAGAIN) {
                        /* Return pcmk_rc_ok instead so callers don't have to know
                         * whether they passed an event or not when interpreting
                         * the return code.
                         */
                        rc = pcmk_rc_ok;
                    }
                } else {
                    /* EAGAIN is an error for IPC messages.  We don't have a
                     * send queue for these, so we need to try again.  If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == EAGAIN) {
                        // Loop around and retry the same chunk
                        break;
                    }
                }

                goto done;

            case pcmk_rc_ipc_more:
                /* There are more chunks to send after this one */
                pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
                rc = pcmk__ipc_send_iov(c, iov, flags);

                /* Did an error occur during transmission? */
                if (event_or_proxied) {
                    /* EAGAIN is not an error for server events.  The event
                     * will be queued for transmission and we will attempt
                     * sending it again the next time pcmk__ipc_send_iov is
                     * called, or when the crm_ipcs_flush_events_cb happens.
                     */
                    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
                        goto done;
                    }

                    index++;
                    break;

                } else {
                    /* EAGAIN is an error for IPC messages.  We don't have a
                     * send queue for these, so we need to try again.  If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == pcmk_rc_ok) {
                        index++;
                        break;
                    } else if (rc == EAGAIN) {
                        // Retry the same chunk without advancing index
                        break;
                    } else {
                        goto done;
                    }
                }

            default:
                /* An error occurred during preparation */
                goto done;
        }
    } while (true);

done:
    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
        pcmk__notice("IPC message to pid %u failed: %s " QB_XS " rc=%d", c->pid,
                     pcmk_rc_str(rc), rc);
    }

    g_string_free(iov_buffer, TRUE);
    return rc;
}
948
949 /*!
950 * \internal
951 * \brief Create an acknowledgement with a status code to send to a client
952 *
953 * \param[in] function Calling function
954 * \param[in] line Source file line within calling function
955 * \param[in] flags IPC flags to use when sending
956 * \param[in] ver IPC protocol version (can be NULL)
957 * \param[in] status Exit status code to add to ack
958 *
959 * \return Newly created XML for ack
960 *
961 * \note The caller is responsible for freeing the return value with
962 * \c pcmk__xml_free().
963 */
964 xmlNode *
965 pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags,
966 const char *ver, crm_exit_t status)
967 {
968 xmlNode *ack = NULL;
969
970 if (!pcmk__is_set(flags, crm_ipc_client_response)) {
971 return NULL;
972 }
973
974 ack = pcmk__xe_create(NULL, PCMK__XE_ACK);
975 pcmk__xe_set(ack, PCMK_XA_FUNCTION, function);
976 pcmk__xe_set_int(ack, PCMK__XA_LINE, line);
977 pcmk__xe_set_int(ack, PCMK_XA_STATUS, (int) status);
978 pcmk__xe_set(ack, PCMK__XA_IPC_PROTO_VERSION, ver);
979 return ack;
980 }
981
982 /*!
983 * \internal
984 * \brief Send an acknowledgement with a status code to a client
985 *
986 * \param[in] function Calling function
987 * \param[in] line Source file line within calling function
988 * \param[in] c Client to send ack to
989 * \param[in] request Request ID being replied to
990 * \param[in] flags IPC flags to use when sending
991 * \param[in] ver IPC protocol version (can be NULL)
992 * \param[in] status Status code to send with acknowledgement
993 *
994 * \return Standard Pacemaker return code
995 */
996 int
997 pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
998 uint32_t request, uint32_t flags, const char *ver,
999 crm_exit_t status)
1000 {
1001 int rc = pcmk_rc_ok;
1002 xmlNode *ack = pcmk__ipc_create_ack_as(function, line, flags, ver, status);
1003
1004 if (ack == NULL) {
1005 return pcmk_rc_ok;
1006 }
1007
1008 pcmk__trace("Ack'ing IPC message from client %s as <" PCMK__XE_ACK
1009 " status=%d>",
1010 pcmk__client_name(c), status);
1011 pcmk__log_xml_trace(ack, "sent-ack");
1012 c->request_id = 0;
1013 rc = pcmk__ipc_send_xml(c, request, ack, flags);
1014 pcmk__xml_free(ack);
1015 return rc;
1016 }
1017
1018 /*!
1019 * \internal
1020 * \brief Add an IPC server to the main loop for the CIB manager API
1021 *
1022 * \param[out] ipcs_ro New IPC server for read-only CIB manager API
1023 * \param[out] ipcs_rw New IPC server for read/write CIB manager API
1024 * \param[out] ipcs_shm New IPC server for shared-memory CIB manager API
1025 * \param[in] ro_cb IPC callbacks for read-only API
1026 * \param[in] rw_cb IPC callbacks for read/write and shared-memory APIs
1027 *
1028 * \note This function exits fatally on error.
1029 * \note There is no actual difference between the three IPC endpoints other
1030 * than their names.
1031 */
1032 void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro,
1033 qb_ipcs_service_t **ipcs_rw,
1034 qb_ipcs_service_t **ipcs_shm,
1035 struct qb_ipcs_service_handlers *ro_cb,
1036 struct qb_ipcs_service_handlers *rw_cb)
1037 {
1038 *ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO,
1039 QB_IPC_NATIVE, ro_cb);
1040
1041 *ipcs_rw = mainloop_add_ipc_server(PCMK__SERVER_BASED_RW,
1042 QB_IPC_NATIVE, rw_cb);
1043
1044 *ipcs_shm = mainloop_add_ipc_server(PCMK__SERVER_BASED_SHM,
1045 QB_IPC_SHM, rw_cb);
1046
1047 if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
1048 pcmk__crit("Failed to create %s IPC server; shutting down",
1049 pcmk__server_log_name(pcmk_ipc_based));
1050 pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1051 "enabled");
1052 crm_exit(CRM_EX_FATAL);
1053 }
1054 }
1055
1056 /*!
1057 * \internal
1058 * \brief Destroy IPC servers for the CIB manager API
1059 *
1060 * \param[out] ipcs_ro IPC server for read-only the CIB manager API
1061 * \param[out] ipcs_rw IPC server for read/write the CIB manager API
1062 * \param[out] ipcs_shm IPC server for shared-memory the CIB manager API
1063 *
1064 * \note This is a convenience function for calling qb_ipcs_destroy() for each
1065 * argument.
1066 */
1067 void
1068 pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro,
1069 qb_ipcs_service_t *ipcs_rw,
1070 qb_ipcs_service_t *ipcs_shm)
1071 {
1072 qb_ipcs_destroy(ipcs_ro);
1073 qb_ipcs_destroy(ipcs_rw);
1074 qb_ipcs_destroy(ipcs_shm);
1075 }
1076
1077 /*!
1078 * \internal
1079 * \brief Add an IPC server to the main loop for the controller API
1080 *
1081 * \param[in] cb IPC callbacks
1082 *
1083 * \return Newly created IPC server
1084 */
1085 qb_ipcs_service_t *
1086 pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
1087 {
1088 return mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, cb);
1089 }
1090
1091 /*!
1092 * \internal
1093 * \brief Add an IPC server to the main loop for the attribute manager API
1094 *
1095 * \param[out] ipcs Where to store newly created IPC server
1096 * \param[in] cb IPC callbacks
1097 *
1098 * \note This function exits fatally on error.
1099 */
1100 void
1101 pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
1102 struct qb_ipcs_service_handlers *cb)
1103 {
1104 *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_attrd),
1105 QB_IPC_NATIVE, cb);
1106
1107 if (*ipcs == NULL) {
1108 pcmk__crit("Failed to create %s IPC server; shutting down",
1109 pcmk__server_log_name(pcmk_ipc_attrd));
1110 pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1111 "enabled");
1112 crm_exit(CRM_EX_FATAL);
1113 }
1114 }
1115
1116 /*!
1117 * \internal
1118 * \brief Add an IPC server to the main loop for the executor API
1119 *
1120 * \param[out] ipcs Where to store newly created IPC server
1121 * \param[in] cb IPC callbacks
1122 *
1123 * \note This function exits fatally on error.
1124 */
1125 void
1126 pcmk__serve_execd_ipc(qb_ipcs_service_t **ipcs,
1127 struct qb_ipcs_service_handlers *cb)
1128 {
1129 *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_execd),
1130 QB_IPC_SHM, cb);
1131
1132 if (*ipcs == NULL) {
1133 pcmk__crit("Failed to create %s IPC server; shutting down",
1134 pcmk__server_log_name(pcmk_ipc_execd));
1135 crm_exit(CRM_EX_FATAL);
1136 }
1137 }
1138
1139 /*!
1140 * \internal
1141 * \brief Add an IPC server to the main loop for the fencer API
1142 *
1143 * \param[out] ipcs Where to store newly created IPC server
1144 * \param[in] cb IPC callbacks
1145 *
1146 * \note This function exits fatally on error.
1147 */
1148 void
1149 pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
1150 struct qb_ipcs_service_handlers *cb)
1151 {
1152 *ipcs = mainloop_add_ipc_server_with_prio(pcmk__server_ipc_name(pcmk_ipc_fenced),
1153 QB_IPC_NATIVE, cb, QB_LOOP_HIGH);
1154
1155 if (*ipcs == NULL) {
1156 pcmk__crit("Failed to create %s IPC server; shutting down",
1157 pcmk__server_log_name(pcmk_ipc_fenced));
1158 pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1159 "enabled");
1160 crm_exit(CRM_EX_FATAL);
1161 }
1162 }
1163
1164 /*!
1165 * \internal
1166 * \brief Add an IPC server to the main loop for the pacemakerd API
1167 *
1168 * \param[out] ipcs Where to store newly created IPC server
1169 * \param[in] cb IPC callbacks
1170 *
1171 * \note This function exits with CRM_EX_OSERR on error.
1172 */
1173 void
1174 pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
1175 struct qb_ipcs_service_handlers *cb)
1176 {
1177 *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_pacemakerd),
1178 QB_IPC_NATIVE, cb);
1179
1180 if (*ipcs == NULL) {
1181 pcmk__crit("Failed to create %s IPC server; shutting down",
1182 pcmk__server_log_name(pcmk_ipc_pacemakerd));
1183 pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1184 "enabled");
1185 /* sub-daemons are observed by pacemakerd. Thus we exit CRM_EX_FATAL
1186 * if we want to prevent pacemakerd from restarting them.
1187 * With pacemakerd we leave the exit-code shown to e.g. systemd
1188 * to what it was prior to moving the code here from pacemakerd.c
1189 */
1190 crm_exit(CRM_EX_OSERR);
1191 }
1192 }
1193
1194 /*!
1195 * \internal
1196 * \brief Add an IPC server to the main loop for the scheduler API
1197 *
1198 * \param[out] ipcs Where to store newly created IPC server
1199 * \param[in] cb IPC callbacks
1200 *
1201 * \return Newly created IPC server
1202 * \note This function exits fatally on error.
1203 */
1204 void
1205 pcmk__serve_schedulerd_ipc(qb_ipcs_service_t **ipcs,
1206 struct qb_ipcs_service_handlers *cb)
1207 {
1208 *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_schedulerd),
1209 QB_IPC_NATIVE, cb);
1210
1211 if (*ipcs == NULL) {
1212 pcmk__crit("Failed to create %s IPC server; shutting down",
1213 pcmk__server_log_name(pcmk_ipc_schedulerd));
1214 crm_exit(CRM_EX_FATAL);
1215 }
1216 }
1217