1    	/*
2    	 * Copyright 2004-2026 the Pacemaker project contributors
3    	 *
4    	 * The version control history for this file may have further details.
5    	 *
6    	 * This source code is licensed under the GNU Lesser General Public License
7    	 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8    	 */
9    	
10   	#include <crm_internal.h>
11   	
12   	#include <stdbool.h>
13   	#include <stdio.h>
14   	#include <errno.h>
15   	#include <bzlib.h>
16   	#include <sys/stat.h>
17   	#include <sys/types.h>
18   	
19   	#include <crm/crm.h>
20   	#include <crm/common/xml.h>
21   	#include <crm/common/ipc.h>
22   	#include "crmcommon_private.h"
23   	
24   	/* Evict clients whose event queue grows this large (by default) */
25   	#define PCMK_IPC_DEFAULT_QUEUE_MAX 500
26   	
27   	static GHashTable *client_connections = NULL;
28   	
29   	/*!
30   	 * \internal
31   	 * \brief Count IPC clients
32   	 *
33   	 * \return Number of active IPC client connections
34   	 */
35   	guint
36   	pcmk__ipc_client_count(void)
37   	{
38   	    return client_connections? g_hash_table_size(client_connections) : 0;
39   	}
40   	
41   	/*!
42   	 * \internal
43   	 * \brief Execute a function for each active IPC client connection
44   	 *
45   	 * \param[in]     func       Function to call
46   	 * \param[in,out] user_data  Pointer to pass to function
47   	 *
48   	 * \note The parameters are the same as for g_hash_table_foreach().
49   	 */
50   	void
51   	pcmk__foreach_ipc_client(GHFunc func, gpointer user_data)
52   	{
53   	    if ((func != NULL) && (client_connections != NULL)) {
54   	        g_hash_table_foreach(client_connections, func, user_data);
55   	    }
56   	}
57   	
58   	pcmk__client_t *
59   	pcmk__find_client(const qb_ipcs_connection_t *c)
60   	{
61   	    if (client_connections) {
62   	        return g_hash_table_lookup(client_connections, c);
63   	    }
64   	
65   	    pcmk__trace("No client found for %p", c);
66   	    return NULL;
67   	}
68   	
69   	pcmk__client_t *
70   	pcmk__find_client_by_id(const char *id)
71   	{
72   	    if ((client_connections != NULL) && (id != NULL)) {
73   	        gpointer key;
74   	        pcmk__client_t *client = NULL;
75   	        GHashTableIter iter;
76   	
77   	        g_hash_table_iter_init(&iter, client_connections);
78   	        while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
79   	            if (strcmp(client->id, id) == 0) {
80   	                return client;
81   	            }
82   	        }
83   	    }
84   	    pcmk__trace("No client found with id='%s'", pcmk__s(id, ""));
85   	    return NULL;
86   	}
87   	
88   	/*!
89   	 * \internal
90   	 * \brief Get a client identifier for use in log messages
91   	 *
92   	 * \param[in] c  Client
93   	 *
94   	 * \return Client's name, client's ID, or a string literal, as available
95   	 * \note This is intended to be used in format strings like "client %s".
96   	 */
97   	const char *
98   	pcmk__client_name(const pcmk__client_t *c)
99   	{
100  	    if (c == NULL) {
101  	        return "(unspecified)";
102  	
103  	    } else if (c->name != NULL) {
104  	        return c->name;
105  	
106  	    } else if (c->id != NULL) {
107  	        return c->id;
108  	
109  	    } else {
110  	        return "(unidentified)";
111  	    }
112  	}
113  	
114  	void
115  	pcmk__client_cleanup(void)
116  	{
(1) Event path: Condition "client_connections != NULL", taking true branch.
117  	    if (client_connections != NULL) {
118  	        int active = g_hash_table_size(client_connections);
119  	
(2) Event path: Condition "active > 0", taking true branch.
120  	        if (active > 0) {
(3) Event path: Condition "active == 1", taking true branch.
121  	            pcmk__warn("Exiting with %d active IPC client%s", active,
122  	                       pcmk__plural_s(active));
123  	        }
CID (unavailable; MK=7738de940091a0758ed0ae977b621ce2) (#1 of 1): Inconsistent C union access (INCONSISTENT_UNION_ACCESS):
(4) Event assign_union_field: The union field "in" of "_pp" is written.
(5) Event inconsistent_union_field_access: In "_pp.out", the union field used: "out" is inconsistent with the field most recently stored: "in".
124  	        g_clear_pointer(&client_connections, g_hash_table_destroy);
125  	    }
126  	}
127  	
128  	void
129  	pcmk__drop_all_clients(qb_ipcs_service_t *service)
130  	{
131  	    qb_ipcs_connection_t *c = NULL;
132  	
133  	    if (service == NULL) {
134  	        return;
135  	    }
136  	
137  	    c = qb_ipcs_connection_first_get(service);
138  	
139  	    while (c != NULL) {
140  	        qb_ipcs_connection_t *last = c;
141  	
142  	        c = qb_ipcs_connection_next_get(service, last);
143  	
144  	        /* There really shouldn't be anyone connected at this point */
145  	        pcmk__notice("Disconnecting client %p, pid=%d...", last,
146  	                     pcmk__client_pid(last));
147  	        qb_ipcs_disconnect(last);
148  	        qb_ipcs_connection_unref(last);
149  	    }
150  	}
151  	
152  	/*!
153  	 * \internal
154  	 * \brief Allocate a new pcmk__client_t object based on an IPC connection
155  	 *
156  	 * \param[in] c           IPC connection (NULL to allocate generic client)
157  	 * \param[in] key         Connection table key (NULL to use sane default)
158  	 * \param[in] uid_client  UID corresponding to c (ignored if c is NULL)
159  	 *
160  	 * \return Pointer to new pcmk__client_t (guaranteed not to be \c NULL)
161  	 */
162  	static pcmk__client_t *
163  	client_from_connection(qb_ipcs_connection_t *c, void *key, uid_t uid_client)
164  	{
165  	    pcmk__client_t *client = pcmk__assert_alloc(1, sizeof(pcmk__client_t));
166  	
167  	    if (c) {
168  	        client->user = pcmk__uid2username(uid_client);
169  	        if (client->user == NULL) {
170  	            client->user = pcmk__str_copy("#unprivileged");
171  	            pcmk__err("Unable to enforce ACLs for user ID %d, assuming "
172  	                      "unprivileged",
173  	                      uid_client);
174  	        }
175  	        client->ipcs = c;
176  	        pcmk__set_client_flags(client, pcmk__client_ipc);
177  	        client->pid = pcmk__client_pid(c);
178  	        if (key == NULL) {
179  	            key = c;
180  	        }
181  	    }
182  	
183  	    client->id = pcmk__generate_uuid();
184  	    if (key == NULL) {
185  	        key = client->id;
186  	    }
187  	    if (client_connections == NULL) {
188  	        pcmk__trace("Creating IPC client table");
189  	        client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
190  	    }
191  	    g_hash_table_insert(client_connections, key, client);
192  	    return client;
193  	}
194  	
195  	/*!
196  	 * \brief Allocate a new pcmk__client_t object and generate its ID
197  	 *
198  	 * \param[in] key  What to use as connections hash table key (NULL to use ID)
199  	 *
200  	 * \return Pointer to new pcmk__client_t (asserts on failure)
201  	 */
202  	pcmk__client_t *
203  	pcmk__new_unauth_client(void *key)
204  	{
205  	    return client_from_connection(NULL, key, 0);
206  	}
207  	
208  	pcmk__client_t *
209  	pcmk__new_client(qb_ipcs_connection_t *c, uid_t uid_client, gid_t gid_client)
210  	{
211  	    gid_t uid_cluster = 0;
212  	    gid_t gid_cluster = 0;
213  	
214  	    pcmk__client_t *client = NULL;
215  	
216  	    CRM_CHECK(c != NULL, return NULL);
217  	
218  	    if (pcmk__daemon_user(&uid_cluster, &gid_cluster) != pcmk_rc_ok) {
219  	        static bool need_log = true;
220  	
221  	        if (need_log) {
222  	            pcmk__warn("Could not find user and group IDs for user "
223  	                       CRM_DAEMON_USER);
224  	            need_log = false;
225  	        }
226  	    }
227  	
228  	    if (uid_client != 0) {
229  	        pcmk__trace("Giving group %u access to new IPC connection",
230  	                    gid_cluster);
231  	        /* Passing -1 to chown(2) means don't change */
232  	        qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
233  	    }
234  	
235  	    /* TODO: Do our own auth checking, return NULL if unauthorized */
236  	    client = client_from_connection(c, NULL, uid_client);
237  	
238  	    if ((uid_client == 0) || (uid_client == uid_cluster)) {
239  	        /* Remember when a connection came from root or hacluster */
240  	        pcmk__set_client_flags(client, pcmk__client_privileged);
241  	    }
242  	
243  	    pcmk__debug("New IPC client %s for PID %u with uid %d and gid %d",
244  	                client->id, client->pid, uid_client, gid_client);
245  	    return client;
246  	}
247  	
/* Allocate a two-element I/O vector: element 0 carries the IPC header,
 * element 1 carries the payload (see pcmk__ipc_prepare_iov())
 */
static struct iovec *
pcmk__new_ipc_event(void)
{
    struct iovec *iov = pcmk__assert_alloc(2, sizeof(struct iovec));

    return iov;
}
253  	
254  	/*!
255  	 * \brief Free an I/O vector created by pcmk__ipc_prepare_iov()
256  	 *
257  	 * \param[in,out] event  I/O vector to free
258  	 */
259  	void
260  	pcmk_free_ipc_event(struct iovec *event)
261  	{
262  	    if (event != NULL) {
263  	        free(event[0].iov_base);
264  	        free(event[1].iov_base);
265  	        free(event);
266  	    }
267  	}
268  	
269  	static void
270  	free_event(gpointer data)
271  	{
272  	    pcmk_free_ipc_event((struct iovec *) data);
273  	}
274  	
275  	static void
276  	add_event(pcmk__client_t *c, struct iovec *iov)
277  	{
278  	    if (c->event_queue == NULL) {
279  	        c->event_queue = g_queue_new();
280  	    }
281  	    g_queue_push_tail(c->event_queue, iov);
282  	}
283  	
/*!
 * \brief Deregister and free a client object
 *
 * Removes the client from the connection table (keyed by connection for IPC
 * clients, by ID for remote clients), cancels any pending event-flush timer,
 * drops queued events, and frees all owned memory, including any remote
 * (TLS) connection state.
 *
 * \param[in,out] c  Client to free (may be NULL)
 */
void
pcmk__free_client(pcmk__client_t *c)
{
    if (c == NULL) {
        return;
    }

    if (client_connections) {
        if (c->ipcs) {
            pcmk__trace("Destroying %p/%p (%u remaining)", c, c->ipcs,
                        (g_hash_table_size(client_connections) - 1));
            g_hash_table_remove(client_connections, c->ipcs);

        } else {
            // Remote clients are keyed by ID rather than connection
            pcmk__trace("Destroying remote connection %p (%u remaining)", c,
                        (g_hash_table_size(client_connections) - 1));
            g_hash_table_remove(client_connections, c->id);
        }
    }

    // Cancel any pending crm_ipcs_flush_events_cb timer
    if (c->event_timer) {
        g_source_remove(c->event_timer);
    }

    // Drop any events still queued for this client (free_event frees each)
    if (c->event_queue) {
        pcmk__debug("Destroying %d events", g_queue_get_length(c->event_queue));
        g_queue_free_full(c->event_queue, free_event);
    }

    free(c->id);
    free(c->name);
    free(c->user);

    if (c->buffer != NULL) {
        g_byte_array_free(c->buffer, TRUE);
        c->buffer = NULL;
    }

    // Remote (Pacemaker Remote) clients carry extra TLS/auth state
    if (c->remote) {
        if (c->remote->auth_timeout) {
            g_source_remove(c->remote->auth_timeout);
        }
        if (c->remote->tls_session != NULL) {
            /* @TODO Reduce duplication at callers. Put here everything
             * necessary to tear down and free tls_session.
             */
            gnutls_deinit(c->remote->tls_session);
        }
        free(c->remote->buffer);
        free(c->remote);
    }
    free(c);
}
337  	
338  	/*!
339  	 * \internal
340  	 * \brief Raise IPC eviction threshold for a client, if allowed
341  	 *
342  	 * \param[in,out] client     Client to modify
343  	 * \param[in]     qmax       New threshold
344  	 */
345  	void
346  	pcmk__set_client_queue_max(pcmk__client_t *client, const char *qmax)
347  	{
348  	    int rc = pcmk_rc_ok;
349  	    long long qmax_ll = 0LL;
350  	    unsigned int orig_value = 0U;
351  	
352  	    CRM_CHECK(client != NULL, return);
353  	
354  	    orig_value = client->queue_max;
355  	
356  	    if (pcmk__is_set(client->flags, pcmk__client_privileged)) {
357  	        rc = pcmk__scan_ll(qmax, &qmax_ll, 0LL);
358  	        if (rc == pcmk_rc_ok) {
359  	            if ((qmax_ll <= 0LL) || (qmax_ll > UINT_MAX)) {
360  	                rc = ERANGE;
361  	            } else {
362  	                client->queue_max = (unsigned int) qmax_ll;
363  	            }
364  	        }
365  	    } else {
366  	        rc = EACCES;
367  	    }
368  	
369  	    if (rc != pcmk_rc_ok) {
370  	        pcmk__info("Could not set IPC threshold for client %s[%u] to %s: %s",
371  	                   pcmk__client_name(client), client->pid,
372  	                   pcmk__s(qmax, "default"), pcmk_rc_str(rc));
373  	
374  	    } else if (client->queue_max != orig_value) {
375  	        pcmk__debug("IPC threshold for client %s[%u] is now %u (was %u)",
376  	                    pcmk__client_name(client), client->pid, client->queue_max,
377  	                    orig_value);
378  	    }
379  	}
380  	
381  	int
382  	pcmk__client_pid(qb_ipcs_connection_t *c)
383  	{
384  	    struct qb_ipcs_connection_stats stats;
385  	
386  	    stats.client_pid = 0;
387  	    qb_ipcs_connection_stats_get(c, &stats, 0);
388  	    return stats.client_pid;
389  	}
390  	
391  	/*!
392  	 * \internal
393  	 * \brief Retrieve message XML from data read from client IPC
394  	 *
395  	 * \param[in,out]  c       IPC client connection
396  	 * \param[out]     id      Where to store message ID from libqb header
397  	 * \param[out]     flags   Where to store flags from libqb header
398  	 *
399  	 * \return Message XML on success, NULL otherwise
400  	 */
401  	xmlNode *
402  	pcmk__client_data2xml(pcmk__client_t *c, uint32_t *id, uint32_t *flags)
403  	{
404  	    xmlNode *xml = NULL;
405  	    pcmk__ipc_header_t *header = (void *) c->buffer->data;
406  	    char *text = (char *) header + sizeof(pcmk__ipc_header_t);
407  	
408  	    if (!pcmk__valid_ipc_header(header)) {
409  	        return NULL;
410  	    }
411  	
412  	    if (id) {
413  	        *id = header->qb.id;
414  	    }
415  	
416  	    if (flags) {
417  	        *flags = header->flags;
418  	    }
419  	
420  	    if (pcmk__is_set(header->flags, crm_ipc_proxied)) {
421  	        /* Mark this client as being the endpoint of a proxy connection.
422  	         * Proxy connections responses are sent on the event channel, to avoid
423  	         * blocking the controller serving as proxy.
424  	         */
425  	        pcmk__set_client_flags(c, pcmk__client_proxied);
426  	    }
427  	
428  	    pcmk__assert(text[header->size - 1] == 0);
429  	
430  	    xml = pcmk__xml_parse(text);
431  	    pcmk__log_xml_trace(xml, "[IPC received]");
432  	    return xml;
433  	}
434  	
435  	static int crm_ipcs_flush_events(pcmk__client_t *c);
436  	
437  	static gboolean
438  	crm_ipcs_flush_events_cb(gpointer data)
439  	{
440  	    pcmk__client_t *c = data;
441  	
442  	    c->event_timer = 0;
443  	    crm_ipcs_flush_events(c);
444  	    return FALSE;
445  	}
446  	
447  	/*!
448  	 * \internal
449  	 * \brief Add progressive delay before next event queue flush
450  	 *
451  	 * \param[in,out] c          Client connection to add delay to
452  	 * \param[in]     queue_len  Current event queue length
453  	 */
454  	static inline void
455  	delay_next_flush(pcmk__client_t *c, unsigned int queue_len)
456  	{
457  	    /* Delay a maximum of 1.5 seconds */
458  	    guint delay = (queue_len < 5)? (1000 + 100 * queue_len) : 1500;
459  	
460  	    c->event_timer = pcmk__create_timer(delay, crm_ipcs_flush_events_cb, c);
461  	}
462  	
463  	/*!
464  	 * \internal
465  	 * \brief Send client any messages in its queue
466  	 *
467  	 * \param[in,out] c  Client to flush
468  	 *
469  	 * \return Standard Pacemaker return value
470  	 */
471  	static int
472  	crm_ipcs_flush_events(pcmk__client_t *c)
473  	{
474  	    int rc = pcmk_rc_ok;
475  	    ssize_t qb_rc = 0;
476  	    unsigned int sent = 0;
477  	    unsigned int queue_len = 0;
478  	
479  	    if (c == NULL) {
480  	        return rc;
481  	    }
482  	
483  	    if (c->event_timer != 0) {
484  	        /* There is already a timer, wait until it goes off */
485  	        pcmk__trace("Timer active for %p - %d", c->ipcs, c->event_timer);
486  	        return rc;
487  	    }
488  	
489  	    if (c->event_queue != NULL) {
490  	        queue_len = g_queue_get_length(c->event_queue);
491  	    }
492  	
493  	    while (sent < 100) {
494  	        pcmk__ipc_header_t *header = NULL;
495  	        struct iovec *event = NULL;
496  	
497  	        if ((c->event_queue == NULL) || g_queue_is_empty(c->event_queue)) {
498  	            break;
499  	        }
500  	
501  	        // We don't pop unless send is successful
502  	        event = g_queue_peek_head(c->event_queue);
503  	
504  	        /* Retry sending the event up to five times.  If we get -EAGAIN, sleep
505  	         * a very short amount of time (too long here is bad) and try again.
506  	         * If we simply exit the while loop on -EAGAIN, we'll have to wait until
507  	         * the timer fires off again (up to 1.5 seconds - see delay_next_flush)
508  	         * to retry sending the message.
509  	         *
510  	         * In that case, the queue may just continue to grow faster than we are
511  	         * processing it, eventually leading to daemons timing out waiting for
512  	         * replies, which will cause wider failures.
513  	         */
514  	        for (unsigned int retries = 5; retries > 0; retries--) {
515  	            qb_rc = qb_ipcs_event_sendv(c->ipcs, event, 2);
516  	
517  	            if (qb_rc >= 0) {
518  	                break;
519  	            }
520  	
521  	            if (retries == 1 || qb_rc != -EAGAIN) {
522  	                rc = (int) -qb_rc;
523  	                goto no_more_retries;
524  	            }
525  	
526  	            pcmk__sleep_ms(5);
527  	        }
528  	
529  	        event = g_queue_pop_head(c->event_queue);
530  	
531  	        sent++;
532  	        header = event[0].iov_base;
533  	
534  	        pcmk__trace("Event %" PRId32 " to %p[%u] (%zd bytes) sent: %.120s",
535  	                    header->qb.id, c->ipcs, c->pid, qb_rc,
536  	                    (char *) (event[1].iov_base));
537  	        pcmk_free_ipc_event(event);
538  	    }
539  	
540  	no_more_retries:
541  	    queue_len -= sent;
542  	    if (sent > 0 || queue_len) {
543  	        pcmk__trace("Sent %u events (%u remaining) for %p[%d]: %s (%zd)", sent,
544  	                    queue_len, c->ipcs, c->pid, pcmk_rc_str(rc), qb_rc);
545  	    }
546  	
547  	    if (queue_len == 0) {
548  	        /* Event queue is empty, there is no backlog */
549  	        c->queue_backlog = 0;
550  	        return rc;
551  	    }
552  	
553  	    /* Allow clients to briefly fall behind on processing incoming messages,
554  	     * but drop completely unresponsive clients so the connection doesn't
555  	     * consume resources indefinitely.
556  	     */
557  	    if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
558  	        /* Don't evict:
559  	         * - Clients with a new backlog.
560  	         * - Clients with a shrinking backlog (the client is processing
561  	         *   messages faster than the server is sending them).
562  	         * - Clients that are pacemaker daemons and have had any messages sent
563  	         *   to them in this flush call (the server is sending messages faster
564  	         *   than the client is processing them, but the client is not dead).
565  	         */
566  	        if ((c->queue_backlog <= 1)
567  	            || (queue_len < c->queue_backlog)
568  	            || ((sent > 0) && (pcmk__parse_server(c->name) != pcmk_ipc_unknown))) {
569  	            pcmk__warn("Client with process ID %u has a backlog of %u messages "
570  	                       QB_XS " %p", c->pid, queue_len, c->ipcs);
571  	
572  	        } else {
573  	            pcmk__err("Evicting client with process ID %u due to backlog of %u "
574  	                      "messages " QB_XS " %p",
575  	                      c->pid, queue_len, c->ipcs);
576  	            c->queue_backlog = 0;
577  	            qb_ipcs_disconnect(c->ipcs);
578  	            return rc;
579  	        }
580  	    }
581  	
582  	    c->queue_backlog = queue_len;
583  	    delay_next_flush(c, queue_len);
584  	
585  	    return rc;
586  	}
587  	
588  	/*!
589  	 * \internal
590  	 * \brief Create an I/O vector for sending an IPC XML message
591  	 *
592  	 * If the message is too large to fit into a single buffer, this function will
593  	 * prepare an I/O vector that only holds as much as fits.  The remainder can be
594  	 * prepared in a separate call by keeping a running count of the number of times
595  	 * this function has been called and passing that in for \p index.
596  	 *
597  	 * \param[in]  request Identifier for libqb response header
598  	 * \param[in]  message Message to send
599  	 * \param[in]  index   How many times this function has been called - basically,
600  	 *                     a count of how many chunks of \p message have already
601  	 *                     been sent
602  	 * \param[out] result  Where to store prepared I/O vector - NULL on error
603  	 * \param[out] bytes   Size of prepared data in bytes (includes header)
604  	 *
605  	 * \return Standard Pacemaker return code
606  	 */
607  	int
608  	pcmk__ipc_prepare_iov(uint32_t request, const GString *message, uint16_t index,
609  	                      struct iovec **result, ssize_t *bytes)
610  	{
611  	    struct iovec *iov = NULL;
612  	    unsigned int payload_size = 0;
613  	    unsigned int total = 0;
614  	    unsigned int max_send_size = crm_ipc_default_buffer_size();
615  	    unsigned int max_chunk_size = 0;
616  	    size_t offset = 0;
617  	    pcmk__ipc_header_t *header = NULL;
618  	    int rc = pcmk_rc_ok;
619  	
620  	    if ((message == NULL) || (result == NULL)) {
621  	        rc = EINVAL;
622  	        goto done;
623  	    }
624  	
625  	    header = calloc(1, sizeof(pcmk__ipc_header_t));
626  	    if (header == NULL) {
627  	       rc = ENOMEM;
628  	       goto done;
629  	    }
630  	
631  	    *result = NULL;
632  	    iov = pcmk__new_ipc_event();
633  	    iov[0].iov_len = sizeof(pcmk__ipc_header_t);
634  	    iov[0].iov_base = header;
635  	
636  	    header->version = PCMK__IPC_VERSION;
637  	
638  	    /* We are passed an index, which is basically how many times this function
639  	     * has been called.  This is how we support multi-part IPC messages.  We
640  	     * need to convert that into an offset into the buffer that we want to start
641  	     * reading from.
642  	     *
643  	     * Each call to this function can send max_send_size, but this also includes
644  	     * the header and a null terminator character for the end of the payload.
645  	     * We need to subtract those out here.
646  	     */
647  	    max_chunk_size = max_send_size - iov[0].iov_len - 1;
648  	    offset = index * max_chunk_size;
649  	
650  	    /* How much of message is left to send?  This does not include the null
651  	     * terminator character.
652  	     */
653  	    payload_size = message->len - offset;
654  	
655  	    /* How much would be transmitted, including the header size and null
656  	     * terminator character for the buffer?
657  	     */
658  	    total = iov[0].iov_len + payload_size + 1;
659  	
660  	    if (total >= max_send_size) {
661  	        /* The entire packet is too big to fit in a single buffer.  Calculate
662  	         * how much of it we can send - buffer size, minus header size, minus
663  	         * one for the null terminator.
664  	         */
665  	        payload_size = max_chunk_size;
666  	
667  	        header->size = payload_size + 1;
668  	
669  	        iov[1].iov_base = strndup(message->str + offset, payload_size);
670  	        if (iov[1].iov_base == NULL) {
671  	            rc = ENOMEM;
672  	            goto done;
673  	        }
674  	
675  	        iov[1].iov_len = header->size;
676  	        rc = pcmk_rc_ipc_more;
677  	
678  	    } else {
679  	        /* The entire packet fits in a single buffer.  We can copy the entirety
680  	         * of it into the payload.
681  	         */
682  	        header->size = payload_size + 1;
683  	
684  	        iov[1].iov_base = pcmk__str_copy(message->str + offset);
685  	        iov[1].iov_len = header->size;
686  	    }
687  	
688  	    header->part_id = index;
689  	    header->qb.size = iov[0].iov_len + iov[1].iov_len;
690  	    header->qb.id = (int32_t)request;    /* Replying to a specific request */
691  	
692  	    if ((rc == pcmk_rc_ok) && (index != 0)) {
693  	        pcmk__set_ipc_flags(header->flags, "multipart ipc",
694  	                            crm_ipc_multipart | crm_ipc_multipart_end);
695  	    } else if (rc == pcmk_rc_ipc_more) {
696  	        pcmk__set_ipc_flags(header->flags, "multipart ipc",
697  	                            crm_ipc_multipart);
698  	    }
699  	
700  	    *result = iov;
701  	    pcmk__assert(header->qb.size > 0);
702  	    if (bytes != NULL) {
703  	        *bytes = header->qb.size;
704  	    }
705  	
706  	done:
707  	    if ((rc != pcmk_rc_ok) && (rc != pcmk_rc_ipc_more)) {
708  	        pcmk_free_ipc_event(iov);
709  	    }
710  	
711  	    return rc;
712  	}
713  	
714  	/* Return the next available ID for a server event.
715  	 *
716  	 * For the parts of a multipart event, all parts should have the same ID as
717  	 * the first part.
718  	 */
719  	static uint32_t
720  	id_for_server_event(pcmk__ipc_header_t *header)
721  	{
722  	    static uint32_t id = 1;
723  	
724  	    if (pcmk__is_set(header->flags, crm_ipc_multipart)
725  	        && (header->part_id != 0)) {
726  	        return id;
727  	    } else {
728  	        id++;
729  	        return id;
730  	    }
731  	}
732  	
/*!
 * \internal
 * \brief Send a prepared I/O vector to a client, as an event or a response
 *
 * With crm_ipc_server_event set (or implied by a proxied client), the
 * message is queued on the client's event channel and a flush is attempted;
 * otherwise it is sent synchronously as a response to a specific request.
 *
 * \param[in,out] c      Client to send to
 * \param[in,out] iov    I/O vector from pcmk__ipc_prepare_iov()
 * \param[in]     flags  Group of crm_ipc_flags
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags)
{
    int rc = pcmk_rc_ok;
    pcmk__ipc_header_t *header = iov[0].iov_base;

    /* _ALL_ replies to proxied connections need to be sent as events */
    if (pcmk__is_set(c->flags, pcmk__client_proxied)
        && !pcmk__is_set(flags, crm_ipc_server_event)) {
        /* The proxied flag lets us know this was originally meant to be a
         * response, even though we're sending it over the event channel.
         */
        pcmk__set_ipc_flags(flags, "server event",
                            crm_ipc_server_event|crm_ipc_proxied_relay_response);
    }

    pcmk__set_ipc_flags(header->flags, "server event", flags);
    if (pcmk__is_set(flags, crm_ipc_server_event)) {
        /* Server events don't use an ID, though we do set one in
         * pcmk__ipc_prepare_iov if the event is in response to a particular
         * request.  In that case, we don't want to set a new ID here that
         * overwrites that one.
         *
         * @TODO: Since server event IDs aren't used anywhere, do we really
         * need to set this for any reason other than ease of logging?
         */
        if (header->qb.id == 0) {
            header->qb.id = id_for_server_event(header);
        }

        if (pcmk__is_set(flags, crm_ipc_server_free)) {
            // Queue takes ownership of iov; freed after successful send
            pcmk__trace("Sending the original to %p[%d]", c->ipcs, c->pid);
            add_event(c, iov);

        } else {
            struct iovec *iov_copy = pcmk__new_ipc_event();

            /* NOTE(review): the malloc() results below are not checked before
             * memcpy() — presumably allocation failure is treated as fatal
             * elsewhere; confirm against project allocation policy.
             */
            pcmk__trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
            iov_copy[0].iov_len = iov[0].iov_len;
            iov_copy[0].iov_base = malloc(iov[0].iov_len);
            memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);

            iov_copy[1].iov_len = iov[1].iov_len;
            iov_copy[1].iov_base = malloc(iov[1].iov_len);
            memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);

            add_event(c, iov_copy);
        }

        rc = crm_ipcs_flush_events(c);

    } else {
        ssize_t qb_rc;
        char *part_text = NULL;

        CRM_LOG_ASSERT(header->qb.id != 0);     /* Replying to a specific request */

        // Build a log fragment describing which part of a multipart reply this is
        if (pcmk__is_set(header->flags, crm_ipc_multipart_end)) {
            part_text = pcmk__assert_asprintf(" (final part %d) ",
                                              header->part_id);
        } else if (pcmk__is_set(header->flags, crm_ipc_multipart)) {
            if (header->part_id == 0) {
                part_text = pcmk__assert_asprintf(" (initial part %d) ",
                                                  header->part_id);
            } else {
                part_text = pcmk__assert_asprintf(" (part %d) ",
                                                  header->part_id);
            }
        } else {
            part_text = pcmk__str_copy(" ");
        }

        qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
        if (qb_rc < header->qb.size) {
            // Short write: only negative values carry an error code
            if (qb_rc < 0) {
                rc = (int) -qb_rc;
            }

        } else {
            pcmk__trace("Response %" PRId32 "%ssent, %zd bytes to %p[%u]",
                        header->qb.id, part_text, qb_rc, c->ipcs, c->pid);
            pcmk__trace("Text = %s", (char *) iov[1].iov_base);
        }

        free(part_text);

        if (pcmk__is_set(flags, crm_ipc_server_free)) {
            pcmk_free_ipc_event(iov);
        }

        crm_ipcs_flush_events(c);
    }

    if ((rc == EPIPE) || (rc == ENOTCONN)) {
        pcmk__trace("Client %p disconnected", c->ipcs);
    }

    return rc;
}
832  	
/*!
 * \internal
 * \brief Serialize an XML message and send it to an IPC client
 *
 * The message is serialized once, then prepared and sent in one or more
 * chunks as indicated by pcmk__ipc_prepare_iov().
 *
 * \param[in,out] c        Client to send message to
 * \param[in]     request  Request ID the message is in reply to
 * \param[in]     message  XML message to send
 * \param[in]     flags    Group of crm_ipc_flags
 *
 * \return Standard Pacemaker return code
 */
