1 /*
2 * Copyright 2004-2026 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
10 #include <crm_internal.h>
11
12 #include <stdbool.h>
13 #include <stdio.h>
14 #include <errno.h>
15 #include <bzlib.h>
16 #include <sys/stat.h>
17 #include <sys/types.h>
18
19 #include <crm/crm.h>
20 #include <crm/common/xml.h>
21 #include <crm/common/ipc.h>
22 #include "crmcommon_private.h"
23
24 /* Evict clients whose event queue grows this large (by default) */
25 #define PCMK_IPC_DEFAULT_QUEUE_MAX 500
26
27 static GHashTable *client_connections = NULL;
28
29 /*!
30 * \internal
31 * \brief Count IPC clients
32 *
33 * \return Number of active IPC client connections
34 */
35 guint
36 pcmk__ipc_client_count(void)
37 {
38 return client_connections? g_hash_table_size(client_connections) : 0;
39 }
40
41 /*!
42 * \internal
43 * \brief Execute a function for each active IPC client connection
44 *
45 * \param[in] func Function to call
46 * \param[in,out] user_data Pointer to pass to function
47 *
48 * \note The parameters are the same as for g_hash_table_foreach().
49 */
50 void
51 pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
52 {
53 if ((func != NULL) && (client_connections != NULL)) {
54 g_hash_table_foreach(client_connections, func, user_data);
55 }
56 }
57
58 pcmk__client_t *
59 pcmk__find_client(const qb_ipcs_connection_t *c)
60 {
61 if (client_connections) {
62 return g_hash_table_lookup(client_connections, c);
63 }
64
65 pcmk__trace("No client found for %p", c);
66 return NULL;
67 }
68
69 pcmk__client_t *
70 pcmk__find_client_by_id(const char *id)
71 {
72 if ((client_connections != NULL) && (id != NULL)) {
73 gpointer key;
74 pcmk__client_t *client = NULL;
75 GHashTableIter iter;
76
77 g_hash_table_iter_init(&iter, client_connections);
78 while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
79 if (strcmp(client->id, id) == 0) {
80 return client;
81 }
82 }
83 }
84 pcmk__trace("No client found with id='%s'", pcmk__s(id, ""));
85 return NULL;
86 }
87
88 /*!
89 * \internal
90 * \brief Get a client identifier for use in log messages
91 *
92 * \param[in] c Client
93 *
94 * \return Client's name, client's ID, or a string literal, as available
95 * \note This is intended to be used in format strings like "client %s".
96 */
97 const char *
98 pcmk__client_name(const pcmk__client_t *c)
99 {
100 if (c == NULL) {
101 return "(unspecified)";
102
103 } else if (c->name != NULL) {
104 return c->name;
105
106 } else if (c->id != NULL) {
107 return c->id;
108
109 } else {
110 return "(unidentified)";
111 }
112 }
113
114 void
115 pcmk__client_cleanup(void)
116 {
|
(1) Event path: |
Condition "client_connections != NULL", taking true branch. |
117 if (client_connections != NULL) {
118 int active = g_hash_table_size(client_connections);
119
|
(2) Event path: |
Condition "active > 0", taking true branch. |
120 if (active > 0) {
|
(3) Event path: |
Condition "active == 1", taking true branch. |
121 pcmk__warn("Exiting with %d active IPC client%s", active,
122 pcmk__plural_s(active));
123 }
|
CID (unavailable; MK=7738de940091a0758ed0ae977b621ce2) (#1 of 1): Inconsistent C union access (INCONSISTENT_UNION_ACCESS): |
|
(4) Event assign_union_field: |
The union field "in" of "_pp" is written. |
|
(5) Event inconsistent_union_field_access: |
In "_pp.out", the union field used: "out" is inconsistent with the field most recently stored: "in". |
124 g_clear_pointer(&client_connections, g_hash_table_destroy);
125 }
126 }
127
128 void
129 pcmk__drop_all_clients(qb_ipcs_service_t *service)
130 {
131 qb_ipcs_connection_t *c = NULL;
132
133 if (service == NULL) {
134 return;
135 }
136
137 c = qb_ipcs_connection_first_get(service);
138
139 while (c != NULL) {
140 qb_ipcs_connection_t *last = c;
141
142 c = qb_ipcs_connection_next_get(service, last);
143
144 /* There really shouldn't be anyone connected at this point */
145 pcmk__notice("Disconnecting client %p, pid=%d...", last,
146 pcmk__client_pid(last));
147 qb_ipcs_disconnect(last);
148 qb_ipcs_connection_unref(last);
149 }
150 }
151
152 /*!
153 * \internal
154 * \brief Allocate a new pcmk__client_t object based on an IPC connection
155 *
156 * \param[in] c IPC connection (NULL to allocate generic client)
157 * \param[in] key Connection table key (NULL to use sane default)
158 * \param[in] uid_client UID corresponding to c (ignored if c is NULL)
159 *
160 * \return Pointer to new pcmk__client_t (guaranteed not to be \c NULL)
161 */
162 static pcmk__client_t *
163 client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client)
164 {
165 pcmk__client_t *client = pcmk__assert_alloc(1, sizeof(pcmk__client_t));
166
167 if (c) {
168 client->user = pcmk__uid2username(uid_client);
169 if (client->user == NULL) {
170 client->user = pcmk__str_copy("#unprivileged");
171 pcmk__err("Unable to enforce ACLs for user ID %d, assuming "
172 "unprivileged",
173 uid_client);
174 }
175 client->ipcs = c;
176 pcmk__set_client_flags(client, pcmk__client_ipc);
177 client->pid = pcmk__client_pid(c);
178 if (key == NULL) {
179 key = c;
180 }
181 }
182
183 client->id = pcmk__generate_uuid();
184 if (key == NULL) {
185 key = client->id;
186 }
187 if (client_connections == NULL) {
188 pcmk__trace("Creating IPC client table");
189 client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
190 }
191 g_hash_table_insert(client_connections, key, client);
192 return client;
193 }
194
195 /*!
196 * \brief Allocate a new pcmk__client_t object and generate its ID
197 *
198 * \param[in] key What to use as connections hash table key (NULL to use ID)
199 *
200 * \return Pointer to new pcmk__client_t (asserts on failure)
201 */
202 pcmk__client_t *
203 pcmk__new_unauth_client(void *key)
204 {
205 return client_from_connection(NULL, key, 0);
206 }
207
208 pcmk__client_t *
209 pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
210 {
211 gid_t uid_cluster = 0;
212 gid_t gid_cluster = 0;
213
214 pcmk__client_t *client = NULL;
215
216 CRM_CHECK(c != NULL, return NULL);
217
218 if (pcmk__daemon_user(&uid_cluster, &gid_cluster) != pcmk_rc_ok) {
219 static bool need_log = true;
220
221 if (need_log) {
222 pcmk__warn("Could not find user and group IDs for user "
223 CRM_DAEMON_USER);
224 need_log = false;
225 }
226 }
227
228 if (uid_client != 0) {
229 pcmk__trace("Giving group %u access to new IPC connection",
230 gid_cluster);
231 /* Passing -1 to chown(2) means don't change */
232 qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
233 }
234
235 /* TODO: Do our own auth checking, return NULL if unauthorized */
236 client = client_from_connection(c, NULL, uid_client);
237
238 if ((uid_client == 0) || (uid_client == uid_cluster)) {
239 /* Remember when a connection came from root or hacluster */
240 pcmk__set_client_flags(client, pcmk__client_privileged);
241 }
242
243 pcmk__debug("New IPC client %s for PID %u with uid %d and gid %d",
244 client->id, client->pid, uid_client, gid_client);
245 return client;
246 }
247
/*!
 * \internal
 * \brief Allocate a zeroed two-entry I/O vector (header plus payload)
 *
 * \return Newly allocated I/O vector (asserts on allocation failure)
 */
