1    	/*
2    	 * vi: set autoindent tabstop=4 shiftwidth=4 :
3    	 *
4    	 * Copyright (c) 2006-2015 Red Hat, Inc.
5    	 *
6    	 * All rights reserved.
7    	 *
8    	 * Author: Christine Caulfield (ccaulfi@redhat.com)
9    	 * Author: Jan Friesse (jfriesse@redhat.com)
10   	 *
11   	 * This software licensed under BSD license, the text of which follows:
12   	 *
13   	 * Redistribution and use in source and binary forms, with or without
14   	 * modification, are permitted provided that the following conditions are met:
15   	 *
16   	 * - Redistributions of source code must retain the above copyright notice,
17   	 *   this list of conditions and the following disclaimer.
18   	 * - Redistributions in binary form must reproduce the above copyright notice,
19   	 *   this list of conditions and the following disclaimer in the documentation
20   	 *   and/or other materials provided with the distribution.
21   	 * - Neither the name of the MontaVista Software, Inc. nor the names of its
22   	 *   contributors may be used to endorse or promote products derived from this
23   	 *   software without specific prior written permission.
24   	 *
25   	 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26   	 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27   	 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28   	 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29   	 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30   	 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31   	 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32   	 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33   	 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34   	 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
35   	 * THE POSSIBILITY OF SUCH DAMAGE.
36   	 */
37   	/*
38   	 * Provides a closed process group API using the coroipcc executive
39   	 */
40   	
41   	#include <config.h>
42   	
43   	#include <stdlib.h>
44   	#include <stdio.h>
45   	#include <string.h>
46   	#include <unistd.h>
47   	#include <sys/types.h>
48   	#include <sys/socket.h>
49   	#include <sys/mman.h>
50   	#include <sys/uio.h>
51   	#include <sys/stat.h>
52   	#include <errno.h>
53   	#include <limits.h>
54   	
55   	#include <qb/qblist.h>
56   	#include <qb/qbdefs.h>
57   	#include <qb/qbipcc.h>
58   	#include <qb/qblog.h>
59   	
60   	#include <corosync/hdb.h>
61   	#include <corosync/corotypes.h>
62   	#include <corosync/corodefs.h>
63   	#include <corosync/cpg.h>
64   	#include <corosync/ipc_cpg.h>
65   	
66   	#include "util.h"
67   	
/*
 * Fall back to MAP_ANON when the platform headers do not define
 * MAP_ANONYMOUS.
 */
68   	#ifndef MAP_ANONYMOUS
69   	#define MAP_ANONYMOUS MAP_ANON
70   	#endif
71   	
72   	/*
73   	 * Maximum number of times to retry a send when transmitting
74   	 * a large message fragment
75   	 */
76   	#define MAX_RETRIES 100
77   	
78   	/*
79   	 * umask applied while creating ZCB backing files (same value libqb uses)
80   	 */
81   	#define CPG_MEMORY_MAP_UMASK		077
82   	
/*
 * Reassembly state for one fragmented (partial) message. cpg_dispatch keeps
 * one entry per sending (nodeid, pid) pair on cpg_inst.assembly_list_head
 * while fragments are still arriving.
 */
83   	struct cpg_assembly_data
84   	{
85   		struct qb_list_head list;	/* link in cpg_inst.assembly_list_head */
86   		uint32_t nodeid;	/* node id of the sender, used as lookup key */
87   		uint32_t pid;	/* pid of the sender, used as lookup key */
88   		char *assembly_buf;	/* heap buffer sized from msglen of the first fragment */
89   		uint32_t assembly_buf_ptr;	/* offset in assembly_buf where the next fragment is copied */
90   	};
91   	
/*
 * Per-handle library state, stored in cpg_handle_t_db and looked up with
 * hdb_handle_get() by every API call in this file.
 */
92   	struct cpg_inst {
93   		qb_ipcc_connection_t *c;	/* connection to the "cpg" IPC service */
94   		int finalize;	/* set to 1 once cpg_finalize has started on this handle */
95   		void *context;	/* opaque user pointer, see cpg_context_get/cpg_context_set */
     		/*
     		 * model_data.model records the model in use; for CPG_MODEL_V1 the
     		 * callbacks and flags are read through model_v1_data.
     		 */
96   		union {
97   			cpg_model_data_t model_data;
98   			cpg_model_v1_data_t model_v1_data;
99   		};
100  		struct qb_list_head iteration_list_head;	/* list of cpg_iteration_instance_t */
101  		uint32_t max_msg_size;	/* value reported by cpg_max_atomic_msgsize_get */
102  		struct qb_list_head assembly_list_head;	/* list of cpg_assembly_data in progress */
103  	};
     	/* Destructor registered with the handle database below; disconnects the IPC connection. */
104  	static void cpg_inst_free (void *inst);
105  	
     	/* Handle database mapping cpg_handle_t values to struct cpg_inst. */
106  	DECLARE_HDB_DATABASE(cpg_handle_t_db, cpg_inst_free);
107  	
/*
 * State of one iteration, linked into the owning cpg_inst's
 * iteration_list_head and stored in cpg_iteration_handle_t_db.
 */
108  	struct cpg_iteration_instance_t {
109  		cpg_iteration_handle_t cpg_iteration_handle;	/* this instance's own handle in cpg_iteration_handle_t_db */
110  		qb_ipcc_connection_t *conn;	/* presumably the owning handle's IPC connection -- TODO confirm against the iteration API */
111  		hdb_handle_t executive_iteration_handle;	/* presumably the matching handle on the executive side -- TODO confirm */
112  		struct qb_list_head list;	/* link in cpg_inst.iteration_list_head */
113  	};
114  	
     	/* Handle database for iteration instances; no destructor is registered (NULL). */
