1 /*
2 * Copyright 2004-2026 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU Lesser General Public License
7 * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8 */
9
10 #include <crm_internal.h>
11
12 #include <arpa/inet.h>
13 #include <inttypes.h> // PRIu32
14 #include <netdb.h>
15 #include <netinet/in.h>
16 #include <stdbool.h>
17 #include <stdint.h> // uint32_t
18 #include <sys/socket.h>
19 #include <sys/types.h> // size_t
20 #include <sys/utsname.h>
21
22 #include <bzlib.h>
23 #include <corosync/corodefs.h>
24 #include <corosync/corotypes.h>
25 #include <corosync/hdb.h>
26 #include <corosync/cpg.h>
27 #include <qb/qbipc_common.h>
28 #include <qb/qbipcc.h>
29 #include <qb/qbutil.h>
30
31 #include <crm/cluster/internal.h>
32 #include <crm/common/ipc.h>
33 #include <crm/common/mainloop.h>
34 #include <crm/common/xml.h>
35
36 #include "crmcluster_private.h"
37
/* CPG handle that send_cpg_text() passes to crm_cs_flush() when sending queued
 * messages. pcmk__cpg_connect() sets it after joining the group, and
 * pcmk__cpg_disconnect() resets it to 0.
 *
 * @TODO Once we can update the public API to require pcmk_cluster_t* in more
 * functions, we can ditch this in favor of cluster->cpg_handle.
 */
static cpg_handle_t pcmk_cpg_handle = 0;

// @TODO These could be moved to pcmk_cluster_t* at that time as well

/* Set by pcmk__cpg_confchg_cb() when the local node is missing from the CPG
 * member list, checked by pcmk_cpg_dispatch(), cleared by pcmk__cpg_connect()
 */
static bool cpg_evicted = false;

// Outgoing messages (each a struct iovec *) waiting for crm_cs_flush()
static GList *cs_message_queue = NULL;

// ID of the retry timer created by crm_cs_flush(), or 0 if none is pending
static int cs_message_timer = 0;
47
/* @COMPAT Any changes to these structs (other than renames) will break all
 * rolling upgrades, and should be avoided if possible or done at a major
 * version bump if not
 */

// One endpoint (sender or destination) of a CPG message
struct pcmk__cpg_host_s {
    uint32_t id;                // Corosync node ID (0 if not set)
    uint32_t pid;               // Process ID (send_cpg_text() sets it for the sender)
    gboolean local;             // Unused but needed for compatibility
    enum pcmk_ipc_server type;  // For logging only
    uint32_t size;              // Length of uname without terminator (0 if not set)
    char uname[MAX_NAME];       // Node name, padded with zero bytes
} __attribute__ ((packed));

typedef struct pcmk__cpg_host_s pcmk__cpg_host_t;
63
// Layout of a message exchanged over Corosync CPG by this file
struct pcmk__cpg_msg_s {
    // header.size is the total size (this struct plus the payload in data)
    struct qb_ipc_response_header header __attribute__ ((aligned(8)));
    uint32_t id;                // Per-process counter assigned by send_cpg_text()
    gboolean is_compressed;     // Whether data holds a bzip2-compressed payload

    pcmk__cpg_host_t host;      // Destination (ID 0 and no name means all nodes)
    pcmk__cpg_host_t sender;    // Originator

    uint32_t size;              // Uncompressed payload size, including terminator
    uint32_t compressed_size;   // Size of data when is_compressed is set
    /* 584 bytes */
    char data[0];               // Payload (text, or compressed text)

} __attribute__ ((packed));

typedef struct pcmk__cpg_msg_s pcmk__cpg_msg_t;
80
// Declared early because crm_cs_flush_cb() calls it before its definition
static void crm_cs_flush(gpointer data);
82
/* Length of a CPG message's payload as transmitted: the compressed size if the
 * message is compressed, otherwise the plain size. The argument is
 * parenthesized so that any pointer expression (such as &local_msg) works.
 */
#define msg_data_len(msg) \
    ((msg)->is_compressed? (msg)->compressed_size : (msg)->size)
84
/* Evaluate a Corosync API call, retrying while it reports a transient error
 * (CS_ERR_TRY_AGAIN or CS_ERR_QUEUE_FULL)
 *
 * rc:      Variable that receives the result of each attempt
 * counter: Integer variable counting transient failures (caller sets it to 0)
 * max:     Stop retrying once counter reaches this value
 * code:    Expression to evaluate for each attempt
 *
 * After each transient failure, this sleeps for as many seconds as there have
 * been failures so far. All arguments are parenthesized so that arbitrary
 * expressions can be passed safely.
 */
#define cs_repeat(rc, counter, max, code) do {                              \
        (rc) = (code);                                                      \
        if (((rc) == CS_ERR_TRY_AGAIN) || ((rc) == CS_ERR_QUEUE_FULL)) {    \
            (counter)++;                                                    \
            pcmk__debug("Retrying operation after %ds", (counter));         \
            sleep(counter);                                                 \
        } else {                                                            \
            break;                                                          \
        }                                                                   \
    } while ((counter) < (max))
