1 /*
2 * Copyright 2022-2023 the Pacemaker project contributors
3 *
4 * The version control history for this file may have further details.
5 *
6 * This source code is licensed under the GNU General Public License version 2
7 * or later (GPLv2+) WITHOUT ANY WARRANTY.
8 */
9
#include <crm_internal.h>

#include <limits.h>

#include <crm/msg_xml.h>
#include <crm/common/attrd_internal.h>

#include "pacemaker-attrd.h"
16
/* A hash table storing clients that are waiting on a sync point to be reached.
 * Each entry is keyed by the value waitlist_client had when the entry was
 * inserted - just a plain int. The obvious key would be the IPC client's ID,
 * but that is not guaranteed to be unique: a single client could be waiting on
 * a sync point for multiple attributes at the same time.
 *
 * The same integer is also written into the request XML as its call ID when
 * the entry is added, which is how the entry is found again when it is time to
 * send the ACK.
 *
 * It is not expected that this hash table will ever be especially large.
 */
static GHashTable *waitlist = NULL;
static int waitlist_client = 0;
26
/* A single waitlist entry: one client request whose ACK is being held back
 * until the requested sync point is reached.
 */
struct waitlist_node {
    /* What kind of sync point does this node describe? */
    enum attrd_sync_point sync_point;

    /* Information required to construct and send a reply to the client.
     * client_id is a private copy of the IPC client's ID and is released by
     * free_waitlist_node(); ipc_id and flags are copied from the request.
     */
    char *client_id;
    uint32_t ipc_id;
    uint32_t flags;
};
36
/* A hash table storing information on in-progress IPC requests that are
 * awaiting confirmations. These requests are currently being processed by peer
 * attrds, and we are waiting to receive a confirmation message from each peer
 * indicating that processing is complete.
 *
 * Multiple requests could be waiting on confirmations at the same time.
 *
 * The key is the unique call ID for the IPC request (an integer), and the
 * value is a struct confirmation_action. The table is created lazily by
 * attrd_expect_confirmations() and destroyed by attrd_free_confirmations().
 */
static GHashTable *expected_confirmations = NULL;
48
/*!
 * \internal
 * \brief A structure describing a single IPC request that is awaiting
 *        confirmations
 *
 * Instances are created by attrd_expect_confirmations() and released by
 * free_action(), which frees every member below.
 */
struct confirmation_action {
    /*!
     * \brief A list of peer attrds that we are waiting to receive confirmation
     *        messages from
     *
     * Each element is a separately allocated copy of a peer's host name.
     *
     * This list is dynamic - as confirmations arrive from peer attrds, they
     * will be removed from this list. When the list is empty, all peers have
     * processed the request and the associated confirmation action will be
     * taken.
     */
    GList *respondents;

    /*!
     * \brief A timer that will be used to remove the client should it time out
     *        before receiving all confirmations
     *
     * The timer is started as soon as the action is registered, and its
     * callback is confirmation_timeout_cb().
     */
    mainloop_timer_t *timer;

    /*!
     * \brief A function to run when all confirmations have been received
     *
     * It is called with \c xml (below) as its only argument.
     */
    attrd_confirmation_action_fn fn;

    /*!
     * \brief Information required to construct and send a reply to the client
     *
     * \c client_id is a private copy of the IPC client's ID; \c ipc_id and
     * \c flags are copied from the request.
     */
    char *client_id;
    uint32_t ipc_id;
    uint32_t flags;

