1    	/*
2    	 * Copyright (c) 2010 Red Hat, Inc.
3    	 *
4    	 * All rights reserved.
5    	 *
6    	 * Author: Angus Salkeld <asalkeld@redhat.com>
7    	 *
8    	 * This file is part of libqb.
9    	 *
10   	 * libqb is free software: you can redistribute it and/or modify
11   	 * it under the terms of the GNU Lesser General Public License as published by
12   	 * the Free Software Foundation, either version 2.1 of the License, or
13   	 * (at your option) any later version.
14   	 *
15   	 * libqb is distributed in the hope that it will be useful,
16   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18   	 * GNU Lesser General Public License for more details.
19   	 *
20   	 * You should have received a copy of the GNU Lesser General Public License
21   	 * along with libqb.  If not, see <http://www.gnu.org/licenses/>.
22   	 */
23   	
24   	#include "os_base.h"
25   	#include <sys/wait.h>
26   	#include <sys/un.h>
27   	#include <signal.h>
28   	#include <stdbool.h>
29   	#include <fcntl.h>
30   	
31   	#ifdef HAVE_GLIB
32   	#include <glib.h>
33   	#endif
34   	
35   	#include "check_common.h"
36   	
37   	#include <qb/qbdefs.h>
38   	#include <qb/qblog.h>
39   	#include <qb/qbipcc.h>
40   	#include <qb/qbipcs.h>
41   	#include <qb/qbloop.h>
42   	
43   	#ifdef HAVE_FAILURE_INJECTION
44   	#include "_failure_injection.h"
45   	#endif
46   	
47   	#define NUM_STRESS_CONNECTIONS 5000
48   	
49   	static char ipc_name[256];
50   	
51   	#define DEFAULT_MAX_MSG_SIZE (8192*16)
52   	#ifndef __clang__
53   	static int CALCULATED_DGRAM_MAX_MSG_SIZE = 0;
54   	
55   	#define DGRAM_MAX_MSG_SIZE \
56   		(CALCULATED_DGRAM_MAX_MSG_SIZE == 0 ? \
57   		CALCULATED_DGRAM_MAX_MSG_SIZE = qb_ipcc_verify_dgram_max_msg_size(DEFAULT_MAX_MSG_SIZE) : \
58   		CALCULATED_DGRAM_MAX_MSG_SIZE)
59   	
60   	#define MAX_MSG_SIZE (ipc_type == QB_IPC_SOCKET ? DGRAM_MAX_MSG_SIZE : DEFAULT_MAX_MSG_SIZE)
61   	
62   	#else
/* clang will never support the 'variable length array in structure'
   extension, so always use the default (SHM) size here; the test that
   would need the run-time established value (obtained via
   qb_ipcc_verify_dgram_max_msg_size) is skipped under clang anyway */
67   	#define MAX_MSG_SIZE  DEFAULT_MAX_MSG_SIZE
68   	#endif
69   	
/* The size the giant msg's data field needs to be to make
 * this the largest msg we can successfully send. */
72   	#define GIANT_MSG_DATA_SIZE (MAX_MSG_SIZE - sizeof(struct qb_ipc_response_header) - 8)
73   	
74   	static int enforce_server_buffer;
75   	static qb_ipcc_connection_t *conn;
76   	static enum qb_ipc_type ipc_type;
77   	static enum qb_loop_priority global_loop_prio = QB_LOOP_MED;
78   	static bool global_use_glib;
79   	static int global_pipefd[2];
80   	
/*
 * Message IDs understood by the test server (see s1_msg_process_fn()).
 * Each request ID is immediately followed by its response ID, which is
 * what send_and_check() relies on when it expects "req_id + 1" back.
 */
enum my_msg_ids {
	/* plain request answered with a plain response */
	IPC_MSG_REQ_TX_RX,
	IPC_MSG_RES_TX_RX,
	/* request answered with an event instead of a response */
	IPC_MSG_REQ_DISPATCH,
	IPC_MSG_RES_DISPATCH,
	/* request answered with a response plus a batch of events */
	IPC_MSG_REQ_BULK_EVENTS,
	IPC_MSG_RES_BULK_EVENTS,
	/* request answered with a response followed by a flood of events */
	IPC_MSG_REQ_STRESS_EVENT,
	IPC_MSG_RES_STRESS_EVENT,
	/* makes the server set up a pipe it writes to and reads from itself */
	IPC_MSG_REQ_SELF_FEED,
	IPC_MSG_RES_SELF_FEED,
	/* makes the server process exit without answering */
	IPC_MSG_REQ_SERVER_FAIL,
	IPC_MSG_RES_SERVER_FAIL,
	/* makes the server disconnect the requesting connection */
	IPC_MSG_REQ_SERVER_DISCONNECT,
	IPC_MSG_RES_SERVER_DISCONNECT,
};
97   	
98   	
99   	/* these 2 functions from pacemaker code */
100  	static enum qb_ipcs_rate_limit
101  	conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
102  	{
103  		/* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */
104  		enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL;
105  		switch (prio) {
106  		case QB_LOOP_LOW:
107  			ret = QB_IPCS_RATE_SLOW;
108  			break;
109  		case QB_LOOP_HIGH:
110  			ret = QB_IPCS_RATE_FAST;
111  			break;
112  		default:
113  			qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d,"
114  			       " assuming QB_LOOP_MED", prio);
115  			/* fall-through */
116  		case QB_LOOP_MED:
117  			break;
118  		}
119  		return ret;
120  	}
121  	#ifdef HAVE_GLIB
122  	static gint
123  	conv_prio_libqb2glib(enum qb_loop_priority prio)
124  	{
125  		gint ret = G_PRIORITY_DEFAULT;
126  		switch (prio) {
127  		case QB_LOOP_LOW:
128  			ret = G_PRIORITY_LOW;
129  			break;
130  		case QB_LOOP_HIGH:
131  			ret = G_PRIORITY_HIGH;
132  			break;
133  		default:
134  			qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d,"
135  			       " assuming QB_LOOP_MED", prio);
136  			/* fall-through */
137  		case QB_LOOP_MED:
138  			break;
139  		}
140  		return ret;
141  	}
142  	
143  	/* these 3 glue functions inspired from pacemaker, too */
144  	static gboolean
145  	gio_source_prepare(GSource *source, gint *timeout)
146  	{
147  		qb_enter();
148  		*timeout = 500;
149  		return FALSE;
150  	}
151  	static gboolean
152  	gio_source_check(GSource *source)
153  	{
154  		qb_enter();
155  		return TRUE;
156  	}
157  	static gboolean
158  	gio_source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
159  	{
160  		gboolean ret = G_SOURCE_CONTINUE;
161  		qb_enter();
162  		if (callback) {
163  			ret = callback(user_data);
164  		}
165  		return ret;
166  	}
/* GSource vtable wiring up the three glue hooks defined above; it is
 * handed to g_source_new() for the self-feed pipe sources. */
static GSourceFuncs gio_source_funcs = {
    .prepare = gio_source_prepare,
    .check = gio_source_check,
    .dispatch = gio_source_dispatch,
};
172  	
173  	#endif
174  	
175  	
176  	/* Test Cases
177  	 *
178  	 * 1) basic send & recv different message sizes
179  	 *
180  	 * 2) send message to start dispatch (confirm receipt)
181  	 *
182  	 * 3) flow control
183  	 *
184  	 * 4) authentication
185  	 *
186  	 * 5) thread safety
187  	 *
188  	 * 6) cleanup
189  	 *
190  	 * 7) service availability
191  	 *
192  	 * 8) multiple services
193  	 *
194  	 * 9) setting perms on the sockets
195  	 */
/* qb loop of the server process (run via qb_loop_run() unless glib is used) */
static qb_loop_t *my_loop;
/* the IPC service under test, created in run_ipc_server() */
static qb_ipcs_service_t* s1;
/* when set, the TX_RX handler switches the service to QB_IPCS_RATE_OFF */
static int32_t turn_on_fc = QB_FALSE;
/* set to QB_TRUE by send_and_check() when sending keeps returning -EAGAIN
 * or receiving returns -EAGAIN/-ETIMEDOUT.
 * NOTE(review): the initial 89 looks like a "not determined yet" marker
 * that is neither QB_TRUE nor QB_FALSE -- confirm */
static int32_t fc_enabled = 89;
/* when set, s1_connection_created() sends an event to the new client */
static int32_t send_event_on_created = QB_FALSE;
/* when set, s1_connection_closed() returns 1 once and clears the flag */
static int32_t disconnect_after_created = QB_FALSE;
/* number of events sent for IPC_MSG_REQ_BULK_EVENTS */
static int32_t num_bulk_events = 10;
/* number of events sent for IPC_MSG_REQ_STRESS_EVENT */
static int32_t num_stress_events = 30000;
/* when set, connection callbacks exercise the ref/unref path (outq_flush()) */
static int32_t reference_count_test = QB_FALSE;
/* when set, the connection created/closed/destroyed callbacks do nothing */
static int32_t multiple_connections = QB_FALSE;
/* when set, s1_connection_accept() overrides the socket owner and mode */
static int32_t set_perms_on_socket = QB_FALSE;
207  	
208  	
209  	static int32_t
210  	exit_handler(int32_t rsignal, void *data)
211  	{
212  		qb_log(LOG_DEBUG, "caught signal %d", rsignal);
213  		qb_ipcs_destroy(s1);
214  		exit(0);
215  	}
216  	
217  	static void
218  	set_ipc_name(const char *prefix)
219  	{
220  		FILE *f;
221  		char process_name[256];
222  	
223  		/* The process-unique part of the IPC name has already been decided
224  		 * and stored in the file IPC_TEST_NAME_FILE
225  		 */
226  		f = fopen(IPC_TEST_NAME_FILE, "r");
227  		if (f) {
228  			fgets(process_name, sizeof(process_name), f);
229  			/* Remove any trailing LF that might be lurking */
230  			if (process_name[strlen(process_name)-1] == '\n') {
231  			        process_name[strlen(process_name)-1] = '\0';
232  			}
233  			fclose(f);
234  			snprintf(ipc_name, sizeof(ipc_name), "%.44s%s", prefix, process_name);
235  		} else {
236  			/* This is the old code, use only as a fallback */
237  			static char t_sec[3] = "";
238  			if (t_sec[0] == '\0') {
239  				const char *const found = strrchr(__TIME__, ':');
240  				strncpy(t_sec, found ? found + 1 : "-", sizeof(t_sec) - 1);
241  				t_sec[sizeof(t_sec) - 1] = '\0';
242  			}
243  	
244  			snprintf(ipc_name, sizeof(ipc_name), "%.44s%s%lX%.4x", prefix, t_sec,
245  				 (unsigned long)getpid(), (unsigned) ((long) time(NULL) % (0x10000)));
246  		}
247  	}
248  	
249  	static int
250  	pipe_writer(int fd, int revents, void *data) {
251  		qb_enter();
252  		static const char buf[8] = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h' };
253  	
254  		ssize_t wbytes = 0, wbytes_sum = 0;
255  	
256  		//for (size_t i = 0; i < SIZE_MAX; i++) {
257  		for (size_t i = 0; i < 4096; i++) {
258  			wbytes_sum += wbytes;
259  			if ((wbytes = write(fd, buf, sizeof(buf))) == -1) {
260  				if (errno != EAGAIN) {
261  					perror("write");
262  					exit(-1);
263  				}
264  				break;
265  			}
266  		}
267  		if (wbytes_sum > 0) {
268  			qb_log(LOG_DEBUG, "written %zd bytes", wbytes_sum);
269  		}
270  		qb_leave();
271  		return 1;
272  	}
273  	
274  	static int
275  	pipe_reader(int fd, int revents, void *data) {
276  		qb_enter();
277  		ssize_t rbytes, rbytes_sum = 0;
278  		size_t cnt = SIZE_MAX;
279  		char buf[4096] = { '\0' };
280  		while ((rbytes = read(fd, buf, sizeof(buf))) > 0 && rbytes < cnt) {
281  			cnt -= rbytes;
282  			rbytes_sum += rbytes;
283  		}
284  		if (rbytes_sum > 0) {
285  			ck_assert(buf[0] != '\0');  /* avoid dead store elimination */
286  			qb_log(LOG_DEBUG, "read %zd bytes", rbytes_sum);
287  			sleep(1);
288  		}
289  		qb_leave();
290  		return 1;
291  	}
292  	
/* Guarded with #ifdef like every other glib section of this file, so these
 * helpers exist exactly when their users are compiled in. */