static struct iovec *
pcmk__new_ipc_event(void)
{
    struct iovec *event = pcmk__assert_alloc(2, sizeof(struct iovec));

    return event;
}
253
/*!
 * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
 *
 * \param[in,out] event  I/O vector to free (may be NULL)
 */
void
pcmk_free_ipc_event(struct iovec *event)
{
    if (event == NULL) {
        return;
    }
    free(event[0].iov_base);    // libqb/Pacemaker header
    free(event[1].iov_base);    // message payload
    free(event);
}
268
269 static void
270 free_event(gpointer data)
271 {
272 pcmk_free_ipc_event((struct iovec *) data);
273 }
274
275 static void
276 add_event(pcmk__client_t *c, struct iovec *iov)
277 {
278 if (c->event_queue == NULL) {
279 c->event_queue = g_queue_new();
280 }
281 g_queue_push_tail(c->event_queue, iov);
282 }
283
/*!
 * \internal
 * \brief Free a client object, removing it from the client table first
 *
 * Cancels any pending event-flush timer, discards any queued events, and
 * tears down any remote (TLS) connection state before freeing the object.
 *
 * \param[in,out] c  Client to free (may be NULL)
 */
void
pcmk__free_client(pcmk__client_t *c)
{
    if (c == NULL) {
        return;
    }

    if (client_connections) {
        if (c->ipcs) {
            // Local IPC clients are keyed by their libqb connection
            pcmk__trace("Destroying %p/%p (%u remaining)", c, c->ipcs,
                        (g_hash_table_size(client_connections) - 1));
            g_hash_table_remove(client_connections, c->ipcs);

        } else {
            // Remote clients are keyed by their generated ID string
            pcmk__trace("Destroying remote connection %p (%u remaining)", c,
                        (g_hash_table_size(client_connections) - 1));
            g_hash_table_remove(client_connections, c->id);
        }
    }

    if (c->event_timer) {
        // Cancel any pending event-queue flush
        g_source_remove(c->event_timer);
    }

    if (c->event_queue) {
        pcmk__debug("Destroying %d events", g_queue_get_length(c->event_queue));
        g_queue_free_full(c->event_queue, free_event);
    }

    free(c->id);
    free(c->name);
    free(c->user);

    if (c->buffer != NULL) {
        g_byte_array_free(c->buffer, TRUE);
        c->buffer = NULL;
    }

    if (c->remote) {
        if (c->remote->auth_timeout) {
            g_source_remove(c->remote->auth_timeout);
        }
        if (c->remote->tls_session != NULL) {
            /* @TODO Reduce duplication at callers. Put here everything
             * necessary to tear down and free tls_session.
             */
            gnutls_deinit(c->remote->tls_session);
        }
        free(c->remote->buffer);
        free(c->remote);
    }
    free(c);
}
337
338 /*!
339 * \internal
340 * \brief Raise IPC eviction threshold for a client, if allowed
341 *
342 * \param[in,out] client Client to modify
343 * \param[in] qmax New threshold
344 */
345 void
346 pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
347 {
348 int rc = pcmk_rc_ok;
349 long long qmax_ll = 0LL;
350 unsigned int orig_value = 0U;
351
352 CRM_CHECK(client != NULL, return);
353
354 orig_value = client->queue_max;
355
356 if (pcmk__is_set(client->flags, pcmk__client_privileged)) {
357 rc = pcmk__scan_ll(qmax, &qmax_ll, 0LL);
358 if (rc == pcmk_rc_ok) {
359 if ((qmax_ll <= 0LL) || (qmax_ll > UINT_MAX)) {
360 rc = ERANGE;
361 } else {
362 client->queue_max = (unsigned int) qmax_ll;
363 }
364 }
365 } else {
366 rc = EACCES;
367 }
368
369 if (rc != pcmk_rc_ok) {
370 pcmk__info("Could not set IPC threshold for client %s[%u] to %s: %s",
371 pcmk__client_name(client), client->pid,
372 pcmk__s(qmax, "default"), pcmk_rc_str(rc));
373
374 } else if (client->queue_max != orig_value) {
375 pcmk__debug("IPC threshold for client %s[%u] is now %u (was %u)",
376 pcmk__client_name(client), client->pid, client->queue_max,
377 orig_value);
378 }
379 }
380
381 int
382 pcmk__client_pid(qb_ipcs_connection_t *c)
383 {
384 struct qb_ipcs_connection_stats stats;
385
386 stats.client_pid = 0;
387 qb_ipcs_connection_stats_get(c, &stats, 0);
388 return stats.client_pid;
389 }
390
/*!
 * \internal
 * \brief Retrieve message XML from data read from client IPC
 *
 * \param[in,out] c      IPC client connection
 * \param[out]    id     Where to store message ID from libqb header
 * \param[out]    flags  Where to store flags from libqb header
 *
 * \return Message XML on success, NULL otherwise
 */
