1    	/*
2    	 * Copyright 2004-2026 the Pacemaker project contributors
3    	 *
4    	 * The version control history for this file may have further details.
5    	 *
6    	 * This source code is licensed under the GNU Lesser General Public License
7    	 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8    	 */
9    	
10   	#include <crm_internal.h>
11   	
12   	#include <arpa/inet.h>
13   	#include <inttypes.h>                   // PRIu32
14   	#include <netdb.h>
15   	#include <netinet/in.h>
16   	#include <stdbool.h>
17   	#include <stdint.h>                     // uint32_t
18   	#include <sys/socket.h>
19   	#include <sys/types.h>                  // size_t
20   	#include <sys/utsname.h>
21   	
22   	#include <bzlib.h>
23   	#include <corosync/corodefs.h>
24   	#include <corosync/corotypes.h>
25   	#include <corosync/hdb.h>
26   	#include <corosync/cpg.h>
27   	#include <qb/qbipc_common.h>
28   	#include <qb/qbipcc.h>
29   	#include <qb/qbutil.h>
30   	
31   	#include <crm/cluster/internal.h>
32   	#include <crm/common/ipc.h>
33   	#include <crm/common/mainloop.h>
34   	#include <crm/common/xml.h>
35   	
36   	#include "crmcluster_private.h"
37   	
38   	/* @TODO Once we can update the public API to require pcmk_cluster_t* in more
39   	 *       functions, we can ditch this in favor of cluster->cpg_handle.
40   	 */
41   	static cpg_handle_t pcmk_cpg_handle = 0;
42   	
43   	// @TODO These could be moved to pcmk_cluster_t* at that time as well
44   	static bool cpg_evicted = false;
45   	static GList *cs_message_queue = NULL;
46   	static int cs_message_timer = 0;
47   	
48   	/* @COMPAT Any changes to these structs (other than renames) will break all
49   	 * rolling upgrades, and should be avoided if possible or done at a major
50   	 * version bump if not
51   	 */
52   	
53   	struct pcmk__cpg_host_s {
54   	    uint32_t id;
55   	    uint32_t pid;
56   	    gboolean local;             // Unused but needed for compatibility
57   	    enum pcmk_ipc_server type;  // For logging only
58   	    uint32_t size;
59   	    char uname[MAX_NAME];
60   	} __attribute__ ((packed));
61   	
62   	typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
63   	
64   	struct pcmk__cpg_msg_s {
65   	    struct qb_ipc_response_header header __attribute__ ((aligned(8)));
66   	    uint32_t id;
67   	    gboolean is_compressed;
68   	
69   	    pcmk__cpg_host_t host;
70   	    pcmk__cpg_host_t sender;
71   	
72   	    uint32_t size;
73   	    uint32_t compressed_size;
74   	    /* 584 bytes */
75   	    char data[0];
76   	
77   	} __attribute__ ((packed));
78   	
79   	typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
80   	
81   	static void crm_cs_flush(gpointer data);
82   	
83   	#define msg_data_len(msg) (msg->is_compressed?msg->compressed_size:msg->size)
84   	
85   	#define cs_repeat(rc, counter, max, code) do {                          \
86   	        rc = code;                                                      \
87   	        if ((rc == CS_ERR_TRY_AGAIN) || (rc == CS_ERR_QUEUE_FULL)) {    \
88   	            counter++;                                                  \
89   	            pcmk__debug("Retrying operation after %ds", counter);       \
90   	            sleep(counter);                                             \
91   	        } else {                                                        \
92   	            break;                                                      \
93   	        }                                                               \
94   	    } while (counter < max)
95   	
96   	/*!
97   	 * \internal
98   	 * \brief Get the local Corosync node ID (via CPG)
99   	 *
100  	 * \param[in] handle  CPG connection to use (or 0 to use new connection)
101  	 *
102  	 * \return Corosync ID of local node (or 0 if not known)
103  	 */
104  	uint32_t
105  	pcmk__cpg_local_nodeid(cpg_handle_t handle)
106  	{
107  	    cs_error_t rc = CS_OK;
108  	    int retries = 0;
109  	    static uint32_t local_nodeid = 0;
110  	    cpg_handle_t local_handle = handle;
111  	    cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
112  	    int fd = -1;
113  	    uid_t found_uid = 0;
114  	    gid_t found_gid = 0;
115  	    pid_t found_pid = 0;
116  	    int rv = 0;
117  	
118  	    if (local_nodeid != 0) {
119  	        return local_nodeid;
120  	    }
121  	
122  	    if (handle == 0) {
123  	        pcmk__trace("Creating connection");
124  	        cs_repeat(rc, retries, 5,
125  	                  cpg_model_initialize(&local_handle, CPG_MODEL_V1,
126  	                                       (cpg_model_data_t *) &cpg_model_info,
127  	                                       NULL));
128  	        if (rc != CS_OK) {
129  	            pcmk__err("Could not connect to the CPG API: %s (%d)",
130  	                      pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
131  	            return 0;
132  	        }
133  	
134  	        rc = cpg_fd_get(local_handle, &fd);
135  	        if (rc != CS_OK) {
136  	            pcmk__err("Could not obtain the CPG API connection: %s (%d)",
137  	                      pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
138  	            goto bail;
139  	        }
140  	
141  	        // CPG provider run as root (at least in given user namespace)?
142  	        rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
143  	                                          &found_uid, &found_gid);
144  	        if (rv == 0) {
145  	            pcmk__err("CPG provider is not authentic: process %lld "
146  	                      "(uid: %lld, gid: %lld)",
147  	                      (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
148  	                      (long long) found_uid, (long long) found_gid);
149  	            goto bail;
150  	
151  	        } else if (rv < 0) {
152  	            pcmk__err("Could not verify authenticity of CPG provider: %s (%d)",
153  	                      strerror(-rv), -rv);
154  	            goto bail;
155  	        }
156  	    }
157  	
158  	    if (rc == CS_OK) {
159  	        retries = 0;
160  	        pcmk__trace("Performing lookup");
161  	        cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
162  	    }
163  	
164  	    if (rc != CS_OK) {
165  	        pcmk__err("Could not get local node id from the CPG API: %s (%d)",
166  	                  pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
167  	    }
168  	
169  	bail:
170  	    if (handle == 0) {
171  	        pcmk__trace("Closing connection");
172  	        cpg_finalize(local_handle);
173  	    }
174  	    pcmk__debug("Local nodeid is %u", local_nodeid);
175  	    return local_nodeid;
176  	}
177  	
178  	/*!
179  	 * \internal
180  	 * \brief Callback function for Corosync message queue timer
181  	 *
182  	 * \param[in] data  CPG handle
183  	 *
184  	 * \return FALSE (to indicate to glib that timer should not be removed)
185  	 */
186  	static gboolean
187  	crm_cs_flush_cb(gpointer data)
188  	{
189  	    cs_message_timer = 0;
190  	    crm_cs_flush(data);
191  	    return FALSE;
192  	}
193  	
194  	// Send no more than this many CPG messages in one flush
195  	#define CS_SEND_MAX 200
196  	
197  	/*!
198  	 * \internal
199  	 * \brief Send messages in Corosync CPG message queue
200  	 *
201  	 * \param[in] data   CPG handle
202  	 */
203  	static void
204  	crm_cs_flush(gpointer data)
205  	{
206  	    unsigned int sent = 0;
207  	    guint queue_len = 0;
208  	    cs_error_t rc = 0;
209  	    cpg_handle_t *handle = (cpg_handle_t *) data;
210  	
211  	    if (*handle == 0) {
212  	        pcmk__trace("Connection is dead");
213  	        return;
214  	    }
215  	
216  	    queue_len = g_list_length(cs_message_queue);
217  	    if (((queue_len % 1000) == 0) && (queue_len > 1)) {
218  	        pcmk__err("CPG queue has grown to %d", queue_len);
219  	
220  	    } else if (queue_len == CS_SEND_MAX) {
221  	        pcmk__warn("CPG queue has grown to %d", queue_len);
222  	    }
223  	
224  	    if (cs_message_timer != 0) {
225  	        /* There is already a timer, wait until it goes off */
226  	        pcmk__trace("Timer active %d", cs_message_timer);
227  	        return;
228  	    }
229  	
230  	    while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
231  	        struct iovec *iov = cs_message_queue->data;
232  	
233  	        rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
234  	        if (rc != CS_OK) {
235  	            break;
236  	        }
237  	
238  	        sent++;
239  	        pcmk__trace("CPG message sent, size=%zu", iov->iov_len);
240  	
241  	        cs_message_queue = g_list_remove(cs_message_queue, iov);
242  	        free(iov->iov_base);
243  	        free(iov);
244  	    }
245  	
246  	    queue_len -= sent;
247  	    do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
248  	               "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
249  	               sent, pcmk__plural_s(sent), queue_len,
250  	               pcmk_rc_str(pcmk__corosync2rc(rc)), (int) rc);
251  	
252  	    if (cs_message_queue) {
253  	        uint32_t delay_ms = 100;
254  	        if (rc != CS_OK) {
255  	            /* Proportionally more if sending failed but cap at 1s */
256  	            delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
257  	        }
258  	        cs_message_timer = pcmk__create_timer(delay_ms, crm_cs_flush_cb, data);
259  	    }
260  	}
261  	
262  	/*!
263  	 * \internal
264  	 * \brief Dispatch function for CPG handle
265  	 *
266  	 * \param[in,out] user_data  Cluster object
267  	 *
268  	 * \return 0 on success, -1 on error (per mainloop_io_t interface)
269  	 */
270  	static int
271  	pcmk_cpg_dispatch(gpointer user_data)
272  	{
273  	    cs_error_t rc = CS_OK;
274  	    pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
275  	
276  	    rc = cpg_dispatch(cluster->priv->cpg_handle, CS_DISPATCH_ONE);
277  	    if (rc != CS_OK) {
278  	        pcmk__err("Connection to the CPG API failed: %s (%d)",
279  	                  pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
280  	        cpg_finalize(cluster->priv->cpg_handle);
281  	        cluster->priv->cpg_handle = 0;
282  	        return -1;
283  	
284  	    } else if (cpg_evicted) {
285  	        pcmk__err("Evicted from CPG membership");
286  	        return -1;
287  	    }
288  	    return 0;
289  	}
290  	
291  	static inline const char *
292  	ais_dest(const pcmk__cpg_host_t *host)
293  	{
294  	    return (host->size > 0)? host->uname : "<all>";
295  	}
296  	
297  	static inline const char *
298  	msg_type2text(enum pcmk_ipc_server type)
299  	{
300  	    const char *name = pcmk__server_message_type(type);
301  	
302  	    return pcmk__s(name, "unknown");
303  	}
304  	
305  	/*!
306  	 * \internal
307  	 * \brief Check whether a Corosync CPG message is valid
308  	 *
309  	 * \param[in] msg   Corosync CPG message to check
310  	 *
311  	 * \return true if \p msg is valid, otherwise false
312  	 */
313  	static bool
314  	check_message_sanity(const pcmk__cpg_msg_t *msg)
315  	{
316  	    int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
317  	
318  	    if (payload_size < 1) {
319  	        pcmk__err("%sCPG message %d from %s invalid: Claimed size of %d bytes "
320  	                  "is too small " QB_XS " from %s[%u] to %s@%s",
321  	                  (msg->is_compressed? "Compressed " : ""),
322  	                  msg->id, ais_dest(&(msg->sender)),
323  	                  (int) msg->header.size,
324  	                  msg_type2text(msg->sender.type), msg->sender.pid,
325  	                  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
326  	        return false;
327  	    }
328  	
329  	    if (msg->header.error != CS_OK) {
330  	        pcmk__err("%sCPG message %d from %s invalid: Sender indicated error %d "
331  	                  QB_XS " from %s[%u] to %s@%s",
332  	                  (msg->is_compressed? "Compressed " : ""),
333  	                  msg->id, ais_dest(&(msg->sender)),
334  	                  msg->header.error,
335  	                  msg_type2text(msg->sender.type), msg->sender.pid,
336  	                  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
337  	        return false;
338  	    }
339  	
340  	    if (msg_data_len(msg) != payload_size) {
341  	        pcmk__err("%sCPG message %d from %s invalid: Total size %d "
342  	                  "inconsistent with payload size %d "
343  	                  QB_XS " from %s[%u] to %s@%s",
344  	                  (msg->is_compressed? "Compressed " : ""),
345  	                  msg->id, ais_dest(&(msg->sender)),
346  	                  (int) msg->header.size, (int) msg_data_len(msg),
347  	                  msg_type2text(msg->sender.type), msg->sender.pid,
348  	                  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
349  	        return false;
350  	    }
351  	
352  	    if (!msg->is_compressed &&
353  	        /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
354  	         * but checking the last byte or two should be quick
355  	         */
356  	        (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
357  	         || (msg->data[msg->size - 1] != '\0'))) {
358  	        pcmk__err("CPG message %d from %s invalid: Payload does not end at "
359  	                  "byte %" PRIu32 " " QB_XS " from %s[%u] to %s@%s",
360  	                  msg->id, ais_dest(&(msg->sender)), msg->size,
361  	                  msg_type2text(msg->sender.type), msg->sender.pid,
362  	                  msg_type2text(msg->host.type), ais_dest(&(msg->host)));
363  	        return false;
364  	    }
365  	
366  	    pcmk__trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
367  	                (int) msg->header.size,
368  	                (msg->is_compressed? "compressed " : ""), msg->id,
369  	                msg_type2text(msg->sender.type), msg->sender.pid,
370  	                ais_dest(&(msg->sender)),
371  	                msg_type2text(msg->host.type), ais_dest(&(msg->host)));
372  	    return true;
373  	}
374  	
375  	/*!
376  	 * \internal
377  	 * \brief Extract text data from a Corosync CPG message
378  	 *
379  	 * \param[in]     handle     CPG connection (to get local node ID if not known)
380  	 * \param[in]     sender_id  Corosync ID of node that sent message
381  	 * \param[in]     pid        Process ID of message sender (for logging only)
382  	 * \param[in,out] content    CPG message
383  	 * \param[out]    from       If not \c NULL, will be set to sender uname
384  	 *                           (valid for the lifetime of \p content)
385  	 *
386  	 * \return Newly allocated string with message data, or NULL for errors and
387  	 *         messages not intended for the local node
388  	 *
389  	 * \note The caller is responsible for freeing the return value using \c free().
390  	 */
391  	char *
392  	pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
393  	                       void *content, const char **from)
394  	{
395  	    char *data = NULL;
396  	    pcmk__cpg_msg_t *msg = content;
397  	
398  	    if (from != NULL) {
399  	        *from = NULL;
400  	    }
401  	
402  	    if (handle != 0) {
403  	        uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
404  	        const char *local_name = pcmk__cluster_local_node_name();
405  	
406  	        // Update or validate message sender ID
407  	        if (msg->sender.id == 0) {
408  	            msg->sender.id = sender_id;
409  	        } else if (msg->sender.id != sender_id) {
410  	            pcmk__warn("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
411  	                       ": claimed ID %" PRIu32,
412  	                       sender_id, pid, msg->sender.id);
413  	            return NULL;
414  	        }
415  	
416  	        // Ignore messages that aren't for the local node
417  	        if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
418  	            pcmk__trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
419  	                        ": for ID %" PRIu32 " not %" PRIu32,
420  	                        sender_id, pid, msg->host.id, local_nodeid);
421  	            return NULL;
422  	        }
423  	        if ((msg->host.size > 0)
424  	            && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
425  	
426  	            pcmk__trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
427  	                        ": for name %s not %s",
428  	                        sender_id, pid, msg->host.uname, local_name);
429  	            return NULL;
430  	        }
431  	
432  	        // Add sender name if not in original message
433  	        if (msg->sender.size == 0) {
434  	            const pcmk__node_status_t *peer =
435  	                pcmk__get_node(sender_id, NULL, NULL,
436  	                               pcmk__node_search_cluster_member);
437  	
438  	            if (peer->name == NULL) {
439  	                pcmk__debug("Received CPG message from node with ID %" PRIu32
440  	                            " but its name is unknown",
441  	                            sender_id);
442  	            } else {
443  	                pcmk__debug("Updating name of CPG message sender with ID %" PRIu32
444  	                            " to %s",
445  	                            sender_id, peer->name);
446  	                msg->sender.size = strlen(peer->name);
447  	                memset(msg->sender.uname, 0, MAX_NAME);
448  	                memcpy(msg->sender.uname, peer->name, msg->sender.size);
449  	            }
450  	        }
451  	    }
452  	
453  	    // Ensure sender is in peer cache (though it should already be)
454  	    pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
455  	                   pcmk__node_search_cluster_member);
456  	
457  	    if (from != NULL) {
458  	        *from = msg->sender.uname;
459  	    }
460  	
461  	    if (!check_message_sanity(msg)) {
462  	        return NULL;
463  	    }
464  	
465  	    if (msg->is_compressed && (msg->size > 0)) {
466  	        int rc = BZ_OK;
467  	        unsigned int new_size = msg->size + 1;
468  	        char *uncompressed = pcmk__assert_alloc(new_size, sizeof(char));
469  	
470  	        rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
471  	                                        msg->compressed_size, 1, 0);
472  	        rc = pcmk__bzlib2rc(rc);
473  	        if ((rc == pcmk_rc_ok) && (msg->size != new_size)) { // libbz2 bug?
474  	            rc = pcmk_rc_compression;
475  	        }
476  	        if (rc != pcmk_rc_ok) {
477  	            free(uncompressed);
478  	            pcmk__warn("Ignoring compressed CPG message %d from %s (ID %" PRIu32
479  	                       " PID %" PRIu32 "): %s",
480  	                       msg->id, ais_dest(&(msg->sender)), sender_id, pid,
481  	                       pcmk_rc_str(rc));
482  	            return NULL;
483  	        }
484  	        data = uncompressed;
485  	
486  	    } else {
487  	        data = pcmk__str_copy(msg->data);
488  	    }
489  	
490  	    pcmk__trace("Received %sCPG message %d from %s (ID %" PRIu32
491  	                " PID %" PRIu32 "): %.40s...",
492  	                (msg->is_compressed? "compressed " : ""),
493  	                msg->id, ais_dest(&(msg->sender)), sender_id, pid, msg->data);
494  	    return data;
495  	}
496  	
497  	/*!
498  	 * \internal
499  	 * \brief Compare cpg_address objects by node ID
500  	 *
501  	 * \param[in] first   First cpg_address structure to compare
502  	 * \param[in] second  Second cpg_address structure to compare
503  	 *
504  	 * \return Negative number if first's node ID is lower,
505  	 *         positive number if first's node ID is greater,
506  	 *         or 0 if both node IDs are equal
507  	 */
508  	static int
509  	cmp_member_list_nodeid(const void *first, const void *second)
510  	{
511  	    const struct cpg_address *const a = *((const struct cpg_address **) first),
512  	                             *const b = *((const struct cpg_address **) second);
513  	    if (a->nodeid < b->nodeid) {
514  	        return -1;
515  	    } else if (a->nodeid > b->nodeid) {
516  	        return 1;
517  	    }
518  	    /* don't bother with "reason" nor "pid" */
519  	    return 0;
520  	}
521  	
522  	/*!
523  	 * \internal
524  	 * \brief Get a readable string equivalent of a cpg_reason_t value
525  	 *
526  	 * \param[in] reason  CPG reason value
527  	 *
528  	 * \return Readable string suitable for logging
529  	 */
530  	static const char *
531  	cpgreason2str(cpg_reason_t reason)
532  	{
533  	    switch (reason) {
534  	        case CPG_REASON_JOIN:       return " via cpg_join";
535  	        case CPG_REASON_LEAVE:      return " via cpg_leave";
536  	        case CPG_REASON_NODEDOWN:   return " via cluster exit";
537  	        case CPG_REASON_NODEUP:     return " via cluster join";
538  	        case CPG_REASON_PROCDOWN:   return " for unknown reason";
539  	        default:                    break;
540  	    }
541  	    return "";
542  	}
543  	
544  	/*!
545  	 * \internal
546  	 * \brief Get a log-friendly node name
547  	 *
548  	 * \param[in] peer  Node to check
549  	 *
550  	 * \return Node's uname, or readable string if not known
551  	 */
552  	static inline const char *
553  	peer_name(const pcmk__node_status_t *peer)
554  	{
555  	    return (peer != NULL)? pcmk__s(peer->name, "peer node") : "unknown node";
556  	}
557  	
558  	/*!
559  	 * \internal
560  	 * \brief Process a CPG peer's leaving the cluster
561  	 *
562  	 * \param[in] cpg_group_name      CPG group name (for logging)
563  	 * \param[in] event_counter       Event number (for logging)
564  	 * \param[in] local_nodeid        Node ID of local node
565  	 * \param[in] cpg_peer            CPG peer that left
566  	 * \param[in] sorted_member_list  List of remaining members, qsort()-ed by ID
567  	 * \param[in] member_list_entries Number of entries in \p sorted_member_list
568  	 */
569  	static void
570  	node_left(const char *cpg_group_name, int event_counter,
571  	          uint32_t local_nodeid, const struct cpg_address *cpg_peer,
572  	          const struct cpg_address **sorted_member_list,
573  	          size_t member_list_entries)
574  	{
575  	    pcmk__node_status_t *peer =
576  	        pcmk__search_node_caches(cpg_peer->nodeid, NULL, NULL,
577  	                                 pcmk__node_search_cluster_member);
578  	    const struct cpg_address **rival = NULL;
579  	
580  	    /* Most CPG-related Pacemaker code assumes that only one process on a node
581  	     * can be in the process group, but Corosync does not impose this
582  	     * limitation, and more than one can be a member in practice due to a
583  	     * daemon attempting to start while another instance is already running.
584  	     *
585  	     * Check for any such duplicate instances, because we don't want to process
586  	     * their leaving as if our actual peer left. If the peer that left still has
587  	     * an entry in sorted_member_list (with a different PID), we will ignore the
588  	     * leaving.
589  	     *
590  	     * @TODO Track CPG members' PIDs so we can tell exactly who left.
591  	     */
592  	    if (peer != NULL) {
593  	        rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
594  	                        sizeof(const struct cpg_address *),
595  	                        cmp_member_list_nodeid);
596  	    }
597  	
598  	    if (rival == NULL) {
599  	        pcmk__info("Group %s event %d: %s (node %u pid %u) left%s",
600  	                   cpg_group_name, event_counter, peer_name(peer),
601  	                   cpg_peer->nodeid, cpg_peer->pid,
602  	                   cpgreason2str(cpg_peer->reason));
603  	        if (peer != NULL) {
604  	            crm_update_peer_proc(__func__, peer, crm_proc_cpg,
605  	                                 PCMK_VALUE_OFFLINE);
606  	        }
607  	    } else if (cpg_peer->nodeid == local_nodeid) {
608  	        pcmk__warn("Group %s event %d: duplicate local pid %u left%s",
609  	                   cpg_group_name, event_counter,
610  	                   cpg_peer->pid, cpgreason2str(cpg_peer->reason));
611  	    } else {
612  	        pcmk__warn("Group %s event %d: %s (node %u) duplicate pid %u left%s "
613  	                   "(%u remains)",
614  	                   cpg_group_name, event_counter, peer_name(peer),
615  	                   cpg_peer->nodeid, cpg_peer->pid,
616  	                   cpgreason2str(cpg_peer->reason), (*rival)->pid);
617  	    }
618  	}
619  	
620  	/*!
621  	 * \internal
622  	 * \brief Handle a CPG configuration change event
623  	 *
624  	 * \param[in] handle               CPG connection
625  	 * \param[in] group_name           CPG group name
626  	 * \param[in] member_list          List of current CPG members
627  	 * \param[in] member_list_entries  Number of entries in \p member_list
628  	 * \param[in] left_list            List of CPG members that left
629  	 * \param[in] left_list_entries    Number of entries in \p left_list
630  	 * \param[in] joined_list          List of CPG members that joined
631  	 * \param[in] joined_list_entries  Number of entries in \p joined_list
632  	 *
633  	 * \note This is of type \c cpg_confchg_fn_t, intended to be used in a
634  	 *       \c cpg_callbacks_t object.
635  	 */
636  	void
637  	pcmk__cpg_confchg_cb(cpg_handle_t handle,
638  	                     const struct cpg_name *group_name,
639  	                     const struct cpg_address *member_list,
640  	                     size_t member_list_entries,
641  	                     const struct cpg_address *left_list,
642  	                     size_t left_list_entries,
643  	                     const struct cpg_address *joined_list,
644  	                     size_t joined_list_entries)
645  	{
646  	    static int counter = 0;
647  	
648  	    bool found = false;
649  	    uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
650  	    const struct cpg_address **sorted = NULL;
651  	
652  	    sorted = pcmk__assert_alloc(member_list_entries,
653  	                                sizeof(const struct cpg_address *));
654  	
(1) Event path: Condition "iter < member_list_entries", taking true branch.
(3) Event path: Condition "iter < member_list_entries", taking false branch.
655  	    for (size_t iter = 0; iter < member_list_entries; iter++) {
656  	        sorted[iter] = member_list + iter;
(2) Event path: Jumping back to the beginning of the loop.
657  	    }
658  	
659  	    // So that the cross-matching of multiply-subscribed nodes is then cheap
660  	    qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
661  	          cmp_member_list_nodeid);
662  	
(4) Event path: Condition "i < left_list_entries", taking true branch.
(6) Event path: Condition "i < left_list_entries", taking false branch.
663  	    for (int i = 0; i < left_list_entries; i++) {
664  	        node_left(group_name->value, counter, local_nodeid, &left_list[i],
665  	                  sorted, member_list_entries);
(5) Event path: Jumping back to the beginning of the loop.
666  	    }
667  	
CID (unavailable; MK=7d5c0a2eccb043b4c051079114217918) (#1 of 1): Inconsistent C union access (INCONSISTENT_UNION_ACCESS):
(7) Event assign_union_field: The union field "in" of "_pp" is written.
(8) Event inconsistent_union_field_access: In "_pp.out", the union field used: "out" is inconsistent with the field most recently stored: "in".
668  	    g_clear_pointer(&sorted, free);
669  	
670  	    for (int i = 0; i < joined_list_entries; i++) {
671  	        pcmk__info("Group %s event %d: node %u pid %u joined%s",
672  	                   group_name->value, counter, joined_list[i].nodeid,
673  	                   joined_list[i].pid, cpgreason2str(joined_list[i].reason));
674  	    }
675  	
676  	    for (int i = 0; i < member_list_entries; i++) {
677  	        pcmk__node_status_t *peer =
678  	            pcmk__get_node(member_list[i].nodeid, NULL, NULL,
679  	                           pcmk__node_search_cluster_member);
680  	
681  	        if (member_list[i].nodeid == local_nodeid
682  	                && member_list[i].pid != getpid()) {
683  	            // See the note in node_left()
684  	            pcmk__warn("Group %s event %d: detected duplicate local pid %u",
685  	                       group_name->value, counter, member_list[i].pid);
686  	            continue;
687  	        }
688  	        pcmk__info("Group %s event %d: %s (node %u pid %u) is member",
689  	                 group_name->value, counter, peer_name(peer),
690  	                 member_list[i].nodeid, member_list[i].pid);
691  	
692  	        /* If the caller left auto-reaping enabled, this will also update the
693  	         * state to member.
694  	         */
695  	        peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
696  	                                    PCMK_VALUE_ONLINE);
697  	
698  	        if (peer && peer->state && strcmp(peer->state, PCMK_VALUE_MEMBER)) {
699  	            /* The node is a CPG member, but we currently think it's not a
700  	             * cluster member. This is possible only if auto-reaping was
701  	             * disabled. The node may be joining, and we happened to get the CPG
702  	             * notification before the quorum notification; or the node may have
703  	             * just died, and we are processing its final messages; or a bug
704  	             * has affected the peer cache.
705  	             */
706  	            time_t now = time(NULL);
707  	
708  	            if (peer->when_lost == 0) {
709  	                // Track when we first got into this contradictory state
710  	                peer->when_lost = now;
711  	
712  	            } else if (now > (peer->when_lost + 60)) {
713  	                // If it persists for more than a minute, update the state
714  	                pcmk__warn("Node %u is member of group %s but was believed "
715  	                           "offline",
716  	                           member_list[i].nodeid, group_name->value);
717  	                pcmk__update_peer_state(__func__, peer, PCMK_VALUE_MEMBER, 0);
718  	            }
719  	        }
720  	
721  	        if (local_nodeid == member_list[i].nodeid) {
722  	            found = true;
723  	        }
724  	    }
725  	
726  	    if (!found) {
727  	        pcmk__err("Local node was evicted from group %s", group_name->value);
728  	        cpg_evicted = true;
729  	    }
730  	
731  	    counter++;
732  	}
733  	
734  	/*!
735  	 * \brief Set the CPG deliver callback function for a cluster object
736  	 *
737  	 * \param[in,out] cluster  Cluster object
738  	 * \param[in]     fn       Deliver callback function to set
739  	 *
740  	 * \return Standard Pacemaker return code
741  	 */
742  	int
743  	pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
744  	{
745  	    if (cluster == NULL) {
746  	        return EINVAL;
747  	    }
748  	    cluster->cpg.cpg_deliver_fn = fn;
749  	    return pcmk_rc_ok;
750  	}
751  	
752  	/*!
753  	 * \brief Set the CPG config change callback function for a cluster object
754  	 *
755  	 * \param[in,out] cluster  Cluster object
756  	 * \param[in]     fn       Configuration change callback function to set
757  	 *
758  	 * \return Standard Pacemaker return code
759  	 */
760  	int
761  	pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
762  	{
763  	    if (cluster == NULL) {
764  	        return EINVAL;
765  	    }
766  	    cluster->cpg.cpg_confchg_fn = fn;
767  	    return pcmk_rc_ok;
768  	}
769  	
770  	/*!
771  	 * \brief Connect to Corosync CPG
772  	 *
773  	 * \param[in,out] cluster  Initialized cluster object to connect
774  	 *
775  	 * \return Standard Pacemaker return code
776  	 */
777  	int
778  	pcmk__cpg_connect(pcmk_cluster_t *cluster)
779  	{
780  	    cs_error_t rc;
781  	    int fd = -1;
782  	    int retries = 0;
783  	    uint32_t id = 0;
784  	    pcmk__node_status_t *peer = NULL;
785  	    cpg_handle_t handle = 0;
786  	    const char *cpg_group_name = NULL;
787  	    uid_t found_uid = 0;
788  	    gid_t found_gid = 0;
789  	    pid_t found_pid = 0;
790  	    int rv;
791  	
792  	    struct mainloop_fd_callbacks cpg_fd_callbacks = {
793  	        .dispatch = pcmk_cpg_dispatch,
794  	        .destroy = cluster->destroy,
795  	    };
796  	
797  	    cpg_model_v1_data_t cpg_model_info = {
798  		    .model = CPG_MODEL_V1,
799  		    .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
800  		    .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
801  		    .cpg_totem_confchg_fn = NULL,
802  		    .flags = 0,
803  	    };
804  	
805  	    cpg_evicted = false;
806  	
807  	    if (cluster->priv->server != pcmk_ipc_unknown) {
808  	        cpg_group_name = pcmk__server_message_type(cluster->priv->server);
809  	    }
810  	
811  	    if (cpg_group_name == NULL) {
812  	        /* The name will already be non-NULL for Pacemaker servers. If a
813  	         * command-line tool or external caller connects to the cluster,
814  	         * they will join this CPG group.
815  	         */
816  	        cpg_group_name = pcmk__s(crm_system_name, "unknown");
817  	    }
818  	    memset(cluster->priv->group.value, 0, 128);
819  	    strncpy(cluster->priv->group.value, cpg_group_name, 127);
820  	    cluster->priv->group.length = strlen(cluster->priv->group.value) + 1;
821  	
822  	    cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
823  	    if (rc != CS_OK) {
824  	        pcmk__err("Could not connect to the CPG API: %s (%d)",
825  	                  pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
826  	        goto bail;
827  	    }
828  	
829  	    rc = cpg_fd_get(handle, &fd);
830  	    if (rc != CS_OK) {
831  	        pcmk__err("Could not obtain the CPG API connection: %s (%d)",
832  	                  pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
833  	        goto bail;
834  	    }
835  	
836  	    /* CPG provider run as root (in given user namespace, anyway)? */
837  	    if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
838  	                                            &found_uid, &found_gid))) {
839  	        pcmk__err("CPG provider is not authentic: process %lld "
840  	                  "(uid: %lld, gid: %lld)",
841  	                  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
842  	                  (long long) found_uid, (long long) found_gid);
843  	        rc = CS_ERR_ACCESS;
844  	        goto bail;
845  	    } else if (rv < 0) {
846  	        pcmk__err("Could not verify authenticity of CPG provider: %s (%d)",
847  	                  strerror(-rv), -rv);
848  	        rc = CS_ERR_ACCESS;
849  	        goto bail;
850  	    }
851  	
852  	    id = pcmk__cpg_local_nodeid(handle);
853  	    if (id == 0) {
854  	        pcmk__err("Could not get local node id from the CPG API");
855  	        goto bail;
856  	
857  	    }
858  	    cluster->priv->node_id = id;
859  	
860  	    retries = 0;
861  	    cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->priv->group));
862  	    if (rc != CS_OK) {
863  	        pcmk__err("Could not join the CPG group '%s': %d", cpg_group_name, rc);
864  	        goto bail;
865  	    }
866  	
867  	    pcmk_cpg_handle = handle;
868  	    cluster->priv->cpg_handle = handle;
869  	    mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
870  	
871  	  bail:
872  	    if (rc != CS_OK) {
873  	        cpg_finalize(handle);
874  	        // @TODO Map rc to more specific Pacemaker return code
875  	        return ENOTCONN;
876  	    }
877  	
878  	    peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
879  	    crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE);
880  	    return pcmk_rc_ok;
881  	}
882  	
883  	/*!
884  	 * \internal
885  	 * \brief Disconnect from Corosync CPG
886  	 *
887  	 * \param[in,out] cluster  Cluster object to disconnect
888  	 */
889  	void
890  	pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
891  	{
892  	    pcmk_cpg_handle = 0;
893  	    if (cluster->priv->cpg_handle != 0) {
894  	        pcmk__trace("Disconnecting CPG");
895  	        cpg_leave(cluster->priv->cpg_handle, &cluster->priv->group);
896  	        cpg_finalize(cluster->priv->cpg_handle);
897  	        cluster->priv->cpg_handle = 0;
898  	
899  	    } else {
900  	        pcmk__info("No CPG connection");
901  	    }
902  	}
903  	
904  	/*!
905  	 * \internal
906  	 * \brief Send string data via Corosync CPG
907  	 *
908  	 * \param[in] data   Data to send
909  	 * \param[in] node   Cluster node to send message to
910  	 * \param[in] dest   Type of message to send
911  	 *
912  	 * \return \c true on success, or \c false otherwise
913  	 */
914  	static bool
915  	send_cpg_text(const char *data, const pcmk__node_status_t *node,
916  	              enum pcmk_ipc_server dest)
917  	{
918  	    static int msg_id = 0;
919  	    static int local_pid = 0;
920  	    static int local_name_len = 0;
921  	    static const char *local_name = NULL;
922  	
923  	    char *target = NULL;
924  	    struct iovec *iov;
925  	    pcmk__cpg_msg_t *msg = NULL;
926  	
927  	    if (local_name == NULL) {
928  	        local_name = pcmk__cluster_local_node_name();
929  	    }
930  	    if ((local_name_len == 0) && (local_name != NULL)) {
931  	        local_name_len = strlen(local_name);
932  	    }
933  	
934  	    if (data == NULL) {
935  	        data = "";
936  	    }
937  	
938  	    if (local_pid == 0) {
939  	        local_pid = getpid();
940  	    }
941  	
942  	    msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
943  	
944  	    msg_id++;
945  	    msg->id = msg_id;
946  	    msg->header.error = CS_OK;
947  	
948  	    msg->host.type = dest;
949  	
950  	    if (node != NULL) {
951  	        if (node->name != NULL) {
952  	            target = pcmk__str_copy(node->name);
953  	            msg->host.size = strlen(node->name);
954  	            memset(msg->host.uname, 0, MAX_NAME);
955  	            memcpy(msg->host.uname, node->name, msg->host.size);
956  	
957  	        } else {
958  	            target = pcmk__assert_asprintf("%" PRIu32, node->cluster_layer_id);
959  	        }
960  	        msg->host.id = node->cluster_layer_id;
961  	
962  	    } else {
963  	        target = pcmk__str_copy("all");
964  	    }
965  	
966  	    msg->sender.id = 0;
967  	    msg->sender.type = pcmk__parse_server(crm_system_name);
968  	    msg->sender.pid = local_pid;
969  	    msg->sender.size = local_name_len;
970  	    memset(msg->sender.uname, 0, MAX_NAME);
971  	
972  	    if ((local_name != NULL) && (msg->sender.size != 0)) {
973  	        memcpy(msg->sender.uname, local_name, msg->sender.size);
974  	    }
975  	
976  	    msg->size = 1 + strlen(data);
977  	    msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
978  	
979  	    if (msg->size < PCMK__BZ2_THRESHOLD) {
980  	        msg = pcmk__realloc(msg, msg->header.size);
981  	        memcpy(msg->data, data, msg->size);
982  	
983  	    } else {
984  	        char *compressed = NULL;
985  	        unsigned int new_size = 0;
986  	
987  	        if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
988  	                           &new_size) == pcmk_rc_ok) {
989  	
990  	            msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
991  	            msg = pcmk__realloc(msg, msg->header.size);
992  	            memcpy(msg->data, compressed, new_size);
993  	
994  	            msg->is_compressed = TRUE;
995  	            msg->compressed_size = new_size;
996  	
997  	        } else {
998  	            msg = pcmk__realloc(msg, msg->header.size);
999  	            memcpy(msg->data, data, msg->size);
1000 	        }
1001 	
1002 	        free(compressed);
1003 	    }
1004 	
1005 	    iov = pcmk__assert_alloc(1, sizeof(struct iovec));
1006 	    iov->iov_base = msg;
1007 	    iov->iov_len = msg->header.size;
1008 	
1009 	    if (msg->compressed_size > 0) {
1010 	        pcmk__trace("Queueing CPG message %" PRIu32 " to %s "
1011 	                    "(%zu bytes, %" PRIu32 " bytes compressed payload): %.200s",
1012 	                    msg->id, target, iov->iov_len, msg->compressed_size, data);
1013 	    } else {
1014 	        pcmk__trace("Queueing CPG message %" PRIu32 " to %s "
1015 	                    "(%zu bytes, %" PRIu32 " bytes payload): %.200s",
1016 	                    msg->id, target, iov->iov_len, msg->size, data);
1017 	    }
1018 	
1019 	    free(target);
1020 	
1021 	    cs_message_queue = g_list_append(cs_message_queue, iov);
1022 	    crm_cs_flush(&pcmk_cpg_handle);
1023 	
1024 	    return true;
1025 	}
1026 	
1027 	/*!
1028 	 * \internal
1029 	 * \brief Send an XML message via Corosync CPG
1030 	 *
1031 	 * \param[in] msg   XML message to send
1032 	 * \param[in] node  Cluster node to send message to
1033 	 * \param[in] dest  Type of message to send
1034 	 *
1035 	 * \return TRUE on success, otherwise FALSE
1036 	 */
1037 	bool
1038 	pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node,
1039 	                   enum pcmk_ipc_server dest)
1040 	{
1041 	    bool rc = true;
1042 	    GString *data = g_string_sized_new(1024);
1043 	
1044 	    pcmk__xml_string(msg, 0, data, 0);
1045 	
1046 	    rc = send_cpg_text(data->str, node, dest);
1047 	    g_string_free(data, TRUE);
1048 	    return rc;
1049 	}
1050