1 /*
2 * vi: set autoindent tabstop=4 shiftwidth=4 :
3 *
4 * Copyright (c) 2006-2015 Red Hat, Inc.
5 *
6 * All rights reserved.
7 *
8 * Author: Christine Caulfield (ccaulfi@redhat.com)
9 * Author: Jan Friesse (jfriesse@redhat.com)
10 *
11 * This software licensed under BSD license, the text of which follows:
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions are met:
15 *
16 * - Redistributions of source code must retain the above copyright notice,
17 * this list of conditions and the following disclaimer.
18 * - Redistributions in binary form must reproduce the above copyright notice,
19 * this list of conditions and the following disclaimer in the documentation
20 * and/or other materials provided with the distribution.
21 * - Neither the name of the MontaVista Software, Inc. nor the names of its
22 * contributors may be used to endorse or promote products derived from this
23 * software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35 * THE POSSIBILITY OF SUCH DAMAGE.
36 */
37 /*
38 * Provides a closed process group API using the coroipcc executive
39 */
40
41 #include <config.h>
42
43 #include <stdlib.h>
44 #include <stdio.h>
45 #include <string.h>
46 #include <unistd.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #include <sys/mman.h>
50 #include <sys/uio.h>
51 #include <sys/stat.h>
52 #include <errno.h>
53 #include <limits.h>
54
55 #include <qb/qblist.h>
56 #include <qb/qbdefs.h>
57 #include <qb/qbipcc.h>
58 #include <qb/qblog.h>
59
60 #include <corosync/hdb.h>
61 #include <corosync/corotypes.h>
62 #include <corosync/corodefs.h>
63 #include <corosync/cpg.h>
64 #include <corosync/ipc_cpg.h>
65
66 #include "util.h"
67
68 #ifndef MAP_ANONYMOUS
69 #define MAP_ANONYMOUS MAP_ANON
70 #endif
71
72 /*
73 * Maximum number of times to retry a send when transmitting
74 * a large message fragment
75 */
76 #define MAX_RETRIES 100
77
78 /*
79 * ZCB files have following umask (umask is same as used in libqb)
80 */
81 #define CPG_MEMORY_MAP_UMASK 077
82
/*
 * One in-progress reassembly of a fragmented (partial) CPG message,
 * identified by the sending (nodeid, pid) pair.
 */
struct cpg_assembly_data
{
	struct qb_list_head list;	/* chained on cpg_inst.assembly_list_head */
	uint32_t nodeid;		/* sender node id */
	uint32_t pid;			/* sender process id */
	char *assembly_buf;		/* malloc'ed buffer sized for the full message */
	uint32_t assembly_buf_ptr;	/* bytes assembled so far (next write offset) */
};
91
/*
 * Per-handle state for one CPG client connection.
 */
struct cpg_inst {
	qb_ipcc_connection_t *c;	/* libqb IPC connection to the "cpg" service */
	int finalize;			/* set once cpg_finalize() has started; guards re-entry */
	void *context;			/* opaque user pointer (cpg_context_set/get) */
	union {
		cpg_model_data_t model_data;
		cpg_model_v1_data_t model_v1_data;
	};
	struct qb_list_head iteration_list_head;	/* open iteration instances */
	uint32_t max_msg_size;		/* IPC_REQUEST_SIZE minus slack for internal headers */
	struct qb_list_head assembly_list_head;		/* in-progress partial-message reassemblies */
};
static void cpg_inst_free (void *inst);

/* Handle database for cpg_inst; cpg_inst_free is the per-entry destructor */
DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free);

/*
 * State for one cpg iteration (initialize/next/finalize sequence).
 */
struct cpg_iteration_instance_t {
	cpg_iteration_handle_t cpg_iteration_handle;	/* our handle in cpg_iteration_handle_t_db */
	qb_ipcc_connection_t *conn;			/* connection of the owning cpg_inst */
	hdb_handle_t executive_iteration_handle;	/* presumably the server-side iteration handle -- confirm against cpg_iteration_initialize */
	struct qb_list_head list;			/* chained on cpg_inst.iteration_list_head */
};

DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
116
117
118 /*
119 * Internal (not visible by API) functions
120 */
121
/*
 * Send a request (iov array) and block for its reply, translating the
 * libqb errno-style result into a cs_error_t.
 */
static cs_error_t
coroipcc_msg_send_reply_receive (
	qb_ipcc_connection_t *c,
	const struct iovec *iov,
	unsigned int iov_len,
	void *res_msg,
	size_t res_len)
{
	return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
				CS_IPC_TIMEOUT_MS));
}
133
/*
 * Unlink an iteration instance from its owner's list and destroy its
 * handle (the instance storage is owned by the handle database).
 */
static void cpg_iteration_instance_finalize (struct cpg_iteration_instance_t *cpg_iteration_instance)
{
	qb_list_del (&cpg_iteration_instance->list);
	hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
}
139
/*
 * Destructor registered with cpg_handle_t_db: tears down the IPC
 * connection when the handle database releases an instance.
 */