#ifdef HAVE_GLIB
/* GSourceFunc adaptor: @data points at the fd to hand to pipe_reader() */
static gboolean
gio_pipe_reader(void *data) {
	return (pipe_reader(*((int *) data), 0, NULL) > 0);
}
/* GSourceFunc adaptor: @data points at the fd to hand to pipe_writer() */
static gboolean
gio_pipe_writer(void *data) {
	return (pipe_writer(*((int *) data), 0, NULL) > 0);
}
#endif
303  	
304  	static int32_t
305  	s1_msg_process_fn(qb_ipcs_connection_t *c,
306  			void *data, size_t size)
307  	{
308  		struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data;
309  		struct qb_ipc_response_header response = { 0, };
310  		ssize_t res;
311  	
(1) Event cond_false: Condition "req_pt->id == IPC_MSG_REQ_TX_RX", taking false branch.
312  		if (req_pt->id == IPC_MSG_REQ_TX_RX) {
313  			response.size = sizeof(struct qb_ipc_response_header);
314  			response.id = IPC_MSG_RES_TX_RX;
315  			response.error = 0;
316  			res = qb_ipcs_response_send(c, &response, response.size);
317  			if (res < 0) {
318  				qb_perror(LOG_INFO, "qb_ipcs_response_send");
319  			} else if (res != response.size) {
320  				qb_log(LOG_DEBUG, "qb_ipcs_response_send %zd != %d",
321  				       res, response.size);
322  			}
323  			if (turn_on_fc) {
324  				qb_ipcs_request_rate_limit(s1, QB_IPCS_RATE_OFF);
325  			}
(2) Event else_branch: Reached else branch.
(3) Event cond_false: Condition "req_pt->id == IPC_MSG_REQ_DISPATCH", taking false branch.
326  		} else if (req_pt->id == IPC_MSG_REQ_DISPATCH) {
327  			response.size = sizeof(struct qb_ipc_response_header);
328  			response.id = IPC_MSG_RES_DISPATCH;
329  			response.error = 0;
330  			res = qb_ipcs_event_send(c, &response,
331  						 sizeof(response));
332  			if (res < 0) {
333  				qb_perror(LOG_INFO, "qb_ipcs_event_send");
334  			}
(4) Event else_branch: Reached else branch.
(5) Event cond_true: Condition "req_pt->id == IPC_MSG_REQ_BULK_EVENTS", taking true branch.
335  		} else if (req_pt->id == IPC_MSG_REQ_BULK_EVENTS) {
336  			int32_t m;
337  			int32_t num;
338  			struct qb_ipcs_connection_stats_2 *stats;
339  			uint32_t max_size = MAX_MSG_SIZE;
340  	
341  			response.size = sizeof(struct qb_ipc_response_header);
342  			response.error = 0;
343  	
344  			stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE);
345  			num = stats->event_q_length;
346  			free(stats);
347  	
348  			/* crazy large message */
349  			res = qb_ipcs_event_send(c, &response, max_size*10);
(6) Event cond_true: Condition "_ck_x == _ck_y", taking true branch.
350  			ck_assert_int_eq(res, -EMSGSIZE);
351  	
352  			/* send one event before responding */
353  			res = qb_ipcs_event_send(c, &response, sizeof(response));
(7) Event cond_true: Condition "_ck_x == _ck_y", taking true branch.
354  			ck_assert_int_eq(res, sizeof(response));
355  			response.id++;
356  	
357  			/* There should be one more item in the event queue now. */
358  			stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE);
(8) Event cond_true: Condition "_ck_x == _ck_y", taking true branch.
359  			ck_assert_int_eq(stats->event_q_length - num, 1);
360  			free(stats);
361  	
362  			/* send response */
363  			response.id = IPC_MSG_RES_BULK_EVENTS;
364  			res = qb_ipcs_response_send(c, &response, response.size);
(9) Event cond_true: Condition "_ck_x == _ck_y", taking true branch.
365  			ck_assert_int_eq(res, sizeof(response));
366  	
367  			/* send the rest of the events after the response */
(10) Event cond_true: Condition "m < num_bulk_events", taking true branch.
(16) Event loop_begin: Jumped back to beginning of loop.
(17) Event cond_true: Condition "m < num_bulk_events", taking true branch.
(24) Event loop_begin: Jumped back to beginning of loop.
(25) Event cond_true: Condition "m < num_bulk_events", taking true branch.
368  			for (m = 1; m < num_bulk_events; m++) {
(18) Event freed_arg: "qb_ipcs_event_send" frees "c". [details]
(26) Event double_free: Calling "qb_ipcs_event_send" frees pointer "c" which has already been freed. [details]
369  				res = qb_ipcs_event_send(c, &response, sizeof(response));
370  	
(11) Event cond_false: Condition "res == -11", taking false branch.
(12) Event cond_false: Condition "res == -105", taking false branch.
(19) Event cond_false: Condition "res == -11", taking false branch.
(20) Event cond_false: Condition "res == -105", taking false branch.
371  				if (res == -EAGAIN || res == -ENOBUFS) {
372  					/* retry */
373  					usleep(1000);
374  					m--;
375  					continue;
(13) Event if_end: End of if statement.
(21) Event if_end: End of if statement.
376  				}
(14) Event cond_true: Condition "_ck_x == _ck_y", taking true branch.
(22) Event cond_true: Condition "_ck_x == _ck_y", taking true branch.
377  				ck_assert_int_eq(res, sizeof(response));
378  				response.id++;
(15) Event loop: Jumping back to the beginning of the loop.
(23) Event loop: Jumping back to the beginning of the loop.
379  			}
380  	
381  		} else if (req_pt->id == IPC_MSG_REQ_STRESS_EVENT) {
382  			struct {
383  				struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
384  				char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
385  				uint32_t sent_msgs __attribute__ ((aligned(8)));
386  			} __attribute__ ((aligned(8))) giant_event_send;
387  			int32_t m;
388  	
389  			response.size = sizeof(struct qb_ipc_response_header);
390  			response.error = 0;
391  	
392  			response.id = IPC_MSG_RES_STRESS_EVENT;
393  			res = qb_ipcs_response_send(c, &response, response.size);
394  			ck_assert_int_eq(res, sizeof(response));
395  	
396  			giant_event_send.hdr.error = 0;
397  			giant_event_send.hdr.id = IPC_MSG_RES_STRESS_EVENT;
398  			for (m = 0; m < num_stress_events; m++) {
399  				size_t sent_len = sizeof(struct qb_ipc_response_header);
400  	
401  				if (((m+1) % 1000) == 0) {
402  					sent_len = sizeof(giant_event_send);
403  					giant_event_send.sent_msgs = m + 1;
404  				}
405  				giant_event_send.hdr.size = sent_len;
406  	
407  				res = qb_ipcs_event_send(c, &giant_event_send, sent_len);
408  				if (res < 0) {
409  					if (res == -EAGAIN || res == -ENOBUFS) {
410  						/* yield to the receive process */
411  						usleep(1000);
412  						m--;
413  						continue;
414  					} else {
415  						qb_perror(LOG_DEBUG, "sending stress events");
416  						ck_assert_int_eq(res, sent_len);
417  					}
418  				} else if (((m+1) % 1000) == 0) {
419  					qb_log(LOG_DEBUG, "SENT: %d stress events sent", m+1);
420  				}
421  				giant_event_send.hdr.id++;
422  			}
423  	
424  		} else if (req_pt->id == IPC_MSG_REQ_SELF_FEED) {
425  			if (pipe(global_pipefd) != 0) {
426  				perror("pipefd");
427  				ck_assert(0);
428  			}
429  			fcntl(global_pipefd[0], F_SETFL, O_NONBLOCK);
430  			fcntl(global_pipefd[1], F_SETFL, O_NONBLOCK);
431  			if (global_use_glib) {
432  	#ifdef HAVE_GLIB
433  				GSource *source_r, *source_w;
434  				source_r = g_source_new(&gio_source_funcs, sizeof(GSource));
435  				source_w = g_source_new(&gio_source_funcs, sizeof(GSource));
436  				ck_assert(source_r != NULL && source_w != NULL);
437  				g_source_set_priority(source_r, conv_prio_libqb2glib(QB_LOOP_HIGH));
438  				g_source_set_priority(source_w, conv_prio_libqb2glib(QB_LOOP_HIGH));
439  				g_source_set_can_recurse(source_r, FALSE);
440  				g_source_set_can_recurse(source_w, FALSE);
441  				g_source_set_callback(source_r, gio_pipe_reader, &global_pipefd[0], NULL);
442  				g_source_set_callback(source_w, gio_pipe_writer, &global_pipefd[1], NULL);
443  				g_source_add_unix_fd(source_r, global_pipefd[0], G_IO_IN);
444  				g_source_add_unix_fd(source_w, global_pipefd[1], G_IO_OUT);
445  				g_source_attach(source_r, NULL);
446  				g_source_attach(source_w, NULL);
447  	#else
448  				ck_assert(0);
449  	#endif
450  			} else {
451  				qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[1],
452  				                 POLLOUT|POLLERR, NULL, pipe_writer);
453  				qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[0],
454  				                 POLLIN|POLLERR, NULL, pipe_reader);
455  			}
456  	
457  		} else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) {
458  			exit(0);
459  		} else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) {
460  			multiple_connections = QB_FALSE;
461  			qb_ipcs_disconnect(c);
462  		}
463  		return 0;
464  	}
465  	
466  	static int32_t
467  	my_job_add(enum qb_loop_priority p,
468  				  void *data,
469  				  qb_loop_job_dispatch_fn fn)
470  	{
471  		return qb_loop_job_add(my_loop, p, data, fn);
472  	}
473  	
474  	static int32_t
475  	my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events,
476  		void *data, qb_ipcs_dispatch_fn_t fn)
477  	{
478  		return qb_loop_poll_add(my_loop, p, fd, events, data, fn);
479  	}
480  	
481  	static int32_t
482  	my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events,
483  		void *data, qb_ipcs_dispatch_fn_t fn)
484  	{
485  		return qb_loop_poll_mod(my_loop, p, fd, events, data, fn);
486  	}
487  	
488  	static int32_t
489  	my_dispatch_del(int32_t fd)
490  	{
491  		return qb_loop_poll_del(my_loop, fd);
492  	}
493  	
494  	
495  	/* taken from examples/ipcserver.c, with s/my_g/gio/ */
496  	#ifdef HAVE_GLIB
497  	
498  	#include <qb/qbarray.h>
499  	
/* adaptor slots, looked up by file descriptor (see gio_dispatch_update()) */
static qb_array_t *gio_map;
/* glib main loop run by the server when global_use_glib is set */
static GMainLoop *glib_loop;