xmlNode *
pcmk__client_data2xml(pcmk__client_t *c, uint32_t *id, uint32_t *flags)
{
    xmlNode *xml = NULL;
    // Read buffer layout: Pacemaker IPC header, then NUL-terminated XML text
    pcmk__ipc_header_t *header = (void *) c->buffer->data;
    char *text = (char *) header + sizeof(pcmk__ipc_header_t);

    if (!pcmk__valid_ipc_header(header)) {
        return NULL;
    }

    if (id) {
        *id = header->qb.id;
    }

    if (flags) {
        *flags = header->flags;
    }

    if (pcmk__is_set(header->flags, crm_ipc_proxied)) {
        /* Mark this client as being the endpoint of a proxy connection.
         * Proxy connections responses are sent on the event channel, to avoid
         * blocking the controller serving as proxy.
         */
        pcmk__set_client_flags(c, pcmk__client_proxied);
    }

    // The sender is expected to NUL-terminate the payload
    pcmk__assert(text[header->size - 1] == 0);

    xml = pcmk__xml_parse(text);
    pcmk__log_xml_trace(xml, "[IPC received]");
    return xml;
}
434
435 static int crm_ipcs_flush_events(pcmk__client_t *c);
436
437 static gboolean
438 crm_ipcs_flush_events_cb(gpointer data)
439 {
440 pcmk__client_t *c = data;
441
442 c->event_timer = 0;
443 crm_ipcs_flush_events(c);
444 return FALSE;
445 }
446
447 /*!
448 * \internal
449 * \brief Add progressive delay before next event queue flush
450 *
451 * \param[in,out] c Client connection to add delay to
452 * \param[in] queue_len Current event queue length
453 */
454 static inline void
455 delay_next_flush(pcmk__client_t *c, unsigned int queue_len)
456 {
457 /* Delay a maximum of 1.5 seconds */
458 guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
459
460 c->event_timer = pcmk__create_timer(delay, crm_ipcs_flush_events_cb, c);
461 }
462
/*!
 * \internal
 * \brief Send client any messages in its queue
 *
 * \param[in,out] c  Client to flush
 *
 * \return Standard Pacemaker return value
 */