115  	DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
116  	
117  	
118  	/*
119  	 * Internal (not visible by API) functions
120  	 */
121  	
122  	static cs_error_t
123  	coroipcc_msg_send_reply_receive (
124  		qb_ipcc_connection_t *c,
125  		const struct iovec *iov,
126  		unsigned int iov_len,
127  		void *res_msg,
128  		size_t res_len)
129  	{
130  		return qb_to_cs_error(qb_ipcc_sendv_recv(c, iov, iov_len, res_msg, res_len,
131  					CS_IPC_TIMEOUT_MS));
132  	}
133  	
134  	static void cpg_iteration_instance_finalize (struct cpg_iteration_instance_t *cpg_iteration_instance)
135  	{
136  		qb_list_del (&cpg_iteration_instance->list);
137  		hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
138  	}
139  	
140  	static void cpg_inst_free (void *inst)
141  	{
142  		struct cpg_inst *cpg_inst = (struct cpg_inst *)inst;
143  		qb_ipcc_disconnect(cpg_inst->c);
144  	}
145  	
146  	static void cpg_inst_finalize (struct cpg_inst *cpg_inst, hdb_handle_t handle)
147  	{
148  		struct qb_list_head *iter, *tmp_iter;
149  		struct cpg_iteration_instance_t *cpg_iteration_instance;
150  	
151  		/*
152  		 * Traverse thru iteration instances and delete them
153  		 */
154  		qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->iteration_list_head)) {
155  			cpg_iteration_instance = qb_list_entry (iter, struct cpg_iteration_instance_t, list);
156  	
157  			cpg_iteration_instance_finalize (cpg_iteration_instance);
158  		}
159  		hdb_handle_destroy (&cpg_handle_t_db, handle);
160  	}
161  	
162  	/**
163  	 * @defgroup cpg_coroipcc The closed process group API
164  	 * @ingroup coroipcc
165  	 *
166  	 * @{
167  	 */
168  	
169  	cs_error_t cpg_initialize (
170  		cpg_handle_t *handle,
171  		cpg_callbacks_t *callbacks)
172  	{
173  		cpg_model_v1_data_t model_v1_data;
174  	
175  		memset (&model_v1_data, 0, sizeof (cpg_model_v1_data_t));
176  	
177  		if (callbacks) {
178  			model_v1_data.cpg_deliver_fn = callbacks->cpg_deliver_fn;
179  			model_v1_data.cpg_confchg_fn = callbacks->cpg_confchg_fn;
180  		}
181  	
182  		return (cpg_model_initialize (handle, CPG_MODEL_V1, (cpg_model_data_t *)&model_v1_data, NULL));
183  	}
184  	
185  	cs_error_t cpg_model_initialize (
186  		cpg_handle_t *handle,
187  		cpg_model_t model,
188  		cpg_model_data_t *model_data,
189  		void *context)
190  	{
191  		cs_error_t error;
192  		struct cpg_inst *cpg_inst;
193  	
194  		if (model != CPG_MODEL_V1) {
195  			error = CS_ERR_INVALID_PARAM;
196  			goto error_no_destroy;
197  		}
198  	
199  		error = hdb_error_to_cs (hdb_handle_create (&cpg_handle_t_db, sizeof (struct cpg_inst), handle));
200  		if (error != CS_OK) {
201  			goto error_no_destroy;
202  		}
203  	
204  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, *handle, (void *)&cpg_inst));
205  		if (error != CS_OK) {
206  			goto error_destroy;
207  		}
208  	
209  		cpg_inst->c = qb_ipcc_connect ("cpg", IPC_REQUEST_SIZE);
210  		if (cpg_inst->c == NULL) {
211  			error = qb_to_cs_error(-errno);
212  			goto error_put_destroy;
213  		}
214  	
215  		if (model_data != NULL) {
216  			switch (model) {
217  			case CPG_MODEL_V1:
218  				memcpy (&cpg_inst->model_v1_data, model_data, sizeof (cpg_model_v1_data_t));
219  				if ((cpg_inst->model_v1_data.flags & ~(CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF)) != 0) {
220  					error = CS_ERR_INVALID_PARAM;
221  	
222  					goto error_destroy;
223  				}
224  				break;
225  			}
226  		}
227  	
228  		/* Allow space for corosync internal headers */
229  		cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
230  		cpg_inst->model_data.model = model;
231  		cpg_inst->context = context;
232  	
233  		qb_list_init(&cpg_inst->iteration_list_head);
234  	
235  		qb_list_init(&cpg_inst->assembly_list_head);
236  	
237  		hdb_handle_put (&cpg_handle_t_db, *handle);
238  	
239  		return (CS_OK);
240  	
241  	error_put_destroy:
242  		hdb_handle_put (&cpg_handle_t_db, *handle);
243  	error_destroy:
244  		hdb_handle_destroy (&cpg_handle_t_db, *handle);
245  	error_no_destroy:
246  		return (error);
247  	}
248  	
249  	cs_error_t cpg_finalize (
250  		cpg_handle_t handle)
251  	{
252  		struct cpg_inst *cpg_inst;
253  		struct iovec iov;
254  		struct req_lib_cpg_finalize req_lib_cpg_finalize;
255  		struct res_lib_cpg_finalize res_lib_cpg_finalize;
256  		cs_error_t error;
257  	
258  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
259  		if (error != CS_OK) {
260  			return (error);
261  		}
262  	
263  		/*
264  		 * Another thread has already started finalizing
265  		 */
266  		if (cpg_inst->finalize) {
267  			hdb_handle_put (&cpg_handle_t_db, handle);
268  			return (CS_ERR_BAD_HANDLE);
269  		}
270  	
271  		cpg_inst->finalize = 1;
272  	
273  		/*
274  		 * Send service request
275  		 */
276  		req_lib_cpg_finalize.header.size = sizeof (struct req_lib_cpg_finalize);
277  		req_lib_cpg_finalize.header.id = MESSAGE_REQ_CPG_FINALIZE;
278  	
279  		iov.iov_base = (void *)&req_lib_cpg_finalize;
280  		iov.iov_len = sizeof (struct req_lib_cpg_finalize);
281  	
282  		error = coroipcc_msg_send_reply_receive (cpg_inst->c,
283  			&iov,
284  			1,
285  			&res_lib_cpg_finalize,
286  			sizeof (struct res_lib_cpg_finalize));
287  	
288  		cpg_inst_finalize (cpg_inst, handle);
289  		hdb_handle_put (&cpg_handle_t_db, handle);
290  	
291  		return (error);
292  	}
293  	
294  	cs_error_t cpg_fd_get (
295  		cpg_handle_t handle,
296  		int *fd)
297  	{
298  		cs_error_t error;
299  		struct cpg_inst *cpg_inst;
300  	
301  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
302  		if (error != CS_OK) {
303  			return (error);
304  		}
305  	
306  		error = qb_to_cs_error (qb_ipcc_fd_get (cpg_inst->c, fd));
307  	
308  		hdb_handle_put (&cpg_handle_t_db, handle);
309  	
310  		return (error);
311  	}
312  	
313  	cs_error_t cpg_max_atomic_msgsize_get (
314  		cpg_handle_t handle,
315  		uint32_t *size)
316  	{
317  		cs_error_t error;
318  		struct cpg_inst *cpg_inst;
319  	
320  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
321  		if (error != CS_OK) {
322  			return (error);
323  		}
324  	
325  		*size = cpg_inst->max_msg_size;
326  	
327  		hdb_handle_put (&cpg_handle_t_db, handle);
328  	
329  		return (error);
330  	}
331  	
332  	cs_error_t cpg_context_get (
333  		cpg_handle_t handle,
334  		void **context)
335  	{
336  		cs_error_t error;
337  		struct cpg_inst *cpg_inst;
338  	
339  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
340  		if (error != CS_OK) {
341  			return (error);
342  		}
343  	
344  		*context = cpg_inst->context;
345  	
346  		hdb_handle_put (&cpg_handle_t_db, handle);
347  	
348  		return (CS_OK);
349  	}
350  	
351  	cs_error_t cpg_context_set (
352  		cpg_handle_t handle,
353  		void *context)
354  	{
355  		cs_error_t error;
356  		struct cpg_inst *cpg_inst;
357  	
358  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
359  		if (error != CS_OK) {
360  			return (error);
361  		}
362  	
363  		cpg_inst->context = context;
364  	
365  		hdb_handle_put (&cpg_handle_t_db, handle);
366  	
367  		return (CS_OK);
368  	}
369  	
/*
 * Receive events from the handle's IPC connection and invoke the registered
 * CPG_MODEL_V1 callbacks (deliver, confchg, totem confchg) for them.
 *
 * dispatch_types selects the polling behaviour:
 *  - CS_DISPATCH_ALL and CS_DISPATCH_ONE_NONBLOCKING poll with timeout 0,
 *    any other value polls with timeout -1;
 *  - CS_DISPATCH_ONE and CS_DISPATCH_ONE_NONBLOCKING stop after one event;
 *  - CS_DISPATCH_ALL stops once no event is pending.
 *
 * Partial (fragmented) messages are collected in cpg_assembly_data entries
 * on the instance's assembly_list_head and delivered through cpg_deliver_fn
 * when the LIBCPG_PARTIAL_LAST fragment arrives.
 *
 * Returns CS_OK, CS_ERR_TRY_AGAIN (CS_DISPATCH_ONE_NONBLOCKING with nothing
 * pending), CS_ERR_NO_MEMORY (assembly allocation failed), CS_ERR_LIBRARY
 * (unknown event id), CS_ERR_BAD_HANDLE (invalid handle, or finalize was
 * flagged after an event was processed), or another translated receive error.
 */