/* Book-keeping that lets a glib IO watch call a qb_ipcs dispatch function */
struct gio_to_qb_poll {
	/* incremented per installed watch, decremented by gio_poll_destroy() */
	int32_t is_used;
	/* event mask the watch was registered with */
	int32_t events;
	/* id returned by g_io_add_watch_full(), 0 when no watch is installed */
	int32_t source;
	int32_t fd;
	/* opaque argument and callback handed in by libqb */
	void *data;
	qb_ipcs_dispatch_fn_t fn;
	enum qb_loop_priority p;
};
512  	
513  	static gboolean
514  	gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
515  	{
516  		struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
517  		gint fd = g_io_channel_unix_get_fd(gio);
518  	
519  		qb_enter();
520  	
521  		return (adaptor->fn(fd, condition, adaptor->data) == 0);
522  	}
523  	
524  	static void
525  	gio_poll_destroy(gpointer data)
526  	{
527  		struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
528  	
529  		adaptor->is_used--;
530  		if (adaptor->is_used == 0) {
531  			qb_log(LOG_DEBUG, "fd %d adaptor destroyed\n", adaptor->fd);
532  			adaptor->fd = 0;
533  			adaptor->source = 0;
534  		}
535  	}
536  	
537  	static int32_t
538  	gio_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
539  	                    void *data, qb_ipcs_dispatch_fn_t fn, gboolean is_new)
540  	{
541  		struct gio_to_qb_poll *adaptor;
542  		GIOChannel *channel;
543  		int32_t res = 0;
544  	
545  		qb_enter();
546  	
547  		res = qb_array_index(gio_map, fd, (void **)&adaptor);
548  		if (res < 0) {
549  			return res;
550  		}
551  		if (adaptor->is_used && adaptor->source) {
552  			if (is_new) {
553  				return -EEXIST;
554  			}
555  			g_source_remove(adaptor->source);
556  			adaptor->source = 0;
557  		}
558  	
559  		channel = g_io_channel_unix_new(fd);
560  		if (!channel) {
561  			return -ENOMEM;
562  		}
563  	
564  		adaptor->fn = fn;
565  		adaptor->events = evts;
566  		adaptor->data = data;
567  		adaptor->p = p;
568  		adaptor->is_used++;
569  		adaptor->fd = fd;
570  	
571  		adaptor->source = g_io_add_watch_full(channel, conv_prio_libqb2glib(p),
572  		                                      evts, gio_read_socket, adaptor,
573  		                                      gio_poll_destroy);
574  	
575  		/* we are handing the channel off to be managed by mainloop now.
576  		 * remove our reference. */
577  		g_io_channel_unref(channel);
578  	
579  		return 0;
580  	}
581  	
582  	static int32_t
583  	gio_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
584  	                 void *data, qb_ipcs_dispatch_fn_t fn)
585  	{
586  		return gio_dispatch_update(p, fd, evts, data, fn, TRUE);
587  	}
588  	
589  	static int32_t
590  	gio_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
591  	                 void *data, qb_ipcs_dispatch_fn_t fn)
592  	{
593  		return gio_dispatch_update(p, fd, evts, data, fn, FALSE);
594  	}
595  	
596  	static int32_t
597  	gio_dispatch_del(int32_t fd)
598  	{
599  		struct gio_to_qb_poll *adaptor;
600  		if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
601  			g_source_remove(adaptor->source);
602  			adaptor->source = 0;
603  		}
604  		return 0;
605  	}
606  	
607  	#endif /* HAVE_GLIB */
608  	
609  	
610  	static int32_t
611  	s1_connection_closed(qb_ipcs_connection_t *c)
612  	{
613  		if (multiple_connections) {
614  			return 0;
615  		}
616  		/* Stop the connection being freed when we call qb_ipcs_disconnect()
617  		   in the callback */
618  		if (disconnect_after_created == QB_TRUE) {
619  			disconnect_after_created = QB_FALSE;
620  			return 1;
621  		}
622  	
623  		qb_enter();
624  		qb_leave();
625  		return 0;
626  	}
627  	
628  	static void
629  	outq_flush (void *data)
630  	{
631  		static int i = 0;
632  		struct cs_ipcs_conn_context *cnx;
633  		cnx = qb_ipcs_context_get(data);
634  	
635  		qb_log(LOG_DEBUG,"iter %u\n", i);
636  		i++;
637  		if (i == 2) {
638  			qb_ipcs_destroy(s1);
639  			s1 = NULL;
640  		}
641  		/* if the reference counting is not working, this should fail
642  		 * for i > 1.
643  		 */
644  		qb_ipcs_event_send(data, "test", 4);
645  		assert(memcmp(cnx, "test", 4) == 0);
646  		if (i < 5) {
647  			qb_loop_job_add(my_loop, QB_LOOP_HIGH, data, outq_flush);
648  		} else {
649  			/* this single unref should clean everything up.
650  			 */
651  			qb_ipcs_connection_unref(data);
652  			qb_log(LOG_INFO, "end of test, stopping loop");
653  			qb_loop_stop(my_loop);
654  		}
655  	}
656  	
657  	
658  	static void
659  	s1_connection_destroyed(qb_ipcs_connection_t *c)
660  	{
661  		if (multiple_connections) {
662  			return;
663  		}
664  	
665  		qb_enter();
666  		if (reference_count_test) {
667  			struct cs_ipcs_conn_context *cnx;
668  			cnx = qb_ipcs_context_get(c);
669  			free(cnx);
670  		} else {
671  			qb_loop_stop(my_loop);
672  		}
673  		qb_leave();
674  	}
675  	
676  	static int32_t
677  	s1_connection_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
678  	{
679  		if (set_perms_on_socket) {
680  			qb_ipcs_connection_auth_set(c, 555, 741, S_IRWXU|S_IRWXG|S_IROTH|S_IWOTH);
681  		}
682  		return 0;
683  	}
684  	
685  	
686  	static void
687  	s1_connection_created(qb_ipcs_connection_t *c)
688  	{
689  		uint32_t max = MAX_MSG_SIZE;
690  		if (multiple_connections) {
691  			return;
692  		}
693  	
694  		if (send_event_on_created) {
695  			struct qb_ipc_response_header response;
696  			int32_t res;
697  	
698  			response.size = sizeof(struct qb_ipc_response_header);
699  			response.id = IPC_MSG_RES_DISPATCH;
700  			response.error = 0;
701  			res = qb_ipcs_event_send(c, &response,
702  						 sizeof(response));
703  			ck_assert_int_eq(res, response.size);
704  		}
705  		if (reference_count_test) {
706  			struct cs_ipcs_conn_context *context;
707  	
708  			qb_ipcs_connection_ref(c);
709  			qb_loop_job_add(my_loop, QB_LOOP_HIGH, c, outq_flush);
710  	
711  			context = calloc(1, 20);
712  			memcpy(context, "test", 4);
713  			qb_ipcs_context_set(c, context);
714  		}
715  	
716  	
717  		ck_assert_int_le(max, qb_ipcs_connection_get_buffer_size(c));
718  	
719  	}
720  	
/* raised by usr1_bit_setter() once SIGUSR1 has been delivered */
static volatile sig_atomic_t usr1_bit;

/* Signal handler: records the delivery of SIGUSR1, ignores anything else. */
static void
usr1_bit_setter(int signal)
{
	if (signal != SIGUSR1) {
		return;
	}
	usr1_bit = 1;
}
728  	
/*
 * Declares or defines a "ready" notifier taking one opaque argument; with
 * an empty parameter name it yields the plain function type used for the
 * typedef below.
 */
#define READY_SIGNALLER(name, data_arg)  void (name)(void *data_arg)
typedef READY_SIGNALLER(ready_signaller_fn, );

/* Ready notifier that sends SIGUSR1 to the process whose pid_t
 * parent_target points at. */
static
READY_SIGNALLER(usr1_signaller, parent_target)
{
	const pid_t *target = parent_target;

	kill(*target, SIGUSR1);
}
737  	
/*
 * Declares or defines a function meant to be run in a freshly forked
 * process (see run_function_in_new_process()): it receives a callback
 * plus that callback's argument for reporting readiness, and an opaque
 * data pointer.  With empty parameter names it yields the plain function
 * type used for the typedef below.
 */
#define NEW_PROCESS_RUNNER(name, ready_signaller_arg, signaller_data_arg, data_arg) \
	void (name)(ready_signaller_fn ready_signaller_arg, \
	      void *signaller_data_arg, void *data_arg)
typedef NEW_PROCESS_RUNNER(new_process_runner_fn, , , );
742  	
743  	static
744  	NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data, data)
745  	{
746  		int32_t res;
747  		qb_loop_signal_handle handle;
748  	
749  		struct qb_ipcs_service_handlers sh = {
750  			.connection_accept = s1_connection_accept,
751  			.connection_created = s1_connection_created,
752  			.msg_process = s1_msg_process_fn,
753  			.connection_destroyed = s1_connection_destroyed,
754  			.connection_closed = s1_connection_closed,
755  		};
756  	
757  		struct qb_ipcs_poll_handlers ph;
758  		uint32_t max_size = MAX_MSG_SIZE;
759  	
760  		my_loop = qb_loop_create();
761  		qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM,
762  		                   NULL, exit_handler, &handle);
763  	
764  	
765  		s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh);
766  		ck_assert(s1 != 0);
767  	
768  		if (global_loop_prio != QB_LOOP_MED) {
769  			qb_ipcs_request_rate_limit(s1,
770  			                           conv_libqb_prio2ratelimit(global_loop_prio));
771  		}
772  		if (global_use_glib) {
773  	#ifdef HAVE_GLIB
774  			ph = (struct qb_ipcs_poll_handlers) {
775  				.job_add = NULL,
776  				.dispatch_add = gio_dispatch_add,
777  				.dispatch_mod = gio_dispatch_mod,
778  				.dispatch_del = gio_dispatch_del,
779  			};
780  			glib_loop = g_main_loop_new(NULL, FALSE);
781  			gio_map = qb_array_create_2(16, sizeof(struct gio_to_qb_poll), 1);
782  			ck_assert(gio_map != NULL);
783  	#else
784  			ck_assert(0);
785  	#endif
786  		} else {
787  			ph = (struct qb_ipcs_poll_handlers) {
788  				.job_add = my_job_add,
789  				.dispatch_add = my_dispatch_add,
790  				.dispatch_mod = my_dispatch_mod,
791  				.dispatch_del = my_dispatch_del,
792  			};
793  		}
794  	
795  		if (enforce_server_buffer) {
796  			qb_ipcs_enforce_buffer_size(s1, max_size);
797  		}
798  		qb_ipcs_poll_handlers_set(s1, &ph);
799  	
800  		res = qb_ipcs_run(s1);
801  		ck_assert_int_eq(res, 0);
802  	
803  		if (ready_signaller != NULL) {
804  			ready_signaller(signaller_data);
805  		}
806  	
807  		if (global_use_glib) {
808  	#ifdef HAVE_GLIB
809  			g_main_loop_run(glib_loop);
810  	#endif
811  		} else {
812  			qb_loop_run(my_loop);
813  		}
814  		qb_log(LOG_DEBUG, "loop finished - done ...");
815  	}
816  	
817  	static pid_t
818  	run_function_in_new_process(const char *role,
819  	                            new_process_runner_fn new_process_runner,
820  	                            void *data)
821  	{
822  		char formatbuf[1024];
823  		pid_t parent_target, pid1, pid2;
824  		struct sigaction orig_sa, purpose_sa;
825  		sigset_t orig_mask, purpose_mask, purpose_clear_mask;
826  	
827  		sigemptyset(&purpose_mask);
828  		sigaddset(&purpose_mask, SIGUSR1);
829  	
830  		sigprocmask(SIG_BLOCK, &purpose_mask, &orig_mask);
831  		purpose_clear_mask = orig_mask;
832  		sigdelset(&purpose_clear_mask, SIGUSR1);
833  	
834  		purpose_sa.sa_handler = usr1_bit_setter;
835  		purpose_sa.sa_mask = purpose_mask;
836  		purpose_sa.sa_flags = SA_RESTART;
837  	
838  		/* Double-fork so the servers can be reaped in a timely manner */
839  		parent_target = getpid();
840  		pid1 = fork();
841  		if (pid1 == 0) {
842  			pid2 = fork();
843  			if (pid2 == -1) {
844  				fprintf (stderr, "Can't fork twice\n");
845  				exit(0);
846  			}
847  			if (pid2 == 0) {
848  				sigprocmask(SIG_SETMASK, &orig_mask, NULL);
849  	
850  				if (role == NULL) {
851  					qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l[%P] %b");
852  				} else {
853  					snprintf(formatbuf, sizeof(formatbuf),
854  					         "lib/%%f|%%l|%s[%%P] %%b", role);
855  					qb_log_format_set(QB_LOG_STDERR, formatbuf);
856  				}
857  	
858  				new_process_runner(usr1_signaller, &parent_target, data);
859  				exit(0);
860  			} else {
861  				waitpid(pid2, NULL, 0);
862  				exit(0);
863  			}
864  		}
865  	
866  		usr1_bit = 0;
867  		/* XXX assume never fails */
868  		sigaction(SIGUSR1, &purpose_sa, &orig_sa);
869  	
870  		do {
871  			/* XXX assume never fails with EFAULT */
872  			sigsuspend(&purpose_clear_mask);
873  		} while (usr1_bit != 1);
874  		usr1_bit = 0;
875  		sigprocmask(SIG_SETMASK, &orig_mask, NULL);
876  		/* give children a slight/non-strict scheduling advantage */
877  		sched_yield();
878  	
879  		return pid1;
880  	}
881  	
882  	static void
883  	request_server_exit(void)
884  	{
885  		struct qb_ipc_request_header req_header;
886  		struct qb_ipc_response_header res_header;
887  		struct iovec iov[1];
888  		int32_t res;
889  	
890  		/*
891  		 * tell the server to exit
892  		 */
893  		req_header.id = IPC_MSG_REQ_SERVER_FAIL;
894  		req_header.size = sizeof(struct qb_ipc_request_header);
895  	
896  		iov[0].iov_len = req_header.size;
897  		iov[0].iov_base = (void*)&req_header;
898  	
899  		ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn));
900  	
901  		res = qb_ipcc_sendv_recv(conn, iov, 1,
902  					 &res_header,
903  					 sizeof(struct qb_ipc_response_header), -1);
904  		/*
905  		 * confirm we get -ENOTCONN or ECONNRESET
906  		 */
907  		if (res != -ECONNRESET && res != -ENOTCONN) {
908  			qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
909  			ck_assert_int_eq(res, -ENOTCONN);
910  		}
911  	}
912  	
913  	static void
914  	kill_server(pid_t pid)
915  	{
916  		kill(pid, SIGTERM);
917  		waitpid(pid, NULL, 0);
918  	}
919  	
/*
 * Check that process @pid terminates on its own: polls for it once per
 * second (at most 11 times) and asserts that it was reaped and exited
 * normally with status 0.
 *
 * Returns 0 (failures are reported through the check assertions).
 */
