/*
 * Copyright 2004-2026 the Pacemaker project contributors
 *
 * The version control history for this file may have further details.
 *
 * This source code is licensed under the GNU Lesser General Public License
 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
 */

#include <crm_internal.h>

#include <stdbool.h>
#include <stdio.h>
#include <errno.h>
#include <bzlib.h>
#include <sys/stat.h>
#include <sys/types.h>

#include <crm/crm.h>
#include <crm/common/xml.h>
#include <crm/common/ipc.h>
#include "crmcommon_private.h"

/* Evict clients whose event queue grows this large (by default) */
#define PCMK_IPC_DEFAULT_QUEUE_MAX 500

static GHashTable *client_connections = NULL;

/*!
 * \internal
 * \brief Count IPC clients
 *
 * \return Number of active IPC client connections
 */
guint
pcmk__ipc_client_count(void)
{
    return client_connections? g_hash_table_size(client_connections) : 0;
}

/*!
 * \internal
 * \brief Execute a function for each active IPC client connection
 *
 * \param[in]     func       Function to call
 * \param[in,out] user_data  Pointer to pass to function
 *
 * \note The parameters are the same as for g_hash_table_foreach().
 */
void
pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
{
    pcmk__assert(func != NULL);

    if (client_connections != NULL) {
        g_hash_table_foreach(client_connections, func, user_data);
    }
}
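
/* As an illustration, a hypothetical callback that logs each active client
 * might look like:
 *
 *   static void
 *   log_client(gpointer key, gpointer value, gpointer user_data)
 *   {
 *       pcmk__client_t *client = value;
 *
 *       pcmk__info("Active client %s", pcmk__client_name(client));
 *   }
 *
 *   pcmk__foreach_ipc_client(log_client, NULL);
 */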

pcmk__client_t *
pcmk__find_client(const qb_ipcs_connection_t *c)
{
    if (client_connections) {
        return g_hash_table_lookup(client_connections, c);
    }

    pcmk__trace("No client found for %p", c);
    return NULL;
}

pcmk__client_t *
pcmk__find_client_by_id(const char *id)
{
    if ((client_connections != NULL) && (id != NULL)) {
        gpointer key;
        pcmk__client_t *client = NULL;
        GHashTableIter iter;

        g_hash_table_iter_init(&iter, client_connections);
        while (g_hash_table_iter_next(&iter, &key, (gpointer *) &client)) {
            if (strcmp(client->id, id) == 0) {
                return client;
            }
        }
    }
    pcmk__trace("No client found with id='%s'", pcmk__s(id, ""));
    return NULL;
}

/*!
 * \internal
 * \brief Get a client identifier for use in log messages
 *
 * \param[in] c  Client
 *
 * \return Client's name, client's ID, or a string literal, as available
 * \note This is intended to be used in format strings like "client %s".
 */
const char *
pcmk__client_name(const pcmk__client_t *c)
{
    if (c == NULL) {
        return "(unspecified)";

    } else if (c->name != NULL) {
        return c->name;

    } else if (c->id != NULL) {
        return c->id;

    } else {
        return "(unidentified)";
    }
}

void
pcmk__client_cleanup(void)
{
    if (client_connections != NULL) {
        int active = g_hash_table_size(client_connections);

        if (active > 0) {
            pcmk__warn("Exiting with %d active IPC client%s", active,
                       pcmk__plural_s(active));
        }
        g_clear_pointer(&client_connections, g_hash_table_destroy);
    }
}

void
pcmk__drop_all_clients(qb_ipcs_service_t *service)
{
    qb_ipcs_connection_t *c = NULL;

    if (service == NULL) {
        return;
    }

    c = qb_ipcs_connection_first_get(service);

    while (c != NULL) {
        qb_ipcs_connection_t *last = c;

        c = qb_ipcs_connection_next_get(service, last);

        /* There really shouldn't be anyone connected at this point */
        pcmk__notice("Disconnecting client %p, pid=%d...", last,
                     pcmk__client_pid(last));
        qb_ipcs_disconnect(last);
        qb_ipcs_connection_unref(last);
    }
}

/*!
 * \internal
 * \brief Allocate a new pcmk__client_t object based on an IPC connection
 *
 * \param[in] c           IPC connection (NULL to allocate generic client)
 * \param[in] key         Connection table key (NULL to use sane default)
 * \param[in] uid_client  UID corresponding to c (ignored if c is NULL)
 *
 * \return Pointer to new pcmk__client_t (guaranteed not to be \c NULL)
 */
static pcmk__client_t *
client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client)
{
    pcmk__client_t *client = pcmk__assert_alloc(1, sizeof(pcmk__client_t));

    if (c) {
        client->user = pcmk__uid2username(uid_client);
        if (client->user == NULL) {
            client->user = pcmk__str_copy("#unprivileged");
            pcmk__err("Unable to enforce ACLs for user ID %d, assuming "
                      "unprivileged",
                      uid_client);
        }
        client->ipcs = c;
        pcmk__set_client_flags(client, pcmk__client_ipc);
        client->pid = pcmk__client_pid(c);
        if (key == NULL) {
            key = c;
        }
    }

    client->id = pcmk__generate_uuid();
    if (key == NULL) {
        key = client->id;
    }
    if (client_connections == NULL) {
        pcmk__trace("Creating IPC client table");
        client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
    }
    g_hash_table_insert(client_connections, key, client);
    return client;
}

/*!
 * \brief Allocate a new pcmk__client_t object and generate its ID
 *
 * \param[in] key  What to use as connections hash table key (NULL to use ID)
 *
 * \return Pointer to new pcmk__client_t (asserts on failure)
 */
pcmk__client_t *
pcmk__new_unauth_client(void *key)
{
    return client_from_connection(NULL, key, 0);
}

pcmk__client_t *
pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
{
    uid_t uid_cluster = 0;
    gid_t gid_cluster = 0;

    pcmk__client_t *client = NULL;

    CRM_CHECK(c != NULL, return NULL);

    if (pcmk__daemon_user(&uid_cluster, &gid_cluster) != pcmk_rc_ok) {
        static bool need_log = true;

        if (need_log) {
            pcmk__warn("Could not find user and group IDs for user "
                       CRM_DAEMON_USER);
            need_log = false;
        }
    }

    if (uid_client != 0) {
        pcmk__trace("Giving group %u access to new IPC connection",
                    gid_cluster);
        /* Passing -1 to chown(2) means don't change */
        qb_ipcs_connection_auth_set(c, -1, gid_cluster,
                                    S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
    }

    /* TODO: Do our own auth checking, return NULL if unauthorized */
    client = client_from_connection(c, NULL, uid_client);

    if ((uid_client == 0) || (uid_client == uid_cluster)) {
        /* Remember when a connection came from root or hacluster */
        pcmk__set_client_flags(client, pcmk__client_privileged);
    }

    pcmk__debug("New IPC client %s for PID %u with uid %d and gid %d",
                client->id, client->pid, uid_client, gid_client);
    return client;
}

static struct iovec *
pcmk__new_ipc_event(void)
{
    return (struct iovec *) pcmk__assert_alloc(2, sizeof(struct iovec));
}

/*!
 * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
 *
 * \param[in,out] event  I/O vector to free
 */
void
pcmk_free_ipc_event(struct iovec *event)
{
    if (event != NULL) {
        free(event[0].iov_base);
        free(event[1].iov_base);
        free(event);
    }
}

static void
free_event(gpointer data)
{
    pcmk_free_ipc_event((struct iovec *) data);
}

static void
add_event(pcmk__client_t *c, struct iovec *iov)
{
    if (c->event_queue == NULL) {
        c->event_queue = g_queue_new();
    }
    g_queue_push_tail(c->event_queue, iov);
}

void
pcmk__free_client(pcmk__client_t *c)
{
    if (c == NULL) {
        return;
    }

    if (client_connections) {
        if (c->ipcs) {
            pcmk__trace("Destroying %p/%p (%u remaining)", c, c->ipcs,
                        (g_hash_table_size(client_connections) - 1));
            g_hash_table_remove(client_connections, c->ipcs);

        } else {
            pcmk__trace("Destroying remote connection %p (%u remaining)", c,
                        (g_hash_table_size(client_connections) - 1));
            g_hash_table_remove(client_connections, c->id);
        }
    }

    if (c->event_timer) {
        g_source_remove(c->event_timer);
    }

    if (c->event_queue) {
        pcmk__debug("Destroying %u events", g_queue_get_length(c->event_queue));
        g_queue_free_full(c->event_queue, free_event);
    }

    free(c->id);
    free(c->name);
    free(c->user);

    if (c->buffer != NULL) {
        g_byte_array_free(c->buffer, TRUE);
        c->buffer = NULL;
    }

    if (c->remote) {
        if (c->remote->auth_timeout) {
            g_source_remove(c->remote->auth_timeout);
        }
        if (c->remote->tls_session != NULL) {
            /* @TODO Reduce duplication at callers. Put here everything
             * necessary to tear down and free tls_session.
             */
            gnutls_deinit(c->remote->tls_session);
        }
        free(c->remote->buffer);
        free(c->remote);
    }
    free(c);
}

/*!
 * \internal
 * \brief Raise IPC eviction threshold for a client, if allowed
 *
 * \param[in,out] client     Client to modify
 * \param[in]     qmax       New threshold
 */
void
pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
{
    int rc = pcmk_rc_ok;
    long long qmax_ll = 0LL;
    unsigned int orig_value = 0U;

    CRM_CHECK(client != NULL, return);

    orig_value = client->queue_max;

    if (pcmk__is_set(client->flags, pcmk__client_privileged)) {
        rc = pcmk__scan_ll(qmax, &qmax_ll, 0LL);
        if (rc == pcmk_rc_ok) {
            if ((qmax_ll <= 0LL) || (qmax_ll > UINT_MAX)) {
                rc = ERANGE;
            } else {
                client->queue_max = (unsigned int) qmax_ll;
            }
        }
    } else {
        rc = EACCES;
    }

    if (rc != pcmk_rc_ok) {
        pcmk__info("Could not set IPC threshold for client %s[%u] to %s: %s",
                   pcmk__client_name(client), client->pid,
                   pcmk__s(qmax, "default"), pcmk_rc_str(rc));

    } else if (client->queue_max != orig_value) {
        pcmk__debug("IPC threshold for client %s[%u] is now %u (was %u)",
                    pcmk__client_name(client), client->pid, client->queue_max,
                    orig_value);
    }
}

int
pcmk__client_pid(qb_ipcs_connection_t *c)
{
    struct qb_ipcs_connection_stats stats;

    stats.client_pid = 0;
    qb_ipcs_connection_stats_get(c, &stats, 0);
    return stats.client_pid;
}

/*!
 * \internal
 * \brief Retrieve message XML from data read from client IPC
 *
 * \param[in,out]  c       IPC client connection
 * \param[out]     id      Where to store message ID from libqb header
 * \param[out]     flags   Where to store flags from libqb header
 *
 * \return Message XML on success, NULL otherwise
 */
xmlNode *
pcmk__client_data2xml(pcmk__client_t *c, uint32_t *id, uint32_t *flags)
{
    xmlNode *xml = NULL;
    pcmk__ipc_header_t *header = (void *) c->buffer->data;
    char *text = (char *) header + sizeof(pcmk__ipc_header_t);

    if (!pcmk__valid_ipc_header(header)) {
        return NULL;
    }

    if (id) {
        *id = header->qb.id;
    }

    if (flags) {
        *flags = header->flags;
    }

    if (pcmk__is_set(header->flags, crm_ipc_proxied)) {
        /* Mark this client as being the endpoint of a proxy connection.
         * Responses to proxied connections are sent on the event channel to
         * avoid blocking the controller that is serving as the proxy.
         */
        pcmk__set_client_flags(c, pcmk__client_proxied);
    }

    pcmk__assert(text[header->size - 1] == 0);

    xml = pcmk__xml_parse(text);
    pcmk__log_xml_trace(xml, "[IPC received]");
    return xml;
}

static int crm_ipcs_flush_events(pcmk__client_t *c);

static gboolean
crm_ipcs_flush_events_cb(gpointer data)
{
    pcmk__client_t *c = data;

    c->event_timer = 0;
    crm_ipcs_flush_events(c);
    return FALSE;
}

/*!
 * \internal
 * \brief Add progressive delay before next event queue flush
 *
 * \param[in,out] c          Client connection to add delay to
 * \param[in]     queue_len  Current event queue length
 */
static inline void
delay_next_flush(pcmk__client_t *c, unsigned int queue_len)
{
    /* Delay a maximum of 1.5 seconds */
    guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;

    c->event_timer = pcmk__create_timer(delay, crm_ipcs_flush_events_cb, c);
}
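
/* To illustrate the schedule above: queue lengths 0 through 4 yield delays of
 * 1000, 1100, 1200, 1300, and 1400ms respectively, while any longer queue
 * waits the full 1500ms before the next flush attempt.
 */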

/*!
 * \internal
 * \brief Send client any messages in its queue
 *
 * \param[in,out] c  Client to flush
 *
 * \return Standard Pacemaker return value
 */
static int
crm_ipcs_flush_events(pcmk__client_t *c)
{
    int rc = pcmk_rc_ok;
    ssize_t qb_rc = 0;
    unsigned int sent = 0;
    unsigned int queue_len = 0;

    if (c == NULL) {
        return rc;
    }

    if (c->event_timer != 0) {
        /* There is already a timer, wait until it goes off */
        pcmk__trace("Timer active for %p - %d", c->ipcs, c->event_timer);
        return rc;
    }

    if (c->event_queue != NULL) {
        queue_len = g_queue_get_length(c->event_queue);
    }

    while (sent < 100) {
        pcmk__ipc_header_t *header = NULL;
        struct iovec *event = NULL;

        if ((c->event_queue == NULL) || g_queue_is_empty(c->event_queue)) {
            break;
        }

        // We don't pop unless send is successful
        event = g_queue_peek_head(c->event_queue);

        /* Retry sending the event up to five times.  If we get -EAGAIN, sleep
         * a very short amount of time (too long here is bad) and try again.
         * If we simply exit the while loop on -EAGAIN, we'll have to wait until
         * the timer fires off again (up to 1.5 seconds - see delay_next_flush)
         * to retry sending the message.
         *
         * In that case, the queue may just continue to grow faster than we are
         * processing it, eventually leading to daemons timing out waiting for
         * replies, which will cause wider failures.
         */
        for (unsigned int retries = 5; retries > 0; retries--) {
            qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2);

            if (qb_rc >= 0) {
                break;
            }

            if (retries == 1 || qb_rc != -EAGAIN) {
                rc = (int) -qb_rc;
                goto no_more_retries;
            }

            pcmk__sleep_ms(5);
        }

        event = g_queue_pop_head(c->event_queue);

        sent++;
        header = event[0].iov_base;

        pcmk__trace("Event %" PRId32 " to %p[%u] (%zd bytes) sent: %.120s",
                    header->qb.id, c->ipcs, c->pid, qb_rc,
                    (char *) (event[1].iov_base));
        pcmk_free_ipc_event(event);
    }

no_more_retries:
    queue_len -= sent;
    if (sent > 0 || queue_len) {
        pcmk__trace("Sent %u events (%u remaining) for %p[%d]: %s (%zd)", sent,
                    queue_len, c->ipcs, c->pid, pcmk_rc_str(rc), qb_rc);
    }

    if (queue_len == 0) {
        /* Event queue is empty, there is no backlog */
        c->queue_backlog = 0;
        return rc;
    }

    /* Allow clients to briefly fall behind on processing incoming messages,
     * but drop completely unresponsive clients so the connection doesn't
     * consume resources indefinitely.
     */
    if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
        /* Don't evict:
         * - Clients with a new backlog.
         * - Clients with a shrinking backlog (the client is processing
         *   messages faster than the server is sending them).
         * - Clients that are pacemaker daemons and have had any messages sent
         *   to them in this flush call (the server is sending messages faster
         *   than the client is processing them, but the client is not dead).
         */
        if ((c->queue_backlog <= 1)
            || (queue_len < c->queue_backlog)
            || ((sent > 0) && (pcmk__parse_server(c->name) != pcmk_ipc_unknown))) {
            pcmk__warn("Client with process ID %u has a backlog of %u messages "
                       QB_XS " %p", c->pid, queue_len, c->ipcs);

        } else {
            pcmk__err("Evicting client with process ID %u due to backlog of %u "
                      "messages " QB_XS " %p",
                      c->pid, queue_len, c->ipcs);
            c->queue_backlog = 0;
            qb_ipcs_disconnect(c->ipcs);
            return rc;
        }
    }

    c->queue_backlog = queue_len;
    delay_next_flush(c, queue_len);

    return rc;
}

/*!
 * \internal
 * \brief Create an I/O vector for sending an IPC XML message
 *
 * If the message is too large to fit into a single buffer, this function will
 * prepare an I/O vector that only holds as much as fits.  The remainder can be
 * prepared in a separate call by keeping a running count of the number of times
 * this function has been called and passing that in for \p index.
 *
 * \param[in]  request Identifier for libqb response header
 * \param[in]  message Message to send
 * \param[in]  index   How many times this function has been called - basically,
 *                     a count of how many chunks of \p message have already
 *                     been sent
 * \param[out] result  Where to store prepared I/O vector - NULL on error
 * \param[out] bytes   Size of prepared data in bytes (includes header)
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__ipc_prepare_iov(uint32_t request, const GString *message, uint16_t index,
                      struct iovec **result, ssize_t *bytes)
{
    struct iovec *iov = NULL;
    unsigned int payload_size = 0;
    unsigned int total = 0;
    unsigned int max_send_size = crm_ipc_default_buffer_size();
    unsigned int max_chunk_size = 0;
    size_t offset = 0;
    pcmk__ipc_header_t *header = NULL;
    int rc = pcmk_rc_ok;

    if ((message == NULL) || (result == NULL)) {
        rc = EINVAL;
        goto done;
    }

    header = calloc(1, sizeof(pcmk__ipc_header_t));
    if (header == NULL) {
        rc = ENOMEM;
        goto done;
    }

    *result = NULL;
    iov = pcmk__new_ipc_event();
    iov[0].iov_len = sizeof(pcmk__ipc_header_t);
    iov[0].iov_base = header;

    header->version = PCMK__IPC_VERSION;

    /* We are passed an index, which is basically how many times this function
     * has been called.  This is how we support multi-part IPC messages.  We
     * need to convert that into an offset into the buffer that we want to start
     * reading from.
     *
     * Each call to this function can send max_send_size, but this also includes
     * the header and a null terminator character for the end of the payload.
     * We need to subtract those out here.
     */
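    /* Worked example with hypothetical sizes: if max_send_size were 1024
     * bytes and the header occupied 24 of them, each chunk could carry
     * 1024 - 24 - 1 = 999 payload bytes, so a call with index == 2 would
     * start reading at byte offset 2 * 999 = 1998 into the message.
     */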
    max_chunk_size = max_send_size - iov[0].iov_len - 1;
    offset = index * max_chunk_size;

    /* How much of message is left to send?  This does not include the null
     * terminator character.
     */
    payload_size = message->len - offset;

    /* How much would be transmitted, including the header size and null
     * terminator character for the buffer?
     */
    total = iov[0].iov_len + payload_size + 1;

    if (total >= max_send_size) {
        /* The entire packet is too big to fit in a single buffer.  Calculate
         * how much of it we can send - buffer size, minus header size, minus
         * one for the null terminator.
         */
        payload_size = max_chunk_size;

        header->size = payload_size + 1;

        iov[1].iov_base = strndup(message->str + offset, payload_size);
        if (iov[1].iov_base == NULL) {
            rc = ENOMEM;
            goto done;
        }

        iov[1].iov_len = header->size;
        rc = pcmk_rc_ipc_more;

    } else {
        /* The entire packet fits in a single buffer.  We can copy the entirety
         * of it into the payload.
         */
        header->size = payload_size + 1;

        iov[1].iov_base = pcmk__str_copy(message->str + offset);
        iov[1].iov_len = header->size;
    }

    header->part_id = index;
    header->qb.size = iov[0].iov_len + iov[1].iov_len;
    header->qb.id = (int32_t) request;    /* Replying to a specific request */

    if ((rc == pcmk_rc_ok) && (index != 0)) {
        pcmk__set_ipc_flags(header->flags, "multipart ipc",
                            crm_ipc_multipart | crm_ipc_multipart_end);
    } else if (rc == pcmk_rc_ipc_more) {
        pcmk__set_ipc_flags(header->flags, "multipart ipc",
                            crm_ipc_multipart);
    }

    *result = iov;
    pcmk__assert(header->qb.size > 0);
    if (bytes != NULL) {
        *bytes = header->qb.size;
    }

done:
    if ((rc != pcmk_rc_ok) && (rc != pcmk_rc_ipc_more)) {
        pcmk_free_ipc_event(iov);
    }

    return rc;
}
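
/* A minimal caller sketch (pcmk__ipc_send_xml() below implements the real
 * loop, including error handling):
 *
 *   uint16_t index = 0;
 *   struct iovec *iov = NULL;
 *   int rc;
 *
 *   do {
 *       rc = pcmk__ipc_prepare_iov(request, message, index++, &iov, NULL);
 *       // ... send iov, then free it as appropriate ...
 *   } while (rc == pcmk_rc_ipc_more);
 */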

/* Return the next available ID for a server event.
 *
 * For the parts of a multipart event, all parts should have the same ID as
 * the first part.
 */
static uint32_t
id_for_server_event(pcmk__ipc_header_t *header)
{
    static uint32_t id = 1;

    if (pcmk__is_set(header->flags, crm_ipc_multipart)
        && (header->part_id != 0)) {
        return id;
    } else {
        id++;
        return id;
    }
}
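
/* For example, if the counter currently stands at 6, part 0 of a three-part
 * event advances it and gets ID 7, and parts 1 and 2 then reuse ID 7.
 */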

int
pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
{
    int rc = pcmk_rc_ok;
    pcmk__ipc_header_t *header = iov[0].iov_base;

    /* _ALL_ replies to proxied connections need to be sent as events */
    if (pcmk__is_set(c->flags, pcmk__client_proxied)
        && !pcmk__is_set(flags, crm_ipc_server_event)) {
        /* The proxied flag lets us know this was originally meant to be a
         * response, even though we're sending it over the event channel.
         */
        pcmk__set_ipc_flags(flags, "server event",
                            crm_ipc_server_event|crm_ipc_proxied_relay_response);
    }

    pcmk__set_ipc_flags(header->flags, "server event", flags);
    if (pcmk__is_set(flags, crm_ipc_server_event)) {
        /* Server events don't use an ID, though we do set one in
         * pcmk__ipc_prepare_iov if the event is in response to a particular
         * request.  In that case, we don't want to set a new ID here that
         * overwrites that one.
         *
         * @TODO: Since server event IDs aren't used anywhere, do we really
         * need to set this for any reason other than ease of logging?
         */
        if (header->qb.id == 0) {
            header->qb.id = id_for_server_event(header);
        }

        if (pcmk__is_set(flags, crm_ipc_server_free)) {
            pcmk__trace("Sending the original to %p[%d]", c->ipcs, c->pid);
            add_event(c, iov);

        } else {
            struct iovec *iov_copy = pcmk__new_ipc_event();

            pcmk__trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
            iov_copy[0].iov_len = iov[0].iov_len;
            iov_copy[0].iov_base = malloc(iov[0].iov_len);
            memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);

            iov_copy[1].iov_len = iov[1].iov_len;
            iov_copy[1].iov_base = malloc(iov[1].iov_len);
            memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);

            add_event(c, iov_copy);
        }

        rc = crm_ipcs_flush_events(c);

    } else {
        ssize_t qb_rc;
        char *part_text = NULL;

        CRM_LOG_ASSERT(header->qb.id != 0);     /* Replying to a specific request */

        if (pcmk__is_set(header->flags, crm_ipc_multipart_end)) {
            part_text = pcmk__assert_asprintf(" (final part %d) ",
                                              header->part_id);
        } else if (pcmk__is_set(header->flags, crm_ipc_multipart)) {
            if (header->part_id == 0) {
                part_text = pcmk__assert_asprintf(" (initial part %d) ",
                                                  header->part_id);
            } else {
                part_text = pcmk__assert_asprintf(" (part %d) ",
                                                  header->part_id);
            }
        } else {
            part_text = pcmk__str_copy(" ");
        }

        qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
        if (qb_rc < header->qb.size) {
            if (qb_rc < 0) {
                rc = (int) -qb_rc;
            }

        } else {
            pcmk__trace("Response %" PRId32 "%ssent, %zd bytes to %p[%u]",
                        header->qb.id, part_text, qb_rc, c->ipcs, c->pid);
            pcmk__trace("Text = %s", (char *) iov[1].iov_base);
        }

        free(part_text);

        if (pcmk__is_set(flags, crm_ipc_server_free)) {
            pcmk_free_ipc_event(iov);
        }

        crm_ipcs_flush_events(c);
    }

    if ((rc == EPIPE) || (rc == ENOTCONN)) {
        pcmk__trace("Client %p disconnected", c->ipcs);
    }

    return rc;
}

int
pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message,
                   uint32_t flags)
{
    struct iovec *iov = NULL;
    int rc = pcmk_rc_ok;
    GString *iov_buffer = NULL;
    uint16_t index = 0;
    bool event_or_proxied = false;

    if (c == NULL) {
        return EINVAL;
    }

    iov_buffer = g_string_sized_new(1024);
    pcmk__xml_string(message, 0, iov_buffer, 0);

    /* Testing crm_ipc_server_event is obvious.  pcmk__client_proxied is less
     * obvious.  According to pcmk__ipc_send_iov, replies to proxied connections
     * need to be sent as events.  However, do_local_notify (which calls this
     * function) will clear all flags so we can't go just by crm_ipc_server_event.
     *
     * Changing do_local_notify to check for a proxied connection first results
     * in processes on the Pacemaker Remote node (like cibadmin or crm_mon)
     * timing out when waiting for a reply.
     */
    event_or_proxied = pcmk__is_set(flags, crm_ipc_server_event)
                       || pcmk__is_set(c->flags, pcmk__client_proxied);

    do {
        rc = pcmk__ipc_prepare_iov(request, iov_buffer, index, &iov, NULL);

        switch (rc) {
            case pcmk_rc_ok:
                /* No more chunks to send after this one */
                pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
                rc = pcmk__ipc_send_iov(c, iov, flags);

                if (event_or_proxied) {
                    if (rc == EAGAIN) {
                        /* Return pcmk_rc_ok instead so callers don't have to know
                         * whether they passed an event or not when interpreting
                         * the return code.
                         */
                        rc = pcmk_rc_ok;
                    }
                } else {
                    /* EAGAIN is an error for IPC messages.  We don't have a
                     * send queue for these, so we need to try again.  If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == EAGAIN) {
                        break;
                    }
                }

                goto done;

            case pcmk_rc_ipc_more:
                /* There are more chunks to send after this one */
                pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
                rc = pcmk__ipc_send_iov(c, iov, flags);

                /* Did an error occur during transmission? */
                if (event_or_proxied) {
                    /* EAGAIN is not an error for server events.  The event
                     * will be queued for transmission and we will attempt
                     * sending it again the next time pcmk__ipc_send_iov is
                     * called, or when the crm_ipcs_flush_events_cb happens.
                     */
                    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
                        goto done;
                    }

                    index++;
                    break;

                } else {
                    /* EAGAIN is an error for IPC messages.  We don't have a
                     * send queue for these, so we need to try again.  If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == pcmk_rc_ok) {
                        index++;
                        break;
                    } else if (rc == EAGAIN) {
                        break;
                    } else {
                        goto done;
                    }
                }

            default:
                /* An error occurred during preparation */
                goto done;
        }
    } while (true);

done:
    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
        pcmk__notice("IPC message to pid %u failed: %s " QB_XS " rc=%d", c->pid,
                     pcmk_rc_str(rc), rc);
    }

    g_string_free(iov_buffer, TRUE);
    return rc;
}

/*!
 * \internal
 * \brief Create an acknowledgement with a status code to send to a client
 *
 * \param[in] function  Calling function
 * \param[in] line      Source file line within calling function
 * \param[in] flags     IPC flags to use when sending
 * \param[in] ver       IPC protocol version (can be NULL)
 * \param[in] status    Exit status code to add to ack
 *
 * \return Newly created XML for ack
 *
 * \note The caller is responsible for freeing the return value with
 *       \c pcmk__xml_free().
 */
xmlNode *
pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags,
                        const char *ver, crm_exit_t status)
{
    xmlNode *ack = NULL;

    if (!pcmk__is_set(flags, crm_ipc_client_response)) {
        return NULL;
    }

    ack = pcmk__xe_create(NULL, PCMK__XE_ACK);
    pcmk__xe_set(ack, PCMK_XA_FUNCTION, function);
    pcmk__xe_set_int(ack, PCMK__XA_LINE, line);
    pcmk__xe_set_int(ack, PCMK_XA_STATUS, (int) status);
    pcmk__xe_set(ack, PCMK__XA_IPC_PROTO_VERSION, ver);
    return ack;
}
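
/* For illustration only, the ack built above is shaped roughly like
 *
 *   <ack function="caller_name" line="123" status="0" .../>
 *
 * with attribute names taken from the PCMK_XA_/PCMK__XA_ constants used in
 * pcmk__ipc_create_ack_as().
 */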

/*!
 * \internal
 * \brief Send an acknowledgement with a status code to a client
 *
 * \param[in] function  Calling function
 * \param[in] line      Source file line within calling function
 * \param[in] c         Client to send ack to
 * \param[in] request   Request ID being replied to
 * \param[in] flags     IPC flags to use when sending
 * \param[in] ver       IPC protocol version (can be NULL)
 * \param[in] status    Status code to send with acknowledgement
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
                      uint32_t request, uint32_t flags, const char *ver,
                      crm_exit_t status)
{
    int rc = pcmk_rc_ok;
    xmlNode *ack = pcmk__ipc_create_ack_as(function, line, flags, ver, status);

    if (ack == NULL) {
        return pcmk_rc_ok;
    }

    pcmk__trace("Ack'ing IPC message from client %s as <" PCMK__XE_ACK
                " status=%d>",
                pcmk__client_name(c), status);
    pcmk__log_xml_trace(ack, "sent-ack");
    c->request_id = 0;
    rc = pcmk__ipc_send_xml(c, request, ack, flags);
    pcmk__xml_free(ack);
    return rc;
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the CIB manager API
 *
 * \param[out] ipcs_ro   New IPC server for read-only CIB manager API
 * \param[out] ipcs_rw   New IPC server for read/write CIB manager API
 * \param[out] ipcs_shm  New IPC server for shared-memory CIB manager API
 * \param[in]  ro_cb     IPC callbacks for read-only API
 * \param[in]  rw_cb     IPC callbacks for read/write and shared-memory APIs
 *
 * \note This function exits fatally on error.
 * \note There is no actual difference between the three IPC endpoints other
 *       than their names.
 */
void
pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro,
                      qb_ipcs_service_t **ipcs_rw,
                      qb_ipcs_service_t **ipcs_shm,
                      struct qb_ipcs_service_handlers *ro_cb,
                      struct qb_ipcs_service_handlers *rw_cb)
{
    *ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO,
                                       QB_IPC_NATIVE, ro_cb);

    *ipcs_rw = mainloop_add_ipc_server(PCMK__SERVER_BASED_RW,
                                       QB_IPC_NATIVE, rw_cb);

    *ipcs_shm = mainloop_add_ipc_server(PCMK__SERVER_BASED_SHM,
                                        QB_IPC_SHM, rw_cb);

    if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
        pcmk__crit("Failed to create %s IPC server; shutting down",
                   pcmk__server_log_name(pcmk_ipc_based));
        pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
                   "enabled");
        crm_exit(CRM_EX_FATAL);
    }
}

/*!
 * \internal
 * \brief Destroy IPC servers for the CIB manager API
 *
 * \param[out] ipcs_ro   IPC server for the read-only CIB manager API
 * \param[out] ipcs_rw   IPC server for the read/write CIB manager API
 * \param[out] ipcs_shm  IPC server for the shared-memory CIB manager API
 *
 * \note This is a convenience function for calling qb_ipcs_destroy() for each
 *       argument.
 */
void
pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro,
                     qb_ipcs_service_t *ipcs_rw,
                     qb_ipcs_service_t *ipcs_shm)
{
    qb_ipcs_destroy(ipcs_ro);
    qb_ipcs_destroy(ipcs_rw);
    qb_ipcs_destroy(ipcs_shm);
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the controller API
 *
 * \param[in] cb  IPC callbacks
 *
 * \return Newly created IPC server
 */
qb_ipcs_service_t *
pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
{
    return mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, cb);
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the attribute manager API
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits fatally on error.
 */
void
pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
                      struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_attrd),
                                    QB_IPC_NATIVE, cb);

    if (*ipcs == NULL) {
        pcmk__crit("Failed to create %s IPC server; shutting down",
                   pcmk__server_log_name(pcmk_ipc_attrd));
        pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
                   "enabled");
        crm_exit(CRM_EX_FATAL);
    }
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the executor API
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits fatally on error.
 */
void
pcmk__serve_execd_ipc(qb_ipcs_service_t **ipcs,
                      struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_execd),
                                    QB_IPC_SHM, cb);

    if (*ipcs == NULL) {
        pcmk__crit("Failed to create %s IPC server; shutting down",
                   pcmk__server_log_name(pcmk_ipc_execd));
        crm_exit(CRM_EX_FATAL);
    }
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the fencer API
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits fatally on error.
 */
void
pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
                       struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server_with_prio(pcmk__server_ipc_name(pcmk_ipc_fenced),
                                              QB_IPC_NATIVE, cb, QB_LOOP_HIGH);

    if (*ipcs == NULL) {
        pcmk__crit("Failed to create %s IPC server; shutting down",
                   pcmk__server_log_name(pcmk_ipc_fenced));
        pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
                   "enabled");
        crm_exit(CRM_EX_FATAL);
    }
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the pacemakerd API
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits with CRM_EX_OSERR on error.
 */
void
pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
                           struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_pacemakerd),
                                    QB_IPC_NATIVE, cb);

    if (*ipcs == NULL) {
        pcmk__crit("Failed to create %s IPC server; shutting down",
                   pcmk__server_log_name(pcmk_ipc_pacemakerd));
        pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
                   "enabled");
        /* Sub-daemons are observed by pacemakerd, so they exit with
         * CRM_EX_FATAL when they want to prevent pacemakerd from restarting
         * them. For pacemakerd itself, we keep the exit code shown to (for
         * example) systemd the same as it was before this code was moved here
         * from pacemakerd.c.
         */
        crm_exit(CRM_EX_OSERR);
    }
}

/*!
 * \internal
 * \brief Add an IPC server to the main loop for the scheduler API
 *
 * \param[out] ipcs  Where to store newly created IPC server
 * \param[in]  cb    IPC callbacks
 *
 * \note This function exits fatally on error.
 */
void
pcmk__serve_schedulerd_ipc(qb_ipcs_service_t **ipcs,
                           struct qb_ipcs_service_handlers *cb)
{
    *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_schedulerd),
                                    QB_IPC_NATIVE, cb);

    if (*ipcs == NULL) {
        pcmk__crit("Failed to create %s IPC server; shutting down",
                   pcmk__server_log_name(pcmk_ipc_schedulerd));
        crm_exit(CRM_EX_FATAL);
    }
}