int
pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message,
                   uint32_t flags)
{
    struct iovec *iov = NULL;
    int rc = pcmk_rc_ok;
    GString *iov_buffer = NULL;
    uint16_t index = 0;     // Chunk currently being prepared/sent (0-based)
    bool event_or_proxied = false;

    if (c == NULL) {
        return EINVAL;
    }

    // Serialize the whole message up front; chunks are carved from this buffer
    iov_buffer = g_string_sized_new(1024);
    pcmk__xml_string(message, 0, iov_buffer, 0);

    /* Testing crm_ipc_server_event is obvious.  pcmk__client_proxied is less
     * obvious.  According to pcmk__ipc_send_iov, replies to proxied connections
     * need to be sent as events.  However, do_local_notify (which calls this
     * function) will clear all flags so we can't go just by crm_ipc_server_event.
     *
     * Changing do_local_notify to check for a proxied connection first results
     * in processes on the Pacemaker Remote node (like cibadmin or crm_mon)
     * timing out when waiting for a reply.
     */
    event_or_proxied = pcmk__is_set(flags, crm_ipc_server_event)
                       || pcmk__is_set(c->flags, pcmk__client_proxied);

    do {
        rc = pcmk__ipc_prepare_iov(request, iov_buffer, index, &iov, NULL);

        switch (rc) {
            case pcmk_rc_ok:
                /* No more chunks to send after this one */
                pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
                rc = pcmk__ipc_send_iov(c, iov, flags);

                if (event_or_proxied) {
                    if (rc == EAGAIN) {
                        /* Return pcmk_rc_ok instead so callers don't have to know
                         * whether they passed an event or not when interpreting
                         * the return code.
                         */
                        rc = pcmk_rc_ok;
                    }
                } else {
                    /* EAGAIN is an error for IPC messages.  We don't have a
                     * send queue for these, so we need to try again.  If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == EAGAIN) {
                        // Leave the switch; index is unchanged, so this
                        // re-prepares and re-sends the same final chunk
                        break;
                    }
                }

                goto done;

            case pcmk_rc_ipc_more:
                /* There are more chunks to send after this one */
                pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free);
                rc = pcmk__ipc_send_iov(c, iov, flags);

                /* Did an error occur during transmission? */
                if (event_or_proxied) {
                    /* EAGAIN is not an error for server events.  The event
                     * will be queued for transmission and we will attempt
                     * sending it again the next time pcmk__ipc_send_iov is
                     * called, or when the crm_ipcs_flush_events_cb happens.
                     */
                    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
                        goto done;
                    }

                    // Advance to the next chunk
                    index++;
                    break;

                } else {
                    /* EAGAIN is an error for IPC messages.  We don't have a
                     * send queue for these, so we need to try again.  If there
                     * was some other error, we need to break out of this loop
                     * and report it.
                     *
                     * FIXME: Retry limit for EAGAIN?
                     */
                    if (rc == pcmk_rc_ok) {
                        // Chunk sent; advance to the next one
                        index++;
                        break;
                    } else if (rc == EAGAIN) {
                        // Retry the same chunk (index unchanged)
                        break;
                    } else {
                        goto done;
                    }
                }

            default:
                /* An error occurred during preparation */
                goto done;
        }
    } while (true);