370  	cs_error_t cpg_dispatch (
371  		cpg_handle_t handle,
372  		cs_dispatch_flags_t dispatch_types)
373  	{
374  		int timeout = -1;
375  		cs_error_t error;
376  		int cont = 1; /* always continue do loop except when set to 0 */
377  		struct cpg_inst *cpg_inst;
378  		struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
379  		struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
380  		struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
381  		struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
382  		struct cpg_inst cpg_inst_copy;
383  		struct qb_ipc_response_header *dispatch_data;
384  		struct cpg_address member_list[CPG_MEMBERS_MAX];
385  		struct cpg_address left_list[CPG_MEMBERS_MAX];
386  		struct cpg_address joined_list[CPG_MEMBERS_MAX];
387  		struct cpg_name group_name;
388  		struct cpg_assembly_data *assembly_data;
389  		struct qb_list_head *iter, *tmp_iter;
390  		mar_cpg_address_t *left_list_start;
391  		mar_cpg_address_t *joined_list_start;
392  		unsigned int i;
393  		struct cpg_ring_id ring_id;
394  		uint32_t totem_member_list[CPG_MEMBERS_MAX];
395  		int32_t errno_res;
396  		char dispatch_buf[IPC_DISPATCH_SIZE];
397  	
398  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
399  		if (error != CS_OK) {
400  			return (error);
401  		}
402  	
403  		/*
404  		 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
405  		 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
406  		 */
407  		if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
408  			timeout = 0;
409  		}
410  	
     		/* Every event is received into dispatch_buf and inspected through its response header. */
411  		dispatch_data = (struct qb_ipc_response_header *)dispatch_buf;
412  		do {
413  			errno_res = qb_ipcc_event_recv (
414  				cpg_inst->c,
415  				dispatch_buf,
416  				IPC_DISPATCH_SIZE,
417  				timeout);
418  			error = qb_to_cs_error (errno_res);
     			/*
     			 * A receive result that maps to CS_ERR_BAD_HANDLE ends the
     			 * dispatch but is reported to the caller as CS_OK.
     			 */
419  			if (error == CS_ERR_BAD_HANDLE) {
420  				error = CS_OK;
421  				goto error_put;
422  			}
423  			if (error == CS_ERR_TRY_AGAIN) {
424  				if (dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
425  					/*
426  					 * Don't mask error
427  					 */
428  					goto error_put;
429  				}
430  				error = CS_OK;
431  				if (dispatch_types == CS_DISPATCH_ALL) {
432  					break; /* exit do while cont is 1 loop */
433  				} else {
434  					continue; /* next poll */
435  				}
436  			}
437  			if (error != CS_OK) {
438  				goto error_put;
439  			}
440  	
441  			/*
442  			 * Make copy of callbacks, message data, unlock instance, and call callback
443  			 * A risk of this dispatch method is that the callback routines may
444  			 * operate at the same time that cpg_finalize has been called.
445  			 */
446  			memcpy (&cpg_inst_copy, cpg_inst, sizeof (struct cpg_inst));
447  			switch (cpg_inst_copy.model_data.model) {
448  			case CPG_MODEL_V1:
449  				/*
450  				 * Dispatch incoming message
451  				 */
452  				switch (dispatch_data->id) {
453  				case MESSAGE_RES_CPG_DELIVER_CALLBACK:
454  					if (cpg_inst_copy.model_v1_data.cpg_deliver_fn == NULL) {
455  						break;
456  					}
457  	
458  					res_cpg_deliver_callback = (struct res_lib_cpg_deliver_callback *)dispatch_data;
459  	
460  					marshall_from_mar_cpg_name_t (
461  						&group_name,
462  						&res_cpg_deliver_callback->group_name);
463  	
464  					cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
465  						&group_name,
466  						res_cpg_deliver_callback->nodeid,
467  						res_cpg_deliver_callback->pid,
468  						&res_cpg_deliver_callback->message,
469  						res_cpg_deliver_callback->msglen);
470  					break;
471  	
472  				case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
473  					res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
474  	
475  					marshall_from_mar_cpg_name_t (
476  						&group_name,
477  						&res_cpg_partial_deliver_callback->group_name);
478  	
479  					/*
480  					 * Search for assembly data for current messages (nodeid, pid) pair in list of assemblies
481  					 */
482  					assembly_data = NULL;
483  					qb_list_for_each(iter, &(cpg_inst->assembly_list_head)) {
484  						struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
485  						if (current_assembly_data->nodeid == res_cpg_partial_deliver_callback->nodeid && current_assembly_data->pid == res_cpg_partial_deliver_callback->pid) {
486  							assembly_data = current_assembly_data;
487  							break;
488  						}
489  					}
490  	
491  					if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
492  	
493  						/*
494  						 * As this is LIBCPG_PARTIAL_FIRST packet, check that there is no ongoing assembly.
495  						 * Otherwise the sending of packet must have been interrupted and error should have
496  						 * been reported to sending client. Therefore here last assembly will be dropped.
497  						 */
498  						if (assembly_data) {
499  							qb_list_del (&assembly_data->list);
500  							free(assembly_data->assembly_buf);
501  							free(assembly_data);
502  							// coverity[UNUSED_VALUE:SUPPRESS] defensive programming
503  							assembly_data = NULL;
504  						}
505  	
506  						assembly_data = malloc(sizeof(struct cpg_assembly_data));
507  						if (!assembly_data) {
508  							error = CS_ERR_NO_MEMORY;
509  							goto error_put;
510  						}
511  	
512  						assembly_data->nodeid = res_cpg_partial_deliver_callback->nodeid;
513  						assembly_data->pid = res_cpg_partial_deliver_callback->pid;
     						/* The buffer is sized once, from the msglen carried by the first fragment. */
514  						assembly_data->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
515  						if (!assembly_data->assembly_buf) {
516  							free(assembly_data);
517  							error = CS_ERR_NO_MEMORY;
518  							goto error_put;
519  						}
520  						assembly_data->assembly_buf_ptr = 0;
521  						qb_list_init (&assembly_data->list);
522  	
523  						qb_list_add (&assembly_data->list, &cpg_inst->assembly_list_head);
524  					}
     					/*
     					 * A non-first fragment without a matching assembly is
     					 * silently ignored (assembly_data stays NULL).
     					 *
     					 * NOTE(review): fraglen is not checked against the space
     					 * remaining in assembly_buf before the copy below --
     					 * confirm the executive guarantees the fragments never
     					 * exceed the first fragment's msglen.
     					 */
525  					if (assembly_data) {
526  						memcpy(assembly_data->assembly_buf + assembly_data->assembly_buf_ptr,
527  							res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
528  						assembly_data->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
529  	
530  						if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
531  							if (cpg_inst_copy.model_v1_data.cpg_deliver_fn != NULL) {
532  								cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
533  									&group_name,
534  									res_cpg_partial_deliver_callback->nodeid,
535  									res_cpg_partial_deliver_callback->pid,
536  									assembly_data->assembly_buf,
537  									res_cpg_partial_deliver_callback->msglen);
538  							}
539  	
     							/* The assembly is released even when no deliver callback is registered. */
540  							qb_list_del (&assembly_data->list);
541  							free(assembly_data->assembly_buf);
542  							free(assembly_data);
543  						}
544  					}
545  					break;
546  	
547  				case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
548  					if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
549  						break;
550  					}
551  	
552  					res_cpg_confchg_callback = (struct res_lib_cpg_confchg_callback *)dispatch_data;
553  	
     					/*
     					 * The event carries three consecutive arrays in
     					 * member_list[]: members, then left, then joined.
     					 *
     					 * NOTE(review): the *_entries counts are not validated
     					 * against CPG_MEMBERS_MAX (the size of the local
     					 * arrays) here -- confirm the executive bounds them.
     					 */
