1    	#include "config.h"
2    	
3    	#include <stdio.h>
4    	#include <sys/types.h>
5    	#include <stdint.h>
6    	#include <malloc.h>
7    	#include <signal.h>
8    	#include <unistd.h>
9    	#include <sys/select.h>
10   	#include <string.h>
11   	#include <errno.h>
12   	#include <time.h>
13   	#include <sys/uio.h>
14   	#include <list.h>
15   	#include <pthread.h>
16   	
17   	#include <corosync/cpg.h>
18   	
19   	#include "debug.h"
20   	#include "virt.h"
21   	#include "cpg.h"
22   	
23   	#define NODE_ID_NONE ((uint32_t) -1)
24   	
25   	struct msg_queue_node {
26   		list_head();
27   		uint32_t seqno;
28   	#define STATE_CLEAR 0
29   	#define STATE_MESSAGE 1
30   		uint32_t state;
31   		void *msg;
32   		size_t msglen;
33   	};
34   	
/* Values carried in wire_msg.type */
#define TYPE_REQUEST 0		/* request multicast to the group */
#define TYPE_REPLY 1		/* reply addressed to wire_msg.target */
#define TYPE_STORE_VM 2		/* VM state record */

/*
 * Fixed header placed in front of every payload multicast through CPG.
 * The payload bytes follow the header directly.
 */