done:
    /* EAGAIN is excluded here because for events it means the message was
     * queued rather than lost (see comments above)
     */
    if ((rc != pcmk_rc_ok) && (rc != EAGAIN)) {
        pcmk__notice("IPC message to pid %u failed: %s " QB_XS " rc=%d", c->pid,
                     pcmk_rc_str(rc), rc);
    }

    g_string_free(iov_buffer, TRUE);
    return rc;
}
946  	
947  	/*!
948  	 * \internal
949  	 * \brief Create an acknowledgement with a status code to send to a client
950  	 *
951  	 * \param[in] function  Calling function
952  	 * \param[in] line      Source file line within calling function
953  	 * \param[in] flags     IPC flags to use when sending
954  	 * \param[in] ver       IPC protocol version (can be NULL)
955  	 * \param[in] status    Exit status code to add to ack
956  	 *
957  	 * \return Newly created XML for ack
958  	 *
959  	 * \note The caller is responsible for freeing the return value with
960  	 *       \c pcmk__xml_free().
961  	 */
962  	xmlNode *
963  	pcmk__ipc_create_ack_as(const char *function, int line, uint32_t flags,
964  	                        const char *ver, crm_exit_t status)
965  	{
966  	    xmlNode *ack = NULL;
967  	
968  	    if (!pcmk__is_set(flags, crm_ipc_client_response)) {
969  	        return NULL;
970  	    }
971  	
972  	    ack = pcmk__xe_create(NULL, PCMK__XE_ACK);
973  	    pcmk__xe_set(ack, PCMK_XA_FUNCTION, function);
974  	    pcmk__xe_set_int(ack, PCMK__XA_LINE, line);
975  	    pcmk__xe_set_int(ack, PCMK_XA_STATUS, (int) status);
976  	    pcmk__xe_set(ack, PCMK__XA_IPC_PROTO_VERSION, ver);
977  	    return ack;
978  	}
979  	
980  	/*!
981  	 * \internal
982  	 * \brief Send an acknowledgement with a status code to a client
983  	 *
984  	 * \param[in] function  Calling function
985  	 * \param[in] line      Source file line within calling function
986  	 * \param[in] c         Client to send ack to
987  	 * \param[in] request   Request ID being replied to
988  	 * \param[in] flags     IPC flags to use when sending
989  	 * \param[in] ver       IPC protocol version (can be NULL)
990  	 * \param[in] status    Status code to send with acknowledgement
991  	 *
992  	 * \return Standard Pacemaker return code
993  	 */
994  	int
995  	pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c,
996  	                      uint32_t request, uint32_t flags, const char *ver,
997  	                      crm_exit_t status)
998  	{
999  	    int rc = pcmk_rc_ok;
1000 	    xmlNode *ack = pcmk__ipc_create_ack_as(function, line, flags, ver, status);
1001 	
1002 	    if (ack == NULL) {
1003 	        return pcmk_rc_ok;
1004 	    }
1005 	
1006 	    pcmk__trace("Ack'ing IPC message from client %s as <" PCMK__XE_ACK
1007 	                " status=%d>",
1008 	                pcmk__client_name(c), status);
1009 	    pcmk__log_xml_trace(ack, "sent-ack");
1010 	    c->request_id = 0;
1011 	    rc = pcmk__ipc_send_xml(c, request, ack, flags);
1012 	    pcmk__xml_free(ack);
1013 	    return rc;
1014 	}
1015 	
1016 	/*!
1017 	 * \internal
1018 	 * \brief Add an IPC server to the main loop for the CIB manager API
1019 	 *
1020 	 * \param[out] ipcs_ro   New IPC server for read-only CIB manager API
1021 	 * \param[out] ipcs_rw   New IPC server for read/write CIB manager API
1022 	 * \param[out] ipcs_shm  New IPC server for shared-memory CIB manager API
1023 	 * \param[in]  ro_cb     IPC callbacks for read-only API
1024 	 * \param[in]  rw_cb     IPC callbacks for read/write and shared-memory APIs
1025 	 *
1026 	 * \note This function exits fatally on error.
1027 	 * \note There is no actual difference between the three IPC endpoints other
1028 	 *       than their names.
1029 	 */
1030 	void pcmk__serve_based_ipc(qb_ipcs_service_t **ipcs_ro,
1031 	                           qb_ipcs_service_t **ipcs_rw,
1032 	                           qb_ipcs_service_t **ipcs_shm,
1033 	                           struct qb_ipcs_service_handlers *ro_cb,
1034 	                           struct qb_ipcs_service_handlers *rw_cb)
1035 	{
1036 	    *ipcs_ro = mainloop_add_ipc_server(PCMK__SERVER_BASED_RO,
1037 	                                       QB_IPC_NATIVE, ro_cb);
1038 	
1039 	    *ipcs_rw = mainloop_add_ipc_server(PCMK__SERVER_BASED_RW,
1040 	                                       QB_IPC_NATIVE, rw_cb);
1041 	
1042 	    *ipcs_shm = mainloop_add_ipc_server(PCMK__SERVER_BASED_SHM,
1043 	                                        QB_IPC_SHM, rw_cb);
1044 	
1045 	    if (*ipcs_ro == NULL || *ipcs_rw == NULL || *ipcs_shm == NULL) {
1046 	        pcmk__crit("Failed to create %s IPC server; shutting down",
1047 	                   pcmk__server_log_name(pcmk_ipc_based));
1048 	        pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1049 	                   "enabled");
1050 	        crm_exit(CRM_EX_FATAL);
1051 	    }
1052 	}
1053 	
1054 	/*!
1055 	 * \internal
1056 	 * \brief Destroy IPC servers for the CIB manager API
1057 	 *
 * \param[in,out] ipcs_ro   IPC server for the read-only CIB manager API
 * \param[in,out] ipcs_rw   IPC server for the read/write CIB manager API
 * \param[in,out] ipcs_shm  IPC server for the shared-memory CIB manager API
1061 	 *
1062 	 * \note This is a convenience function for calling qb_ipcs_destroy() for each
1063 	 *       argument.
1064 	 */
1065 	void
1066 	pcmk__stop_based_ipc(qb_ipcs_service_t *ipcs_ro,
1067 	                     qb_ipcs_service_t *ipcs_rw,
1068 	                     qb_ipcs_service_t *ipcs_shm)
1069 	{
1070 	    qb_ipcs_destroy(ipcs_ro);
1071 	    qb_ipcs_destroy(ipcs_rw);
1072 	    qb_ipcs_destroy(ipcs_shm);
1073 	}
1074 	
1075 	/*!
1076 	 * \internal
1077 	 * \brief Add an IPC server to the main loop for the controller API
1078 	 *
1079 	 * \param[in] cb  IPC callbacks
1080 	 *
1081 	 * \return Newly created IPC server
1082 	 */
1083 	qb_ipcs_service_t *
1084 	pcmk__serve_controld_ipc(struct qb_ipcs_service_handlers *cb)
1085 	{
1086 	    return mainloop_add_ipc_server(CRM_SYSTEM_CRMD, QB_IPC_NATIVE, cb);
1087 	}
1088 	
1089 	/*!
1090 	 * \internal
1091 	 * \brief Add an IPC server to the main loop for the attribute manager API
1092 	 *
1093 	 * \param[out] ipcs  Where to store newly created IPC server
1094 	 * \param[in]  cb    IPC callbacks
1095 	 *
1096 	 * \note This function exits fatally on error.
1097 	 */
1098 	void
1099 	pcmk__serve_attrd_ipc(qb_ipcs_service_t **ipcs,
1100 	                      struct qb_ipcs_service_handlers *cb)
1101 	{
1102 	    *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_attrd),
1103 	                                    QB_IPC_NATIVE, cb);
1104 	
1105 	    if (*ipcs == NULL) {
1106 	        pcmk__crit("Failed to create %s IPC server; shutting down",
1107 	                   pcmk__server_log_name(pcmk_ipc_attrd));
1108 	        pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1109 	                   "enabled");
1110 	        crm_exit(CRM_EX_FATAL);
1111 	    }
1112 	}
1113 	
1114 	/*!
1115 	 * \internal
1116 	 * \brief Add an IPC server to the main loop for the executor API
1117 	 *
1118 	 * \param[out] ipcs  Where to store newly created IPC server
1119 	 * \param[in]  cb    IPC callbacks
1120 	 *
1121 	 * \note This function exits fatally on error.
1122 	 */
1123 	void
1124 	pcmk__serve_execd_ipc(qb_ipcs_service_t **ipcs,
1125 	                      struct qb_ipcs_service_handlers *cb)
1126 	{
1127 	    *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_execd),
1128 	                                    QB_IPC_SHM, cb);
1129 	
1130 	    if (*ipcs == NULL) {
1131 	        pcmk__crit("Failed to create %s IPC server; shutting down",
1132 	                   pcmk__server_log_name(pcmk_ipc_execd));
1133 	        crm_exit(CRM_EX_FATAL);
1134 	    }
1135 	}
1136 	
1137 	/*!
1138 	 * \internal
1139 	 * \brief Add an IPC server to the main loop for the fencer API
1140 	 *
1141 	 * \param[out] ipcs  Where to store newly created IPC server
1142 	 * \param[in]  cb    IPC callbacks
1143 	 *
1144 	 * \note This function exits fatally on error.
1145 	 */
1146 	void
1147 	pcmk__serve_fenced_ipc(qb_ipcs_service_t **ipcs,
1148 	                       struct qb_ipcs_service_handlers *cb)
1149 	{
1150 	    *ipcs = mainloop_add_ipc_server_with_prio(pcmk__server_ipc_name(pcmk_ipc_fenced),
1151 	                                              QB_IPC_NATIVE, cb, QB_LOOP_HIGH);
1152 	
1153 	    if (*ipcs == NULL) {
1154 	        pcmk__crit("Failed to create %s IPC server; shutting down",
1155 	                   pcmk__server_log_name(pcmk_ipc_fenced));
1156 	        pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1157 	                   "enabled");
1158 	        crm_exit(CRM_EX_FATAL);
1159 	    }
1160 	}
1161 	
1162 	/*!
1163 	 * \internal
1164 	 * \brief Add an IPC server to the main loop for the pacemakerd API
1165 	 *
1166 	 * \param[out] ipcs  Where to store newly created IPC server
1167 	 * \param[in]  cb    IPC callbacks
1168 	 *
1169 	 * \note This function exits with CRM_EX_OSERR on error.
1170 	 */
1171 	void
1172 	pcmk__serve_pacemakerd_ipc(qb_ipcs_service_t **ipcs,
1173 	                       struct qb_ipcs_service_handlers *cb)
1174 	{
1175 	    *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_pacemakerd),
1176 	                                    QB_IPC_NATIVE, cb);
1177 	
1178 	    if (*ipcs == NULL) {
1179 	        pcmk__crit("Failed to create %s IPC server; shutting down",
1180 	                   pcmk__server_log_name(pcmk_ipc_pacemakerd));
1181 	        pcmk__crit("Verify pacemaker and pacemaker_remote are not both "
1182 	                   "enabled");
1183 	        /* sub-daemons are observed by pacemakerd. Thus we exit CRM_EX_FATAL
1184 	         * if we want to prevent pacemakerd from restarting them.
1185 	         * With pacemakerd we leave the exit-code shown to e.g. systemd
1186 	         * to what it was prior to moving the code here from pacemakerd.c
1187 	         */
1188 	        crm_exit(CRM_EX_OSERR);
1189 	    }
1190 	}
1191 	
1192 	/*!
1193 	 * \internal
1194 	 * \brief Add an IPC server to the main loop for the scheduler API
1195 	 *
1196 	 * \param[out] ipcs  Where to store newly created IPC server
1197 	 * \param[in]  cb    IPC callbacks
1198 	 *
1199 	 * \return Newly created IPC server
1200 	 * \note This function exits fatally on error.
1201 	 */
1202 	void
1203 	pcmk__serve_schedulerd_ipc(qb_ipcs_service_t **ipcs,
1204 	                           struct qb_ipcs_service_handlers *cb)
1205 	{
1206 	    *ipcs = mainloop_add_ipc_server(pcmk__server_ipc_name(pcmk_ipc_schedulerd),
1207 	                                    QB_IPC_NATIVE, cb);
1208 	
1209 	    if (*ipcs == NULL) {
1210 	        pcmk__crit("Failed to create %s IPC server; shutting down",
1211 	                   pcmk__server_log_name(pcmk_ipc_schedulerd));
1212 	        crm_exit(CRM_EX_FATAL);
1213 	    }
1214 	}
1215