95
96 /*!
97 * \internal
98 * \brief Get the local Corosync node ID (via CPG)
99 *
100 * \param[in] handle CPG connection to use (or 0 to use new connection)
101 *
102 * \return Corosync ID of local node (or 0 if not known)
103 */
104 uint32_t
105 pcmk__cpg_local_nodeid(cpg_handle_t handle)
106 {
107 cs_error_t rc = CS_OK;
108 int retries = 0;
109 static uint32_t local_nodeid = 0;
110 cpg_handle_t local_handle = handle;
111 cpg_model_v1_data_t cpg_model_info = {CPG_MODEL_V1, NULL, NULL, NULL, 0};
112 int fd = -1;
113 uid_t found_uid = 0;
114 gid_t found_gid = 0;
115 pid_t found_pid = 0;
116 int rv = 0;
117
118 if (local_nodeid != 0) {
119 return local_nodeid;
120 }
121
122 if (handle == 0) {
123 pcmk__trace("Creating connection");
124 cs_repeat(rc, retries, 5,
125 cpg_model_initialize(&local_handle, CPG_MODEL_V1,
126 (cpg_model_data_t *) &cpg_model_info,
127 NULL));
128 if (rc != CS_OK) {
129 pcmk__err("Could not connect to the CPG API: %s (%d)",
130 pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
131 return 0;
132 }
133
134 rc = cpg_fd_get(local_handle, &fd);
135 if (rc != CS_OK) {
136 pcmk__err("Could not obtain the CPG API connection: %s (%d)",
137 pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
138 goto bail;
139 }
140
141 // CPG provider run as root (at least in given user namespace)?
142 rv = crm_ipc_is_authentic_process(fd, (uid_t) 0, (gid_t) 0, &found_pid,
143 &found_uid, &found_gid);
144 if (rv == 0) {
145 pcmk__err("CPG provider is not authentic: process %lld "
146 "(uid: %lld, gid: %lld)",
147 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
148 (long long) found_uid, (long long) found_gid);
149 goto bail;
150
151 } else if (rv < 0) {
152 pcmk__err("Could not verify authenticity of CPG provider: %s (%d)",
153 strerror(-rv), -rv);
154 goto bail;
155 }
156 }
157
158 if (rc == CS_OK) {
159 retries = 0;
160 pcmk__trace("Performing lookup");
161 cs_repeat(rc, retries, 5, cpg_local_get(local_handle, &local_nodeid));
162 }
163
164 if (rc != CS_OK) {
165 pcmk__err("Could not get local node id from the CPG API: %s (%d)",
166 pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
167 }
168
169 bail:
170 if (handle == 0) {
171 pcmk__trace("Closing connection");
172 cpg_finalize(local_handle);
173 }
174 pcmk__debug("Local nodeid is %u", local_nodeid);
175 return local_nodeid;
176 }
177
/*!
 * \internal
 * \brief Callback function for Corosync message queue timer
 *
 * \param[in] data  Pointer to CPG handle (handed on to crm_cs_flush())
 *
 * \return FALSE (to indicate to glib that the timer should be removed rather
 *         than fire again; crm_cs_flush() creates a new timer if needed)
 */
static gboolean
crm_cs_flush_cb(gpointer data)
{
    // Clear this first: crm_cs_flush() sends nothing while a timer ID is set
    cs_message_timer = 0;
    crm_cs_flush(data);
    return FALSE;
}
193
194 // Send no more than this many CPG messages in one flush
195 #define CS_SEND_MAX 200
196
197 /*!
198 * \internal
199 * \brief Send messages in Corosync CPG message queue
200 *
201 * \param[in] data CPG handle
202 */
203 static void
204 crm_cs_flush(gpointer data)
205 {
206 unsigned int sent = 0;
207 guint queue_len = 0;
208 cs_error_t rc = 0;
209 cpg_handle_t *handle = (cpg_handle_t *) data;
210
211 if (*handle == 0) {
212 pcmk__trace("Connection is dead");
213 return;
214 }
215
216 queue_len = g_list_length(cs_message_queue);
217 if (((queue_len % 1000) == 0) && (queue_len > 1)) {
218 pcmk__err("CPG queue has grown to %d", queue_len);
219
220 } else if (queue_len == CS_SEND_MAX) {
221 pcmk__warn("CPG queue has grown to %d", queue_len);
222 }
223
224 if (cs_message_timer != 0) {
225 /* There is already a timer, wait until it goes off */
226 pcmk__trace("Timer active %d", cs_message_timer);
227 return;
228 }
229
230 while ((cs_message_queue != NULL) && (sent < CS_SEND_MAX)) {
231 struct iovec *iov = cs_message_queue->data;
232
233 rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
234 if (rc != CS_OK) {
235 break;
236 }
237
238 sent++;
239 pcmk__trace("CPG message sent, size=%zu", iov->iov_len);
240
241 cs_message_queue = g_list_remove(cs_message_queue, iov);
242 free(iov->iov_base);
243 free(iov);
244 }
245
246 queue_len -= sent;
247 do_crm_log((queue_len > 5)? LOG_INFO : LOG_TRACE,
248 "Sent %u CPG message%s (%d still queued): %s (rc=%d)",
249 sent, pcmk__plural_s(sent), queue_len,
250 pcmk_rc_str(pcmk__corosync2rc(rc)), (int) rc);
251
252 if (cs_message_queue) {
253 uint32_t delay_ms = 100;
254 if (rc != CS_OK) {
255 /* Proportionally more if sending failed but cap at 1s */
256 delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
257 }
258 cs_message_timer = pcmk__create_timer(delay_ms, crm_cs_flush_cb, data);
259 }
260 }
261
262 /*!
263 * \internal
264 * \brief Dispatch function for CPG handle
265 *
266 * \param[in,out] user_data Cluster object
267 *
268 * \return 0 on success, -1 on error (per mainloop_io_t interface)
269 */
270 static int
271 pcmk_cpg_dispatch(gpointer user_data)
272 {
273 cs_error_t rc = CS_OK;
274 pcmk_cluster_t *cluster = (pcmk_cluster_t *) user_data;
275
276 rc = cpg_dispatch(cluster->priv->cpg_handle, CS_DISPATCH_ONE);
277 if (rc != CS_OK) {
278 pcmk__err("Connection to the CPG API failed: %s (%d)",
279 pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
280 cpg_finalize(cluster->priv->cpg_handle);
281 cluster->priv->cpg_handle = 0;
282 return -1;
283
284 } else if (cpg_evicted) {
285 pcmk__err("Evicted from CPG membership");
286 return -1;
287 }
288 return 0;
289 }
290
291 static inline const char *
292 ais_dest(const pcmk__cpg_host_t *host)
293 {
294 return (host->size > 0)? host->uname : "<all>";
295 }
296
297 static inline const char *
298 msg_type2text(enum pcmk_ipc_server type)
299 {
300 const char *name = pcmk__server_message_type(type);
301
302 return pcmk__s(name, "unknown");
303 }
304
305 /*!
306 * \internal
307 * \brief Check whether a Corosync CPG message is valid
308 *
309 * \param[in] msg Corosync CPG message to check
310 *
311 * \return true if \p msg is valid, otherwise false
312 */
313 static bool
314 check_message_sanity(const pcmk__cpg_msg_t *msg)
315 {
316 int32_t payload_size = msg->header.size - sizeof(pcmk__cpg_msg_t);
317
318 if (payload_size < 1) {
319 pcmk__err("%sCPG message %d from %s invalid: Claimed size of %d bytes "
320 "is too small " QB_XS " from %s[%u] to %s@%s",
321 (msg->is_compressed? "Compressed " : ""),
322 msg->id, ais_dest(&(msg->sender)),
323 (int) msg->header.size,
324 msg_type2text(msg->sender.type), msg->sender.pid,
325 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
326 return false;
327 }
328
329 if (msg->header.error != CS_OK) {
330 pcmk__err("%sCPG message %d from %s invalid: Sender indicated error %d "
331 QB_XS " from %s[%u] to %s@%s",
332 (msg->is_compressed? "Compressed " : ""),
333 msg->id, ais_dest(&(msg->sender)),
334 msg->header.error,
335 msg_type2text(msg->sender.type), msg->sender.pid,
336 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
337 return false;
338 }
339
340 if (msg_data_len(msg) != payload_size) {
341 pcmk__err("%sCPG message %d from %s invalid: Total size %d "
342 "inconsistent with payload size %d "
343 QB_XS " from %s[%u] to %s@%s",
344 (msg->is_compressed? "Compressed " : ""),
345 msg->id, ais_dest(&(msg->sender)),
346 (int) msg->header.size, (int) msg_data_len(msg),
347 msg_type2text(msg->sender.type), msg->sender.pid,
348 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
349 return false;
350 }
351
352 if (!msg->is_compressed &&
353 /* msg->size != (strlen(msg->data) + 1) would be a stronger check,
354 * but checking the last byte or two should be quick
355 */
356 (((msg->size > 1) && (msg->data[msg->size - 2] == '\0'))
357 || (msg->data[msg->size - 1] != '\0'))) {
358 pcmk__err("CPG message %d from %s invalid: Payload does not end at "
359 "byte %" PRIu32 " " QB_XS " from %s[%u] to %s@%s",
360 msg->id, ais_dest(&(msg->sender)), msg->size,
361 msg_type2text(msg->sender.type), msg->sender.pid,
362 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
363 return false;
364 }
365
366 pcmk__trace("Verified %d-byte %sCPG message %d from %s[%u]@%s to %s@%s",
367 (int) msg->header.size,
368 (msg->is_compressed? "compressed " : ""), msg->id,
369 msg_type2text(msg->sender.type), msg->sender.pid,
370 ais_dest(&(msg->sender)),
371 msg_type2text(msg->host.type), ais_dest(&(msg->host)));
372 return true;
373 }
374
375 /*!
376 * \internal
377 * \brief Extract text data from a Corosync CPG message
378 *
379 * \param[in] handle CPG connection (to get local node ID if not known)
380 * \param[in] sender_id Corosync ID of node that sent message
381 * \param[in] pid Process ID of message sender (for logging only)
382 * \param[in,out] content CPG message
383 * \param[out] from If not \c NULL, will be set to sender uname
384 * (valid for the lifetime of \p content)
385 *
386 * \return Newly allocated string with message data, or NULL for errors and
387 * messages not intended for the local node
388 *
389 * \note The caller is responsible for freeing the return value using \c free().
390 */
391 char *
392 pcmk__cpg_message_data(cpg_handle_t handle, uint32_t sender_id, uint32_t pid,
393 void *content, const char **from)
394 {
395 char *data = NULL;
396 pcmk__cpg_msg_t *msg = content;
397
398 if (from != NULL) {
399 *from = NULL;
400 }
401
402 if (handle != 0) {
403 uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
404 const char *local_name = pcmk__cluster_local_node_name();
405
406 // Update or validate message sender ID
407 if (msg->sender.id == 0) {
408 msg->sender.id = sender_id;
409 } else if (msg->sender.id != sender_id) {
410 pcmk__warn("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
411 ": claimed ID %" PRIu32,
412 sender_id, pid, msg->sender.id);
413 return NULL;
414 }
415
416 // Ignore messages that aren't for the local node
417 if ((msg->host.id != 0) && (local_nodeid != msg->host.id)) {
418 pcmk__trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
419 ": for ID %" PRIu32 " not %" PRIu32,
420 sender_id, pid, msg->host.id, local_nodeid);
421 return NULL;
422 }
423 if ((msg->host.size > 0)
424 && !pcmk__str_eq(msg->host.uname, local_name, pcmk__str_casei)) {
425
426 pcmk__trace("Ignoring CPG message from ID %" PRIu32 " PID %" PRIu32
427 ": for name %s not %s",
428 sender_id, pid, msg->host.uname, local_name);
429 return NULL;
430 }
431
432 // Add sender name if not in original message
433 if (msg->sender.size == 0) {
434 const pcmk__node_status_t *peer =
435 pcmk__get_node(sender_id, NULL, NULL,
436 pcmk__node_search_cluster_member);
437
438 if (peer->name == NULL) {
439 pcmk__debug("Received CPG message from node with ID %" PRIu32
440 " but its name is unknown",
441 sender_id);
442 } else {
443 pcmk__debug("Updating name of CPG message sender with ID %" PRIu32
444 " to %s",
445 sender_id, peer->name);
446 msg->sender.size = strlen(peer->name);
447 memset(msg->sender.uname, 0, MAX_NAME);
448 memcpy(msg->sender.uname, peer->name, msg->sender.size);
449 }
450 }
451 }
452
453 // Ensure sender is in peer cache (though it should already be)
454 pcmk__get_node(msg->sender.id, msg->sender.uname, NULL,
455 pcmk__node_search_cluster_member);
456
457 if (from != NULL) {
458 *from = msg->sender.uname;
459 }
460
461 if (!check_message_sanity(msg)) {
462 return NULL;
463 }
464
465 if (msg->is_compressed && (msg->size > 0)) {
466 int rc = BZ_OK;
467 unsigned int new_size = msg->size + 1;
468 char *uncompressed = pcmk__assert_alloc(new_size, sizeof(char));
469
470 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data,
471 msg->compressed_size, 1, 0);
472 rc = pcmk__bzlib2rc(rc);
473 if ((rc == pcmk_rc_ok) && (msg->size != new_size)) { // libbz2 bug?
474 rc = pcmk_rc_compression;
475 }
476 if (rc != pcmk_rc_ok) {
477 free(uncompressed);
478 pcmk__warn("Ignoring compressed CPG message %d from %s (ID %" PRIu32
479 " PID %" PRIu32 "): %s",
480 msg->id, ais_dest(&(msg->sender)), sender_id, pid,
481 pcmk_rc_str(rc));
482 return NULL;
483 }
484 data = uncompressed;
485
486 } else {
487 data = pcmk__str_copy(msg->data);
488 }
489
490 pcmk__trace("Received %sCPG message %d from %s (ID %" PRIu32
491 " PID %" PRIu32 "): %.40s...",
492 (msg->is_compressed? "compressed " : ""),
493 msg->id, ais_dest(&(msg->sender)), sender_id, pid, msg->data);
494 return data;
495 }
496
497 /*!
498 * \internal
499 * \brief Compare cpg_address objects by node ID
500 *
501 * \param[in] first First cpg_address structure to compare
502 * \param[in] second Second cpg_address structure to compare
503 *
504 * \return Negative number if first's node ID is lower,
505 * positive number if first's node ID is greater,
506 * or 0 if both node IDs are equal
507 */
508 static int
509 cmp_member_list_nodeid(const void *first, const void *second)
510 {
511 const struct cpg_address *const a = *((const struct cpg_address **) first),
512 *const b = *((const struct cpg_address **) second);
513 if (a->nodeid < b->nodeid) {
514 return -1;
515 } else if (a->nodeid > b->nodeid) {
516 return 1;
517 }
518 /* don't bother with "reason" nor "pid" */
519 return 0;
520 }
521
522 /*!
523 * \internal
524 * \brief Get a readable string equivalent of a cpg_reason_t value
525 *
526 * \param[in] reason CPG reason value
527 *
528 * \return Readable string suitable for logging
529 */
530 static const char *
531 cpgreason2str(cpg_reason_t reason)
532 {
533 switch (reason) {
534 case CPG_REASON_JOIN: return " via cpg_join";
535 case CPG_REASON_LEAVE: return " via cpg_leave";
536 case CPG_REASON_NODEDOWN: return " via cluster exit";
537 case CPG_REASON_NODEUP: return " via cluster join";
538 case CPG_REASON_PROCDOWN: return " for unknown reason";
539 default: break;
540 }
541 return "";
542 }
543
544 /*!
545 * \internal
546 * \brief Get a log-friendly node name
547 *
548 * \param[in] peer Node to check
549 *
550 * \return Node's uname, or readable string if not known
551 */
552 static inline const char *
553 peer_name(const pcmk__node_status_t *peer)
554 {
555 return (peer != NULL)? pcmk__s(peer->name, "peer node") : "unknown node";
556 }
557
558 /*!
559 * \internal
560 * \brief Process a CPG peer's leaving the cluster
561 *
562 * \param[in] cpg_group_name CPG group name (for logging)
563 * \param[in] event_counter Event number (for logging)
564 * \param[in] local_nodeid Node ID of local node
565 * \param[in] cpg_peer CPG peer that left
566 * \param[in] sorted_member_list List of remaining members, qsort()-ed by ID
567 * \param[in] member_list_entries Number of entries in \p sorted_member_list
568 */
569 static void
570 node_left(const char *cpg_group_name, int event_counter,
571 uint32_t local_nodeid, const struct cpg_address *cpg_peer,
572 const struct cpg_address **sorted_member_list,
573 size_t member_list_entries)
574 {
575 pcmk__node_status_t *peer =
576 pcmk__search_node_caches(cpg_peer->nodeid, NULL, NULL,
577 pcmk__node_search_cluster_member);
578 const struct cpg_address **rival = NULL;
579
580 /* Most CPG-related Pacemaker code assumes that only one process on a node
581 * can be in the process group, but Corosync does not impose this
582 * limitation, and more than one can be a member in practice due to a
583 * daemon attempting to start while another instance is already running.
584 *
585 * Check for any such duplicate instances, because we don't want to process
586 * their leaving as if our actual peer left. If the peer that left still has
587 * an entry in sorted_member_list (with a different PID), we will ignore the
588 * leaving.
589 *
590 * @TODO Track CPG members' PIDs so we can tell exactly who left.
591 */
592 if (peer != NULL) {
593 rival = bsearch(&cpg_peer, sorted_member_list, member_list_entries,
594 sizeof(const struct cpg_address *),
595 cmp_member_list_nodeid);
596 }
597
598 if (rival == NULL) {
599 pcmk__info("Group %s event %d: %s (node %u pid %u) left%s",
600 cpg_group_name, event_counter, peer_name(peer),
601 cpg_peer->nodeid, cpg_peer->pid,
602 cpgreason2str(cpg_peer->reason));
603 if (peer != NULL) {
604 crm_update_peer_proc(__func__, peer, crm_proc_cpg,
605 PCMK_VALUE_OFFLINE);
606 }
607 } else if (cpg_peer->nodeid == local_nodeid) {
608 pcmk__warn("Group %s event %d: duplicate local pid %u left%s",
609 cpg_group_name, event_counter,
610 cpg_peer->pid, cpgreason2str(cpg_peer->reason));
611 } else {
612 pcmk__warn("Group %s event %d: %s (node %u) duplicate pid %u left%s "
613 "(%u remains)",
614 cpg_group_name, event_counter, peer_name(peer),
615 cpg_peer->nodeid, cpg_peer->pid,
616 cpgreason2str(cpg_peer->reason), (*rival)->pid);
617 }
618 }
619
620 /*!
621 * \internal
622 * \brief Handle a CPG configuration change event
623 *
624 * \param[in] handle CPG connection
625 * \param[in] group_name CPG group name
626 * \param[in] member_list List of current CPG members
627 * \param[in] member_list_entries Number of entries in \p member_list
628 * \param[in] left_list List of CPG members that left
629 * \param[in] left_list_entries Number of entries in \p left_list
630 * \param[in] joined_list List of CPG members that joined
631 * \param[in] joined_list_entries Number of entries in \p joined_list
632 *
633 * \note This is of type \c cpg_confchg_fn_t, intended to be used in a
634 * \c cpg_callbacks_t object.
635 */
636 void
637 pcmk__cpg_confchg_cb(cpg_handle_t handle,
638 const struct cpg_name *group_name,
639 const struct cpg_address *member_list,
640 size_t member_list_entries,
641 const struct cpg_address *left_list,
642 size_t left_list_entries,
643 const struct cpg_address *joined_list,
644 size_t joined_list_entries)
645 {
646 static int counter = 0;
647
648 bool found = false;
649 uint32_t local_nodeid = pcmk__cpg_local_nodeid(handle);
650 const struct cpg_address **sorted = NULL;
651
652 sorted = pcmk__assert_alloc(member_list_entries,
653 sizeof(const struct cpg_address *));
654
|
(1) Event path: |
Condition "iter < member_list_entries", taking true branch. |
|
(3) Event path: |
Condition "iter < member_list_entries", taking false branch. |
655 for (size_t iter = 0; iter < member_list_entries; iter++) {
656 sorted[iter] = member_list + iter;
|
(2) Event path: |
Jumping back to the beginning of the loop. |
657 }
658
659 // So that the cross-matching of multiply-subscribed nodes is then cheap
660 qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
661 cmp_member_list_nodeid);
662
|
(4) Event path: |
Condition "i < left_list_entries", taking true branch. |
|
(6) Event path: |
Condition "i < left_list_entries", taking false branch. |
663 for (int i = 0; i < left_list_entries; i++) {
664 node_left(group_name->value, counter, local_nodeid, &left_list[i],
665 sorted, member_list_entries);
|
(5) Event path: |
Jumping back to the beginning of the loop. |
666 }
667
|
CID (unavailable; MK=7d5c0a2eccb043b4c051079114217918) (#1 of 1): Inconsistent C union access (INCONSISTENT_UNION_ACCESS): |
|
(7) Event assign_union_field: |
The union field "in" of "_pp" is written. |
|
(8) Event inconsistent_union_field_access: |
In "_pp.out", the union field used: "out" is inconsistent with the field most recently stored: "in". |
668 g_clear_pointer(&sorted, free);
669
670 for (int i = 0; i < joined_list_entries; i++) {
671 pcmk__info("Group %s event %d: node %u pid %u joined%s",
672 group_name->value, counter, joined_list[i].nodeid,
673 joined_list[i].pid, cpgreason2str(joined_list[i].reason));
674 }
675
676 for (int i = 0; i < member_list_entries; i++) {
677 pcmk__node_status_t *peer =
678 pcmk__get_node(member_list[i].nodeid, NULL, NULL,
679 pcmk__node_search_cluster_member);
680
681 if (member_list[i].nodeid == local_nodeid
682 && member_list[i].pid != getpid()) {
683 // See the note in node_left()
684 pcmk__warn("Group %s event %d: detected duplicate local pid %u",
685 group_name->value, counter, member_list[i].pid);
686 continue;
687 }
688 pcmk__info("Group %s event %d: %s (node %u pid %u) is member",
689 group_name->value, counter, peer_name(peer),
690 member_list[i].nodeid, member_list[i].pid);
691
692 /* If the caller left auto-reaping enabled, this will also update the
693 * state to member.
694 */
695 peer = crm_update_peer_proc(__func__, peer, crm_proc_cpg,
696 PCMK_VALUE_ONLINE);
697
698 if (peer && peer->state && strcmp(peer->state, PCMK_VALUE_MEMBER)) {
699 /* The node is a CPG member, but we currently think it's not a
700 * cluster member. This is possible only if auto-reaping was
701 * disabled. The node may be joining, and we happened to get the CPG
702 * notification before the quorum notification; or the node may have
703 * just died, and we are processing its final messages; or a bug
704 * has affected the peer cache.
705 */
706 time_t now = time(NULL);
707
708 if (peer->when_lost == 0) {
709 // Track when we first got into this contradictory state
710 peer->when_lost = now;
711
712 } else if (now > (peer->when_lost + 60)) {
713 // If it persists for more than a minute, update the state
714 pcmk__warn("Node %u is member of group %s but was believed "
715 "offline",
716 member_list[i].nodeid, group_name->value);
717 pcmk__update_peer_state(__func__, peer, PCMK_VALUE_MEMBER, 0);
718 }
719 }
720
721 if (local_nodeid == member_list[i].nodeid) {
722 found = true;
723 }
724 }
725
726 if (!found) {
727 pcmk__err("Local node was evicted from group %s", group_name->value);
728 cpg_evicted = true;
729 }
730
731 counter++;
732 }
733
734 /*!
735 * \brief Set the CPG deliver callback function for a cluster object
736 *
737 * \param[in,out] cluster Cluster object
738 * \param[in] fn Deliver callback function to set
739 *
740 * \return Standard Pacemaker return code
741 */
742 int
743 pcmk_cpg_set_deliver_fn(pcmk_cluster_t *cluster, cpg_deliver_fn_t fn)
744 {
745 if (cluster == NULL) {
746 return EINVAL;
747 }
748 cluster->cpg.cpg_deliver_fn = fn;
749 return pcmk_rc_ok;
750 }
751
752 /*!
753 * \brief Set the CPG config change callback function for a cluster object
754 *
755 * \param[in,out] cluster Cluster object
756 * \param[in] fn Configuration change callback function to set
757 *
758 * \return Standard Pacemaker return code
759 */
760 int
761 pcmk_cpg_set_confchg_fn(pcmk_cluster_t *cluster, cpg_confchg_fn_t fn)
762 {
763 if (cluster == NULL) {
764 return EINVAL;
765 }
766 cluster->cpg.cpg_confchg_fn = fn;
767 return pcmk_rc_ok;
768 }
769
770 /*!
771 * \brief Connect to Corosync CPG
772 *
773 * \param[in,out] cluster Initialized cluster object to connect
774 *
775 * \return Standard Pacemaker return code
776 */
777 int
778 pcmk__cpg_connect(pcmk_cluster_t *cluster)
779 {
780 cs_error_t rc;
781 int fd = -1;
782 int retries = 0;
783 uint32_t id = 0;
784 pcmk__node_status_t *peer = NULL;
785 cpg_handle_t handle = 0;
786 const char *cpg_group_name = NULL;
787 uid_t found_uid = 0;
788 gid_t found_gid = 0;
789 pid_t found_pid = 0;
790 int rv;
791
792 struct mainloop_fd_callbacks cpg_fd_callbacks = {
793 .dispatch = pcmk_cpg_dispatch,
794 .destroy = cluster->destroy,
795 };
796
797 cpg_model_v1_data_t cpg_model_info = {
798 .model = CPG_MODEL_V1,
799 .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
800 .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
801 .cpg_totem_confchg_fn = NULL,
802 .flags = 0,
803 };
804
805 cpg_evicted = false;
806
807 if (cluster->priv->server != pcmk_ipc_unknown) {
808 cpg_group_name = pcmk__server_message_type(cluster->priv->server);
809 }
810
811 if (cpg_group_name == NULL) {
812 /* The name will already be non-NULL for Pacemaker servers. If a
813 * command-line tool or external caller connects to the cluster,
814 * they will join this CPG group.
815 */
816 cpg_group_name = pcmk__s(crm_system_name, "unknown");
817 }
818 memset(cluster->priv->group.value, 0, 128);
819 strncpy(cluster->priv->group.value, cpg_group_name, 127);
820 cluster->priv->group.length = strlen(cluster->priv->group.value) + 1;
821
822 cs_repeat(rc, retries, 30, cpg_model_initialize(&handle, CPG_MODEL_V1, (cpg_model_data_t *)&cpg_model_info, NULL));
823 if (rc != CS_OK) {
824 pcmk__err("Could not connect to the CPG API: %s (%d)",
825 pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
826 goto bail;
827 }
828
829 rc = cpg_fd_get(handle, &fd);
830 if (rc != CS_OK) {
831 pcmk__err("Could not obtain the CPG API connection: %s (%d)",
832 pcmk_rc_str(pcmk__corosync2rc(rc)), rc);
833 goto bail;
834 }
835
836 /* CPG provider run as root (in given user namespace, anyway)? */
837 if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
838 &found_uid, &found_gid))) {
839 pcmk__err("CPG provider is not authentic: process %lld "
840 "(uid: %lld, gid: %lld)",
841 (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
842 (long long) found_uid, (long long) found_gid);
843 rc = CS_ERR_ACCESS;
844 goto bail;
845 } else if (rv < 0) {
846 pcmk__err("Could not verify authenticity of CPG provider: %s (%d)",
847 strerror(-rv), -rv);
848 rc = CS_ERR_ACCESS;
849 goto bail;
850 }
851
852 id = pcmk__cpg_local_nodeid(handle);
853 if (id == 0) {
854 pcmk__err("Could not get local node id from the CPG API");
855 goto bail;
856
857 }
858 cluster->priv->node_id = id;
859
860 retries = 0;
861 cs_repeat(rc, retries, 30, cpg_join(handle, &cluster->priv->group));
862 if (rc != CS_OK) {
863 pcmk__err("Could not join the CPG group '%s': %d", cpg_group_name, rc);
864 goto bail;
865 }
866
867 pcmk_cpg_handle = handle;
868 cluster->priv->cpg_handle = handle;
869 mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
870
871 bail:
872 if (rc != CS_OK) {
873 cpg_finalize(handle);
874 // @TODO Map rc to more specific Pacemaker return code
875 return ENOTCONN;
876 }
877
878 peer = pcmk__get_node(id, NULL, NULL, pcmk__node_search_cluster_member);
879 crm_update_peer_proc(__func__, peer, crm_proc_cpg, PCMK_VALUE_ONLINE);
880 return pcmk_rc_ok;
881 }
882
883 /*!
884 * \internal
885 * \brief Disconnect from Corosync CPG
886 *
887 * \param[in,out] cluster Cluster object to disconnect
888 */
889 void
890 pcmk__cpg_disconnect(pcmk_cluster_t *cluster)
891 {
892 pcmk_cpg_handle = 0;
893 if (cluster->priv->cpg_handle != 0) {
894 pcmk__trace("Disconnecting CPG");
895 cpg_leave(cluster->priv->cpg_handle, &cluster->priv->group);
896 cpg_finalize(cluster->priv->cpg_handle);
897 cluster->priv->cpg_handle = 0;
898
899 } else {
900 pcmk__info("No CPG connection");
901 }
902 }
903
904 /*!
905 * \internal
906 * \brief Send string data via Corosync CPG
907 *
908 * \param[in] data Data to send
909 * \param[in] node Cluster node to send message to
910 * \param[in] dest Type of message to send
911 *
912 * \return \c true on success, or \c false otherwise
913 */
914 static bool
915 send_cpg_text(const char *data, const pcmk__node_status_t *node,
916 enum pcmk_ipc_server dest)
917 {
918 static int msg_id = 0;
919 static int local_pid = 0;
920 static int local_name_len = 0;
921 static const char *local_name = NULL;
922
923 char *target = NULL;
924 struct iovec *iov;
925 pcmk__cpg_msg_t *msg = NULL;
926
927 if (local_name == NULL) {
928 local_name = pcmk__cluster_local_node_name();
929 }
930 if ((local_name_len == 0) && (local_name != NULL)) {
931 local_name_len = strlen(local_name);
932 }
933
934 if (data == NULL) {
935 data = "";
936 }
937
938 if (local_pid == 0) {
939 local_pid = getpid();
940 }
941
942 msg = pcmk__assert_alloc(1, sizeof(pcmk__cpg_msg_t));
943
944 msg_id++;
945 msg->id = msg_id;
946 msg->header.error = CS_OK;
947
948 msg->host.type = dest;
949
950 if (node != NULL) {
951 if (node->name != NULL) {
952 target = pcmk__str_copy(node->name);
953 msg->host.size = strlen(node->name);
954 memset(msg->host.uname, 0, MAX_NAME);
955 memcpy(msg->host.uname, node->name, msg->host.size);
956
957 } else {
958 target = pcmk__assert_asprintf("%" PRIu32, node->cluster_layer_id);
959 }
960 msg->host.id = node->cluster_layer_id;
961
962 } else {
963 target = pcmk__str_copy("all");
964 }
965
966 msg->sender.id = 0;
967 msg->sender.type = pcmk__parse_server(crm_system_name);
968 msg->sender.pid = local_pid;
969 msg->sender.size = local_name_len;
970 memset(msg->sender.uname, 0, MAX_NAME);
971
972 if ((local_name != NULL) && (msg->sender.size != 0)) {
973 memcpy(msg->sender.uname, local_name, msg->sender.size);
974 }
975
976 msg->size = 1 + strlen(data);
977 msg->header.size = sizeof(pcmk__cpg_msg_t) + msg->size;
978
979 if (msg->size < PCMK__BZ2_THRESHOLD) {
980 msg = pcmk__realloc(msg, msg->header.size);
981 memcpy(msg->data, data, msg->size);
982
983 } else {
984 char *compressed = NULL;
985 unsigned int new_size = 0;
986
987 if (pcmk__compress(data, (unsigned int) msg->size, 0, &compressed,
988 &new_size) == pcmk_rc_ok) {
989
990 msg->header.size = sizeof(pcmk__cpg_msg_t) + new_size;
991 msg = pcmk__realloc(msg, msg->header.size);
992 memcpy(msg->data, compressed, new_size);
993
994 msg->is_compressed = TRUE;
995 msg->compressed_size = new_size;
996
997 } else {
998 msg = pcmk__realloc(msg, msg->header.size);
999 memcpy(msg->data, data, msg->size);
1000 }
1001
1002 free(compressed);
1003 }
1004
1005 iov = pcmk__assert_alloc(1, sizeof(struct iovec));
1006 iov->iov_base = msg;
1007 iov->iov_len = msg->header.size;
1008
1009 if (msg->compressed_size > 0) {
1010 pcmk__trace("Queueing CPG message %" PRIu32 " to %s "
1011 "(%zu bytes, %" PRIu32 " bytes compressed payload): %.200s",
1012 msg->id, target, iov->iov_len, msg->compressed_size, data);
1013 } else {
1014 pcmk__trace("Queueing CPG message %" PRIu32 " to %s "
1015 "(%zu bytes, %" PRIu32 " bytes payload): %.200s",
1016 msg->id, target, iov->iov_len, msg->size, data);
1017 }
1018
1019 free(target);
1020
1021 cs_message_queue = g_list_append(cs_message_queue, iov);
1022 crm_cs_flush(&pcmk_cpg_handle);
1023
1024 return true;
1025 }
1026
1027 /*!
1028 * \internal
1029 * \brief Send an XML message via Corosync CPG
1030 *
1031 * \param[in] msg XML message to send
1032 * \param[in] node Cluster node to send message to
1033 * \param[in] dest Type of message to send
1034 *
1035 * \return TRUE on success, otherwise FALSE
1036 */
1037 bool
1038 pcmk__cpg_send_xml(const xmlNode *msg, const pcmk__node_status_t *node,
1039 enum pcmk_ipc_server dest)
1040 {
1041 bool rc = true;
1042 GString *data = g_string_sized_new(1024);
1043
1044 pcmk__xml_string(msg, 0, data, 0);
1045
1046 rc = send_cpg_text(data->str, node, dest);
1047 g_string_free(data, TRUE);
1048 return rc;
1049 }
1050