static int32_t
verify_graceful_stop(pid_t pid)
{
	int attempts_left = 11;
	int wait_rc = 0;
	int status = 0;
	int rc = 0;

	/* We need the server to be able to exit by itself */
	do {
		sleep(1);
		wait_rc = waitpid(pid, &status, WNOHANG);
	} while (wait_rc <= 0 && --attempts_left > 0);

	ck_assert_int_eq(wait_rc, pid);

	rc = WIFEXITED(status);
	if (!rc) {
		/* not a normal exit: this assertion is meant to fail */
		ck_assert(rc != 0);
	} else {
		rc = WEXITSTATUS(status);
		ck_assert_int_eq(rc, 0);
	}

	return 0;
}
948  	
/* Client request: the IPC request header followed by a 1 MiB message area */
struct my_req {
	struct qb_ipc_request_header hdr;
	char message[1024 * 1024];
};

/* request buffer whose header send_and_check() fills in before sending */
static struct my_req request;
955  	static int32_t
956  	send_and_check(int32_t req_id, uint32_t size,
957  		       int32_t ms_timeout, int32_t expect_perfection)
958  	{
959  		struct qb_ipc_response_header res_header;
960  		int32_t res;
961  		int32_t try_times = 0;
962  		uint32_t max_size = MAX_MSG_SIZE;
963  	
964  		request.hdr.id = req_id;
965  		request.hdr.size = sizeof(struct qb_ipc_request_header) + size;
966  	
967  		/* check that we can't send a message that is too big
968  		 * and we get the right return code.
969  		 */
970  		res = qb_ipcc_send(conn, &request, max_size*2);
971  		ck_assert_int_eq(res, -EMSGSIZE);
972  	
973  	repeat_send:
974  		res = qb_ipcc_send(conn, &request, request.hdr.size);
975  		try_times++;
976  		if (res < 0) {
977  			if (res == -EAGAIN && try_times < 10) {
978  				goto repeat_send;
979  			} else {
980  				if (res == -EAGAIN && try_times >= 10) {
981  					fc_enabled = QB_TRUE;
982  				}
983  				errno = -res;
984  				qb_perror(LOG_INFO, "qb_ipcc_send");
985  				return res;
986  			}
987  		}
988  	
989  		if (req_id == IPC_MSG_REQ_DISPATCH) {
990  			res = qb_ipcc_event_recv(conn, &res_header,
991  						 sizeof(struct qb_ipc_response_header),
992  						 ms_timeout);
993  		} else {
994  			res = qb_ipcc_recv(conn, &res_header,
995  					   sizeof(struct qb_ipc_response_header),
996  					   ms_timeout);
997  		}
998  		if (res == -EINTR) {
999  			return -1;
1000 		}
1001 		if (res == -EAGAIN || res == -ETIMEDOUT) {
1002 			fc_enabled = QB_TRUE;
1003 			qb_perror(LOG_DEBUG, "qb_ipcc_recv");
1004 			return res;
1005 		}
1006 		if (expect_perfection) {
1007 			ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
1008 			ck_assert_int_eq(res_header.id, req_id + 1);
1009 			ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header));
1010 		}
1011 		return res;
1012 	}
1013 	
1014 	
1015 	static int32_t
1016 	process_async_connect(int32_t fd, int32_t revents, void *data)
1017 	{
1018 		qb_loop_t *cl = (qb_loop_t *)data;
1019 		int res;
1020 	
1021 		res = qb_ipcc_connect_continue(conn);
1022 		ck_assert_int_eq(res, 0);
1023 		qb_loop_stop(cl);
1024 		return 0;
1025 	}
1026 	static void test_ipc_connect_async(void)
1027 	{
1028 		struct qb_ipc_request_header req_header;
1029 		struct qb_ipc_response_header res_header;
1030 		int32_t res;
1031 		pid_t pid;
1032 		uint32_t max_size = MAX_MSG_SIZE;
1033 		int connect_fd;
1034 		struct iovec iov[1];
1035 		static qb_loop_t *cl;
1036 	
1037 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
1038 		ck_assert(pid != -1);
1039 	
1040 		conn = qb_ipcc_connect_async(ipc_name, max_size, &connect_fd);
1041 		ck_assert(conn != NULL);
1042 	
1043 		cl = qb_loop_create();
1044 		res = qb_loop_poll_add(cl, QB_LOOP_MED,
1045 				 connect_fd, POLLIN,
1046 				 cl, process_async_connect);
1047 		ck_assert_int_eq(res, 0);
1048 		qb_loop_run(cl);
1049 	
1050 		/* Send some data */
1051 		req_header.id = IPC_MSG_REQ_TX_RX;
1052 		req_header.size = sizeof(struct qb_ipc_request_header);
1053 	
1054 		iov[0].iov_len = req_header.size;
1055 		iov[0].iov_base = (void*)&req_header;
1056 	
1057 		res = qb_ipcc_sendv_recv(conn, iov, 1,
1058 					 &res_header,
1059 					 sizeof(struct qb_ipc_response_header), 5000);
1060 	
1061 		ck_assert_int_ge(res, 0);
1062 	
1063 		request_server_exit();
1064 		verify_graceful_stop(pid);
1065 	
1066 	
1067 		qb_ipcc_disconnect(conn);
1068 	}
1069 	
1070 	static void
1071 	test_ipc_txrx_timeout(void)
1072 	{
1073 		struct qb_ipc_request_header req_header;
1074 		struct qb_ipc_response_header res_header;
1075 		struct iovec iov[1];
1076 		int32_t res;
1077 		int32_t c = 0;
1078 		int32_t j = 0;
1079 		pid_t pid;
1080 		uint32_t max_size = MAX_MSG_SIZE;
1081 	
1082 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
1083 		ck_assert(pid != -1);
1084 	
1085 		do {
1086 			conn = qb_ipcc_connect(ipc_name, max_size);
1087 			if (conn == NULL) {
1088 				j = waitpid(pid, NULL, WNOHANG);
1089 				ck_assert_int_eq(j, 0);
1090 				poll(NULL, 0, 400);
1091 				c++;
1092 			}
1093 		} while (conn == NULL && c < 5);
1094 		ck_assert(conn != NULL);
1095 	
1096 		/* The dispatch response will only come over
1097 		 * the event channel, we want to verify the receive times
1098 		 * out when an event is returned with no response */
1099 		req_header.id = IPC_MSG_REQ_DISPATCH;
1100 		req_header.size = sizeof(struct qb_ipc_request_header);
1101 	
1102 		iov[0].iov_len = req_header.size;
1103 		iov[0].iov_base = (void*)&req_header;
1104 	
1105 		res = qb_ipcc_sendv_recv(conn, iov, 1,
1106 					 &res_header,
1107 					 sizeof(struct qb_ipc_response_header), 5000);
1108 	
1109 		ck_assert_int_eq(res, -ETIMEDOUT);
1110 	
1111 		request_server_exit();
1112 		verify_graceful_stop(pid);
1113 	
1114 		/*
1115 		 * this needs to free up the shared mem
1116 		 */
1117 		qb_ipcc_disconnect(conn);
1118 	}
1119 	
1120 	static int32_t recv_timeout = -1;
1121 	static void
1122 	test_ipc_txrx(void)
1123 	{
1124 		int32_t j;
1125 		int32_t c = 0;
1126 		size_t size;
1127 		pid_t pid;
1128 		uint32_t max_size = MAX_MSG_SIZE;
1129 	
1130 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
1131 		ck_assert(pid != -1);
1132 	
1133 		do {
1134 			conn = qb_ipcc_connect(ipc_name, max_size);
1135 			if (conn == NULL) {
1136 				j = waitpid(pid, NULL, WNOHANG);
1137 				ck_assert_int_eq(j, 0);
1138 				poll(NULL, 0, 400);
1139 				c++;
1140 			}
1141 		} while (conn == NULL && c < 5);
1142 		ck_assert(conn != NULL);
1143 	
1144 		size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
1145 		for (j = 1; j < 19; j++) {
1146 			size *= 2;
1147 			if (size >= max_size)
1148 				break;
1149 			if (send_and_check(IPC_MSG_REQ_TX_RX, size,
1150 					   recv_timeout, QB_TRUE) < 0) {
1151 				break;
1152 			}
1153 		}
1154 		if (turn_on_fc) {
1155 			/* can't signal server to shutdown if flow control is on */
1156 			ck_assert_int_eq(fc_enabled, QB_TRUE);
1157 			qb_ipcc_disconnect(conn);
1158 			/* TODO - figure out why this sleep is necessary */
1159 			sleep(1);
1160 			kill_server(pid);
1161 		} else {
1162 			request_server_exit();
1163 			qb_ipcc_disconnect(conn);
1164 			verify_graceful_stop(pid);
1165 		}
1166 	}
1167 	
1168 	static void
1169 	test_ipc_getauth(void)
1170 	{
1171 		int32_t j;
1172 		int32_t c = 0;
1173 		pid_t pid;
1174 		pid_t spid;
1175 		uid_t suid;
1176 		gid_t sgid;
1177 		int res;
1178 		uint32_t max_size = MAX_MSG_SIZE;
1179 	
1180 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
1181 		ck_assert(pid != -1);
1182 	
1183 		do {
1184 			conn = qb_ipcc_connect(ipc_name, max_size);
1185 			if (conn == NULL) {
1186 				j = waitpid(pid, NULL, WNOHANG);
1187 				ck_assert_int_eq(j, 0);
1188 				poll(NULL, 0, 400);
1189 				c++;
1190 			}
1191 		} while (conn == NULL && c < 5);
1192 		ck_assert(conn != NULL);
1193 	
1194 		res = qb_ipcc_auth_get(NULL, NULL, NULL, NULL);
1195 		ck_assert(res == -EINVAL);
1196 	
1197 		res = qb_ipcc_auth_get(conn, &spid, &suid, &sgid);
1198 		ck_assert(res == 0);
1199 	#ifndef HAVE_GETPEEREID
1200 		/* GETPEEREID doesn't return a PID */
1201 		ck_assert(spid != 0);
1202 	#endif
1203 		ck_assert(suid == getuid());
1204 		ck_assert(sgid == getgid());
1205 	
1206 		request_server_exit();
1207 		qb_ipcc_disconnect(conn);
1208 		verify_graceful_stop(pid);
1209 	}
1210 	
1211 	static void
1212 	test_ipc_exit(void)
1213 	{
1214 		struct qb_ipc_request_header req_header;
1215 		struct qb_ipc_response_header res_header;
1216 		struct iovec iov[1];
1217 		int32_t res;
1218 		int32_t c = 0;
1219 		int32_t j = 0;
1220 		pid_t pid;
1221 		uint32_t max_size = MAX_MSG_SIZE;
1222 	
1223 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
1224 		ck_assert(pid != -1);
1225 	
1226 		do {
1227 			conn = qb_ipcc_connect(ipc_name, max_size);
1228 			if (conn == NULL) {
1229 				j = waitpid(pid, NULL, WNOHANG);
1230 				ck_assert_int_eq(j, 0);
1231 				poll(NULL, 0, 400);
1232 				c++;
1233 			}
1234 		} while (conn == NULL && c < 5);
1235 		ck_assert(conn != NULL);
1236 	
1237 		req_header.id = IPC_MSG_REQ_TX_RX;
1238 		req_header.size = sizeof(struct qb_ipc_request_header);
1239 	
1240 		iov[0].iov_len = req_header.size;
1241 		iov[0].iov_base = (void*)&req_header;
1242 	
1243 		res = qb_ipcc_sendv_recv(conn, iov, 1,
1244 					 &res_header,
1245 					 sizeof(struct qb_ipc_response_header), -1);
1246 		ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
1247 	
1248 		request_server_exit();
1249 		verify_graceful_stop(pid);
1250 	
1251 		/*
1252 		 * this needs to free up the shared mem
1253 		 */
1254 		qb_ipcc_disconnect(conn);
1255 	}
1256 	
/*
 * Run test_ipc_exit() with ipc_type set to QB_IPC_SOCKET.
 * recv_timeout is set to 5000 here; test_ipc_exit() itself does not read it.
 */
