1 #include "config.h"
2
3 #include <stdio.h>
4 #include <sys/types.h>
5 #include <stdint.h>
6 #include <malloc.h>
7 #include <signal.h>
8 #include <unistd.h>
9 #include <sys/select.h>
10 #include <string.h>
11 #include <errno.h>
12 #include <time.h>
13 #include <sys/uio.h>
14 #include <list.h>
15 #include <pthread.h>
16
17 #include <corosync/cpg.h>
18
19 #include "debug.h"
20 #include "virt.h"
21 #include "cpg.h"
22
/*
 * Sentinel node ID (all bits set).  Used in this file as the initial value
 * of the cached node IDs and as the 'target' of messages that are not
 * addressed to one particular node.
 */
#define NODE_ID_NONE ((uint32_t) UINT32_MAX)
24
25 struct msg_queue_node {
26 list_head();
27 uint32_t seqno;
28 #define STATE_CLEAR 0
29 #define STATE_MESSAGE 1
30 uint32_t state;
31 void *msg;
32 size_t msglen;
33 };
34
/* Values carried in wire_msg.type */
#define TYPE_REQUEST 0	/* request; handed to the request callback */
#define TYPE_REPLY 1	/* reply to a request, matched on seqno + target */
#define TYPE_STORE_VM 2	/* VM state update; handed to the store callback */

/*
 * Header placed in front of every payload multicast to the CPG group.
 * The payload bytes follow the header directly, so a complete message is
 * sizeof(struct wire_msg) + payload length bytes long.
 */
