1    	/*
2    	 * Copyright 2012-2023 the Pacemaker project contributors
3    	 *
4    	 * The version control history for this file may have further details.
5    	 *
6    	 * This source code is licensed under the GNU General Public License version 2
7    	 * or later (GPLv2+) WITHOUT ANY WARRANTY.
8    	 */
9    	
10   	#include <crm_internal.h>
11   	
12   	#include <errno.h>
13   	
14   	#include <crm/crm.h>
15   	#include <crm/msg_xml.h>
16   	#include <crm/common/iso8601.h>
17   	#include <crm/pengine/rules.h>
18   	#include <crm/pengine/rules_internal.h>
19   	#include <crm/lrmd_internal.h>
20   	
21   	#include <pacemaker-internal.h>
22   	#include <pacemaker-controld.h>
23   	
// Executor state objects, keyed by node name (see lrm_state_create())
static GHashTable *lrm_state_table = NULL;
// Remote proxy table, defined outside this file; looked up by session here
extern GHashTable *proxy_table;
// Prototypes for executor client functions whose definitions are not in this file
int lrmd_internal_proxy_send(lrmd_t * lrmd, xmlNode *msg);
void lrmd_internal_set_proxy_callback(lrmd_t * lrmd, void *userdata, void (*callback)(lrmd_t *lrmd, void *userdata, xmlNode *msg));
28   	
29   	static void
30   	free_rsc_info(gpointer value)
31   	{
32   	    lrmd_rsc_info_t *rsc_info = value;
33   	
34   	    lrmd_free_rsc_info(rsc_info);
35   	}
36   	
37   	static void
38   	free_deletion_op(gpointer value)
39   	{
40   	    struct pending_deletion_op_s *op = value;
41   	
42   	    free(op->rsc);
43   	    delete_ha_msg_input(op->input);
44   	    free(op);
45   	}
46   	
47   	static void
48   	free_recurring_op(gpointer value)
49   	{
50   	    active_op_t *op = value;
51   	
52   	    free(op->user_data);
53   	    free(op->rsc_id);
54   	    free(op->op_type);
55   	    free(op->op_key);
56   	    if (op->params) {
57   	        g_hash_table_destroy(op->params);
58   	    }
59   	    free(op);
60   	}
61   	
62   	static gboolean
63   	fail_pending_op(gpointer key, gpointer value, gpointer user_data)
64   	{
65   	    lrmd_event_data_t event = { 0, };
66   	    lrm_state_t *lrm_state = user_data;
67   	    active_op_t *op = value;
68   	
69   	    crm_trace("Pre-emptively failing " PCMK__OP_FMT " on %s (call=%s, %s)",
70   	              op->rsc_id, op->op_type, op->interval_ms,
71   	              lrm_state->node_name, (char*)key, op->user_data);
72   	
73   	    event.type = lrmd_event_exec_complete;
74   	    event.rsc_id = op->rsc_id;
75   	    event.op_type = op->op_type;
76   	    event.user_data = op->user_data;
77   	    event.timeout = 0;
78   	    event.interval_ms = op->interval_ms;
79   	    lrmd__set_result(&event, PCMK_OCF_UNKNOWN_ERROR, PCMK_EXEC_NOT_CONNECTED,
80   	                     "Action was pending when executor connection was dropped");
81   	    event.t_run = (unsigned int) op->start_time;
CID (unavailable; MK=9619ad64c2b49cc1602ab65502dc9e59) (#2 of 2): Use of 32-bit time_t (Y2K38_SAFETY):
(1) Event store_truncates_time_t: A "time_t" value is stored in an integer with too few bits to accommodate it. The expression "op->start_time" is cast to "unsigned int".
82   	    event.t_rcchange = (unsigned int) op->start_time;
83   	
84   	    event.call_id = op->call_id;
85   	    event.remote_nodename = lrm_state->node_name;
86   	    event.params = op->params;
87   	
88   	    process_lrm_event(lrm_state, &event, op, NULL);
89   	    lrmd__reset_result(&event);
90   	    return TRUE;
91   	}
92   	
93   	gboolean
94   	lrm_state_is_local(lrm_state_t *lrm_state)
95   	{
96   	    return (lrm_state != NULL)
97   	           && pcmk__str_eq(lrm_state->node_name, controld_globals.our_nodename,
98   	                           pcmk__str_casei);
99   	}
100  	
101  	/*!
102  	 * \internal
103  	 * \brief Create executor state entry for a node and add it to the state table
104  	 *
105  	 * \param[in]  node_name  Node to create entry for
106  	 *
107  	 * \return Newly allocated executor state object initialized for \p node_name
108  	 */
109  	static lrm_state_t *
110  	lrm_state_create(const char *node_name)
111  	{
112  	    lrm_state_t *state = NULL;
113  	
114  	    if (!node_name) {
115  	        crm_err("No node name given for lrm state object");
116  	        return NULL;
117  	    }
118  	
119  	    state = calloc(1, sizeof(lrm_state_t));
120  	    if (!state) {
121  	        return NULL;
122  	    }
123  	
124  	    state->node_name = strdup(node_name);
125  	    state->rsc_info_cache = pcmk__strkey_table(NULL, free_rsc_info);
126  	    state->deletion_ops = pcmk__strkey_table(free, free_deletion_op);
127  	    state->active_ops = pcmk__strkey_table(free, free_recurring_op);
128  	    state->resource_history = pcmk__strkey_table(NULL, history_free);
129  	    state->metadata_cache = metadata_cache_new();
130  	
131  	    g_hash_table_insert(lrm_state_table, (char *)state->node_name, state);
132  	    return state;
133  	}
134  	
135  	static gboolean
136  	remote_proxy_remove_by_node(gpointer key, gpointer value, gpointer user_data)
137  	{
138  	    remote_proxy_t *proxy = value;
139  	    const char *node_name = user_data;
140  	
141  	    if (pcmk__str_eq(node_name, proxy->node_name, pcmk__str_casei)) {
142  	        return TRUE;
143  	    }
144  	
145  	    return FALSE;
146  	}
147  	
148  	static remote_proxy_t *
149  	find_connected_proxy_by_node(const char * node_name)
150  	{
151  	    GHashTableIter gIter;
152  	    remote_proxy_t *proxy = NULL;
153  	
154  	    CRM_CHECK(proxy_table != NULL, return NULL);
155  	
156  	    g_hash_table_iter_init(&gIter, proxy_table);
157  	
158  	    while (g_hash_table_iter_next(&gIter, NULL, (gpointer *) &proxy)) {
159  	        if (proxy->source
160  	            && pcmk__str_eq(node_name, proxy->node_name, pcmk__str_casei)) {
161  	            return proxy;
162  	        }
163  	    }
164  	
165  	    return NULL;
166  	}
167  	
168  	static void
169  	remote_proxy_disconnect_by_node(const char * node_name)
170  	{
171  	    remote_proxy_t *proxy = NULL;
172  	
173  	    CRM_CHECK(proxy_table != NULL, return);
174  	
175  	    while ((proxy = find_connected_proxy_by_node(node_name)) != NULL) {
176  	        /* mainloop_del_ipc_client() eventually calls remote_proxy_disconnected()
177  	         * , which removes the entry from proxy_table.
178  	         * Do not do this in a g_hash_table_iter_next() loop. */
179  	        if (proxy->source) {
180  	            mainloop_del_ipc_client(proxy->source);
181  	        }
182  	    }
183  	
184  	    return;
185  	}
186  	
187  	static void
188  	internal_lrm_state_destroy(gpointer data)
189  	{
190  	    lrm_state_t *lrm_state = data;
191  	
192  	    if (!lrm_state) {
193  	        return;
194  	    }
195  	
196  	    /* Rather than directly remove the recorded proxy entries from proxy_table,
197  	     * make sure any connected proxies get disconnected. So that
198  	     * remote_proxy_disconnected() will be called and as well remove the
199  	     * entries from proxy_table.
200  	     */
201  	    remote_proxy_disconnect_by_node(lrm_state->node_name);
202  	
203  	    crm_trace("Destroying proxy table %s with %u members",
204  	              lrm_state->node_name, g_hash_table_size(proxy_table));
205  	    // Just in case there's still any leftovers in proxy_table
206  	    g_hash_table_foreach_remove(proxy_table, remote_proxy_remove_by_node, (char *) lrm_state->node_name);
207  	    remote_ra_cleanup(lrm_state);
208  	    lrmd_api_delete(lrm_state->conn);
209  	
210  	    if (lrm_state->rsc_info_cache) {
211  	        crm_trace("Destroying rsc info cache with %u members",
212  	                  g_hash_table_size(lrm_state->rsc_info_cache));
213  	        g_hash_table_destroy(lrm_state->rsc_info_cache);
214  	    }
215  	    if (lrm_state->resource_history) {
216  	        crm_trace("Destroying history op cache with %u members",
217  	                  g_hash_table_size(lrm_state->resource_history));
218  	        g_hash_table_destroy(lrm_state->resource_history);
219  	    }
220  	    if (lrm_state->deletion_ops) {
221  	        crm_trace("Destroying deletion op cache with %u members",
222  	                  g_hash_table_size(lrm_state->deletion_ops));
223  	        g_hash_table_destroy(lrm_state->deletion_ops);
224  	    }
225  	    if (lrm_state->active_ops != NULL) {
226  	        crm_trace("Destroying pending op cache with %u members",
227  	                  g_hash_table_size(lrm_state->active_ops));
228  	        g_hash_table_destroy(lrm_state->active_ops);
229  	    }
230  	    metadata_cache_free(lrm_state->metadata_cache);
231  	
232  	    free((char *)lrm_state->node_name);
233  	    free(lrm_state);
234  	}
235  	
236  	void
237  	lrm_state_reset_tables(lrm_state_t * lrm_state, gboolean reset_metadata)
238  	{
239  	    if (lrm_state->resource_history) {
240  	        crm_trace("Resetting resource history cache with %u members",
241  	                  g_hash_table_size(lrm_state->resource_history));
242  	        g_hash_table_remove_all(lrm_state->resource_history);
243  	    }
244  	    if (lrm_state->deletion_ops) {
245  	        crm_trace("Resetting deletion operations cache with %u members",
246  	                  g_hash_table_size(lrm_state->deletion_ops));
247  	        g_hash_table_remove_all(lrm_state->deletion_ops);
248  	    }
249  	    if (lrm_state->active_ops != NULL) {
250  	        crm_trace("Resetting active operations cache with %u members",
251  	                  g_hash_table_size(lrm_state->active_ops));
252  	        g_hash_table_remove_all(lrm_state->active_ops);
253  	    }
254  	    if (lrm_state->rsc_info_cache) {
255  	        crm_trace("Resetting resource information cache with %u members",
256  	                  g_hash_table_size(lrm_state->rsc_info_cache));
257  	        g_hash_table_remove_all(lrm_state->rsc_info_cache);
258  	    }
259  	    if (reset_metadata) {
260  	        metadata_cache_reset(lrm_state->metadata_cache);
261  	    }
262  	}
263  	
264  	gboolean
265  	lrm_state_init_local(void)
266  	{
267  	    if (lrm_state_table) {
268  	        return TRUE;
269  	    }
270  	
271  	    lrm_state_table = pcmk__strikey_table(NULL, internal_lrm_state_destroy);
272  	    if (!lrm_state_table) {
273  	        return FALSE;
274  	    }
275  	
276  	    proxy_table = pcmk__strikey_table(NULL, remote_proxy_free);
277  	    if (!proxy_table) {
278  	        g_hash_table_destroy(lrm_state_table);
279  	        lrm_state_table = NULL;
280  	        return FALSE;
281  	    }
282  	
283  	    return TRUE;
284  	}
285  	
286  	void
287  	lrm_state_destroy_all(void)
288  	{
289  	    if (lrm_state_table) {
290  	        crm_trace("Destroying state table with %u members",
291  	                  g_hash_table_size(lrm_state_table));
292  	        g_hash_table_destroy(lrm_state_table); lrm_state_table = NULL;
293  	    }
294  	    if(proxy_table) {
295  	        crm_trace("Destroying proxy table with %u members",
296  	                  g_hash_table_size(proxy_table));
297  	        g_hash_table_destroy(proxy_table); proxy_table = NULL;
298  	    }
299  	}
300  	
301  	lrm_state_t *
302  	lrm_state_find(const char *node_name)
303  	{
304  	    if ((node_name == NULL) || (lrm_state_table == NULL)) {
305  	        return NULL;
306  	    }
307  	    return g_hash_table_lookup(lrm_state_table, node_name);
308  	}
309  	
310  	lrm_state_t *
311  	lrm_state_find_or_create(const char *node_name)
312  	{
313  	    lrm_state_t *lrm_state;
314  	
315  	    CRM_CHECK(lrm_state_table != NULL, return NULL);
316  	
317  	    lrm_state = g_hash_table_lookup(lrm_state_table, node_name);
318  	    if (!lrm_state) {
319  	        lrm_state = lrm_state_create(node_name);
320  	    }
321  	
322  	    return lrm_state;
323  	}
324  	
325  	GList *
326  	lrm_state_get_list(void)
327  	{
328  	    if (lrm_state_table == NULL) {
329  	        return NULL;
330  	    }
331  	    return g_hash_table_get_values(lrm_state_table);
332  	}
333  	
334  	void
335  	lrm_state_disconnect_only(lrm_state_t * lrm_state)
336  	{
337  	    int removed = 0;
338  	
339  	    if (!lrm_state->conn) {
340  	        return;
341  	    }
342  	    crm_trace("Disconnecting %s", lrm_state->node_name);
343  	
344  	    remote_proxy_disconnect_by_node(lrm_state->node_name);
345  	
346  	    ((lrmd_t *) lrm_state->conn)->cmds->disconnect(lrm_state->conn);
347  	
348  	    if (!pcmk_is_set(controld_globals.fsa_input_register, R_SHUTDOWN)) {
349  	        removed = g_hash_table_foreach_remove(lrm_state->active_ops,
350  	                                              fail_pending_op, lrm_state);
351  	        crm_trace("Synthesized %d operation failures for %s", removed, lrm_state->node_name);
352  	    }
353  	}
354  	
355  	void
356  	lrm_state_disconnect(lrm_state_t * lrm_state)
357  	{
358  	    if (!lrm_state->conn) {
359  	        return;
360  	    }
361  	
362  	    lrm_state_disconnect_only(lrm_state);
363  	
364  	    lrmd_api_delete(lrm_state->conn);
365  	    lrm_state->conn = NULL;
366  	}
367  	
368  	int
369  	lrm_state_is_connected(lrm_state_t * lrm_state)
370  	{
371  	    if (!lrm_state->conn) {
372  	        return FALSE;
373  	    }
374  	    return ((lrmd_t *) lrm_state->conn)->cmds->is_connected(lrm_state->conn);
375  	}
376  	
377  	int
378  	lrm_state_poke_connection(lrm_state_t * lrm_state)
379  	{
380  	
381  	    if (!lrm_state->conn) {
382  	        return -ENOTCONN;
383  	    }
384  	    return ((lrmd_t *) lrm_state->conn)->cmds->poke_connection(lrm_state->conn);
385  	}
386  	
387  	// \return Standard Pacemaker return code
388  	int
389  	controld_connect_local_executor(lrm_state_t *lrm_state)
390  	{
391  	    int rc = pcmk_rc_ok;
392  	
393  	    if (lrm_state->conn == NULL) {
394  	        lrmd_t *api = NULL;
395  	
396  	        rc = lrmd__new(&api, NULL, NULL, 0);
397  	        if (rc != pcmk_rc_ok) {
398  	            return rc;
399  	        }
400  	        api->cmds->set_callback(api, lrm_op_callback);
401  	        lrm_state->conn = api;
402  	    }
403  	
404  	    rc = ((lrmd_t *) lrm_state->conn)->cmds->connect(lrm_state->conn,
405  	                                                     CRM_SYSTEM_CRMD, NULL);
406  	    rc = pcmk_legacy2rc(rc);
407  	
408  	    if (rc == pcmk_rc_ok) {
409  	        lrm_state->num_lrm_register_fails = 0;
410  	    } else {
411  	        lrm_state->num_lrm_register_fails++;
412  	    }
413  	    return rc;
414  	}
415  	
416  	static remote_proxy_t *
417  	crmd_remote_proxy_new(lrmd_t *lrmd, const char *node_name, const char *session_id, const char *channel)
418  	{
419  	    struct ipc_client_callbacks proxy_callbacks = {
420  	        .dispatch = remote_proxy_dispatch,
421  	        .destroy = remote_proxy_disconnected
422  	    };
423  	    remote_proxy_t *proxy = remote_proxy_new(lrmd, &proxy_callbacks, node_name,
424  	                                             session_id, channel);
425  	    return proxy;
426  	}
427  	
428  	gboolean
429  	crmd_is_proxy_session(const char *session)
430  	{
431  	    return g_hash_table_lookup(proxy_table, session) ? TRUE : FALSE;
432  	}
433  	
434  	void
435  	crmd_proxy_send(const char *session, xmlNode *msg)
436  	{
437  	    remote_proxy_t *proxy = g_hash_table_lookup(proxy_table, session);
438  	    lrm_state_t *lrm_state = NULL;
439  	
440  	    if (!proxy) {
441  	        return;
442  	    }
443  	    crm_log_xml_trace(msg, "to-proxy");
444  	    lrm_state = lrm_state_find(proxy->node_name);
445  	    if (lrm_state) {
446  	        crm_trace("Sending event to %.8s on %s", proxy->session_id, proxy->node_name);
447  	        remote_proxy_relay_event(proxy, msg);
448  	    }
449  	}
450  	
451  	static void
452  	crmd_proxy_dispatch(const char *session, xmlNode *msg)
453  	{
454  	    crm_trace("Processing proxied IPC message from session %s", session);
455  	    crm_log_xml_trace(msg, "controller[inbound]");
456  	    crm_xml_add(msg, F_CRM_SYS_FROM, session);
457  	    if (controld_authorize_ipc_message(msg, NULL, session)) {
458  	        route_message(C_IPC_MESSAGE, msg);
459  	    }
460  	    controld_trigger_fsa();
461  	}
462  	
463  	static void
464  	remote_config_check(xmlNode * msg, int call_id, int rc, xmlNode * output, void *user_data)
465  	{
466  	    if (rc != pcmk_ok) {
467  	        crm_err("Query resulted in an error: %s", pcmk_strerror(rc));
468  	
469  	        if (rc == -EACCES || rc == -pcmk_err_schema_validation) {
470  	            crm_err("The cluster is mis-configured - shutting down and staying down");
471  	        }
472  	
473  	    } else {
474  	        lrmd_t * lrmd = (lrmd_t *)user_data;
475  	        crm_time_t *now = crm_time_new(NULL);
476  	        GHashTable *config_hash = pcmk__strkey_table(free, free);
477  	
478  	        crm_debug("Call %d : Parsing CIB options", call_id);
479  	
480  	        pe_unpack_nvpairs(output, output, XML_CIB_TAG_PROPSET, NULL,
481  	                          config_hash, CIB_OPTIONS_FIRST, FALSE, now, NULL);
482  	
483  	        /* Now send it to the remote peer */
484  	        lrmd__validate_remote_settings(lrmd, config_hash);
485  	
486  	        g_hash_table_destroy(config_hash);
487  	        crm_time_free(now);
488  	    }
489  	}
490  	
/*!
 * \internal
 * \brief Handle a proxy message received over a remote executor connection
 *
 * Registered via lrmd_internal_set_proxy_callback() in
 * controld_connect_remote_executor(), with the executor state as user data.
 * The message's F_LRMD_IPC_OP value selects the handling:
 * - LRMD_IPC_OP_NEW: create a proxy for the session and, unless the
 *   connection is controlling a guest, query the CIB so that
 *   remote_config_check() can validate settings with the remote peer
 * - LRMD_IPC_OP_SHUTDOWN_REQ: acknowledge (or refuse) a shutdown request
 * - LRMD_IPC_OP_REQUEST for a proxy marked is_local: process the request
 *   directly in this process
 * - anything else: pass the message to remote_proxy_cb()
 *
 * \param[in,out] lrmd      Executor connection the message arrived on
 * \param[in,out] userdata  Executor state object (lrm_state_t *)
 * \param[in,out] msg       Proxy message to handle
 */
static void
crmd_remote_proxy_cb(lrmd_t *lrmd, void *userdata, xmlNode *msg)
{
    lrm_state_t *lrm_state = userdata;
    const char *session = crm_element_value(msg, F_LRMD_IPC_SESSION);
    // NOTE(review): session is used as a lookup key without a NULL check
    remote_proxy_t *proxy = g_hash_table_lookup(proxy_table, session);

    const char *op = crm_element_value(msg, F_LRMD_IPC_OP);
    if (pcmk__str_eq(op, LRMD_IPC_OP_NEW, pcmk__str_casei)) {
        const char *channel = crm_element_value(msg, F_LRMD_IPC_IPC_SERVER);

        // The proxy is created even for guest nodes; only the query is skipped
        proxy = crmd_remote_proxy_new(lrmd, lrm_state->node_name, session, channel);
        if (!remote_ra_controlling_guest(lrm_state)) {
            if (proxy != NULL) {
                cib_t *cib_conn = controld_globals.cib_conn;

                /* Look up stonith-watchdog-timeout and send to the remote peer for validation */
                int rc = cib_conn->cmds->query(cib_conn, XML_CIB_TAG_CRMCONFIG,
                                               NULL, cib_scope_local);
                // The query's return value is passed on as the callback's call ID
                cib_conn->cmds->register_callback_full(cib_conn, rc, 10, FALSE,
                                                       lrmd,
                                                       "remote_config_check",
                                                       remote_config_check,
                                                       NULL);
            }
        } else {
            crm_debug("Skipping remote_config_check for guest-nodes");
        }

    } else if (pcmk__str_eq(op, LRMD_IPC_OP_SHUTDOWN_REQ, pcmk__str_casei)) {
        char *now_s = NULL;

        crm_notice("%s requested shutdown of its remote connection",
                   lrm_state->node_name);

        if (!remote_ra_is_in_maintenance(lrm_state)) {
            // Set the node's shutdown attribute to the current time, then ack
            now_s = pcmk__ttoa(time(NULL));
            update_attrd(lrm_state->node_name, XML_CIB_ATTR_SHUTDOWN, now_s, NULL, TRUE);
            free(now_s);

            remote_proxy_ack_shutdown(lrmd);

            crm_warn("Reconnection attempts to %s may result in failures that must be cleared",
                    lrm_state->node_name);
        } else {
            // In maintenance: refuse the request
            remote_proxy_nack_shutdown(lrmd);

            crm_notice("Remote resource for %s is not managed so no ordered shutdown happening",
                    lrm_state->node_name);
        }
        return;

    } else if (pcmk__str_eq(op, LRMD_IPC_OP_REQUEST, pcmk__str_casei) && proxy && proxy->is_local) {
        /* This is for the controller, which we are, so don't try
         * to send to ourselves over IPC -- do it directly.
         */
        int flags = 0;
        xmlNode *request = get_message_xml(msg, F_LRMD_IPC_MSG);

        CRM_CHECK(request != NULL, return);
        CRM_CHECK(lrm_state->node_name, return);
        // Tag the request with a fixed role and the node name as ACL user
        crm_xml_add(request, XML_ACL_TAG_ROLE, "pacemaker-remote");
        pcmk__update_acl_user(request, F_LRMD_IPC_USER, lrm_state->node_name);

        /* Pacemaker Remote nodes don't know their own names (as known to the
         * cluster). When getting a node info request with no name or ID, add
         * the name, so we don't return info for ourselves instead of the
         * Pacemaker Remote node.
         */
        if (pcmk__str_eq(crm_element_value(request, F_CRM_TASK), CRM_OP_NODE_INFO, pcmk__str_casei)) {
            int node_id = 0;

            crm_element_value_int(request, XML_ATTR_ID, &node_id);
            if ((node_id <= 0)
                && (crm_element_value(request, XML_ATTR_UNAME) == NULL)) {
                crm_xml_add(request, XML_ATTR_UNAME, lrm_state->node_name);
            }
        }

        crmd_proxy_dispatch(session, request);

        // Send an "ack" reply only if the message flags ask for a response
        crm_element_value_int(msg, F_LRMD_IPC_MSG_FLAGS, &flags);
        if (flags & crm_ipc_client_response) {
            int msg_id = 0;
            xmlNode *op_reply = create_xml_node(NULL, "ack");

            crm_xml_add(op_reply, "function", __func__);
            crm_xml_add_int(op_reply, "line", __LINE__);

            crm_element_value_int(msg, F_LRMD_IPC_MSG_ID, &msg_id);
            remote_proxy_relay_response(proxy, op_reply, msg_id);

            free_xml(op_reply);
        }

    } else {
        // Everything else gets the generic proxy handling
        remote_proxy_cb(lrmd, lrm_state->node_name, msg);
    }
}
590  	
591  	
592  	// \return Standard Pacemaker return code
593  	int
594  	controld_connect_remote_executor(lrm_state_t *lrm_state, const char *server,
595  	                                 int port, int timeout_ms)
596  	{
597  	    int rc = pcmk_rc_ok;
598  	
599  	    if (lrm_state->conn == NULL) {
600  	        lrmd_t *api = NULL;
601  	
602  	        rc = lrmd__new(&api, lrm_state->node_name, server, port);
603  	        if (rc != pcmk_rc_ok) {
604  	            crm_warn("Pacemaker Remote connection to %s:%s failed: %s "
605  	                     CRM_XS " rc=%d", server, port, pcmk_rc_str(rc), rc);
606  	
607  	            return rc;
608  	        }
609  	        lrm_state->conn = api;
610  	        api->cmds->set_callback(api, remote_lrm_op_callback);
611  	        lrmd_internal_set_proxy_callback(api, lrm_state, crmd_remote_proxy_cb);
612  	    }
613  	
614  	    crm_trace("Initiating remote connection to %s:%d with timeout %dms",
615  	              server, port, timeout_ms);
616  	    rc = ((lrmd_t *) lrm_state->conn)->cmds->connect_async(lrm_state->conn,
617  	                                                           lrm_state->node_name,
618  	                                                           timeout_ms);
619  	    if (rc == pcmk_ok) {
620  	        lrm_state->num_lrm_register_fails = 0;
621  	    } else {
622  	        lrm_state->num_lrm_register_fails++; // Ignored for remote connections
623  	    }
624  	    return pcmk_legacy2rc(rc);
625  	}
626  	
627  	int
628  	lrm_state_get_metadata(lrm_state_t * lrm_state,
629  	                       const char *class,
630  	                       const char *provider,
631  	                       const char *agent, char **output, enum lrmd_call_options options)
632  	{
633  	    lrmd_key_value_t *params = NULL;
634  	
635  	    if (!lrm_state->conn) {
636  	        return -ENOTCONN;
637  	    }
638  	
639  	    /* Add the node name to the environment, as is done with normal resource
640  	     * action calls. Meta-data calls shouldn't need it, but some agents are
641  	     * written with an ocf_local_nodename call at the beginning regardless of
642  	     * action. Without the environment variable, the agent would try to contact
643  	     * the controller to get the node name -- but the controller would be
644  	     * blocking on the synchronous meta-data call.
645  	     *
646  	     * At this point, we have to assume that agents are unlikely to make other
647  	     * calls that require the controller, such as crm_node --quorum or
648  	     * --cluster-id.
649  	     *
650  	     * @TODO Make meta-data calls asynchronous. (This will be part of a larger
651  	     * project to make meta-data calls via the executor rather than directly.)
652  	     */
653  	    params = lrmd_key_value_add(params, CRM_META "_" XML_LRM_ATTR_TARGET,
654  	                                lrm_state->node_name);
655  	
656  	    return ((lrmd_t *) lrm_state->conn)->cmds->get_metadata_params(lrm_state->conn,
657  	            class, provider, agent, output, options, params);
658  	}
659  	
660  	int
661  	lrm_state_cancel(lrm_state_t *lrm_state, const char *rsc_id, const char *action,
662  	                 guint interval_ms)
663  	{
664  	    if (!lrm_state->conn) {
665  	        return -ENOTCONN;
666  	    }
667  	
668  	    /* Figure out a way to make this async?
669  	     * NOTICE: Currently it's synced and directly acknowledged in do_lrm_invoke(). */
670  	    if (is_remote_lrmd_ra(NULL, NULL, rsc_id)) {
671  	        return remote_ra_cancel(lrm_state, rsc_id, action, interval_ms);
672  	    }
673  	    return ((lrmd_t *) lrm_state->conn)->cmds->cancel(lrm_state->conn, rsc_id,
674  	                                                      action, interval_ms);
675  	}
676  	
677  	lrmd_rsc_info_t *
678  	lrm_state_get_rsc_info(lrm_state_t * lrm_state, const char *rsc_id, enum lrmd_call_options options)
679  	{
680  	    lrmd_rsc_info_t *rsc = NULL;
681  	
682  	    if (!lrm_state->conn) {
683  	        return NULL;
684  	    }
685  	    if (is_remote_lrmd_ra(NULL, NULL, rsc_id)) {
686  	        return remote_ra_get_rsc_info(lrm_state, rsc_id);
687  	    }
688  	
689  	    rsc = g_hash_table_lookup(lrm_state->rsc_info_cache, rsc_id);
690  	    if (rsc == NULL) {
691  	        /* only contact the lrmd if we don't already have a cached rsc info */
692  	        rsc = ((lrmd_t *) lrm_state->conn)->cmds->get_rsc_info(lrm_state->conn, rsc_id, options);
693  	        if (rsc == NULL) {
694  			    return NULL;
695  	        }
696  	        /* cache the result */
697  	        g_hash_table_insert(lrm_state->rsc_info_cache, rsc->id, rsc);
698  	    }
699  	
700  	    return lrmd_copy_rsc_info(rsc);
701  	
702  	}
703  	
704  	/*!
705  	 * \internal
706  	 * \brief Initiate a resource agent action
707  	 *
708  	 * \param[in,out] lrm_state       Executor state object
709  	 * \param[in]     rsc_id          ID of resource for action
710  	 * \param[in]     action          Action to execute
711  	 * \param[in]     userdata        String to copy and pass to execution callback
712  	 * \param[in]     interval_ms     Action interval (in milliseconds)
713  	 * \param[in]     timeout_ms      Action timeout (in milliseconds)
714  	 * \param[in]     start_delay_ms  Delay (in ms) before initiating action
715  	 * \param[in]     parameters      Hash table of resource parameters
716  	 * \param[out]    call_id         Where to store call ID on success
717  	 *
718  	 * \return Standard Pacemaker return code
719  	 */
720  	int
721  	controld_execute_resource_agent(lrm_state_t *lrm_state, const char *rsc_id,
722  	                                const char *action, const char *userdata,
723  	                                guint interval_ms, int timeout_ms,
724  	                                int start_delay_ms, GHashTable *parameters,
725  	                                int *call_id)
726  	{
727  	    int rc = pcmk_rc_ok;
728  	    lrmd_key_value_t *params = NULL;
729  	
730  	    if (lrm_state->conn == NULL) {
731  	        return ENOTCONN;
732  	    }
733  	
734  	    // Convert parameters from hash table to list
735  	    if (parameters != NULL) {
736  	        const char *key = NULL;
737  	        const char *value = NULL;
738  	        GHashTableIter iter;
739  	
740  	        g_hash_table_iter_init(&iter, parameters);
741  	        while (g_hash_table_iter_next(&iter, (gpointer *) &key,
742  	                                      (gpointer *) &value)) {
743  	            params = lrmd_key_value_add(params, key, value);
744  	        }
745  	    }
746  	
747  	    if (is_remote_lrmd_ra(NULL, NULL, rsc_id)) {
748  	        rc = controld_execute_remote_agent(lrm_state, rsc_id, action,
749  	                                           userdata, interval_ms, timeout_ms,
750  	                                           start_delay_ms, params, call_id);
751  	
752  	    } else {
753  	        rc = ((lrmd_t *) lrm_state->conn)->cmds->exec(lrm_state->conn, rsc_id,
754  	                                                      action, userdata,
755  	                                                      interval_ms, timeout_ms,
756  	                                                      start_delay_ms,
757  	                                                      lrmd_opt_notify_changes_only,
758  	                                                      params);
759  	        if (rc < 0) {
760  	            rc = pcmk_legacy2rc(rc);
761  	        } else {
762  	            *call_id = rc;
763  	            rc = pcmk_rc_ok;
764  	        }
765  	    }
766  	    return rc;
767  	}
768  	
769  	int
770  	lrm_state_register_rsc(lrm_state_t * lrm_state,
771  	                       const char *rsc_id,
772  	                       const char *class,
773  	                       const char *provider, const char *agent, enum lrmd_call_options options)
774  	{
775  	    lrmd_t *conn = (lrmd_t *) lrm_state->conn;
776  	
777  	    if (conn == NULL) {
778  	        return -ENOTCONN;
779  	    }
780  	
781  	    if (is_remote_lrmd_ra(agent, provider, NULL)) {
782  	        return lrm_state_find_or_create(rsc_id)? pcmk_ok : -EINVAL;
783  	    }
784  	
785  	    /* @TODO Implement an asynchronous version of this (currently a blocking
786  	     * call to the lrmd).
787  	     */
788  	    return conn->cmds->register_rsc(lrm_state->conn, rsc_id, class, provider,
789  	                                    agent, options);
790  	}
791  	
792  	int
793  	lrm_state_unregister_rsc(lrm_state_t * lrm_state,
794  	                         const char *rsc_id, enum lrmd_call_options options)
795  	{
796  	    if (!lrm_state->conn) {
797  	        return -ENOTCONN;
798  	    }
799  	
800  	    if (is_remote_lrmd_ra(NULL, NULL, rsc_id)) {
801  	        g_hash_table_remove(lrm_state_table, rsc_id);
802  	        return pcmk_ok;
803  	    }
804  	
805  	    g_hash_table_remove(lrm_state->rsc_info_cache, rsc_id);
806  	
807  	    /* @TODO Optimize this ... this function is a blocking round trip from
808  	     * client to daemon. The controld_execd_state.c code path that uses this
809  	     * function should always treat it as an async operation. The executor API
810  	     * should make an async version available.
811  	     */
812  	    return ((lrmd_t *) lrm_state->conn)->cmds->unregister_rsc(lrm_state->conn, rsc_id, options);
813  	}
814