1    	/*
2    	 * Copyright (c) 2008-2020 Red Hat, Inc.
3    	 *
4    	 * All rights reserved.
5    	 *
6    	 * Author: Christine Caulfield (ccaulfie@redhat.com)
7    	 *
8    	 * This software licensed under BSD license, the text of which follows:
9    	 *
10   	 * Redistribution and use in source and binary forms, with or without
11   	 * modification, are permitted provided that the following conditions are met:
12   	 *
13   	 * - Redistributions of source code must retain the above copyright notice,
14   	 *   this list of conditions and the following disclaimer.
15   	 * - Redistributions in binary form must reproduce the above copyright notice,
16   	 *   this list of conditions and the following disclaimer in the documentation
17   	 *   and/or other materials provided with the distribution.
18   	 * - Neither the name of the MontaVista Software, Inc. nor the names of its
19   	 *   contributors may be used to endorse or promote products derived from this
20   	 *   software without specific prior written permission.
21   	 *
22   	 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
23   	 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24   	 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25   	 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
26   	 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
27   	 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
28   	 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
29   	 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
30   	 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
31   	 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
32   	 * THE POSSIBILITY OF SUCH DAMAGE.
33   	 */
34   	/*
35   	 * Provides a quorum API using the corosync executive
36   	 */
37   	
38   	#include <config.h>
39   	
40   	#include <stdlib.h>
41   	#include <string.h>
42   	#include <unistd.h>
43   	#include <sys/types.h>
44   	#include <sys/socket.h>
45   	#include <sys/uio.h>
46   	#include <errno.h>
47   	
48   	#include <qb/qbipcc.h>
49   	#include <corosync/corotypes.h>
50   	#include <corosync/corodefs.h>
51   	#include <corosync/hdb.h>
52   	
53   	#include <corosync/quorum.h>
54   	#include <corosync/ipc_quorum.h>
55   	
56   	#include "util.h"
57   	
58   	struct quorum_inst {
59   		qb_ipcc_connection_t *c;
60   		int finalize;
61   		const void *context;
62   		union {
63   			quorum_model_data_t model_data;
64   			quorum_model_v0_data_t model_v0_data;
65   			quorum_model_v1_data_t model_v1_data;
66   		};
67   	};
68   	
69   	static void quorum_inst_free (void *inst);
70   	
71   	DECLARE_HDB_DATABASE(quorum_handle_t_db, quorum_inst_free);
72   	
73   	cs_error_t quorum_initialize (
74   		quorum_handle_t *handle,
75   		quorum_callbacks_t *callbacks,
76   		uint32_t *quorum_type)
77   	{
78   		quorum_model_v0_data_t model_v0_data;
79   	
80   		memset (&model_v0_data, 0, sizeof(quorum_model_v0_data_t));
81   	
82   		if (callbacks) {
83   			model_v0_data.quorum_notify_fn = callbacks->quorum_notify_fn;
84   		}
85   	
86   		return (quorum_model_initialize(handle, QUORUM_MODEL_V0,
87   		    (quorum_model_data_t *)&model_v0_data, quorum_type, NULL));
88   	}
89   	
90   	cs_error_t quorum_model_initialize (
91   		quorum_handle_t *handle,
92   		quorum_model_t model,
93   		quorum_model_data_t *model_data,
94   		uint32_t *quorum_type,
95   		void *context)
96   	{
97   		cs_error_t error;
98   		struct quorum_inst *quorum_inst;
99   		struct iovec iov;
100  		struct qb_ipc_request_header quorum_gettype_req;
101  		struct req_lib_quorum_model_gettype quorum_model_gettype_req;
102  		struct res_lib_quorum_gettype res_lib_quorum_gettype;
103  		struct res_lib_quorum_model_gettype res_lib_quorum_model_gettype;
104  		uint32_t local_quorum_type;
105  	
106  		if (model != QUORUM_MODEL_V0 && model != QUORUM_MODEL_V1) {
107  			error = CS_ERR_INVALID_PARAM;
108  			goto error_no_destroy;
109  		}
110  	
111  		error = hdb_error_to_cs(hdb_handle_create (&quorum_handle_t_db, sizeof (struct quorum_inst), handle));
112  		if (error != CS_OK) {
113  			goto error_no_destroy;
114  		}
115  	
116  		error = hdb_error_to_cs(hdb_handle_get (&quorum_handle_t_db, *handle, (void *)&quorum_inst));
117  		if (error != CS_OK) {
118  			goto error_destroy;
119  		}
120  	
121  		error = CS_OK;
122  		quorum_inst->finalize = 0;
123  		quorum_inst->c = qb_ipcc_connect ("quorum", IPC_REQUEST_SIZE);
124  		if (quorum_inst->c == NULL) {
125  			error = qb_to_cs_error(-errno);
126  			goto error_put_destroy;
127  		}
128  	
129  		switch (model) {
130  		case QUORUM_MODEL_V0:
131  			quorum_gettype_req.size = sizeof (quorum_gettype_req);
132  			quorum_gettype_req.id = MESSAGE_REQ_QUORUM_GETTYPE;
133  	
134  			iov.iov_base = (char *)&quorum_gettype_req;
135  			iov.iov_len = sizeof (quorum_gettype_req);
136  	
137  			error = qb_to_cs_error(qb_ipcc_sendv_recv (
138  				quorum_inst->c,
139  				&iov,
140  				1,
141  				&res_lib_quorum_gettype,
142  				sizeof(res_lib_quorum_gettype), -1));
143  	
144  			if (error != CS_OK) {
145  				goto error_put_destroy;
146  			}
147  			error = res_lib_quorum_gettype.header.error;
148  			local_quorum_type = res_lib_quorum_gettype.quorum_type;
149  			break;
150  		case QUORUM_MODEL_V1:
151  			quorum_model_gettype_req.header.size = sizeof (quorum_model_gettype_req);
152  			quorum_model_gettype_req.header.id = MESSAGE_REQ_QUORUM_MODEL_GETTYPE;
153  			quorum_model_gettype_req.model = model;
154  	
155  			iov.iov_base = (char *)&quorum_model_gettype_req;
156  			iov.iov_len = sizeof (quorum_model_gettype_req);
157  	
158  			error = qb_to_cs_error(qb_ipcc_sendv_recv (
159  				quorum_inst->c,
160  				&iov,
161  				1,
162  				&res_lib_quorum_model_gettype,
163  				sizeof(res_lib_quorum_model_gettype), -1));
164  	
165  			if (error != CS_OK) {
166  				goto error_put_destroy;
167  			}
168  			error = res_lib_quorum_model_gettype.header.error;
169  			local_quorum_type = res_lib_quorum_model_gettype.quorum_type;
170  			break;
171  		}
172  	
173  		if (quorum_type != NULL) {
174  			*quorum_type = local_quorum_type;
175  		}
176  	
177  		if (model_data != NULL) {
178  			switch (model) {
179  			case QUORUM_MODEL_V0:
180  				memcpy(&quorum_inst->model_v0_data, model_data, sizeof(quorum_model_v0_data_t));
181  				break;
182  			case QUORUM_MODEL_V1:
183  				memcpy(&quorum_inst->model_v1_data, model_data, sizeof(quorum_model_v1_data_t));
184  				break;
185  			}
186  		}
187  	
188  		quorum_inst->model_data.model = model;
189  		quorum_inst->context = context;
190  	
191  		(void)hdb_handle_put (&quorum_handle_t_db, *handle);
192  	
193  		return (error);
194  	
195  	error_put_destroy:
196  		(void)hdb_handle_put (&quorum_handle_t_db, *handle);
197  	error_destroy:
198  		(void)hdb_handle_destroy (&quorum_handle_t_db, *handle);
199  	error_no_destroy:
200  		return (error);
201  	}
202  	
203  	static void quorum_inst_free (void *inst)
204  	{
205  		struct quorum_inst *quorum_inst = (struct quorum_inst *)inst;
206  		qb_ipcc_disconnect(quorum_inst->c);
207  	}
208  	
209  	cs_error_t quorum_finalize (
210  		quorum_handle_t handle)
211  	{
212  		struct quorum_inst *quorum_inst;
213  		cs_error_t error;
214  	
215  		error = hdb_error_to_cs(hdb_handle_get (&quorum_handle_t_db, handle, (void *)&quorum_inst));
216  		if (error != CS_OK) {
217  			return (error);
218  		}
219  	
220  		/*
221  		 * Another thread has already started finalizing
222  		 */
223  		if (quorum_inst->finalize) {
224  			(void)hdb_handle_put (&quorum_handle_t_db, handle);
225  			return (CS_ERR_BAD_HANDLE);
226  		}
227  	
228  		quorum_inst->finalize = 1;
229  	
230  		(void)hdb_handle_destroy (&quorum_handle_t_db, handle);
231  	
232  		(void)hdb_handle_put (&quorum_handle_t_db, handle);
233  	
234  		return (CS_OK);
235  	}
236  	
237  	cs_error_t quorum_getquorate (
238  		quorum_handle_t handle,
239  		int *quorate)
240  	{
241  		cs_error_t error;
242  		struct quorum_inst *quorum_inst;
243  		struct iovec iov;
244  		struct qb_ipc_request_header req;
245  		struct res_lib_quorum_getquorate res_lib_quorum_getquorate;
246  	
247  		error = hdb_error_to_cs(hdb_handle_get (&quorum_handle_t_db, handle, (void *)&quorum_inst));
248  		if (error != CS_OK) {
249  			return (error);
250  		}
251  	
252  		req.size = sizeof (req);
253  		req.id = MESSAGE_REQ_QUORUM_GETQUORATE;
254  	
255  		iov.iov_base = (char *)&req;
256  		iov.iov_len = sizeof (req);
257  	
258  		error = qb_to_cs_error(qb_ipcc_sendv_recv (
259  			quorum_inst->c,
260  			&iov,
261  			1,
262  			&res_lib_quorum_getquorate,
263  			sizeof (struct res_lib_quorum_getquorate), CS_IPC_TIMEOUT_MS));
264  	
265  		if (error != CS_OK) {
266  			goto error_exit;
267  		}
268  	
269  		error = res_lib_quorum_getquorate.header.error;
270  	
271  		*quorate = res_lib_quorum_getquorate.quorate;
272  	
273  	error_exit:
274  		(void)hdb_handle_put (&quorum_handle_t_db, handle);
275  	
276  		return (error);
277  	}
278  	
279  	cs_error_t quorum_fd_get (
280  		quorum_handle_t handle,
281  		int *fd)
282  	{
283  		cs_error_t error;
284  		struct quorum_inst *quorum_inst;
285  	
286  		error = hdb_error_to_cs(hdb_handle_get (&quorum_handle_t_db, handle, (void *)&quorum_inst));
287  		if (error != CS_OK) {
288  			return (error);
289  		}
290  	
291  		error = qb_to_cs_error(qb_ipcc_fd_get (quorum_inst->c, fd));
292  	
293  		(void)hdb_handle_put (&quorum_handle_t_db, handle);
294  	
295  		return (error);
296  	}
297  	
298  	
299  	cs_error_t quorum_context_get (
300  		quorum_handle_t handle,
301  		const void **context)
302  	{
303  		cs_error_t error;
304  		struct quorum_inst *quorum_inst;
305  	
306  		error = hdb_error_to_cs(hdb_handle_get (&quorum_handle_t_db, handle, (void *)&quorum_inst));
307  		if (error != CS_OK) {
308  			return (error);
309  		}
310  	
311  		*context = quorum_inst->context;
312  	
313  		(void)hdb_handle_put (&quorum_handle_t_db, handle);
314  	
315  		return (CS_OK);
316  	}
317  	
318  	cs_error_t quorum_context_set (
319  		quorum_handle_t handle,
320  		const void *context)
321  	{
322  		cs_error_t error;
323  		struct quorum_inst *quorum_inst;
324  	
325  		error = hdb_error_to_cs(hdb_handle_get (&quorum_handle_t_db, handle, (void *)&quorum_inst));
326  		if (error != CS_OK) {
327  			return (error);
328  		}
329  	
330  		quorum_inst->context = context;
331  	
332  		(void)hdb_handle_put (&quorum_handle_t_db, handle);
333  	
334  		return (CS_OK);
335  	}
336  	
337  	
338  	cs_error_t quorum_trackstart (
339  		quorum_handle_t handle,
340  		unsigned int flags )
341  	{
342  		cs_error_t error;
343  		struct quorum_inst *quorum_inst;
344  		struct iovec iov;
345  		struct req_lib_quorum_trackstart req_lib_quorum_trackstart;
346  		struct qb_ipc_response_header res;
347  	
348  		error = hdb_error_to_cs(hdb_handle_get (&quorum_handle_t_db, handle, (void *)&quorum_inst));
349  		if (error != CS_OK) {
350  			return (error);
351  		}
352  	
353  		req_lib_quorum_trackstart.header.size = sizeof (struct req_lib_quorum_trackstart);
354  		req_lib_quorum_trackstart.header.id = MESSAGE_REQ_QUORUM_TRACKSTART;
355  		req_lib_quorum_trackstart.track_flags = flags;
356  	
357  		iov.iov_base = (char *)&req_lib_quorum_trackstart;
358  		iov.iov_len = sizeof (struct req_lib_quorum_trackstart);
359  	
360  	       error = qb_to_cs_error(qb_ipcc_sendv_recv (
361  			quorum_inst->c,
362  	                &iov,
363  	                1,
364  	                &res,
365  	                sizeof (res), CS_IPC_TIMEOUT_MS));
366  	
367  		if (error != CS_OK) {
368  			goto error_exit;
369  		}
370  	
371  		error = res.error;
372  	
373  	error_exit:
374  		(void)hdb_handle_put (&quorum_handle_t_db, handle);
375  	
376  		return (error);
377  	}
378  	
379  	cs_error_t quorum_trackstop (
380  		quorum_handle_t handle)
381  	{
382  		cs_error_t error;
383  		struct quorum_inst *quorum_inst;
384  		struct iovec iov;
385  		struct qb_ipc_request_header req;
386  		struct qb_ipc_response_header res;
387  	
388  		error = hdb_error_to_cs(hdb_handle_get (&quorum_handle_t_db, handle, (void *)&quorum_inst));
389  		if (error != CS_OK) {
390  			return (error);
391  		}
392  	
393  		req.size = sizeof (req);
394  		req.id = MESSAGE_REQ_QUORUM_TRACKSTOP;
395  	
396  		iov.iov_base = (char *)&req;
397  		iov.iov_len = sizeof (req);
398  	
399  	       error = qb_to_cs_error(qb_ipcc_sendv_recv (
400  			quorum_inst->c,
401  	                &iov,
402  	                1,
403  	                &res,
404  	                sizeof (res), CS_IPC_TIMEOUT_MS));
405  	
406  		if (error != CS_OK) {
407  			goto error_exit;
408  		}
409  	
410  		error = res.error;
411  	
412  	error_exit:
413  		(void)hdb_handle_put (&quorum_handle_t_db, handle);
414  	
415  		return (error);
416  	}
417  	
418  	cs_error_t quorum_dispatch (
419  		quorum_handle_t handle,
420  		cs_dispatch_flags_t dispatch_types)
421  	{
422  		int timeout = -1;
423  		cs_error_t error;
424  		int cont = 1; /* always continue do loop except when set to 0 */
425  		struct quorum_inst *quorum_inst;
426  		struct quorum_inst quorum_inst_copy;
427  		struct qb_ipc_response_header *dispatch_data;
428  		char dispatch_buf[IPC_DISPATCH_SIZE];
429  		struct res_lib_quorum_notification *res_lib_quorum_notification;
430  		struct res_lib_quorum_v1_quorum_notification *res_lib_quorum_v1_quorum_notification;
431  		struct res_lib_quorum_v1_nodelist_notification *res_lib_quorum_v1_nodelist_notification;
432  		struct quorum_ring_id ring_id;
433  		mar_uint32_t *joined_list;
434  		mar_uint32_t *left_list;
435  	
(1) Event path: Condition "dispatch_types != CS_DISPATCH_ONE", taking true branch.
(2) Event path: Condition "dispatch_types != CS_DISPATCH_ALL", taking true branch.
(3) Event path: Condition "dispatch_types != CS_DISPATCH_BLOCKING", taking true branch.
(4) Event path: Condition "dispatch_types != CS_DISPATCH_ONE_NONBLOCKING", taking false branch.
436  		if (dispatch_types != CS_DISPATCH_ONE &&
437  			dispatch_types != CS_DISPATCH_ALL &&
438  			dispatch_types != CS_DISPATCH_BLOCKING &&
439  			dispatch_types != CS_DISPATCH_ONE_NONBLOCKING) {
440  	
441  			return (CS_ERR_INVALID_PARAM);
442  		}
443  	
444  		error = hdb_error_to_cs(hdb_handle_get (&quorum_handle_t_db, handle,
445  			(void *)&quorum_inst));
(5) Event path: Condition "error != CS_OK", taking false branch.
446  		if (error != CS_OK) {
447  			return (error);
448  		}
449  	
450  		/*
451  		 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
452  		 * wait indefinately for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
453  		 */
(6) Event path: Condition "dispatch_types == CS_DISPATCH_ALL", taking false branch.
(7) Event path: Condition "dispatch_types == CS_DISPATCH_ONE_NONBLOCKING", taking true branch.
454  		if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
455  			timeout = 0;
456  		}
457  	
458  		dispatch_data = (struct qb_ipc_response_header *)dispatch_buf;
459  		do {
460  			error = qb_to_cs_error (qb_ipcc_event_recv (
461  				quorum_inst->c,
462  				dispatch_buf,
463  				IPC_DISPATCH_SIZE,
464  				timeout));
(8) Event path: Condition "error == CS_ERR_BAD_HANDLE", taking false branch.
465  			if (error == CS_ERR_BAD_HANDLE) {
466  				error = CS_OK;
467  				goto error_put;
468  			}
(9) Event path: Condition "error == CS_ERR_TRY_AGAIN", taking false branch.
469  			if (error == CS_ERR_TRY_AGAIN) {
470  				if (dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
471  					/*
472  					 * Don't mask error
473  					 */
474  					goto error_put;
475  				}
476  				error = CS_OK;
477  				if (dispatch_types == CS_DISPATCH_ALL) {
478  					break; /* exit do while cont is 1 loop */
479  				} else {
480  					continue; /* next poll */
481  				}
482  			}
(10) Event path: Condition "error != CS_OK", taking false branch.
483  			if (error != CS_OK) {
484  				goto error_put;
485  			}
486  	
487  			/*
488  			 * Make copy of callbacks, message data, unlock instance, and call callback
489  			 * A risk of this dispatch method is that the callback routines may
490  			 * operate at the same time that quorum_finalize has been called in another thread.
491  			 */
492  			memcpy (&quorum_inst_copy, quorum_inst, sizeof(quorum_inst_copy));
(11) Event inferred_valid_union_field: The union field "model_data" of "quorum_inst_copy" is assumed to be valid.
(12) Event path: Switch case value "QUORUM_MODEL_V0".
Also see events: [inconsistent_union_field_access]
493  			switch (quorum_inst_copy.model_data.model) {
494  			case QUORUM_MODEL_V0:
495  				/*
496  				 * Dispatch incoming message
497  				 */
(13) Event path: Switch case value "MESSAGE_RES_QUORUM_NOTIFICATION".
498  				switch (dispatch_data->id) {
499  				case MESSAGE_RES_QUORUM_NOTIFICATION:
CID (unavailable; MK=f35f4c285367371b5cdbbc57c42496d9) (#1 of 1): Inconsistent C union access (INCONSISTENT_UNION_ACCESS):
(14) Event inconsistent_union_field_access: In "quorum_inst_copy.model_v0_data", the union field used: "model_v0_data" is inconsistent with the field most recently stored: "model_data".
Also see events: [inferred_valid_union_field]
500  					if (quorum_inst_copy.model_v0_data.quorum_notify_fn == NULL) {
501  						break;
502  					}
503  					res_lib_quorum_notification = (struct res_lib_quorum_notification *)dispatch_data;
504  	
505  					quorum_inst_copy.model_v0_data.quorum_notify_fn ( handle,
506  						res_lib_quorum_notification->quorate,
507  						res_lib_quorum_notification->ring_seq,
508  						res_lib_quorum_notification->view_list_entries,
509  						res_lib_quorum_notification->view_list);
510  					break;
511  				default:
512  					error = CS_ERR_LIBRARY;
513  					goto error_put;
514  					break;
515  				} /* switch (dispatch_data->id) */
516  				break; /* case QUORUM_MODEL_V0 */
517  			case QUORUM_MODEL_V1:
518  				/*
519  				 * Dispatch incoming message
520  				 */
521  				switch (dispatch_data->id) {
522  				case MESSAGE_RES_QUORUM_V1_QUORUM_NOTIFICATION:
523  					if (quorum_inst_copy.model_v1_data.quorum_notify_fn == NULL) {
524  						break;
525  					}
526  					res_lib_quorum_v1_quorum_notification =
527  						(struct res_lib_quorum_v1_quorum_notification *)dispatch_data;
528  	
529  					ring_id.nodeid = res_lib_quorum_v1_quorum_notification->ring_id.nodeid;
530  					ring_id.seq = res_lib_quorum_v1_quorum_notification->ring_id.seq;
531  	
532  					quorum_inst_copy.model_v1_data.quorum_notify_fn ( handle,
533  						res_lib_quorum_v1_quorum_notification->quorate,
534  						ring_id,
535  						res_lib_quorum_v1_quorum_notification->view_list_entries,
536  						res_lib_quorum_v1_quorum_notification->view_list);
537  					break;
538  				case MESSAGE_RES_QUORUM_V1_NODELIST_NOTIFICATION:
539  					if (quorum_inst_copy.model_v1_data.nodelist_notify_fn == NULL) {
540  						break;
541  					}
542  					res_lib_quorum_v1_nodelist_notification =
543  						(struct res_lib_quorum_v1_nodelist_notification *)dispatch_data;
544  	
545  					ring_id.nodeid = res_lib_quorum_v1_nodelist_notification->ring_id.nodeid;
546  					ring_id.seq = res_lib_quorum_v1_nodelist_notification->ring_id.seq;
547  	
548  					joined_list = res_lib_quorum_v1_nodelist_notification->member_list +
549  					    res_lib_quorum_v1_nodelist_notification->member_list_entries;
550  					left_list = joined_list +
551  					    res_lib_quorum_v1_nodelist_notification->joined_list_entries;
552  	
553  					quorum_inst_copy.model_v1_data.nodelist_notify_fn ( handle,
554  						ring_id,
555  						res_lib_quorum_v1_nodelist_notification->member_list_entries,
556  						res_lib_quorum_v1_nodelist_notification->member_list,
557  						res_lib_quorum_v1_nodelist_notification->joined_list_entries,
558  						joined_list,
559  						res_lib_quorum_v1_nodelist_notification->left_list_entries,
560  						left_list);
561  					break;
562  				default:
563  					error = CS_ERR_LIBRARY;
564  					goto error_put;
565  					break;
566  				} /* switch (dispatch_data->id) */
567  				break; /* case QUORUM_MODEL_V1 */
568  			}
569  			if (quorum_inst->finalize) {
570  				/*
571  				 * If the finalize has been called then get out of the dispatch.
572  				 */
573  				error = CS_ERR_BAD_HANDLE;
574  				goto error_put;
575  			}
576  	
577  			/*
578  			 * Determine if more messages should be processed
579  			 */
580  			if (dispatch_types == CS_DISPATCH_ONE || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
581  				cont = 0;
582  			}
583  		} while (cont);
584  	
585  	error_put:
586  		(void)hdb_handle_put (&quorum_handle_t_db, handle);
587  		return (error);
588  	}
589