START_TEST(test_ipc_exit_us)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	recv_timeout = 5000;
	test_ipc_exit();
	qb_leave();
}
END_TEST
1267 	
/*
 * Run test_ipc_exit() with ipc_type set to QB_IPC_SHM.
 * recv_timeout is set to 1000 here; test_ipc_exit() itself does not read it.
 */
START_TEST(test_ipc_exit_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	recv_timeout = 1000;
	test_ipc_exit();
	qb_leave();
}
END_TEST
1278 	
/* Run test_ipc_txrx_timeout() with ipc_type set to QB_IPC_SHM. */
START_TEST(test_ipc_txrx_shm_timeout)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_txrx_timeout();
	qb_leave();
}
END_TEST
1288 	
1289 	
/* Run test_ipc_txrx_timeout() with ipc_type set to QB_IPC_SOCKET. */
START_TEST(test_ipc_txrx_us_timeout)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_txrx_timeout();
	qb_leave();
}
END_TEST
1299 	
/* Run test_ipc_connect_async() with ipc_type set to QB_IPC_SHM. */
START_TEST(test_ipc_shm_connect_async)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_connect_async();
	qb_leave();
}
END_TEST
1309 	
/* Run test_ipc_connect_async() with ipc_type set to QB_IPC_SOCKET. */
START_TEST(test_ipc_us_connect_async)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_connect_async();
	qb_leave();
}
END_TEST
1319 	
/* Run test_ipc_getauth() with ipc_type set to QB_IPC_SHM. */
START_TEST(test_ipc_txrx_shm_getauth)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_getauth();
	qb_leave();
}
END_TEST
1329 	
/* Run test_ipc_getauth() with ipc_type set to QB_IPC_SOCKET. */
START_TEST(test_ipc_txrx_us_getauth)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_getauth();
	qb_leave();
}
END_TEST
1339 	
/*
 * Run test_ipc_txrx() with ipc_type QB_IPC_SHM, flow control off and a
 * 1000 ms receive timeout.
 */
START_TEST(test_ipc_txrx_shm_tmo)
{
	qb_enter();
	turn_on_fc = QB_FALSE;
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	recv_timeout = 1000;
	test_ipc_txrx();
	qb_leave();
}
END_TEST
1351 	
/*
 * Run test_ipc_txrx() with ipc_type QB_IPC_SHM, flow control off and
 * recv_timeout set to -1.
 */
START_TEST(test_ipc_txrx_shm_block)
{
	qb_enter();
	turn_on_fc = QB_FALSE;
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	recv_timeout = -1;
	test_ipc_txrx();
	qb_leave();
}
END_TEST
1363 	
/*
 * Run test_ipc_txrx() with ipc_type QB_IPC_SHM, turn_on_fc enabled and a
 * 500 ms receive timeout; test_ipc_txrx() then expects fc_enabled to have
 * been set.
 */
START_TEST(test_ipc_fc_shm)
{
	qb_enter();
	turn_on_fc = QB_TRUE;
	ipc_type = QB_IPC_SHM;
	recv_timeout = 500;
	set_ipc_name(__func__);
	test_ipc_txrx();
	qb_leave();
}
END_TEST
1375 	
/*
 * Run test_ipc_txrx() with ipc_type QB_IPC_SOCKET, flow control off and
 * recv_timeout set to -1.
 */
START_TEST(test_ipc_txrx_us_block)
{
	qb_enter();
	turn_on_fc = QB_FALSE;
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	recv_timeout = -1;
	test_ipc_txrx();
	qb_leave();
}
END_TEST
1387 	
/*
 * Run test_ipc_txrx() with ipc_type QB_IPC_SOCKET, flow control off and a
 * 1000 ms receive timeout.
 */
START_TEST(test_ipc_txrx_us_tmo)
{
	qb_enter();
	turn_on_fc = QB_FALSE;
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	recv_timeout = 1000;
	test_ipc_txrx();
	qb_leave();
}
END_TEST
1399 	
/*
 * Run test_ipc_txrx() with ipc_type QB_IPC_SOCKET, turn_on_fc enabled and
 * a 500 ms receive timeout; test_ipc_txrx() then expects fc_enabled to
 * have been set.
 */
START_TEST(test_ipc_fc_us)
{
	qb_enter();
	turn_on_fc = QB_TRUE;
	ipc_type = QB_IPC_SOCKET;
	recv_timeout = 500;
	set_ipc_name(__func__);
	test_ipc_txrx();
	qb_leave();
}
END_TEST
1411 	
/*
 * Response message layout mirroring struct my_req: a libqb response
 * header followed by a 1 MiB payload area.
 */
