1    	/*
2    	 * Copyright 2004-2026 the Pacemaker project contributors
3    	 *
4    	 * The version control history for this file may have further details.
5    	 *
6    	 * This source code is licensed under the GNU General Public License version 2
7    	 * or later (GPLv2+) WITHOUT ANY WARRANTY.
8    	 */
9    	
10   	#include <crm_internal.h>
11   	
12   	#include <regex.h>
13   	#include <stdbool.h>
14   	#include <sys/param.h>
15   	#include <sys/types.h>
16   	#include <sys/wait.h>
17   	
18   	#include <crm/crm.h>
19   	#include <crm/lrmd.h>           // lrmd_event_data_t, lrmd_rsc_info_t, etc.
20   	#include <crm/services.h>
21   	#include <crm/common/xml.h>
22   	#include <crm/lrmd_internal.h>
23   	
24   	#include <pacemaker-internal.h>
25   	#include <pacemaker-controld.h>
26   	
/* 5 * 60 * 1000 (presumably five minutes expressed in milliseconds; the use
 * site is not in this part of the file -- TODO confirm the unit). The value is
 * parenthesized so that the expansion stays a single operand regardless of the
 * operators surrounding it where it is used.
 */
#define START_DELAY_THRESHOLD (5 * 60 * 1000)

// Failed local executor connection attempts tolerated before giving up
#define MAX_LRM_REG_FAILS 30
29   	
/* User data passed to lrm_remove_deleted_rsc() when delete_rsc_entry() sweeps
 * the table of pending deletion operations
 */
struct delete_event_s {
    int rc;                 // Result of the deletion (pcmk_ok means deleted)
    const char *rsc;        // ID of the affected resource (borrowed, not freed)
    lrm_state_t *lrm_state; // Executor state that the deletion applies to
};
35   	
/* Forward declarations of static helpers that are used before they are defined
 * (some of the definitions may lie further down in this file)
 */
static gboolean is_rsc_active(lrm_state_t * lrm_state, const char *rsc_id);
static gboolean build_active_RAs(lrm_state_t * lrm_state, xmlNode * rsc_list);
static gboolean stop_recurring_actions(gpointer key, gpointer value, gpointer user_data);

static lrmd_event_data_t *construct_op(const lrm_state_t *lrm_state,
                                       const xmlNode *rsc_op,
                                       const char *rsc_id,
                                       const char *operation);
static void do_lrm_rsc_op(lrm_state_t *lrm_state, lrmd_rsc_info_t *rsc,
                          xmlNode *msg, struct ra_metadata_s *md);

static gboolean lrm_state_verify_stopped(lrm_state_t * lrm_state, enum crmd_fsa_state cur_state,
                                         int log_level);
49   	
50   	static void
51   	lrm_connection_destroy(void)
52   	{
53   	    if (pcmk__is_set(controld_globals.fsa_input_register, R_LRM_CONNECTED)) {
54   	        pcmk__crit("Lost connection to local executor");
55   	        controld_fsa_append(C_FSA_INTERNAL, I_ERROR, NULL);
56   	        controld_clear_fsa_input_flags(R_LRM_CONNECTED);
57   	    }
58   	}
59   	
/*!
 * \internal
 * \brief Build a "<resource>:<call ID>" string
 *
 * \param[in] rsc      Resource ID
 * \param[in] call_id  Executor call ID
 *
 * \return Newly allocated string (the caller is responsible for freeing it)
 */