static int
crm_ipcs_flush_events(pcmk__client_t *c)
{
    int rc = pcmk_rc_ok;
    ssize_t qb_rc = 0;
    unsigned int sent = 0;
    unsigned int queue_len = 0;

    if (c == NULL) {
        return rc;
    }

    if (c->event_timer != 0) {
        /* There is already a timer, wait until it goes off */
        pcmk__trace("Timer active for %p - %d", c->ipcs, c->event_timer);
        return rc;
    }

    if (c->event_queue != NULL) {
        queue_len = g_queue_get_length(c->event_queue);
    }

    /* Send at most 100 events per call so one client with a deep queue can't
     * monopolize the server
     */
    while (sent < 100) {
        pcmk__ipc_header_t *header = NULL;
        struct iovec *event = NULL;

        if ((c->event_queue == NULL) || g_queue_is_empty(c->event_queue)) {
            break;
        }

        // We don't pop unless send is successful
        event = g_queue_peek_head(c->event_queue);

        /* Retry sending the event up to five times. If we get -EAGAIN, sleep
         * a very short amount of time (too long here is bad) and try again.
         * If we simply exit the while loop on -EAGAIN, we'll have to wait until
         * the timer fires off again (up to 1.5 seconds - see delay_next_flush)
         * to retry sending the message.
         *
         * In that case, the queue may just continue to grow faster than we are
         * processing it, eventually leading to daemons timing out waiting for
         * replies, which will cause wider failures.
         */
        for (unsigned int retries = 5; retries > 0; retries--) {
            qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2);

            if (qb_rc >= 0) {
                break;  // Sent successfully
            }

            if (retries == 1 || qb_rc != -EAGAIN) {
                // Out of retries, or a non-retryable error: stop flushing
                rc = (int) -qb_rc;
                goto no_more_retries;
            }

            pcmk__sleep_ms(5);
        }

        // Send succeeded, so the event can be popped and freed now
        event = g_queue_pop_head(c->event_queue);

        sent++;
        header = event[0].iov_base;

        pcmk__trace("Event %" PRId32 " to %p[%u] (%zd bytes) sent: %.120s",
                    header->qb.id, c->ipcs, c->pid, qb_rc,
                    (char *) (event[1].iov_base));
        pcmk_free_ipc_event(event);
    }

