1    	/*
2    	 * Copyright (c) 2011 Red Hat, Inc.
3    	 *
4    	 * All rights reserved.
5    	 *
6    	 * Author: Angus Salkeld <asalkeld@redhat.com>
7    	 *
8    	 * libqb is free software: you can redistribute it and/or modify
9    	 * it under the terms of the GNU Lesser General Public License as published by
10   	 * the Free Software Foundation, either version 2.1 of the License, or
11   	 * (at your option) any later version.
12   	 *
13   	 * libqb is distributed in the hope that it will be useful,
14   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   	 * GNU Lesser General Public License for more details.
17   	 *
18   	 * You should have received a copy of the GNU Lesser General Public License
19   	 * along with libqb.  If not, see <http://www.gnu.org/licenses/>.
20   	 */
21   	#include "os_base.h"
22   	#include <signal.h>
23   	
24   	#include <qb/qbarray.h>
25   	#include <qb/qbdefs.h>
26   	#include <qb/qbutil.h>
27   	#include <qb/qblog.h>
28   	#include <qb/qbloop.h>
29   	#include <qb/qbipcs.h>
30   	
31   	#ifdef HAVE_GLIB
32   	#include <glib.h>
33   	static GMainLoop *glib_loop;
34   	static qb_array_t *gio_map;
35   	#endif /* HAVE_GLIB */
36   	
37   	#define ONE_MEG 1048576
38   	
39   	static int32_t use_glib = QB_FALSE;
40   	static int32_t use_events = QB_FALSE;
41   	static qb_loop_t *bms_loop;
42   	static qb_ipcs_service_t *s1;
43   	
44   	static int32_t
45   	s1_connection_accept_fn(qb_ipcs_connection_t * c, uid_t uid, gid_t gid)
46   	{
47   	#if 0
48   		if (uid == 0 && gid == 0) {
49   			qb_log(LOG_INFO, "Authenticated connection");
50   			return 1;
51   		}
52   		qb_log(LOG_NOTICE, "BAD user!");
53   		return 0;
54   	#else
55   		return 0;
56   	#endif
57   	}
58   	
59   	static void
60   	s1_connection_created_fn(qb_ipcs_connection_t * c)
61   	{
62   		struct qb_ipcs_stats srv_stats;
63   	
64   		qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
65   		qb_log(LOG_INFO, "Connection created (active:%d, closed:%d)",
66   		       srv_stats.active_connections, srv_stats.closed_connections);
67   	}
68   	
69   	static void
70   	s1_connection_destroyed_fn(qb_ipcs_connection_t * c)
71   	{
72   		qb_log(LOG_INFO, "Connection about to be freed");
73   	}
74   	
75   	static int32_t
76   	s1_connection_closed_fn(qb_ipcs_connection_t * c)
77   	{
78   		struct qb_ipcs_connection_stats stats;
79   		struct qb_ipcs_stats srv_stats;
80   	
81   		qb_ipcs_stats_get(s1, &srv_stats, QB_FALSE);
82   		qb_ipcs_connection_stats_get(c, &stats, QB_FALSE);
83   		qb_log(LOG_INFO,
84   		       "Connection to pid:%d destroyed (active:%d, closed:%d)",
85   		       stats.client_pid, srv_stats.active_connections,
86   		       srv_stats.closed_connections);
87   	
88   		qb_log(LOG_DEBUG, " Requests     %"PRIu64"", stats.requests);
89   		qb_log(LOG_DEBUG, " Responses    %"PRIu64"", stats.responses);
90   		qb_log(LOG_DEBUG, " Events       %"PRIu64"", stats.events);
91   		qb_log(LOG_DEBUG, " Send retries %"PRIu64"", stats.send_retries);
92   		qb_log(LOG_DEBUG, " Recv retries %"PRIu64"", stats.recv_retries);
93   		qb_log(LOG_DEBUG, " FC state     %d", stats.flow_control_state);
94   		qb_log(LOG_DEBUG, " FC count     %"PRIu64"", stats.flow_control_count);
95   		return 0;
96   	}
97   	
98   	struct my_req {
99   		struct qb_ipc_request_header hdr;
100  		char message[256];
101  	};
102  	
103  	static int32_t
104  	s1_msg_process_fn(qb_ipcs_connection_t * c, void *data, size_t size)
105  	{
106  		struct qb_ipc_request_header *hdr;
107  		struct my_req *req_pt;
108  		struct qb_ipc_response_header response;
109  		ssize_t res;
110  		struct iovec iov[2];
111  		char resp[100];
112  		int32_t sl;
113  		int32_t send_ten_events = QB_FALSE;
114  	
115  		hdr = (struct qb_ipc_request_header *)data;
(1) Event cond_false: Condition "hdr->id == 1 /* 0 + 1 */", taking false branch.
116  		if (hdr->id == (QB_IPC_MSG_USER_START + 1)) {
117  			return 0;
(2) Event if_end: End of if statement.
118  		}
119  	
120  		req_pt = (struct my_req *)data;
121  		qb_log(LOG_DEBUG, "msg received (id:%d, size:%d, data:%s)",
122  		       req_pt->hdr.id, req_pt->hdr.size, req_pt->message);
123  	
(3) Event cond_false: Condition "strcmp(req_pt->message, "kill") == 0", taking false branch.
124  		if (strcmp(req_pt->message, "kill") == 0) {
125  			exit(0);
(4) Event if_end: End of if statement.
126  		}
127  		response.size = sizeof(struct qb_ipc_response_header);
128  		response.id = 13;
129  		response.error = 0;
130  	
131  		sl = snprintf(resp, 100, "ACK %zu bytes", size) + 1;
132  		iov[0].iov_len = sizeof(response);
133  		iov[0].iov_base = (void*)&response;
134  		iov[1].iov_len = sl;
135  		iov[1].iov_base = resp;
136  		response.size += sl;
137  	
(5) Event cond_true: Condition "strcmp(req_pt->message, "events") == 0", taking true branch.
138  		send_ten_events = (strcmp(req_pt->message, "events") == 0);
139  	
(6) Event cond_true: Condition "use_events", taking true branch.
(7) Event cond_false: Condition "!send_ten_events", taking false branch.
140  		if (use_events && !send_ten_events) {
141  			res = qb_ipcs_event_sendv(c, iov, 2);
(8) Event else_branch: Reached else branch.
142  		} else {
143  			res = qb_ipcs_response_sendv(c, iov, 2);
144  		}
(9) Event cond_false: Condition "res < 0", taking false branch.
145  		if (res < 0) {
146  			errno = - res;
147  			qb_perror(LOG_ERR, "qb_ipcs_response_send");
(10) Event if_end: End of if statement.
148  		}
(11) Event cond_true: Condition "send_ten_events", taking true branch.
149  		if (send_ten_events) {
150  			int32_t i;
151  			qb_log(LOG_INFO, "request to send 10 events");
(12) Event cond_true: Condition "i < 10", taking true branch.
(14) Event loop_begin: Jumped back to beginning of loop.
(15) Event cond_true: Condition "i < 10", taking true branch.
(18) Event loop_begin: Jumped back to beginning of loop.
(19) Event cond_true: Condition "i < 10", taking true branch.
152  			for (i = 0; i < 10; i++) {
(16) Event freed_arg: "qb_ipcs_event_sendv" frees "c". [details]
(20) Event double_free: Calling "qb_ipcs_event_sendv" frees pointer "c" which has already been freed. [details]
153  				res = qb_ipcs_event_sendv(c, iov, 2);
154  				qb_log(LOG_INFO, "sent event %d res:%d", i, res);
(13) Event loop: Jumping back to the beginning of the loop.
(17) Event loop: Jumping back to the beginning of the loop.
155  			}
156  		}
157  		return 0;
158  	}
159  	
160  	static void
161  	sigusr1_handler(int32_t num)
162  	{
163  		qb_log(LOG_DEBUG, "(%d)", num);
164  		qb_ipcs_destroy(s1);
165  		exit(0);
166  	}
167  	
/*
 * Print the command-line help text to stdout.
 *
 * name  program name shown in the synopsis line (main() passes argv[0])
 */
