1    	/*
2    	 * Copyright 2004-2026 the Pacemaker project contributors
3    	 *
4    	 * The version control history for this file may have further details.
5    	 *
6    	 * This source code is licensed under the GNU General Public License version 2
7    	 * or later (GPLv2+) WITHOUT ANY WARRANTY.
8    	 */
9    	
10   	#include <crm_internal.h>
11   	
12   	#include <errno.h>                  // EACCES, ECONNREFUSED
13   	#include <stdbool.h>
14   	#include <stddef.h>                 // NULL, size_t
15   	#include <stdint.h>                 // uint32_t, uint64_t
16   	#include <stdlib.h>                 // free
17   	#include <syslog.h>                 // LOG_INFO, LOG_DEBUG
18   	#include <time.h>                   // time_t
19   	#include <unistd.h>                 // close
20   	
21   	#include <glib.h>                   // gboolean, gpointer, g_*, etc.
22   	#include <libxml/tree.h>            // xmlNode
23   	#include <qb/qbipcs.h>              // qb_ipcs_connection_t
24   	#include <qb/qblog.h>               // LOG_TRACE
25   	
26   	#include <crm/cib.h>                // cib_call_options values
27   	#include <crm/cib/internal.h>       // cib__*
28   	#include <crm/cluster.h>            // pcmk_cluster_disconnect
29   	#include <crm/cluster/internal.h>   // pcmk__cluster_send_message
30   	#include <crm/common/internal.h>    // pcmk__s, pcmk__str_eq
31   	#include <crm/common/ipc.h>         // crm_ipc_*, pcmk_ipc_*
32   	#include <crm/common/logging.h>     // CRM_LOG_ASSERT, CRM_CHECK
33   	#include <crm/common/mainloop.h>    // mainloop_*
34   	#include <crm/common/results.h>     // pcmk_rc_*
35   	#include <crm/common/xml.h>         // PCMK_XA_*, PCMK_XE_*
36   	#include <crm/crm.h>                // CRM_OP_*
37   	
38   	#include "pacemaker-based.h"
39   	
40   	#define EXIT_ESCALATION_MS 10000
41   	
42   	static long long ping_seq = 0;
43   	static char *ping_digest = NULL;
44   	static bool ping_modified_since = false;
45   	
46   	/*!
47   	 * \internal
48   	 * \brief Create reply XML for a CIB request
49   	 *
50   	 * \param[in] request    CIB request
51   	 * \param[in] rc         Request return code (standard Pacemaker return code)
52   	 * \param[in] call_data  Request output data (may be entire live CIB or result
53   	 *                       CIB in case of error)
54   	 *
55   	 * \return Reply XML (guaranteed not to be \c NULL)
56   	 *
57   	 * \note The caller is responsible for freeing the return value using
58   	 *       \p pcmk__xml_free().
59   	 */
60   	static xmlNode *
61   	create_cib_reply(const xmlNode *request, int rc, xmlNode *call_data)
62   	{
63   	    xmlNode *reply = pcmk__xe_create(NULL, PCMK__XE_CIB_REPLY);
64   	
65   	    pcmk__xe_set(reply, PCMK__XA_T, PCMK__VALUE_CIB);
66   	
67   	    /* We could simplify by copying all attributes from request. We would just
68   	     * have to ensure that there are never "private" attributes that we want to
69   	     * hide from external clients with notify callbacks.
70   	     */
71   	    pcmk__xe_set(reply, PCMK__XA_CIB_OP,
72   	                 pcmk__xe_get(request, PCMK__XA_CIB_OP));
73   	
74   	    pcmk__xe_set(reply, PCMK__XA_CIB_CALLID,
75   	                 pcmk__xe_get(request, PCMK__XA_CIB_CALLID));
76   	
77   	    pcmk__xe_set(reply, PCMK__XA_CIB_CLIENTID,
78   	                 pcmk__xe_get(request, PCMK__XA_CIB_CLIENTID));
79   	
80   	    pcmk__xe_set(reply, PCMK__XA_CIB_CALLOPT,
81   	                 pcmk__xe_get(request, PCMK__XA_CIB_CALLOPT));
82   	
83   	    pcmk__xe_set_int(reply, PCMK__XA_CIB_RC, pcmk_rc2legacy(rc));
84   	    cib__set_calldata(reply, call_data);
85   	
86   	    crm_log_xml_explicit(reply, "cib:reply");
87   	    return reply;
88   	}
89   	
90   	static void
91   	do_local_notify(const xmlNode *xml, const char *client_id, bool sync_reply,
92   	                bool from_peer)
93   	{
94   	    int call_id = 0;
95   	    int rc = pcmk_rc_ok;
96   	    pcmk__client_t *client = NULL;
97   	    uint32_t flags = crm_ipc_server_event;
98   	    const char *client_type = NULL;
99   	    const char *client_name = NULL;
100  	    const char *client_desc = "";
101  	    const char *sync_s = (sync_reply? "synchronously" : "asynchronously");
102  	
103  	    CRM_CHECK((xml != NULL) && (client_id != NULL), return);
104  	
105  	    if (from_peer) {
106  	        client_desc = " (originator of delegated_request)";
107  	    }
108  	
109  	    pcmk__trace("Performing local %s notification for %s", sync_s, client_id);
110  	
111  	    pcmk__xe_get_int(xml, PCMK__XA_CIB_CALLID, &call_id);
112  	
113  	    client = pcmk__find_client_by_id(client_id);
114  	    if (client == NULL) {
115  	        pcmk__debug("Could not notify client %s%s %s of call %d result: client "
116  	                    "no longer exists", client_id, client_desc, sync_s,
117  	                    call_id);
118  	        return;
119  	    }
120  	
121  	    client_type = pcmk__client_type_str(PCMK__CLIENT_TYPE(client));
122  	    client_name = pcmk__client_name(client);
123  	
124  	    if (sync_reply) {
125  	        flags = crm_ipc_flags_none;
126  	        if (client->ipcs != NULL) {
127  	            call_id = client->request_id;
128  	            client->request_id = 0;
129  	        }
130  	    }
131  	
132  	    switch (PCMK__CLIENT_TYPE(client)) {
133  	        case pcmk__client_ipc:
134  	            rc = pcmk__ipc_send_xml(client, call_id, xml, flags);
135  	            break;
136  	        case pcmk__client_tls:
137  	        case pcmk__client_tcp:
138  	            rc = pcmk__remote_send_xml(client->remote, xml);
139  	            break;
140  	        default:
141  	            rc = EPROTONOSUPPORT;
142  	            break;
143  	    }
144  	
145  	    if (rc == pcmk_rc_ok) {
146  	        pcmk__trace("Notified %s client %s%s %s of call %d result",
147  	                    client_type, client_name, client_desc, sync_s, call_id);
148  	    } else {
149  	        pcmk__warn("Could not notify %s client %s%s %s of call %d result: %s",
150  	                   client_type, client_name, client_desc, sync_s, call_id,
151  	                   pcmk_rc_str(rc));
152  	    }
153  	}
154  	
155  	/*!
156  	 * \internal
157  	 * \brief Request CIB digests from all peer nodes
158  	 *
159  	 * This is used as a callback that runs 5 seconds after we modify the CIB on the
160  	 * DC. It sends a ping request to all cluster nodes. They will respond by
161  	 * sending their current digests and version info, which we will validate in
162  	 * process_ping_reply(). If their digest doesn't match, we'll sync our own CIB
163  	 * to them. This helps ensure consistency across the cluster after a CIB update.
164  	 *
165  	 * \param[in] data  Ignored
166  	 *
167  	 * \return \c G_SOURCE_REMOVE (to destroy the timeout)
168  	 *
169  	 * \note It's not clear why we wait 5 seconds rather than sending the ping
170  	 *       request immediately after a performing a modifying op. Perhaps it's to
171  	 *       avoid overwhelming other nodes with ping requests when there are a lot
172  	 *       of modifying requests in a short period. The timer restarts after
173  	 *       every successful modifying op, so we send ping requests **at most**
174  	 *       every 5 seconds. Or perhaps it's a remnant of legacy mode (pre-1.1.12).
175  	 *       In any case, the other nodes shouldn't need time to process the
176  	 *       modifying op before responding to the ping request. The ping request is
177  	 *       sent after the op is sent, so it should also be received after the op
178  	 *       is received.
179  	 */
180  	static gboolean
181  	digest_timer_cb(gpointer data)
182  	{
183  	    xmlNode *ping = NULL;
184  	
185  	    if (!based_is_primary) {
186  	        // Only the DC sends a ping
187  	        return G_SOURCE_REMOVE;
188  	    }
189  	
190  	    if (++ping_seq < 0) {
191  	        ping_seq = 0;
192  	    }
193  	
194  	    g_clear_pointer(&ping_digest, free);
195  	    ping_modified_since = false;
196  	
197  	    ping = pcmk__xe_create(NULL, PCMK__XE_PING);
198  	    pcmk__xe_set(ping, PCMK__XA_T, PCMK__VALUE_CIB);
199  	    pcmk__xe_set(ping, PCMK__XA_CIB_OP, CRM_OP_PING);
200  	    pcmk__xe_set_ll(ping, PCMK__XA_CIB_PING_ID, ping_seq);
201  	    pcmk__xe_set(ping, PCMK_XA_CRM_FEATURE_SET, CRM_FEATURE_SET);
202  	
203  	    pcmk__trace("Requesting peer digests (%lld)", ping_seq);
204  	    pcmk__cluster_send_message(NULL, pcmk_ipc_based, ping);
205  	
206  	    pcmk__xml_free(ping);
207  	    return G_SOURCE_REMOVE;
208  	}
209  	
210  	/*!
211  	 * \internal
212  	 * \brief Process a reply to a \c CRM_OP_PING request
213  	 *
214  	 * See \c digest_timer_cb() for details on how the ping process works, and see
215  	 * \c based_process_ping() for the construction of the ping reply.
216  	 *
217  	 * We ignore the reply if we are no longer the DC, if the reply is malformed or
218  	 * received out of sequence, or if we may have modified the CIB since the last
219  	 * time we sent a ping request.
220  	 *
221  	 * Otherwise, we compare the CIB digest received in the reply against the digest
222  	 * of the local CIB. If the digests don't match, we sync our CIB to the node
223  	 * that sent the reply. This helps to ensure that all other nodes' views of the
224  	 * CIB eventually match the DC's view of the CIB.
225  	 *
226  	 * \param[in] reply  Ping reply
227  	 */
228  	static void
229  	process_ping_reply(const xmlNode *reply)
230  	{
231  	    const char *host = pcmk__xe_get(reply, PCMK__XA_SRC);
232  	
233  	    xmlNode *pong = cib__get_calldata(reply);
234  	    long long seq = 0;
235  	    const char *digest = pcmk__xe_get(pong, PCMK_XA_DIGEST);
236  	
237  	    xmlNode *remote_versions = cib__get_calldata(pong);
238  	
239  	    int rc = pcmk__xe_get_ll(pong, PCMK__XA_CIB_PING_ID, &seq);
240  	
241  	    if (rc != pcmk_rc_ok) {
242  	        pcmk__debug("Ignoring ping reply with unset or invalid "
243  	                    PCMK__XA_CIB_PING_ID ": %s", pcmk_rc_str(rc));
244  	        return;
245  	    }
246  	
247  	    if (!based_is_primary) {
248  	        pcmk__trace("Ignoring ping reply %lld from %s because we are no longer "
249  	                    "DC", seq, host);
250  	        return;
251  	    }
252  	
253  	    if (digest == NULL) {
254  	        pcmk__trace("Ignoring ping reply %lld from %s with no digest", seq,
255  	                    host);
256  	        return;
257  	    }
258  	
259  	    if (seq != ping_seq) {
260  	        pcmk__trace("Ignoring out-of-sequence ping reply %lld from %s", seq,
261  	                    host);
262  	        return;
263  	    }
264  	
265  	    if (ping_modified_since) {
266  	        pcmk__trace("Ignoring ping reply %lld from %s: CIB updated since", seq,
267  	                    host);
268  	        return;
269  	    }
270  	
271  	    if (ping_digest == NULL) {
272  	        ping_digest = pcmk__digest_xml(the_cib, true);
273  	    }
274  	
275  	    pcmk__trace("Processing ping reply %lld from %s (%s)", seq, host, digest);
276  	
277  	    if (pcmk__str_eq(ping_digest, digest, pcmk__str_casei)) {
278  	        return;
279  	    }
280  	
281  	    pcmk__notice("Local CIB %s.%s.%s.%s differs from %s: %s.%s.%s.%s",
282  	                 pcmk__xe_get(the_cib, PCMK_XA_ADMIN_EPOCH),
283  	                 pcmk__xe_get(the_cib, PCMK_XA_EPOCH),
284  	                 pcmk__xe_get(the_cib, PCMK_XA_NUM_UPDATES), ping_digest, host,
285  	                 pcmk__xe_get(remote_versions, PCMK_XA_ADMIN_EPOCH),
286  	                 pcmk__xe_get(remote_versions, PCMK_XA_EPOCH),
287  	                 pcmk__xe_get(remote_versions, PCMK_XA_NUM_UPDATES), digest);
288  	
289  	    sync_our_cib(reply, false);
290  	}
291  	
292  	static void
293  	parse_local_options(const pcmk__client_t *client,
294  	                    const cib__operation_t *operation,
295  	                    const char *host, const char *op, bool *local_notify,
296  	                    bool *needs_reply, bool *process, bool *needs_forward)
297  	{
298  	    // Process locally and notify local client
299  	    *process = true;
300  	    *needs_reply = false;
301  	    *local_notify = true;
302  	    *needs_forward = false;
303  	
304  	    if (pcmk__is_set(operation->flags, cib__op_attr_local)) {
305  	        /* Always process locally if cib__op_attr_local is set.
306  	         *
307  	         * @COMPAT: Currently host is ignored. At a compatibility break, throw
308  	         * an error (from based_process_request() or earlier) if host is not
309  	         * NULL or OUR_NODENAME.
310  	         */
311  	        pcmk__trace("Processing always-local %s op from client %s", op,
312  	                    pcmk__client_name(client));
313  	
314  	        if (!pcmk__str_eq(host, OUR_NODENAME,
315  	                          pcmk__str_casei|pcmk__str_null_matches)) {
316  	
317  	            pcmk__warn("Operation '%s' is always local but its target host is "
318  	                       "set to '%s'",
319  	                       op, host);
320  	        }
321  	        return;
322  	    }
323  	
324  	    if (pcmk__is_set(operation->flags, cib__op_attr_modifies)
325  	        || !pcmk__str_eq(host, OUR_NODENAME,
326  	                         pcmk__str_casei|pcmk__str_null_matches)) {
327  	
328  	        // Forward modifying and non-local requests via cluster
329  	        *process = false;
330  	        *needs_reply = false;
331  	        *local_notify = false;
332  	        *needs_forward = true;
333  	
334  	        pcmk__trace("%s op from %s needs to be forwarded to %s", op,
335  	                    pcmk__client_name(client), pcmk__s(host, "all nodes"));
336  	        return;
337  	    }
338  	
339  	    if (stand_alone) {
340  	        pcmk__trace("Processing %s op from client %s (stand-alone)", op,
341  	                    pcmk__client_name(client));
342  	
343  	    } else {
344  	        pcmk__trace("Processing %saddressed %s op from client %s",
345  	                    ((host != NULL)? "locally " : "un"), op,
346  	                    pcmk__client_name(client));
347  	    }
348  	}
349  	
350  	static bool
351  	parse_peer_options(const cib__operation_t *operation, xmlNode *request,
352  	                   bool *local_notify, bool *needs_reply, bool *process)
353  	{
354  	    /* TODO: What happens when an update comes in after node A
355  	     * requests the CIB from node B, but before it gets the reply (and
356  	     * sends out the replace operation)?
357  	     *
358  	     * (This may no longer be relevant since legacy mode was dropped; need to
359  	     * trace code more closely to check.)
360  	     */
361  	    const char *host = NULL;
362  	    const char *delegated = pcmk__xe_get(request, PCMK__XA_CIB_DELEGATED_FROM);
363  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
364  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
365  	    const char *reply_to = pcmk__xe_get(request, PCMK__XA_CIB_ISREPLYTO);
366  	
367  	    bool is_reply = pcmk__str_eq(reply_to, OUR_NODENAME, pcmk__str_casei);
368  	
369  	    if (originator == NULL) { // Shouldn't be possible
370  	        originator = "peer";
371  	    }
372  	
373  	    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_REPLACE, pcmk__str_none)) {
374  	        // sync_our_cib() sets PCMK__XA_CIB_ISREPLYTO
375  	        if (reply_to) {
376  	            delegated = reply_to;
377  	        }
378  	        goto skip_is_reply;
379  	    }
380  	
381  	    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_SYNC, pcmk__str_none)) {
382  	        // Nothing to do
383  	
384  	    } else if (is_reply && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) {
385  	        process_ping_reply(request);
386  	        return false;
387  	
388  	    } else if (pcmk__str_eq(op, PCMK__CIB_REQUEST_UPGRADE, pcmk__str_none)) {
389  	        /* Only the DC (node with the oldest software) should process
390  	         * this operation if PCMK__XA_CIB_SCHEMA_MAX is unset.
391  	         *
392  	         * If the DC is happy it will then send out another
393  	         * PCMK__CIB_REQUEST_UPGRADE which will tell all nodes to do the actual
394  	         * upgrade.
395  	         *
396  	         * Except this time PCMK__XA_CIB_SCHEMA_MAX will be set which puts a
397  	         * limit on how far newer nodes will go
398  	         */
399  	        const char *max = pcmk__xe_get(request, PCMK__XA_CIB_SCHEMA_MAX);
400  	        const char *upgrade_rc = pcmk__xe_get(request, PCMK__XA_CIB_UPGRADE_RC);
401  	
402  	        pcmk__trace("Parsing upgrade %s for %s with max=%s and upgrade_rc=%s",
403  	                    (is_reply? "reply" : "request"),
404  	                    (based_is_primary? "primary" : "secondary"),
405  	                    pcmk__s(max, "none"), pcmk__s(upgrade_rc, "none"));
406  	
407  	        if (upgrade_rc != NULL) {
408  	            // Our upgrade request was rejected by DC, notify clients of result
409  	            pcmk__xe_set(request, PCMK__XA_CIB_RC, upgrade_rc);
410  	
411  	        } else if ((max == NULL) && based_is_primary) {
412  	            /* We are the DC, check if this upgrade is allowed */
413  	            goto skip_is_reply;
414  	
415  	        } else if(max) {
416  	            /* Ok, go ahead and upgrade to 'max' */
417  	            goto skip_is_reply;
418  	
419  	        } else {
420  	            // Ignore broadcast client requests when we're not primary
421  	            return false;
422  	        }
423  	
424  	    } else if (is_reply
425  	               && pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
426  	
427  	        pcmk__trace("Ignoring legacy %s reply sent from %s to local clients",
428  	                    op, originator);
429  	        return false;
430  	
431  	    } else if (pcmk__str_eq(op, PCMK__CIB_REQUEST_SHUTDOWN, pcmk__str_none)) {
432  	        *local_notify = false;
433  	        if (reply_to == NULL) {
434  	            *process = true;
435  	        } else { // Not possible?
436  	            pcmk__debug("Ignoring shutdown request from %s because reply_to=%s",
437  	                        originator, reply_to);
438  	        }
439  	        return *process;
440  	    }
441  	
442  	    if (is_reply) {
443  	        pcmk__trace("Will notify local clients for %s reply from %s", op,
444  	                    originator);
445  	        *process = false;
446  	        *needs_reply = false;
447  	        *local_notify = true;
448  	        return true;
449  	    }
450  	
451  	  skip_is_reply:
452  	    *process = true;
453  	    *needs_reply = false;
454  	
455  	    *local_notify = pcmk__str_eq(delegated, OUR_NODENAME, pcmk__str_casei);
456  	
457  	    host = pcmk__xe_get(request, PCMK__XA_CIB_HOST);
458  	    if (pcmk__str_eq(host, OUR_NODENAME, pcmk__str_casei)) {
459  	        pcmk__trace("Processing %s request sent to us from %s", op, originator);
460  	        *needs_reply = true;
461  	        return true;
462  	
463  	    } else if (host != NULL) {
464  	        pcmk__trace("Ignoring %s request intended for CIB manager on %s", op,
465  	                    host);
466  	        return false;
467  	
468  	    } else if (!is_reply && pcmk__str_eq(op, CRM_OP_PING, pcmk__str_casei)) {
469  	        *needs_reply = true;
470  	    }
471  	
472  	    pcmk__trace("Processing %s request broadcast by %s call %s on %s "
473  	                "(local clients will%s be notified)", op,
474  	                pcmk__s(pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME),
475  	                        "client"),
476  	                pcmk__s(pcmk__xe_get(request, PCMK__XA_CIB_CALLID),
477  	                        "without ID"),
478  	                originator, (*local_notify? "" : "not"));
479  	    return true;
480  	}
481  	
482  	/*!
483  	 * \internal
484  	 * \brief Forward a CIB request to the appropriate target host(s)
485  	 *
486  	 * \param[in] request  CIB request to forward
487  	 */
488  	static void
489  	forward_request(xmlNode *request)
490  	{
491  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
492  	    const char *section = pcmk__xe_get(request, PCMK__XA_CIB_SECTION);
493  	    const char *host = pcmk__xe_get(request, PCMK__XA_CIB_HOST);
494  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
495  	    const char *client_name = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME);
496  	    const char *call_id = pcmk__xe_get(request, PCMK__XA_CIB_CALLID);
497  	    pcmk__node_status_t *peer = NULL;
498  	
499  	    int log_level = LOG_INFO;
500  	
501  	    if (pcmk__str_eq(op, PCMK__CIB_REQUEST_NOOP, pcmk__str_none)) {
502  	        log_level = LOG_DEBUG;
503  	    }
504  	
505  	    do_crm_log(log_level,
506  	               "Forwarding %s operation for section %s to %s (origin=%s/%s/%s)",
507  	               pcmk__s(op, "invalid"),
508  	               pcmk__s(section, "all"),
509  	               pcmk__s(host, "all"),
510  	               pcmk__s(originator, "local"),
511  	               pcmk__s(client_name, "unspecified"),
512  	               pcmk__s(call_id, "unspecified"));
513  	
514  	    pcmk__xe_set(request, PCMK__XA_CIB_DELEGATED_FROM, OUR_NODENAME);
515  	
516  	    if (host != NULL) {
517  	        peer = pcmk__get_node(0, host, NULL, pcmk__node_search_cluster_member);
518  	    }
519  	    pcmk__cluster_send_message(peer, pcmk_ipc_based, request);
520  	
521  	    // Return the request to its original state
522  	    pcmk__xe_remove_attr(request, PCMK__XA_CIB_DELEGATED_FROM);
523  	}
524  	
525  	static int
526  	cib_process_command(xmlNode *request, const cib__operation_t *operation,
527  	                    cib__op_fn_t op_function, xmlNode **reply)
528  	{
529  	    static mainloop_timer_t *digest_timer = NULL;
530  	
531  	    xmlNode *cib_diff = NULL;
532  	    xmlNode *output = NULL;
533  	    xmlNode *result_cib = NULL;
534  	
535  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
536  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
537  	    uint32_t call_options = cib_none;
538  	
539  	    bool config_changed = false;
540  	    int rc = pcmk_rc_ok;
541  	
542  	    if (digest_timer == NULL) {
543  	        digest_timer = mainloop_timer_add("based_digest_timer", 5000, false,
544  	                                          digest_timer_cb, NULL);
545  	    }
546  	
547  	    /* Start processing the request... */
548  	    pcmk__xe_get_flags(request, PCMK__XA_CIB_CALLOPT, &call_options, cib_none);
549  	
550  	    if (!pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
551  	        rc = cib__perform_op_ro(op_function, request, &the_cib, &output);
552  	        goto done;
553  	    }
554  	
555  	    /* result_cib must not be modified after cib__perform_op_rw() returns.
556  	     *
557  	     * It's not important whether the client variant is cib_native or
558  	     * cib_remote.
559  	     */
560  	    result_cib = the_cib;
561  	    rc = cib__perform_op_rw(cib_undefined, op_function, request,
562  	                            &config_changed, &result_cib, &cib_diff, &output);
563  	
564  	    if ((rc == pcmk_rc_ok)
565  	        && !pcmk__any_flags_set(call_options, cib_dryrun|cib_transaction)) {
566  	
567  	        /* Always write to disk for successful ops with the writes-through flag
568  	         * set. This also avoids the need to detect ordering changes.
569  	         */
570  	        const bool to_disk = config_changed
571  	                             || pcmk__is_set(operation->flags,
572  	                                             cib__op_attr_writes_through);
573  	
574  	        const char *feature_set = pcmk__xe_get(the_cib,
575  	                                               PCMK_XA_CRM_FEATURE_SET);
576  	
577  	        if (result_cib != the_cib) {
578  	            rc = based_activate_cib(result_cib, to_disk, op);
579  	        }
580  	
581  	        /* @COMPAT Nodes older than feature set 3.19.0 don't support
582  	         * transactions. In a mixed-version cluster with nodes <3.19.0, we must
583  	         * sync the updated CIB, so that the older nodes receive the changes.
584  	         * Any node that has already applied the transaction will ignore the
585  	         * synced CIB.
586  	         *
587  	         * To ensure the updated CIB is synced from only one node, we sync it
588  	         * from the originator.
589  	         */
590  	        if ((operation->type == cib__op_commit_transact)
591  	            && pcmk__str_eq(originator, OUR_NODENAME, pcmk__str_casei)
592  	            && (pcmk__compare_versions(feature_set, "3.19.0") < 0)) {
593  	
594  	            sync_our_cib(request, true);
595  	        }
596  	
597  	        if (cib_diff != NULL) {
598  	            ping_modified_since = true;
599  	        }
600  	
601  	        mainloop_timer_start(digest_timer);
602  	
603  	    } else if (rc == pcmk_rc_schema_validation) {
604  	        pcmk__assert(result_cib != the_cib);
605  	
606  	        if (output != NULL) {
607  	            pcmk__log_xml_info(output, "cib:output");
608  	            pcmk__xml_free(output);
609  	        }
610  	
611  	        output = result_cib;
612  	
613  	    } else if (result_cib != the_cib) {
614  	        pcmk__xml_free(result_cib);
615  	    }
616  	
617  	    if (pcmk__any_flags_set(call_options,
618  	                            cib_dryrun|cib_inhibit_notify|cib_transaction)) {
619  	        goto done;
620  	    }
621  	
622  	    based_diff_notify(request, rc, cib_diff);
623  	
624  	done:
625  	    if (!pcmk__is_set(call_options, cib_discard_reply)) {
626  	        *reply = create_cib_reply(request, rc, output);
627  	    }
628  	
629  	    if (output != the_cib) {
630  	        pcmk__xml_free(output);
631  	    }
632  	
633  	    pcmk__xml_free(cib_diff);
634  	    return rc;
635  	}
636  	
637  	/*!
638  	 * \internal
639  	 * \brief Log the result of processing a CIB request locally
640  	 *
641  	 * \param[in] request    Request XML
642  	 * \param[in] operation  Operation info
643  	 * \param[in] rc         Return code from processing the request
644  	 * \param[in] elapsed    How long processing took in seconds
645  	 */
646  	static void
647  	log_op_result(const xmlNode *request, const cib__operation_t *operation, int rc,
648  	              double elapsed)
649  	{
650  	    int level = LOG_INFO;
651  	
652  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
653  	    const char *section = pcmk__xe_get(request, PCMK__XA_CIB_SECTION);
654  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
655  	    const char *client_name = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME);
656  	    const char *call_id = pcmk__xe_get(request, PCMK__XA_CIB_CALLID);
657  	
658  	    int admin_epoch = 0;
659  	    int epoch = 0;
660  	    int num_updates = 0;
661  	
662  	    if (!pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
663  	        level = LOG_TRACE;
664  	
665  	    } else if (rc != pcmk_rc_ok) {
666  	        level = LOG_WARNING;
667  	    }
668  	
669  	    section = pcmk__s(section, "'all'");
670  	    originator = pcmk__s(originator, "local");
671  	    client_name = pcmk__s(client_name, "client");
672  	
673  	    /* @FIXME the_cib should always be non-NULL, but that's currently not the
674  	     * case during shutdown
675  	     */
676  	    if (the_cib != NULL) {
677  	        pcmk__xe_get_int(the_cib, PCMK_XA_ADMIN_EPOCH, &admin_epoch);
678  	        pcmk__xe_get_int(the_cib, PCMK_XA_EPOCH, &epoch);
679  	        pcmk__xe_get_int(the_cib, PCMK_XA_NUM_UPDATES, &num_updates);
680  	    }
681  	
682  	    do_crm_log(level,
683  	               "Completed %s operation for section %s: %s (rc=%d, "
684  	               "origin=%s/%s/%s, version=%d.%d.%d)",
685  	               op, section, pcmk_rc_str(rc), rc,
686  	               originator, client_name, call_id,
687  	               admin_epoch, epoch, num_updates);
688  	
689  	    if (elapsed > 3) {
690  	        pcmk__trace("%s operation took %.2fs to complete", op, elapsed);
691  	        crm_write_blackbox(0, NULL);
692  	    }
693  	}
694  	
695  	static void
696  	send_peer_reply(xmlNode *msg, const char *originator)
697  	{
698  	    const pcmk__node_status_t *node = NULL;
699  	
700  	    if ((msg == NULL) || (originator == NULL)) {
701  	        return;
702  	    }
703  	
704  	    // Send reply via cluster to originating node
705  	    node = pcmk__get_node(0, originator, NULL,
706  	                          pcmk__node_search_cluster_member);
707  	
708  	    pcmk__trace("Sending request result to %s only", originator);
709  	    pcmk__xe_set(msg, PCMK__XA_CIB_ISREPLYTO, originator);
710  	    pcmk__cluster_send_message(node, pcmk_ipc_based, msg);
711  	}
712  	
713  	/*!
714  	 * \internal
715  	 * \brief Handle an IPC or CPG message containing a request
716  	 *
717  	 * \param[in,out] request     Request XML
718  	 * \param[in]     privileged  If \c true, operations with
719  	 *                            \c cib__op_attr_privileged can be run
720  	 * \param[in]     client      IPC client that sent request (\c NULL if request
721  	 *                            came from CPG)
722  	 *
723  	 * \return Standard Pacemaker return code
724  	 */
725  	int
726  	based_process_request(xmlNode *request, bool privileged,
727  	                      const pcmk__client_t *client)
728  	{
729  	    // @TODO: Break into multiple smaller functions
730  	    uint32_t call_options = cib_none;
731  	
732  	    bool process = true;        // Whether to process request locally now
733  	    bool needs_reply = true;    // Whether to build a reply
734  	    bool local_notify = false;  // Whether to notify (local) requester
735  	    bool needs_forward = false; // Whether to forward request somewhere else
736  	
737  	    xmlNode *reply = NULL;
738  	
739  	    int rc = pcmk_rc_ok;
740  	    const char *op = pcmk__xe_get(request, PCMK__XA_CIB_OP);
741  	    const char *originator = pcmk__xe_get(request, PCMK__XA_SRC);
742  	    const char *host = pcmk__xe_get(request, PCMK__XA_CIB_HOST);
743  	    const char *call_id = pcmk__xe_get(request, PCMK__XA_CIB_CALLID);
744  	    const char *client_id = pcmk__xe_get(request, PCMK__XA_CIB_CLIENTID);
745  	    const char *client_name = pcmk__s(pcmk__xe_get(request, PCMK__XA_CIB_CLIENTNAME),
746  	                                      "client");
747  	    const char *reply_to = pcmk__xe_get(request, PCMK__XA_CIB_ISREPLYTO);
748  	
749  	    const cib__operation_t *operation = NULL;
750  	    cib__op_fn_t op_function = NULL;
751  	
752  	    rc = pcmk__xe_get_flags(request, PCMK__XA_CIB_CALLOPT, &call_options,
753  	                            cib_none);
754  	    if (rc != pcmk_rc_ok) {
755  	        pcmk__warn("Couldn't parse options from request: %s", pcmk_rc_str(rc));
756  	    }
757  	
758  	    if (pcmk__str_empty(host)) {
759  	        host = NULL;
760  	    }
761  	
762  	    if (client == NULL) {
763  	        pcmk__trace("Processing peer %s operation from %s/%s on %s intended "
764  	                    "for %s (reply=%s)", op, client_name, call_id, originator,
765  	                    pcmk__s(host, "all"), reply_to);
766  	    } else {
767  	        pcmk__xe_set(request, PCMK__XA_SRC, OUR_NODENAME);
768  	        pcmk__trace("Processing local %s operation from %s/%s intended for %s",
769  	                    op, client_name, call_id, pcmk__s(host, "all"));
770  	    }
771  	
(2) Event example_assign: Example 1: Assigning: "rc" = return value from "cib__get_operation(op, &operation)".
Also see events: [check_return][example_checked][example_assign][example_checked][example_assign][example_checked][example_assign][example_checked]
772  	    rc = cib__get_operation(op, &operation);
(3) Event example_checked: Example 1 (cont.): "rc" has its value checked in "rc != pcmk_rc_ok".
Also see events: [check_return][example_assign][example_assign][example_checked][example_assign][example_checked][example_assign][example_checked]
773  	    if (rc != pcmk_rc_ok) {
774  	        /* TODO: construct error reply? */
775  	        pcmk__err("Pre-processing of command failed: %s", pcmk_rc_str(rc));
776  	        return rc;
777  	    }
778  	
779  	    op_function = based_get_op_function(operation);
780  	    if (op_function == NULL) {
781  	        pcmk__err("Operation %s not supported by CIB manager", op);
782  	        return EOPNOTSUPP;
783  	    }
784  	
785  	    if (client != NULL) {
786  	        parse_local_options(client, operation, host, op, &local_notify,
787  	                            &needs_reply, &process, &needs_forward);
788  	
789  	    } else if (!parse_peer_options(operation, request, &local_notify,
790  	                                   &needs_reply, &process)) {
791  	        return pcmk_rc_ok;
792  	    }
793  	
794  	    if (pcmk__is_set(call_options, cib_transaction)) {
795  	        /* All requests in a transaction are processed locally against a working
796  	         * CIB copy, and we don't notify for individual requests because the
797  	         * entire transaction is atomic.
798  	         *
799  	         * We still call the option parser functions above, for the sake of log
800  	         * messages and checking whether we're the target for peer requests.
801  	         */
802  	        process = true;
803  	        needs_reply = false;
804  	        local_notify = false;
805  	        needs_forward = false;
806  	    }
807  	
808  	    if (pcmk__is_set(call_options, cib_discard_reply)) {
809  	        needs_reply = false;
810  	        local_notify = false;
811  	        pcmk__trace("Client is not interested in the reply");
812  	    }
813  	
814  	    if (needs_forward) {
815  	        forward_request(request);
816  	        return pcmk_rc_ok;
817  	    }
818  	
819  	    if (cib_status != pcmk_rc_ok) {
820  	        rc = cib_status;
821  	        pcmk__err("Ignoring request because cluster configuration is invalid "
822  	                  "(please repair and restart): %s", pcmk_rc_str(rc));
823  	
824  	        if (!pcmk__is_set(call_options, cib_discard_reply)) {
825  	            reply = create_cib_reply(request, rc, the_cib);
826  	        }
827  	
828  	    } else if (process) {
829  	        time_t start_time = time(NULL);
830  	
831  	        if (!privileged
832  	            && pcmk__is_set(operation->flags, cib__op_attr_privileged)) {
833  	
834  	            rc = EACCES;
835  	            if (!pcmk__is_set(call_options, cib_discard_reply)) {
836  	                reply = create_cib_reply(request, rc, NULL);
837  	            }
838  	
839  	        } else {
840  	            rc = cib_process_command(request, operation, op_function, &reply);
841  	        }
842  	
843  	        log_op_result(request, operation, rc, difftime(time(NULL), start_time));
844  	    }
845  	
846  	    if (pcmk__is_set(operation->flags, cib__op_attr_modifies)) {
847  	        pcmk__trace("Completed pre-sync update from %s/%s/%s%s",
848  	                    pcmk__s(originator, "local"), client_name, call_id,
849  	                    (local_notify? " with local notification" : ""));
850  	
851  	    } else if (needs_reply && !stand_alone && (client == NULL)) {
852  	        send_peer_reply(reply, originator);
853  	    }
854  	
855  	    if (!local_notify || (client_id == NULL)) {
856  	        goto done;
857  	    }
858  	
859  	    do_local_notify((process? reply : request), client_id,
860  	                    pcmk__is_set(call_options, cib_sync_call),
861  	                    (client == NULL));
862  	
863  	done:
864  	    pcmk__xml_free(reply);
865  	    return rc;
866  	}
867  	
868  	void
869  	based_peer_callback(xmlNode *msg, void *private_data)
870  	{
871  	    const char *reason = NULL;
872  	    const char *originator = pcmk__xe_get(msg, PCMK__XA_SRC);
873  	
874  	    if (pcmk__peer_cache == NULL) {
875  	        reason = "membership not established";
876  	        goto bail;
877  	    }
878  	
879  	    if (pcmk__xe_get(msg, PCMK__XA_CIB_CLIENTNAME) == NULL) {
880  	        pcmk__xe_set(msg, PCMK__XA_CIB_CLIENTNAME, originator);
881  	    }
882  	
883  	    based_process_request(msg, true, NULL);
884  	    return;
885  	
886  	  bail:
887  	    if (reason) {
888  	        const char *op = pcmk__xe_get(msg, PCMK__XA_CIB_OP);
889  	
890  	        pcmk__warn("Discarding %s message from %s: %s", op, originator, reason);
891  	    }
892  	}
893  	
894  	static gboolean
895  	cib_force_exit(gpointer data)
896  	{
897  	    pcmk__notice("Exiting immediately after %s without shutdown acknowledgment",
898  	                 pcmk__readable_interval(EXIT_ESCALATION_MS));
899  	    based_terminate(CRM_EX_ERROR);
900  	    return FALSE;
901  	}
902  	
903  	void
904  	based_shutdown(int nsig)
905  	{
906  	    int active = 0;
907  	    xmlNode *notification = NULL;
908  	
909  	    if (cib_shutdown_flag) {
910  	        // Already shutting down
911  	        return;
912  	    }
913  	
914  	    cib_shutdown_flag = true;
915  	
916  	    if (ipcs_ro != NULL) {
917  	        pcmk__drop_all_clients(ipcs_ro);
918  	        g_clear_pointer(&ipcs_ro, qb_ipcs_destroy);
919  	    }
920  	
921  	    if (ipcs_rw != NULL) {
922  	        pcmk__drop_all_clients(ipcs_rw);
923  	        g_clear_pointer(&ipcs_rw, qb_ipcs_destroy);
924  	    }
925  	
926  	    if (ipcs_shm != NULL) {
927  	        pcmk__drop_all_clients(ipcs_shm);
928  	        g_clear_pointer(&ipcs_shm, qb_ipcs_destroy);
929  	    }
930  	
931  	    based_drop_remote_clients();
932  	
933  	    active = pcmk__cluster_num_active_nodes();
934  	    if (active < 2) {
935  	        pcmk__info("Exiting without sending shutdown request (no active "
936  	                   "peers)");
937  	        based_terminate(CRM_EX_OK);
938  	        return;
939  	    }
940  	
941  	    pcmk__info("Sending shutdown request to %d peers", active);
942  	
943  	    notification = pcmk__xe_create(NULL, PCMK__XE_EXIT_NOTIFICATION);
944  	    pcmk__xe_set(notification, PCMK__XA_T, PCMK__VALUE_CIB);
945  	    pcmk__xe_set(notification, PCMK__XA_CIB_OP, PCMK__CIB_REQUEST_SHUTDOWN);
946  	
947  	    pcmk__cluster_send_message(NULL, pcmk_ipc_based, notification);
948  	    pcmk__xml_free(notification);
949  	
950  	    pcmk__create_timer(EXIT_ESCALATION_MS, cib_force_exit, NULL);
951  	}
952  	
953  	/*!
954  	 * \internal
955  	 * \brief Close remote sockets, free the global CIB and quit
956  	 *
957  	 * \param[in] exit_status  What exit status to use (if -1, use CRM_EX_OK, but
958  	 *                         skip disconnecting from the cluster layer)
959  	 */
960  	void
961  	based_terminate(int exit_status)
962  	{
963  	    if (remote_fd > 0) {
964  	        close(remote_fd);
965  	        remote_fd = 0;
966  	    }
967  	    if (remote_tls_fd > 0) {
968  	        close(remote_tls_fd);
969  	        remote_tls_fd = 0;
970  	    }
971  	
972  	    g_clear_pointer(&ping_digest, free);
973  	    g_clear_pointer(&the_cib, pcmk__xml_free);
974  	
975  	    // Exit immediately on error
976  	    if (exit_status > CRM_EX_OK) {
977  	        pcmk__stop_based_ipc(ipcs_ro, ipcs_rw, ipcs_shm);
978  	        crm_exit(exit_status);
979  	        return;
980  	    }
981  	
982  	    if ((mainloop != NULL) && g_main_loop_is_running(mainloop)) {
983  	        /* Quit via returning from the main loop. If exit_status has the special
984  	         * value -1, we skip the disconnect here, and it will be done when the
985  	         * main loop returns (this allows the peer status callback to avoid
986  	         * messing with the peer caches).
987  	         */
988  	        if (exit_status == CRM_EX_OK) {
989  	            pcmk_cluster_disconnect(crm_cluster);
990  	        }
991  	        g_main_loop_quit(mainloop);
992  	        return;
993  	    }
994  	
995  	    /* Exit cleanly. Even the peer status callback can disconnect here, because
996  	     * we're not returning control to the caller.
997  	     */
998  	    pcmk_cluster_disconnect(crm_cluster);
999  	    pcmk__stop_based_ipc(ipcs_ro, ipcs_rw, ipcs_shm);
1000 	    crm_exit(CRM_EX_OK);
1001 	}
1002