struct my_res {
	struct qb_ipc_response_header hdr;
	char message[1024 * 1024];
};
1416 	
/* Arguments handed to client_dispatch() through its opaque data pointer. */
struct dispatch_data {
	pid_t server_pid;		/* server process checked with waitpid() while connecting */
	enum my_msg_ids msg_type;	/* request id passed to send_and_check() */
	uint32_t repetitions;		/* number of passes over the send loop; 0 sends nothing */
};
1422 	
1423 	static inline
1424 	NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data, data)
1425 	{
1426 		uint32_t max_size = MAX_MSG_SIZE;
1427 		int32_t size;
1428 		int32_t c = 0;
1429 		int32_t j;
1430 		pid_t server_pid = ((struct dispatch_data *) data)->server_pid;
1431 		enum my_msg_ids msg_type = ((struct dispatch_data *) data)->msg_type;
1432 	
1433 		do {
1434 			conn = qb_ipcc_connect(ipc_name, max_size);
1435 			if (conn == NULL) {
1436 				j = waitpid(server_pid, NULL, WNOHANG);
1437 				ck_assert_int_eq(j, 0);
1438 				poll(NULL, 0, 400);
1439 				c++;
1440 			}
1441 		} while (conn == NULL && c < 5);
1442 		ck_assert(conn != NULL);
1443 	
1444 		if (ready_signaller != NULL) {
1445 			ready_signaller(signaller_data);
1446 		}
1447 	
1448 		size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
1449 	
1450 		for (uint32_t r = ((struct dispatch_data *) data)->repetitions;
1451 				r > 0; r--) {
1452 			for (j = 1; j < 19; j++) {
1453 				size *= 2;
1454 				if (size >= max_size)
1455 					break;
1456 				if (send_and_check(msg_type, size,
1457 						   recv_timeout, QB_TRUE) < 0) {
1458 					break;
1459 				}
1460 			}
1461 		}
1462 	}
1463 	
1464 	static void
1465 	test_ipc_dispatch(void)
1466 	{
1467 		pid_t pid;
1468 		struct dispatch_data data;
1469 	
1470 		pid = run_function_in_new_process(NULL, run_ipc_server, NULL);
1471 		ck_assert(pid != -1);
1472 		data = (struct dispatch_data){.server_pid = pid,
1473 		                              .msg_type = IPC_MSG_REQ_DISPATCH,
1474 		                              .repetitions = 1};
1475 	
1476 		client_dispatch(NULL, NULL, (void *) &data);
1477 	
1478 		request_server_exit();
1479 		qb_ipcc_disconnect(conn);
1480 		verify_graceful_stop(pid);
1481 	}
1482 	
/* Run test_ipc_dispatch() with ipc_type set to QB_IPC_SOCKET. */
START_TEST(test_ipc_dispatch_us)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_dispatch();
	qb_leave();
}
END_TEST
1492 	
1493 	static int32_t events_received;
1494 	
1495 	static int32_t
1496 	count_stress_events(int32_t fd, int32_t revents, void *data)
1497 	{
1498 		struct {
1499 			struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
1500 			char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
1501 			uint32_t sent_msgs __attribute__ ((aligned(8)));
1502 		} __attribute__ ((aligned(8))) giant_event_recv;
1503 		qb_loop_t *cl = (qb_loop_t*)data;
1504 		int32_t res;
1505 	
1506 		res = qb_ipcc_event_recv(conn, &giant_event_recv,
1507 					 sizeof(giant_event_recv),
1508 					 -1);
1509 		if (res > 0) {
1510 			events_received++;
1511 	
1512 			if ((events_received % 1000) == 0) {
1513 				qb_log(LOG_DEBUG, "RECV: %d stress events processed", events_received);
1514 				if (res != sizeof(giant_event_recv)) {
1515 					qb_log(LOG_DEBUG, "Unexpected recv size, expected %d got %d",
1516 					       sizeof(giant_event_recv), res);
1517 	
1518 					ck_assert_int_eq(res, sizeof(giant_event_recv));
1519 				} else if (giant_event_recv.sent_msgs != events_received) {
1520 					qb_log(LOG_DEBUG, "Server event mismatch. Server thinks we got %d msgs, but we only received %d",
1521 						giant_event_recv.sent_msgs, events_received);
1522 					/* This indicates that data corruption is occurring. Since the events
1523 					 * received is placed at the end of the giant msg, it is possible
1524 					 * that buffers were not allocated correctly resulting in us
1525 					 * reading/writing to uninitialized memeory at some point. */
1526 					ck_assert_int_eq(giant_event_recv.sent_msgs, events_received);
1527 				}
1528 			}
1529 		} else if (res != -EAGAIN) {
1530 			qb_perror(LOG_DEBUG, "count_stress_events");
1531 			qb_loop_stop(cl);
1532 			return -1;
1533 		}
1534 	
1535 		if (events_received >= num_stress_events) {
1536 			qb_loop_stop(cl);
1537 			return -1;
1538 		}
1539 		return 0;
1540 	}
1541 	
1542 	static int32_t
1543 	count_bulk_events(int32_t fd, int32_t revents, void *data)
1544 	{
1545 		qb_loop_t *cl = (qb_loop_t*)data;
1546 		struct qb_ipc_response_header res_header;
1547 		int32_t res;
1548 	
1549 		res = qb_ipcc_event_recv(conn, &res_header,
1550 					 sizeof(struct qb_ipc_response_header),
1551 					 -1);
1552 		if (res > 0) {
1553 			events_received++;
1554 		}
1555 	
1556 		if (events_received >= num_bulk_events) {
1557 			qb_loop_stop(cl);
1558 			return -1;
1559 		}
1560 		return 0;
1561 	}
1562 	
1563 	static void
1564 	test_ipc_stress_connections(void)
1565 	{
1566 		int32_t c = 0;
1567 		int32_t j = 0;
1568 		uint32_t max_size = MAX_MSG_SIZE;
1569 		int32_t connections = 0;
1570 		pid_t pid;
1571 	
1572 		multiple_connections = QB_TRUE;
1573 	
1574 		qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_CLEAR_ALL,
1575 				  QB_LOG_FILTER_FILE, "*", LOG_TRACE);
1576 		qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
1577 				  QB_LOG_FILTER_FILE, "*", LOG_INFO);
1578 		qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
1579 	
1580 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
1581 		ck_assert(pid != -1);
1582 	
1583 		for (connections = 1; connections < NUM_STRESS_CONNECTIONS; connections++) {
1584 			if (conn) {
1585 				qb_ipcc_disconnect(conn);
1586 				conn = NULL;
1587 			}
1588 			do {
1589 				conn = qb_ipcc_connect(ipc_name, max_size);
1590 				if (conn == NULL) {
1591 					j = waitpid(pid, NULL, WNOHANG);
1592 					ck_assert_int_eq(j, 0);
1593 					sleep(1);
1594 					c++;
1595 				}
1596 			} while (conn == NULL && c < 5);
1597 			ck_assert(conn != NULL);
1598 	
1599 			if (((connections+1) % 1000) == 0) {
1600 				qb_log(LOG_INFO, "%d ipc connections made", connections+1);
1601 			}
1602 		}
1603 		multiple_connections = QB_FALSE;
1604 	
1605 		/* Re-enable logging here so we get the "Free'ing" message which allows
1606 		   for resources.test to clear up after us if needed */
1607 		qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_CLEAR_ALL,
1608 				  QB_LOG_FILTER_FILE, "*", LOG_TRACE);
1609 		qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
1610 				  QB_LOG_FILTER_FILE, "*", LOG_TRACE);
1611 		qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
1612 	
1613 		request_server_exit();
1614 		qb_ipcc_disconnect(conn);
1615 		verify_graceful_stop(pid);
1616 	
1617 	}
1618 	
1619 	static void
1620 	test_ipc_bulk_events(void)
1621 	{
1622 		int32_t c = 0;
1623 		int32_t j = 0;
1624 		pid_t pid;
1625 		int32_t res;
1626 		qb_loop_t *cl;
1627 		int32_t fd;
1628 		uint32_t max_size = MAX_MSG_SIZE;
1629 	
1630 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
1631 		ck_assert(pid != -1);
1632 	
1633 		do {
1634 			conn = qb_ipcc_connect(ipc_name, max_size);
1635 			if (conn == NULL) {
1636 				j = waitpid(pid, NULL, WNOHANG);
1637 				ck_assert_int_eq(j, 0);
1638 				poll(NULL, 0, 400);
1639 				c++;
1640 			}
1641 		} while (conn == NULL && c < 5);
1642 		ck_assert(conn != NULL);
1643 	
1644 		events_received = 0;
1645 		cl = qb_loop_create();
1646 		res = qb_ipcc_fd_get(conn, &fd);
1647 		ck_assert_int_eq(res, 0);
1648 		res = qb_loop_poll_add(cl, QB_LOOP_MED,
1649 				 fd, POLLIN,
1650 				 cl, count_bulk_events);
1651 		ck_assert_int_eq(res, 0);
1652 	
1653 		res = send_and_check(IPC_MSG_REQ_BULK_EVENTS,
1654 				     0,
1655 				     recv_timeout, QB_TRUE);
1656 		ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
1657 	
1658 		qb_loop_run(cl);
1659 		ck_assert_int_eq(events_received, num_bulk_events);
1660 	
1661 		request_server_exit();
1662 		qb_ipcc_disconnect(conn);
1663 		verify_graceful_stop(pid);
1664 	}
1665 	
1666 	static void
1667 	test_ipc_stress_test(void)
1668 	{
1669 		struct {
1670 			struct qb_ipc_request_header hdr __attribute__ ((aligned(8)));
1671 			char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
1672 			uint32_t sent_msgs __attribute__ ((aligned(8)));
1673 		} __attribute__ ((aligned(8))) giant_req;
1674 	
1675 		struct qb_ipc_response_header res_header;
1676 		struct iovec iov[1];
1677 		int32_t c = 0;
1678 		int32_t j = 0;
1679 		pid_t pid;
1680 		int32_t res;
1681 		qb_loop_t *cl;
1682 		int32_t fd;
1683 		uint32_t max_size = MAX_MSG_SIZE;
1684 		/* This looks strange, but it serves an important purpose.
1685 		 * This test forces the server to enforce the MAX_MSG_SIZE
1686 		 * limit from the server side, which overrides the client's
1687 		 * buffer limit.  To verify this functionality is working
1688 		 * we set the client limit lower than what the server
1689 		 * is enforcing. */
1690 		int32_t client_buf_size = max_size - 1024;
1691 		int32_t real_buf_size;
1692 	
1693 		enforce_server_buffer = 1;
1694 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
1695 		enforce_server_buffer = 0;
1696 		ck_assert(pid != -1);
1697 	
1698 		do {
1699 			conn = qb_ipcc_connect(ipc_name, client_buf_size);
1700 			if (conn == NULL) {
1701 				j = waitpid(pid, NULL, WNOHANG);
1702 				ck_assert_int_eq(j, 0);
1703 				poll(NULL, 0, 400);
1704 				c++;
1705 			}
1706 		} while (conn == NULL && c < 5);
1707 		ck_assert(conn != NULL);
1708 	
1709 		real_buf_size = qb_ipcc_get_buffer_size(conn);
1710 		ck_assert_int_ge(real_buf_size, max_size);
1711 	
1712 		qb_log(LOG_DEBUG, "Testing %d iterations of EVENT msg passing.", num_stress_events);
1713 	
1714 		events_received = 0;
1715 		cl = qb_loop_create();
1716 		res = qb_ipcc_fd_get(conn, &fd);
1717 		ck_assert_int_eq(res, 0);
1718 		res = qb_loop_poll_add(cl, QB_LOOP_MED,
1719 				 fd, POLLIN,
1720 				 cl, count_stress_events);
1721 		ck_assert_int_eq(res, 0);
1722 	
1723 		res = send_and_check(IPC_MSG_REQ_STRESS_EVENT, 0, recv_timeout, QB_TRUE);
1724 	
1725 		qb_loop_run(cl);
1726 		ck_assert_int_eq(events_received, num_stress_events);
1727 	
1728 		giant_req.hdr.id = IPC_MSG_REQ_SERVER_FAIL;
1729 		giant_req.hdr.size = sizeof(giant_req);
1730 	
1731 		if (giant_req.hdr.size <= client_buf_size) {
1732 			ck_assert_int_eq(1, 0);
1733 		}
1734 	
1735 		iov[0].iov_len = giant_req.hdr.size;
1736 		iov[0].iov_base = (void*)&giant_req;
1737 		res = qb_ipcc_sendv_recv(conn, iov, 1,
1738 					 &res_header,
1739 					 sizeof(struct qb_ipc_response_header), -1);
1740 		if (res != -ECONNRESET && res != -ENOTCONN) {
1741 			qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
1742 			ck_assert_int_eq(res, -ENOTCONN);
1743 		}
1744 	
1745 		qb_ipcc_disconnect(conn);
1746 		verify_graceful_stop(pid);
1747 	}
1748 	
#ifndef __clang__ /* see 'variable length array in structure' at the top */
/*
 * Run test_ipc_stress_test() with ipc_type set to QB_IPC_SOCKET and
 * send_event_on_created disabled.  Not built when compiling with clang.
 */
START_TEST(test_ipc_stress_test_us)
{
	qb_enter();
	send_event_on_created = QB_FALSE;
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_stress_test();
	qb_leave();
}
END_TEST
#endif
1761 	
/* Run test_ipc_stress_connections() with ipc_type set to QB_IPC_SOCKET. */
START_TEST(test_ipc_stress_connections_us)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_stress_connections();
	qb_leave();
}
END_TEST
1771 	
/* Run test_ipc_bulk_events() with ipc_type set to QB_IPC_SOCKET. */
START_TEST(test_ipc_bulk_events_us)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_bulk_events();
	qb_leave();
}
END_TEST
1781 	
/*
 * Ready-signaller passed to client_dispatch() by the prio_dlock tests:
 * asks the server to exit.  client_dispatch() calls it right after its
 * connection succeeds.  The signaller argument is unused.
 */
static
READY_SIGNALLER(connected_signaller, _)
{
	request_server_exit();
}
1787 	
1788 	START_TEST(test_ipc_us_native_prio_dlock)
1789 	{
1790 		pid_t server_pid, alphaclient_pid;
1791 		struct dispatch_data data;
1792 	
1793 		qb_enter();
1794 		ipc_type = QB_IPC_SOCKET;
1795 		set_ipc_name(__func__);
1796 	
1797 		/* this is to demonstrate that native event loop can deal even
1798 		   with "extreme" priority disproportions */
1799 		global_loop_prio = QB_LOOP_LOW;
1800 		multiple_connections = QB_TRUE;
1801 		recv_timeout = -1;
1802 	
1803 		server_pid = run_function_in_new_process("server", run_ipc_server,
1804 		                                         NULL);
1805 		ck_assert(server_pid != -1);
1806 		data = (struct dispatch_data){.server_pid = server_pid,
1807 		                              .msg_type = IPC_MSG_REQ_SELF_FEED,
1808 		                              .repetitions = 1};
1809 		alphaclient_pid = run_function_in_new_process("alphaclient",
1810 		                                              client_dispatch,
1811 		                                              (void *) &data);
1812 		ck_assert(alphaclient_pid != -1);
1813 	
1814 		//sleep(1);
1815 		sched_yield();
1816 	
1817 		data.repetitions = 0;
1818 		client_dispatch(connected_signaller, NULL, (void *) &data);
1819 		verify_graceful_stop(server_pid);
1820 	
1821 		multiple_connections = QB_FALSE;
1822 		qb_leave();
1823 	}
1824 	END_TEST
1825 	
#if HAVE_GLIB
/*
 * GLib flavour of the priority-deadlock check over QB_IPC_SOCKET.
 *
 * The server runs with global_use_glib set and global_loop_prio raised to
 * QB_LOOP_HIGH.  An "alphaclient" process sends IPC_MSG_REQ_SELF_FEED
 * (one pass) while this process connects as a second client whose
 * ready-signaller asks the server to exit.  The globals touched here are
 * put back before leaving.
 */