struct wire_msg {
	uint32_t type;		/* one of the TYPE_* values above */
	uint32_t seqno;		/* sender's sequence number */
	uint32_t target;	/* addressed node ID, or NODE_ID_NONE */
	uint32_t pad;		/* not read anywhere in this file */
	char data[];		/* payload (C99 flexible array member rather
				 * than the GNU zero-length array extension;
				 * size and layout are unchanged) */
};
45
46 static uint32_t seqnum = 0;
47 static struct msg_queue_node *pending = NULL;
48 static cpg_handle_t cpg_handle;
49 static struct cpg_name gname;
50
51 static pthread_mutex_t cpg_mutex = PTHREAD_MUTEX_INITIALIZER;
52 static pthread_cond_t cpg_cond = PTHREAD_COND_INITIALIZER;
53 static pthread_t cpg_thread = 0;
54
55 static pthread_mutex_t cpg_ids_mutex = PTHREAD_MUTEX_INITIALIZER;
56 static uint32_t my_node_id = NODE_ID_NONE;
57 static uint32_t high_id_from_callback = NODE_ID_NONE;
58
59 static request_callback_fn req_callback_fn;
60 static request_callback_fn store_callback_fn;
61 static confchange_callback_fn conf_leave_fn;
62 static confchange_callback_fn conf_join_fn;
63
64
65 int
66 cpg_get_ids(uint32_t *my_id, uint32_t *high_id)
67 {
68 if (!my_id && !high_id)
69 return -1;
70
71 pthread_mutex_lock(&cpg_ids_mutex);
72 if (my_id)
73 *my_id = my_node_id;
74
75 if (high_id)
76 *high_id = high_id_from_callback;
77 pthread_mutex_unlock(&cpg_ids_mutex);
78
79 return 0;
80 }
81
82 static void
83 cpg_deliver_func(cpg_handle_t h,
84 const struct cpg_name *group_name,
85 uint32_t nodeid,
86 uint32_t pid,
87 void *msg,
88 size_t msglen)
89 {
90 struct msg_queue_node *n;
91 struct wire_msg *m = msg;
92 int x, found;
93
94 pthread_mutex_lock(&cpg_mutex);
|
(1) Event cond_false: |
Condition "m->type == 1", taking false branch. |
95 if (m->type == TYPE_REPLY) {
96 /* Reply to a request we sent */
97 found = 0;
98
99 list_for(&pending, n, x) {
100 if (m->seqno != n->seqno)
101 continue;
102 if (m->target != my_node_id)
103 continue;
104 found = 1;
105 break;
106 }
107
108 if (!found)
109 goto out_unlock;
110
111 /* Copy our message in to a buffer */
112 n->msglen = msglen - sizeof(*m);
113 if (!n->msglen) {
114 /* XXX do what? */
115 }
116 n->msg = malloc(n->msglen);
117 if (!n->msg) {
118 goto out_unlock;
119 }
120 n->state = STATE_MESSAGE;
121 memcpy(n->msg, (char *)msg + sizeof(*m), n->msglen);
122
123 list_remove(&pending, n);
124 list_insert(&pending, n);
125
126 dbg_printf(2, "Seqnum %d replied; removing from list\n", n->seqno);
127
128 pthread_cond_broadcast(&cpg_cond);
129 goto out_unlock;
|
(2) Event if_end: |
End of if statement. |
130 }
131 pthread_mutex_unlock(&cpg_mutex);
132
|
(3) Event cond_true: |
Condition "m->type == 0", taking true branch. |
133 if (m->type == TYPE_REQUEST) {
|
(4) Event missing_lock: |
Accessing "req_callback_fn" without holding lock "cpg_mutex". Elsewhere, "req_callback_fn" is written to with "cpg_mutex" held 1 out of 1 times. |
| Also see events: |
[lock_acquire][example_access] |
134 req_callback_fn(&m->data, msglen - sizeof(*m),
135 nodeid, m->seqno);
136 }
137 if (m->type == TYPE_STORE_VM) {
138 store_callback_fn(&m->data, msglen - sizeof(*m),
139 nodeid, m->seqno);
140 }
141
142 return;
143
144 out_unlock:
145 pthread_mutex_unlock(&cpg_mutex);
146 }
147
148
149 static void
150 cpg_config_change(cpg_handle_t h,
151 const struct cpg_name *group_name,
152 const struct cpg_address *members, size_t memberlen,
153 const struct cpg_address *left, size_t leftlen,
154 const struct cpg_address *join, size_t joinlen)
155 {
156 int x;
157 int high;
158
159 pthread_mutex_lock(&cpg_ids_mutex);
160 high = my_node_id;
161
162 for (x = 0; x < memberlen; x++) {
163 if (members[x].nodeid > high)
164 high = members[x].nodeid;
165 }
166
167 high_id_from_callback = high;
168 pthread_mutex_unlock(&cpg_ids_mutex);
169
170 if (joinlen > 0)
171 conf_join_fn(join, joinlen);
172
173 if (leftlen > 0)
174 conf_leave_fn(left, leftlen);
175 }
176
177
178 static cpg_callbacks_t my_callbacks = {
179 .cpg_deliver_fn = cpg_deliver_func,
180 .cpg_confchg_fn = cpg_config_change
181 };
182
183
184 int
185 cpg_send_req(void *data, size_t len, uint32_t *seqno)
186 {
187 struct iovec iov;
188 struct msg_queue_node *n;
189 struct wire_msg *m;
190 size_t msgsz = sizeof(*m) + len;
191 int ret;
192
193 n = malloc(sizeof(*n));
194 if (!n)
195 return -1;
196
197 m = malloc(msgsz);
198 if (!m) {
199 free(n);
200 return -1;
201 }
202
203 /* only incremented on send */
204 n->state = STATE_CLEAR;
205 n->msg = NULL;
206 n->msglen = 0;
207
208 pthread_mutex_lock(&cpg_mutex);
209 list_insert(&pending, n);
210 n->seqno = ++seqnum;
211 m->seqno = seqnum;
212 *seqno = seqnum;
213 pthread_mutex_unlock(&cpg_mutex);
214
215 m->type = TYPE_REQUEST; /* XXX swab? */
216 m->target = NODE_ID_NONE;
217 memcpy(&m->data, data, len);
218
219 iov.iov_base = m;
220 iov.iov_len = msgsz;
221 ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
222
223 free(m);
224 if (ret == CS_OK)
225 return 0;
226 return -1;
227 }
228
229
230 int
231 cpg_send_vm_state(virt_state_t *vs)
232 {
233 struct iovec iov;
234 struct msg_queue_node *n;
235 struct wire_msg *m;
236 size_t msgsz = sizeof(*m) + sizeof(*vs);
237 int ret;
238
239 n = calloc(1, (sizeof(*n)));
240 if (!n)
241 return -1;
242
243 m = calloc(1, msgsz);
244 if (!m) {
245 free(n);
246 return -1;
247 }
248
249 n->state = STATE_MESSAGE;
250 n->msg = NULL;
251 n->msglen = 0;
252
253 pthread_mutex_lock(&cpg_mutex);
254 list_insert(&pending, n);
255 pthread_mutex_unlock(&cpg_mutex);
256
257 m->type = TYPE_STORE_VM;
258 m->target = NODE_ID_NONE;
259
260 memcpy(&m->data, vs, sizeof(*vs));
261
262 iov.iov_base = m;
263 iov.iov_len = msgsz;
264 ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
265
266 free(m);
267 if (ret == CS_OK)
268 return 0;
269
270 return -1;
271 }
272
273
274 int
275 cpg_send_reply(void *data, size_t len, uint32_t nodeid, uint32_t seqno)
276 {
277 struct iovec iov;
278 struct wire_msg *m;
279 size_t msgsz = sizeof(*m) + len;
280 int ret;
281
282 m = malloc(msgsz);
283 if (!m)
284 return -1;
285
286 /* only incremented on send */
287 m->seqno = seqno;
288 m->type = TYPE_REPLY; /* XXX swab? */
289 m->target = nodeid;
290 memcpy(&m->data, data, len);
291
292 iov.iov_base = m;
293 iov.iov_len = msgsz;
294 ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
295
296 free(m);
297 if (ret == CS_OK)
298 return 0;
299
300 return -1;
301 }
302
303
304 int
305 cpg_wait_reply(void **data, size_t *len, uint32_t seqno)
306 {
307 struct msg_queue_node *n;
308 int x, found = 0;
309
310 while (!found) {
311 found = 0;
312 pthread_mutex_lock(&cpg_mutex);
313 pthread_cond_wait(&cpg_cond, &cpg_mutex);
314
315 list_for(&pending, n, x) {
316 if (n->seqno != seqno)
317 continue;
318 if (n->state != STATE_MESSAGE)
319 continue;
320 found = 1;
321 goto out;
322 }
323 pthread_mutex_unlock(&cpg_mutex);
324 }
325
326 out:
327 list_remove(&pending, n);
328 pthread_mutex_unlock(&cpg_mutex);
329
330 *data = n->msg;
331 *len = n->msglen;
332 free(n);
333
334 return 0;
335 }
336
337
338 static void *
339 cpg_dispatch_thread(void *arg)
340 {
341 cpg_dispatch(cpg_handle, CS_DISPATCH_BLOCKING);
342
343 return NULL;
344 }
345
346
347 int
348 cpg_start( const char *name,
349 request_callback_fn req_cb_fn,
350 request_callback_fn store_cb_fn,
351 confchange_callback_fn join_fn,
352 confchange_callback_fn leave_fn)
353 {
354 cpg_handle_t h;
355 int ret;
356
357 errno = EINVAL;
358
359 if (!name)
360 return -1;
361
362 ret = snprintf(gname.value, sizeof(gname.value), "%s", name);
363 if (ret <= 0)
364 return -1;
365
366 if (ret >= sizeof(gname.value)) {
367 errno = ENAMETOOLONG;
368 return -1;
369 }
370 gname.length = ret;
371
372 memset(&h, 0, sizeof(h));
373 if (cpg_initialize(&h, &my_callbacks) != CS_OK) {
374 perror("cpg_initialize");
375 return -1;
376 }
377
378 if (cpg_join(h, &gname) != CS_OK) {
379 perror("cpg_join");
380 return -1;
381 }
382
383 cpg_local_get(h, &my_node_id);
384 dbg_printf(2, "My CPG nodeid is %d\n", my_node_id);
385
|
(5) Event lock_acquire: |
Example 1: Calling "pthread_mutex_lock" acquires lock "cpg_mutex". |
| Also see events: |
[missing_lock][example_access] |
386 pthread_mutex_lock(&cpg_mutex);
387 pthread_create(&cpg_thread, NULL, cpg_dispatch_thread, NULL);
388
389 memcpy(&cpg_handle, &h, sizeof(h));
390
|
(6) Event example_access: |
Example 1 (cont.): "req_callback_fn" is written to with lock "cpg_mutex" held. |
| Also see events: |
[missing_lock][lock_acquire] |
391 req_callback_fn = req_cb_fn;
392 store_callback_fn = store_cb_fn;
393 conf_join_fn = join_fn;
394 conf_leave_fn = leave_fn;
395
396 pthread_mutex_unlock(&cpg_mutex);
397
398 return 0;
399 }
400
401
402 int
403 cpg_stop(void)
404 {
405 pthread_cancel(cpg_thread);
406 pthread_join(cpg_thread, NULL);
407 cpg_leave(cpg_handle, &gname);
408 cpg_finalize(cpg_handle);
409
410 return 0;
411 }
412