554  					for (i = 0; i < res_cpg_confchg_callback->member_list_entries; i++) {
555  						marshall_from_mar_cpg_address_t (&member_list[i],
556  							&res_cpg_confchg_callback->member_list[i]);
557  					}
558  					left_list_start = res_cpg_confchg_callback->member_list +
559  						res_cpg_confchg_callback->member_list_entries;
560  					for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
561  						marshall_from_mar_cpg_address_t (&left_list[i],
562  							&left_list_start[i]);
563  					}
564  					joined_list_start = res_cpg_confchg_callback->member_list +
565  						res_cpg_confchg_callback->member_list_entries +
566  						res_cpg_confchg_callback->left_list_entries;
567  					for (i = 0; i < res_cpg_confchg_callback->joined_list_entries; i++) {
568  						marshall_from_mar_cpg_address_t (&joined_list[i],
569  							&joined_list_start[i]);
570  					}
571  					marshall_from_mar_cpg_name_t (
572  						&group_name,
573  						&res_cpg_confchg_callback->group_name);
574  	
575  					cpg_inst_copy.model_v1_data.cpg_confchg_fn (handle,
576  						&group_name,
577  						member_list,
578  						res_cpg_confchg_callback->member_list_entries,
579  						left_list,
580  						res_cpg_confchg_callback->left_list_entries,
581  						joined_list,
582  						res_cpg_confchg_callback->joined_list_entries);
583  	
584  					/*
585  					 * If a member left while its partial packet was being assembled, the assembly data must be removed from the list.
     					 * Note that this cleanup is skipped when no confchg callback is registered (early break above).
586  					 */
587  					for (i = 0; i < res_cpg_confchg_callback->left_list_entries; i++) {
588  						qb_list_for_each_safe(iter, tmp_iter, &(cpg_inst->assembly_list_head)) {
589  							struct cpg_assembly_data *current_assembly_data = qb_list_entry (iter, struct cpg_assembly_data, list);
590  							if (current_assembly_data->nodeid != left_list[i].nodeid || current_assembly_data->pid != left_list[i].pid)
591  								continue;
592  	
593  							qb_list_del (&current_assembly_data->list);
594  							free(current_assembly_data->assembly_buf);
595  							free(current_assembly_data);
596  						}
597  					}
598  	
599  					break;
600  				case MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK:
601  					if (cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn == NULL) {
602  						break;
603  					}
604  	
605  					res_cpg_totem_confchg_callback = (struct res_lib_cpg_totem_confchg_callback *)dispatch_data;
606  	
607  					marshall_from_mar_cpg_ring_id_t (&ring_id, &res_cpg_totem_confchg_callback->ring_id);
608  					for (i = 0; i < res_cpg_totem_confchg_callback->member_list_entries; i++) {
609  						totem_member_list[i] = res_cpg_totem_confchg_callback->member_list[i];
610  					}
611  	
612  					cpg_inst_copy.model_v1_data.cpg_totem_confchg_fn (handle,
613  						ring_id,
614  						res_cpg_totem_confchg_callback->member_list_entries,
615  						totem_member_list);
616  					break;
617  				default:
     					/* Unknown event id: abort the dispatch with a library error. */
618  					error = CS_ERR_LIBRARY;
619  					goto error_put;
620  					break;
621  				} /* - switch (dispatch_data->id) */
622  				break; /* case CPG_MODEL_V1 */
623  			} /* - switch (cpg_inst_copy.model_data.model) */
624  	
     			/*
     			 * Check both the snapshot taken before the callback and the
     			 * live instance, so a finalize flagged during the callback
     			 * is noticed as well.
     			 */