START_TEST(test_ipc_us_glib_prio_dlock)
{
	struct dispatch_data data;
	pid_t alphaclient_pid;
	pid_t server_pid;

	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);

	global_use_glib = QB_TRUE;
	/* this is to make the test pass at all, since GLib is strict
	   on priorities -- QB_LOOP_MED or lower would fail for sure */
	global_loop_prio = QB_LOOP_HIGH;
	multiple_connections = QB_TRUE;
	recv_timeout = -1;

	server_pid = run_function_in_new_process("server", run_ipc_server,
	                                         NULL);
	ck_assert(server_pid != -1);

	data.server_pid = server_pid;
	data.msg_type = IPC_MSG_REQ_SELF_FEED;
	data.repetitions = 1;
	alphaclient_pid = run_function_in_new_process("alphaclient",
	                                              client_dispatch,
	                                              (void *) &data);
	ck_assert(alphaclient_pid != -1);

	/* was: sleep(1); */
	sched_yield();

	/* second client: connect, signal, send nothing */
	data.repetitions = 0;
	client_dispatch(connected_signaller, NULL, (void *) &data);
	verify_graceful_stop(server_pid);

	multiple_connections = QB_FALSE;
	global_loop_prio = QB_LOOP_MED;
	global_use_glib = QB_FALSE;
	qb_leave();
}
END_TEST
#endif
1868 	
1869 	static void
1870 	test_ipc_event_on_created(void)
1871 	{
1872 		int32_t c = 0;
1873 		int32_t j = 0;
1874 		pid_t pid;
1875 		int32_t res;
1876 		qb_loop_t *cl;
1877 		int32_t fd;
1878 		uint32_t max_size = MAX_MSG_SIZE;
1879 	
1880 		num_bulk_events = 1;
1881 	
1882 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
1883 		ck_assert(pid != -1);
1884 	
1885 		do {
1886 			conn = qb_ipcc_connect(ipc_name, max_size);
1887 			if (conn == NULL) {
1888 				j = waitpid(pid, NULL, WNOHANG);
1889 				ck_assert_int_eq(j, 0);
1890 				poll(NULL, 0, 400);
1891 				c++;
1892 			}
1893 		} while (conn == NULL && c < 5);
1894 		ck_assert(conn != NULL);
1895 	
1896 		events_received = 0;
1897 		cl = qb_loop_create();
1898 		res = qb_ipcc_fd_get(conn, &fd);
1899 		ck_assert_int_eq(res, 0);
1900 		res = qb_loop_poll_add(cl, QB_LOOP_MED,
1901 				 fd, POLLIN,
1902 				 cl, count_bulk_events);
1903 		ck_assert_int_eq(res, 0);
1904 	
1905 		qb_loop_run(cl);
1906 		ck_assert_int_eq(events_received, num_bulk_events);
1907 	
1908 		request_server_exit();
1909 		qb_ipcc_disconnect(conn);
1910 		verify_graceful_stop(pid);
1911 	}
1912 	
/*
 * Run test_ipc_event_on_created() with ipc_type set to QB_IPC_SOCKET and
 * send_event_on_created enabled.
 */
START_TEST(test_ipc_event_on_created_us)
{
	qb_enter();
	send_event_on_created = QB_TRUE;
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_event_on_created();
	qb_leave();
}
END_TEST
1923 	
1924 	static void
1925 	test_ipc_disconnect_after_created(void)
1926 	{
1927 		struct qb_ipc_request_header req_header;
1928 		struct qb_ipc_response_header res_header;
1929 		struct iovec iov[1];
1930 		int32_t c = 0;
1931 		int32_t j = 0;
1932 		pid_t pid;
1933 		int32_t res;
1934 		uint32_t max_size = MAX_MSG_SIZE;
1935 	
1936 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
1937 		ck_assert(pid != -1);
1938 	
1939 		do {
1940 			conn = qb_ipcc_connect(ipc_name, max_size);
1941 			if (conn == NULL) {
1942 				j = waitpid(pid, NULL, WNOHANG);
1943 				ck_assert_int_eq(j, 0);
1944 				poll(NULL, 0, 400);
1945 				c++;
1946 			}
1947 		} while (conn == NULL && c < 5);
1948 		ck_assert(conn != NULL);
1949 	
1950 		ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn));
1951 	
1952 		req_header.id = IPC_MSG_REQ_SERVER_DISCONNECT;
1953 		req_header.size = sizeof(struct qb_ipc_request_header);
1954 	
1955 		iov[0].iov_len = req_header.size;
1956 		iov[0].iov_base = (void*)&req_header;
1957 	
1958 		res = qb_ipcc_sendv_recv(conn, iov, 1,
1959 					 &res_header,
1960 					 sizeof(struct qb_ipc_response_header), -1);
1961 		/*
1962 		 * confirm we get -ENOTCONN or -ECONNRESET
1963 		 */
1964 		if (res != -ECONNRESET && res != -ENOTCONN) {
1965 			qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
1966 			ck_assert_int_eq(res, -ENOTCONN);
1967 		}
1968 		ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn));
1969 		qb_ipcc_disconnect(conn);
1970 		sleep(1); /* Give it time to stop */
1971 		kill_server(pid);
1972 	}
1973 	
/*
 * Run test_ipc_disconnect_after_created() with ipc_type set to
 * QB_IPC_SOCKET and disconnect_after_created enabled.
 */
START_TEST(test_ipc_disconnect_after_created_us)
{
	qb_enter();
	disconnect_after_created = QB_TRUE;
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_disconnect_after_created();
	qb_leave();
}
END_TEST
1984 	
1985 	static void
1986 	test_ipc_server_fail(void)
1987 	{
1988 		int32_t j;
1989 		int32_t c = 0;
1990 		pid_t pid;
1991 		uint32_t max_size = MAX_MSG_SIZE;
1992 	
1993 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
1994 		ck_assert(pid != -1);
1995 	
1996 		do {
1997 			conn = qb_ipcc_connect(ipc_name, max_size);
1998 			if (conn == NULL) {
1999 				j = waitpid(pid, NULL, WNOHANG);
2000 				ck_assert_int_eq(j, 0);
2001 				poll(NULL, 0, 400);
2002 				c++;
2003 			}
2004 		} while (conn == NULL && c < 5);
2005 		ck_assert(conn != NULL);
2006 	
2007 		request_server_exit();
2008 		if (_fi_unlink_inject_failure == QB_TRUE) {
2009 			_fi_truncate_called = _fi_openat_called = 0;
2010 		}
2011 		ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn));
2012 		qb_ipcc_disconnect(conn);
2013 		if (_fi_unlink_inject_failure == QB_TRUE) {
2014 			ck_assert_int_ne(_fi_truncate_called + _fi_openat_called, 0);
2015 		}
2016 		verify_graceful_stop(pid);
2017 	}
2018 	
/* Run test_ipc_server_fail() with ipc_type set to QB_IPC_SOCKET. */
START_TEST(test_ipc_server_fail_soc)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_server_fail();
	qb_leave();
}
END_TEST
2028 	
/* Run test_ipc_dispatch() with ipc_type set to QB_IPC_SHM. */
START_TEST(test_ipc_dispatch_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_dispatch();
	qb_leave();
}
END_TEST
2038 	
/*
 * Run test_ipc_stress_test() with ipc_type set to QB_IPC_SHM and
 * send_event_on_created disabled.
 */
START_TEST(test_ipc_stress_test_shm)
{
	qb_enter();
	send_event_on_created = QB_FALSE;
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_stress_test();
	qb_leave();
}
END_TEST
2049 	
/* Run test_ipc_stress_connections() with ipc_type set to QB_IPC_SHM. */
START_TEST(test_ipc_stress_connections_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_stress_connections();
	qb_leave();
}
END_TEST
2059 	
2060 	// Check perms uses illegal access to libqb internals
2061 	// DO NOT try this at home.
2062 	#include "../lib/ipc_int.h"
2063 	#include "../lib/ringbuffer_int.h"
2064 	START_TEST(test_ipc_server_perms)
2065 	{
2066 		pid_t pid;
2067 		struct stat st;
2068 		int j;
2069 		uint32_t max_size;
2070 		int res;
2071 		int c = 0;
2072 	
2073 		// Can only test this if we are root
2074 		if (getuid() != 0) {
2075 			return;
2076 		}
2077 	
2078 		ipc_type = QB_IPC_SHM;
2079 		set_perms_on_socket = QB_TRUE;
2080 		max_size = MAX_MSG_SIZE;
2081 	
2082 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
2083 		ck_assert(pid != -1);
2084 	
2085 		do {
2086 			conn = qb_ipcc_connect(ipc_name, max_size);
2087 			if (conn == NULL) {
2088 				j = waitpid(pid, NULL, WNOHANG);
2089 				ck_assert_int_eq(j, 0);
2090 				poll(NULL, 0, 400);
2091 				c++;
2092 			}
2093 		} while (conn == NULL && c < 5);
2094 		ck_assert(conn != NULL);
2095 	
2096 		/* Check perms - uses illegal access to libqb internals */
2097 	
2098 		/* BSD uses /var/run for sockets so we can't alter the perms on the
2099 		   directory */
2100 	#ifdef __linux__
2101 		char sockdir[PATH_MAX];
2102 		strcpy(sockdir, conn->request.u.shm.rb->shared_hdr->hdr_path);
2103 		*strrchr(sockdir, '/') = 0;
2104 	
2105 		res = stat(sockdir, &st);
2106 	
2107 		ck_assert_int_eq(res, 0);
2108 		ck_assert(st.st_mode & S_IRWXG);
2109 		ck_assert_int_eq(st.st_uid, 555);
2110 		ck_assert_int_eq(st.st_gid, 741);
2111 	#endif
2112 	
2113 		res = stat(conn->request.u.shm.rb->shared_hdr->hdr_path, &st);
2114 		ck_assert_int_eq(res, 0);
2115 		ck_assert_int_eq(st.st_uid, 555);
2116 		ck_assert_int_eq(st.st_gid, 741);
2117 	
2118 		qb_ipcc_disconnect(conn);
2119 		verify_graceful_stop(pid);
2120 	}
2121 	END_TEST
2122 	
2123 	START_TEST(test_ipc_disp_shm_native_prio_dlock)
2124 	{
2125 		pid_t server_pid, alphaclient_pid;
2126 		struct dispatch_data data;
2127 	
2128 		qb_enter();
2129 		ipc_type = QB_IPC_SHM;
2130 		set_ipc_name(__func__);
2131 	
2132 		/* this is to demonstrate that native event loop can deal even
2133 		   with "extreme" priority disproportions */
2134 		global_loop_prio = QB_LOOP_LOW;
2135 		multiple_connections = QB_TRUE;
2136 		recv_timeout = -1;
2137 	
2138 		server_pid = run_function_in_new_process("server", run_ipc_server,
2139 		                                         NULL);
2140 		ck_assert(server_pid != -1);
2141 		data = (struct dispatch_data){.server_pid = server_pid,
2142 		                              .msg_type = IPC_MSG_REQ_SELF_FEED,
2143 		                              .repetitions = 1};
2144 		alphaclient_pid = run_function_in_new_process("alphaclient",
2145 		                                              client_dispatch,
2146 		                                              (void *) &data);
2147 		ck_assert(alphaclient_pid != -1);
2148 	
2149 		//sleep(1);
2150 		sched_yield();
2151 	
2152 		data.repetitions = 0;
2153 		client_dispatch(connected_signaller, NULL, (void *) &data);
2154 		verify_graceful_stop(server_pid);
2155 	
2156 		multiple_connections = QB_FALSE;
2157 		qb_leave();
2158 	}
2159 	END_TEST
2160 	
2161 	#if HAVE_GLIB
2162 	START_TEST(test_ipc_disp_shm_glib_prio_dlock)
2163 	{
2164 		pid_t server_pid, alphaclient_pid;
2165 		struct dispatch_data data;
2166 	
2167 		qb_enter();
2168 		ipc_type = QB_IPC_SOCKET;
2169 		set_ipc_name(__func__);
2170 	
2171 		global_use_glib = QB_TRUE;
2172 		/* this is to make the test pass at all, since GLib is strict
2173 		   on priorities -- QB_LOOP_MED or lower would fail for sure */
2174 		global_loop_prio = QB_LOOP_HIGH;
2175 		multiple_connections = QB_TRUE;
2176 		recv_timeout = -1;
2177 	
2178 		server_pid = run_function_in_new_process("server", run_ipc_server,
2179 		                                         NULL);
2180 		ck_assert(server_pid != -1);
2181 		data = (struct dispatch_data){.server_pid = server_pid,
2182 		                              .msg_type = IPC_MSG_REQ_SELF_FEED,
2183 		                              .repetitions = 1};
2184 		alphaclient_pid = run_function_in_new_process("alphaclient",
2185 		                                              client_dispatch,
2186 		                                              (void *) &data);
2187 		ck_assert(alphaclient_pid != -1);
2188 	
2189 		//sleep(1);
2190 		sched_yield();
2191 	
2192 		data.repetitions = 0;
2193 		client_dispatch(connected_signaller, NULL, (void *) &data);
2194 		verify_graceful_stop(server_pid);
2195 	
2196 		multiple_connections = QB_FALSE;
2197 		global_loop_prio = QB_LOOP_MED;
2198 		global_use_glib = QB_FALSE;
2199 		qb_leave();
2200 	}
2201 	END_TEST
2202 	#endif
2203 	
/*
 * Run the shared bulk-events body (test_ipc_bulk_events()) with the
 * transport set to QB_IPC_SHM.
 */
