1    	/*
2    	 * Copyright 2004-2026 the Pacemaker project contributors
3    	 *
4    	 * The version control history for this file may have further details.
5    	 *
6    	 * This source code is licensed under the GNU General Public License version 2
7    	 * or later (GPLv2+) WITHOUT ANY WARRANTY.
8    	 */
9    	
10   	#include <crm_internal.h>
11   	
12   	#include <errno.h>                  // EACCES, ECONNREFUSED
13   	#include <inttypes.h>               // PRIu64
14   	#include <stdbool.h>
15   	#include <stddef.h>                 // NULL, size_t
16   	#include <stdint.h>                 // uint32_t, uint64_t
17   	#include <stdlib.h>                 // free
18   	#include <syslog.h>                 // LOG_INFO, LOG_DEBUG
19   	#include <time.h>                   // time_t
20   	#include <unistd.h>                 // close
21   	
22   	#include <glib.h>                   // gboolean, gpointer, g_*, etc.
23   	#include <libxml/tree.h>            // xmlNode
24   	#include <qb/qbdefs.h>              // QB_FALSE
25   	#include <qb/qbipcs.h>              // qb_ipcs_connection_t
26   	#include <qb/qblog.h>               // LOG_TRACE
27   	
28   	#include <crm/cib.h>                // cib_call_options values
29   	#include <crm/cib/internal.h>       // cib__*
30   	#include <crm/cluster.h>            // pcmk_cluster_disconnect
31   	#include <crm/cluster/internal.h>   // pcmk__cluster_send_message
32   	#include <crm/common/cib.h>         // pcmk_find_cib_element
33   	#include <crm/common/internal.h>    // pcmk__s, pcmk__str_eq
34   	#include <crm/common/ipc.h>         // crm_ipc_*, pcmk_ipc_*
35   	#include <crm/common/logging.h>     // CRM_LOG_ASSERT, CRM_CHECK
36   	#include <crm/common/mainloop.h>    // mainloop_*
37   	#include <crm/common/results.h>     // pcmk_rc_*
38   	#include <crm/common/xml.h>         // PCMK_XA_*, PCMK_XE_*
39   	#include <crm/crm.h>                // CRM_OP_*
40   	
41   	#include "pacemaker-based.h"
42   	
43   	#define EXIT_ESCALATION_MS 10000
44   	
45   	static uint64_t ping_seq = 0;
46   	static char *ping_digest = NULL;
47   	static bool ping_modified_since = false;
48   	
49   	/*!
50   	 * \internal
51   	 * \brief Create reply XML for a CIB request
52   	 *
53   	 * \param[in] op            CIB operation type
54   	 * \param[in] call_id       CIB call ID
55   	 * \param[in] client_id     CIB client ID
56   	 * \param[in] call_options  Group of <tt>enum cib_call_options</tt> flags
57   	 * \param[in] rc            Request return code
58   	 * \param[in] call_data     Request output data
59   	 *
60   	 * \return Reply XML (guaranteed not to be \c NULL)
61   	 *
62   	 * \note The caller is responsible for freeing the return value using
63   	 *       \p pcmk__xml_free().
64   	 */
65   	static xmlNode *
66   	create_cib_reply(const char *op, const char *call_id, const char *client_id,
67   	                 uint32_t call_options, int rc, xmlNode *call_data)
68   	{
69   	    xmlNode *reply = pcmk__xe_create(NULL, PCMK__XE_CIB_REPLY);
70   	
71   	    pcmk__xe_set(reply, PCMK__XA_T, PCMK__VALUE_CIB);
72   	    pcmk__xe_set(reply, PCMK__XA_CIB_OP, op);
73   	    pcmk__xe_set(reply, PCMK__XA_CIB_CALLID, call_id);
74   	    pcmk__xe_set(reply, PCMK__XA_CIB_CLIENTID, client_id);
75   	    pcmk__xe_set_int(reply, PCMK__XA_CIB_CALLOPT, call_options);
76   	    pcmk__xe_set_int(reply, PCMK__XA_CIB_RC, rc);
77   	
78   	    if (call_data != NULL) {
79   	        xmlNode *wrapper = pcmk__xe_create(reply, PCMK__XE_CIB_CALLDATA);
80   	
81   	        pcmk__trace("Attaching reply output");
82   	        pcmk__xml_copy(wrapper, call_data);
83   	    }
84   	
85   	    crm_log_xml_explicit(reply, "cib:reply");
86   	    return reply;
87   	}
88   	
89   	static void
90   	do_local_notify(const xmlNode *xml, const char *client_id, bool sync_reply,
91   	                bool from_peer)
92   	{
93   	    int call_id = 0;
94   	    int rc = pcmk_rc_ok;
95   	    pcmk__client_t *client = NULL;
96   	    uint32_t flags = crm_ipc_server_event;
97   	    const char *client_type = NULL;
98   	    const char *client_name = NULL;
99   	    const char *client_desc = "";
100  	    const char *sync_s = (sync_reply? "synchronously" : "asynchronously");
101  	
102  	    CRM_CHECK((xml != NULL) && (client_id != NULL), return);
103  	
104  	    if (from_peer) {
105  	        client_desc = " (originator of delegated_request)";
106  	    }
107  	
108  	    pcmk__trace("Performing local %s notification for %s", sync_s, client_id);
109  	
110  	    pcmk__xe_get_int(xml, PCMK__XA_CIB_CALLID, &call_id);
111  	
112  	    client = pcmk__find_client_by_id(client_id);
113  	    if (client == NULL) {
114  	        pcmk__debug("Could not notify client %s%s %s of call %d result: client "
115  	                    "no longer exists", client_id, client_desc, sync_s,
116  	                    call_id);
117  	        return;
118  	    }
119  	
120  	    client_type = pcmk__client_type_str(PCMK__CLIENT_TYPE(client));
121  	    client_name = pcmk__client_name(client);
122  	
123  	    if (sync_reply) {
124  	        flags = crm_ipc_flags_none;
125  	        if (client->ipcs != NULL) {
126  	            call_id = client->request_id;
127  	            client->request_id = 0;
128  	        }
129  	    }
130  	
131  	    switch (PCMK__CLIENT_TYPE(client)) {
132  	        case pcmk__client_ipc:
133  	            rc = pcmk__ipc_send_xml(client, call_id, xml, flags);
134  	            break;
135  	        case pcmk__client_tls:
136  	        case pcmk__client_tcp:
137  	            rc = pcmk__remote_send_xml(client->remote, xml);
138  	            break;
139  	        default:
140  	            rc = EPROTONOSUPPORT;
141  	            break;
142  	    }
143  	
144  	    if (rc == pcmk_rc_ok) {
145  	        pcmk__trace("Notified %s client %s%s %s of call %d result",
146  	                    client_type, client_name, client_desc, sync_s, call_id);
147  	    } else {
148  	        pcmk__warn("Could not notify %s client %s%s %s of call %d result: %s",
149  	                   client_type, client_name, client_desc, sync_s, call_id,
150  	                   pcmk_rc_str(rc));
151  	    }
152  	}
153  	
154  	/*!
155  	 * \internal
156  	 * \brief Request CIB digests from all peer nodes
157  	 *
158  	 * This is used as a callback that runs 5 seconds after we modify the CIB. It
159  	 * sends a ping request to all cluster nodes. They will respond by sending their
160  	 * current digests and version info, which we will validate in
161  	 * process_ping_reply(). This serves as a check of consistency across the
162  	 * cluster after a CIB update.
163  	 *
164  	 * \param[in] data  Ignored
165  	 *
166  	 * \return \c G_SOURCE_REMOVE (to destroy the timeout)
167  	 */
168  	static gboolean
169  	digest_timer_cb(gpointer data)
170  	{
171  	    char *buf = NULL;
172  	    xmlNode *ping = NULL;
173  	
174  	    if (!based_is_primary) {
175  	        return G_SOURCE_REMOVE;
176  	    }
177  	
178  	    ping_seq++;
179  	    g_clear_pointer(&ping_digest, free);
180  	    ping_modified_since = false;
181  	
182  	    buf = pcmk__assert_asprintf("%" PRIu64, ping_seq);
183  	
184  	    ping = pcmk__xe_create(NULL, PCMK__XE_PING);
185  	    pcmk__xe_set(ping, PCMK__XA_T, PCMK__VALUE_CIB);
186  	    pcmk__xe_set(ping, PCMK__XA_CIB_OP, CRM_OP_PING);
187  	    pcmk__xe_set(ping, PCMK__XA_CIB_PING_ID, buf);
188  	    pcmk__xe_set(ping, PCMK_XA_CRM_FEATURE_SET, CRM_FEATURE_SET);
189  	
190  	    pcmk__trace("Requesting peer digests (%s)", buf);
191  	    pcmk__cluster_send_message(NULL, pcmk_ipc_based, ping);
192  	
193  	    free(buf);
194  	    pcmk__xml_free(ping);
195  	    return G_SOURCE_REMOVE;
196  	}
197  	
198  	static void
199  	process_ping_reply(xmlNode *reply) 
200  	{
201  	    uint64_t seq = 0;
202  	    const char *host = pcmk__xe_get(reply, PCMK__XA_SRC);
203  	
204  	    xmlNode *wrapper = pcmk__xe_first_child(reply, PCMK__XE_CIB_CALLDATA, NULL,
205  	                                            NULL);
206  	    xmlNode *pong = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
207  	
208  	    const char *seq_s = pcmk__xe_get(pong, PCMK__XA_CIB_PING_ID);
209  	    const char *digest = pcmk__xe_get(pong, PCMK_XA_DIGEST);
210  	
211  	    if (seq_s == NULL) {
212  	        pcmk__debug("Ignoring ping reply with no " PCMK__XA_CIB_PING_ID);
213  	        return;
214  	
215  	    } else {
216  	        long long seq_ll;
217  	        int rc = pcmk__scan_ll(seq_s, &seq_ll, 0LL);
218  	
219  	        if (rc != pcmk_rc_ok) {
220  	            pcmk__debug("Ignoring ping reply with invalid " PCMK__XA_CIB_PING_ID
221  	                        " '%s': %s",
222  	                        seq_s, pcmk_rc_str(rc));
223  	            return;
224  	        }
225  	        seq = (uint64_t) seq_ll;
226  	    }
227  	
228  	    if(digest == NULL) {
229  	        pcmk__trace("Ignoring ping reply %s from %s with no digest", seq_s,
230  	                    host);
231  	
232  	    } else if(seq != ping_seq) {
233  	        pcmk__trace("Ignoring out of sequence ping reply %s from %s", seq_s,
234  	                    host);
235  	
236  	    } else if(ping_modified_since) {
237  	        pcmk__trace("Ignoring ping reply %s from %s: cib updated since", seq_s,
238  	                    host);
239  	
240  	    } else {
241  	        if(ping_digest == NULL) {
242  	            pcmk__trace("Calculating new digest");
243  	            ping_digest = pcmk__digest_xml(the_cib, true);
244  	        }
245  	
246  	        pcmk__trace("Processing ping reply %s from %s (%s)", seq_s, host,
247  	                    digest);
248  	        if (!pcmk__str_eq(ping_digest, digest, pcmk__str_casei)) {
249  	            xmlNode *wrapper = pcmk__xe_first_child(pong, PCMK__XE_CIB_CALLDATA,
250  	                                                    NULL, NULL);
251  	            xmlNode *remote_cib = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
252  	
253  	            const char *admin_epoch_s = NULL;
254  	            const char *epoch_s = NULL;
255  	            const char *num_updates_s = NULL;
256  	
257  	            if (remote_cib != NULL) {
258  	                admin_epoch_s = pcmk__xe_get(remote_cib, PCMK_XA_ADMIN_EPOCH);
259  	                epoch_s = pcmk__xe_get(remote_cib, PCMK_XA_EPOCH);
260  	                num_updates_s = pcmk__xe_get(remote_cib, PCMK_XA_NUM_UPDATES);
261  	            }
262  	
263  	            pcmk__notice("Local CIB %s.%s.%s.%s differs from %s: %s.%s.%s.%s "
264  	                         "%p",
265  	                         pcmk__xe_get(the_cib, PCMK_XA_ADMIN_EPOCH),
266  	                         pcmk__xe_get(the_cib, PCMK_XA_EPOCH),
267  	                         pcmk__xe_get(the_cib, PCMK_XA_NUM_UPDATES),
268  	                         ping_digest, host,
269  	                         pcmk__s(admin_epoch_s, "_"),
270  	                         pcmk__s(epoch_s, "_"),
271  	                         pcmk__s(num_updates_s, "_"),
272  	                         digest, remote_cib);
273  	
274  	            if(remote_cib && remote_cib->children) {
275  	                // Additional debug
276  	                pcmk__xml_mark_changes(the_cib, remote_cib);
277  	                pcmk__log_xml_changes(LOG_INFO, remote_cib);
278  	                pcmk__trace("End of differences");
279  	            }
280  	
281  	            pcmk__xml_free(remote_cib);
282  	            sync_our_cib(reply, false);
283  	        }
284  	    }
285  	}
286  	
287  	static void
288  	parse_local_options(const pcmk__client_t *client,
289  	                    const cib__operation_t *operation,
290  	                    const char *host, const char *op, bool *local_notify,
291  	                    bool *needs_reply, bool *process, bool *needs_forward)
292  	{
293  	    // Process locally and notify local client
294  	    *process = true;
295  	    *needs_reply = false;
296  	    *local_notify = true;
297  	    *needs_forward = false;
298  	
299  	    if (pcmk__is_set(operation->flags, cib__op_attr_local)) {
300  	        /* Always process locally if cib__op_attr_local is set.
301  	         *
302  	         * @COMPAT: Currently host is ignored. At a compatibility break, throw
303  	         * an error (from based_process_request() or earlier) if host is not
304  	         * NULL or OUR_NODENAME.
305  	         */
306  	        pcmk__trace("Processing always-local %s op from client %s", op,
307  	                    pcmk__client_name(client));
308  	
309  	        if (!pcmk__str_eq(host, OUR_NODENAME,
310  	                          pcmk__str_casei|pcmk__str_null_matches)) {
311  	
312  	            pcmk__warn("Operation '%s' is always local but its target host is "
313  	                       "set to '%s'",
314  	                       op, host);
315  	        }
316  	        return;
317  	    }
318  	
319  	    if (pcmk__is_set(operation->flags, cib__op_attr_modifies)
320  	        || !pcmk__str_eq(host, OUR_NODENAME,
321  	                         pcmk__str_casei|pcmk__str_null_matches)) {
322  	
323  	        // Forward modifying and non-local requests via cluster
324  	        *process = false;
325  	        *needs_reply = false;
326  	        *local_notify = false;
327  	        *needs_forward = true;
328  	
329  	        pcmk__trace("%s op from %s needs to be forwarded to %s", op,
330  	                    pcmk__client_name(client), pcmk__s(host, "all nodes"));
331  	        return;
332  	    }
333  	
334  	    if (stand_alone) {
335  	        pcmk__trace("Processing %s op from client %s (stand-alone)", op,
336  	                    pcmk__client_name(client));
337  	
338  	    } else {
339  	        pcmk__trace("Processing %saddressed %s op from client %s",
340  	                    ((host != NULL)? "locally " : "un"), op,
341  	                    pcmk__client_name(client));
342  	    }
343  	}
344  	
345  	static bool
346  	parse_peer_options(const cib__operation_t *operation, xmlNode *request,
347  	                   bool *local_notify, bool *needs_reply, bool *process)
348  	{
349  	    /* TODO: What happens when an update comes in after node A
350  	     * requests the CIB from node B, but before it gets the reply (and
351  	     * sends out the replace operation)?
352  	     *
353  	     * (This may no longer be relevant since legacy mode was dropped; need to
354  	     * trace code more closely to check.)
355  	     */
356  	    const char *host = NULL;
357  	    const char *delegated = pcmk__xe_get(request, PCMK__XA_CIB_DELEGATED_FROM);
358  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
359  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
360  	    const char *reply_to = pcmk__xe_get(request, PCMK__XA_CIB_ISREPLYTO);
361  	
362  	    bool is_reply = pcmk__str_eq(reply_to, OUR_NODENAME, pcmk__str_casei);
363  	
364  	    if (originator == NULL) { // Shouldn't be possible
365  	        originator = "peer";
366  	    }
367  	
368  	    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_REPLACE, pcmk__str_none)) {
369  	        // sync_our_cib() sets PCMK__XA_CIB_ISREPLYTO
370  	        if (reply_to) {
371  	            delegated = reply_to;
372  	        }
373  	        goto skip_is_reply;
374  	    }
375  	
376  	    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_SYNC, pcmk__str_none)) {
377  	        // Nothing to do
378  	
379  	    } else if (is_reply && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) {
380  	        process_ping_reply(request);
381  	        return false;
382  	
383  	    } else if (pcmk__str_eq(op, PCMK__CIB_REQUEST_UPGRADE, pcmk__str_none)) {
384  	        /* Only the DC (node with the oldest software) should process
385  	         * this operation if PCMK__XA_CIB_SCHEMA_MAX is unset.
386  	         *
387  	         * If the DC is happy it will then send out another
388  	         * PCMK__CIB_REQUEST_UPGRADE which will tell all nodes to do the actual
389  	         * upgrade.
390  	         *
391  	         * Except this time PCMK__XA_CIB_SCHEMA_MAX will be set which puts a
392  	         * limit on how far newer nodes will go
393  	         */
394  	        const char *max = pcmk__xe_get(request, PCMK__XA_CIB_SCHEMA_MAX);
395  	        const char *upgrade_rc = pcmk__xe_get(request, PCMK__XA_CIB_UPGRADE_RC);
396  	
397  	        pcmk__trace("Parsing upgrade %s for %s with max=%s and upgrade_rc=%s",
398  	                    (is_reply? "reply" : "request"),
399  	                    (based_is_primary? "primary" : "secondary"),
400  	                    pcmk__s(max, "none"), pcmk__s(upgrade_rc, "none"));
401  	
402  	        if (upgrade_rc != NULL) {
403  	            // Our upgrade request was rejected by DC, notify clients of result
404  	            pcmk__xe_set(request, PCMK__XA_CIB_RC, upgrade_rc);
405  	
406  	        } else if ((max == NULL) && based_is_primary) {
407  	            /* We are the DC, check if this upgrade is allowed */
408  	            goto skip_is_reply;
409  	
410  	        } else if(max) {
411  	            /* Ok, go ahead and upgrade to 'max' */
412  	            goto skip_is_reply;
413  	
414  	        } else {
415  	            // Ignore broadcast client requests when we're not primary
416  	            return false;
417  	        }
418  	
419  	    } else if (is_reply
420  	               && pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
421  	
422  	        pcmk__trace("Ignoring legacy %s reply sent from %s to local clients",
423  	                    op, originator);
424  	        return false;
425  	
426  	    } else if (pcmk__str_eq(op, PCMK__CIB_REQUEST_SHUTDOWN, pcmk__str_none)) {
427  	        *local_notify = false;
428  	        if (reply_to == NULL) {
429  	            *process = true;
430  	        } else { // Not possible?
431  	            pcmk__debug("Ignoring shutdown request from %s because reply_to=%s",
432  	                        originator, reply_to);
433  	        }
434  	        return *process;
435  	    }
436  	
437  	    if (is_reply) {
438  	        pcmk__trace("Will notify local clients for %s reply from %s", op,
439  	                    originator);
440  	        *process = false;
441  	        *needs_reply = false;
442  	        *local_notify = true;
443  	        return true;
444  	    }
445  	
446  	  skip_is_reply:
447  	    *process = true;
448  	    *needs_reply = false;
449  	
450  	    *local_notify = pcmk__str_eq(delegated, OUR_NODENAME, pcmk__str_casei);
451  	
452  	    host = pcmk__xe_get(request, PCMK__XA_CIB_HOST);
453  	    if (pcmk__str_eq(host, OUR_NODENAME, pcmk__str_casei)) {
454  	        pcmk__trace("Processing %s request sent to us from %s", op, originator);
455  	        *needs_reply = true;
456  	        return true;
457  	
458  	    } else if (host != NULL) {
459  	        pcmk__trace("Ignoring %s request intended for CIB manager on %s", op,
460  	                    host);
461  	        return false;
462  	
463  	    } else if (!is_reply && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) {
464  	        *needs_reply = true;
465  	    }
466  	
467  	    pcmk__trace("Processing %s request broadcast by %s call %s on %s "
468  	                "(local clients will%s be notified)", op,
469  	                pcmk__s(pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME),
470  	                        "client"),
471  	                pcmk__s(pcmk__xe_get(request, PCMK__XA_CIB_CALLID),
472  	                        "without ID"),
473  	                originator, (*local_notify? "" : "not"));
474  	    return true;
475  	}
476  	
477  	/*!
478  	 * \internal
479  	 * \brief Forward a CIB request to the appropriate target host(s)
480  	 *
481  	 * \param[in] request  CIB request to forward
482  	 */
483  	static void
484  	forward_request(xmlNode *request)
485  	{
486  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
487  	    const char *section = pcmk__xe_get(request, PCMK__XA_CIB_SECTION);
488  	    const char *host = pcmk__xe_get(request, PCMK__XA_CIB_HOST);
489  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
490  	    const char *client_name = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME);
491  	    const char *call_id = pcmk__xe_get(request, PCMK__XA_CIB_CALLID);
492  	    pcmk__node_status_t *peer = NULL;
493  	
494  	    int log_level = LOG_INFO;
495  	
496  	    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_NOOP, pcmk__str_none)) {
497  	        log_level = LOG_DEBUG;
498  	    }
499  	
500  	    do_crm_log(log_level,
501  	               "Forwarding %s operation for section %s to %s (origin=%s/%s/%s)",
502  	               pcmk__s(op, "invalid"),
503  	               pcmk__s(section, "all"),
504  	               pcmk__s(host, "all"),
505  	               pcmk__s(originator, "local"),
506  	               pcmk__s(client_name, "unspecified"),
507  	               pcmk__s(call_id, "unspecified"));
508  	
509  	    pcmk__xe_set(request, PCMK__XA_CIB_DELEGATED_FROM, OUR_NODENAME);
510  	
511  	    if (host != NULL) {
512  	        peer = pcmk__get_node(0, host, NULL, pcmk__node_search_cluster_member);
513  	    }
514  	    pcmk__cluster_send_message(peer, pcmk_ipc_based, request);
515  	
516  	    // Return the request to its original state
517  	    pcmk__xe_remove_attr(request, PCMK__XA_CIB_DELEGATED_FROM);
518  	}
519  	
520  	/*!
521  	 * \internal
522  	 * \brief Get a CIB operation's input from the request XML
523  	 *
524  	 * \param[in]  request  CIB request XML
525  	 * \param[in]  type     CIB operation type
526  	 * \param[out] section  Where to store CIB section name
527  	 *
528  	 * \return Input XML for CIB operation
529  	 *
530  	 * \note If not \c NULL, the return value is a non-const pointer to part of
531  	 *       \p request. The caller should not free it directly.
532  	 */
533  	static xmlNode *
534  	prepare_input(const xmlNode *request, enum cib__op_type type,
535  	              const char **section)
536  	{
537  	    xmlNode *wrapper = pcmk__xe_first_child(request, PCMK__XE_CIB_CALLDATA,
538  	                                            NULL, NULL);
539  	    xmlNode *input = pcmk__xe_first_child(wrapper, NULL, NULL, NULL);
540  	
541  	    if (type == cib__op_apply_patch) {
542  	        *section = NULL;
543  	    } else {
544  	        *section = pcmk__xe_get(request, PCMK__XA_CIB_SECTION);
545  	    }
546  	
547  	    // Grab the specified section
548  	    if ((*section != NULL) && pcmk__xe_is(input, PCMK_XE_CIB)) {
549  	        input = pcmk_find_cib_element(input, *section);
550  	    }
551  	
552  	    return input;
553  	}
554  	
555  	static int
556  	cib_process_command(xmlNode *request, const cib__operation_t *operation,
557  	                    cib__op_fn_t op_function, xmlNode **reply, bool privileged)
558  	{
559  	    xmlNode *cib_diff = NULL;
560  	    xmlNode *input = NULL;
561  	    xmlNode *output = NULL;
562  	    xmlNode *result_cib = NULL;
563  	
564  	    uint32_t call_options = cib_none;
565  	
566  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
567  	    const char *section = NULL;
568  	    const char *call_id = pcmk__xe_get(request, PCMK__XA_CIB_CALLID);
569  	    const char *client_id = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTID);
570  	    const char *client_name = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME);
571  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
572  	
573  	    int rc = pcmk_rc_ok;
574  	
575  	    bool config_changed = false;
576  	    bool manage_counters = true;
577  	
578  	    static mainloop_timer_t *digest_timer = NULL;
579  	
580  	    pcmk__assert(cib_status == pcmk_rc_ok);
581  	
582  	    if (digest_timer == NULL) {
583  	        digest_timer = mainloop_timer_add("based_digest_timer", 5000, false,
584  	                                          digest_timer_cb, NULL);
585  	    }
586  	
587  	    *reply = NULL;
588  	
589  	    /* Start processing the request... */
590  	    rc = pcmk__xe_get_flags(request, PCMK__XA_CIB_CALLOPT, &call_options,
591  	                            cib_none);
592  	    if (rc != pcmk_rc_ok) {
593  	        pcmk__warn("Couldn't parse options from request: %s", pcmk_rc_str(rc));
594  	    }
595  	
596  	    if (!privileged
597  	        && pcmk__is_set(operation->flags, cib__op_attr_privileged)) {
598  	
599  	        rc = EACCES;
600  	        goto done;
601  	    }
602  	
603  	    input = prepare_input(request, operation->type, &section);
604  	
605  	    if (!pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
606  	        rc = cib__perform_query(op, call_options, op_function, section, request,
607  	                                input, &the_cib, &output);
608  	        goto done;
609  	    }
610  	
611  	    /* @COMPAT: Handle a valid write action (legacy)
612  	     *
613  	     * @TODO: Re-evaluate whether this is truly legacy. PCMK__XA_CIB_UPDATE may
614  	     * be set by a sync operation even in non-legacy mode, and manage_counters
615  	     * tells xml_create_patchset() whether to update version/epoch info.
616  	     */
617  	    if (pcmk__xe_attr_is_true(request, PCMK__XA_CIB_UPDATE)) {
618  	        manage_counters = false;
619  	        CRM_LOG_ASSERT(pcmk__str_eq(op, PCMK__CIB_REQUEST_REPLACE,
620  	                                    pcmk__str_none));
621  	    }
622  	
623  	    ping_modified_since = true;
624  	
625  	    /* result_cib must not be modified after cib_perform_op() returns.
626  	     *
627  	     * It's not important whether the client variant is cib_native or
628  	     * cib_remote.
629  	     */
630  	    rc = cib_perform_op(cib_undefined, op, call_options, op_function, section,
631  	                        request, input, manage_counters, &config_changed,
632  	                        &the_cib, &result_cib, &cib_diff, &output);
633  	
634  	    /* Always write to disk for successful ops with the flag set. This also
635  	     * negates the need to detect ordering changes.
636  	     */
637  	    if ((rc == pcmk_rc_ok)
638  	        && pcmk__is_set(operation->flags, cib__op_attr_writes_through)) {
639  	
640  	        config_changed = true;
641  	    }
642  	
643  	    if ((rc == pcmk_rc_ok)
644  	        && !pcmk__any_flags_set(call_options, cib_dryrun|cib_transaction)) {
645  	
646  	        if (result_cib != the_cib) {
647  	            if (pcmk__is_set(operation->flags, cib__op_attr_writes_through)) {
648  	                config_changed = true;
649  	            }
650  	
651  	            rc = based_activate_cib(result_cib, config_changed, op);
652  	        }
653  	
654  	        /* @COMPAT Nodes older than feature set 3.19.0 don't support
655  	         * transactions. In a mixed-version cluster with nodes <3.19.0, we must
656  	         * sync the updated CIB, so that the older nodes receive the changes.
657  	         * Any node that has already applied the transaction will ignore the
658  	         * synced CIB.
659  	         *
660  	         * To ensure the updated CIB is synced from only one node, we sync it
661  	         * from the originator.
662  	         */
663  	        if ((operation->type == cib__op_commit_transact)
664  	            && pcmk__str_eq(originator, OUR_NODENAME, pcmk__str_casei)
665  	            && (pcmk__compare_versions(pcmk__xe_get(the_cib,
666  	                                                    PCMK_XA_CRM_FEATURE_SET),
667  	                                       "3.19.0") < 0)) {
668  	
669  	            sync_our_cib(request, true);
670  	        }
671  	
672  	        mainloop_timer_start(digest_timer);
673  	
674  	    } else if (rc == pcmk_rc_schema_validation) {
675  	        pcmk__assert(result_cib != the_cib);
676  	
677  	        if (output != NULL) {
678  	            pcmk__log_xml_info(output, "cib:output");
679  	            pcmk__xml_free(output);
680  	        }
681  	
682  	        output = result_cib;
683  	
684  	    } else if (result_cib != the_cib) {
685  	        pcmk__xml_free(result_cib);
686  	    }
687  	
688  	    if (!pcmk__any_flags_set(call_options,
689  	                             cib_dryrun|cib_inhibit_notify|cib_transaction)) {
690  	        pcmk__trace("Sending notifications %d",
691  	                    pcmk__is_set(call_options, cib_dryrun));
692  	        based_diff_notify(op, pcmk_rc2legacy(rc), call_id, client_id,
693  	                          client_name, originator, cib_diff);
694  	    }
695  	
696  	  done:
697  	    if (!pcmk__is_set(call_options, cib_discard_reply)) {
698  	        *reply = create_cib_reply(op, call_id, client_id, call_options,
699  	                                  pcmk_rc2legacy(rc), output);
700  	    }
701  	
702  	    if (output != the_cib) {
703  	        pcmk__xml_free(output);
704  	    }
705  	
706  	    pcmk__trace("done");
707  	    pcmk__xml_free(cib_diff);
708  	    return rc;
709  	}
710  	
711  	/*!
712  	 * \internal
713  	 * \brief Log the result of processing a CIB request locally
714  	 *
715  	 * \param[in] request    Request XML
716  	 * \param[in] operation  Operation info
717  	 * \param[in] rc         Return code from processing the request
718  	 * \param[in] elapsed    How long processing took in seconds
719  	 */
720  	static void
721  	log_op_result(const xmlNode *request, const cib__operation_t *operation, int rc,
722  	              double elapsed)
723  	{
724  	    int level = LOG_INFO;
725  	
726  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
727  	    const char *section = pcmk__xe_get(request, PCMK__XA_CIB_SECTION);
728  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
729  	    const char *client_name = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME);
730  	    const char *call_id = pcmk__xe_get(request, PCMK__XA_CIB_CALLID);
731  	
732  	    int admin_epoch = 0;
733  	    int epoch = 0;
734  	    int num_updates = 0;
735  	
736  	    if (!pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
737  	        level = LOG_TRACE;
738  	
739  	    } else if (rc != pcmk_rc_ok) {
740  	        level = LOG_WARNING;
741  	    }
742  	
743  	    section = pcmk__s(section, "'all'");
744  	    originator = pcmk__s(originator, "local");
745  	    client_name = pcmk__s(client_name, "client");
746  	
747  	    /* @FIXME the_cib should always be non-NULL, but that's currently not the
748  	     * case during shutdown
749  	     */
750  	    if (the_cib != NULL) {
751  	        pcmk__xe_get_int(the_cib, PCMK_XA_ADMIN_EPOCH, &admin_epoch);
752  	        pcmk__xe_get_int(the_cib, PCMK_XA_EPOCH, &epoch);
753  	        pcmk__xe_get_int(the_cib, PCMK_XA_NUM_UPDATES, &num_updates);
754  	    }
755  	
756  	    do_crm_log(level,
757  	               "Completed %s operation for section %s: %s (rc=%d, "
758  	               "origin=%s/%s/%s, version=%d.%d.%d)",
759  	               op, section, pcmk_rc_str(rc), rc,
760  	               originator, client_name, call_id,
761  	               admin_epoch, epoch, num_updates);
762  	
763  	    if (elapsed > 3) {
764  	        pcmk__trace("%s operation took %.2fs to complete", op, elapsed);
765  	        crm_write_blackbox(0, NULL);
766  	    }
767  	}
768  	
769  	static void
770  	send_peer_reply(xmlNode *msg, const char *originator)
771  	{
772  	    const pcmk__node_status_t *node = NULL;
773  	
774  	    if ((msg == NULL) || (originator == NULL)) {
775  	        return;
776  	    }
777  	
778  	    // Send reply via cluster to originating node
779  	    node = pcmk__get_node(0, originator, NULL,
780  	                          pcmk__node_search_cluster_member);
781  	
782  	    pcmk__trace("Sending request result to %s only", originator);
783  	    pcmk__xe_set(msg, PCMK__XA_CIB_ISREPLYTO, originator);
784  	    pcmk__cluster_send_message(node, pcmk_ipc_based, msg);
785  	}
786  	
787  	/*!
788  	 * \internal
789  	 * \brief Handle an IPC or CPG message containing a request
790  	 *
791  	 * \param[in,out] request     Request XML
792  	 * \param[in]     privileged  If \c true, operations with
793  	 *                            \c cib__op_attr_privileged can be run
794  	 * \param[in]     client      IPC client that sent request (\c NULL if request
795  	 *                            came from CPG)
796  	 *
797  	 * \return Standard Pacemaker return code
798  	 */
799  	int
800  	based_process_request(xmlNode *request, bool privileged,
801  	                      const pcmk__client_t *client)
802  	{
803  	    // @TODO: Break into multiple smaller functions
804  	    uint32_t call_options = cib_none;
805  	
806  	    bool process = true;        // Whether to process request locally now
807  	    bool needs_reply = true;    // Whether to build a reply
808  	    bool local_notify = false;  // Whether to notify (local) requester
809  	    bool needs_forward = false; // Whether to forward request somewhere else
810  	
811  	    xmlNode *reply = NULL;
812  	
813  	    int rc = pcmk_rc_ok;
814  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
815  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
816  	    const char *host = pcmk__xe_get(request, PCMK__XA_CIB_HOST);
817  	    const char *call_id = pcmk__xe_get(request, PCMK__XA_CIB_CALLID);
818  	    const char *client_id = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTID);
819  	    const char *client_name = pcmk__s(pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME),
820  	                                      "client");
821  	    const char *reply_to = pcmk__xe_get(request, PCMK__XA_CIB_ISREPLYTO);
822  	
823  	    const cib__operation_t *operation = NULL;
824  	    cib__op_fn_t op_function = NULL;
825  	
826  	    rc = pcmk__xe_get_flags(request, PCMK__XA_CIB_CALLOPT, &call_options,
827  	                            cib_none);
828  	    if (rc != pcmk_rc_ok) {
829  	        pcmk__warn("Couldn't parse options from request: %s", pcmk_rc_str(rc));
830  	    }
831  	
832  	    if (pcmk__str_empty(host)) {
833  	        host = NULL;
834  	    }
835  	
836  	    if (client == NULL) {
837  	        pcmk__trace("Processing peer %s operation from %s/%s on %s intended "
838  	                    "for %s (reply=%s)", op, client_name, call_id, originator,
839  	                    pcmk__s(host, "all"), reply_to);
840  	    } else {
841  	        pcmk__xe_set(request, PCMK__XA_SRC, OUR_NODENAME);
842  	        pcmk__trace("Processing local %s operation from %s/%s intended for %s",
843  	                    op, client_name, call_id, pcmk__s(host, "all"));
844  	    }
845  	
846  	    rc = cib__get_operation(op, &operation);
847  	    if (rc != pcmk_rc_ok) {
848  	        /* TODO: construct error reply? */
849  	        pcmk__err("Pre-processing of command failed: %s", pcmk_rc_str(rc));
850  	        return rc;
851  	    }
852  	
853  	    op_function = based_get_op_function(operation);
854  	    if (op_function == NULL) {
855  	        pcmk__err("Operation %s not supported by CIB manager", op);
856  	        return EOPNOTSUPP;
857  	    }
858  	
859  	    if (client != NULL) {
860  	        parse_local_options(client, operation, host, op, &local_notify,
861  	                            &needs_reply, &process, &needs_forward);
862  	
863  	    } else if (!parse_peer_options(operation, request, &local_notify,
864  	                                   &needs_reply, &process)) {
865  	        return pcmk_rc_ok;
866  	    }
867  	
868  	    if (pcmk__is_set(call_options, cib_transaction)) {
869  	        /* All requests in a transaction are processed locally against a working
870  	         * CIB copy, and we don't notify for individual requests because the
871  	         * entire transaction is atomic.
872  	         *
873  	         * We still call the option parser functions above, for the sake of log
874  	         * messages and checking whether we're the target for peer requests.
875  	         */
876  	        process = true;
877  	        needs_reply = false;
878  	        local_notify = false;
879  	        needs_forward = false;
880  	    }
881  	
882  	    if (pcmk__is_set(call_options, cib_discard_reply)) {
883  	        /* If the request will modify the CIB, and we are in legacy mode, we
884  	         * need to build a reply so we can broadcast a diff, even if the
885  	         * requester doesn't want one.
886  	         */
887  	        needs_reply = false;
888  	        local_notify = false;
889  	        pcmk__trace("Client is not interested in the reply");
890  	    }
891  	
892  	    if (needs_forward) {
893  	        forward_request(request);
894  	        return pcmk_rc_ok;
895  	    }
896  	
897  	    if (cib_status != pcmk_rc_ok) {
898  	        rc = cib_status;
899  	        pcmk__err("Ignoring request because cluster configuration is invalid "
900  	                  "(please repair and restart): %s", pcmk_rc_str(rc));
901  	        reply = create_cib_reply(op, call_id, client_id, call_options,
902  	                                 pcmk_rc2legacy(rc), the_cib);
903  	
904  	    } else if (process) {
905  	        time_t start_time = time(NULL);
906  	
907  	        rc = cib_process_command(request, operation, op_function, &reply,
908  	                                 privileged);
909  	        log_op_result(request, operation, rc, difftime(time(NULL), start_time));
910  	
911  	        if ((reply == NULL) && (needs_reply || local_notify)) {
912  	            pcmk__err("Unexpected NULL reply to message");
913  	            pcmk__log_xml_err(request, "null reply");
914  	            goto done;
915  	        }
916  	    }
917  	
918  	    if (pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
919  	        pcmk__trace("Completed pre-sync update from %s/%s/%s%s",
920  	                    pcmk__s(originator, "local"), client_name, call_id,
921  	                    (local_notify? " with local notification" : ""));
922  	
923  	    } else if (needs_reply && !stand_alone && (client == NULL)
924  	               && !pcmk__is_set(call_options, cib_discard_reply)) {
925  	        send_peer_reply(reply, originator);
926  	    }
927  	
928  	    if (local_notify && (client_id != NULL)) {
929  	        do_local_notify((process? reply : request), client_id,
930  	                        pcmk__is_set(call_options, cib_sync_call),
931  	                        (client == NULL));
932  	    }
933  	
934  	done:
935  	    pcmk__xml_free(reply);
936  	    return rc;
937  	}
938  	
939  	void
940  	based_peer_callback(xmlNode *msg, void *private_data)
941  	{
942  	    const char *reason = NULL;
943  	    const char *originator = pcmk__xe_get(msg, PCMK__XA_SRC);
944  	
945  	    if (pcmk__peer_cache == NULL) {
946  	        reason = "membership not established";
947  	        goto bail;
948  	    }
949  	
950  	    if (pcmk__xe_get(msg, PCMK__XA_CIB_CLIENTNAME) == NULL) {
951  	        pcmk__xe_set(msg, PCMK__XA_CIB_CLIENTNAME, originator);
952  	    }
953  	
954  	    based_process_request(msg, true, NULL);
955  	    return;
956  	
957  	  bail:
958  	    if (reason) {
959  	        const char *op = pcmk__xe_get(msg, PCMK__XA_CIB_OP);
960  	
961  	        pcmk__warn("Discarding %s message from %s: %s", op, originator, reason);
962  	    }
963  	}
964  	
965  	static gboolean
966  	cib_force_exit(gpointer data)
967  	{
968  	    pcmk__notice("Exiting immediately after %s without shutdown acknowledgment",
969  	                 pcmk__readable_interval(EXIT_ESCALATION_MS));
970  	    based_terminate(CRM_EX_ERROR);
971  	    return FALSE;
972  	}
973  	
974  	static void
975  	disconnect_remote_client(gpointer key, gpointer value, gpointer user_data)
976  	{
977  	    pcmk__client_t *a_client = value;
978  	
979  	    pcmk__err("Can't disconnect client %s: Not implemented",
980  	              pcmk__client_name(a_client));
981  	}
982  	
983  	static void
984  	initiate_exit(void)
985  	{
986  	    int active = 0;
987  	    xmlNode *leaving = NULL;
988  	
989  	    active = pcmk__cluster_num_active_nodes();
990  	    if (active < 2) { // This is the last active node
991  	        pcmk__info("Exiting without sending shutdown request (no active "
992  	                   "peers)");
993  	        based_terminate(CRM_EX_OK);
994  	        return;
995  	    }
996  	
997  	    pcmk__info("Sending shutdown request to %d peers", active);
998  	
999  	    leaving = pcmk__xe_create(NULL, PCMK__XE_EXIT_NOTIFICATION);
1000 	    pcmk__xe_set(leaving, PCMK__XA_T, PCMK__VALUE_CIB);
1001 	    pcmk__xe_set(leaving, PCMK__XA_CIB_OP, PCMK__CIB_REQUEST_SHUTDOWN);
1002 	
1003 	    pcmk__cluster_send_message(NULL, pcmk_ipc_based, leaving);
1004 	    pcmk__xml_free(leaving);
1005 	
1006 	    pcmk__create_timer(EXIT_ESCALATION_MS, cib_force_exit, NULL);
1007 	}
1008 	
1009 	void
1010 	based_shutdown(int nsig)
1011 	{
1012 	    struct qb_ipcs_stats srv_stats;
1013 	
1014 	    if (!cib_shutdown_flag) {
1015 	        int disconnects = 0;
1016 	        qb_ipcs_connection_t *c = NULL;
1017 	
1018 	        cib_shutdown_flag = true;
1019 	
1020 	        c = qb_ipcs_connection_first_get(ipcs_rw);
1021 	        while (c != NULL) {
1022 	            qb_ipcs_connection_t *last = c;
1023 	
1024 	            c = qb_ipcs_connection_next_get(ipcs_rw, last);
1025 	
1026 	            pcmk__debug("Disconnecting r/w client %p...", last);
1027 	            qb_ipcs_disconnect(last);
1028 	            qb_ipcs_connection_unref(last);
1029 	            disconnects++;
1030 	        }
1031 	
1032 	        c = qb_ipcs_connection_first_get(ipcs_ro);
1033 	        while (c != NULL) {
1034 	            qb_ipcs_connection_t *last = c;
1035 	
1036 	            c = qb_ipcs_connection_next_get(ipcs_ro, last);
1037 	
1038 	            pcmk__debug("Disconnecting r/o client %p...", last);
1039 	            qb_ipcs_disconnect(last);
1040 	            qb_ipcs_connection_unref(last);
1041 	            disconnects++;
1042 	        }
1043 	
1044 	        c = qb_ipcs_connection_first_get(ipcs_shm);
1045 	        while (c != NULL) {
1046 	            qb_ipcs_connection_t *last = c;
1047 	
1048 	            c = qb_ipcs_connection_next_get(ipcs_shm, last);
1049 	
1050 	            pcmk__debug("Disconnecting non-blocking r/w client %p...", last);
1051 	            qb_ipcs_disconnect(last);
1052 	            qb_ipcs_connection_unref(last);
1053 	            disconnects++;
1054 	        }
1055 	
1056 	        disconnects += pcmk__ipc_client_count();
1057 	
1058 	        pcmk__debug("Disconnecting %d remote clients",
1059 	                    pcmk__ipc_client_count());
1060 	        pcmk__foreach_ipc_client(disconnect_remote_client, NULL);
1061 	        pcmk__info("Disconnected %d clients", disconnects);
1062 	    }
1063 	
1064 	    qb_ipcs_stats_get(ipcs_rw, &srv_stats, QB_FALSE);
1065 	
1066 	    if (pcmk__ipc_client_count() == 0) {
1067 	        pcmk__info("All clients disconnected (%d)", srv_stats.active_connections);
1068 	        initiate_exit();
1069 	
1070 	    } else {
1071 	        pcmk__info("Waiting on %d clients to disconnect (%d)",
1072 	                   pcmk__ipc_client_count(), srv_stats.active_connections);
1073 	    }
1074 	}
1075 	
1076 	/*!
1077 	 * \internal
1078 	 * \brief Close remote sockets, free the global CIB and quit
1079 	 *
1080 	 * \param[in] exit_status  What exit status to use (if -1, use CRM_EX_OK, but
1081 	 *                         skip disconnecting from the cluster layer)
1082 	 */
1083 	void
1084 	based_terminate(int exit_status)
1085 	{
(1) Event path: Condition "remote_fd > 0", taking true branch.
1086 	    if (remote_fd > 0) {
1087 	        close(remote_fd);
1088 	        remote_fd = 0;
1089 	    }
(2) Event path: Condition "remote_tls_fd > 0", taking true branch.
1090 	    if (remote_tls_fd > 0) {
1091 	        close(remote_tls_fd);
1092 	        remote_tls_fd = 0;
1093 	    }
1094 	
CID (unavailable; MK=a2bec0b8ef6c7e3bfcd2b76f64b922db) (#1 of 1): Inconsistent C union access (INCONSISTENT_UNION_ACCESS):
(3) Event assign_union_field: The union field "in" of "_pp" is written.
(4) Event inconsistent_union_field_access: In "_pp.out", the union field used: "out" is inconsistent with the field most recently stored: "in".
1095 	    g_clear_pointer(&the_cib, pcmk__xml_free);
1096 	
1097 	    // Exit immediately on error
1098 	    if (exit_status > CRM_EX_OK) {
1099 	        pcmk__stop_based_ipc(ipcs_ro, ipcs_rw, ipcs_shm);
1100 	        crm_exit(exit_status);
1101 	        return;
1102 	    }
1103 	
1104 	    if ((mainloop != NULL) && g_main_loop_is_running(mainloop)) {
1105 	        /* Quit via returning from the main loop. If exit_status has the special
1106 	         * value -1, we skip the disconnect here, and it will be done when the
1107 	         * main loop returns (this allows the peer status callback to avoid
1108 	         * messing with the peer caches).
1109 	         */
1110 	        if (exit_status == CRM_EX_OK) {
1111 	            pcmk_cluster_disconnect(crm_cluster);
1112 	        }
1113 	        g_main_loop_quit(mainloop);
1114 	        return;
1115 	    }
1116 	
1117 	    /* Exit cleanly. Even the peer status callback can disconnect here, because
1118 	     * we're not returning control to the caller.
1119 	     */
1120 	    pcmk_cluster_disconnect(crm_cluster);
1121 	    pcmk__stop_based_ipc(ipcs_ro, ipcs_rw, ipcs_shm);
1122 	    crm_exit(CRM_EX_OK);
1123 	}
1124