static char *
make_stop_id(const char *rsc, int call_id)
{
    char *stop_id = pcmk__assert_asprintf("%s:%d", rsc, call_id);

    return stop_id;
}
65   	
66   	static void
67   	copy_instance_keys(gpointer key, gpointer value, gpointer user_data)
68   	{
69   	    if (!g_str_has_prefix(key, CRM_META "_")) {
70   	        pcmk__insert_dup(user_data, (const char *) key, (const char *) value);
71   	    }
72   	}
73   	
74   	static void
75   	copy_meta_keys(gpointer key, gpointer value, gpointer user_data)
76   	{
77   	    if (g_str_has_prefix(key, CRM_META "_")) {
78   	        pcmk__insert_dup(user_data, (const char *) key, (const char *) value);
79   	    }
80   	}
81   	
82   	/*!
83   	 * \internal
84   	 * \brief Remove a recurring operation from a resource's history
85   	 *
86   	 * \param[in,out] history  Resource history to modify
87   	 * \param[in]     op       Operation to remove
88   	 *
89   	 * \return TRUE if the operation was found and removed, FALSE otherwise
90   	 */
91   	static gboolean
92   	history_remove_recurring_op(rsc_history_t *history, const lrmd_event_data_t *op)
93   	{
94   	    GList *iter;
95   	
96   	    for (iter = history->recurring_op_list; iter != NULL; iter = iter->next) {
97   	        lrmd_event_data_t *existing = iter->data;
98   	
99   	        if ((op->interval_ms == existing->interval_ms)
100  	            && pcmk__str_eq(op->rsc_id, existing->rsc_id, pcmk__str_none)
101  	            && pcmk__str_eq(op->op_type, existing->op_type, pcmk__str_casei)) {
102  	
103  	            history->recurring_op_list = g_list_delete_link(history->recurring_op_list, iter);
104  	            lrmd_free_event(existing);
105  	            return TRUE;
106  	        }
107  	    }
108  	    return FALSE;
109  	}
110  	
111  	/*!
112  	 * \internal
113  	 * \brief Free all recurring operations in resource history
114  	 *
115  	 * \param[in,out] history  Resource history to modify
116  	 */
117  	static void
118  	history_free_recurring_ops(rsc_history_t *history)
119  	{
120  	    for (GList *iter = history->recurring_op_list; iter != NULL;
121  	         iter = iter->next) {
122  	
123  	        lrmd_free_event(iter->data);
124  	    }
125  	
126  	    g_clear_pointer(&history->recurring_op_list, g_list_free);
127  	}
128  	
129  	/*!
130  	 * \internal
131  	 * \brief Free resource history
132  	 *
133  	 * \param[in,out] history  Resource history to free
134  	 */
135  	void
136  	history_free(gpointer data)
137  	{
138  	    rsc_history_t *history = (rsc_history_t*)data;
139  	
CID (unavailable; MK=362ee0c1d3986b6390492392f71387f1) (#1 of 1): Inconsistent C union access (INCONSISTENT_UNION_ACCESS):
(1) Event assign_union_field: The union field "in" of "_pp" is written.
(2) Event inconsistent_union_field_access: In "_pp.out", the union field used: "out" is inconsistent with the field most recently stored: "in".
140  	    g_clear_pointer(&history->stop_params, g_hash_table_destroy);
141  	
142  	    /* Don't need to free history->rsc.id because it's set to history->id */
143  	    free(history->rsc.type);
144  	    free(history->rsc.standard);
145  	    free(history->rsc.provider);
146  	
147  	    lrmd_free_event(history->failed);
148  	    lrmd_free_event(history->last);
149  	    free(history->id);
150  	    history_free_recurring_ops(history);
151  	    free(history);
152  	}
153  	
154  	static void
155  	update_history_cache(lrm_state_t * lrm_state, lrmd_rsc_info_t * rsc, lrmd_event_data_t * op)
156  	{
157  	    int target_rc = 0;
158  	    rsc_history_t *entry = NULL;
159  	
160  	    if (op->rsc_deleted) {
161  	        pcmk__debug("Purged history for '%s' after %s", op->rsc_id,
162  	                    op->op_type);
163  	        controld_delete_resource_history(op->rsc_id, lrm_state->node_name,
164  	                                         NULL, crmd_cib_smart_opt());
165  	        return;
166  	    }
167  	
168  	    if (pcmk__str_eq(op->op_type, PCMK_ACTION_NOTIFY, pcmk__str_casei)) {
169  	        return;
170  	    }
171  	
172  	    pcmk__debug("Updating history for '%s' with %s op", op->rsc_id,
173  	                op->op_type);
174  	
175  	    entry = g_hash_table_lookup(lrm_state->resource_history, op->rsc_id);
176  	    if (entry == NULL && rsc) {
177  	        entry = pcmk__assert_alloc(1, sizeof(rsc_history_t));
178  	        entry->id = pcmk__str_copy(op->rsc_id);
179  	        g_hash_table_insert(lrm_state->resource_history, entry->id, entry);
180  	
181  	        entry->rsc.id = entry->id;
182  	        entry->rsc.type = pcmk__str_copy(rsc->type);
183  	        entry->rsc.standard = pcmk__str_copy(rsc->standard);
184  	        entry->rsc.provider = pcmk__str_copy(rsc->provider);
185  	
186  	    } else if (entry == NULL) {
187  	        pcmk__info("Resource %s no longer exists, not updating cache",
188  	                   op->rsc_id);
189  	        return;
190  	    }
191  	
192  	    entry->last_callid = op->call_id;
193  	    target_rc = rsc_op_expected_rc(op);
194  	    if (op->op_status == PCMK_EXEC_CANCELLED) {
195  	        if (op->interval_ms > 0) {
196  	            pcmk__trace("Removing cancelled recurring op: " PCMK__OP_FMT,
197  	                        op->rsc_id, op->op_type, op->interval_ms);
198  	            history_remove_recurring_op(entry, op);
199  	            return;
200  	        } else {
201  	            pcmk__trace("Skipping " PCMK__OP_FMT " rc=%d, status=%d",
202  	                        op->rsc_id, op->op_type, op->interval_ms, op->rc,
203  	                        op->op_status);
204  	        }
205  	
206  	    } else if (did_rsc_op_fail(op, target_rc)) {
207  	        /* Store failed monitors here, otherwise the block below will cause them
208  	         * to be forgotten when a stop happens.
209  	         */
210  	        lrmd_free_event(entry->failed);
211  	        entry->failed = lrmd_copy_event(op);
212  	
213  	    } else if (op->interval_ms == 0) {
214  	        lrmd_free_event(entry->last);
215  	        entry->last = lrmd_copy_event(op);
216  	
217  	        if (op->params && pcmk__strcase_any_of(op->op_type, PCMK_ACTION_START,
218  	                                               PCMK_ACTION_RELOAD,
219  	                                               PCMK_ACTION_RELOAD_AGENT,
220  	                                               PCMK_ACTION_MONITOR, NULL)) {
221  	
222  	            g_clear_pointer(&entry->stop_params, g_hash_table_destroy);
223  	            entry->stop_params = pcmk__strkey_table(free, free);
224  	
225  	            g_hash_table_foreach(op->params, copy_instance_keys, entry->stop_params);
226  	        }
227  	    }
228  	
229  	    if (op->interval_ms > 0) {
230  	        /* Ensure there are no duplicates */
231  	        history_remove_recurring_op(entry, op);
232  	
233  	        pcmk__trace("Adding recurring op: " PCMK__OP_FMT, op->rsc_id,
234  	                    op->op_type, op->interval_ms);
235  	        entry->recurring_op_list = g_list_prepend(entry->recurring_op_list, lrmd_copy_event(op));
236  	
237  	    } else if ((entry->recurring_op_list != NULL)
238  	                && !pcmk__str_eq(op->op_type, PCMK_ACTION_MONITOR,
239  	                                 pcmk__str_casei)) {
240  	        pcmk__trace("Dropping %u recurring ops because of: " PCMK__OP_FMT,
241  	                    g_list_length(entry->recurring_op_list), op->rsc_id,
242  	                    op->op_type, op->interval_ms);
243  	        history_free_recurring_ops(entry);
244  	    }
245  	}
246  	
247  	/*!
248  	 * \internal
249  	 * \brief Send a direct OK ack for a resource task
250  	 *
251  	 * \param[in] lrm_state  LRM connection
252  	 * \param[in] input      Input message being ack'ed
253  	 * \param[in] rsc_id     ID of affected resource
254  	 * \param[in] rsc        Affected resource (if available)
255  	 * \param[in] task       Operation task being ack'ed
256  	 * \param[in] ack_host   Name of host to send ack to
257  	 * \param[in] ack_sys    IPC system name to ack
258  	 */
259  	static void
260  	send_task_ok_ack(const lrm_state_t *lrm_state, const ha_msg_input_t *input,
261  	                 const char *rsc_id, const lrmd_rsc_info_t *rsc,
262  	                 const char *task, const char *ack_host, const char *ack_sys)
263  	{
264  	    lrmd_event_data_t *op = construct_op(lrm_state, input->xml, rsc_id, task);
265  	
266  	    lrmd__set_result(op, PCMK_OCF_OK, PCMK_EXEC_DONE, NULL);
267  	    controld_ack_event_directly(ack_host, ack_sys, rsc, op, rsc_id);
268  	    lrmd_free_event(op);
269  	}
270  	
271  	static inline const char *
272  	op_node_name(lrmd_event_data_t *op)
273  	{
274  	    return pcmk__s(op->remote_nodename,
275  	                   controld_globals.cluster->priv->node_name);
276  	}
277  	
278  	void
279  	lrm_op_callback(lrmd_event_data_t * op)
280  	{
281  	    CRM_CHECK(op != NULL, return);
282  	    switch (op->type) {
283  	        case lrmd_event_disconnect:
284  	            if (op->remote_nodename == NULL) {
285  	                /* If this is the local executor IPC connection, set the right
286  	                 * bits in the controller when the connection goes down.
287  	                 */
288  	                lrm_connection_destroy();
289  	            }
290  	            break;
291  	
292  	        case lrmd_event_exec_complete:
293  	            {
294  	                lrm_state_t *lrm_state =
295  	                    controld_get_executor_state(op_node_name(op), false);
296  	
297  	                pcmk__assert(lrm_state != NULL);
298  	                process_lrm_event(lrm_state, op, NULL, NULL);
299  	            }
300  	            break;
301  	
302  	        default:
303  	            break;
304  	    }
305  	}
306  	
307  	static void
308  	try_local_executor_connect(long long action, fsa_data_t *msg_data,
309  	                           lrm_state_t *lrm_state)
310  	{
311  	    int rc = pcmk_rc_ok;
312  	
313  	    pcmk__debug("Connecting to the local executor");
314  	
315  	    // If we can connect, great
316  	    rc = controld_connect_local_executor(lrm_state);
317  	    if (rc == pcmk_rc_ok) {
318  	        controld_set_fsa_input_flags(R_LRM_CONNECTED);
319  	        pcmk__info("Connection to the local executor established");
320  	        return;
321  	    }
322  	
323  	    // Otherwise, if we can try again, set a timer to do so
324  	    if (lrm_state->num_lrm_register_fails < MAX_LRM_REG_FAILS) {
325  	        pcmk__warn("Failed to connect to the local executor %d time%s "
326  	                   "(%d max): %s",
327  	                   lrm_state->num_lrm_register_fails,
328  	                   pcmk__plural_s(lrm_state->num_lrm_register_fails),
329  	                   MAX_LRM_REG_FAILS, pcmk_rc_str(rc));
330  	        controld_start_wait_timer();
331  	        controld_fsa_stall(msg_data, action);
332  	        return;
333  	    }
334  	
335  	    // Otherwise give up
336  	    pcmk__err("Failed to connect to the executor the max allowed %d time%s: %s",
337  	              lrm_state->num_lrm_register_fails,
338  	              pcmk__plural_s(lrm_state->num_lrm_register_fails),
339  	              pcmk_rc_str(rc));
340  	    register_fsa_error(I_ERROR, msg_data);
341  	}
342  	
343  	// A_LRM_CONNECT
344  	void
345  	do_lrm_control(long long action, enum crmd_fsa_cause cause,
346  	               enum crmd_fsa_state cur_state, enum crmd_fsa_input current_input,
347  	               fsa_data_t *msg_data)
348  	{
349  	    /* This only pertains to local executor connections. Remote connections are
350  	     * handled as resources within the scheduler. Connecting and disconnecting
351  	     * from remote executor instances is handled differently.
352  	     */
353  	    lrm_state_t *lrm_state = NULL;
354  	
355  	    if (controld_globals.cluster->priv->node_name == NULL) {
356  	        return; // Shouldn't be possible
357  	    }
358  	
359  	    lrm_state = controld_get_executor_state(NULL, true);
360  	    if (lrm_state == NULL) {
361  	        register_fsa_error(I_ERROR, msg_data);
362  	        return;
363  	    }
364  	
365  	    if (pcmk__is_set(action, A_LRM_DISCONNECT)) {
366  	        if (!lrm_state_verify_stopped(lrm_state, cur_state, LOG_INFO)
367  	            && (action == A_LRM_DISCONNECT)) {
368  	
369  	            controld_fsa_stall(msg_data, action);
370  	            return;
371  	        }
372  	
373  	        controld_clear_fsa_input_flags(R_LRM_CONNECTED);
374  	        lrm_state_disconnect(lrm_state);
375  	        lrm_state_reset_tables(lrm_state, FALSE);
376  	    }
377  	
378  	    if (pcmk__is_set(action, A_LRM_CONNECT)) {
379  	        try_local_executor_connect(action, msg_data, lrm_state);
380  	    }
381  	
382  	    if ((action & ~(A_LRM_CONNECT|A_LRM_DISCONNECT)) != 0) {
383  	        pcmk__err("Unexpected action %s in %s", fsa_action2string(action),
384  	                  __func__);
385  	    }
386  	}
387  	
388  	static gboolean
389  	lrm_state_verify_stopped(lrm_state_t * lrm_state, enum crmd_fsa_state cur_state, int log_level)
390  	{
391  	    int counter = 0;
392  	    gboolean rc = TRUE;
393  	    const char *when = "lrm disconnect";
394  	
395  	    GHashTableIter gIter;
396  	    const char *key = NULL;
397  	    rsc_history_t *entry = NULL;
398  	    active_op_t *pending = NULL;
399  	
400  	    pcmk__debug("Checking for active resources before exit");
401  	
402  	    if (cur_state == S_TERMINATE) {
403  	        log_level = LOG_ERR;
404  	        when = "shutdown";
405  	
406  	    } else if (pcmk__is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
407  	        when = "shutdown... waiting";
408  	    }
409  	
410  	    if ((lrm_state->active_ops != NULL) && lrm_state_is_connected(lrm_state)) {
411  	        guint removed = g_hash_table_foreach_remove(lrm_state->active_ops,
412  	                                                    stop_recurring_actions,
413  	                                                    lrm_state);
414  	        guint nremaining = g_hash_table_size(lrm_state->active_ops);
415  	
416  	        if (removed || nremaining) {
417  	            pcmk__notice("Stopped %u recurring operation%s at %s (%u "
418  	                         "remaining)",
419  	                         removed, pcmk__plural_s(removed), when, nremaining);
420  	        }
421  	    }
422  	
423  	    if (lrm_state->active_ops != NULL) {
424  	        g_hash_table_iter_init(&gIter, lrm_state->active_ops);
425  	        while (g_hash_table_iter_next(&gIter, NULL, (void **)&pending)) {
426  	            /* Ignore recurring actions in the shutdown calculations */
427  	            if (pending->interval_ms == 0) {
428  	                counter++;
429  	            }
430  	        }
431  	    }
432  	
433  	    if (counter > 0) {
434  	        do_crm_log(log_level, "%d pending executor operation%s at %s",
435  	                   counter, pcmk__plural_s(counter), when);
436  	
437  	        if ((cur_state == S_TERMINATE)
438  	            || !pcmk__is_set(controld_globals.fsa_input_register,
439  	                             R_SENT_RSC_STOP)) {
440  	            g_hash_table_iter_init(&gIter, lrm_state->active_ops);
441  	            while (g_hash_table_iter_next(&gIter, (gpointer*)&key, (gpointer*)&pending)) {
442  	                do_crm_log(log_level, "Pending action: %s (%s)", key, pending->op_key);
443  	            }
444  	
445  	        } else {
446  	            rc = FALSE;
447  	        }
448  	        return rc;
449  	    }
450  	
451  	    if (lrm_state->resource_history == NULL) {
452  	        return rc;
453  	    }
454  	
455  	    if (pcmk__is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
456  	        /* At this point we're not waiting, we're just shutting down */
457  	        when = "shutdown";
458  	    }
459  	
460  	    counter = 0;
461  	    g_hash_table_iter_init(&gIter, lrm_state->resource_history);
462  	    while (g_hash_table_iter_next(&gIter, NULL, (gpointer*)&entry)) {
463  	        if (is_rsc_active(lrm_state, entry->id) == FALSE) {
464  	            continue;
465  	        }
466  	
467  	        counter++;
468  	        if (log_level == LOG_ERR) {
469  	            pcmk__info("Found %s active at %s", entry->id, when);
470  	        } else {
471  	            pcmk__trace("Found %s active at %s", entry->id, when);
472  	        }
473  	        if (lrm_state->active_ops != NULL) {
474  	            GHashTableIter hIter;
475  	
476  	            g_hash_table_iter_init(&hIter, lrm_state->active_ops);
477  	            while (g_hash_table_iter_next(&hIter, (gpointer*)&key, (gpointer*)&pending)) {
478  	                if (pcmk__str_eq(entry->id, pending->rsc_id, pcmk__str_none)) {
479  	                    const bool recurring = (pending->interval_ms != 0);
480  	
481  	                    pcmk__notice("%s %s (%s) incomplete at %s",
482  	                                 (recurring? "Recurring action" : "Action"),
483  	                                 key, pending->op_key, when);
484  	                }
485  	            }
486  	        }
487  	    }
488  	
489  	    if (counter) {
490  	        pcmk__err("%d resource%s active at %s",
491  	                  counter, ((counter == 1)? " was" : "s were"), when);
492  	    }
493  	
494  	    return rc;
495  	}
496  	
497  	static gboolean
498  	is_rsc_active(lrm_state_t * lrm_state, const char *rsc_id)
499  	{
500  	    rsc_history_t *entry = NULL;
501  	
502  	    entry = g_hash_table_lookup(lrm_state->resource_history, rsc_id);
503  	    if (entry == NULL || entry->last == NULL) {
504  	        return FALSE;
505  	    }
506  	
507  	    pcmk__trace("Processing %s: %s.%d=%d", rsc_id, entry->last->op_type,
508  	                entry->last->interval_ms, entry->last->rc);
509  	    if ((entry->last->rc == PCMK_OCF_OK)
510  	        && pcmk__str_eq(entry->last->op_type, PCMK_ACTION_STOP,
511  	                        pcmk__str_casei)) {
512  	        return FALSE;
513  	
514  	    } else if (entry->last->rc == PCMK_OCF_OK
515  	               && pcmk__str_eq(entry->last->op_type, PCMK_ACTION_MIGRATE_TO,
516  	                               pcmk__str_casei)) {
517  	        // A stricter check is too complex ... leave that to the scheduler
518  	        return FALSE;
519  	
520  	    } else if (entry->last->rc == PCMK_OCF_NOT_RUNNING) {
521  	        return FALSE;
522  	
523  	    } else if ((entry->last->interval_ms == 0)
524  	               && (entry->last->rc == PCMK_OCF_NOT_CONFIGURED)) {
525  	        /* Badly configured resources can't be reliably stopped */
526  	        return FALSE;
527  	    }
528  	
529  	    return TRUE;
530  	}
531  	
532  	static gboolean
533  	build_active_RAs(lrm_state_t * lrm_state, xmlNode * rsc_list)
534  	{
535  	    GHashTableIter iter;
536  	    rsc_history_t *entry = NULL;
537  	
538  	    g_hash_table_iter_init(&iter, lrm_state->resource_history);
539  	    while (g_hash_table_iter_next(&iter, NULL, (void **)&entry)) {
540  	
541  	        GList *gIter = NULL;
542  	        xmlNode *xml_rsc = pcmk__xe_create(rsc_list, PCMK__XE_LRM_RESOURCE);
543  	
544  	        pcmk__xe_set(xml_rsc, PCMK_XA_ID, entry->id);
545  	        pcmk__xe_set(xml_rsc, PCMK_XA_TYPE, entry->rsc.type);
546  	        pcmk__xe_set(xml_rsc, PCMK_XA_CLASS, entry->rsc.standard);
547  	        pcmk__xe_set(xml_rsc, PCMK_XA_PROVIDER, entry->rsc.provider);
548  	
549  	        if (entry->last && entry->last->params) {
550  	            static const char *name = CRM_META "_" PCMK__META_CONTAINER;
551  	            const char *container = g_hash_table_lookup(entry->last->params,
552  	                                                        name);
553  	
554  	            if (container) {
555  	                pcmk__trace("Resource %s is a part of container resource %s",
556  	                            entry->id, container);
557  	                pcmk__xe_set(xml_rsc, PCMK__META_CONTAINER, container);
558  	            }
559  	        }
560  	        controld_add_resource_history_xml(xml_rsc, &(entry->rsc), entry->failed,
561  	                                          lrm_state->node_name);
562  	        controld_add_resource_history_xml(xml_rsc, &(entry->rsc), entry->last,
563  	                                          lrm_state->node_name);
564  	        for (gIter = entry->recurring_op_list; gIter != NULL; gIter = gIter->next) {
565  	            controld_add_resource_history_xml(xml_rsc, &(entry->rsc), gIter->data,
566  	                                              lrm_state->node_name);
567  	        }
568  	    }
569  	
570  	    return FALSE;
571  	}
572  	
573  	xmlNode *
574  	controld_query_executor_state(void)
575  	{
576  	    // @TODO Ensure all callers handle NULL returns
577  	    xmlNode *xml_state = NULL;
578  	    xmlNode *xml_data = NULL;
579  	    xmlNode *rsc_list = NULL;
580  	    pcmk__node_status_t *peer = NULL;
581  	    lrm_state_t *lrm_state = controld_get_executor_state(NULL, false);
582  	
583  	    if (!lrm_state) {
584  	        pcmk__err("Could not get executor state for local node");
585  	        return NULL;
586  	    }
587  	
588  	    peer = pcmk__get_node(0, lrm_state->node_name, NULL, pcmk__node_search_any);
589  	    CRM_CHECK(peer != NULL, return NULL);
590  	
591  	    xml_state = create_node_state_update(peer,
592  	                                         controld_node_update_cluster
593  	                                         |controld_node_update_peer,
594  	                                         NULL, __func__);
595  	    if (xml_state == NULL) {
596  	        return NULL;
597  	    }
598  	
599  	    xml_data = pcmk__xe_create(xml_state, PCMK__XE_LRM);
600  	    pcmk__xe_set(xml_data, PCMK_XA_ID, peer->xml_id);
601  	    rsc_list = pcmk__xe_create(xml_data, PCMK__XE_LRM_RESOURCES);
602  	
603  	    // Build a list of active (not necessarily running) resources
604  	    build_active_RAs(lrm_state, rsc_list);
605  	
606  	    pcmk__log_xml_trace(xml_state, "Current executor state");
607  	
608  	    return xml_state;
609  	}
610  	
611  	/*!
612  	 * \internal
613  	 * \brief Map standard Pacemaker return code to operation status and OCF code
614  	 *
615  	 * \param[out] event  Executor event whose status and return code should be set
616  	 * \param[in]  rc     Standard Pacemaker return code
617  	 */
618  	void
619  	controld_rc2event(lrmd_event_data_t *event, int rc)
620  	{
621  	    /* This is called for cleanup requests from controller peers/clients, not
622  	     * for resource actions, so no exit reason is needed.
623  	     */
624  	    switch (rc) {
625  	        case pcmk_rc_ok:
626  	            lrmd__set_result(event, PCMK_OCF_OK, PCMK_EXEC_DONE, NULL);
627  	            break;
628  	        case EACCES:
629  	            lrmd__set_result(event, PCMK_OCF_INSUFFICIENT_PRIV,
630  	                             PCMK_EXEC_ERROR, NULL);
631  	            break;
632  	        default:
633  	            lrmd__set_result(event, PCMK_OCF_UNKNOWN_ERROR, PCMK_EXEC_ERROR,
634  	                             NULL);
635  	            break;
636  	    }
637  	}
638  	
639  	/*!
640  	 * \internal
641  	 * \brief Trigger a new transition after CIB status was deleted
642  	 *
643  	 * If a CIB status delete was not expected (as part of the transition graph),
644  	 * trigger a new transition by updating the (arbitrary) "last-lrm-refresh"
645  	 * cluster property.
646  	 *
647  	 * \param[in] from_sys  IPC name that requested the delete
648  	 * \param[in] rsc_id    Resource whose status was deleted (for logging only)
649  	 */
650  	void
651  	controld_trigger_delete_refresh(const char *from_sys, const char *rsc_id)
652  	{
653  	    if (!pcmk__str_eq(from_sys, CRM_SYSTEM_TENGINE, pcmk__str_casei)) {
654  	        char *now_s = pcmk__assert_asprintf("%lld", (long long) time(NULL));
655  	
656  	        pcmk__debug("Triggering a refresh after %s cleaned %s", from_sys,
657  	                    rsc_id);
658  	        cib__update_node_attr(controld_globals.logger_out,
659  	                              controld_globals.cib_conn, cib_none,
660  	                              PCMK_XE_CRM_CONFIG, NULL, NULL, NULL, NULL,
661  	                              "last-lrm-refresh", now_s, NULL, NULL);
662  	        free(now_s);
663  	    }
664  	}
665  	
666  	static void
667  	notify_deleted(lrm_state_t * lrm_state, ha_msg_input_t * input, const char *rsc_id, int rc)
668  	{
669  	    lrmd_event_data_t *op = NULL;
670  	    const char *from_sys = pcmk__xe_get(input->msg, PCMK__XA_CRM_SYS_FROM);
671  	    const char *from_host = pcmk__xe_get(input->msg, PCMK__XA_SRC);
672  	
673  	    pcmk__info("Notifying %s on %s that %s was%s deleted", from_sys,
674  	               pcmk__s(from_host, "localhost"), rsc_id,
675  	               ((rc == pcmk_ok)? "" : " not"));
676  	    op = construct_op(lrm_state, input->xml, rsc_id, PCMK_ACTION_DELETE);
677  	    controld_rc2event(op, pcmk_legacy2rc(rc));
678  	    controld_ack_event_directly(from_host, from_sys, NULL, op, rsc_id);
679  	    lrmd_free_event(op);
680  	    controld_trigger_delete_refresh(from_sys, rsc_id);
681  	}
682  	
683  	static gboolean
684  	lrm_remove_deleted_rsc(gpointer key, gpointer value, gpointer user_data)
685  	{
686  	    struct delete_event_s *event = user_data;
687  	    struct pending_deletion_op_s *op = value;
688  	
689  	    if (pcmk__str_eq(event->rsc, op->rsc, pcmk__str_none)) {
690  	        notify_deleted(event->lrm_state, op->input, event->rsc, event->rc);
691  	        return TRUE;
692  	    }
693  	    return FALSE;
694  	}
695  	
696  	static gboolean
697  	lrm_remove_deleted_op(gpointer key, gpointer value, gpointer user_data)
698  	{
699  	    const char *rsc = user_data;
700  	    active_op_t *pending = value;
701  	
702  	    if (pcmk__str_eq(rsc, pending->rsc_id, pcmk__str_none)) {
703  	        pcmk__info("Removing op %s:%d for deleted resource %s", pending->op_key,
704  	                   pending->call_id, rsc);
705  	        return TRUE;
706  	    }
707  	    return FALSE;
708  	}
709  	
710  	static void
711  	delete_rsc_entry(lrm_state_t *lrm_state, ha_msg_input_t *input,
712  	                 const char *rsc_id, GHashTableIter *rsc_iter, int rc,
713  	                 const char *user_name, bool from_cib)
714  	{
715  	    struct delete_event_s event;
716  	
717  	    CRM_CHECK(rsc_id != NULL, return);
718  	
719  	    if (rc == pcmk_ok) {
720  	        char *rsc_id_copy = pcmk__str_copy(rsc_id);
721  	
722  	        if (rsc_iter) {
723  	            g_hash_table_iter_remove(rsc_iter);
724  	        } else {
725  	            g_hash_table_remove(lrm_state->resource_history, rsc_id_copy);
726  	        }
727  	
728  	        if (from_cib) {
729  	            controld_delete_resource_history(rsc_id_copy, lrm_state->node_name,
730  	                                             user_name, crmd_cib_smart_opt());
731  	        }
732  	        g_hash_table_foreach_remove(lrm_state->active_ops,
733  	                                    lrm_remove_deleted_op, rsc_id_copy);
734  	        free(rsc_id_copy);
735  	    }
736  	
737  	    if (input) {
738  	        notify_deleted(lrm_state, input, rsc_id, rc);
739  	    }
740  	
741  	    event.rc = rc;
742  	    event.rsc = rsc_id;
743  	    event.lrm_state = lrm_state;
744  	    g_hash_table_foreach_remove(lrm_state->deletion_ops, lrm_remove_deleted_rsc, &event);
745  	}
746  	
747  	static inline gboolean
748  	last_failed_matches_op(rsc_history_t *entry, const char *op, guint interval_ms)
749  	{
750  	    if (entry == NULL) {
751  	        return FALSE;
752  	    }
753  	    if (op == NULL) {
754  	        return TRUE;
755  	    }
756  	    return (pcmk__str_eq(op, entry->failed->op_type, pcmk__str_casei)
757  	            && (interval_ms == entry->failed->interval_ms));
758  	}
759  	
760  	/*!
761  	 * \internal
762  	 * \brief Clear a resource's last failure
763  	 *
764  	 * Erase a resource's last failure on a particular node from both the
765  	 * LRM resource history in the CIB, and the resource history remembered
766  	 * for the LRM state.
767  	 *
768  	 * \param[in] rsc_id      Resource name
769  	 * \param[in] node_name   Node name
770  	 * \param[in] operation   If specified, only clear if matching this operation
771  	 * \param[in] interval_ms If operation is specified, it has this interval
772  	 */
773  	void
774  	lrm_clear_last_failure(const char *rsc_id, const char *node_name,
775  	                       const char *operation, guint interval_ms)
776  	{
777  	    lrm_state_t *lrm_state = controld_get_executor_state(node_name, false);
778  	
779  	    if (lrm_state == NULL) {
780  	        return;
781  	    }
782  	    if (lrm_state->resource_history != NULL) {
783  	        rsc_history_t *entry = g_hash_table_lookup(lrm_state->resource_history,
784  	                                                   rsc_id);
785  	
786  	        if (last_failed_matches_op(entry, operation, interval_ms)) {
787  	            g_clear_pointer(&entry->failed, lrmd_free_event);
788  	        }
789  	    }
790  	}
791  	
792  	/* Returns: gboolean - cancellation is in progress */
793  	static gboolean
794  	cancel_op(lrm_state_t * lrm_state, const char *rsc_id, const char *key, int op, gboolean remove)
795  	{
796  	    int rc = pcmk_ok;
797  	    char *local_key = NULL;
798  	    active_op_t *pending = NULL;
799  	
800  	    CRM_CHECK(op != 0, return FALSE);
801  	    CRM_CHECK(rsc_id != NULL, return FALSE);
802  	    if (key == NULL) {
803  	        local_key = make_stop_id(rsc_id, op);
804  	        key = local_key;
805  	    }
806  	    pending = g_hash_table_lookup(lrm_state->active_ops, key);
807  	
808  	    if (pending) {
809  	        if (remove && !pcmk__is_set(pending->flags, active_op_remove)) {
810  	            controld_set_active_op_flags(pending, active_op_remove);
811  	            pcmk__debug("Scheduling %s for removal", key);
812  	        }
813  	
814  	        if (pcmk__is_set(pending->flags, active_op_cancelled)) {
815  	            pcmk__debug("Operation %s already cancelled", key);
816  	            free(local_key);
817  	            return FALSE;
818  	        }
819  	        controld_set_active_op_flags(pending, active_op_cancelled);
820  	
821  	    } else {
822  	        pcmk__info("No pending op found for %s", key);
823  	        free(local_key);
824  	        return FALSE;
825  	    }
826  	
827  	    pcmk__debug("Cancelling op %d for %s (%s)", op, rsc_id, key);
828  	    rc = lrm_state_cancel(lrm_state, pending->rsc_id, pending->op_type,
829  	                          pending->interval_ms);
830  	    if (rc == pcmk_ok) {
831  	        pcmk__debug("Op %d for %s (%s): cancelled", op, rsc_id, key);
832  	        free(local_key);
833  	        return TRUE;
834  	    }
835  	
836  	    pcmk__debug("Op %d for %s (%s): Nothing to cancel", op, rsc_id, key);
837  	    /* The caller needs to make sure the entry is
838  	     * removed from the active operations list
839  	     *
840  	     * Usually by returning TRUE inside the worker function
841  	     * supplied to g_hash_table_foreach_remove()
842  	     *
843  	     * Not removing the entry from active operations will block
844  	     * the node from shutting down
845  	     */
846  	    free(local_key);
847  	    return FALSE;
848  	}
849  	
// User data handed to cancel_action_by_key() by cancel_op_key()
struct cancel_data {
    gboolean done;          // Set to TRUE once an op with a matching key is seen
    gboolean remove;        // Forwarded to cancel_op() as its remove argument
    const char *key;        // Operation key compared against each op's op_key
    lrmd_rsc_info_t *rsc;   // Resource whose ID is forwarded to cancel_op()
    lrm_state_t *lrm_state; // Executor state forwarded to cancel_op()
};
857  	
858  	static gboolean
859  	cancel_action_by_key(gpointer key, gpointer value, gpointer user_data)
860  	{
861  	    gboolean remove = FALSE;
862  	    struct cancel_data *data = user_data;
863  	    active_op_t *op = value;
864  	
865  	    if (pcmk__str_eq(op->op_key, data->key, pcmk__str_none)) {
866  	        data->done = TRUE;
867  	        remove = !cancel_op(data->lrm_state, data->rsc->id, key, op->call_id, data->remove);
868  	    }
869  	    return remove;
870  	}
871  	
872  	static gboolean
873  	cancel_op_key(lrm_state_t * lrm_state, lrmd_rsc_info_t * rsc, const char *key, gboolean remove)
874  	{
875  	    guint removed = 0;
876  	    struct cancel_data data;
877  	
878  	    CRM_CHECK(rsc != NULL, return FALSE);
879  	    CRM_CHECK(key != NULL, return FALSE);
880  	
881  	    data.key = key;
882  	    data.rsc = rsc;
883  	    data.done = FALSE;
884  	    data.remove = remove;
885  	    data.lrm_state = lrm_state;
886  	
887  	    removed = g_hash_table_foreach_remove(lrm_state->active_ops,
888  	                                          cancel_action_by_key, &data);
889  	    pcmk__trace("Removed %u op cache entries, new size: %u",
890  	                removed, g_hash_table_size(lrm_state->active_ops));
891  	    return data.done;
892  	}
893  	
894  	/*!
895  	 * \internal
896  	 * \brief Retrieve resource information from LRM
897  	 *
898  	 * \param[in,out]  lrm_state  Executor connection state to use
899  	 * \param[in]      rsc_xml    XML containing resource configuration
900  	 * \param[in]      do_create  If true, register resource if not already
901  	 * \param[out]     rsc_info   Where to store information obtained from executor
902  	 *
903  	 * \retval pcmk_ok   Success (and rsc_info holds newly allocated result)
904  	 * \retval -EINVAL   Required information is missing from arguments
905  	 * \retval -ENOTCONN No active connection to LRM
906  	 * \retval -ENODEV   Resource not found
907  	 * \retval -errno    Error communicating with executor when registering resource
908  	 *
909  	 * \note Caller is responsible for freeing result on success.
910  	 */
911  	static int
912  	get_lrm_resource(lrm_state_t *lrm_state, const xmlNode *rsc_xml,
913  	                 gboolean do_create, lrmd_rsc_info_t **rsc_info)
914  	{
915  	    const char *id = pcmk__xe_id(rsc_xml);
916  	
917  	    CRM_CHECK(lrm_state && rsc_xml && rsc_info, return -EINVAL);
918  	    CRM_CHECK(id, return -EINVAL);
919  	
920  	    if (lrm_state_is_connected(lrm_state) == FALSE) {
921  	        return -ENOTCONN;
922  	    }
923  	
924  	    pcmk__trace("Retrieving resource information for %s from the executor",
925  	                id);
926  	    *rsc_info = lrm_state_get_rsc_info(lrm_state, id, 0);
927  	
928  	    // If resource isn't known by ID, try clone name, if provided
929  	    if (!*rsc_info) {
930  	        const char *long_id = pcmk__xe_get(rsc_xml, PCMK__XA_LONG_ID);
931  	
932  	        if (long_id) {
933  	            *rsc_info = lrm_state_get_rsc_info(lrm_state, long_id, 0);
934  	        }
935  	    }
936  	
937  	    if ((*rsc_info == NULL) && do_create) {
938  	        const char *class = pcmk__xe_get(rsc_xml, PCMK_XA_CLASS);
939  	        const char *provider = pcmk__xe_get(rsc_xml, PCMK_XA_PROVIDER);
940  	        const char *type = pcmk__xe_get(rsc_xml, PCMK_XA_TYPE);
941  	        int rc;
942  	
943  	        pcmk__trace("Registering resource %s with the executor", id);
944  	        rc = lrm_state_register_rsc(lrm_state, id, class, provider, type,
945  	                                    lrmd_opt_drop_recurring);
946  	        if (rc != pcmk_ok) {
947  	            pcmk__err("Could not register resource %s with the executor on %s: "
948  	                      "%s " QB_XS " rc=%d",
949  	                      id, lrm_state->node_name, pcmk_strerror(rc), rc);
950  	
951  	            /* Register this as an internal error if this involves the local
952  	             * executor. Otherwise, we're likely dealing with an unresponsive
953  	             * remote node, which is not an FSA failure.
954  	             */
955  	            if (lrm_state_is_local(lrm_state) == TRUE) {
956  	                register_fsa_error(I_FAIL, NULL);
957  	            }
958  	            return rc;
959  	        }
960  	
961  	        *rsc_info = lrm_state_get_rsc_info(lrm_state, id, 0);
962  	    }
963  	    return *rsc_info? pcmk_ok : -ENODEV;
964  	}
965  	
966  	static void
967  	delete_resource(lrm_state_t *lrm_state, const char *id, lrmd_rsc_info_t *rsc,
968  	                GHashTableIter *iter, const char *sys, const char *user,
969  	                ha_msg_input_t *request, bool unregister, bool from_cib)
970  	{
971  	    int rc = pcmk_ok;
972  	
973  	    pcmk__info("Removing resource %s from executor for %s%s%s", id, sys,
974  	               ((user != NULL)? " as " : ""), pcmk__s(user, ""));
975  	
976  	    if (rsc && unregister) {
977  	        rc = lrm_state_unregister_rsc(lrm_state, id, 0);
978  	    }
979  	
980  	    if (rc == pcmk_ok) {
981  	        pcmk__trace("Resource %s deleted from executor", id);
982  	    } else if (rc == -EINPROGRESS) {
983  	        pcmk__info("Deletion of resource '%s' from executor is pending", id);
984  	        if (request) {
985  	            struct pending_deletion_op_s *op = NULL;
986  	            char *ref = pcmk__xe_get_copy(request->msg, PCMK_XA_REFERENCE);
987  	
988  	            op = pcmk__assert_alloc(1, sizeof(struct pending_deletion_op_s));
989  	            op->rsc = pcmk__str_copy(rsc->id);
990  	            op->input = copy_ha_msg_input(request);
991  	            g_hash_table_insert(lrm_state->deletion_ops, ref, op);
992  	        }
993  	        return;
994  	    } else {
995  	        pcmk__warn("Could not delete '%s' from executor for %s%s%s: %s "
996  	                   QB_XS " rc=%d",
997  	                   id, sys, ((user != NULL)? " as " : ""), pcmk__s(user, ""),
998  	                   pcmk_strerror(rc), rc);
999  	    }
1000 	
1001 	    delete_rsc_entry(lrm_state, request, id, iter, rc, user, from_cib);
1002 	}
1003 	
1004 	static int
1005 	get_fake_call_id(lrm_state_t *lrm_state, const char *rsc_id)
1006 	{
1007 	    int call_id = 999999999;
1008 	    rsc_history_t *entry = NULL;
1009 	
1010 	    if(lrm_state) {
1011 	        entry = g_hash_table_lookup(lrm_state->resource_history, rsc_id);
1012 	    }
1013 	
1014 	    /* Make sure the call id is greater than the last successful operation,
1015 	     * otherwise the failure will not result in a possible recovery of the resource
1016 	     * as it could appear the failure occurred before the successful start */
1017 	    if (entry) {
1018 	        call_id = entry->last_callid + 1;
1019 	    }
1020 	
1021 	    if (call_id < 0) {
1022 	        call_id = 1;
1023 	    }
1024 	    return call_id;
1025 	}
1026 	
1027 	static void
1028 	fake_op_status(lrm_state_t *lrm_state, lrmd_event_data_t *op, int op_status,
1029 	               enum ocf_exitcode op_exitcode, const char *exit_reason)
1030 	{
1031 	    op->call_id = get_fake_call_id(lrm_state, op->rsc_id);
1032 	    op->t_run = time(NULL);
1033 	    op->t_rcchange = op->t_run;
1034 	    lrmd__set_result(op, op_exitcode, op_status, exit_reason);
1035 	}
1036 	
1037 	static void
1038 	force_reprobe(lrm_state_t *lrm_state, const char *from_sys,
1039 	              const char *from_host, const char *user_name,
1040 	              gboolean is_remote_node, bool reprobe_all_nodes)
1041 	{
1042 	    GHashTableIter gIter;
1043 	    rsc_history_t *entry = NULL;
1044 	
1045 	    pcmk__info("Clearing resource history on node %s", lrm_state->node_name);
1046 	    g_hash_table_iter_init(&gIter, lrm_state->resource_history);
1047 	    while (g_hash_table_iter_next(&gIter, NULL, (void **)&entry)) {
1048 	        /* only unregister the resource during a reprobe if it is not a remote connection
1049 	         * resource. otherwise unregistering the connection will terminate remote-node
1050 	         * membership */
1051 	        bool unregister = true;
1052 	
1053 	        if (is_remote_lrmd_ra(NULL, NULL, entry->id)) {
1054 	            unregister = false;
1055 	
1056 	            if (reprobe_all_nodes) {
1057 	                lrm_state_t *remote_lrm_state =
1058 	                    controld_get_executor_state(entry->id, false);
1059 	
1060 	                if (remote_lrm_state != NULL) {
1061 	                    /* If reprobing all nodes, be sure to reprobe the remote
1062 	                     * node before clearing its connection resource
1063 	                     */
1064 	                    force_reprobe(remote_lrm_state, from_sys, from_host,
1065 	                                  user_name, TRUE, reprobe_all_nodes);
1066 	                }
1067 	            }
1068 	        }
1069 	
1070 	        /* Don't delete from the CIB, since we'll delete the whole node's LRM
1071 	         * state from the CIB soon
1072 	         */
1073 	        delete_resource(lrm_state, entry->id, &entry->rsc, &gIter, from_sys,
1074 	                        user_name, NULL, unregister, false);
1075 	    }
1076 	
1077 	    /* Now delete the copy in the CIB */
1078 	    controld_delete_node_history(lrm_state->node_name, false, cib_none);
1079 	}
1080 	
1081 	/*!
1082 	 * \internal
1083 	 * \brief Fail a requested action without actually executing it
1084 	 *
1085 	 * For an action that can't be executed, process it similarly to an actual
1086 	 * execution result, with specified error status (except for notify actions,
1087 	 * which will always be treated as successful).
1088 	 *
1089 	 * \param[in,out] lrm_state    Executor connection that action is for
1090 	 * \param[in]     action       Action XML from request
1091 	 * \param[in]     rc           Desired return code to use
1092 	 * \param[in]     op_status    Desired operation status to use
1093 	 * \param[in]     exit_reason  Human-friendly detail, if error
1094 	 */
1095 	static void
1096 	synthesize_lrmd_failure(lrm_state_t *lrm_state, const xmlNode *action,
1097 	                        int op_status, enum ocf_exitcode rc,
1098 	                        const char *exit_reason)
1099 	{
1100 	    lrmd_event_data_t *op = NULL;
1101 	    const char *operation = pcmk__xe_get(action, PCMK_XA_OPERATION);
1102 	    const char *target_node = pcmk__xe_get(action, PCMK__META_ON_NODE);
1103 	    xmlNode *xml_rsc = pcmk__xe_first_child(action, PCMK_XE_PRIMITIVE, NULL,
1104 	                                            NULL);
1105 	
1106 	    if ((xml_rsc == NULL) || (pcmk__xe_id(xml_rsc) == NULL)) {
1107 	        /* @TODO Should we do something else, like direct ack? */
1108 	        pcmk__info("Can't fake %s failure (%d) on %s without resource "
1109 	                   "configuration",
1110 	                   pcmk__xe_get(action, PCMK__XA_OPERATION_KEY), rc,
1111 	                   target_node);
1112 	        return;
1113 	
1114 	    } else if(operation == NULL) {
1115 	        /* This probably came from crm_resource -C, nothing to do */
1116 	        pcmk__info("Can't fake %s failure (%d) on %s without operation",
1117 	                   pcmk__xe_id(xml_rsc), rc, target_node);
1118 	        return;
1119 	    }
1120 	
1121 	    op = construct_op(lrm_state, action, pcmk__xe_id(xml_rsc), operation);
1122 	
1123 	    if (pcmk__str_eq(operation, PCMK_ACTION_NOTIFY, pcmk__str_casei)) {
1124 	        // Notifications can't fail
1125 	        fake_op_status(lrm_state, op, PCMK_EXEC_DONE, PCMK_OCF_OK, NULL);
1126 	    } else {
1127 	        fake_op_status(lrm_state, op, op_status, rc, exit_reason);
1128 	    }
1129 	
1130 	    pcmk__info("Faking " PCMK__OP_FMT " result (%d) on %s", op->rsc_id,
1131 	               op->op_type, op->interval_ms, op->rc, target_node);
1132 	
1133 	    // Process the result as if it came from the LRM
1134 	    process_lrm_event(lrm_state, op, NULL, action);
1135 	    lrmd_free_event(op);
1136 	}
1137 	
1138 	/*!
1139 	 * \internal
1140 	 * \brief Get target of an LRM operation (replacing \p NULL with local node
1141 	 *        name)
1142 	 *
1143 	 * \param[in] xml  LRM operation data XML
1144 	 *
1145 	 * \return LRM operation target node name (local node or Pacemaker Remote node)
1146 	 */
1147 	static const char *
1148 	lrm_op_target(const xmlNode *xml)
1149 	{
1150 	    const char *target = NULL;
1151 	
1152 	    if (xml) {
1153 	        target = pcmk__xe_get(xml, PCMK__META_ON_NODE);
1154 	    }
1155 	    if (target == NULL) {
1156 	        target = controld_globals.cluster->priv->node_name;
1157 	    }
1158 	    return target;
1159 	}
1160 	
1161 	static void
1162 	fail_lrm_resource(xmlNode *xml, lrm_state_t *lrm_state, const char *user_name,
1163 	                  const char *from_host, const char *from_sys)
1164 	{
1165 	    lrmd_event_data_t *op = NULL;
1166 	    lrmd_rsc_info_t *rsc = NULL;
1167 	    xmlNode *xml_rsc = pcmk__xe_first_child(xml, PCMK_XE_PRIMITIVE, NULL, NULL);
1168 	
1169 	    CRM_CHECK(xml_rsc != NULL, return);
1170 	
1171 	    /* The executor simply executes operations and reports the results, without
1172 	     * any concept of success or failure, so to fail a resource, we must fake
1173 	     * what a failure looks like.
1174 	     *
1175 	     * To do this, we create a fake executor operation event for the resource,
1176 	     * and pass that event to the executor client callback so it will be
1177 	     * processed as if it came from the executor.
1178 	     */
1179 	    op = construct_op(lrm_state, xml, pcmk__xe_id(xml_rsc), "asyncmon");
1180 	
1181 	    g_clear_pointer(&op->user_data, free);
1182 	    op->interval_ms = 0;
1183 	
1184 	    if (user_name && !pcmk__is_privileged(user_name)) {
1185 	        pcmk__err("%s does not have permission to fail %s", user_name,
1186 	                  pcmk__xe_id(xml_rsc));
1187 	        fake_op_status(lrm_state, op, PCMK_EXEC_ERROR,
1188 	                       PCMK_OCF_INSUFFICIENT_PRIV,
1189 	                       "Unprivileged user cannot fail resources");
1190 	        controld_ack_event_directly(from_host, from_sys, NULL, op,
1191 	                                    pcmk__xe_id(xml_rsc));
1192 	        lrmd_free_event(op);
1193 	        return;
1194 	    }
1195 	
1196 	
1197 	    if (get_lrm_resource(lrm_state, xml_rsc, TRUE, &rsc) == pcmk_ok) {
1198 	        pcmk__info("Failing resource %s...", rsc->id);
1199 	        fake_op_status(lrm_state, op, PCMK_EXEC_DONE, PCMK_OCF_UNKNOWN_ERROR,
1200 	                       "Simulated failure");
1201 	        process_lrm_event(lrm_state, op, NULL, xml);
1202 	        op->rc = PCMK_OCF_OK; // The request to fail the resource succeeded
1203 	        lrmd_free_rsc_info(rsc);
1204 	
1205 	    } else {
1206 	        pcmk__info("Cannot find/create resource in order to fail it...");
1207 	        pcmk__log_xml_warn(xml, "bad input");
1208 	        fake_op_status(lrm_state, op, PCMK_EXEC_ERROR, PCMK_OCF_UNKNOWN_ERROR,
1209 	                       "Cannot fail unknown resource");
1210 	    }
1211 	
1212 	    controld_ack_event_directly(from_host, from_sys, NULL, op,
1213 	                                pcmk__xe_id(xml_rsc));
1214 	    lrmd_free_event(op);
1215 	}
1216 	
1217 	static void
1218 	handle_reprobe_op(lrm_state_t *lrm_state, xmlNode *msg, const char *from_sys,
1219 	                  const char *from_host, const char *user_name,
1220 	                  gboolean is_remote_node, bool reprobe_all_nodes)
1221 	{
1222 	    pcmk__notice("Forcing the status of all resources to be redetected");
1223 	    force_reprobe(lrm_state, from_sys, from_host, user_name, is_remote_node,
1224 	                  reprobe_all_nodes);
1225 	
1226 	    if (!pcmk__strcase_any_of(from_sys, CRM_SYSTEM_PENGINE, CRM_SYSTEM_TENGINE, NULL)) {
1227 	        xmlNode *reply = pcmk__new_reply(msg, NULL);
1228 	
1229 	        pcmk__debug("ACK'ing re-probe from %s (%s)", from_sys, from_host);
1230 	
1231 	        if (relay_message(reply, TRUE) == FALSE) {
1232 	            pcmk__log_xml_err(reply, "Unable to route reply");
1233 	        }
1234 	        pcmk__xml_free(reply);
1235 	    }
1236 	}
1237 	
1238 	static bool do_lrm_cancel(ha_msg_input_t *input, lrm_state_t *lrm_state,
1239 	              lrmd_rsc_info_t *rsc, const char *from_host, const char *from_sys)
1240 	{
1241 	    char *op_key = NULL;
1242 	    char *meta_key = NULL;
1243 	    int call = 0;
1244 	    const char *call_id = NULL;
1245 	    const char *op_task = NULL;
1246 	    guint interval_ms = 0;
1247 	    gboolean in_progress = FALSE;
1248 	    xmlNode *params = pcmk__xe_first_child(input->xml, PCMK__XE_ATTRIBUTES,
1249 	                                           NULL, NULL);
1250 	
1251 	    CRM_CHECK(params != NULL, return FALSE);
1252 	
1253 	    meta_key = crm_meta_name(PCMK_XA_OPERATION);
1254 	    op_task = pcmk__xe_get(params, meta_key);
1255 	    free(meta_key);
1256 	    CRM_CHECK(op_task != NULL, return FALSE);
1257 	
1258 	    meta_key = crm_meta_name(PCMK_META_INTERVAL);
1259 	    if (pcmk__xe_get_guint(params, meta_key, &interval_ms) != pcmk_rc_ok) {
1260 	        free(meta_key);
1261 	        return FALSE;
1262 	    }
1263 	    free(meta_key);
1264 	
1265 	    op_key = pcmk__op_key(rsc->id, op_task, interval_ms);
1266 	
1267 	    meta_key = crm_meta_name(PCMK__XA_CALL_ID);
1268 	    call_id = pcmk__xe_get(params, meta_key);
1269 	    free(meta_key);
1270 	
1271 	    pcmk__debug("Scheduler requested op %s (call=%s) be cancelled", op_key,
1272 	                pcmk__s(call_id, "NA"));
1273 	    pcmk__scan_min_int(call_id, &call, 0);
1274 	    if (call == 0) {
1275 	        // Normal case when the scheduler cancels a recurring op
1276 	        in_progress = cancel_op_key(lrm_state, rsc, op_key, TRUE);
1277 	
1278 	    } else {
1279 	        // Normal case when the scheduler cancels a removed op
1280 	        in_progress = cancel_op(lrm_state, rsc->id, NULL, call, TRUE);
1281 	    }
1282 	
1283 	    // Acknowledge cancellation operation if for a remote connection resource
1284 	    if (!in_progress || is_remote_lrmd_ra(NULL, NULL, rsc->id)) {
1285 	        char *op_id = make_stop_id(rsc->id, call);
1286 	
1287 	        if (is_remote_lrmd_ra(NULL, NULL, rsc->id) == FALSE) {
1288 	            pcmk__info("Nothing known about operation %d for %s", call, op_key);
1289 	        }
1290 	        controld_delete_action_history_by_key(rsc->id, lrm_state->node_name,
1291 	                                              op_key, call);
1292 	        send_task_ok_ack(lrm_state, input, rsc->id, rsc, op_task,
1293 	                         from_host, from_sys);
1294 	
1295 	        /* needed at least for cancellation of a remote operation */
1296 	        if (lrm_state->active_ops != NULL) {
1297 	            g_hash_table_remove(lrm_state->active_ops, op_id);
1298 	        }
1299 	        free(op_id);
1300 	    }
1301 	
1302 	    free(op_key);
1303 	    return TRUE;
1304 	}
1305 	
1306 	static void
1307 	do_lrm_delete(ha_msg_input_t *input, lrm_state_t *lrm_state,
1308 	              lrmd_rsc_info_t *rsc, const char *from_sys, const char *from_host,
1309 	              bool crm_rsc_delete, const char *user_name)
1310 	{
1311 	    bool unregister = true;
1312 	    int cib_rc = controld_delete_resource_history(rsc->id, lrm_state->node_name,
1313 	                                                  user_name,
1314 	                                                  cib_dryrun|cib_sync_call);
1315 	
1316 	    if (cib_rc != pcmk_rc_ok) {
1317 	        lrmd_event_data_t *op = NULL;
1318 	
1319 	        op = construct_op(lrm_state, input->xml, rsc->id, PCMK_ACTION_DELETE);
1320 	
1321 	        /* These are resource clean-ups, not actions, so no exit reason is
1322 	         * needed.
1323 	         */
1324 	        lrmd__set_result(op, pcmk_rc2ocf(cib_rc), PCMK_EXEC_ERROR, NULL);
1325 	        controld_ack_event_directly(from_host, from_sys, NULL, op, rsc->id);
1326 	        lrmd_free_event(op);
1327 	        return;
1328 	    }
1329 	
1330 	    if (crm_rsc_delete && is_remote_lrmd_ra(NULL, NULL, rsc->id)) {
1331 	        unregister = false;
1332 	    }
1333 	
1334 	    delete_resource(lrm_state, rsc->id, rsc, NULL, from_sys,
1335 	                    user_name, input, unregister, true);
1336 	}
1337 	
/* User data for asynchronous metadata execution
 *
 * Both members are copies made by new_metadata_cb_data() and released by
 * free_metadata_cb_data().
 */
struct metadata_cb_data {
    lrmd_rsc_info_t *rsc;   // Copy of resource information
    xmlNode *input_xml;     // Copy of FSA input XML
};
1343 	
1344 	static struct metadata_cb_data *
1345 	new_metadata_cb_data(lrmd_rsc_info_t *rsc, xmlNode *input_xml)
1346 	{
1347 	    struct metadata_cb_data *data = NULL;
1348 	
1349 	    data = pcmk__assert_alloc(1, sizeof(struct metadata_cb_data));
1350 	    data->input_xml = pcmk__xml_copy(NULL, input_xml);
1351 	    data->rsc = lrmd_copy_rsc_info(rsc);
1352 	    return data;
1353 	}
1354 	
1355 	static void
1356 	free_metadata_cb_data(struct metadata_cb_data *data)
1357 	{
1358 	    lrmd_free_rsc_info(data->rsc);
1359 	    pcmk__xml_free(data->input_xml);
1360 	    free(data);
1361 	}
1362 	
1363 	/*!
1364 	 * \internal
1365 	 * \brief Execute an action after metadata has been retrieved
1366 	 *
1367 	 * \param[in] pid        Ignored
1368 	 * \param[in] result     Result of metadata action
1369 	 * \param[in] user_data  Metadata callback data
1370 	 */
1371 	static void
1372 	metadata_complete(int pid, const pcmk__action_result_t *result, void *user_data)
1373 	{
1374 	    struct metadata_cb_data *data = (struct metadata_cb_data *) user_data;
1375 	
1376 	    struct ra_metadata_s *md = NULL;
1377 	    lrm_state_t *lrm_state =
1378 	        controld_get_executor_state(lrm_op_target(data->input_xml), false);
1379 	
1380 	    if ((lrm_state != NULL) && pcmk__result_ok(result)) {
1381 	        md = controld_cache_metadata(lrm_state->metadata_cache, data->rsc,
1382 	                                     result->action_stdout);
1383 	    }
1384 	    if (!pcmk__is_set(controld_globals.fsa_input_register, R_HA_DISCONNECTED)) {
1385 	        do_lrm_rsc_op(lrm_state, data->rsc, data->input_xml, md);
1386 	    }
1387 	    free_metadata_cb_data(data);
1388 	}
1389 	
/*!
 * \internal
 * \brief Handle an executor-related request given as FSA input
 *
 * Depending on the request's task and operation, this fails a resource,
 * forces a re-probe, or looks up the resource in the executor (registering it
 * unless the operation is a delete) and then cancels an operation, deletes
 * the resource, or executes a resource action (after first retrieving agent
 * metadata asynchronously if it is needed and not available from the cache).
 *
 * \param[in,out] msg_data  FSA data whose \c data member is the
 *                          \c ha_msg_input_t request (neither may be NULL)
 */
void
controld_invoke_execd(fsa_data_t *msg_data)
{
    lrm_state_t *lrm_state = NULL;
    const char *crm_op = NULL;
    const char *from_sys = NULL;
    const char *from_host = NULL;
    const char *operation = NULL;
    const char *user_name = NULL;
    ha_msg_input_t *input = NULL;
    const char *target_node = NULL;
    gboolean is_remote_node = FALSE;
    bool crm_rsc_delete = FALSE;

    pcmk__assert((msg_data != NULL) && (msg_data->data != NULL));

    input = msg_data->data;
    target_node = lrm_op_target(input->xml);

    // Message routed to the local node is targeting a specific, non-local node
    is_remote_node = !controld_is_local_node(target_node);

    lrm_state = controld_get_executor_state(target_node, false);
    if ((lrm_state == NULL) && is_remote_node) {
        pcmk__err("Failing action because local node has never had connection "
                  "to remote node %s",
                  target_node);
        synthesize_lrmd_failure(NULL, input->xml, PCMK_EXEC_NOT_CONNECTED,
                                PCMK_OCF_UNKNOWN_ERROR,
                                "Local node has no connection to remote");
        return;
    }
    pcmk__assert(lrm_state != NULL);

    user_name = pcmk__update_acl_user(input->msg, PCMK__XA_CRM_USER, NULL);
    crm_op = pcmk__xe_get(input->msg, PCMK__XA_CRM_TASK);
    from_sys = pcmk__xe_get(input->msg, PCMK__XA_CRM_SYS_FROM);

    // from_host stays NULL when the request came from the transition engine
    if (!pcmk__str_eq(from_sys, CRM_SYSTEM_TENGINE, pcmk__str_none)) {
        from_host = pcmk__xe_get(input->msg, PCMK__XA_SRC);
    }

    // Determine the operation: an LRM delete task always means "delete"
    if (pcmk__str_eq(crm_op, PCMK_ACTION_LRM_DELETE, pcmk__str_none)) {
        if (!pcmk__str_eq(from_sys, CRM_SYSTEM_TENGINE, pcmk__str_none)) {
            crm_rsc_delete = TRUE; // from crm_resource
        }
        operation = PCMK_ACTION_DELETE;

    } else if (input->xml != NULL) {
        operation = pcmk__xe_get(input->xml, PCMK_XA_OPERATION);
    }

    CRM_CHECK(!pcmk__str_empty(crm_op) || !pcmk__str_empty(operation), return);

    pcmk__trace("'%s' execution request from %s as %s user",
                pcmk__s(crm_op, operation),
                pcmk__s(from_sys, "unknown subsystem"),
                pcmk__s(user_name, "current"));

    if (pcmk__str_eq(crm_op, CRM_OP_LRM_FAIL, pcmk__str_none)) {
        fail_lrm_resource(input->xml, lrm_state, user_name, from_host,
                          from_sys);

    } else if (pcmk__str_eq(crm_op, CRM_OP_REPROBE, pcmk__str_none)
               || pcmk__str_eq(operation, CRM_OP_REPROBE, pcmk__str_none)) {
        const char *raw_target = NULL;

        if (input->xml != NULL) {
            // For CRM_OP_REPROBE, a NULL target means we're targeting all nodes
            raw_target = pcmk__xe_get(input->xml, PCMK__META_ON_NODE);
        }
        handle_reprobe_op(lrm_state, input->msg, from_sys, from_host, user_name,
                          is_remote_node, (raw_target == NULL));

    } else if (operation != NULL) {
        lrmd_rsc_info_t *rsc = NULL;
        xmlNode *xml_rsc = pcmk__xe_first_child(input->xml, PCMK_XE_PRIMITIVE,
                                                NULL, NULL);

        // Only a delete may proceed without registering an unknown resource
        gboolean create_rsc = !pcmk__str_eq(operation, PCMK_ACTION_DELETE,
                                            pcmk__str_none);
        int rc;

        // We can't return anything meaningful without a resource ID
        CRM_CHECK((xml_rsc != NULL) && (pcmk__xe_id(xml_rsc) != NULL), return);

        rc = get_lrm_resource(lrm_state, xml_rsc, create_rsc, &rsc);
        if (rc == -ENOTCONN) {
            synthesize_lrmd_failure(lrm_state, input->xml,
                                    PCMK_EXEC_NOT_CONNECTED,
                                    PCMK_OCF_UNKNOWN_ERROR,
                                    "Not connected to remote executor");
            return;

        } else if ((rc < 0) && !create_rsc) {
            /* Delete of malformed or nonexistent resource
             * (deleting something that does not exist is a success)
             */
            pcmk__debug("Not registering resource '%s' for a %s event "
                        QB_XS " get-rc=%d (%s) transition-key=%s",
                        pcmk__xe_id(xml_rsc), operation, rc, pcmk_strerror(rc),
                        pcmk__xe_id(input->xml));
            delete_rsc_entry(lrm_state, input, pcmk__xe_id(xml_rsc), NULL,
                             pcmk_ok, user_name, true);
            return;

        } else if (rc == -EINVAL) {
            // Resource operation on malformed resource
            pcmk__err("Invalid resource definition for %s",
                      pcmk__xe_id(xml_rsc));
            pcmk__log_xml_warn(input->msg, "invalid resource");
            synthesize_lrmd_failure(lrm_state, input->xml, PCMK_EXEC_ERROR,
                                    PCMK_OCF_NOT_CONFIGURED, // fatal error
                                    "Invalid resource definition");
            return;

        } else if (rc < 0) {
            // Error communicating with the executor
            pcmk__err("Could not register resource '%s' with executor: %s "
                      QB_XS " rc=%d",
                      pcmk__xe_id(xml_rsc), pcmk_strerror(rc), rc);
            pcmk__log_xml_warn(input->msg, "failed registration");
            synthesize_lrmd_failure(lrm_state, input->xml, PCMK_EXEC_ERROR,
                                    PCMK_OCF_INVALID_PARAM, // hard error
                                    "Could not register resource with executor");
            return;
        }

        // From here on, rsc holds the executor's information and must be freed
        if (pcmk__str_eq(operation, PCMK_ACTION_CANCEL, pcmk__str_none)) {
            if (!do_lrm_cancel(input, lrm_state, rsc, from_host, from_sys)) {
                pcmk__log_xml_warn(input->xml, "Bad command");
            }

        } else if (pcmk__str_eq(operation, PCMK_ACTION_DELETE,
                                pcmk__str_none)) {
            do_lrm_delete(input, lrm_state, rsc, from_sys, from_host,
                          crm_rsc_delete, user_name);

        } else {
            struct ra_metadata_s *md = NULL;

            /* Getting metadata from cache is OK except for start actions --
             * always refresh from the agent for those, in case the resource
             * agent was updated.
             *
             * @TODO Only refresh metadata for starts if the agent actually
             * changed (using something like inotify, or a hash or modification
             * time of the agent executable).
             */
            if (strcmp(operation, PCMK_ACTION_START) != 0) {
                md = controld_get_rsc_metadata(lrm_state, rsc,
                                               controld_metadata_from_cache);
            }

            if ((md == NULL) && crm_op_needs_metadata(rsc->standard,
                                                      operation)) {
                /* Most likely, we'll need the agent metadata to record the
                 * pending operation and the operation result. Get it now rather
                 * than wait until then, so the metadata action doesn't eat into
                 * the real action's timeout.
                 *
                 * @TODO Metadata is retrieved via direct execution of the
                 * agent, which has a couple of related issues: the executor
                 * should execute agents, not the controller; and metadata for
                 * Pacemaker Remote nodes should be collected on those nodes,
                 * not locally.
                 */
                struct metadata_cb_data *data = NULL;

                // The callback gets its own copies of rsc and the input XML
                data = new_metadata_cb_data(rsc, input->xml);
                pcmk__info("Retrieving metadata for %s (%s%s%s:%s) "
                           "asynchronously",
                           rsc->id, rsc->standard,
                           ((rsc->provider != NULL)? ":" : ""),
                           pcmk__s(rsc->provider, ""), rsc->type);

                // metadata_complete() runs the requested action afterward
                (void) lrmd__metadata_async(rsc, metadata_complete,
                                            (void *) data);
            } else {
                do_lrm_rsc_op(lrm_state, rsc, input->xml, md);
            }
        }

        lrmd_free_rsc_info(rsc);

    } else {
        pcmk__err("Invalid execution request: unknown command '%s' (bug?)",
                  crm_op);
        register_fsa_error(I_ERROR, msg_data);
    }
}
1578 	
/*!
 * \internal
 * \brief Build an executor event describing an action to be requested
 *
 * \param[in] lrm_state  Executor connection state (may be NULL); used only to
 *                       look up cached stop parameters in its resource history
 * \param[in] rsc_op     Action request XML, or NULL to build a bare stop
 *                       action whose only parameter is the local feature set
 * \param[in] rsc_id     ID of resource the action is for (must not be NULL)
 * \param[in] operation  Name of action (must not be NULL)
 *
 * \return Newly created event; callers in this file release it with
 *         lrmd_free_event()
 *
 * \note If \p rsc_op has no transition key, the event is returned early with
 *       its user data unset and without the start/stop interval check.
 */
static lrmd_event_data_t *
construct_op(const lrm_state_t *lrm_state, const xmlNode *rsc_op,
             const char *rsc_id, const char *operation)
{
    lrmd_event_data_t *op = NULL;
    const char *op_delay = NULL;
    const char *op_timeout = NULL;
    GHashTable *params = NULL;

    xmlNode *primitive = NULL;
    const char *class = NULL;

    const char *transition = NULL;

    pcmk__assert((rsc_id != NULL) && (operation != NULL));

    // Start from an event with no timeout, no delay, and a pending result
    op = lrmd_new_event(rsc_id, operation, 0);
    op->type = lrmd_event_exec_complete;
    op->timeout = 0;
    op->start_delay = 0;
    lrmd__set_result(op, PCMK_OCF_UNKNOWN, PCMK_EXEC_PENDING, NULL);

    if (rsc_op == NULL) {
        // Without request XML, only a stop is expected (logged if not)
        CRM_LOG_ASSERT(pcmk__str_eq(operation, PCMK_ACTION_STOP,
                                    pcmk__str_casei));
        op->user_data = NULL;
        /* the stop_all_resources() case
         * by definition there is no DC (or they'd be shutting
         *   us down).
         * So we should put our version here.
         */
        op->params = pcmk__strkey_table(free, free);

        pcmk__insert_dup(op->params, PCMK_XA_CRM_FEATURE_SET, CRM_FEATURE_SET);

        pcmk__trace("Constructed %s op for %s", operation, rsc_id);
        return op;
    }

    // Extract the request's parameters, dropping the target-rc meta-attribute
    params = xml2list(rsc_op);
    g_hash_table_remove(params, CRM_META "_" PCMK__META_OP_TARGET_RC);

    op_delay = crm_meta_value(params, PCMK_META_START_DELAY);
    pcmk__scan_min_int(op_delay, &op->start_delay, 0);

    op_timeout = crm_meta_value(params, PCMK_META_TIMEOUT);
    pcmk__scan_min_int(op_timeout, &op->timeout, 0);

    // Treat a missing or unusable interval as a non-recurring action
    if (pcmk__guint_from_hash(params, CRM_META "_" PCMK_META_INTERVAL, 0,
                              &(op->interval_ms)) != pcmk_rc_ok) {
        op->interval_ms = 0;
    }

    /* Use pcmk_monitor_timeout instead of meta timeout for stonith
       recurring monitor, if set */
    primitive = pcmk__xe_first_child(rsc_op, PCMK_XE_PRIMITIVE, NULL, NULL);
    class = pcmk__xe_get(primitive, PCMK_XA_CLASS);

    if (pcmk__is_set(pcmk_get_ra_caps(class), pcmk_ra_cap_fence_params)
        && pcmk__str_eq(operation, PCMK_ACTION_MONITOR, pcmk__str_casei)
        && (op->interval_ms > 0)) {

        op_timeout = g_hash_table_lookup(params, "pcmk_monitor_timeout");
        if (op_timeout != NULL) {
            long long timeout_ms = 0;

            // Ignore unparsable or negative values; clamp the rest to an int
            if ((pcmk__parse_ms(op_timeout, &timeout_ms) == pcmk_rc_ok)
                && (timeout_ms >= 0)) {

                op->timeout = (int) QB_MIN(timeout_ms, INT_MAX);
            }
        }
    }

    if (!pcmk__str_eq(operation, PCMK_ACTION_STOP, pcmk__str_casei)) {
        // The event takes over the parameter table built from the request
        op->params = params;

    } else {
        rsc_history_t *entry = NULL;

        if (lrm_state) {
            entry = g_hash_table_lookup(lrm_state->resource_history, rsc_id);
        }

        /* If we do not have stop parameters cached, use
         * whatever we are given */
        if (!entry || !entry->stop_params) {
            op->params = params;
        } else {
            /* Copy the cached parameter list so that we stop the resource
             * with the old attributes, not the new ones */
            op->params = pcmk__strkey_table(free, free);

            g_hash_table_foreach(params, copy_meta_keys, op->params);
            g_hash_table_foreach(entry->stop_params, copy_instance_keys, op->params);

            // The request's own table is no longer needed after the copy
            g_clear_pointer(&params, g_hash_table_destroy);
        }
    }

    /* sanity */
    if (op->timeout <= 0) {
        // With no usable timeout, fall back to the interval
        op->timeout = op->interval_ms;
    }
    if (op->start_delay < 0) {
        op->start_delay = 0;
    }

    transition = pcmk__xe_get(rsc_op, PCMK__XA_TRANSITION_KEY);
    CRM_CHECK(transition != NULL, return op);

    // The transition key travels with the event as its user data
    op->user_data = pcmk__str_copy(transition);

    if (op->interval_ms != 0) {
        if (pcmk__strcase_any_of(operation, PCMK_ACTION_START, PCMK_ACTION_STOP,
                                 NULL)) {
            pcmk__err("Start and stop actions cannot have an interval: %u",
                      op->interval_ms);
            op->interval_ms = 0;
        }
    }

    pcmk__trace("Constructed %s op for %s: interval=%u", operation, rsc_id,
                op->interval_ms);

    return op;
}
1705 	
1706 	/*!
1707 	 * \internal
1708 	 * \brief Send a (synthesized) event result
1709 	 *
1710 	 * Reply with a synthesized event result directly, as opposed to going through
1711 	 * the executor.
1712 	 *
1713 	 * \param[in]     to_host  Host to send result to
1714 	 * \param[in]     to_sys   IPC name to send result (NULL for transition engine)
1715 	 * \param[in]     rsc      Type information about resource the result is for
1716 	 * \param[in,out] op       Event with result to send
1717 	 * \param[in]     rsc_id   ID of resource the result is for
1718 	 */
1719 	void
1720 	controld_ack_event_directly(const char *to_host, const char *to_sys,
1721 	                            const lrmd_rsc_info_t *rsc, lrmd_event_data_t *op,
1722 	                            const char *rsc_id)
1723 	{
1724 	    xmlNode *reply = NULL;
1725 	    xmlNode *update, *iter;
1726 	    pcmk__node_status_t *peer = NULL;
1727 	
1728 	    CRM_CHECK(op != NULL, return);
1729 	    if (op->rsc_id == NULL) {
1730 	        // op->rsc_id is a (const char *) but lrmd_free_event() frees it
1731 	        pcmk__assert(rsc_id != NULL);
1732 	        op->rsc_id = pcmk__str_copy(rsc_id);
1733 	    }
1734 	    if (to_sys == NULL) {
1735 	        to_sys = CRM_SYSTEM_TENGINE;
1736 	    }
1737 	
1738 	    peer = controld_get_local_node_status();
1739 	    update = create_node_state_update(peer, controld_node_update_none, NULL,
1740 	                                      __func__);
1741 	
1742 	    iter = pcmk__xe_create(update, PCMK__XE_LRM);
1743 	    pcmk__xe_set(iter, PCMK_XA_ID, controld_globals.our_uuid);
1744 	    iter = pcmk__xe_create(iter, PCMK__XE_LRM_RESOURCES);
1745 	    iter = pcmk__xe_create(iter, PCMK__XE_LRM_RESOURCE);
1746 	
1747 	    pcmk__xe_set(iter, PCMK_XA_ID, op->rsc_id);
1748 	
1749 	    controld_add_resource_history_xml(iter, rsc, op,
1750 	                                      controld_globals.cluster->priv->node_name);
1751 	
1752 	    /* We don't have the original message ID, so use "direct-ack" (we just need
1753 	     * something non-NULL for this to create a reply)
1754 	     *
1755 	     * @TODO It would be better to use the server, message ID, and task from the
1756 	     * original request when callers have it available
1757 	     */
1758 	    reply = pcmk__new_message(pcmk_ipc_controld, "direct-ack", CRM_SYSTEM_LRMD,
1759 	                              to_host, to_sys, CRM_OP_INVOKE_LRM, update);
1760 	
1761 	    pcmk__log_xml_trace(update, "[direct ACK]");
1762 	
1763 	    pcmk__debug("ACK'ing resource op " PCMK__OP_FMT " from %s: %s", op->rsc_id,
1764 	                op->op_type, op->interval_ms, op->user_data,
1765 	                pcmk__xe_get(reply, PCMK_XA_REFERENCE));
1766 	
1767 	    if (relay_message(reply, TRUE) == FALSE) {
1768 	        pcmk__log_xml_err(reply, "Unable to route reply");
1769 	    }
1770 	
1771 	    pcmk__xml_free(update);
1772 	    pcmk__xml_free(reply);
1773 	}
1774 	
1775 	gboolean
1776 	verify_stopped(enum crmd_fsa_state cur_state, int log_level)
1777 	{
1778 	    gboolean res = TRUE;
1779 	    GList *lrm_state_list = lrm_state_get_list();
1780 	    GList *state_entry;
1781 	
1782 	    for (state_entry = lrm_state_list; state_entry != NULL; state_entry = state_entry->next) {
1783 	        lrm_state_t *lrm_state = state_entry->data;
1784 	
1785 	        if (!lrm_state_verify_stopped(lrm_state, cur_state, log_level)) {
1786 	            /* keep iterating through all even when false is returned */
1787 	            res = FALSE;
1788 	        }
1789 	    }
1790 	
1791 	    controld_set_fsa_input_flags(R_SENT_RSC_STOP);
1792 	    g_clear_pointer(&lrm_state_list, g_list_free);
1793 	    return res;
1794 	}
1795 	
// User data handed to stop_recurring_action_by_rsc() when iterating active ops
struct stop_recurring_action_s {
    lrmd_rsc_info_t *rsc;       // Resource whose recurring actions to cancel
    lrm_state_t *lrm_state;     // Executor state passed on to cancel_op()
};
1800 	
1801 	static gboolean
1802 	stop_recurring_action_by_rsc(gpointer key, gpointer value, gpointer user_data)
1803 	{
1804 	    gboolean remove = FALSE;
1805 	    struct stop_recurring_action_s *event = user_data;
1806 	    active_op_t *op = value;
1807 	
1808 	    if ((op->interval_ms != 0)
1809 	        && pcmk__str_eq(op->rsc_id, event->rsc->id, pcmk__str_none)) {
1810 	
1811 	        pcmk__debug("Cancelling op %d for %s (%s)", op->call_id, op->rsc_id,
1812 	                    (const char *) key);
1813 	        remove = !cancel_op(event->lrm_state, event->rsc->id, key, op->call_id, FALSE);
1814 	    }
1815 	
1816 	    return remove;
1817 	}
1818 	
1819 	static gboolean
1820 	stop_recurring_actions(gpointer key, gpointer value, gpointer user_data)
1821 	{
1822 	    gboolean remove = FALSE;
1823 	    lrm_state_t *lrm_state = user_data;
1824 	    active_op_t *op = value;
1825 	
1826 	    if (op->interval_ms != 0) {
1827 	        pcmk__info("Cancelling op %d for %s (%s)", op->call_id, op->rsc_id,
1828 	                   (const char *) key);
1829 	        remove = !cancel_op(lrm_state, op->rsc_id, key, op->call_id, FALSE);
1830 	    }
1831 	
1832 	    return remove;
1833 	}
1834 	
1835 	/*!
1836 	 * \internal
1837 	 * \brief Check whether recurring actions should be cancelled before an action
1838 	 *
1839 	 * \param[in] rsc_id       Resource that action is for
1840 	 * \param[in] action       Action being performed
1841 	 * \param[in] interval_ms  Operation interval of \p action (in milliseconds)
1842 	 *
1843 	 * \return true if recurring actions should be cancelled, otherwise false
1844 	 */
1845 	static bool
1846 	should_cancel_recurring(const char *rsc_id, const char *action, guint interval_ms)
1847 	{
1848 	    if (is_remote_lrmd_ra(NULL, NULL, rsc_id) && (interval_ms == 0)
1849 	        && (strcmp(action, PCMK_ACTION_MIGRATE_TO) == 0)) {
1850 	        /* Don't stop monitoring a migrating Pacemaker Remote connection
1851 	         * resource until the entire migration has completed. We must detect if
1852 	         * the connection is unexpectedly severed, even during a migration.
1853 	         */
1854 	        return false;
1855 	    }
1856 	
1857 	    // Cancel recurring actions before changing resource state
1858 	    return (interval_ms == 0)
1859 	            && !pcmk__str_any_of(action, PCMK_ACTION_MONITOR,
1860 	                                 PCMK_ACTION_NOTIFY, NULL);
1861 	}
1862 	
1863 	/*!
1864 	 * \internal
1865 	 * \brief Check whether an action should not be performed at this time
1866 	 *
1867 	 * \param[in] operation  Action to be performed
1868 	 *
1869 	 * \return Readable description of why action should not be performed,
1870 	 *         or NULL if it should be performed
1871 	 */
1872 	static const char *
1873 	should_nack_action(const char *action)
1874 	{
1875 	    if (pcmk__is_set(controld_globals.fsa_input_register, R_SHUTDOWN)
1876 	        && pcmk__str_eq(action, PCMK_ACTION_START, pcmk__str_none)) {
1877 	
1878 	        controld_fsa_append(C_SHUTDOWN, I_SHUTDOWN, NULL);
1879 	        return "Not attempting start due to shutdown in progress";
1880 	    }
1881 	
1882 	    switch (controld_globals.fsa_state) {
1883 	        case S_NOT_DC:
1884 	        case S_POLICY_ENGINE:   // Recalculating
1885 	        case S_TRANSITION_ENGINE:
1886 	            break;
1887 	        default:
1888 	            if (!pcmk__str_eq(action, PCMK_ACTION_STOP, pcmk__str_none)) {
1889 	                return "Controller cannot attempt actions at this time";
1890 	            }
1891 	            break;
1892 	    }
1893 	    return NULL;
1894 	}
1895 	
/*!
 * \internal
 * \brief Request execution of a resource action and track it as active
 *
 * Builds an executor event from the request, cancels the resource's recurring
 * actions when should_cancel_recurring() says so, refuses the request with a
 * direct acknowledgement when should_nack_action() gives a reason, and
 * otherwise asks the executor to run the action. A successfully initiated
 * action is added to the connection's table of active operations; a failed
 * initiation is reported as a synthesized result.
 *
 * \param[in,out] lrm_state  Executor connection state to use (if NULL, a
 *                           failure is synthesized instead)
 * \param[in]     rsc        Resource the action is for
 * \param[in]     msg        Action request XML
 * \param[in]     md         Resource agent metadata (may be NULL); consulted
 *                           only to choose between the reload and reload-agent
 *                           action names
 */
static void
do_lrm_rsc_op(lrm_state_t *lrm_state, lrmd_rsc_info_t *rsc, xmlNode *msg,
              struct ra_metadata_s *md)
{
    int rc;
    int call_id = 0;
    char *op_id = NULL;
    lrmd_event_data_t *op = NULL;
    const char *transition = NULL;
    const char *operation = NULL;
    const char *nack_reason = NULL;

    CRM_CHECK((rsc != NULL) && (msg != NULL), return);

    operation = pcmk__xe_get(msg, PCMK_XA_OPERATION);
    CRM_CHECK(!pcmk__str_empty(operation), return);

    // A missing transition key is logged but does not stop processing
    transition = pcmk__xe_get(msg, PCMK__XA_TRANSITION_KEY);
    if (pcmk__str_empty(transition)) {
        pcmk__log_xml_err(msg, "Missing transition number");
    }

    if (lrm_state == NULL) {
        // This shouldn't be possible, but provide a failsafe just in case
        pcmk__err("Cannot execute %s of %s: No executor connection "
                  QB_XS " transition_key=%s",
                  operation, rsc->id, pcmk__s(transition, ""));
        synthesize_lrmd_failure(NULL, msg, PCMK_EXEC_INVALID,
                                PCMK_OCF_UNKNOWN_ERROR,
                                "No executor connection");
        return;
    }

    if (pcmk__str_any_of(operation, PCMK_ACTION_RELOAD,
                         PCMK_ACTION_RELOAD_AGENT, NULL)) {
        /* Pre-2.1.0 DCs will schedule reload actions only, and 2.1.0+ DCs
         * will schedule reload-agent actions only. In either case, we need
         * to map that to whatever the resource agent actually supports.
         * Default to the OCF 1.1 name.
         */
        if ((md != NULL)
            && pcmk__is_set(md->ra_flags, ra_supports_legacy_reload)) {
            operation = PCMK_ACTION_RELOAD;
        } else {
            operation = PCMK_ACTION_RELOAD_AGENT;
        }
    }

    op = construct_op(lrm_state, msg, rsc->id, operation);
    CRM_CHECK(op != NULL, return);

    /* Note that this happens before the refusal check below, so recurring
     * actions may be cancelled even if the request ends up being refused
     */
    if (should_cancel_recurring(rsc->id, operation, op->interval_ms)) {
        guint removed = 0;
        struct stop_recurring_action_s data;

        data.rsc = rsc;
        data.lrm_state = lrm_state;
        removed = g_hash_table_foreach_remove(lrm_state->active_ops,
                                              stop_recurring_action_by_rsc,
                                              &data);

        if (removed) {
            pcmk__debug("Stopped %u recurring operation%s in preparation for "
                        PCMK__OP_FMT,
                        removed, pcmk__plural_s(removed), rsc->id, operation,
                        op->interval_ms);
        }
    }

    nack_reason = should_nack_action(operation);
    if (nack_reason != NULL) {
        pcmk__notice("Not requesting local execution of %s operation for %s on "
                     "%s in state %s: %s",
                     pcmk__readable_action(op->op_type, op->interval_ms),
                     rsc->id, lrm_state->node_name,
                     fsa_state2string(controld_globals.fsa_state), nack_reason);

        // Report the refusal as an invalid-execution result, bypassing the executor
        lrmd__set_result(op, PCMK_OCF_UNKNOWN_ERROR, PCMK_EXEC_INVALID,
                         nack_reason);
        controld_ack_event_directly(NULL, NULL, rsc, op, rsc->id);
        lrmd_free_event(op);
        free(op_id);    // Still NULL at this point, so this is a no-op
        return;
    }

    pcmk__notice("Requesting local execution of %s operation for %s on %s "
                 QB_XS " transition %s",
                 pcmk__readable_action(op->op_type, op->interval_ms), rsc->id,
                 lrm_state->node_name, pcmk__s(transition, ""));

    controld_record_pending_op(lrm_state->node_name, rsc, op);

    op_id = pcmk__op_key(rsc->id, op->op_type, op->interval_ms);

    if (op->interval_ms > 0) {
        /* cancel it so we can then restart it without conflict */
        cancel_op_key(lrm_state, rsc, op_id, FALSE);
    }

    rc = controld_execute_resource_agent(lrm_state, rsc->id, op->op_type,
                                         op->user_data, op->interval_ms,
                                         op->timeout, op->start_delay,
                                         op->params, &call_id);
    if (rc == pcmk_rc_ok) {
        /* record all operations so we can wait
         * for them to complete during shutdown
         */
        char *call_id_s = make_stop_id(rsc->id, call_id);
        active_op_t *pending = NULL;

        pending = pcmk__assert_alloc(1, sizeof(active_op_t));
        pcmk__trace("Recording pending op: %d - %s %s", call_id, op_id,
                    call_id_s);

        pending->call_id = call_id;
        pending->interval_ms = op->interval_ms;
        pending->op_type = pcmk__str_copy(operation);
        pending->op_key = pcmk__str_copy(op_id);
        pending->rsc_id = pcmk__str_copy(rsc->id);
        pending->start_time = time(NULL);
        pending->user_data = pcmk__str_copy(op->user_data);
        pcmk__xe_get_time(msg, PCMK_OPT_SHUTDOWN_LOCK, &(pending->lock_time));
        g_hash_table_replace(lrm_state->active_ops, call_id_s, pending);

        /* A recurring action whose first run is delayed beyond the threshold
         * is acknowledged right away with the result the transition key
         * expects, rather than leaving the requester waiting
         */
        if ((op->interval_ms > 0)
            && (op->start_delay > START_DELAY_THRESHOLD)) {
            int target_rc = PCMK_OCF_OK;

            pcmk__info("Faking confirmation of %s: execution postponed for "
                       "over 5 minutes",
                       op_id);
            decode_transition_key(op->user_data, NULL, NULL, NULL, &target_rc);
            lrmd__set_result(op, target_rc, PCMK_EXEC_DONE, NULL);
            controld_ack_event_directly(NULL, NULL, rsc, op, rsc->id);
        }

        /* Move the parameter table to the active-operation entry, so it is no
         * longer reachable through op when op is freed below
         */
        pending->params = op->params;
        op->params = NULL;

    } else if (lrm_state_is_local(lrm_state)) {
        // Failure to reach the local executor also raises an FSA error
        pcmk__err("Could not initiate %s action for resource %s locally: %s "
                  QB_XS " rc=%d",
                  operation, rsc->id, pcmk_rc_str(rc), rc);
        fake_op_status(lrm_state, op, PCMK_EXEC_NOT_CONNECTED,
                       PCMK_OCF_UNKNOWN_ERROR, pcmk_rc_str(rc));
        process_lrm_event(lrm_state, op, NULL, NULL);
        register_fsa_error(I_FAIL, NULL);

    } else {
        // For a non-local executor, only the synthesized result is processed
        pcmk__err("Could not initiate %s action for resource %s remotely on "
                  "%s: %s " QB_XS " rc=%d",
                  operation, rsc->id, lrm_state->node_name, pcmk_rc_str(rc),
                  rc);
        fake_op_status(lrm_state, op, PCMK_EXEC_NOT_CONNECTED,
                       PCMK_OCF_UNKNOWN_ERROR, pcmk_rc_str(rc));
        process_lrm_event(lrm_state, op, NULL, NULL);
    }

    free(op_id);
    lrmd_free_event(op);
}
2057 	
2058 	static bool
2059 	did_lrm_rsc_op_fail(lrm_state_t *lrm_state, const char * rsc_id,
2060 	                    const char * op_type, guint interval_ms)
2061 	{
2062 	    rsc_history_t *entry = NULL;
2063 	
2064 	    CRM_CHECK(lrm_state != NULL, return FALSE);
2065 	    CRM_CHECK(rsc_id != NULL, return FALSE);
2066 	    CRM_CHECK(op_type != NULL, return FALSE);
2067 	
2068 	    entry = g_hash_table_lookup(lrm_state->resource_history, rsc_id);
2069 	    if (entry == NULL || entry->failed == NULL) {
2070 	        return FALSE;
2071 	    }
2072 	
2073 	    if (pcmk__str_eq(entry->failed->rsc_id, rsc_id, pcmk__str_none)
2074 	        && pcmk__str_eq(entry->failed->op_type, op_type, pcmk__str_casei)
2075 	        && entry->failed->interval_ms == interval_ms) {
2076 	        return TRUE;
2077 	    }
2078 	
2079 	    return FALSE;
2080 	}
2081 	
2082 	/*!
2083 	 * \internal
2084 	 * \brief Log the result of an executor action (actual or synthesized)
2085 	 *
2086 	 * \param[in] op         Executor action to log result for
2087 	 * \param[in] op_key     Operation key for action
2088 	 * \param[in] node_name  Name of node action was performed on, if known
2089 	 * \param[in] confirmed  Whether to log that graph action was confirmed
2090 	 */
2091 	static void
2092 	log_executor_event(const lrmd_event_data_t *op, const char *op_key,
2093 	                   const char *node_name, gboolean confirmed)
2094 	{
2095 	    int log_level = LOG_ERR;
2096 	    GString *str = g_string_sized_new(100); // reasonable starting size
2097 	
2098 	    pcmk__g_strcat(str,
2099 	                   "Result of ",
2100 	                   pcmk__readable_action(op->op_type, op->interval_ms),
2101 	                   " operation for ", op->rsc_id, NULL);
2102 	
2103 	    if (node_name != NULL) {
2104 	        pcmk__g_strcat(str, " on ", node_name, NULL);
2105 	    }
2106 	
2107 	    switch (op->op_status) {
2108 	        case PCMK_EXEC_DONE:
2109 	            log_level = LOG_NOTICE;
2110 	            pcmk__g_strcat(str, ": ", crm_exit_str((crm_exit_t) op->rc), NULL);
2111 	            break;
2112 	
2113 	        case PCMK_EXEC_TIMEOUT:
2114 	            pcmk__g_strcat(str,
2115 	                           ": ", pcmk_exec_status_str(op->op_status), " after ",
2116 	                           pcmk__readable_interval(op->timeout), NULL);
2117 	            break;
2118 	
2119 	        case PCMK_EXEC_CANCELLED:
2120 	            log_level = LOG_INFO;
2121 	            pcmk__g_strcat(str, ": ", pcmk_exec_status_str(op->op_status),
2122 	                           NULL);
2123 	            break;
2124 	
2125 	        default:
2126 	            pcmk__g_strcat(str, ": ", pcmk_exec_status_str(op->op_status),
2127 	                           NULL);
2128 	            break;
2129 	    }
2130 	
2131 	    if ((op->exit_reason != NULL)
2132 	        && ((op->op_status != PCMK_EXEC_DONE) || (op->rc != PCMK_OCF_OK))) {
2133 	
2134 	        pcmk__g_strcat(str, " (", op->exit_reason, ")", NULL);
2135 	    }
2136 	
2137 	    g_string_append(str, " " QB_XS);
2138 	    g_string_append_printf(str, " graph action %sconfirmed; call=%d key=%s",
2139 	                           (confirmed? "" : "un"), op->call_id, op_key);
2140 	    if (op->op_status == PCMK_EXEC_DONE) {
2141 	        g_string_append_printf(str, " rc=%d", op->rc);
2142 	    }
2143 	
2144 	    do_crm_log(log_level, "%s", str->str);
2145 	    g_string_free(str, TRUE);
2146 	
2147 	    /* The services library has already logged the output at info or debug
2148 	     * level, so just raise to notice if it looks like a failure.
2149 	     */
2150 	    if ((op->output != NULL) && (op->rc != PCMK_OCF_OK)) {
2151 	        char *prefix = pcmk__assert_asprintf(PCMK__OP_FMT "@%s output",
2152 	                                             op->rsc_id, op->op_type,
2153 	                                             op->interval_ms, node_name);
2154 	
2155 	        crm_log_output(LOG_NOTICE, prefix, op->output);
2156 	        free(prefix);
2157 	    }
2158 	}
2159 	
2160 	void
2161 	process_lrm_event(lrm_state_t *lrm_state, lrmd_event_data_t *op,
2162 	                  active_op_t *pending, const xmlNode *action_xml)
2163 	{
2164 	    char *op_id = NULL;
2165 	    char *op_key = NULL;
2166 	
2167 	    gboolean remove = FALSE;
2168 	    gboolean removed = FALSE;
2169 	    bool need_direct_ack = FALSE;
2170 	    lrmd_rsc_info_t *rsc = NULL;
2171 	    const char *node_name = NULL;
2172 	
2173 	    CRM_CHECK(op != NULL, return);
2174 	    CRM_CHECK(op->rsc_id != NULL, return);
2175 	
2176 	    // Remap new status codes for older DCs
2177 	    if (pcmk__compare_versions(controld_globals.dc_version, "3.2.0") < 0) {
2178 	        switch (op->op_status) {
2179 	            case PCMK_EXEC_NOT_CONNECTED:
2180 	                lrmd__set_result(op, PCMK_OCF_CONNECTION_DIED,
2181 	                                 PCMK_EXEC_ERROR, op->exit_reason);
2182 	                break;
2183 	            case PCMK_EXEC_INVALID:
2184 	                lrmd__set_result(op, CRM_DIRECT_NACK_RC, PCMK_EXEC_ERROR,
2185 	                                 op->exit_reason);
2186 	                break;
2187 	            default:
2188 	                break;
2189 	        }
2190 	    }
2191 	
2192 	    op_id = make_stop_id(op->rsc_id, op->call_id);
2193 	    op_key = pcmk__op_key(op->rsc_id, op->op_type, op->interval_ms);
2194 	
2195 	    // Get resource info if available (from executor state or action XML)
2196 	    if (lrm_state) {
2197 	        rsc = lrm_state_get_rsc_info(lrm_state, op->rsc_id, 0);
2198 	    }
2199 	    if ((rsc == NULL) && action_xml) {
2200 	        xmlNode *xml = pcmk__xe_first_child(action_xml, PCMK_XE_PRIMITIVE, NULL,
2201 	                                            NULL);
2202 	
2203 	        const char *standard = pcmk__xe_get(xml, PCMK_XA_CLASS);
2204 	        const char *provider = pcmk__xe_get(xml, PCMK_XA_PROVIDER);
2205 	        const char *type = pcmk__xe_get(xml, PCMK_XA_TYPE);
2206 	
2207 	        if (standard && type) {
2208 	            pcmk__info("%s agent information not cached, using %s%s%s:%s from "
2209 	                       "action XML",
2210 	                       op->rsc_id, standard, (provider? ":" : ""),
2211 	                       pcmk__s(provider, ""), type);
2212 	            rsc = lrmd_new_rsc_info(op->rsc_id, standard, provider, type);
2213 	        } else {
2214 	            pcmk__err("Can't process %s result because %s agent information "
2215 	                      "not cached or in XML",
2216 	                      op_key, op->rsc_id);
2217 	        }
2218 	    }
2219 	
2220 	    // Get node name if available (from executor state or action XML)
2221 	    if (lrm_state) {
2222 	        node_name = lrm_state->node_name;
2223 	    } else if (action_xml) {
2224 	        node_name = pcmk__xe_get(action_xml, PCMK__META_ON_NODE);
2225 	    }
2226 	
2227 	    if(pending == NULL) {
2228 	        remove = TRUE;
2229 	        if (lrm_state) {
2230 	            pending = g_hash_table_lookup(lrm_state->active_ops, op_id);
2231 	        }
2232 	    }
2233 	
2234 	    if (op->op_status == PCMK_EXEC_ERROR) {
2235 	        switch(op->rc) {
2236 	            case PCMK_OCF_NOT_RUNNING:
2237 	            case PCMK_OCF_RUNNING_PROMOTED:
2238 	            case PCMK_OCF_DEGRADED:
2239 	            case PCMK_OCF_DEGRADED_PROMOTED:
2240 	                // Leave it to the TE/scheduler to decide if this is an error
2241 	                op->op_status = PCMK_EXEC_DONE;
2242 	                break;
2243 	            default:
2244 	                /* Nothing to do */
2245 	                break;
2246 	        }
2247 	    }
2248 	
2249 	    if (op->op_status != PCMK_EXEC_CANCELLED) {
2250 	        /* We might not record the result, so directly acknowledge it to the
2251 	         * originator instead, so it doesn't time out waiting for the result
2252 	         * (especially important if part of a transition).
2253 	         */
2254 	        need_direct_ack = TRUE;
2255 	
2256 	        if (controld_action_is_recordable(op->op_type)) {
2257 	            if (node_name && rsc) {
2258 	                // We should record the result, and happily, we can
2259 	                time_t lock_time = (pending == NULL)? 0 : pending->lock_time;
2260 	
2261 	                controld_update_resource_history(node_name, rsc, op, lock_time);
2262 	                need_direct_ack = FALSE;
2263 	
2264 	            } else if (op->rsc_deleted) {
2265 	                /* We shouldn't record the result (likely the resource was
2266 	                 * refreshed, cleaned, or removed while this operation was
2267 	                 * in flight).
2268 	                 */
2269 	                pcmk__notice("Not recording %s result in CIB because resource "
2270 	                             "information was removed since it was initiated",
2271 	                             op_key);
2272 	            } else {
2273 	                /* This shouldn't be possible; the executor didn't consider the
2274 	                 * resource deleted, but we couldn't find resource or node
2275 	                 * information.
2276 	                 */
2277 	                const char *missing = "node name";
2278 	
2279 	                if (node_name != NULL) {
2280 	                    missing = "resource information";
2281 	                }
2282 	                pcmk__err("Unable to record %s result in CIB: No %s", op_key,
2283 	                          missing);
2284 	            }
2285 	        }
2286 	
2287 	    } else if (op->interval_ms == 0) {
2288 	        /* A non-recurring operation was cancelled. Most likely, the
2289 	         * never-initiated action was removed from the executor's pending
2290 	         * operations list upon resource removal.
2291 	         */
2292 	        need_direct_ack = TRUE;
2293 	
2294 	    } else if (pending == NULL) {
2295 	        /* This recurring operation was cancelled, but was not pending. No
2296 	         * transition actions are waiting on it, nothing needs to be done.
2297 	         */
2298 	
2299 	    } else if (op->user_data == NULL) {
2300 	        /* This recurring operation was cancelled and pending, but we don't
2301 	         * have a transition key. This should never happen.
2302 	         */
2303 	        pcmk__err("Recurring operation %s was cancelled without transition "
2304 	                  "information",
2305 	                  op_key);
2306 	
2307 	    } else if (pcmk__is_set(pending->flags, active_op_remove)) {
2308 	        /* This recurring operation was cancelled (by us) and pending, and we
2309 	         * have been waiting for it to finish.
2310 	         */
2311 	        if (lrm_state) {
2312 	            controld_delete_action_history(op);
2313 	        }
2314 	
2315 	        /* Directly acknowledge failed recurring actions here. The above call to
2316 	         * controld_delete_action_history() will not erase any corresponding
2317 	         * last_failure entry, which means that the DC won't confirm the
2318 	         * cancellation via process_op_deletion(), and the transition would
2319 	         * otherwise wait for the action timer to pop.
2320 	         */
2321 	        if (did_lrm_rsc_op_fail(lrm_state, pending->rsc_id,
2322 	                                pending->op_type, pending->interval_ms)) {
2323 	            need_direct_ack = TRUE;
2324 	        }
2325 	
2326 	    } else if (op->rsc_deleted) {
2327 	        /* This recurring operation was cancelled (but not by us, and the
2328 	         * executor does not have resource information, likely due to resource
2329 	         * cleanup, refresh, or removal) and pending.
2330 	         */
2331 	        pcmk__debug("Recurring op %s was cancelled due to resource deletion",
2332 	                    op_key);
2333 	        need_direct_ack = TRUE;
2334 	
2335 	    } else {
2336 	        /* This recurring operation was cancelled (but not by us, likely by the
2337 	         * executor before stopping the resource) and pending. We don't need to
2338 	         * do anything special.
2339 	         */
2340 	    }
2341 	
2342 	    if (need_direct_ack) {
2343 	        controld_ack_event_directly(NULL, NULL, NULL, op, op->rsc_id);
2344 	    }
2345 	
2346 	    if(remove == FALSE) {
2347 	        /* The caller will do this afterwards, but keep the logging consistent */
2348 	        removed = TRUE;
2349 	
2350 	    } else if (lrm_state && ((op->interval_ms == 0)
2351 	                             || (op->op_status == PCMK_EXEC_CANCELLED))) {
2352 	
2353 	        gboolean found = g_hash_table_remove(lrm_state->active_ops, op_id);
2354 	
2355 	        if (op->interval_ms != 0) {
2356 	            removed = TRUE;
2357 	        } else if (found) {
2358 	            removed = TRUE;
2359 	            pcmk__trace("Op %s (call=%d, stop-id=%s, remaining=%u): Confirmed",
2360 	                        op_key, op->call_id, op_id,
2361 	                        g_hash_table_size(lrm_state->active_ops));
2362 	        }
2363 	    }
2364 	
2365 	    log_executor_event(op, op_key, node_name, removed);
2366 	
2367 	    if (lrm_state) {
2368 	        if (!pcmk__str_eq(op->op_type, PCMK_ACTION_META_DATA,
2369 	                          pcmk__str_casei)) {
2370 	            crmd_alert_resource_op(lrm_state->node_name, op);
2371 	        } else if (rsc && (op->rc == PCMK_OCF_OK)) {
2372 	            controld_cache_metadata(lrm_state->metadata_cache, rsc, op->output);
2373 	        }
2374 	    }
2375 	
2376 	    if (op->rsc_deleted) {
2377 	        pcmk__info("Deletion of resource '%s' complete after %s", op->rsc_id,
2378 	                   op_key);
2379 	        if (lrm_state) {
2380 	            delete_rsc_entry(lrm_state, NULL, op->rsc_id, NULL, pcmk_ok, NULL,
2381 	                             true);
2382 	        }
2383 	    }
2384 	
2385 	    /* If a shutdown was escalated while operations were pending,
2386 	     * then the FSA will be stalled right now... allow it to continue
2387 	     */
2388 	    controld_trigger_fsa();
2389 	    if (lrm_state && rsc) {
2390 	        update_history_cache(lrm_state, rsc, op);
2391 	    }
2392 	
2393 	    lrmd_free_rsc_info(rsc);
2394 	    free(op_key);
2395 	    free(op_id);
2396 	}
2397