no_more_retries:
    queue_len -= sent;
    if (sent > 0 || queue_len) {
        pcmk__trace("Sent %u events (%u remaining) for %p[%d]: %s (%zd)", sent,
                    queue_len, c->ipcs, c->pid, pcmk_rc_str(rc), qb_rc);
    }

    if (queue_len == 0) {
        /* Event queue is empty, there is no backlog */
        c->queue_backlog = 0;
        return rc;
    }

    /* Allow clients to briefly fall behind on processing incoming messages,
     * but drop completely unresponsive clients so the connection doesn't
     * consume resources indefinitely.
     */
    if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
        /* Don't evict:
         * - Clients with a new backlog.
         * - Clients with a shrinking backlog (the client is processing
         *   messages faster than the server is sending them).
         * - Clients that are pacemaker daemons and have had any messages sent
         *   to them in this flush call (the server is sending messages faster
         *   than the client is processing them, but the client is not dead).
         */
        if ((c->queue_backlog <= 1)
            || (queue_len < c->queue_backlog)
            || ((sent > 0) && (pcmk__parse_server(c->name) != pcmk_ipc_unknown))) {
            pcmk__warn("Client with process ID %u has a backlog of %u messages "
                       QB_XS " %p", c->pid, queue_len, c->ipcs);

        } else {
            pcmk__err("Evicting client with process ID %u due to backlog of %u "
                      "messages " QB_XS " %p",
                      c->pid, queue_len, c->ipcs);
            c->queue_backlog = 0;
            qb_ipcs_disconnect(c->ipcs);
            return rc;
        }
    }

    // Remember the backlog and schedule another flush attempt
    c->queue_backlog = queue_len;
    delay_next_flush(c, queue_len);

    return rc;
}
587
588 /*!
589 * \internal
590 * \brief Create an I/O vector for sending an IPC XML message
591 *
592 * If the message is too large to fit into a single buffer, this function will
593 * prepare an I/O vector that only holds as much as fits. The remainder can be
594 * prepared in a separate call by keeping a running count of the number of times
595 * this function has been called and passing that in for \p index.
596 *
597 * \param[in] request Identifier for libqb response header
598 * \param[in] message Message to send
599 * \param[in] index How many times this function has been called - basically,
600 * a count of how many chunks of \p message have already
601 * been sent
602 * \param[out] result Where to store prepared I/O vector - NULL on error
603 * \param[out] bytes Size of prepared data in bytes (includes header)
604 *
605 * \return Standard Pacemaker return code
606 */
607 int
608 pcmk__ipc_prepare_iov(uint32_t request, const GString *message, uint16_t index,
609 struct iovec **result, ssize_t *bytes)
610 {
611 struct iovec *iov = NULL;
612 unsigned int payload_size = 0;
613 unsigned int total = 0;
614 unsigned int max_send_size = crm_ipc_default_buffer_size();
615 unsigned int max_chunk_size = 0;
616 size_t offset = 0;
617 pcmk__ipc_header_t *header = NULL;
618 int rc = pcmk_rc_ok;
619
620 if ((message == NULL) || (result == NULL)) {
621 rc = EINVAL;
622 goto done;
623 }
624
625 header = calloc(1, sizeof(pcmk__ipc_header_t));
626 if (header == NULL) {
627 rc = ENOMEM;
628 goto done;
629 }
630
631 *result = NULL;
632 iov = pcmk__new_ipc_event();
633 iov[0].iov_len = sizeof(pcmk__ipc_header_t);
634 iov[0].iov_base = header;
635
636 header->version = PCMK__IPC_VERSION;
637
638 /* We are passed an index, which is basically how many times this function
639 * has been called. This is how we support multi-part IPC messages. We
640 * need to convert that into an offset into the buffer that we want to start
641 * reading from.
642 *
643 * Each call to this function can send max_send_size, but this also includes
644 * the header and a null terminator character for the end of the payload.
645 * We need to subtract those out here.
646 */
647 max_chunk_size = max_send_size - iov[0].iov_len - 1;
648 offset = index * max_chunk_size;
649
650 /* How much of message is left to send? This does not include the null
651 * terminator character.
652 */
653 payload_size = message->len - offset;
654
655 /* How much would be transmitted, including the header size and null
656 * terminator character for the buffer?
657 */
658 total = iov[0].iov_len + payload_size + 1;
659
660 if (total >= max_send_size) {
661 /* The entire packet is too big to fit in a single buffer. Calculate
662 * how much of it we can send - buffer size, minus header size, minus
663 * one for the null terminator.
664 */
665 payload_size = max_chunk_size;
666
667 header->size = payload_size + 1;
668
669 iov[1].iov_base = strndup(message->str + offset, payload_size);
670 if (iov[1].iov_base == NULL) {
671 rc = ENOMEM;
672 goto done;
673 }
674
675 iov[1].iov_len = header->size;
676 rc = pcmk_rc_ipc_more;
677
678 } else {
679 /* The entire packet fits in a single buffer. We can copy the entirety
680 * of it into the payload.
681 */
682 header->size = payload_size + 1;
683
684 iov[1].iov_base = pcmk__str_copy(message->str + offset);
685 iov[1].iov_len = header->size;
686 }
687
688 header->part_id = index;
689 header->qb.size = iov[0].iov_len + iov[1].iov_len;
690 header->qb.id = (int32_t)request; /* Replying to a specific request */
691
692 if ((rc == pcmk_rc_ok) && (index != 0)) {
693 pcmk__set_ipc_flags(header->flags, "multipart ipc",
694 crm_ipc_multipart | crm_ipc_multipart_end);
695 } else if (rc == pcmk_rc_ipc_more) {
696 pcmk__set_ipc_flags(header->flags, "multipart ipc",
697 crm_ipc_multipart);
698 }
699
700 *result = iov;
701 pcmk__assert(header->qb.size > 0);
702 if (bytes != NULL) {
703 *bytes = header->qb.size;
704 }
705
706 done:
707 if ((rc != pcmk_rc_ok) && (rc != pcmk_rc_ipc_more)) {
708 pcmk_free_ipc_event(iov);
709 }
710
711 return rc;
712 }
713
714 /* Return the next available ID for a server event.
715 *
716 * For the parts of a multipart event, all parts should have the same ID as
717 * the first part.
718 */
719 static uint32_t
720 id_for_server_event(pcmk__ipc_header_t *header)
721 {
722 static uint32_t id = 1;
723
724 if (pcmk__is_set(header->flags, crm_ipc_multipart)
725 && (header->part_id != 0)) {
726 return id;
727 } else {
728 id++;
729 return id;
730 }
731 }
732
733 int
734 pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
735 {
736 int rc = pcmk_rc_ok;
737 pcmk__ipc_header_t *header = iov[0].iov_base;
738
739 /* _ALL_ replies to proxied connections need to be sent as events */
740 if (pcmk__is_set(c->flags, pcmk__client_proxied)
741 && !pcmk__is_set(flags, crm_ipc_server_event)) {
742 /* The proxied flag lets us know this was originally meant to be a
743 * response, even though we're sending it over the event channel.
744 */
745 pcmk__set_ipc_flags(flags, "server event",
746 crm_ipc_server_event|crm_ipc_proxied_relay_response);
747 }
748
749 pcmk__set_ipc_flags(header->flags, "server event", flags);
750 if (pcmk__is_set(flags, crm_ipc_server_event)) {
751 /* Server events don't use an ID, though we do set one in
752 * pcmk__ipc_prepare_iov if the event is in response to a particular
753 * request. In that case, we don't want to set a new ID here that
754 * overwrites that one.
755 *
756 * @TODO: Since server event IDs aren't used anywhere, do we really
757 * need to set this for any reason other than ease of logging?
758 */
759 if (header->qb.id == 0) {
760 header->qb.id = id_for_server_event(header);
761 }
762
763 if (pcmk__is_set(flags, crm_ipc_server_free)) {
764 pcmk__trace("Sending the original to %p[%d]", c->ipcs, c->pid);
765 add_event(c, iov);
766
767 } else {
768 struct iovec *iov_copy = pcmk__new_ipc_event();
769
770 pcmk__trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
771 iov_copy[0].iov_len = iov[0].iov_len;
772 iov_copy[0].iov_base = malloc(iov[0].iov_len);
773 memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
774
775 iov_copy[1].iov_len = iov[1].iov_len;
776 iov_copy[1].iov_base = malloc(iov[1].iov_len);
777 memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
778
779 add_event(c, iov_copy);
780 }
781
782 rc = crm_ipcs_flush_events(c);
783
784 } else {
785 ssize_t qb_rc;
786 char *part_text = NULL;
787
788 CRM_LOG_ASSERT(header->qb.id != 0); /* Replying to a specific request */
789
790 if (pcmk__is_set(header->flags, crm_ipc_multipart_end)) {
791 part_text = pcmk__assert_asprintf(" (final part %d) ",
792 header->part_id);
793 } else if (pcmk__is_set(header->flags, crm_ipc_multipart)) {
794 if (header->part_id == 0) {
795 part_text = pcmk__assert_asprintf(" (initial part %d) ",
796 header->part_id);
797 } else {
798 part_text = pcmk__assert_asprintf(" (part %d) ",
799 header->part_id);
800 }
801 } else {
802 part_text = pcmk__str_copy(" ");
803 }
804
805 qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
806 if (qb_rc < header->qb.size) {
807 if (qb_rc < 0) {
808 rc = (int) -qb_rc;
809 }
810
811 } else {
812 pcmk__trace("Response %" PRId32 "%ssent, %zd bytes to %p[%u]",
813 header->qb.id, part_text, qb_rc, c->ipcs, c->pid);
814 pcmk__trace("Text = %s", (char *) iov[1].iov_base);
815 }
816
817 free(part_text);
818
819 if (pcmk__is_set(flags, crm_ipc_server_free)) {
820 pcmk_free_ipc_event(iov);
821 }
822
823 crm_ipcs_flush_events(c);
824 }
825
826 if ((rc == EPIPE) || (rc == ENOTCONN)) {
827 pcmk__trace("Client %p disconnected", c->ipcs);
828 }
829
830 return rc;
831 }
832
/*!
 * \internal
 * \brief Serialize XML and send it to a client, chunking into multiple IPC
 *        messages if necessary
 *
 * \param[in,out] c        Client to send message to
 * \param[in]     request  Identifier for libqb response header
 * \param[in]     message  XML message to send
 * \param[in]     flags    Group of enum crm_ipc_flags
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message,
                   uint32_t flags)
{
    struct iovec *iov = NULL;
    int rc = pcmk_rc_ok;
    GString *iov_buffer = NULL;
    uint16_t index = 0;     // Which chunk of the message we are sending next
    bool event_or_proxied = false;

    if (c == NULL) {
        return EINVAL;
    }

    iov_buffer = g_string_sized_new(1024);
    pcmk__xml_string(message, 0, iov_buffer, 0);

    /* Testing crm_ipc_server_event is obvious. pcmk__client_proxied is less
     * obvious. According to pcmk__ipc_send_iov, replies to proxied connections
     * need to be sent as events. However, do_local_notify (which calls this
     * function) will clear all flags so we can't go just by crm_ipc_server_event.
     *
     * Changing do_local_notify to check for a proxied connection first results
     * in processes on the Pacemaker Remote node (like cibadmin or crm_mon)
     * timing out when waiting for a reply.
     */
    event_or_proxied = pcmk__is_set(flags, crm_ipc_server_event)
                       || pcmk__is_set(c->flags, pcmk__client_proxied);

    do {
        rc = pcmk__ipc_prepare_iov(request, iov_buffer, index, &iov, NULL);

        switch (rc) {
            case pcmk_rc_ok:
                /* No more chunks to send after this one */
                pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
                rc = pcmk__ipc_send_iov(c, iov, flags);

                if (event_or_proxied) {
                    if (rc == EAGAIN) {
                        /* Return pcmk_rc_ok instead so callers don't have to know
                         * whether they passed an event or not when interpreting
                         * the return code.
                         */
                        rc = pcmk_rc_ok;
                    }
                } else {
                    /* EAGAIN is an error for IPC messages.  We don't have a
                     * send queue for these, so we need to try again.  If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == EAGAIN) {
                        /* NOTE: this break leaves the switch (not the do-loop),
                         * so the same chunk is retried on the next iteration
                         */
                        break;
                    }
                }

                goto done;

            case pcmk_rc_ipc_more:
                /* There are more chunks to send after this one */
                pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
                rc = pcmk__ipc_send_iov(c, iov, flags);

                /* Did an error occur during transmission? */
                if (event_or_proxied) {
                    /* EAGAIN is not an error for server events.  The event
                     * will be queued for transmission and we will attempt
                     * sending it again the next time pcmk__ipc_send_iov is
                     * called, or when the crm_ipcs_flush_events_cb happens.
                     */
                    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
                        goto done;
                    }

                    index++;    // Advance to the next chunk
                    break;

                } else {
                    /* EAGAIN is an error for IPC messages.  We don't have a
                     * send queue for these, so we need to try again.  If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == pcmk_rc_ok) {
                        index++;    // Advance to the next chunk
                        break;
                    } else if (rc == EAGAIN) {
                        // Retry the same chunk on the next loop iteration
                        break;
                    } else {
                        goto done;
                    }
                }

            default:
                /* An error occurred during preparation */
                goto done;
        }
    } while (true);