static void
show_usage(const char *name)
{
	/* Synopsis, then the fixed option table. */
	printf("usage: \n%s <options>\n", name);
	fputs("\n"
	      "  options:\n"
	      "\n"
	      "  -h             show this help text\n"
	      "  -m             use shared memory\n"
	      "  -u             use unix sockets\n"
	      "  -g             use glib mainloop\n"
	      "  -e             use events\n"
	      "\n", stdout);
}
183  	
184  	#ifdef HAVE_GLIB
185  	struct gio_to_qb_poll {
186  		int32_t is_used;
187  		int32_t events;
188  		int32_t source;
189  		int32_t fd;
190  		void *data;
191  		qb_ipcs_dispatch_fn_t fn;
192  		enum qb_loop_priority p;
193  	};
194  	
195  	static gboolean
196  	gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
197  	{
198  		struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
199  		gint fd = g_io_channel_unix_get_fd(gio);
200  	
201  		return (adaptor->fn(fd, condition, adaptor->data) == 0);
202  	}
203  	
204  	static void
205  	gio_poll_destroy(gpointer data)
206  	{
207  		struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
208  	
209  		adaptor->is_used--;
210  		if (adaptor->is_used == 0) {
211  			qb_log(LOG_DEBUG, "fd %d adaptor destroyed\n", adaptor->fd);
212  			adaptor->fd = 0;
213  			adaptor->source = 0;
214  		}
215  	}
216  	
217  	static int32_t
218  	my_g_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
219  			  void *data, qb_ipcs_dispatch_fn_t fn, gboolean is_new)
220  	{
221  		struct gio_to_qb_poll *adaptor;
222  		GIOChannel *channel;
223  		int32_t res = 0;
224  	
225  		res = qb_array_index(gio_map, fd, (void **)&adaptor);
226  		if (res < 0) {
227  			return res;
228  		}
229  		if (adaptor->is_used && adaptor->source) {
230  			if (is_new) {
231  				return -EEXIST;
232  			}
233  			g_source_remove(adaptor->source);
234  			adaptor->source = 0;
235  		}
236  	
237  		channel = g_io_channel_unix_new(fd);
238  		if (!channel) {
239  			return -ENOMEM;
240  		}
241  	
242  		adaptor->fn = fn;
243  		adaptor->events = evts;
244  		adaptor->data = data;
245  		adaptor->p = p;
246  		adaptor->is_used++;
247  		adaptor->fd = fd;
248  	
249  		adaptor->source = g_io_add_watch_full(channel, G_PRIORITY_DEFAULT, evts, gio_read_socket, adaptor, gio_poll_destroy);
250  	
251  		/* we are handing the channel off to be managed by mainloop now.
252  		 * remove our reference. */
253  		g_io_channel_unref(channel);
254  	
255  		return 0;
256  	}
257  	
258  	static int32_t
259  	my_g_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
260  			  void *data, qb_ipcs_dispatch_fn_t fn)
261  	{
262  		return my_g_dispatch_update(p, fd, evts, data, fn, TRUE);
263  	}
264  	
265  	static int32_t
266  	my_g_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
267  			  void *data, qb_ipcs_dispatch_fn_t fn)
268  	{
269  		return my_g_dispatch_update(p, fd, evts, data, fn, FALSE);
270  	}
271  	
272  	static int32_t
273  	my_g_dispatch_del(int32_t fd)
274  	{
275  		struct gio_to_qb_poll *adaptor;
276  		if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
277  			g_source_remove(adaptor->source);
278  			adaptor->source = 0;
279  		}
280  		return 0;
281  	}
282  	#endif /* HAVE_GLIB */
283  	
284  	static int32_t
285  	my_job_add(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn fn)
286  	{
287  		return qb_loop_job_add(bms_loop, p, data, fn);
288  	}
289  	
290  	static int32_t
291  	my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
292  			void *data, qb_ipcs_dispatch_fn_t fn)
293  	{
294  		return qb_loop_poll_add(bms_loop, p, fd, evts, data, fn);
295  	}
296  	
297  	static int32_t
298  	my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
299  			void *data, qb_ipcs_dispatch_fn_t fn)
300  	{
301  		return qb_loop_poll_mod(bms_loop, p, fd, evts, data, fn);
302  	}
303  	
304  	static int32_t
305  	my_dispatch_del(int32_t fd)
306  	{
307  		return qb_loop_poll_del(bms_loop, fd);
308  	}
309  	
310  	int32_t
311  	main(int32_t argc, char *argv[])
312  	{
313  		const char *options = "mpseugh";
314  		int32_t opt;
315  		int32_t rc;
316  		enum qb_ipc_type ipc_type = QB_IPC_NATIVE;
317  		struct qb_ipcs_service_handlers sh = {
318  			.connection_accept = s1_connection_accept_fn,
319  			.connection_created = s1_connection_created_fn,
320  			.msg_process = s1_msg_process_fn,
321  			.connection_destroyed = s1_connection_destroyed_fn,
322  			.connection_closed = s1_connection_closed_fn,
323  		};
324  		struct qb_ipcs_poll_handlers ph = {
325  			.job_add = my_job_add,
326  			.dispatch_add = my_dispatch_add,
327  			.dispatch_mod = my_dispatch_mod,
328  			.dispatch_del = my_dispatch_del,
329  		};
330  	#ifdef HAVE_GLIB
331  		struct qb_ipcs_poll_handlers glib_ph = {
332  			.job_add = NULL, /* FIXME */
333  			.dispatch_add = my_g_dispatch_add,
334  			.dispatch_mod = my_g_dispatch_mod,
335  			.dispatch_del = my_g_dispatch_del,
336  		};
337  	#endif /* HAVE_GLIB */
338  	
339  		while ((opt = getopt(argc, argv, options)) != -1) {
340  			switch (opt) {
341  			case 'm':
342  				ipc_type = QB_IPC_SHM;
343  				break;
344  			case 'u':
345  				ipc_type = QB_IPC_SOCKET;
346  				break;
347  			case 'g':
348  				use_glib = QB_TRUE;
349  				break;
350  			case 'e':
351  				use_events = QB_TRUE;
352  				break;
353  			case 'h':
354  			default:
355  				show_usage(argv[0]);
356  				exit(0);
357  				break;
358  			}
359  		}
360  		signal(SIGINT, sigusr1_handler);
361  	
362  		qb_log_init("ipcserver", LOG_USER, LOG_TRACE);
363  		qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
364  				  QB_LOG_FILTER_FILE, "*", LOG_TRACE);
365  		qb_log_format_set(QB_LOG_STDERR, "%f:%l [%p] %b");
366  		qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
367  	
368  		s1 = qb_ipcs_create("ipcserver", 0, ipc_type, &sh);
369  		if (s1 == 0) {
370  			qb_perror(LOG_ERR, "qb_ipcs_create");
371  			exit(1);
372  		}
373  		/* This forces the clients to use a minimum buffer size */
374  		qb_ipcs_enforce_buffer_size(s1, ONE_MEG);
375  	
376  		if (!use_glib) {
377  			bms_loop = qb_loop_create();
378  			qb_ipcs_poll_handlers_set(s1, &ph);
379  			rc = qb_ipcs_run(s1);
380  			if (rc != 0) {
381  				errno = -rc;
382  				qb_perror(LOG_ERR, "qb_ipcs_run");
383  				exit(1);
384  			}
385  			qb_loop_run(bms_loop);
386  		} else {
387  	#ifdef HAVE_GLIB
388  			glib_loop = g_main_loop_new(NULL, FALSE);
389  			gio_map = qb_array_create_2(16, sizeof(struct gio_to_qb_poll), 1);
390  			qb_ipcs_poll_handlers_set(s1, &glib_ph);
391  			rc = qb_ipcs_run(s1);
392  			if (rc != 0) {
393  				errno = -rc;
394  				qb_perror(LOG_ERR, "qb_ipcs_run");
395  				exit(1);
396  			}
397  			g_main_loop_run(glib_loop);
398  	#else
399  			qb_log(LOG_ERR,
400  			       "You don't seem to have glib-devel installed.\n");
401  	#endif
402  		}
403  		qb_log_fini();
404  		return EXIT_SUCCESS;
405  	}
406