START_TEST(test_ipc_bulk_events_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	/* the IPC name is derived from this test's function name */
	set_ipc_name(__func__);
	test_ipc_bulk_events();
	qb_leave();
}
END_TEST
2213 	
2214 	START_TEST(test_ipc_event_on_created_shm)
2215 	{
2216 		qb_enter();
2217 		send_event_on_created = QB_TRUE;
2218 		ipc_type = QB_IPC_SHM;
2219 		set_ipc_name(__func__);
2220 		test_ipc_event_on_created();
2221 		qb_leave();
2222 	}
2223 	END_TEST
2224 	
/*
 * Run the shared server-failure body (test_ipc_server_fail()) with the
 * transport set to QB_IPC_SHM.
 */
START_TEST(test_ipc_server_fail_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	/* the IPC name is derived from this test's function name */
	set_ipc_name(__func__);
	test_ipc_server_fail();
	qb_leave();
}
END_TEST
2234 	
2235 	#ifdef HAVE_FAILURE_INJECTION
2236 	START_TEST(test_ipcc_truncate_when_unlink_fails_shm)
2237 	{
2238 		char sock_file[PATH_MAX];
2239 		struct sockaddr_un socka;
2240 	
2241 		qb_enter();
2242 		ipc_type = QB_IPC_SHM;
2243 		set_ipc_name(__func__);
2244 	
2245 		sprintf(sock_file, "%s/%s", SOCKETDIR, ipc_name);
2246 		sock_file[sizeof(socka.sun_path)] = '\0';
2247 	
2248 		/* If there's an old socket left from a previous run this test will fail
2249 		   unexpectedly, so try to remove it first */
2250 		unlink(sock_file);
2251 	
2252 		_fi_unlink_inject_failure = QB_TRUE;
2253 		test_ipc_server_fail();
2254 		_fi_unlink_inject_failure = QB_FALSE;
2255 		unlink(sock_file);
2256 		qb_leave();
2257 	}
2258 	END_TEST
2259 	#endif
2260 	
2261 	static void
2262 	test_ipc_service_ref_count(void)
2263 	{
2264 		int32_t c = 0;
2265 		int32_t j = 0;
2266 		pid_t pid;
2267 		uint32_t max_size = MAX_MSG_SIZE;
2268 	
2269 		reference_count_test = QB_TRUE;
2270 	
2271 		pid = run_function_in_new_process("server", run_ipc_server, NULL);
2272 		ck_assert(pid != -1);
2273 	
2274 		do {
2275 			conn = qb_ipcc_connect(ipc_name, max_size);
2276 			if (conn == NULL) {
2277 				j = waitpid(pid, NULL, WNOHANG);
2278 				ck_assert_int_eq(j, 0);
2279 				(void)poll(NULL, 0, 400);
2280 				c++;
2281 			}
2282 		} while (conn == NULL && c < 5);
2283 		ck_assert(conn != NULL);
2284 	
2285 		sleep(5);
2286 	
2287 		kill_server(pid);
2288 	}
2289 	
2290 	
/*
 * Run the service reference-count body (test_ipc_service_ref_count())
 * with the transport set to QB_IPC_SHM.
 */
START_TEST(test_ipc_service_ref_count_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	/* the IPC name is derived from this test's function name */
	set_ipc_name(__func__);
	test_ipc_service_ref_count();
	qb_leave();
}
END_TEST
2300 	
/*
 * Run the service reference-count body (test_ipc_service_ref_count())
 * with the transport set to QB_IPC_SOCKET.
 */
START_TEST(test_ipc_service_ref_count_us)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	/* the IPC name is derived from this test's function name */
	set_ipc_name(__func__);
	test_ipc_service_ref_count();
	qb_leave();
}
END_TEST
2310 	
2311 	#if 0
/*
 * (Currently compiled out.)  Ask qb_ipcc_verify_dgram_max_msg_size() for
 * a 1000000 byte datagram size once, then 100 more times, and abort the
 * test if any later answer differs from the first one.  The LOG_TRACE
 * stderr filter is removed for the duration of the probing and added
 * back afterwards.
 */
static void test_max_dgram_size(void)
{
	/* most implementations will not let you set a dgram buffer
	 * of 1 million bytes. This test verifies that we can detect
	 * the max dgram buffer size regardless, and that the value we detect
	 * is consistent. */
	int32_t init;
	int32_t i;

	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_REMOVE,
			  QB_LOG_FILTER_FILE, "*", LOG_TRACE);

	/* reference value every later probe is compared against */
	init = qb_ipcc_verify_dgram_max_msg_size(1000000);
	ck_assert(init > 0);
	for (i = 0; i < 100; i++) {
		int try = qb_ipcc_verify_dgram_max_msg_size(1000000);
#if 0
		ck_assert_int_eq(init, try);
#else
		/* extra troubleshooting, report also on i and errno variables;
		   related: https://github.com/ClusterLabs/libqb/issues/234 */
		if (init != try) {
#ifdef ci_dump_shm_usage
			system("df -h | grep -e /shm >/tmp/_shm_usage");
#endif
			ck_abort_msg("Assertion 'init==try' failed:"
				     " init==%#x, try==%#x, i=%d, errno=%d",
				     init, try, i, errno);
		}
#endif
	}

	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
			  QB_LOG_FILTER_FILE, "*", LOG_TRACE);
}
2347 	
/*
 * (Currently compiled out.)  Check-framework wrapper around
 * test_max_dgram_size().
 */
START_TEST(test_ipc_max_dgram_size)
{
	qb_enter();
	test_max_dgram_size();
	qb_leave();
}
END_TEST
2355 	#endif
2356 	
/*
 * Build the "shm" suite: one test case per shared-memory test, added in
 * the order listed below.
 *
 * The last add_tcase() argument is presumably the per-test timeout in
 * seconds -- confirm against the add_tcase definition.
 *
 * Returns the newly created suite.
 */
static Suite *
make_shm_suite(void)
{
	TCase *tc;
	Suite *s = suite_create("shm");

	add_tcase(s, tc, test_ipc_shm_connect_async, 7);

	add_tcase(s, tc, test_ipc_txrx_shm_getauth, 7);
	add_tcase(s, tc, test_ipc_txrx_shm_timeout, 28);
	add_tcase(s, tc, test_ipc_server_fail_shm, 7);
	add_tcase(s, tc, test_ipc_txrx_shm_block, 7);
	add_tcase(s, tc, test_ipc_txrx_shm_tmo, 7);
	add_tcase(s, tc, test_ipc_fc_shm, 7);
	add_tcase(s, tc, test_ipc_dispatch_shm, 15);
	add_tcase(s, tc, test_ipc_stress_test_shm, 15);
	add_tcase(s, tc, test_ipc_bulk_events_shm, 15);
	add_tcase(s, tc, test_ipc_exit_shm, 6);
	add_tcase(s, tc, test_ipc_event_on_created_shm, 9);
	add_tcase(s, tc, test_ipc_service_ref_count_shm, 9);
	add_tcase(s, tc, test_ipc_server_perms, 7);
	add_tcase(s, tc, test_ipc_stress_connections_shm, 3600 /* ? */);
	add_tcase(s, tc, test_ipc_disp_shm_native_prio_dlock, 15);
	/* only defined when built with GLib support */
#if HAVE_GLIB
	add_tcase(s, tc, test_ipc_disp_shm_glib_prio_dlock, 15);
#endif
	/* only defined when the failure-injection hooks are available */
#ifdef HAVE_FAILURE_INJECTION
	add_tcase(s, tc, test_ipcc_truncate_when_unlink_fails_shm, 8);
#endif

	return s;
}
2389 	
/*
 * Build the "socket" suite: one test case per unix-socket test, added in
 * the order listed below.
 *
 * The last add_tcase() argument is presumably the per-test timeout in
 * seconds -- confirm against the add_tcase definition.
 *
 * Returns the newly created suite.
 */
static Suite *
make_soc_suite(void)
{
	Suite *s = suite_create("socket");
	TCase *tc;

	add_tcase(s, tc, test_ipc_us_connect_async, 7);

	add_tcase(s, tc, test_ipc_txrx_us_getauth, 7);
	add_tcase(s, tc, test_ipc_txrx_us_timeout, 28);
/* Commented out for the moment as space in /dev/shm on the CI machines
   causes random failures */
/*	add_tcase(s, tc, test_ipc_max_dgram_size, 30); */
	add_tcase(s, tc, test_ipc_server_fail_soc, 7);
	add_tcase(s, tc, test_ipc_txrx_us_block, 7);
	add_tcase(s, tc, test_ipc_txrx_us_tmo, 7);
	add_tcase(s, tc, test_ipc_fc_us, 7);
	add_tcase(s, tc, test_ipc_exit_us, 6);
	add_tcase(s, tc, test_ipc_dispatch_us, 15);
#ifndef __clang__ /* see 'variable length array in structure' at the top */
	add_tcase(s, tc, test_ipc_stress_test_us, 58);
#endif
	add_tcase(s, tc, test_ipc_bulk_events_us, 15);
	add_tcase(s, tc, test_ipc_event_on_created_us, 9);
	add_tcase(s, tc, test_ipc_disconnect_after_created_us, 9);
	add_tcase(s, tc, test_ipc_service_ref_count_us, 9);
	add_tcase(s, tc, test_ipc_stress_connections_us, 3600 /* ? */);
	add_tcase(s, tc, test_ipc_us_native_prio_dlock, 15);
	/* only defined when built with GLib support */
#if HAVE_GLIB
	add_tcase(s, tc, test_ipc_us_glib_prio_dlock, 15);
#endif

	return s;
}
2424 	
2425 	int32_t
2426 	main(void)
2427 	{
2428 		int32_t number_failed;
2429 		SRunner *sr;
2430 		Suite *s;
2431 		int32_t do_shm_tests = QB_TRUE;
2432 	
2433 		set_ipc_name("ipc_test");
2434 	#ifdef DISABLE_IPC_SHM
2435 		do_shm_tests = QB_FALSE;
2436 	#endif /* DISABLE_IPC_SHM */
2437 	
2438 		s = make_soc_suite();
2439 		sr = srunner_create(s);
2440 	
2441 		if (do_shm_tests) {
2442 			srunner_add_suite(sr, make_shm_suite());
2443 		}
2444 	
2445 		qb_log_init("check", LOG_USER, LOG_EMERG);
2446 		atexit(qb_log_fini);
2447 		qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
2448 		qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
2449 				  QB_LOG_FILTER_FILE, "*", LOG_TRACE);
2450 		qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
2451 		qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l| %b");
2452 	
2453 		srunner_run_all(sr, CK_VERBOSE);
2454 		number_failed = srunner_ntests_failed(sr);
2455 		srunner_free(sr);
2456 		return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
2457 	}
2458