done:
    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
        pcmk__notice("IPC message to pid %u failed: %s " QB_XS " rc=%d", c->pid,
                     pcmk_rc_str(rc), rc);
    }

    g_string_free(iov_buffer, TRUE);
    return rc;
}
946
947 /*!
948 * \internal
949 * \brief Create an acknowledgement with a status code to send to a client
950 *
951 * \param[in] function Calling function
952 * \param[in] line Source file line within calling function
953 * \param[in] flags IPC flags to use when sending
954 * \param[in] ver IPC protocol version (can be NULL)
955 * \param[in] status Exit status code to add to ack
956 *
957 * \return Newly created XML for ack
958 *
959 * \note The caller is responsible for freeing the return value with
960 * \c pcmk__xml_free().
961 */
962 xmlNode *
963 pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags,
964 const char *ver, crm_exit_t status)
965 {
966 xmlNode *ack = NULL;
967
968 if (!pcmk__is_set(flags, crm_ipc_client_response)) {
969 return NULL;
970 }
971
972 ack = pcmk__xe_create(NULL, PCMK__XE_ACK);
973 pcmk__xe_set(ack, PCMK_XA_FUNCTION, function);
974 pcmk__xe_set_int(ack, PCMK__XA_LINE, line);
975 pcmk__xe_set_int(ack, PCMK_XA_STATUS, (int) status);
976 pcmk__xe_set(ack, PCMK__XA_IPC_PROTO_VERSION, ver);
977 return ack;
978 }
979
980 /*!
981 * \internal
982 * \brief Send an acknowledgement with a status code to a client
983 *
984 * \param[in] function Calling function
985 * \param[in] line Source file line within calling function
986 * \param[in] c Client to send ack to
987 * \param[in] request Request ID being replied to
988 * \param[in] flags IPC flags to use when sending
989 * \param[in] ver IPC protocol version (can be NULL)
990 * \param[in] status Status code to send with acknowledgement
991 *
992 * \return Standard Pacemaker return code
993 */
994 int
995 pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
996 uint32_t request, uint32_t flags, const char *ver,
997 crm_exit_t status)
998 {
999 int rc = pcmk_rc_ok;
1000 xmlNode *ack = pcmk__ipc_create_ack_as(function, line, flags, ver, status);
1001
1002 if (ack == NULL) {
1003 return pcmk_rc_ok;
1004 }
1005
1006 pcmk__trace("Ack'ing IPC message from client %s as <" PCMK__XE_ACK
1007 " status=%d>",
1008 pcmk__client_name(c), status);
1009 pcmk__log_xml_trace(ack, "sent-ack");
1010 c->request_id = 0;
1011 rc = pcmk__ipc_send_xml(c, request, ack, flags);
1012 pcmk__xml_free(ack);
1013 return rc;
1014 }
1015
1016 /*!
1017 * \internal
1018 * \brief Add an IPC server to the main loop for the CIB manager API
1019 *
1020 * \param[out] ipcs_ro New IPC server for read-only CIB manager API
1021 * \param[out] ipcs_rw New IPC server for read/write CIB manager API
1022 * \param[out] ipcs_shm New IPC server for shared-memory CIB manager API
1023 * \param[in] ro_cb IPC callbacks for read-only API
1024 * \param[in] rw_cb IPC callbacks for read/write and shared-memory APIs
1025 *
1026 * \note This function exits fatally on error.
1027 * \note There is no actual difference between the three IPC endpoints other
1028 * than their names.
1029 */
1030 void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro,
1031 qb_ipcs_service_t **ipcs_rw,
1032 qb_ipcs_service_t **ipcs_shm,
1033 struct qb_ipcs_service_handlers *ro_cb,
1034 struct qb_ipcs_service_handlers *rw_cb)
1035 {
1036 *ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO,
1037 QB_IPC_NATIVE, ro_cb);
1038
1039 *ipcs_rw = mainloop_add_ipc_server(PCMK__SERVER_BASED_RW,
1040 QB_IPC_NATIVE, rw_cb);
1041
1042 *ipcs_shm = mainloop_add_ipc_server(PCMK__SERVER_BASED_SHM,
1043 QB_IPC_SHM, rw_cb);
1044
1045 if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
1046 pcmk__crit("Failed to create %s IPC server; shutting down",
1047 pcmk__server_log_name(pcmk_ipc_based));
1048 pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1049 "enabled");
1050 crm_exit(CRM_EX_FATAL);
1051 }
1052 }
1053
1054 /*!
1055 * \internal
1056 * \brief Destroy IPC servers for the CIB manager API
1057 *
1058 * \param[out] ipcs_ro IPC server for read-only the CIB manager API
1059 * \param[out] ipcs_rw IPC server for read/write the CIB manager API
1060 * \param[out] ipcs_shm IPC server for shared-memory the CIB manager API
1061 *
1062 * \note This is a convenience function for calling qb_ipcs_destroy() for each
1063 * argument.
1064 */
1065 void
1066 pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro,
1067 qb_ipcs_service_t *ipcs_rw,
1068 qb_ipcs_service_t *ipcs_shm)
1069 {
1070 qb_ipcs_destroy(ipcs_ro);
1071 qb_ipcs_destroy(ipcs_rw);
1072 qb_ipcs_destroy(ipcs_shm);
1073 }
1074
1075 /*!
1076 * \internal
1077 * \brief Add an IPC server to the main loop for the controller API
1078 *
1079 * \param[in] cb IPC callbacks
1080 *
1081 * \return Newly created IPC server
1082 */
1083 qb_ipcs_service_t *
1084 pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
1085 {
1086 return mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, cb);
1087 }
1088
1089 /*!
1090 * \internal
1091 * \brief Add an IPC server to the main loop for the attribute manager API
1092 *
1093 * \param[out] ipcs Where to store newly created IPC server
1094 * \param[in] cb IPC callbacks
1095 *
1096 * \note This function exits fatally on error.
1097 */
1098 void
1099 pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
1100 struct qb_ipcs_service_handlers *cb)
1101 {
1102 *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_attrd),
1103 QB_IPC_NATIVE, cb);
1104
1105 if (*ipcs == NULL) {
1106 pcmk__crit("Failed to create %s IPC server; shutting down",
1107 pcmk__server_log_name(pcmk_ipc_attrd));
1108 pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1109 "enabled");
1110 crm_exit(CRM_EX_FATAL);
1111 }
1112 }
1113
1114 /*!
1115 * \internal
1116 * \brief Add an IPC server to the main loop for the executor API
1117 *
1118 * \param[out] ipcs Where to store newly created IPC server
1119 * \param[in] cb IPC callbacks
1120 *
1121 * \note This function exits fatally on error.
1122 */
1123 void
1124 pcmk__serve_execd_ipc(qb_ipcs_service_t **ipcs,
1125 struct qb_ipcs_service_handlers *cb)
1126 {
1127 *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_execd),
1128 QB_IPC_SHM, cb);
1129
1130 if (*ipcs == NULL) {
1131 pcmk__crit("Failed to create %s IPC server; shutting down",
1132 pcmk__server_log_name(pcmk_ipc_execd));
1133 crm_exit(CRM_EX_FATAL);
1134 }
1135 }
1136
1137 /*!
1138 * \internal
1139 * \brief Add an IPC server to the main loop for the fencer API
1140 *
1141 * \param[out] ipcs Where to store newly created IPC server
1142 * \param[in] cb IPC callbacks
1143 *
1144 * \note This function exits fatally on error.
1145 */
1146 void
1147 pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
1148 struct qb_ipcs_service_handlers *cb)
1149 {
1150 *ipcs = mainloop_add_ipc_server_with_prio(pcmk__server_ipc_name(pcmk_ipc_fenced),
1151 QB_IPC_NATIVE, cb, QB_LOOP_HIGH);
1152
1153 if (*ipcs == NULL) {
1154 pcmk__crit("Failed to create %s IPC server; shutting down",
1155 pcmk__server_log_name(pcmk_ipc_fenced));
1156 pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1157 "enabled");
1158 crm_exit(CRM_EX_FATAL);
1159 }
1160 }
1161
1162 /*!
1163 * \internal
1164 * \brief Add an IPC server to the main loop for the pacemakerd API
1165 *
1166 * \param[out] ipcs Where to store newly created IPC server
1167 * \param[in] cb IPC callbacks
1168 *
1169 * \note This function exits with CRM_EX_OSERR on error.
1170 */
1171 void
1172 pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
1173 struct qb_ipcs_service_handlers *cb)
1174 {
1175 *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_pacemakerd),
1176 QB_IPC_NATIVE, cb);
1177
1178 if (*ipcs == NULL) {
1179 pcmk__crit("Failed to create %s IPC server; shutting down",
1180 pcmk__server_log_name(pcmk_ipc_pacemakerd));
1181 pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1182 "enabled");
1183 /* sub-daemons are observed by pacemakerd. Thus we exit CRM_EX_FATAL
1184 * if we want to prevent pacemakerd from restarting them.
1185 * With pacemakerd we leave the exit-code shown to e.g. systemd
1186 * to what it was prior to moving the code here from pacemakerd.c
1187 */
1188 crm_exit(CRM_EX_OSERR);
1189 }
1190 }
1191
1192 /*!
1193 * \internal
1194 * \brief Add an IPC server to the main loop for the scheduler API
1195 *
1196 * \param[out] ipcs Where to store newly created IPC server
1197 * \param[in] cb IPC callbacks
1198 *
1199 * \return Newly created IPC server
1200 * \note This function exits fatally on error.
1201 */
1202 void
1203 pcmk__serve_schedulerd_ipc(qb_ipcs_service_t **ipcs,
1204 struct qb_ipcs_service_handlers *cb)
1205 {
1206 *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_schedulerd),
1207 QB_IPC_NATIVE, cb);
1208
1209 if (*ipcs == NULL) {
1210 pcmk__crit("Failed to create %s IPC server; shutting down",
1211 pcmk__server_log_name(pcmk_ipc_schedulerd));
1212 crm_exit(CRM_EX_FATAL);
1213 }
1214 }
1215