    /*!
     * \brief The XML request containing the callid associated with this action
     *
     * This is a copy of the request XML made when the action was registered;
     * it is released with free_xml().
     */
    void *xml;
};
87
88 static void
89 next_key(void)
90 {
91 do {
92 waitlist_client++;
93 if (waitlist_client < 0) {
94 waitlist_client = 1;
95 }
96 } while (g_hash_table_contains(waitlist, GINT_TO_POINTER(waitlist_client)));
97 }
98
99 static void
100 free_waitlist_node(gpointer data)
101 {
102 struct waitlist_node *wl = (struct waitlist_node *) data;
103
104 free(wl->client_id);
105 free(wl);
106 }
107
108 static const char *
109 sync_point_str(enum attrd_sync_point sync_point)
110 {
111 if (sync_point == attrd_sync_point_local) {
112 return PCMK__VALUE_LOCAL;
113 } else if (sync_point == attrd_sync_point_cluster) {
114 return PCMK__VALUE_CLUSTER;
115 } else {
116 return "unknown";
117 }
118 }
119
120 /*!
121 * \internal
122 * \brief Add a client to the attrd waitlist
123 *
124 * Typically, a client receives an ACK for its XML IPC request immediately. However,
125 * some clients want to wait until their request has been processed and taken effect.
126 * This is called a sync point. Any client placed on this waitlist will have its
127 * ACK message delayed until either its requested sync point is hit, or until it
128 * times out.
129 *
130 * The XML IPC request must specify the type of sync point it wants to wait for.
131 *
132 * \param[in,out] request The request describing the client to place on the waitlist.
133 */
134 void
135 attrd_add_client_to_waitlist(pcmk__request_t *request)
136 {
137 const char *sync_point = attrd_request_sync_point(request->xml);
138 struct waitlist_node *wl = NULL;
139
140 if (sync_point == NULL) {
141 return;
142 }
143
144 if (waitlist == NULL) {
145 waitlist = pcmk__intkey_table(free_waitlist_node);
146 }
147
148 wl = calloc(sizeof(struct waitlist_node), 1);
149
150 CRM_ASSERT(wl != NULL);
151
152 wl->client_id = strdup(request->ipc_client->id);
153
154 CRM_ASSERT(wl->client_id);
155
156 if (pcmk__str_eq(sync_point, PCMK__VALUE_LOCAL, pcmk__str_none)) {
157 wl->sync_point = attrd_sync_point_local;
158 } else if (pcmk__str_eq(sync_point, PCMK__VALUE_CLUSTER, pcmk__str_none)) {
159 wl->sync_point = attrd_sync_point_cluster;
160 } else {
161 free_waitlist_node(wl);
162 return;
163 }
164
165 wl->ipc_id = request->ipc_id;
166 wl->flags = request->flags;
167
168 next_key();
169 pcmk__intkey_table_insert(waitlist, waitlist_client, wl);
170
171 crm_trace("Added client %s to waitlist for %s sync point",
172 wl->client_id, sync_point_str(wl->sync_point));
173 crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist));
174
175 /* And then add the key to the request XML so we can uniquely identify
176 * it when it comes time to issue the ACK.
177 */
178 crm_xml_add_int(request->xml, XML_LRM_ATTR_CALLID, waitlist_client);
179 }
180
181 /*!
182 * \internal
183 * \brief Free all memory associated with the waitlist. This is most typically
184 * used when attrd shuts down.
185 */
186 void
187 attrd_free_waitlist(void)
188 {
189 if (waitlist == NULL) {
190 return;
191 }
192
193 g_hash_table_destroy(waitlist);
194 waitlist = NULL;
195 }
196
197 /*!
198 * \internal
199 * \brief Unconditionally remove a client from the waitlist, such as when the client
200 * node disconnects from the cluster
201 *
202 * \param[in] client The client to remove
203 */
204 void
205 attrd_remove_client_from_waitlist(pcmk__client_t *client)
206 {
207 GHashTableIter iter;
208 gpointer value;
209
210 if (waitlist == NULL) {
211 return;
212 }
213
214 g_hash_table_iter_init(&iter, waitlist);
215
216 while (g_hash_table_iter_next(&iter, NULL, &value)) {
217 struct waitlist_node *wl = (struct waitlist_node *) value;
218
219 if (pcmk__str_eq(wl->client_id, client->id, pcmk__str_none)) {
220 g_hash_table_iter_remove(&iter);
221 crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist));
222 }
223 }
224 }
225
226 /*!
227 * \internal
228 * \brief Send an IPC ACK message to all awaiting clients
229 *
230 * This function will search the waitlist for all clients that are currently awaiting
231 * an ACK indicating their attrd operation is complete. Only those clients with a
232 * matching sync point type and callid from their original XML IPC request will be
233 * ACKed. Once they have received an ACK, they will be removed from the waitlist.
234 *
235 * \param[in] sync_point What kind of sync point have we hit?
236 * \param[in] xml The original XML IPC request.
237 */
238 void
239 attrd_ack_waitlist_clients(enum attrd_sync_point sync_point, const xmlNode *xml)
240 {
241 int callid;
242 gpointer value;
243
244 if (waitlist == NULL) {
245 return;
246 }
247
248 if (crm_element_value_int(xml, XML_LRM_ATTR_CALLID, &callid) == -1) {
249 crm_warn("Could not get callid from request XML");
250 return;
251 }
252
253 value = pcmk__intkey_table_lookup(waitlist, callid);
254 if (value != NULL) {
255 struct waitlist_node *wl = (struct waitlist_node *) value;
256 pcmk__client_t *client = NULL;
257
258 if (wl->sync_point != sync_point) {
259 return;
260 }
261
262 crm_notice("Alerting client %s for reached %s sync point",
263 wl->client_id, sync_point_str(wl->sync_point));
264
265 client = pcmk__find_client_by_id(wl->client_id);
266 if (client == NULL) {
267 return;
268 }
269
270 attrd_send_ack(client, wl->ipc_id, wl->flags | crm_ipc_client_response);
271
272 /* And then remove the client so it doesn't get alerted again. */
273 pcmk__intkey_table_remove(waitlist, callid);
274
275 crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist));
276 }
277 }
278
279 /*!
280 * \internal
281 * \brief Action to take when a cluster sync point is hit for a
282 * PCMK__ATTRD_CMD_UPDATE* message.
283 *
284 * \param[in] xml The request that should be passed along to
285 * attrd_ack_waitlist_clients. This should be the original
286 * IPC request containing the callid for this update message.
287 */
288 int
289 attrd_cluster_sync_point_update(xmlNode *xml)
290 {
291 crm_trace("Hit cluster sync point for attribute update");
292 attrd_ack_waitlist_clients(attrd_sync_point_cluster, xml);
293 return pcmk_rc_ok;
294 }
295
296 /*!
297 * \internal
298 * \brief Return the sync point attribute for an IPC request
299 *
300 * This function will check both the top-level element of \p xml for a sync
301 * point attribute, as well as all of its \p op children, if any. The latter
302 * is useful for newer versions of attrd that can put multiple IPC requests
303 * into a single message.
304 *
305 * \param[in] xml An XML IPC request
306 *
307 * \note It is assumed that if one child element has a sync point attribute,
308 * all will have a sync point attribute and they will all be the same
309 * sync point. No other configuration is supported.
310 *
311 * \return The sync point attribute of \p xml, or NULL if none.
312 */
313 const char *
314 attrd_request_sync_point(xmlNode *xml)
315 {
316 CRM_CHECK(xml != NULL, return NULL);
317
318 if (xml->children != NULL) {
319 xmlNode *child = pcmk__xe_match(xml, XML_ATTR_OP, PCMK__XA_ATTR_SYNC_POINT, NULL);
320
321 if (child) {
322 return crm_element_value(child, PCMK__XA_ATTR_SYNC_POINT);
323 } else {
324 return NULL;
325 }
326
327 } else {
328 return crm_element_value(xml, PCMK__XA_ATTR_SYNC_POINT);
329 }
330 }
331
332 /*!
333 * \internal
334 * \brief Does an IPC request contain any sync point attribute?
335 *
336 * \param[in] xml An XML IPC request
337 *
338 * \return true if there's a sync point attribute, false otherwise
339 */
340 bool
341 attrd_request_has_sync_point(xmlNode *xml)
342 {
343 return attrd_request_sync_point(xml) != NULL;
344 }
345
346 static void
347 free_action(gpointer data)
348 {
349 struct confirmation_action *action = (struct confirmation_action *) data;
350 g_list_free_full(action->respondents, free);
351 mainloop_timer_del(action->timer);
352 free_xml(action->xml);
353 free(action->client_id);
354 free(action);
355 }
356
357 /* Remove an IPC request from the expected_confirmations table if the peer attrds
358 * don't respond before the timeout is hit. We set the timeout to 15s. The exact
359 * number isn't critical - we just want to make sure that the table eventually gets
360 * cleared of things that didn't complete.
361 */
362 static gboolean
363 confirmation_timeout_cb(gpointer data)
364 {
365 struct confirmation_action *action = (struct confirmation_action *) data;
366
367 GHashTableIter iter;
368 gpointer value;
369
370 if (expected_confirmations == NULL) {
371 return G_SOURCE_REMOVE;
372 }
373
374 g_hash_table_iter_init(&iter, expected_confirmations);
375
376 while (g_hash_table_iter_next(&iter, NULL, &value)) {
377 if (value == action) {
378 pcmk__client_t *client = pcmk__find_client_by_id(action->client_id);
379 if (client == NULL) {
380 return G_SOURCE_REMOVE;
381 }
382
383 crm_trace("Timed out waiting for confirmations for client %s", client->id);
384 pcmk__ipc_send_ack(client, action->ipc_id, action->flags | crm_ipc_client_response,
385 "ack", ATTRD_PROTOCOL_VERSION, CRM_EX_TIMEOUT);
386
387 g_hash_table_iter_remove(&iter);
388 crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
389 break;
390 }
391 }
392
393 return G_SOURCE_REMOVE;
394 }
395
396 /*!
397 * \internal
398 * \brief When a peer disconnects from the cluster, no longer wait for its confirmation
399 * for any IPC action. If this peer is the last one being waited on, this will
400 * trigger the confirmation action.
401 *
402 * \param[in] host The disconnecting peer attrd's uname
403 */
404 void
405 attrd_do_not_expect_from_peer(const char *host)
406 {
407 GList *keys = NULL;
408
409 if (expected_confirmations == NULL) {
410 return;
411 }
412
413 keys = g_hash_table_get_keys(expected_confirmations);
414
415 crm_trace("Removing peer %s from expected confirmations", host);
416
417 for (GList *node = keys; node != NULL; node = node->next) {
418 int callid = *(int *) node->data;
419 attrd_handle_confirmation(callid, host);
420 }
421
422 g_list_free(keys);
423 }
424
425 /*!
426 * \internal
427 * \brief When a client disconnects from the cluster, no longer wait on confirmations
428 * for it. Because the peer attrds may still be processing the original IPC
429 * message, they may still send us confirmations. However, we will take no
430 * action on them.
431 *
432 * \param[in] client The disconnecting client
433 */
434 void
435 attrd_do_not_wait_for_client(pcmk__client_t *client)
436 {
437 GHashTableIter iter;
438 gpointer value;
439
440 if (expected_confirmations == NULL) {
441 return;
442 }
443
444 g_hash_table_iter_init(&iter, expected_confirmations);
445
446 while (g_hash_table_iter_next(&iter, NULL, &value)) {
447 struct confirmation_action *action = (struct confirmation_action *) value;
448
449 if (pcmk__str_eq(action->client_id, client->id, pcmk__str_none)) {
450 crm_trace("Removing client %s from expected confirmations", client->id);
451 g_hash_table_iter_remove(&iter);
452 crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
453 break;
454 }
455 }
456 }
457
458 /*!
459 * \internal
460 * \brief Register some action to be taken when IPC request confirmations are
461 * received
462 *
463 * When this function is called, a list of all peer attrds that support confirming
464 * requests is generated. As confirmations from these peer attrds are received,
465 * they are removed from this list. When the list is empty, the registered action
466 * will be called.
467 *
468 * \note This function should always be called before attrd_send_message is called
469 * to broadcast to the peers to ensure that we know what replies we are
470 * waiting on. Otherwise, it is possible the peer could finish and confirm
471 * before we know to expect it.
472 *
473 * \param[in] request The request that is awaiting confirmations
474 * \param[in] fn A function to be run after all confirmations are received
475 */
476 void
477 attrd_expect_confirmations(pcmk__request_t *request, attrd_confirmation_action_fn fn)
478 {
479 struct confirmation_action *action = NULL;
480 GHashTableIter iter;
481 gpointer host, ver;
482 GList *respondents = NULL;
483 int callid;
484
485 if (expected_confirmations == NULL) {
486 expected_confirmations = pcmk__intkey_table((GDestroyNotify) free_action);
487 }
488
489 if (crm_element_value_int(request->xml, XML_LRM_ATTR_CALLID, &callid) == -1) {
490 crm_err("Could not get callid from xml");
491 return;
492 }
493
494 if (pcmk__intkey_table_lookup(expected_confirmations, callid)) {
495 crm_err("Already waiting on confirmations for call id %d", callid);
496 return;
497 }
498
499 g_hash_table_iter_init(&iter, peer_protocol_vers);
500 while (g_hash_table_iter_next(&iter, &host, &ver)) {
501 if (ATTRD_SUPPORTS_CONFIRMATION(GPOINTER_TO_INT(ver))) {
502 char *s = strdup((char *) host);
503
504 CRM_ASSERT(s != NULL);
505 respondents = g_list_prepend(respondents, s);
506 }
507 }
508
509 action = calloc(1, sizeof(struct confirmation_action));
510 CRM_ASSERT(action != NULL);
511
512 action->respondents = respondents;
513 action->fn = fn;
514 action->xml = copy_xml(request->xml);
515
516 action->client_id = strdup(request->ipc_client->id);
517 CRM_ASSERT(action->client_id != NULL);
518
519 action->ipc_id = request->ipc_id;
520 action->flags = request->flags;
521
522 action->timer = mainloop_timer_add(NULL, 15000, FALSE, confirmation_timeout_cb, action);
523 mainloop_timer_start(action->timer);
524
525 pcmk__intkey_table_insert(expected_confirmations, callid, action);
526 crm_trace("Callid %d now waiting on %d confirmations", callid, g_list_length(respondents));
527 crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
528 }
529
530 void
531 attrd_free_confirmations(void)
532 {
533 if (expected_confirmations != NULL) {
534 g_hash_table_destroy(expected_confirmations);
535 expected_confirmations = NULL;
536 }
537 }
538
539 /*!
540 * \internal
541 * \brief Process a confirmation message from a peer attrd
542 *
543 * This function is called every time a PCMK__ATTRD_CMD_CONFIRM message is
544 * received from a peer attrd. If this is the last confirmation we are waiting
545 * on for a given operation, the registered action will be called.
546 *
547 * \param[in] callid The unique callid for the XML IPC request
548 * \param[in] host The confirming peer attrd's uname
549 */
550 void
551 attrd_handle_confirmation(int callid, const char *host)
552 {
553 struct confirmation_action *action = NULL;
554 GList *node = NULL;
555
556 if (expected_confirmations == NULL) {
557 return;
558 }
559
560 action = pcmk__intkey_table_lookup(expected_confirmations, callid);
561 if (action == NULL) {
562 return;
563 }
564
565 node = g_list_find_custom(action->respondents, host, (GCompareFunc) strcasecmp);
566
567 if (node == NULL) {
568 return;
569 }
570
571 action->respondents = g_list_remove(action->respondents, node->data);
572 crm_trace("Callid %d now waiting on %d confirmations", callid, g_list_length(action->respondents));
573
574 if (action->respondents == NULL) {
575 action->fn(action->xml);
576 pcmk__intkey_table_remove(expected_confirmations, callid);
577 crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
578 }
579 }
580