struct wire_msg {
	uint32_t type;		/* one of the TYPE_* values */
	uint32_t seqno;		/* sequence number of the request */
	uint32_t target;	/* destination node ID, or NODE_ID_NONE */
	uint32_t pad;		/* padding; carries no information */
	char data[];		/* payload (C99 flexible array member) */
};
45   	
46   	static uint32_t seqnum = 0;
47   	static struct msg_queue_node *pending = NULL;
48   	static cpg_handle_t cpg_handle;
49   	static struct cpg_name gname;
50   	
51   	static pthread_mutex_t cpg_mutex = PTHREAD_MUTEX_INITIALIZER;
52   	static pthread_cond_t cpg_cond = PTHREAD_COND_INITIALIZER;
53   	static pthread_t cpg_thread = 0;
54   	
55   	static pthread_mutex_t cpg_ids_mutex = PTHREAD_MUTEX_INITIALIZER;
56   	static uint32_t my_node_id = NODE_ID_NONE;
57   	static uint32_t high_id_from_callback = NODE_ID_NONE;
58   	
59   	static request_callback_fn req_callback_fn;
60   	static request_callback_fn store_callback_fn;
61   	static confchange_callback_fn conf_leave_fn;
62   	static confchange_callback_fn conf_join_fn;
63   	
64   	
65   	int
66   	cpg_get_ids(uint32_t *my_id, uint32_t *high_id)
67   	{
68   		if (!my_id && !high_id)
69   			return -1;
70   	
71   		pthread_mutex_lock(&cpg_ids_mutex);
72   		if (my_id)
73   			*my_id = my_node_id;
74   	
75   		if (high_id)
76   			*high_id = high_id_from_callback;
77   		pthread_mutex_unlock(&cpg_ids_mutex);
78   	
79   		return 0;
80   	}
81   	
82   	static void
83   	cpg_deliver_func(cpg_handle_t h,
84   			 const struct cpg_name *group_name,
85   			 uint32_t nodeid,
86   			 uint32_t pid,
87   			 void *msg,
88   			 size_t msglen)
89   	{
90   		struct msg_queue_node *n;
91   		struct wire_msg *m = msg;
92   		int x, found;
93   	
94   		pthread_mutex_lock(&cpg_mutex);
(1) Event cond_false: Condition "m->type == 1", taking false branch.
95   		if (m->type == TYPE_REPLY) {
96   			/* Reply to a request we sent */
97   			found = 0;
98   	
99   			list_for(&pending, n, x) {
100  				if (m->seqno != n->seqno)
101  					continue;
102  				if (m->target != my_node_id)
103  					continue;
104  				found = 1;
105  				break;
106  			}
107  	
108  			if (!found)
109  				goto out_unlock;
110  	
111  			/* Copy our message in to a buffer */
112  			n->msglen = msglen - sizeof(*m);
113  			if (!n->msglen) {
114  				/* XXX do what? */
115  			}
116  			n->msg = malloc(n->msglen);
117  			if (!n->msg) {
118  				goto out_unlock;
119  			}
120  			n->state = STATE_MESSAGE;
121  			memcpy(n->msg, (char *)msg + sizeof(*m), n->msglen);
122  	
123  			list_remove(&pending, n);
124  			list_insert(&pending, n);
125  	
126  			dbg_printf(2, "Seqnum %d replied; removing from list\n", n->seqno);
127  	
128  			pthread_cond_broadcast(&cpg_cond);
129  			goto out_unlock;
(2) Event if_end: End of if statement.
130  		}
131  		pthread_mutex_unlock(&cpg_mutex);
132  	
(3) Event cond_true: Condition "m->type == 0", taking true branch.
133  		if (m->type == TYPE_REQUEST) {
(4) Event missing_lock: Accessing "req_callback_fn" without holding lock "cpg_mutex". Elsewhere, "req_callback_fn" is written to with "cpg_mutex" held 1 out of 1 times.
Also see events: [lock_acquire][example_access]
134  			req_callback_fn(&m->data, msglen - sizeof(*m),
135  					 nodeid, m->seqno);
136  		}
137  		if (m->type == TYPE_STORE_VM) {
138  			store_callback_fn(&m->data, msglen - sizeof(*m),
139  					 nodeid, m->seqno);
140  		}
141  	
142  		return;
143  	
144  	out_unlock:
145  		pthread_mutex_unlock(&cpg_mutex);
146  	}
147  	
148  	
149  	static void
150  	cpg_config_change(cpg_handle_t h,
151  			  const struct cpg_name *group_name,
152  			  const struct cpg_address *members, size_t memberlen,
153  			  const struct cpg_address *left, size_t leftlen,
154  			  const struct cpg_address *join, size_t joinlen)
155  	{
156  		int x;
157  		int high;
158  	
159  		pthread_mutex_lock(&cpg_ids_mutex);
160  		high = my_node_id;
161  	
162  		for (x = 0; x < memberlen; x++) {
163  			if (members[x].nodeid > high)
164  				high = members[x].nodeid;
165  		}
166  	
167  		high_id_from_callback = high;
168  		pthread_mutex_unlock(&cpg_ids_mutex);
169  	
170  		if (joinlen > 0)
171  			conf_join_fn(join, joinlen);
172  	
173  		if (leftlen > 0)
174  			conf_leave_fn(left, leftlen);
175  	}
176  	
177  	
178  	static cpg_callbacks_t my_callbacks = {
179  		.cpg_deliver_fn = cpg_deliver_func,
180  		.cpg_confchg_fn = cpg_config_change
181  	};
182  	
183  	
184  	int
185  	cpg_send_req(void *data, size_t len, uint32_t *seqno)
186  	{
187  		struct iovec iov;
188  		struct msg_queue_node *n;
189  		struct wire_msg *m;
190  		size_t msgsz = sizeof(*m) + len;
191  		int ret;
192  	
193  		n = malloc(sizeof(*n));
194  		if (!n)
195  			return -1;
196  	
197  		m = malloc(msgsz);
198  		if (!m) {
199  			free(n);
200  			return -1;
201  		}
202  	
203  		/* only incremented on send */
204  		n->state = STATE_CLEAR;
205  		n->msg = NULL;
206  		n->msglen = 0;
207  	
208  		pthread_mutex_lock(&cpg_mutex);
209  		list_insert(&pending, n);
210  		n->seqno = ++seqnum;
211  		m->seqno = seqnum;
212  		*seqno = seqnum;
213  		pthread_mutex_unlock(&cpg_mutex);
214  	
215  		m->type = TYPE_REQUEST;		/* XXX swab? */
216  		m->target = NODE_ID_NONE;
217  		memcpy(&m->data, data, len);
218  	
219  		iov.iov_base = m;
220  		iov.iov_len = msgsz;
221  		ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
222  	
223  		free(m);
224  		if (ret == CS_OK)
225  			return 0;
226  		return -1;
227  	}
228  	
229  	
230  	int
231  	cpg_send_vm_state(virt_state_t *vs)
232  	{
233  		struct iovec iov;
234  		struct msg_queue_node *n;
235  		struct wire_msg *m;
236  		size_t msgsz = sizeof(*m) + sizeof(*vs);
237  		int ret;
238  	
239  		n = calloc(1, (sizeof(*n)));
240  		if (!n)
241  			return -1;
242  	
243  		m = calloc(1, msgsz);
244  		if (!m) {
245  			free(n);
246  			return -1;
247  		}
248  	
249  		n->state = STATE_MESSAGE;
250  		n->msg = NULL;
251  		n->msglen = 0;
252  	
253  		pthread_mutex_lock(&cpg_mutex);
254  		list_insert(&pending, n);
255  		pthread_mutex_unlock(&cpg_mutex);
256  	
257  		m->type = TYPE_STORE_VM;
258  		m->target = NODE_ID_NONE;
259  	
260  		memcpy(&m->data, vs, sizeof(*vs));
261  	
262  		iov.iov_base = m;
263  		iov.iov_len = msgsz;
264  		ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
265  	
266  		free(m);
267  		if (ret == CS_OK)
268  			return 0;
269  	
270  		return -1;
271  	}
272  	
273  	
274  	int
275  	cpg_send_reply(void *data, size_t len, uint32_t nodeid, uint32_t seqno)
276  	{
277  		struct iovec iov;
278  		struct wire_msg *m;
279  		size_t msgsz = sizeof(*m) + len;
280  		int ret;
281  	
282  		m = malloc(msgsz);
283  		if (!m)
284  			return -1;
285  	
286  		/* only incremented on send */
287  		m->seqno = seqno;
288  		m->type = TYPE_REPLY;		/* XXX swab? */
289  		m->target = nodeid;
290  		memcpy(&m->data, data, len);
291  	
292  		iov.iov_base = m;
293  		iov.iov_len = msgsz;
294  		ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
295  	
296  		free(m);
297  		if (ret == CS_OK)
298  			return 0;
299  	
300  		return -1;
301  	}
302  	
303  	
304  	int
305  	cpg_wait_reply(void **data, size_t *len, uint32_t seqno)
306  	{
307  		struct msg_queue_node *n;
308  		int x, found = 0;
309  	
310  		while (!found) {
311  			found = 0;
312  			pthread_mutex_lock(&cpg_mutex);
313  			pthread_cond_wait(&cpg_cond, &cpg_mutex);
314  	
315  			list_for(&pending, n, x) {
316  				if (n->seqno != seqno)
317  					continue;
318  				if (n->state != STATE_MESSAGE)
319  					continue;
320  				found = 1;
321  				goto out;
322  			}
323  			pthread_mutex_unlock(&cpg_mutex);
324  		}
325  	
326  	out:
327  		list_remove(&pending, n);
328  		pthread_mutex_unlock(&cpg_mutex);
329  	
330  		*data = n->msg;
331  		*len = n->msglen;
332  		free(n);
333  	
334  		return 0;
335  	}
336  	
337  	
338  	static void *
339  	cpg_dispatch_thread(void *arg)
340  	{
341  		cpg_dispatch(cpg_handle, CS_DISPATCH_BLOCKING);
342  	
343  		return NULL;
344  	}
345  	
346  	
347  	int
348  	cpg_start(	const char *name,
349  				request_callback_fn req_cb_fn,
350  				request_callback_fn store_cb_fn,
351  				confchange_callback_fn join_fn,
352  				confchange_callback_fn leave_fn)
353  	{
354  		cpg_handle_t h;
355  		int ret;
356  	
357  		errno = EINVAL;
358  	
359  		if (!name)
360  			return -1;
361  	
362  		ret = snprintf(gname.value, sizeof(gname.value), "%s", name);
363  		if (ret <= 0)
364  			return -1;
365  	
366  		if (ret >= sizeof(gname.value)) {
367  			errno = ENAMETOOLONG;
368  			return -1;
369  		}
370  		gname.length = ret;
371  	
372  		memset(&h, 0, sizeof(h));
373  		if (cpg_initialize(&h, &my_callbacks) != CS_OK) {
374  			perror("cpg_initialize");
375  			return -1;
376  		}
377  	
378  		if (cpg_join(h, &gname) != CS_OK) {
379  			perror("cpg_join");
380  			return -1;
381  		}
382  	
383  		cpg_local_get(h, &my_node_id);
384  		dbg_printf(2, "My CPG nodeid is %d\n", my_node_id);
385  	
(5) Event lock_acquire: Example 1: Calling "pthread_mutex_lock" acquires lock "cpg_mutex".
Also see events: [missing_lock][example_access]
386  		pthread_mutex_lock(&cpg_mutex);
387  		pthread_create(&cpg_thread, NULL, cpg_dispatch_thread, NULL);
388  	
389  		memcpy(&cpg_handle, &h, sizeof(h));
390  	
(6) Event example_access: Example 1 (cont.): "req_callback_fn" is written to with lock "cpg_mutex" held.
Also see events: [missing_lock][lock_acquire]
391  		req_callback_fn = req_cb_fn;
392  		store_callback_fn = store_cb_fn;
393  		conf_join_fn = join_fn;
394  		conf_leave_fn = leave_fn;
395  	
396  		pthread_mutex_unlock(&cpg_mutex);
397  	
398  		return 0;
399  	}
400  	
401  	
402  	int
403  	cpg_stop(void)
404  	{
405  		pthread_cancel(cpg_thread);
406  		pthread_join(cpg_thread, NULL);
407  		cpg_leave(cpg_handle, &gname);
408  		cpg_finalize(cpg_handle);
409  	
410  		return 0;
411  	}
412