static void cpg_inst_free (void *inst)
{
	struct cpg_inst *cpg_inst = (struct cpg_inst *)inst;
	qb_ipcc_disconnect(cpg_inst->c);
}
145
146 static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
147 {
148 struct qb_list_head *iter, *tmp_iter;
149 struct cpg_iteration_instance_t *cpg_iteration_instance;
150
151 /*
152 * Traverse thru iteration instances and delete them
153 */
154 qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->iteration_list_head)) {
155 cpg_iteration_instance = qb_list_entry (iter, struct cpg_iteration_instance_t, list);
156
157 cpg_iteration_instance_finalize (cpg_iteration_instance);
158 }
159 hdb_handle_destroy (&cpg_handle_t_db, handle);
160 }
161
162 /**
163 * @defgroup cpg_coroipcc The closed process group API
164 * @ingroup coroipcc
165 *
166 * @{
167 */
168
169 cs_error_t cpg_initialize (
170 cpg_handle_t *handle,
171 cpg_callbacks_t *callbacks)
172 {
173 cpg_model_v1_data_t model_v1_data;
174
175 memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
176
177 if (callbacks) {
178 model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
179 model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
180 }
181
182 return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
183 }
184
185 cs_error_t cpg_model_initialize (
186 cpg_handle_t *handle,
187 cpg_model_t model,
188 cpg_model_data_t *model_data,
189 void *context)
190 {
191 cs_error_t error;
192 struct cpg_inst *cpg_inst;
193
194 if (model != CPG_MODEL_V1) {
195 error = CS_ERR_INVALID_PARAM;
196 goto error_no_destroy;
197 }
198
199 error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
200 if (error != CS_OK) {
201 goto error_no_destroy;
202 }
203
204 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (void *)&cpg_inst));
205 if (error != CS_OK) {
206 goto error_destroy;
207 }
208
209 cpg_inst->c = qb_ipcc_connect ("cpg", IPC_REQUEST_SIZE);
210 if (cpg_inst->c == NULL) {
211 error = qb_to_cs_error(-errno);
212 goto error_put_destroy;
213 }
214
215 if (model_data != NULL) {
216 switch (model) {
217 case CPG_MODEL_V1:
218 memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
219 if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
220 error = CS_ERR_INVALID_PARAM;
221
222 goto error_destroy;
223 }
224 break;
225 }
226 }
227
228 /* Allow space for corosync internal headers */
229 cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
230 cpg_inst->model_data.model = model;
231 cpg_inst->context = context;
232
233 qb_list_init(&cpg_inst->iteration_list_head);
234
235 qb_list_init(&cpg_inst->assembly_list_head);
236
237 hdb_handle_put (&cpg_handle_t_db, *handle);
238
239 return (CS_OK);
240
241 error_put_destroy:
242 hdb_handle_put (&cpg_handle_t_db, *handle);
243 error_destroy:
244 hdb_handle_destroy (&cpg_handle_t_db, *handle);
245 error_no_destroy:
246 return (error);
247 }
248
249 cs_error_t cpg_finalize (
250 cpg_handle_t handle)
251 {
252 struct cpg_inst *cpg_inst;
253 struct iovec iov;
254 struct req_lib_cpg_finalize req_lib_cpg_finalize;
255 struct res_lib_cpg_finalize res_lib_cpg_finalize;
256 cs_error_t error;
257
258 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
259 if (error != CS_OK) {
260 return (error);
261 }
262
263 /*
264 * Another thread has already started finalizing
265 */
266 if (cpg_inst->finalize) {
267 hdb_handle_put (&cpg_handle_t_db, handle);
268 return (CS_ERR_BAD_HANDLE);
269 }
270
271 cpg_inst->finalize = 1;
272
273 /*
274 * Send service request
275 */
276 req_lib_cpg_finalize.header.size = sizeof (struct req_lib_cpg_finalize);
277 req_lib_cpg_finalize.header.id = MESSAGE_REQ_CPG_FINALIZE;
278
279 iov.iov_base = (void *)&req_lib_cpg_finalize;
280 iov.iov_len = sizeof (struct req_lib_cpg_finalize);
281
282 error = coroipcc_msg_send_reply_receive (cpg_inst->c,
283 &iov,
284 1,
285 &res_lib_cpg_finalize,
286 sizeof (struct res_lib_cpg_finalize));
287
288 cpg_inst_finalize (cpg_inst, handle);
289 hdb_handle_put (&cpg_handle_t_db, handle);
290
291 return (error);
292 }
293
294 cs_error_t cpg_fd_get (
295 cpg_handle_t handle,
296 int *fd)
297 {
298 cs_error_t error;
299 struct cpg_inst *cpg_inst;
300
301 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
302 if (error != CS_OK) {
303 return (error);
304 }
305
306 error = qb_to_cs_error (qb_ipcc_fd_get (cpg_inst->c, fd));
307
308 hdb_handle_put (&cpg_handle_t_db, handle);
309
310 return (error);
311 }
312
313 cs_error_t cpg_max_atomic_msgsize_get (
314 cpg_handle_t handle,
315 uint32_t *size)
316 {
317 cs_error_t error;
318 struct cpg_inst *cpg_inst;
319
320 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
321 if (error != CS_OK) {
322 return (error);
323 }
324
325 *size = cpg_inst->max_msg_size;
326
327 hdb_handle_put (&cpg_handle_t_db, handle);
328
329 return (error);
330 }
331
332 cs_error_t cpg_context_get (
333 cpg_handle_t handle,
334 void **context)
335 {
336 cs_error_t error;
337 struct cpg_inst *cpg_inst;
338
339 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
340 if (error != CS_OK) {
341 return (error);
342 }
343
344 *context = cpg_inst->context;
345
346 hdb_handle_put (&cpg_handle_t_db, handle);
347
348 return (CS_OK);
349 }
350
351 cs_error_t cpg_context_set (
352 cpg_handle_t handle,
353 void *context)
354 {
355 cs_error_t error;
356 struct cpg_inst *cpg_inst;
357
358 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
359 if (error != CS_OK) {
360 return (error);
361 }
362
363 cpg_inst->context = context;
364
365 hdb_handle_put (&cpg_handle_t_db, handle);
366
367 return (CS_OK);
368 }
369
370 cs_error_t cpg_dispatch (
371 cpg_handle_t handle,
372 cs_dispatch_flags_t dispatch_types)
373 {
374 int timeout = -1;
375 cs_error_t error;
376 int cont = 1; /* always continue do loop except when set to 0 */
377 struct cpg_inst *cpg_inst;
378 struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
379 struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
380 struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
381 struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
382 struct cpg_inst cpg_inst_copy;
383 struct qb_ipc_response_header *dispatch_data;
384 struct cpg_address member_list[CPG_MEMBERS_MAX];
385 struct cpg_address left_list[CPG_MEMBERS_MAX];
386 struct cpg_address joined_list[CPG_MEMBERS_MAX];
387 struct cpg_name group_name;
388 struct cpg_assembly_data *assembly_data;
389 struct qb_list_head *iter, *tmp_iter;
390 mar_cpg_address_t *left_list_start;
391 mar_cpg_address_t *joined_list_start;
392 unsigned int i;
393 struct cpg_ring_id ring_id;
394 uint32_t totem_member_list[CPG_MEMBERS_MAX];
395 int32_t errno_res;
396 char dispatch_buf[IPC_DISPATCH_SIZE];
397
398 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
399 if (error != CS_OK) {
400 return (error);
401 }
402
403 /*
404 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
405 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
406 */
407 if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
408 timeout = 0;
409 }
410
411 dispatch_data = (struct qb_ipc_response_header *)dispatch_buf;
412 do {
413 errno_res = qb_ipcc_event_recv (
414 cpg_inst->c,
415 dispatch_buf,
416 IPC_DISPATCH_SIZE,
417 timeout);
418 error = qb_to_cs_error (errno_res);
419 if (error == CS_ERR_BAD_HANDLE) {
420 error = CS_OK;
421 goto error_put;
422 }
423 if (error == CS_ERR_TRY_AGAIN) {
424 if (dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
425 /*
426 * Don't mask error
427 */
428 goto error_put;
429 }
430 error = CS_OK;
431 if (dispatch_types == CS_DISPATCH_ALL) {
432 break; /* exit do while cont is 1 loop */
433 } else {
434 continue; /* next poll */
435 }
436 }
437 if (error != CS_OK) {
438 goto error_put;
439 }
440
441 /*
442 * Make copy of callbacks, message data, unlock instance, and call callback
443 * A risk of this dispatch method is that the callback routines may
444 * operate at the same time that cpgFinalize has been called.
445 */
446 memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
447 switch (cpg_inst_copy.model_data.model) {
448 case CPG_MODEL_V1:
449 /*
450 * Dispatch incoming message
451 */
452 switch (dispatch_data->id) {
453 case MESSAGE_RES_CPG_DELIVER_CALLBACK:
454 if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
455 break;
456 }
457
458 res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
459
460 marshall_from_mar_cpg_name_t (
461 &group_name,
462 &res_cpg_deliver_callback->group_name);
463
464 cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
465 &group_name,
466 res_cpg_deliver_callback->nodeid,
467 res_cpg_deliver_callback->pid,
468 &res_cpg_deliver_callback->message,
469 res_cpg_deliver_callback->msglen);
470 break;
471
472 case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
473 res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
474
475 marshall_from_mar_cpg_name_t (
476 &group_name,
477 &res_cpg_partial_deliver_callback->group_name);
478
479 /*
480 * Search for assembly data for current messages (nodeid, pid) pair in list of assemblies
481 */
482 assembly_data = NULL;
483 qb_list_for_each(iter, &(cpg_inst->assembly_list_head)) {
484 struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
485 if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
486 assembly_data = current_assembly_data;
487 break;
488 }
489 }
490
491 if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
492
493 /*
494 * As this is LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly.
495 * Otherwise the sending of packet must have been interrupted and error should have
496 * been reported to sending client. Therefore here last assembly will be dropped.
497 */
498 if (assembly_data) {
499 qb_list_del (&assembly_data->list);
500 free(assembly_data->assembly_buf);
501 free(assembly_data);
502 // coverity[UNUSED_VALUE:SUPPRESS] defensive programming
503 assembly_data = NULL;
504 }
505
506 assembly_data = malloc(sizeof(struct cpg_assembly_data));
507 if (!assembly_data) {
508 error = CS_ERR_NO_MEMORY;
509 goto error_put;
510 }
511
512 assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
513 assembly_data->pid = res_cpg_partial_deliver_callback->pid;
514 assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
515 if (!assembly_data->assembly_buf) {
516 free(assembly_data);
517 error = CS_ERR_NO_MEMORY;
518 goto error_put;
519 }
520 assembly_data->assembly_buf_ptr = 0;
521 qb_list_init (&assembly_data->list);
522
523 qb_list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
524 }
525 if (assembly_data) {
526 memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
527 res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
528 assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
529
530 if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
531 if (cpg_inst_copy.model_v1_data.cpg_deliver_fn != NULL) {
532 cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
533 &group_name,
534 res_cpg_partial_deliver_callback->nodeid,
535 res_cpg_partial_deliver_callback->pid,
536 assembly_data->assembly_buf,
537 res_cpg_partial_deliver_callback->msglen);
538 }
539
540 qb_list_del (&assembly_data->list);
541 free(assembly_data->assembly_buf);
542 free(assembly_data);
543 }
544 }
545 break;
546
547 case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
548 if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
549 break;
550 }
551
552 res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
553
554 for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
555 marshall_from_mar_cpg_address_t (&member_list[i],
556 &res_cpg_confchg_callback->member_list[i]);
557 }
558 left_list_start = res_cpg_confchg_callback->member_list +
559 res_cpg_confchg_callback->member_list_entries;
560 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
561 marshall_from_mar_cpg_address_t (&left_list[i],
562 &left_list_start[i]);
563 }
564 joined_list_start = res_cpg_confchg_callback->member_list +
565 res_cpg_confchg_callback->member_list_entries +
566 res_cpg_confchg_callback->left_list_entries;
567 for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
568 marshall_from_mar_cpg_address_t (&joined_list[i],
569 &joined_list_start[i]);
570 }
571 marshall_from_mar_cpg_name_t (
572 &group_name,
573 &res_cpg_confchg_callback->group_name);
574
575 cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
576 &group_name,
577 member_list,
578 res_cpg_confchg_callback->member_list_entries,
579 left_list,
580 res_cpg_confchg_callback->left_list_entries,
581 joined_list,
582 res_cpg_confchg_callback->joined_list_entries);
583
584 /*
585 * If member left while his partial packet was being assembled, assembly data must be removed from list
586 */
587 for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
588 qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->assembly_list_head)) {
589 struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
590 if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
591 continue;
592
593 qb_list_del (¤t_assembly_data->list);
594 free(current_assembly_data->assembly_buf);
595 free(current_assembly_data);
596 }
597 }
598
599 break;
600 case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
601 if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
602 break;
603 }
604
605 res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
606
607 marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
608 for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
609 totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
610 }
611
612 cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
613 ring_id,
614 res_cpg_totem_confchg_callback->member_list_entries,
615 totem_member_list);
616 break;
617 default:
618 error = CS_ERR_LIBRARY;
619 goto error_put;
620 break;
621 } /* - switch (dispatch_data->id) */
622 break; /* case CPG_MODEL_V1 */
623 } /* - switch (cpg_inst_copy.model_data.model) */
624
625 if (cpg_inst_copy.finalize || cpg_inst->finalize) {
626 /*
627 * If the finalize has been called then get out of the dispatch.
628 */
629 cpg_inst->finalize = 1;
630 error = CS_ERR_BAD_HANDLE;
631 goto error_put;
632 }
633
634 /*
635 * Determine if more messages should be processed
636 */
637 if (dispatch_types == CS_DISPATCH_ONE || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
638 cont = 0;
639 }
640 } while (cont);
641
642 error_put:
643 hdb_handle_put (&cpg_handle_t_db, handle);
644 return (error);
645 }
646
647 cs_error_t cpg_join (
648 cpg_handle_t handle,
649 const struct cpg_name *group)
650 {
651 cs_error_t error;
652 struct cpg_inst *cpg_inst;
653 struct iovec iov[2];
654 struct req_lib_cpg_join req_lib_cpg_join;
655 struct res_lib_cpg_join response;
656
657 if (group->length > CPG_MAX_NAME_LENGTH) {
658 return (CS_ERR_NAME_TOO_LONG);
659 }
660
661 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
662 if (error != CS_OK) {
663 return (error);
664 }
665
666 /* Now join */
667 req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
668 req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
669 req_lib_cpg_join.pid = getpid();
670 req_lib_cpg_join.flags = 0;
671
672 switch (cpg_inst->model_data.model) {
673 case CPG_MODEL_V1:
674 req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
675 break;
676 }
677
678 marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
679 group);
680
681 iov[0].iov_base = (void *)&req_lib_cpg_join;
682 iov[0].iov_len = sizeof (struct req_lib_cpg_join);
683
684 do {
685 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
686 &response, sizeof (struct res_lib_cpg_join));
687
688 if (error != CS_OK) {
689 goto error_exit;
690 }
691 } while (response.header.error == CS_ERR_BUSY);
692
693 error = response.header.error;
694
695 error_exit:
696 hdb_handle_put (&cpg_handle_t_db, handle);
697
698 return (error);
699 }
700
701 cs_error_t cpg_leave (
702 cpg_handle_t handle,
703 const struct cpg_name *group)
704 {
705 cs_error_t error;
706 struct cpg_inst *cpg_inst;
707 struct iovec iov[2];
708 struct req_lib_cpg_leave req_lib_cpg_leave;
709 struct res_lib_cpg_leave res_lib_cpg_leave;
710
711 if (group->length > CPG_MAX_NAME_LENGTH) {
712 return (CS_ERR_NAME_TOO_LONG);
713 }
714
715 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
716 if (error != CS_OK) {
717 return (error);
718 }
719
720 req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave);
721 req_lib_cpg_leave.header.id = MESSAGE_REQ_CPG_LEAVE;
722 req_lib_cpg_leave.pid = getpid();
723 marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
724 group);
725
726 iov[0].iov_base = (void *)&req_lib_cpg_leave;
727 iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
728
729 do {
730 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
731 &res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
732
733 if (error != CS_OK) {
734 goto error_exit;
735 }
736 } while (res_lib_cpg_leave.header.error == CS_ERR_BUSY);
737
738 error = res_lib_cpg_leave.header.error;
739
740 error_exit:
741 hdb_handle_put (&cpg_handle_t_db, handle);
742
743 return (error);
744 }
745
746 cs_error_t cpg_membership_get (
747 cpg_handle_t handle,
748 struct cpg_name *group_name,
749 struct cpg_address *member_list,
750 int *member_list_entries)
751 {
752 cs_error_t error;
753 struct cpg_inst *cpg_inst;
754 struct iovec iov;
755 struct req_lib_cpg_membership_get req_lib_cpg_membership_get;
756 struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
757 unsigned int i;
758
759 if (group_name->length > CPG_MAX_NAME_LENGTH) {
760 return (CS_ERR_NAME_TOO_LONG);
761 }
762 if (member_list == NULL) {
763 return (CS_ERR_INVALID_PARAM);
764 }
765 if (member_list_entries == NULL) {
766 return (CS_ERR_INVALID_PARAM);
767 }
768
769 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
770 if (error != CS_OK) {
771 return (error);
772 }
773
774 req_lib_cpg_membership_get.header.size = sizeof (struct req_lib_cpg_membership_get);
775 req_lib_cpg_membership_get.header.id = MESSAGE_REQ_CPG_MEMBERSHIP;
776
777 marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
778 group_name);
779
780 iov.iov_base = (void *)&req_lib_cpg_membership_get;
781 iov.iov_len = sizeof (struct req_lib_cpg_membership_get);
782
783 error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
784 &res_lib_cpg_membership_get, sizeof (res_lib_cpg_membership_get));
785
786 if (error != CS_OK) {
787 goto error_exit;
788 }
789
790 error = res_lib_cpg_membership_get.header.error;
791
792 /*
793 * Copy results to caller
794 */
795 *member_list_entries = res_lib_cpg_membership_get.member_count;
796 if (member_list) {
797 for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
798 marshall_from_mar_cpg_address_t (&member_list[i],
799 &res_lib_cpg_membership_get.member_list[i]);
800 }
801 }
802
803 error_exit:
804 hdb_handle_put (&cpg_handle_t_db, handle);
805
806 return (error);
807 }
808
809 cs_error_t cpg_local_get (
810 cpg_handle_t handle,
811 unsigned int *local_nodeid)
812 {
813 cs_error_t error;
814 struct cpg_inst *cpg_inst;
815 struct iovec iov;
816 struct req_lib_cpg_local_get req_lib_cpg_local_get;
817 struct res_lib_cpg_local_get res_lib_cpg_local_get;
818
819 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
820 if (error != CS_OK) {
821 return (error);
822 }
823
824 req_lib_cpg_local_get.header.size = sizeof (struct qb_ipc_request_header);
825 req_lib_cpg_local_get.header.id = MESSAGE_REQ_CPG_LOCAL_GET;
826
827 iov.iov_base = (void *)&req_lib_cpg_local_get;
828 iov.iov_len = sizeof (struct req_lib_cpg_local_get);
829
830 error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
831 &res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
832
833 if (error != CS_OK) {
834 goto error_exit;
835 }
836
837 error = res_lib_cpg_local_get.header.error;
838
839 *local_nodeid = res_lib_cpg_local_get.local_nodeid;
840
841 error_exit:
842 hdb_handle_put (&cpg_handle_t_db, handle);
843
844 return (error);
845 }
846
847 cs_error_t cpg_flow_control_state_get (
848 cpg_handle_t handle,
849 cpg_flow_control_state_t *flow_control_state)
850 {
851 cs_error_t error;
852 struct cpg_inst *cpg_inst;
853
854 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
855 if (error != CS_OK) {
856 return (error);
857 }
858 *flow_control_state = CPG_FLOW_CONTROL_DISABLED;
859 error = CS_OK;
860
861 hdb_handle_put (&cpg_handle_t_db, handle);
862
863 return (error);
864 }
865
866 static int
867 memory_map (char *path, const char *file, void **buf, size_t bytes)
868 {
869 int32_t fd;
870 void *addr;
871 int32_t res;
872 char *buffer;
873 int32_t i;
874 size_t written;
875 size_t page_size;
876 long int sysconf_page_size;
877 mode_t old_umask;
878
879 snprintf (path, PATH_MAX, "/dev/shm/%s", file);
880
881 old_umask = umask(CPG_MEMORY_MAP_UMASK);
882 fd = mkstemp (path);
883 (void)umask(old_umask);
|
(1) Event cond_true: |
Condition "fd == -1", taking true branch. |
884 if (fd == -1) {
885 snprintf (path, PATH_MAX, LOCALSTATEDIR "/run/%s", file);
886 old_umask = umask(CPG_MEMORY_MAP_UMASK);
887 fd = mkstemp (path);
888 (void)umask(old_umask);
|
(2) Event cond_false: |
Condition "fd == -1", taking false branch. |
889 if (fd == -1) {
890 return (-1);
|
(3) Event if_end: |
End of if statement. |
891 }
892 }
893
894 res = ftruncate (fd, bytes);
|
(4) Event cond_false: |
Condition "res == -1", taking false branch. |
895 if (res == -1) {
896 goto error_close_unlink;
|
(5) Event if_end: |
End of if statement. |
897 }
898 sysconf_page_size = sysconf(_SC_PAGESIZE);
|
(6) Event cond_false: |
Condition "sysconf_page_size <= 0", taking false branch. |
899 if (sysconf_page_size <= 0) {
900 goto error_close_unlink;
|
(7) Event if_end: |
End of if statement. |
901 }
902 page_size = sysconf_page_size;
903 buffer = malloc (page_size);
|
(8) Event cond_false: |
Condition "buffer == NULL", taking false branch. |
904 if (buffer == NULL) {
905 goto error_close_unlink;
|
(9) Event if_end: |
End of if statement. |
906 }
907 memset (buffer, 0, page_size);
|
(10) Event cond_true: |
Condition "i < bytes / page_size", taking true branch. |
|
(21) Event loop_begin: |
Jumped back to beginning of loop. |
|
(22) Event cond_true: |
Condition "i < bytes / page_size", taking true branch. |
|
(29) Event loop_begin: |
Jumped back to beginning of loop. |
|
(30) Event cond_false: |
Condition "i < bytes / page_size", taking false branch. |
908 for (i = 0; i < (bytes / page_size); i++) {
|
(14) Event label: |
Reached label "retry_write". |
909 retry_write:
910 written = write (fd, buffer, page_size);
|
(11) Event cond_true: |
Condition "written == -1", taking true branch. |
|
(12) Event cond_true: |
Condition "*__errno_location() == 4", taking true branch. |
|
(15) Event cond_true: |
Condition "written == -1", taking true branch. |
|
(16) Event cond_false: |
Condition "*__errno_location() == 4", taking false branch. |
|
(23) Event cond_true: |
Condition "written == -1", taking true branch. |
|
(24) Event cond_false: |
Condition "*__errno_location() == 4", taking false branch. |
911 if (written == -1 && errno == EINTR) {
|
(13) Event goto: |
Jumping to label "retry_write". |
912 goto retry_write;
|
(17) Event if_end: |
End of if statement. |
|
(25) Event if_end: |
End of if statement. |
913 }
|
(18) Event cond_false: |
Condition "written != page_size", taking false branch. |
|
(26) Event cond_false: |
Condition "written != page_size", taking false branch. |
914 if (written != page_size) {
915 free (buffer);
916 goto error_close_unlink;
|
(19) Event if_end: |
End of if statement. |
|
(27) Event if_end: |
End of if statement. |
917 }
|
(20) Event loop: |
Jumping back to the beginning of the loop. |
|
(28) Event loop: |
Jumping back to the beginning of the loop. |
|
(31) Event loop_end: |
Reached end of loop. |
918 }
919 free (buffer);
920
|
(32) Event alloc_fn: |
Storage is returned from allocation function "mmap". |
|
(33) Event assign: |
Assigning: "addr" = "mmap(NULL, bytes, 3, 1, fd, 0L)". |
| Also see events: |
[assign] |
921 addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
922 MAP_SHARED, fd, 0);
923
|
(34) Event cond_false: |
Condition "addr == (void *)0xffffffffffffffff", taking false branch. |
924 if (addr == MAP_FAILED) {
925 goto error_close_unlink;
|
(35) Event if_end: |
End of if statement. |
926 }
927 #ifdef MADV_NOSYNC
928 madvise(addr, bytes, MADV_NOSYNC);
929 #endif
930
931 res = close (fd);
|
(36) Event cond_false: |
Condition "res", taking false branch. |
932 if (res) {
933 munmap(addr, bytes);
934
935 return (-1);
|
(37) Event if_end: |
End of if statement. |
936 }
|
(38) Event assign: |
Assigning: "*buf" = "addr". |
| Also see events: |
[alloc_fn][assign] |
937 *buf = addr;
938
939 return 0;
940
941 error_close_unlink:
942 close (fd);
943 unlink(path);
944 return -1;
945 }
946
947 cs_error_t cpg_zcb_alloc (
948 cpg_handle_t handle,
949 size_t size,
950 void **buffer)
951 {
952 void *buf = NULL;
953 char path[PATH_MAX];
954 mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc;
955 struct qb_ipc_response_header res_coroipcs_zc_alloc;
956 size_t map_size;
957 struct iovec iovec;
958 struct coroipcs_zc_header *hdr;
959 cs_error_t error;
960 struct cpg_inst *cpg_inst;
961
962 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
|
(1) Event cond_false: |
Condition "error != CS_OK", taking false branch. |
963 if (error != CS_OK) {
964 return (error);
|
(2) Event if_end: |
End of if statement. |
965 }
966
967 map_size = size + sizeof (struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
|
(3) Event alloc_arg: |
"memory_map" allocates memory that is stored into "buf". [details] |
|
(4) Event cond_true: |
Condition "memory_map(path, "corosync_zerocopy-XXXXXX", &buf, map_size) != -1", taking true branch. |
|
(5) Event if_fallthrough: |
Falling through to end of if statement. |
|
(6) Event if_end: |
End of if statement. |
| Also see events: |
[leaked_storage] |
968 assert(memory_map (path, "corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
969
|
(7) Event cond_false: |
Condition "strlen(path) >= 128", taking false branch. |
970 if (strlen(path) >= CPG_ZC_PATH_LEN) {
971 unlink(path);
972 munmap (buf, map_size);
973 return (CS_ERR_NAME_TOO_LONG);
|
(8) Event if_end: |
End of if statement. |
974 }
975
976 req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
977 req_coroipcc_zc_alloc.header.id = MESSAGE_REQ_CPG_ZC_ALLOC;
978 req_coroipcc_zc_alloc.map_size = map_size;
979 strcpy (req_coroipcc_zc_alloc.path_to_file, path);
980
981 iovec.iov_base = (void *)&req_coroipcc_zc_alloc;
982 iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
983
984 error = coroipcc_msg_send_reply_receive (
985 cpg_inst->c,
986 &iovec,
987 1,
988 &res_coroipcs_zc_alloc,
989 sizeof (struct qb_ipc_response_header));
990
|
(9) Event cond_true: |
Condition "error != CS_OK", taking true branch. |
991 if (error != CS_OK) {
|
(10) Event goto: |
Jumping to label "error_exit". |
992 goto error_exit;
993 }
994
995 hdr = (struct coroipcs_zc_header *)buf;
996 hdr->map_size = map_size;
997 *buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header) + sizeof (struct req_lib_cpg_mcast);
998
|
(11) Event label: |
Reached label "error_exit". |
999 error_exit:
1000 hdb_handle_put (&cpg_handle_t_db, handle);
1001 /*
1002 * Coverity correctly reports an error here. We cannot safely munmap and unlink the file, because
1003 * the timing of the failure is the key issue: if a failure occurs before the IPC reply,
1004 * the file should be deleted.
1005 * However, if the failure happens during the IPC reply, Corosync has already deleted the file.
1006 * This means the cpg library could attempt to delete a non-existing file (not a problem) or,
1007 * in a theoretical race condition, delete a new file created by another application.
1008 * There are multiple possible solutions, but none of them are ready to be implemented yet.
1009 */
|
(12) Event leaked_storage: |
Variable "buf" going out of scope leaks the storage it points to. |
| Also see events: |
[alloc_arg] |
1010 return (error);
1011 }
1012
1013 cs_error_t cpg_zcb_free (
1014 cpg_handle_t handle,
1015 void *buffer)
1016 {
1017 cs_error_t error;
1018 unsigned int res;
1019 struct cpg_inst *cpg_inst;
1020 mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
1021 struct qb_ipc_response_header res_coroipcs_zc_free;
1022 struct iovec iovec;
1023 struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header) - sizeof (struct req_lib_cpg_mcast));
1024
1025 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1026 if (error != CS_OK) {
1027 return (error);
1028 }
1029
1030 req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
1031 req_coroipcc_zc_free.header.id = MESSAGE_REQ_CPG_ZC_FREE;
1032 req_coroipcc_zc_free.map_size = header->map_size;
1033 req_coroipcc_zc_free.server_address = header->server_address;
1034
1035 iovec.iov_base = (void *)&req_coroipcc_zc_free;
1036 iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
1037
1038 error = coroipcc_msg_send_reply_receive (
1039 cpg_inst->c,
1040 &iovec,
1041 1,
1042 &res_coroipcs_zc_free,
1043 sizeof (struct qb_ipc_response_header));
1044
1045 if (error != CS_OK) {
1046 goto error_exit;
1047 }
1048
1049 res = munmap ((void *)header, header->map_size);
1050 if (res == -1) {
1051 error = qb_to_cs_error(-errno);
1052
1053 goto error_exit;
1054 }
1055
1056 error_exit:
1057 hdb_handle_put (&cpg_handle_t_db, handle);
1058
1059 return (error);
1060 }
1061
1062 cs_error_t cpg_zcb_mcast_joined (
1063 cpg_handle_t handle,
1064 cpg_guarantee_t guarantee,
1065 void *msg,
1066 size_t msg_len)
1067 {
1068 cs_error_t error;
1069 struct cpg_inst *cpg_inst;
1070 struct req_lib_cpg_mcast *req_lib_cpg_mcast;
1071 struct res_lib_cpg_mcast res_lib_cpg_mcast;
1072 mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
1073 struct coroipcs_zc_header *hdr;
1074 struct iovec iovec;
1075
1076 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1077 if (error != CS_OK) {
1078 return (error);
1079 }
1080
1081 if (msg_len > IPC_REQUEST_SIZE) {
1082 error = CS_ERR_TOO_BIG;
1083 goto error_exit;
1084 }
1085
1086 req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
1087 req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
1088 msg_len;
1089
1090 req_lib_cpg_mcast->header.id = MESSAGE_REQ_CPG_MCAST;
1091 req_lib_cpg_mcast->guarantee = guarantee;
1092 req_lib_cpg_mcast->msglen = msg_len;
1093
1094 hdr = (struct coroipcs_zc_header *)(((char *)req_lib_cpg_mcast) - sizeof (struct coroipcs_zc_header));
1095
1096 req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
1097 req_coroipcc_zc_execute.header.id = MESSAGE_REQ_CPG_ZC_EXECUTE;
1098 req_coroipcc_zc_execute.server_address = hdr->server_address;
1099
1100 iovec.iov_base = (void *)&req_coroipcc_zc_execute;
1101 iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
1102
1103 error = coroipcc_msg_send_reply_receive (
1104 cpg_inst->c,
1105 &iovec,
1106 1,
1107 &res_lib_cpg_mcast,
1108 sizeof(res_lib_cpg_mcast));
1109
1110 if (error != CS_OK) {
1111 goto error_exit;
1112 }
1113
1114 error = res_lib_cpg_mcast.header.error;
1115
1116 error_exit:
1117 hdb_handle_put (&cpg_handle_t_db, handle);
1118
1119 return (error);
1120 }
1121
1122 static cs_error_t send_fragments (
1123 struct cpg_inst *cpg_inst,
1124 cpg_guarantee_t guarantee,
1125 size_t msg_len,
1126 const struct iovec *iovec,
1127 unsigned int iov_len)
1128 {
1129 int i;
1130 cs_error_t error = CS_OK;
1131 struct iovec iov[2];
1132 struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
1133 struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
1134 size_t sent = 0;
1135 size_t iov_sent = 0;
1136 int retry_count;
1137
1138 req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
1139 req_lib_cpg_mcast.guarantee = guarantee;
1140 req_lib_cpg_mcast.msglen = msg_len;
1141
1142 iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1143 iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
1144
1145 i=0;
1146 iov_sent = 0 ;
1147 qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1148
1149 while (error == CS_OK && sent < msg_len) {
1150
1151 retry_count = 0;
1152 if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
1153 iov[1].iov_len = cpg_inst->max_msg_size;
1154 }
1155 else {
1156 iov[1].iov_len = iovec[i].iov_len - iov_sent;
1157 }
1158
1159 if (sent == 0) {
1160 req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
1161 }
1162 else if ((sent + iov[1].iov_len) == msg_len) {
1163 req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
1164 }
1165 else {
1166 req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
1167 }
1168
1169 req_lib_cpg_mcast.fraglen = iov[1].iov_len;
1170 req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
1171 iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
1172
1173 resend:
1174 error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
1175 &res_lib_cpg_partial_send,
1176 sizeof (res_lib_cpg_partial_send));
1177
1178 if (error == CS_ERR_TRY_AGAIN) {
1179 fprintf(stderr, "sleep. counter=%d\n", retry_count);
1180 if (++retry_count > MAX_RETRIES) {
1181 goto error_exit;
1182 }
1183 usleep(10000);
1184 goto resend;
1185 }
1186
1187 iov_sent += iov[1].iov_len;
1188 sent += iov[1].iov_len;
1189
1190 /* Next iovec */
1191 if (iov_sent >= iovec[i].iov_len) {
1192 i++;
1193 iov_sent = 0;
1194 }
1195 error = res_lib_cpg_partial_send.header.error;
1196 }
1197 error_exit:
1198 qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1199
1200 return error;
1201 }
1202
1203
1204 cs_error_t cpg_mcast_joined (
1205 cpg_handle_t handle,
1206 cpg_guarantee_t guarantee,
1207 const struct iovec *iovec,
1208 unsigned int iov_len)
1209 {
1210 int i;
1211 cs_error_t error;
1212 struct cpg_inst *cpg_inst;
1213 struct iovec iov[64];
1214 struct req_lib_cpg_mcast req_lib_cpg_mcast;
1215 size_t msg_len = 0;
1216
1217 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1218 if (error != CS_OK) {
1219 return (error);
1220 }
1221
1222 for (i = 0; i < iov_len; i++ ) {
1223 msg_len += iovec[i].iov_len;
1224 }
1225
1226 if (msg_len > cpg_inst->max_msg_size) {
1227 error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
1228 goto error_exit;
1229 }
1230
1231 req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
1232 msg_len;
1233
1234 req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST;
1235 req_lib_cpg_mcast.guarantee = guarantee;
1236 req_lib_cpg_mcast.msglen = msg_len;
1237
1238 iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1239 iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
1240 memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
1241
1242 qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
1243 error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
1244 qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
1245
1246 error_exit:
1247 hdb_handle_put (&cpg_handle_t_db, handle);
1248
1249 return (error);
1250 }
1251
1252 cs_error_t cpg_iteration_initialize(
1253 cpg_handle_t handle,
1254 cpg_iteration_type_t iteration_type,
1255 const struct cpg_name *group,
1256 cpg_iteration_handle_t *cpg_iteration_handle)
1257 {
1258 cs_error_t error;
1259 struct iovec iov;
1260 struct cpg_inst *cpg_inst;
1261 struct cpg_iteration_instance_t *cpg_iteration_instance;
1262 struct req_lib_cpg_iterationinitialize req_lib_cpg_iterationinitialize;
1263 struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
1264
1265 if (group && group->length > CPG_MAX_NAME_LENGTH) {
1266 return (CS_ERR_NAME_TOO_LONG);
1267 }
1268 if (cpg_iteration_handle == NULL) {
1269 return (CS_ERR_INVALID_PARAM);
1270 }
1271
1272 if ((iteration_type == CPG_ITERATION_ONE_GROUP && group == NULL) ||
1273 (iteration_type != CPG_ITERATION_ONE_GROUP && group != NULL)) {
1274 return (CS_ERR_INVALID_PARAM);
1275 }
1276
1277 if (iteration_type != CPG_ITERATION_NAME_ONLY && iteration_type != CPG_ITERATION_ONE_GROUP &&
1278 iteration_type != CPG_ITERATION_ALL) {
1279
1280 return (CS_ERR_INVALID_PARAM);
1281 }
1282
1283 error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1284 if (error != CS_OK) {
1285 return (error);
1286 }
1287
1288 error = hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1289 sizeof (struct cpg_iteration_instance_t), cpg_iteration_handle));
1290 if (error != CS_OK) {
1291 goto error_put_cpg_db;
1292 }
1293
1294 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1295 (void *)&cpg_iteration_instance));
1296 if (error != CS_OK) {
1297 goto error_destroy;
1298 }
1299
1300 cpg_iteration_instance->conn = cpg_inst->c;
1301
1302 qb_list_init (&cpg_iteration_instance->list);
1303
1304 req_lib_cpg_iterationinitialize.header.size = sizeof (struct req_lib_cpg_iterationinitialize);
1305 req_lib_cpg_iterationinitialize.header.id = MESSAGE_REQ_CPG_ITERATIONINITIALIZE;
1306 req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
1307 if (group) {
1308 marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
1309 }
1310
1311 iov.iov_base = (void *)&req_lib_cpg_iterationinitialize;
1312 iov.iov_len = sizeof (struct req_lib_cpg_iterationinitialize);
1313
1314 error = coroipcc_msg_send_reply_receive (cpg_inst->c,
1315 &iov,
1316 1,
1317 &res_lib_cpg_iterationinitialize,
1318 sizeof (struct res_lib_cpg_iterationinitialize));
1319
1320 if (error != CS_OK) {
1321 goto error_put_destroy;
1322 }
1323
1324 cpg_iteration_instance->executive_iteration_handle =
1325 res_lib_cpg_iterationinitialize.iteration_handle;
1326 cpg_iteration_instance->cpg_iteration_handle = *cpg_iteration_handle;
1327
1328 qb_list_add (&cpg_iteration_instance->list, &cpg_inst->iteration_list_head);
1329
1330 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1331 hdb_handle_put (&cpg_handle_t_db, handle);
1332
1333 return (res_lib_cpg_iterationinitialize.header.error);
1334
1335 error_put_destroy:
1336 hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1337 error_destroy:
1338 hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1339 error_put_cpg_db:
1340 hdb_handle_put (&cpg_handle_t_db, handle);
1341
1342 return (error);
1343 }
1344
1345 cs_error_t cpg_iteration_next(
1346 cpg_iteration_handle_t handle,
1347 struct cpg_iteration_description_t *description)
1348 {
1349 cs_error_t error;
1350 struct cpg_iteration_instance_t *cpg_iteration_instance;
1351 struct req_lib_cpg_iterationnext req_lib_cpg_iterationnext;
1352 struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
1353
1354 if (description == NULL) {
1355 return CS_ERR_INVALID_PARAM;
1356 }
1357
1358 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1359 (void *)&cpg_iteration_instance));
1360 if (error != CS_OK) {
1361 goto error_exit;
1362 }
1363
1364 req_lib_cpg_iterationnext.header.size = sizeof (struct req_lib_cpg_iterationnext);
1365 req_lib_cpg_iterationnext.header.id = MESSAGE_REQ_CPG_ITERATIONNEXT;
1366 req_lib_cpg_iterationnext.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1367
1368 error = qb_to_cs_error (qb_ipcc_send (cpg_iteration_instance->conn,
1369 &req_lib_cpg_iterationnext,
1370 req_lib_cpg_iterationnext.header.size));
1371 if (error != CS_OK) {
1372 goto error_put;
1373 }
1374
1375 error = qb_to_cs_error (qb_ipcc_recv (cpg_iteration_instance->conn,
1376 &res_lib_cpg_iterationnext,
1377 sizeof(struct res_lib_cpg_iterationnext), -1));
1378 if (error != CS_OK) {
1379 goto error_put;
1380 }
1381
1382 marshall_from_mar_cpg_iteration_description_t(
1383 description,
1384 &res_lib_cpg_iterationnext.description);
1385
1386 error = res_lib_cpg_iterationnext.header.error;
1387
1388 error_put:
1389 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1390
1391 error_exit:
1392 return (error);
1393 }
1394
1395 cs_error_t cpg_iteration_finalize (
1396 cpg_iteration_handle_t handle)
1397 {
1398 cs_error_t error;
1399 struct iovec iov;
1400 struct cpg_iteration_instance_t *cpg_iteration_instance;
1401 struct req_lib_cpg_iterationfinalize req_lib_cpg_iterationfinalize;
1402 struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
1403
1404 error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1405 (void *)&cpg_iteration_instance));
1406 if (error != CS_OK) {
1407 goto error_exit;
1408 }
1409
1410 req_lib_cpg_iterationfinalize.header.size = sizeof (struct req_lib_cpg_iterationfinalize);
1411 req_lib_cpg_iterationfinalize.header.id = MESSAGE_REQ_CPG_ITERATIONFINALIZE;
1412 req_lib_cpg_iterationfinalize.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1413
1414 iov.iov_base = (void *)&req_lib_cpg_iterationfinalize;
1415 iov.iov_len = sizeof (struct req_lib_cpg_iterationfinalize);
1416
1417 error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->conn,
1418 &iov,
1419 1,
1420 &res_lib_cpg_iterationfinalize,
1421 sizeof (struct req_lib_cpg_iterationfinalize));
1422
1423 if (error != CS_OK) {
1424 goto error_put;
1425 }
1426
1427 cpg_iteration_instance_finalize (cpg_iteration_instance);
1428 hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
1429
1430 return (res_lib_cpg_iterationfinalize.header.error);
1431
1432 error_put:
1433 hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1434 error_exit:
1435 return (error);
1436 }
1437
1438 /** @} */
1439