625  			if (cpg_inst_copy.finalize || cpg_inst->finalize) {
626  				/*
627  				 * If the finalize has been called then get out of the dispatch.
628  				 */
629  				cpg_inst->finalize = 1;
630  				error = CS_ERR_BAD_HANDLE;
631  				goto error_put;
632  			}
633  	
634  			/*
635  			 * Determine if more messages should be processed
636  			 */
637  			if (dispatch_types == CS_DISPATCH_ONE || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
638  				cont = 0;
639  			}
640  		} while (cont);
641  	
642  	error_put:
643  		hdb_handle_put (&cpg_handle_t_db, handle);
644  		return (error);
645  	}
646  	
647  	cs_error_t cpg_join (
648  	    cpg_handle_t handle,
649  	    const struct cpg_name *group)
650  	{
651  		cs_error_t error;
652  		struct cpg_inst *cpg_inst;
653  		struct iovec iov[2];
654  		struct req_lib_cpg_join req_lib_cpg_join;
655  		struct res_lib_cpg_join response;
656  	
657  		if (group->length > CPG_MAX_NAME_LENGTH) {
658  			return (CS_ERR_NAME_TOO_LONG);
659  		}
660  	
661  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
662  		if (error != CS_OK) {
663  			return (error);
664  		}
665  	
666  		/* Now join */
667  		req_lib_cpg_join.header.size = sizeof (struct req_lib_cpg_join);
668  		req_lib_cpg_join.header.id = MESSAGE_REQ_CPG_JOIN;
669  		req_lib_cpg_join.pid = getpid();
670  		req_lib_cpg_join.flags = 0;
671  	
672  		switch (cpg_inst->model_data.model) {
673  		case CPG_MODEL_V1:
674  			req_lib_cpg_join.flags = cpg_inst->model_v1_data.flags;
675  			break;
676  		}
677  	
678  		marshall_to_mar_cpg_name_t (&req_lib_cpg_join.group_name,
679  			group);
680  	
681  		iov[0].iov_base = (void *)&req_lib_cpg_join;
682  		iov[0].iov_len = sizeof (struct req_lib_cpg_join);
683  	
684  		do {
685  			error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
686  				&response, sizeof (struct res_lib_cpg_join));
687  	
688  			if (error != CS_OK) {
689  				goto error_exit;
690  			}
691  		} while (response.header.error == CS_ERR_BUSY);
692  	
693  		error = response.header.error;
694  	
695  	error_exit:
696  		hdb_handle_put (&cpg_handle_t_db, handle);
697  	
698  		return (error);
699  	}
700  	
701  	cs_error_t cpg_leave (
702  	    cpg_handle_t handle,
703  	    const struct cpg_name *group)
704  	{
705  		cs_error_t error;
706  		struct cpg_inst *cpg_inst;
707  		struct iovec iov[2];
708  		struct req_lib_cpg_leave req_lib_cpg_leave;
709  		struct res_lib_cpg_leave res_lib_cpg_leave;
710  	
711  	        if (group->length > CPG_MAX_NAME_LENGTH) {
712  			return (CS_ERR_NAME_TOO_LONG);
713  	        }
714  	
715  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
716  		if (error != CS_OK) {
717  			return (error);
718  		}
719  	
720  		req_lib_cpg_leave.header.size = sizeof (struct req_lib_cpg_leave);
721  		req_lib_cpg_leave.header.id = MESSAGE_REQ_CPG_LEAVE;
722  		req_lib_cpg_leave.pid = getpid();
723  		marshall_to_mar_cpg_name_t (&req_lib_cpg_leave.group_name,
724  			group);
725  	
726  		iov[0].iov_base = (void *)&req_lib_cpg_leave;
727  		iov[0].iov_len = sizeof (struct req_lib_cpg_leave);
728  	
729  		do {
730  			error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 1,
731  				&res_lib_cpg_leave, sizeof (struct res_lib_cpg_leave));
732  	
733  			if (error != CS_OK) {
734  				goto error_exit;
735  			}
736  		} while (res_lib_cpg_leave.header.error == CS_ERR_BUSY);
737  	
738  		error = res_lib_cpg_leave.header.error;
739  	
740  	error_exit:
741  		hdb_handle_put (&cpg_handle_t_db, handle);
742  	
743  		return (error);
744  	}
745  	
746  	cs_error_t cpg_membership_get (
747  		cpg_handle_t handle,
748  		struct cpg_name *group_name,
749  		struct cpg_address *member_list,
750  		int *member_list_entries)
751  	{
752  		cs_error_t error;
753  		struct cpg_inst *cpg_inst;
754  		struct iovec iov;
755  		struct req_lib_cpg_membership_get req_lib_cpg_membership_get;
756  		struct res_lib_cpg_membership_get res_lib_cpg_membership_get;
757  		unsigned int i;
758  	
759  		if (group_name->length > CPG_MAX_NAME_LENGTH) {
760  			return (CS_ERR_NAME_TOO_LONG);
761  		}
762  		if (member_list == NULL) {
763  			return (CS_ERR_INVALID_PARAM);
764  		}
765  		if (member_list_entries == NULL) {
766  			return (CS_ERR_INVALID_PARAM);
767  		}
768  	
769  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
770  		if (error != CS_OK) {
771  			return (error);
772  		}
773  	
774  		req_lib_cpg_membership_get.header.size = sizeof (struct req_lib_cpg_membership_get);
775  		req_lib_cpg_membership_get.header.id = MESSAGE_REQ_CPG_MEMBERSHIP;
776  	
777  		marshall_to_mar_cpg_name_t (&req_lib_cpg_membership_get.group_name,
778  			group_name);
779  	
780  		iov.iov_base = (void *)&req_lib_cpg_membership_get;
781  		iov.iov_len = sizeof (struct req_lib_cpg_membership_get);
782  	
783  		error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
784  				&res_lib_cpg_membership_get, sizeof (res_lib_cpg_membership_get));
785  	
786  		if (error != CS_OK) {
787  			goto error_exit;
788  		}
789  	
790  		error = res_lib_cpg_membership_get.header.error;
791  	
792  		/*
793  		 * Copy results to caller
794  		 */
795  		*member_list_entries = res_lib_cpg_membership_get.member_count;
796  		if (member_list) {
797  			for (i = 0; i < res_lib_cpg_membership_get.member_count; i++) {
798  				marshall_from_mar_cpg_address_t (&member_list[i],
799  					&res_lib_cpg_membership_get.member_list[i]);
800  			}
801  		}
802  	
803  	error_exit:
804  		hdb_handle_put (&cpg_handle_t_db, handle);
805  	
806  		return (error);
807  	}
808  	
809  	cs_error_t cpg_local_get (
810  		cpg_handle_t handle,
811  		unsigned int *local_nodeid)
812  	{
813  		cs_error_t error;
814  		struct cpg_inst *cpg_inst;
815  		struct iovec iov;
816  		struct req_lib_cpg_local_get req_lib_cpg_local_get;
817  		struct res_lib_cpg_local_get res_lib_cpg_local_get;
818  	
819  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
820  		if (error != CS_OK) {
821  			return (error);
822  		}
823  	
824  		req_lib_cpg_local_get.header.size = sizeof (struct qb_ipc_request_header);
825  		req_lib_cpg_local_get.header.id = MESSAGE_REQ_CPG_LOCAL_GET;
826  	
827  		iov.iov_base = (void *)&req_lib_cpg_local_get;
828  		iov.iov_len = sizeof (struct req_lib_cpg_local_get);
829  	
830  		error = coroipcc_msg_send_reply_receive (cpg_inst->c, &iov, 1,
831  			&res_lib_cpg_local_get, sizeof (res_lib_cpg_local_get));
832  	
833  		if (error != CS_OK) {
834  			goto error_exit;
835  		}
836  	
837  		error = res_lib_cpg_local_get.header.error;
838  	
839  		*local_nodeid = res_lib_cpg_local_get.local_nodeid;
840  	
841  	error_exit:
842  		hdb_handle_put (&cpg_handle_t_db, handle);
843  	
844  		return (error);
845  	}
846  	
847  	cs_error_t cpg_flow_control_state_get (
848  		cpg_handle_t handle,
849  		cpg_flow_control_state_t *flow_control_state)
850  	{
851  		cs_error_t error;
852  		struct cpg_inst *cpg_inst;
853  	
854  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
855  		if (error != CS_OK) {
856  			return (error);
857  		}
858  		*flow_control_state = CPG_FLOW_CONTROL_DISABLED;
859  		error = CS_OK;
860  	
861  		hdb_handle_put (&cpg_handle_t_db, handle);
862  	
863  		return (error);
864  	}
865  	
866  	static int
867  	memory_map (char *path, const char *file, void **buf, size_t bytes)
868  	{
869  		int32_t fd;
870  		void *addr;
871  		int32_t res;
872  		char *buffer;
873  		int32_t i;
874  		size_t written;
875  		size_t page_size; 
876  		long int sysconf_page_size;
877  		mode_t old_umask;
878  	
879  		snprintf (path, PATH_MAX, "/dev/shm/%s", file);
880  	
881  		old_umask = umask(CPG_MEMORY_MAP_UMASK);
882  		fd = mkstemp (path);
883  		(void)umask(old_umask);
(1) Event cond_true: Condition "fd == -1", taking true branch.
884  		if (fd == -1) {
885  			snprintf (path, PATH_MAX, LOCALSTATEDIR "/run/%s", file);
886  			old_umask = umask(CPG_MEMORY_MAP_UMASK);
887  			fd = mkstemp (path);
888  			(void)umask(old_umask);
(2) Event cond_false: Condition "fd == -1", taking false branch.
889  			if (fd == -1) {
890  				return (-1);
(3) Event if_end: End of if statement.
891  			}
892  		}
893  	
894  		res = ftruncate (fd, bytes);
(4) Event cond_false: Condition "res == -1", taking false branch.
895  		if (res == -1) {
896  			goto error_close_unlink;
(5) Event if_end: End of if statement.
897  		}
898  		sysconf_page_size = sysconf(_SC_PAGESIZE);
(6) Event cond_false: Condition "sysconf_page_size <= 0", taking false branch.
899  		if (sysconf_page_size <= 0) {
900  			goto error_close_unlink;
(7) Event if_end: End of if statement.
901  		}
902  		page_size = sysconf_page_size;
903  		buffer = malloc (page_size);
(8) Event cond_false: Condition "buffer == NULL", taking false branch.
904  		if (buffer == NULL) {
905  			goto error_close_unlink;
(9) Event if_end: End of if statement.
906  		}
907  		memset (buffer, 0, page_size);
(10) Event cond_true: Condition "i < bytes / page_size", taking true branch.
(21) Event loop_begin: Jumped back to beginning of loop.
(22) Event cond_true: Condition "i < bytes / page_size", taking true branch.
(29) Event loop_begin: Jumped back to beginning of loop.
(30) Event cond_false: Condition "i < bytes / page_size", taking false branch.
908  		for (i = 0; i < (bytes / page_size); i++) {
(14) Event label: Reached label "retry_write".
909  	retry_write:
910  			written = write (fd, buffer, page_size);
(11) Event cond_true: Condition "written == -1", taking true branch.
(12) Event cond_true: Condition "*__errno_location() == 4", taking true branch.
(15) Event cond_true: Condition "written == -1", taking true branch.
(16) Event cond_false: Condition "*__errno_location() == 4", taking false branch.
(23) Event cond_true: Condition "written == -1", taking true branch.
(24) Event cond_false: Condition "*__errno_location() == 4", taking false branch.
911  			if (written == -1 && errno == EINTR) {
(13) Event goto: Jumping to label "retry_write".
912  				goto retry_write;
(17) Event if_end: End of if statement.
(25) Event if_end: End of if statement.
913  			}
(18) Event cond_false: Condition "written != page_size", taking false branch.
(26) Event cond_false: Condition "written != page_size", taking false branch.
914  			if (written != page_size) {
915  				free (buffer);
916  				goto error_close_unlink;
(19) Event if_end: End of if statement.
(27) Event if_end: End of if statement.
917  			}
(20) Event loop: Jumping back to the beginning of the loop.
(28) Event loop: Jumping back to the beginning of the loop.
(31) Event loop_end: Reached end of loop.
918  		}
919  		free (buffer);
920  	
(32) Event alloc_fn: Storage is returned from allocation function "mmap".
(33) Event assign: Assigning: "addr" = "mmap(NULL, bytes, 3, 1, fd, 0L)".
Also see events: [assign]
921  		addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
922  			MAP_SHARED, fd, 0);
923  	
(34) Event cond_false: Condition "addr == (void *)0xffffffffffffffff", taking false branch.
924  		if (addr == MAP_FAILED) {
925  			goto error_close_unlink;
(35) Event if_end: End of if statement.
926  		}
927  	#ifdef MADV_NOSYNC
928  		madvise(addr, bytes, MADV_NOSYNC);
929  	#endif
930  	
931  		res = close (fd);
(36) Event cond_false: Condition "res", taking false branch.
932  		if (res) {
933  			munmap(addr, bytes);
934  	
935  			return (-1);
(37) Event if_end: End of if statement.
936  		}
(38) Event assign: Assigning: "*buf" = "addr".
Also see events: [alloc_fn][assign]
937  		*buf = addr;
938  	
939  		return 0;
940  	
941  	error_close_unlink:
942  		close (fd);
943  		unlink(path);
944  		return -1;
945  	}
946  	
947  	cs_error_t cpg_zcb_alloc (
948  		cpg_handle_t handle,
949  		size_t size,
950  		void **buffer)
951  	{
952  		void *buf = NULL;
953  		char path[PATH_MAX];
954  		mar_req_coroipcc_zc_alloc_t req_coroipcc_zc_alloc;
955  		struct qb_ipc_response_header res_coroipcs_zc_alloc;
956  		size_t map_size;
957  		struct iovec iovec;
958  		struct coroipcs_zc_header *hdr;
959  		cs_error_t error;
960  		struct cpg_inst *cpg_inst;
961  	
962  		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
(1) Event cond_false: Condition "error != CS_OK", taking false branch.
963  		if (error != CS_OK) {
964  			return (error);
(2) Event if_end: End of if statement.
965  		}
966  	
967  		map_size = size + sizeof (struct req_lib_cpg_mcast) + sizeof (struct coroipcs_zc_header);
(3) Event alloc_arg: "memory_map" allocates memory that is stored into "buf". [details]
(4) Event cond_true: Condition "memory_map(path, "corosync_zerocopy-XXXXXX", &buf, map_size) != -1", taking true branch.
(5) Event if_fallthrough: Falling through to end of if statement.
(6) Event if_end: End of if statement.
Also see events: [leaked_storage]
968  		assert(memory_map (path, "corosync_zerocopy-XXXXXX", &buf, map_size) != -1);
969  	
(7) Event cond_false: Condition "strlen(path) >= 128", taking false branch.
970  		if (strlen(path) >= CPG_ZC_PATH_LEN) {
971  			unlink(path);
972  			munmap (buf, map_size);
973  			return (CS_ERR_NAME_TOO_LONG);
(8) Event if_end: End of if statement.
974  		}
975  	
976  		req_coroipcc_zc_alloc.header.size = sizeof (mar_req_coroipcc_zc_alloc_t);
977  		req_coroipcc_zc_alloc.header.id = MESSAGE_REQ_CPG_ZC_ALLOC;
978  		req_coroipcc_zc_alloc.map_size = map_size;
979  		strcpy (req_coroipcc_zc_alloc.path_to_file, path);
980  	
981  		iovec.iov_base = (void *)&req_coroipcc_zc_alloc;
982  		iovec.iov_len = sizeof (mar_req_coroipcc_zc_alloc_t);
983  	
984  		error = coroipcc_msg_send_reply_receive (
985  			cpg_inst->c,
986  			&iovec,
987  			1,
988  			&res_coroipcs_zc_alloc,
989  			sizeof (struct qb_ipc_response_header));
990  	
(9) Event cond_true: Condition "error != CS_OK", taking true branch.
991  		if (error != CS_OK) {
(10) Event goto: Jumping to label "error_exit".
992  			goto error_exit;
993  		}
994  	
995  		hdr = (struct coroipcs_zc_header *)buf;
996  		hdr->map_size = map_size;
997  		*buffer = ((char *)buf) + sizeof (struct coroipcs_zc_header) + sizeof (struct req_lib_cpg_mcast);
998  	
(11) Event label: Reached label "error_exit".
999  	error_exit:
1000 		hdb_handle_put (&cpg_handle_t_db, handle);
1001 		/*
1002 		 * Coverity correctly reports an error here. We cannot safely munmap and unlink the file, because
1003 		 * the timing of the failure is the key issue: if a failure occurs before the IPC reply,
1004 		 * the file should be deleted.
1005 		 * However, if the failure happens during the IPC reply, Corosync has already deleted the file.
1006 		 * This means the cpg library could attempt to delete a non-existing file (not a problem) or,
1007 		 * in a theoretical race condition, delete a new file created by another application.
1008 		 * There are multiple possible solutions, but none of them are ready to be implemented yet.
1009 		 */
(12) Event leaked_storage: Variable "buf" going out of scope leaks the storage it points to.
Also see events: [alloc_arg]
1010 		return (error);
1011 	}
1012 	
1013 	cs_error_t cpg_zcb_free (
1014 		cpg_handle_t handle,
1015 		void *buffer)
1016 	{
1017 		cs_error_t error;
1018 		unsigned int res;
1019 		struct cpg_inst *cpg_inst;
1020 		mar_req_coroipcc_zc_free_t req_coroipcc_zc_free;
1021 		struct qb_ipc_response_header res_coroipcs_zc_free;
1022 		struct iovec iovec;
1023 		struct coroipcs_zc_header *header = (struct coroipcs_zc_header *)((char *)buffer - sizeof (struct coroipcs_zc_header) - sizeof (struct req_lib_cpg_mcast));
1024 	
1025 		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1026 		if (error != CS_OK) {
1027 			return (error);
1028 		}
1029 	
1030 		req_coroipcc_zc_free.header.size = sizeof (mar_req_coroipcc_zc_free_t);
1031 		req_coroipcc_zc_free.header.id = MESSAGE_REQ_CPG_ZC_FREE;
1032 		req_coroipcc_zc_free.map_size = header->map_size;
1033 		req_coroipcc_zc_free.server_address = header->server_address;
1034 	
1035 		iovec.iov_base = (void *)&req_coroipcc_zc_free;
1036 		iovec.iov_len = sizeof (mar_req_coroipcc_zc_free_t);
1037 	
1038 		error = coroipcc_msg_send_reply_receive (
1039 			cpg_inst->c,
1040 			&iovec,
1041 			1,
1042 			&res_coroipcs_zc_free,
1043 			sizeof (struct qb_ipc_response_header));
1044 	
1045 		if (error != CS_OK) {
1046 			goto error_exit;
1047 		}
1048 	
1049 		res = munmap ((void *)header, header->map_size);
1050 		if (res == -1) {
1051 			error = qb_to_cs_error(-errno);
1052 	
1053 			goto error_exit;
1054 		}
1055 	
1056 	error_exit:
1057 		hdb_handle_put (&cpg_handle_t_db, handle);
1058 	
1059 		return (error);
1060 	}
1061 	
1062 	cs_error_t cpg_zcb_mcast_joined (
1063 		cpg_handle_t handle,
1064 		cpg_guarantee_t guarantee,
1065 		void *msg,
1066 		size_t msg_len)
1067 	{
1068 		cs_error_t error;
1069 		struct cpg_inst *cpg_inst;
1070 		struct req_lib_cpg_mcast *req_lib_cpg_mcast;
1071 		struct res_lib_cpg_mcast res_lib_cpg_mcast;
1072 		mar_req_coroipcc_zc_execute_t req_coroipcc_zc_execute;
1073 		struct coroipcs_zc_header *hdr;
1074 		struct iovec iovec;
1075 	
1076 		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1077 		if (error != CS_OK) {
1078 			return (error);
1079 		}
1080 	
1081 		if (msg_len > IPC_REQUEST_SIZE) {
1082 			error = CS_ERR_TOO_BIG;
1083 			goto error_exit;
1084 		}
1085 	
1086 		req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
1087 		req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
1088 			msg_len;
1089 	
1090 		req_lib_cpg_mcast->header.id = MESSAGE_REQ_CPG_MCAST;
1091 		req_lib_cpg_mcast->guarantee = guarantee;
1092 		req_lib_cpg_mcast->msglen = msg_len;
1093 	
1094 		hdr = (struct coroipcs_zc_header *)(((char *)req_lib_cpg_mcast) - sizeof (struct coroipcs_zc_header));
1095 	
1096 		req_coroipcc_zc_execute.header.size = sizeof (mar_req_coroipcc_zc_execute_t);
1097 		req_coroipcc_zc_execute.header.id = MESSAGE_REQ_CPG_ZC_EXECUTE;
1098 		req_coroipcc_zc_execute.server_address = hdr->server_address;
1099 	
1100 		iovec.iov_base = (void *)&req_coroipcc_zc_execute;
1101 		iovec.iov_len = sizeof (mar_req_coroipcc_zc_execute_t);
1102 	
1103 		error = coroipcc_msg_send_reply_receive (
1104 			cpg_inst->c,
1105 			&iovec,
1106 			1,
1107 			&res_lib_cpg_mcast,
1108 			sizeof(res_lib_cpg_mcast));
1109 	
1110 		if (error != CS_OK) {
1111 			goto error_exit;
1112 		}
1113 	
1114 		error = res_lib_cpg_mcast.header.error;
1115 	
1116 	error_exit:
1117 		hdb_handle_put (&cpg_handle_t_db, handle);
1118 	
1119 		return (error);
1120 	}
1121 	
1122 	static cs_error_t send_fragments (
1123 		struct cpg_inst *cpg_inst,
1124 		cpg_guarantee_t guarantee,
1125 		size_t msg_len,
1126 		const struct iovec *iovec,
1127 		unsigned int iov_len)
1128 	{
1129 		int i;
1130 		cs_error_t error = CS_OK;
1131 		struct iovec iov[2];
1132 		struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
1133 		struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
1134 		size_t sent = 0;
1135 		size_t iov_sent = 0;
1136 		int retry_count;
1137 	
1138 		req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
1139 		req_lib_cpg_mcast.guarantee = guarantee;
1140 		req_lib_cpg_mcast.msglen = msg_len;
1141 	
1142 		iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1143 		iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
1144 	
1145 		i=0;
1146 		iov_sent = 0 ;
1147 		qb_ipcc_fc_enable_max_set(cpg_inst->c,  2);
1148 	
1149 		while (error == CS_OK && sent < msg_len) {
1150 	
1151 			retry_count = 0;
1152 			if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
1153 				iov[1].iov_len = cpg_inst->max_msg_size;
1154 			}
1155 			else {
1156 				iov[1].iov_len = iovec[i].iov_len - iov_sent;
1157 			}
1158 	
1159 			if (sent == 0) {
1160 				req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
1161 			}
1162 			else if ((sent + iov[1].iov_len) == msg_len) {
1163 				req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
1164 			}
1165 			else {
1166 				req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
1167 			}
1168 	
1169 			req_lib_cpg_mcast.fraglen = iov[1].iov_len;
1170 			req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
1171 			iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
1172 	
1173 		resend:
1174 			error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
1175 								 &res_lib_cpg_partial_send,
1176 								 sizeof (res_lib_cpg_partial_send));
1177 	
1178 			if (error == CS_ERR_TRY_AGAIN) {
1179 				fprintf(stderr, "sleep. counter=%d\n", retry_count);
1180 				if (++retry_count > MAX_RETRIES) {
1181 					goto error_exit;
1182 				}
1183 				usleep(10000);
1184 				goto resend;
1185 			}
1186 	
1187 			iov_sent += iov[1].iov_len;
1188 			sent += iov[1].iov_len;
1189 	
1190 			/* Next iovec */
1191 			if (iov_sent >= iovec[i].iov_len) {
1192 				i++;
1193 				iov_sent = 0;
1194 			}
1195 			error = res_lib_cpg_partial_send.header.error;
1196 		}
1197 	error_exit:
1198 		qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
1199 	
1200 		return error;
1201 	}
1202 	
1203 	
1204 	cs_error_t cpg_mcast_joined (
1205 		cpg_handle_t handle,
1206 		cpg_guarantee_t guarantee,
1207 		const struct iovec *iovec,
1208 		unsigned int iov_len)
1209 	{
1210 		int i;
1211 		cs_error_t error;
1212 		struct cpg_inst *cpg_inst;
1213 		struct iovec iov[64];
1214 		struct req_lib_cpg_mcast req_lib_cpg_mcast;
1215 		size_t msg_len = 0;
1216 	
1217 		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1218 		if (error != CS_OK) {
1219 			return (error);
1220 		}
1221 	
1222 		for (i = 0; i < iov_len; i++ ) {
1223 			msg_len += iovec[i].iov_len;
1224 		}
1225 	
1226 		if (msg_len > cpg_inst->max_msg_size) {
1227 			error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
1228 			goto error_exit;
1229 		}
1230 	
1231 		req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
1232 			msg_len;
1233 	
1234 		req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_MCAST;
1235 		req_lib_cpg_mcast.guarantee = guarantee;
1236 		req_lib_cpg_mcast.msglen = msg_len;
1237 	
1238 		iov[0].iov_base = (void *)&req_lib_cpg_mcast;
1239 		iov[0].iov_len = sizeof (struct req_lib_cpg_mcast);
1240 		memcpy (&iov[1], iovec, iov_len * sizeof (struct iovec));
1241 	
1242 		qb_ipcc_fc_enable_max_set(cpg_inst->c,  2);
1243 		error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
1244 		qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
1245 	
1246 	error_exit:
1247 		hdb_handle_put (&cpg_handle_t_db, handle);
1248 	
1249 		return (error);
1250 	}
1251 	
1252 	cs_error_t cpg_iteration_initialize(
1253 		cpg_handle_t handle,
1254 		cpg_iteration_type_t iteration_type,
1255 		const struct cpg_name *group,
1256 		cpg_iteration_handle_t *cpg_iteration_handle)
1257 	{
1258 		cs_error_t error;
1259 		struct iovec iov;
1260 		struct cpg_inst *cpg_inst;
1261 		struct cpg_iteration_instance_t *cpg_iteration_instance;
1262 		struct req_lib_cpg_iterationinitialize req_lib_cpg_iterationinitialize;
1263 		struct res_lib_cpg_iterationinitialize res_lib_cpg_iterationinitialize;
1264 	
1265 		if (group && group->length > CPG_MAX_NAME_LENGTH) {
1266 			return (CS_ERR_NAME_TOO_LONG);
1267 		}
1268 		if (cpg_iteration_handle == NULL) {
1269 			return (CS_ERR_INVALID_PARAM);
1270 		}
1271 	
1272 		if ((iteration_type == CPG_ITERATION_ONE_GROUP && group == NULL) ||
1273 			(iteration_type != CPG_ITERATION_ONE_GROUP && group != NULL)) {
1274 			return (CS_ERR_INVALID_PARAM);
1275 		}
1276 	
1277 		if (iteration_type != CPG_ITERATION_NAME_ONLY && iteration_type != CPG_ITERATION_ONE_GROUP &&
1278 		    iteration_type != CPG_ITERATION_ALL) {
1279 	
1280 			return (CS_ERR_INVALID_PARAM);
1281 		}
1282 	
1283 		error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
1284 		if (error != CS_OK) {
1285 			return (error);
1286 		}
1287 	
1288 		error = hdb_error_to_cs (hdb_handle_create (&cpg_iteration_handle_t_db,
1289 			sizeof (struct cpg_iteration_instance_t), cpg_iteration_handle));
1290 		if (error != CS_OK) {
1291 			goto error_put_cpg_db;
1292 		}
1293 	
1294 		error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, *cpg_iteration_handle,
1295 			(void *)&cpg_iteration_instance));
1296 		if (error != CS_OK) {
1297 			goto error_destroy;
1298 		}
1299 	
1300 		cpg_iteration_instance->conn = cpg_inst->c;
1301 	
1302 		qb_list_init (&cpg_iteration_instance->list);
1303 	
1304 		req_lib_cpg_iterationinitialize.header.size = sizeof (struct req_lib_cpg_iterationinitialize);
1305 		req_lib_cpg_iterationinitialize.header.id = MESSAGE_REQ_CPG_ITERATIONINITIALIZE;
1306 		req_lib_cpg_iterationinitialize.iteration_type = iteration_type;
1307 		if (group) {
1308 			marshall_to_mar_cpg_name_t (&req_lib_cpg_iterationinitialize.group_name, group);
1309 		}
1310 	
1311 		iov.iov_base = (void *)&req_lib_cpg_iterationinitialize;
1312 		iov.iov_len = sizeof (struct req_lib_cpg_iterationinitialize);
1313 	
1314 		error = coroipcc_msg_send_reply_receive (cpg_inst->c,
1315 			&iov,
1316 			1,
1317 			&res_lib_cpg_iterationinitialize,
1318 			sizeof (struct res_lib_cpg_iterationinitialize));
1319 	
1320 		if (error != CS_OK) {
1321 			goto error_put_destroy;
1322 		}
1323 	
1324 		cpg_iteration_instance->executive_iteration_handle =
1325 			res_lib_cpg_iterationinitialize.iteration_handle;
1326 		cpg_iteration_instance->cpg_iteration_handle = *cpg_iteration_handle;
1327 	
1328 		qb_list_add (&cpg_iteration_instance->list, &cpg_inst->iteration_list_head);
1329 	
1330 		hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1331 		hdb_handle_put (&cpg_handle_t_db, handle);
1332 	
1333 		return (res_lib_cpg_iterationinitialize.header.error);
1334 	
1335 	error_put_destroy:
1336 		hdb_handle_put (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1337 	error_destroy:
1338 		hdb_handle_destroy (&cpg_iteration_handle_t_db, *cpg_iteration_handle);
1339 	error_put_cpg_db:
1340 		hdb_handle_put (&cpg_handle_t_db, handle);
1341 	
1342 		return (error);
1343 	}
1344 	
1345 	cs_error_t cpg_iteration_next(
1346 		cpg_iteration_handle_t handle,
1347 		struct cpg_iteration_description_t *description)
1348 	{
1349 		cs_error_t error;
1350 		struct cpg_iteration_instance_t *cpg_iteration_instance;
1351 		struct req_lib_cpg_iterationnext req_lib_cpg_iterationnext;
1352 		struct res_lib_cpg_iterationnext res_lib_cpg_iterationnext;
1353 	
1354 		if (description == NULL) {
1355 			return CS_ERR_INVALID_PARAM;
1356 		}
1357 	
1358 		error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1359 			(void *)&cpg_iteration_instance));
1360 		if (error != CS_OK) {
1361 			goto error_exit;
1362 		}
1363 	
1364 		req_lib_cpg_iterationnext.header.size = sizeof (struct req_lib_cpg_iterationnext);
1365 		req_lib_cpg_iterationnext.header.id = MESSAGE_REQ_CPG_ITERATIONNEXT;
1366 		req_lib_cpg_iterationnext.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1367 	
1368 		error = qb_to_cs_error (qb_ipcc_send (cpg_iteration_instance->conn,
1369 					&req_lib_cpg_iterationnext,
1370 					req_lib_cpg_iterationnext.header.size));
1371 		if (error != CS_OK) {
1372 			goto error_put;
1373 		}
1374 	
1375 		error = qb_to_cs_error (qb_ipcc_recv (cpg_iteration_instance->conn,
1376 					&res_lib_cpg_iterationnext,
1377 					sizeof(struct res_lib_cpg_iterationnext), -1));
1378 		if (error != CS_OK) {
1379 			goto error_put;
1380 		}
1381 	
1382 		marshall_from_mar_cpg_iteration_description_t(
1383 				description,
1384 				&res_lib_cpg_iterationnext.description);
1385 	
1386 		error = res_lib_cpg_iterationnext.header.error;
1387 	
1388 	error_put:
1389 		hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1390 	
1391 	error_exit:
1392 		return (error);
1393 	}
1394 	
1395 	cs_error_t cpg_iteration_finalize (
1396 		cpg_iteration_handle_t handle)
1397 	{
1398 		cs_error_t error;
1399 		struct iovec iov;
1400 		struct cpg_iteration_instance_t *cpg_iteration_instance;
1401 		struct req_lib_cpg_iterationfinalize req_lib_cpg_iterationfinalize;
1402 		struct res_lib_cpg_iterationfinalize res_lib_cpg_iterationfinalize;
1403 	
1404 		error = hdb_error_to_cs (hdb_handle_get (&cpg_iteration_handle_t_db, handle,
1405 			(void *)&cpg_iteration_instance));
1406 		if (error != CS_OK) {
1407 			goto error_exit;
1408 		}
1409 	
1410 		req_lib_cpg_iterationfinalize.header.size = sizeof (struct req_lib_cpg_iterationfinalize);
1411 		req_lib_cpg_iterationfinalize.header.id = MESSAGE_REQ_CPG_ITERATIONFINALIZE;
1412 		req_lib_cpg_iterationfinalize.iteration_handle = cpg_iteration_instance->executive_iteration_handle;
1413 	
1414 		iov.iov_base = (void *)&req_lib_cpg_iterationfinalize;
1415 		iov.iov_len = sizeof (struct req_lib_cpg_iterationfinalize);
1416 	
1417 		error = coroipcc_msg_send_reply_receive (cpg_iteration_instance->conn,
1418 			&iov,
1419 			1,
1420 			&res_lib_cpg_iterationfinalize,
1421 			sizeof (struct req_lib_cpg_iterationfinalize));
1422 	
1423 		if (error != CS_OK) {
1424 			goto error_put;
1425 		}
1426 	
1427 		cpg_iteration_instance_finalize (cpg_iteration_instance);
1428 		hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->cpg_iteration_handle);
1429 	
1430 		return (res_lib_cpg_iterationfinalize.header.error);
1431 	
1432 	error_put:
1433 		hdb_handle_put (&cpg_iteration_handle_t_db, handle);
1434 	error_exit:
1435 		return (error);
1436 	}
1437 	
1438 	/** @} */
1439