1    	/*
2    	 * Copyright 2004-2026 the Pacemaker project contributors
3    	 *
4    	 * The version control history for this file may have further details.
5    	 *
6    	 * This source code is licensed under the GNU General Public License version 2
7    	 * or later (GPLv2+) WITHOUT ANY WARRANTY.
8    	 */
9    	
10   	#include <crm_internal.h>
11   	
12   	#include <errno.h>                  // EACCES, ECONNREFUSED
13   	#include <inttypes.h>               // PRIu64
14   	#include <stdbool.h>
15   	#include <stddef.h>                 // NULL, size_t
16   	#include <stdint.h>                 // uint32_t, uint64_t
17   	#include <stdlib.h>                 // free
18   	#include <syslog.h>                 // LOG_INFO, LOG_DEBUG
19   	#include <time.h>                   // time_t
20   	#include <unistd.h>                 // close
21   	
22   	#include <glib.h>                   // gboolean, gpointer, g_*, etc.
23   	#include <libxml/tree.h>            // xmlNode
24   	#include <qb/qbdefs.h>              // QB_FALSE
25   	#include <qb/qbipcs.h>              // qb_ipcs_connection_t
26   	#include <qb/qblog.h>               // LOG_TRACE
27   	
28   	#include <crm/cib.h>                // cib_call_options values
29   	#include <crm/cib/internal.h>       // cib__*
30   	#include <crm/cluster.h>            // pcmk_cluster_disconnect
31   	#include <crm/cluster/internal.h>   // pcmk__cluster_send_message
32   	#include <crm/common/internal.h>    // pcmk__s, pcmk__str_eq
33   	#include <crm/common/ipc.h>         // crm_ipc_*, pcmk_ipc_*
34   	#include <crm/common/logging.h>     // CRM_LOG_ASSERT, CRM_CHECK
35   	#include <crm/common/mainloop.h>    // mainloop_*
36   	#include <crm/common/results.h>     // pcmk_rc_*
37   	#include <crm/common/xml.h>         // PCMK_XA_*, PCMK_XE_*
38   	#include <crm/crm.h>                // CRM_OP_*
39   	
40   	#include "pacemaker-based.h"
41   	
42   	#define EXIT_ESCALATION_MS 10000
43   	
44   	static uint64_t ping_seq = 0;
45   	static char *ping_digest = NULL;
46   	static bool ping_modified_since = false;
47   	
48   	/*!
49   	 * \internal
50   	 * \brief Create reply XML for a CIB request
51   	 *
52   	 * \param[in] op            CIB operation type
53   	 * \param[in] call_id       CIB call ID
54   	 * \param[in] client_id     CIB client ID
55   	 * \param[in] call_options  Group of <tt>enum cib_call_options</tt> flags
56   	 * \param[in] rc            Request return code
57   	 * \param[in] call_data     Request output data
58   	 *
59   	 * \return Reply XML (guaranteed not to be \c NULL)
60   	 *
61   	 * \note The caller is responsible for freeing the return value using
62   	 *       \p pcmk__xml_free().
63   	 */
64   	static xmlNode *
65   	create_cib_reply(const char *op, const char *call_id, const char *client_id,
66   	                 uint32_t call_options, int rc, xmlNode *call_data)
67   	{
68   	    xmlNode *reply = pcmk__xe_create(NULL, PCMK__XE_CIB_REPLY);
69   	
70   	    pcmk__xe_set(reply, PCMK__XA_T, PCMK__VALUE_CIB);
71   	    pcmk__xe_set(reply, PCMK__XA_CIB_OP, op);
72   	    pcmk__xe_set(reply, PCMK__XA_CIB_CALLID, call_id);
73   	    pcmk__xe_set(reply, PCMK__XA_CIB_CLIENTID, client_id);
74   	    pcmk__xe_set_int(reply, PCMK__XA_CIB_CALLOPT, call_options);
75   	    pcmk__xe_set_int(reply, PCMK__XA_CIB_RC, rc);
76   	
77   	    if (call_data != NULL) {
78   	        xmlNode *wrapper = pcmk__xe_create(reply, PCMK__XE_CIB_CALLDATA);
79   	
80   	        pcmk__trace("Attaching reply output");
81   	        pcmk__xml_copy(wrapper, call_data);
82   	    }
83   	
84   	    crm_log_xml_explicit(reply, "cib:reply");
85   	    return reply;
86   	}
87   	
88   	static void
89   	do_local_notify(const xmlNode *xml, const char *client_id, bool sync_reply,
90   	                bool from_peer)
91   	{
92   	    int call_id = 0;
93   	    int rc = pcmk_rc_ok;
94   	    pcmk__client_t *client = NULL;
95   	    uint32_t flags = crm_ipc_server_event;
96   	    const char *client_type = NULL;
97   	    const char *client_name = NULL;
98   	    const char *client_desc = "";
99   	    const char *sync_s = (sync_reply? "synchronously" : "asynchronously");
100  	
101  	    CRM_CHECK((xml != NULL) && (client_id != NULL), return);
102  	
103  	    if (from_peer) {
104  	        client_desc = " (originator of delegated_request)";
105  	    }
106  	
107  	    pcmk__trace("Performing local %s notification for %s", sync_s, client_id);
108  	
109  	    pcmk__xe_get_int(xml, PCMK__XA_CIB_CALLID, &call_id);
110  	
111  	    client = pcmk__find_client_by_id(client_id);
112  	    if (client == NULL) {
113  	        pcmk__debug("Could not notify client %s%s %s of call %d result: client "
114  	                    "no longer exists", client_id, client_desc, sync_s,
115  	                    call_id);
116  	        return;
117  	    }
118  	
119  	    client_type = pcmk__client_type_str(PCMK__CLIENT_TYPE(client));
120  	    client_name = pcmk__client_name(client);
121  	
122  	    if (sync_reply) {
123  	        flags = crm_ipc_flags_none;
124  	        if (client->ipcs != NULL) {
125  	            call_id = client->request_id;
126  	            client->request_id = 0;
127  	        }
128  	    }
129  	
130  	    switch (PCMK__CLIENT_TYPE(client)) {
131  	        case pcmk__client_ipc:
132  	            rc = pcmk__ipc_send_xml(client, call_id, xml, flags);
133  	            break;
134  	        case pcmk__client_tls:
135  	        case pcmk__client_tcp:
136  	            rc = pcmk__remote_send_xml(client->remote, xml);
137  	            break;
138  	        default:
139  	            rc = EPROTONOSUPPORT;
140  	            break;
141  	    }
142  	
143  	    if (rc == pcmk_rc_ok) {
144  	        pcmk__trace("Notified %s client %s%s %s of call %d result",
145  	                    client_type, client_name, client_desc, sync_s, call_id);
146  	    } else {
147  	        pcmk__warn("Could not notify %s client %s%s %s of call %d result: %s",
148  	                   client_type, client_name, client_desc, sync_s, call_id,
149  	                   pcmk_rc_str(rc));
150  	    }
151  	}
152  	
153  	/*!
154  	 * \internal
155  	 * \brief Request CIB digests from all peer nodes
156  	 *
157  	 * This is used as a callback that runs 5 seconds after we modify the CIB. It
158  	 * sends a ping request to all cluster nodes. They will respond by sending their
159  	 * current digests and version info, which we will validate in
160  	 * process_ping_reply(). This serves as a check of consistency across the
161  	 * cluster after a CIB update.
162  	 *
163  	 * \param[in] data  Ignored
164  	 *
165  	 * \return \c G_SOURCE_REMOVE (to destroy the timeout)
166  	 */
167  	static gboolean
168  	digest_timer_cb(gpointer data)
169  	{
170  	    char *buf = NULL;
171  	    xmlNode *ping = NULL;
172  	
173  	    if (!based_is_primary) {
174  	        return G_SOURCE_REMOVE;
175  	    }
176  	
177  	    ping_seq++;
178  	    g_clear_pointer(&ping_digest, free);
179  	    ping_modified_since = false;
180  	
181  	    buf = pcmk__assert_asprintf("%" PRIu64, ping_seq);
182  	
183  	    ping = pcmk__xe_create(NULL, PCMK__XE_PING);
184  	    pcmk__xe_set(ping, PCMK__XA_T, PCMK__VALUE_CIB);
185  	    pcmk__xe_set(ping, PCMK__XA_CIB_OP, CRM_OP_PING);
186  	    pcmk__xe_set(ping, PCMK__XA_CIB_PING_ID, buf);
187  	    pcmk__xe_set(ping, PCMK_XA_CRM_FEATURE_SET, CRM_FEATURE_SET);
188  	
189  	    pcmk__trace("Requesting peer digests (%s)", buf);
190  	    pcmk__cluster_send_message(NULL, pcmk_ipc_based, ping);
191  	
192  	    free(buf);
193  	    pcmk__xml_free(ping);
194  	    return G_SOURCE_REMOVE;
195  	}
196  	
197  	static void
198  	process_ping_reply(xmlNode *reply) 
199  	{
200  	    uint64_t seq = 0;
201  	    const char *host = pcmk__xe_get(reply, PCMK__XA_SRC);
202  	
203  	    xmlNode *wrapper = pcmk__xe_first_child(reply, PCMK__XE_CIB_CALLDATA, NULL,
204  	                                            NULL);
205  	    xmlNode *pong = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
206  	
207  	    const char *seq_s = pcmk__xe_get(pong, PCMK__XA_CIB_PING_ID);
208  	    const char *digest = pcmk__xe_get(pong, PCMK_XA_DIGEST);
209  	
210  	    if (seq_s == NULL) {
211  	        pcmk__debug("Ignoring ping reply with no " PCMK__XA_CIB_PING_ID);
212  	        return;
213  	
214  	    } else {
215  	        long long seq_ll;
216  	        int rc = pcmk__scan_ll(seq_s, &seq_ll, 0LL);
217  	
218  	        if (rc != pcmk_rc_ok) {
219  	            pcmk__debug("Ignoring ping reply with invalid " PCMK__XA_CIB_PING_ID
220  	                        " '%s': %s",
221  	                        seq_s, pcmk_rc_str(rc));
222  	            return;
223  	        }
224  	        seq = (uint64_t) seq_ll;
225  	    }
226  	
227  	    if(digest == NULL) {
228  	        pcmk__trace("Ignoring ping reply %s from %s with no digest", seq_s,
229  	                    host);
230  	
231  	    } else if(seq != ping_seq) {
232  	        pcmk__trace("Ignoring out of sequence ping reply %s from %s", seq_s,
233  	                    host);
234  	
235  	    } else if(ping_modified_since) {
236  	        pcmk__trace("Ignoring ping reply %s from %s: cib updated since", seq_s,
237  	                    host);
238  	
239  	    } else {
240  	        if(ping_digest == NULL) {
241  	            pcmk__trace("Calculating new digest");
242  	            ping_digest = pcmk__digest_xml(the_cib, true);
243  	        }
244  	
245  	        pcmk__trace("Processing ping reply %s from %s (%s)", seq_s, host,
246  	                    digest);
247  	        if (!pcmk__str_eq(ping_digest, digest, pcmk__str_casei)) {
248  	            xmlNode *wrapper = pcmk__xe_first_child(pong, PCMK__XE_CIB_CALLDATA,
249  	                                                    NULL, NULL);
250  	            xmlNode *remote_versions = pcmk__xe_first_child(wrapper, NULL, NULL,
251  	                                                            NULL);
252  	
253  	            const char *admin_epoch_s = NULL;
254  	            const char *epoch_s = NULL;
255  	            const char *num_updates_s = NULL;
256  	
257  	            if (remote_versions != NULL) {
258  	                admin_epoch_s = pcmk__xe_get(remote_versions,
259  	                                             PCMK_XA_ADMIN_EPOCH);
260  	                epoch_s = pcmk__xe_get(remote_versions,
261  	                                       PCMK_XA_EPOCH);
262  	                num_updates_s = pcmk__xe_get(remote_versions,
263  	                                             PCMK_XA_NUM_UPDATES);
264  	            }
265  	
266  	            pcmk__notice("Local CIB %s.%s.%s.%s differs from %s: %s.%s.%s.%s",
267  	                         pcmk__xe_get(the_cib, PCMK_XA_ADMIN_EPOCH),
268  	                         pcmk__xe_get(the_cib, PCMK_XA_EPOCH),
269  	                         pcmk__xe_get(the_cib, PCMK_XA_NUM_UPDATES),
270  	                         ping_digest, host,
271  	                         pcmk__s(admin_epoch_s, "_"),
272  	                         pcmk__s(epoch_s, "_"),
273  	                         pcmk__s(num_updates_s, "_"), digest);
274  	
275  	            pcmk__xml_free(remote_versions);
276  	            sync_our_cib(reply, false);
277  	        }
278  	    }
279  	}
280  	
281  	static void
282  	parse_local_options(const pcmk__client_t *client,
283  	                    const cib__operation_t *operation,
284  	                    const char *host, const char *op, bool *local_notify,
285  	                    bool *needs_reply, bool *process, bool *needs_forward)
286  	{
287  	    // Process locally and notify local client
288  	    *process = true;
289  	    *needs_reply = false;
290  	    *local_notify = true;
291  	    *needs_forward = false;
292  	
293  	    if (pcmk__is_set(operation->flags, cib__op_attr_local)) {
294  	        /* Always process locally if cib__op_attr_local is set.
295  	         *
296  	         * @COMPAT: Currently host is ignored. At a compatibility break, throw
297  	         * an error (from based_process_request() or earlier) if host is not
298  	         * NULL or OUR_NODENAME.
299  	         */
300  	        pcmk__trace("Processing always-local %s op from client %s", op,
301  	                    pcmk__client_name(client));
302  	
303  	        if (!pcmk__str_eq(host, OUR_NODENAME,
304  	                          pcmk__str_casei|pcmk__str_null_matches)) {
305  	
306  	            pcmk__warn("Operation '%s' is always local but its target host is "
307  	                       "set to '%s'",
308  	                       op, host);
309  	        }
310  	        return;
311  	    }
312  	
313  	    if (pcmk__is_set(operation->flags, cib__op_attr_modifies)
314  	        || !pcmk__str_eq(host, OUR_NODENAME,
315  	                         pcmk__str_casei|pcmk__str_null_matches)) {
316  	
317  	        // Forward modifying and non-local requests via cluster
318  	        *process = false;
319  	        *needs_reply = false;
320  	        *local_notify = false;
321  	        *needs_forward = true;
322  	
323  	        pcmk__trace("%s op from %s needs to be forwarded to %s", op,
324  	                    pcmk__client_name(client), pcmk__s(host, "all nodes"));
325  	        return;
326  	    }
327  	
328  	    if (stand_alone) {
329  	        pcmk__trace("Processing %s op from client %s (stand-alone)", op,
330  	                    pcmk__client_name(client));
331  	
332  	    } else {
333  	        pcmk__trace("Processing %saddressed %s op from client %s",
334  	                    ((host != NULL)? "locally " : "un"), op,
335  	                    pcmk__client_name(client));
336  	    }
337  	}
338  	
339  	static bool
340  	parse_peer_options(const cib__operation_t *operation, xmlNode *request,
341  	                   bool *local_notify, bool *needs_reply, bool *process)
342  	{
343  	    /* TODO: What happens when an update comes in after node A
344  	     * requests the CIB from node B, but before it gets the reply (and
345  	     * sends out the replace operation)?
346  	     *
347  	     * (This may no longer be relevant since legacy mode was dropped; need to
348  	     * trace code more closely to check.)
349  	     */
350  	    const char *host = NULL;
351  	    const char *delegated = pcmk__xe_get(request, PCMK__XA_CIB_DELEGATED_FROM);
352  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
353  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
354  	    const char *reply_to = pcmk__xe_get(request, PCMK__XA_CIB_ISREPLYTO);
355  	
356  	    bool is_reply = pcmk__str_eq(reply_to, OUR_NODENAME, pcmk__str_casei);
357  	
358  	    if (originator == NULL) { // Shouldn't be possible
359  	        originator = "peer";
360  	    }
361  	
362  	    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_REPLACE, pcmk__str_none)) {
363  	        // sync_our_cib() sets PCMK__XA_CIB_ISREPLYTO
364  	        if (reply_to) {
365  	            delegated = reply_to;
366  	        }
367  	        goto skip_is_reply;
368  	    }
369  	
370  	    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_SYNC, pcmk__str_none)) {
371  	        // Nothing to do
372  	
373  	    } else if (is_reply && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) {
374  	        process_ping_reply(request);
375  	        return false;
376  	
377  	    } else if (pcmk__str_eq(op, PCMK__CIB_REQUEST_UPGRADE, pcmk__str_none)) {
378  	        /* Only the DC (node with the oldest software) should process
379  	         * this operation if PCMK__XA_CIB_SCHEMA_MAX is unset.
380  	         *
381  	         * If the DC is happy it will then send out another
382  	         * PCMK__CIB_REQUEST_UPGRADE which will tell all nodes to do the actual
383  	         * upgrade.
384  	         *
385  	         * Except this time PCMK__XA_CIB_SCHEMA_MAX will be set which puts a
386  	         * limit on how far newer nodes will go
387  	         */
388  	        const char *max = pcmk__xe_get(request, PCMK__XA_CIB_SCHEMA_MAX);
389  	        const char *upgrade_rc = pcmk__xe_get(request, PCMK__XA_CIB_UPGRADE_RC);
390  	
391  	        pcmk__trace("Parsing upgrade %s for %s with max=%s and upgrade_rc=%s",
392  	                    (is_reply? "reply" : "request"),
393  	                    (based_is_primary? "primary" : "secondary"),
394  	                    pcmk__s(max, "none"), pcmk__s(upgrade_rc, "none"));
395  	
396  	        if (upgrade_rc != NULL) {
397  	            // Our upgrade request was rejected by DC, notify clients of result
398  	            pcmk__xe_set(request, PCMK__XA_CIB_RC, upgrade_rc);
399  	
400  	        } else if ((max == NULL) && based_is_primary) {
401  	            /* We are the DC, check if this upgrade is allowed */
402  	            goto skip_is_reply;
403  	
404  	        } else if(max) {
405  	            /* Ok, go ahead and upgrade to 'max' */
406  	            goto skip_is_reply;
407  	
408  	        } else {
409  	            // Ignore broadcast client requests when we're not primary
410  	            return false;
411  	        }
412  	
413  	    } else if (is_reply
414  	               && pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
415  	
416  	        pcmk__trace("Ignoring legacy %s reply sent from %s to local clients",
417  	                    op, originator);
418  	        return false;
419  	
420  	    } else if (pcmk__str_eq(op, PCMK__CIB_REQUEST_SHUTDOWN, pcmk__str_none)) {
421  	        *local_notify = false;
422  	        if (reply_to == NULL) {
423  	            *process = true;
424  	        } else { // Not possible?
425  	            pcmk__debug("Ignoring shutdown request from %s because reply_to=%s",
426  	                        originator, reply_to);
427  	        }
428  	        return *process;
429  	    }
430  	
431  	    if (is_reply) {
432  	        pcmk__trace("Will notify local clients for %s reply from %s", op,
433  	                    originator);
434  	        *process = false;
435  	        *needs_reply = false;
436  	        *local_notify = true;
437  	        return true;
438  	    }
439  	
440  	  skip_is_reply:
441  	    *process = true;
442  	    *needs_reply = false;
443  	
444  	    *local_notify = pcmk__str_eq(delegated, OUR_NODENAME, pcmk__str_casei);
445  	
446  	    host = pcmk__xe_get(request, PCMK__XA_CIB_HOST);
447  	    if (pcmk__str_eq(host, OUR_NODENAME, pcmk__str_casei)) {
448  	        pcmk__trace("Processing %s request sent to us from %s", op, originator);
449  	        *needs_reply = true;
450  	        return true;
451  	
452  	    } else if (host != NULL) {
453  	        pcmk__trace("Ignoring %s request intended for CIB manager on %s", op,
454  	                    host);
455  	        return false;
456  	
457  	    } else if (!is_reply && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) {
458  	        *needs_reply = true;
459  	    }
460  	
461  	    pcmk__trace("Processing %s request broadcast by %s call %s on %s "
462  	                "(local clients will%s be notified)", op,
463  	                pcmk__s(pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME),
464  	                        "client"),
465  	                pcmk__s(pcmk__xe_get(request, PCMK__XA_CIB_CALLID),
466  	                        "without ID"),
467  	                originator, (*local_notify? "" : "not"));
468  	    return true;
469  	}
470  	
471  	/*!
472  	 * \internal
473  	 * \brief Forward a CIB request to the appropriate target host(s)
474  	 *
475  	 * \param[in] request  CIB request to forward
476  	 */
477  	static void
478  	forward_request(xmlNode *request)
479  	{
480  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
481  	    const char *section = pcmk__xe_get(request, PCMK__XA_CIB_SECTION);
482  	    const char *host = pcmk__xe_get(request, PCMK__XA_CIB_HOST);
483  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
484  	    const char *client_name = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME);
485  	    const char *call_id = pcmk__xe_get(request, PCMK__XA_CIB_CALLID);
486  	    pcmk__node_status_t *peer = NULL;
487  	
488  	    int log_level = LOG_INFO;
489  	
490  	    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_NOOP, pcmk__str_none)) {
491  	        log_level = LOG_DEBUG;
492  	    }
493  	
494  	    do_crm_log(log_level,
495  	               "Forwarding %s operation for section %s to %s (origin=%s/%s/%s)",
496  	               pcmk__s(op, "invalid"),
497  	               pcmk__s(section, "all"),
498  	               pcmk__s(host, "all"),
499  	               pcmk__s(originator, "local"),
500  	               pcmk__s(client_name, "unspecified"),
501  	               pcmk__s(call_id, "unspecified"));
502  	
503  	    pcmk__xe_set(request, PCMK__XA_CIB_DELEGATED_FROM, OUR_NODENAME);
504  	
505  	    if (host != NULL) {
506  	        peer = pcmk__get_node(0, host, NULL, pcmk__node_search_cluster_member);
507  	    }
508  	    pcmk__cluster_send_message(peer, pcmk_ipc_based, request);
509  	
510  	    // Return the request to its original state
511  	    pcmk__xe_remove_attr(request, PCMK__XA_CIB_DELEGATED_FROM);
512  	}
513  	
514  	static int
515  	cib_process_command(xmlNode *request, const cib__operation_t *operation,
516  	                    cib__op_fn_t op_function, xmlNode **reply, bool privileged)
517  	{
518  	    xmlNode *cib_diff = NULL;
519  	    xmlNode *output = NULL;
520  	    xmlNode *result_cib = NULL;
521  	
522  	    uint32_t call_options = cib_none;
523  	
524  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
525  	    const char *call_id = pcmk__xe_get(request, PCMK__XA_CIB_CALLID);
526  	    const char *client_id = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTID);
527  	    const char *client_name = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME);
528  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
529  	
530  	    int rc = pcmk_rc_ok;
531  	
532  	    bool config_changed = false;
533  	
534  	    static mainloop_timer_t *digest_timer = NULL;
535  	
536  	    pcmk__assert(cib_status == pcmk_rc_ok);
537  	
538  	    if (digest_timer == NULL) {
539  	        digest_timer = mainloop_timer_add("based_digest_timer", 5000, false,
540  	                                          digest_timer_cb, NULL);
541  	    }
542  	
543  	    *reply = NULL;
544  	
545  	    /* Start processing the request... */
546  	    rc = pcmk__xe_get_flags(request, PCMK__XA_CIB_CALLOPT, &call_options,
547  	                            cib_none);
548  	    if (rc != pcmk_rc_ok) {
549  	        pcmk__warn("Couldn't parse options from request: %s", pcmk_rc_str(rc));
550  	    }
551  	
552  	    if (!privileged
553  	        && pcmk__is_set(operation->flags, cib__op_attr_privileged)) {
554  	
555  	        rc = EACCES;
556  	        goto done;
557  	    }
558  	
559  	    if (!pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
560  	        rc = cib__perform_query(op_function, request, &the_cib, &output);
561  	        goto done;
562  	    }
563  	
564  	    ping_modified_since = true;
565  	
566  	    /* result_cib must not be modified after cib_perform_op() returns.
567  	     *
568  	     * It's not important whether the client variant is cib_native or
569  	     * cib_remote.
570  	     */
571  	    result_cib = the_cib;
572  	    rc = cib_perform_op(cib_undefined, op_function, request, &config_changed,
573  	                        &result_cib, &cib_diff, &output);
574  	
575  	    /* Always write to disk for successful ops with the flag set. This also
576  	     * negates the need to detect ordering changes.
577  	     */
578  	    if ((rc == pcmk_rc_ok)
579  	        && pcmk__is_set(operation->flags, cib__op_attr_writes_through)) {
580  	
581  	        config_changed = true;
582  	    }
583  	
584  	    if ((rc == pcmk_rc_ok)
585  	        && !pcmk__any_flags_set(call_options, cib_dryrun|cib_transaction)) {
586  	
587  	        if (result_cib != the_cib) {
588  	            if (pcmk__is_set(operation->flags, cib__op_attr_writes_through)) {
589  	                config_changed = true;
590  	            }
591  	
592  	            rc = based_activate_cib(result_cib, config_changed, op);
593  	        }
594  	
595  	        /* @COMPAT Nodes older than feature set 3.19.0 don't support
596  	         * transactions. In a mixed-version cluster with nodes <3.19.0, we must
597  	         * sync the updated CIB, so that the older nodes receive the changes.
598  	         * Any node that has already applied the transaction will ignore the
599  	         * synced CIB.
600  	         *
601  	         * To ensure the updated CIB is synced from only one node, we sync it
602  	         * from the originator.
603  	         */
604  	        if ((operation->type == cib__op_commit_transact)
605  	            && pcmk__str_eq(originator, OUR_NODENAME, pcmk__str_casei)
606  	            && (pcmk__compare_versions(pcmk__xe_get(the_cib,
607  	                                                    PCMK_XA_CRM_FEATURE_SET),
608  	                                       "3.19.0") < 0)) {
609  	
610  	            sync_our_cib(request, true);
611  	        }
612  	
613  	        mainloop_timer_start(digest_timer);
614  	
615  	    } else if (rc == pcmk_rc_schema_validation) {
616  	        pcmk__assert(result_cib != the_cib);
617  	
618  	        if (output != NULL) {
619  	            pcmk__log_xml_info(output, "cib:output");
620  	            pcmk__xml_free(output);
621  	        }
622  	
623  	        output = result_cib;
624  	
625  	    } else if (result_cib != the_cib) {
626  	        pcmk__xml_free(result_cib);
627  	    }
628  	
629  	    if (!pcmk__any_flags_set(call_options,
630  	                             cib_dryrun|cib_inhibit_notify|cib_transaction)) {
631  	        pcmk__trace("Sending notifications %d",
632  	                    pcmk__is_set(call_options, cib_dryrun));
633  	        based_diff_notify(op, pcmk_rc2legacy(rc), call_id, client_id,
634  	                          client_name, originator, cib_diff);
635  	    }
636  	
637  	  done:
638  	    if (!pcmk__is_set(call_options, cib_discard_reply)) {
639  	        *reply = create_cib_reply(op, call_id, client_id, call_options,
640  	                                  pcmk_rc2legacy(rc), output);
641  	    }
642  	
643  	    if (output != the_cib) {
644  	        pcmk__xml_free(output);
645  	    }
646  	
647  	    pcmk__trace("done");
648  	    pcmk__xml_free(cib_diff);
649  	    return rc;
650  	}
651  	
652  	/*!
653  	 * \internal
654  	 * \brief Log the result of processing a CIB request locally
655  	 *
656  	 * \param[in] request    Request XML
657  	 * \param[in] operation  Operation info
658  	 * \param[in] rc         Return code from processing the request
659  	 * \param[in] elapsed    How long processing took in seconds
660  	 */
661  	static void
662  	log_op_result(const xmlNode *request, const cib__operation_t *operation, int rc,
663  	              double elapsed)
664  	{
665  	    int level = LOG_INFO;
666  	
667  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
668  	    const char *section = pcmk__xe_get(request, PCMK__XA_CIB_SECTION);
669  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
670  	    const char *client_name = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME);
671  	    const char *call_id = pcmk__xe_get(request, PCMK__XA_CIB_CALLID);
672  	
673  	    int admin_epoch = 0;
674  	    int epoch = 0;
675  	    int num_updates = 0;
676  	
677  	    if (!pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
678  	        level = LOG_TRACE;
679  	
680  	    } else if (rc != pcmk_rc_ok) {
681  	        level = LOG_WARNING;
682  	    }
683  	
684  	    section = pcmk__s(section, "'all'");
685  	    originator = pcmk__s(originator, "local");
686  	    client_name = pcmk__s(client_name, "client");
687  	
688  	    /* @FIXME the_cib should always be non-NULL, but that's currently not the
689  	     * case during shutdown
690  	     */
691  	    if (the_cib != NULL) {
692  	        pcmk__xe_get_int(the_cib, PCMK_XA_ADMIN_EPOCH, &admin_epoch);
693  	        pcmk__xe_get_int(the_cib, PCMK_XA_EPOCH, &epoch);
694  	        pcmk__xe_get_int(the_cib, PCMK_XA_NUM_UPDATES, &num_updates);
695  	    }
696  	
697  	    do_crm_log(level,
698  	               "Completed %s operation for section %s: %s (rc=%d, "
699  	               "origin=%s/%s/%s, version=%d.%d.%d)",
700  	               op, section, pcmk_rc_str(rc), rc,
701  	               originator, client_name, call_id,
702  	               admin_epoch, epoch, num_updates);
703  	
704  	    if (elapsed > 3) {
705  	        pcmk__trace("%s operation took %.2fs to complete", op, elapsed);
706  	        crm_write_blackbox(0, NULL);
707  	    }
708  	}
709  	
710  	static void
711  	send_peer_reply(xmlNode *msg, const char *originator)
712  	{
713  	    const pcmk__node_status_t *node = NULL;
714  	
715  	    if ((msg == NULL) || (originator == NULL)) {
716  	        return;
717  	    }
718  	
719  	    // Send reply via cluster to originating node
720  	    node = pcmk__get_node(0, originator, NULL,
721  	                          pcmk__node_search_cluster_member);
722  	
723  	    pcmk__trace("Sending request result to %s only", originator);
724  	    pcmk__xe_set(msg, PCMK__XA_CIB_ISREPLYTO, originator);
725  	    pcmk__cluster_send_message(node, pcmk_ipc_based, msg);
726  	}
727  	
728  	/*!
729  	 * \internal
730  	 * \brief Handle an IPC or CPG message containing a request
731  	 *
732  	 * \param[in,out] request     Request XML
733  	 * \param[in]     privileged  If \c true, operations with
734  	 *                            \c cib__op_attr_privileged can be run
735  	 * \param[in]     client      IPC client that sent request (\c NULL if request
736  	 *                            came from CPG)
737  	 *
738  	 * \return Standard Pacemaker return code
739  	 */
740  	int
741  	based_process_request(xmlNode *request, bool privileged,
742  	                      const pcmk__client_t *client)
743  	{
744  	    // @TODO: Break into multiple smaller functions
745  	    uint32_t call_options = cib_none;
746  	
747  	    bool process = true;        // Whether to process request locally now
748  	    bool needs_reply = true;    // Whether to build a reply
749  	    bool local_notify = false;  // Whether to notify (local) requester
750  	    bool needs_forward = false; // Whether to forward request somewhere else
751  	
752  	    xmlNode *reply = NULL;
753  	
754  	    int rc = pcmk_rc_ok;
755  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
756  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
757  	    const char *host = pcmk__xe_get(request, PCMK__XA_CIB_HOST);
758  	    const char *call_id = pcmk__xe_get(request, PCMK__XA_CIB_CALLID);
759  	    const char *client_id = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTID);
760  	    const char *client_name = pcmk__s(pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME),
761  	                                      "client");
762  	    const char *reply_to = pcmk__xe_get(request, PCMK__XA_CIB_ISREPLYTO);
763  	
764  	    const cib__operation_t *operation = NULL;
765  	    cib__op_fn_t op_function = NULL;
766  	
767  	    rc = pcmk__xe_get_flags(request, PCMK__XA_CIB_CALLOPT, &call_options,
768  	                            cib_none);
769  	    if (rc != pcmk_rc_ok) {
770  	        pcmk__warn("Couldn't parse options from request: %s", pcmk_rc_str(rc));
771  	    }
772  	
773  	    if (pcmk__str_empty(host)) {
774  	        host = NULL;
775  	    }
776  	
777  	    if (client == NULL) {
778  	        pcmk__trace("Processing peer %s operation from %s/%s on %s intended "
779  	                    "for %s (reply=%s)", op, client_name, call_id, originator,
780  	                    pcmk__s(host, "all"), reply_to);
781  	    } else {
782  	        pcmk__xe_set(request, PCMK__XA_SRC, OUR_NODENAME);
783  	        pcmk__trace("Processing local %s operation from %s/%s intended for %s",
784  	                    op, client_name, call_id, pcmk__s(host, "all"));
785  	    }
786  	
787  	    rc = cib__get_operation(op, &operation);
788  	    if (rc != pcmk_rc_ok) {
789  	        /* TODO: construct error reply? */
790  	        pcmk__err("Pre-processing of command failed: %s", pcmk_rc_str(rc));
791  	        return rc;
792  	    }
793  	
794  	    op_function = based_get_op_function(operation);
795  	    if (op_function == NULL) {
796  	        pcmk__err("Operation %s not supported by CIB manager", op);
797  	        return EOPNOTSUPP;
798  	    }
799  	
800  	    if (client != NULL) {
801  	        parse_local_options(client, operation, host, op, &local_notify,
802  	                            &needs_reply, &process, &needs_forward);
803  	
804  	    } else if (!parse_peer_options(operation, request, &local_notify,
805  	                                   &needs_reply, &process)) {
806  	        return pcmk_rc_ok;
807  	    }
808  	
809  	    if (pcmk__is_set(call_options, cib_transaction)) {
810  	        /* All requests in a transaction are processed locally against a working
811  	         * CIB copy, and we don't notify for individual requests because the
812  	         * entire transaction is atomic.
813  	         *
814  	         * We still call the option parser functions above, for the sake of log
815  	         * messages and checking whether we're the target for peer requests.
816  	         */
817  	        process = true;
818  	        needs_reply = false;
819  	        local_notify = false;
820  	        needs_forward = false;
821  	    }
822  	
823  	    if (pcmk__is_set(call_options, cib_discard_reply)) {
824  	        /* If the request will modify the CIB, and we are in legacy mode, we
825  	         * need to build a reply so we can broadcast a diff, even if the
826  	         * requester doesn't want one.
827  	         */
828  	        needs_reply = false;
829  	        local_notify = false;
830  	        pcmk__trace("Client is not interested in the reply");
831  	    }
832  	
833  	    if (needs_forward) {
834  	        forward_request(request);
835  	        return pcmk_rc_ok;
836  	    }
837  	
838  	    if (cib_status != pcmk_rc_ok) {
839  	        rc = cib_status;
840  	        pcmk__err("Ignoring request because cluster configuration is invalid "
841  	                  "(please repair and restart): %s", pcmk_rc_str(rc));
842  	        reply = create_cib_reply(op, call_id, client_id, call_options,
843  	                                 pcmk_rc2legacy(rc), the_cib);
844  	
845  	    } else if (process) {
846  	        time_t start_time = time(NULL);
847  	
848  	        rc = cib_process_command(request, operation, op_function, &reply,
849  	                                 privileged);
850  	        log_op_result(request, operation, rc, difftime(time(NULL), start_time));
851  	
852  	        if ((reply == NULL) && (needs_reply || local_notify)) {
853  	            pcmk__err("Unexpected NULL reply to message");
854  	            pcmk__log_xml_err(request, "null reply");
855  	            goto done;
856  	        }
857  	    }
858  	
859  	    if (pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
860  	        pcmk__trace("Completed pre-sync update from %s/%s/%s%s",
861  	                    pcmk__s(originator, "local"), client_name, call_id,
862  	                    (local_notify? " with local notification" : ""));
863  	
864  	    } else if (needs_reply && !stand_alone && (client == NULL)
865  	               && !pcmk__is_set(call_options, cib_discard_reply)) {
866  	        send_peer_reply(reply, originator);
867  	    }
868  	
869  	    if (local_notify && (client_id != NULL)) {
870  	        do_local_notify((process? reply : request), client_id,
871  	                        pcmk__is_set(call_options, cib_sync_call),
872  	                        (client == NULL));
873  	    }
874  	
875  	done:
876  	    pcmk__xml_free(reply);
877  	    return rc;
878  	}
879  	
880  	void
881  	based_peer_callback(xmlNode *msg, void *private_data)
882  	{
883  	    const char *reason = NULL;
884  	    const char *originator = pcmk__xe_get(msg, PCMK__XA_SRC);
885  	
886  	    if (pcmk__peer_cache == NULL) {
887  	        reason = "membership not established";
888  	        goto bail;
889  	    }
890  	
891  	    if (pcmk__xe_get(msg, PCMK__XA_CIB_CLIENTNAME) == NULL) {
892  	        pcmk__xe_set(msg, PCMK__XA_CIB_CLIENTNAME, originator);
893  	    }
894  	
895  	    based_process_request(msg, true, NULL);
896  	    return;
897  	
898  	  bail:
899  	    if (reason) {
900  	        const char *op = pcmk__xe_get(msg, PCMK__XA_CIB_OP);
901  	
902  	        pcmk__warn("Discarding %s message from %s: %s", op, originator, reason);
903  	    }
904  	}
905  	
906  	static gboolean
907  	cib_force_exit(gpointer data)
908  	{
909  	    pcmk__notice("Exiting immediately after %s without shutdown acknowledgment",
910  	                 pcmk__readable_interval(EXIT_ESCALATION_MS));
911  	    based_terminate(CRM_EX_ERROR);
912  	    return FALSE;
913  	}
914  	
915  	static void
916  	disconnect_remote_client(gpointer key, gpointer value, gpointer user_data)
917  	{
918  	    pcmk__client_t *a_client = value;
919  	
920  	    pcmk__err("Can't disconnect client %s: Not implemented",
921  	              pcmk__client_name(a_client));
922  	}
923  	
924  	static void
925  	initiate_exit(void)
926  	{
927  	    int active = 0;
928  	    xmlNode *leaving = NULL;
929  	
930  	    active = pcmk__cluster_num_active_nodes();
931  	    if (active < 2) { // This is the last active node
932  	        pcmk__info("Exiting without sending shutdown request (no active "
933  	                   "peers)");
934  	        based_terminate(CRM_EX_OK);
935  	        return;
936  	    }
937  	
938  	    pcmk__info("Sending shutdown request to %d peers", active);
939  	
940  	    leaving = pcmk__xe_create(NULL, PCMK__XE_EXIT_NOTIFICATION);
941  	    pcmk__xe_set(leaving, PCMK__XA_T, PCMK__VALUE_CIB);
942  	    pcmk__xe_set(leaving, PCMK__XA_CIB_OP, PCMK__CIB_REQUEST_SHUTDOWN);
943  	
944  	    pcmk__cluster_send_message(NULL, pcmk_ipc_based, leaving);
945  	    pcmk__xml_free(leaving);
946  	
947  	    pcmk__create_timer(EXIT_ESCALATION_MS, cib_force_exit, NULL);
948  	}
949  	
950  	void
951  	based_shutdown(int nsig)
952  	{
953  	    struct qb_ipcs_stats srv_stats;
954  	
955  	    if (!cib_shutdown_flag) {
956  	        int disconnects = 0;
957  	        qb_ipcs_connection_t *c = NULL;
958  	
959  	        cib_shutdown_flag = true;
960  	
961  	        c = qb_ipcs_connection_first_get(ipcs_rw);
962  	        while (c != NULL) {
963  	            qb_ipcs_connection_t *last = c;
964  	
965  	            c = qb_ipcs_connection_next_get(ipcs_rw, last);
966  	
967  	            pcmk__debug("Disconnecting r/w client %p...", last);
968  	            qb_ipcs_disconnect(last);
969  	            qb_ipcs_connection_unref(last);
970  	            disconnects++;
971  	        }
972  	
973  	        c = qb_ipcs_connection_first_get(ipcs_ro);
974  	        while (c != NULL) {
975  	            qb_ipcs_connection_t *last = c;
976  	
977  	            c = qb_ipcs_connection_next_get(ipcs_ro, last);
978  	
979  	            pcmk__debug("Disconnecting r/o client %p...", last);
980  	            qb_ipcs_disconnect(last);
981  	            qb_ipcs_connection_unref(last);
982  	            disconnects++;
983  	        }
984  	
985  	        c = qb_ipcs_connection_first_get(ipcs_shm);
986  	        while (c != NULL) {
987  	            qb_ipcs_connection_t *last = c;
988  	
989  	            c = qb_ipcs_connection_next_get(ipcs_shm, last);
990  	
991  	            pcmk__debug("Disconnecting non-blocking r/w client %p...", last);
992  	            qb_ipcs_disconnect(last);
993  	            qb_ipcs_connection_unref(last);
994  	            disconnects++;
995  	        }
996  	
997  	        disconnects += pcmk__ipc_client_count();
998  	
999  	        pcmk__debug("Disconnecting %d remote clients",
1000 	                    pcmk__ipc_client_count());
1001 	        pcmk__foreach_ipc_client(disconnect_remote_client, NULL);
1002 	        pcmk__info("Disconnected %d clients", disconnects);
1003 	    }
1004 	
1005 	    qb_ipcs_stats_get(ipcs_rw, &srv_stats, QB_FALSE);
1006 	
1007 	    if (pcmk__ipc_client_count() == 0) {
1008 	        pcmk__info("All clients disconnected (%d)", srv_stats.active_connections);
1009 	        initiate_exit();
1010 	
1011 	    } else {
1012 	        pcmk__info("Waiting on %d clients to disconnect (%d)",
1013 	                   pcmk__ipc_client_count(), srv_stats.active_connections);
1014 	    }
1015 	}
1016 	
1017 	/*!
1018 	 * \internal
1019 	 * \brief Close remote sockets, free the global CIB and quit
1020 	 *
1021 	 * \param[in] exit_status  What exit status to use (if -1, use CRM_EX_OK, but
1022 	 *                         skip disconnecting from the cluster layer)
1023 	 */
1024 	void
1025 	based_terminate(int exit_status)
1026 	{
(1) Event path: Condition "remote_fd > 0", taking true branch.
1027 	    if (remote_fd > 0) {
1028 	        close(remote_fd);
1029 	        remote_fd = 0;
1030 	    }
(2) Event path: Condition "remote_tls_fd > 0", taking true branch.
1031 	    if (remote_tls_fd > 0) {
1032 	        close(remote_tls_fd);
1033 	        remote_tls_fd = 0;
1034 	    }
1035 	
CID (unavailable; MK=a2bec0b8ef6c7e3bfcd2b76f64b922db) (#1 of 1): Inconsistent C union access (INCONSISTENT_UNION_ACCESS):
(3) Event assign_union_field: The union field "in" of "_pp" is written.
(4) Event inconsistent_union_field_access: In "_pp.out", the union field used: "out" is inconsistent with the field most recently stored: "in".
1036 	    g_clear_pointer(&the_cib, pcmk__xml_free);
1037 	
1038 	    // Exit immediately on error
1039 	    if (exit_status > CRM_EX_OK) {
1040 	        pcmk__stop_based_ipc(ipcs_ro, ipcs_rw, ipcs_shm);
1041 	        crm_exit(exit_status);
1042 	        return;
1043 	    }
1044 	
1045 	    if ((mainloop != NULL) && g_main_loop_is_running(mainloop)) {
1046 	        /* Quit via returning from the main loop. If exit_status has the special
1047 	         * value -1, we skip the disconnect here, and it will be done when the
1048 	         * main loop returns (this allows the peer status callback to avoid
1049 	         * messing with the peer caches).
1050 	         */
1051 	        if (exit_status == CRM_EX_OK) {
1052 	            pcmk_cluster_disconnect(crm_cluster);
1053 	        }
1054 	        g_main_loop_quit(mainloop);
1055 	        return;
1056 	    }
1057 	
1058 	    /* Exit cleanly. Even the peer status callback can disconnect here, because
1059 	     * we're not returning control to the caller.
1060 	     */
1061 	    pcmk_cluster_disconnect(crm_cluster);
1062 	    pcmk__stop_based_ipc(ipcs_ro, ipcs_rw, ipcs_shm);
1063 	    crm_exit(CRM_EX_OK);
1064 	}
1065