1    	/*
2    	 * Copyright (C) 2010 Red Hat, Inc.
3    	 *
4    	 * Author: Angus Salkeld <asalkeld@redhat.com>
5    	 *
6    	 * This file is part of libqb.
7    	 *
8    	 * libqb is free software: you can redistribute it and/or modify
9    	 * it under the terms of the GNU Lesser General Public License as published by
10   	 * the Free Software Foundation, either version 2.1 of the License, or
11   	 * (at your option) any later version.
12   	 *
13   	 * libqb is distributed in the hope that it will be useful,
14   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   	 * GNU Lesser General Public License for more details.
17   	 *
18   	 * You should have received a copy of the GNU Lesser General Public License
19   	 * along with libqb.  If not, see <http://www.gnu.org/licenses/>.
20   	 */
21   	#include "os_base.h"
22   	#include <poll.h>
23   	
24   	#include "util_int.h"
25   	#include "ipc_int.h"
26   	#include <qb/qbdefs.h>
27   	#include <qb/qbatomic.h>
28   	#include <qb/qbipcs.h>
29   	
/*
 * Prototypes for file-local helpers whose definitions appear
 * further down in this file.
 */
static void
qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable);

static int32_t new_event_notification(struct qb_ipcs_connection *c);
34   	
35   	
36   	qb_ipcs_service_t *
37   	qb_ipcs_create(const char *name,
38   		       int32_t service_id,
39   		       enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers)
40   	{
41   		struct qb_ipcs_service *s;
42   	
43   		s = calloc(1, sizeof(struct qb_ipcs_service));
44   		if (s == NULL) {
45   			return NULL;
46   		}
47   		if (type == QB_IPC_NATIVE) {
48   	#ifdef DISABLE_IPC_SHM
49   			s->type = QB_IPC_SOCKET;
50   	#else
51   			s->type = QB_IPC_SHM;
52   	#endif /* DISABLE_IPC_SHM */
53   		} else {
54   			s->type = type;
55   		}
56   	
57   		s->pid = getpid();
58   		s->needs_sock_for_poll = QB_FALSE;
59   		s->poll_priority = QB_LOOP_MED;
60   	
61   		/* Initial alloc ref */
62   		qb_ipcs_ref(s);
63   	
64   		s->service_id = service_id;
65   		(void)strlcpy(s->name, name, NAME_MAX);
66   	
67   		s->serv_fns.connection_accept = handlers->connection_accept;
68   		s->serv_fns.connection_created = handlers->connection_created;
69   		s->serv_fns.msg_process = handlers->msg_process;
70   		s->serv_fns.connection_closed = handlers->connection_closed;
71   		s->serv_fns.connection_destroyed = handlers->connection_destroyed;
72   	
73   		qb_list_init(&s->connections);
74   	
75   		return s;
76   	}
77   	
78   	void
79   	qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s,
80   				  struct qb_ipcs_poll_handlers *handlers)
81   	{
82   		s->poll_fns.job_add = handlers->job_add;
83   		s->poll_fns.dispatch_add = handlers->dispatch_add;
84   		s->poll_fns.dispatch_mod = handlers->dispatch_mod;
85   		s->poll_fns.dispatch_del = handlers->dispatch_del;
86   	}
87   	
88   	void
89   	qb_ipcs_service_context_set(qb_ipcs_service_t* s,
90   				    void *context)
91   	{
92   		s->context = context;
93   	}
94   	
95   	void *
96   	qb_ipcs_service_context_get(qb_ipcs_service_t* s)
97   	{
98   		return s->context;
99   	}
100  	
101  	int32_t
102  	qb_ipcs_run(struct qb_ipcs_service *s)
103  	{
104  		int32_t res = 0;
105  	
106  		if (s->poll_fns.dispatch_add == NULL ||
107  		    s->poll_fns.dispatch_mod == NULL ||
108  		    s->poll_fns.dispatch_del == NULL) {
109  	
110  			res = -EINVAL;
111  			goto run_cleanup;
112  		}
113  	
114  		switch (s->type) {
115  		case QB_IPC_SOCKET:
116  			qb_ipcs_us_init((struct qb_ipcs_service *)s);
117  			break;
118  		case QB_IPC_SHM:
119  	#ifdef DISABLE_IPC_SHM
120  			res = -ENOTSUP;
121  	#else
122  			qb_ipcs_shm_init((struct qb_ipcs_service *)s);
123  	#endif /* DISABLE_IPC_SHM */
124  			break;
125  		case QB_IPC_POSIX_MQ:
126  		case QB_IPC_SYSV_MQ:
127  			res = -ENOTSUP;
128  			break;
129  		default:
130  			res = -EINVAL;
131  			break;
132  		}
133  	
134  		if (res == 0) {
135  			res = qb_ipcs_us_publish(s);
136  			if (res < 0) {
137  				(void)qb_ipcs_us_withdraw(s);
138  				goto run_cleanup;
139  			}
140  		}
141  	
142  	run_cleanup:
143  		if (res < 0) {
144  			/* Failed to run services, removing initial alloc reference. */
145  			qb_ipcs_unref(s);
146  		}
147  	
148  		return res;
149  	}
150  	
151  	static int32_t
152  	_modify_dispatch_descriptor_(struct qb_ipcs_connection *c)
153  	{
154  		qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod;
155  	
156  		if (c->service->type == QB_IPC_SOCKET) {
157  			return disp_mod(c->service->poll_priority,
158  					c->event.u.us.sock,
159  					c->poll_events, c,
160  					qb_ipcs_dispatch_connection_request);
161  		} else {
162  			return disp_mod(c->service->poll_priority,
163  					c->setup.u.us.sock,
164  					c->poll_events, c,
165  					qb_ipcs_dispatch_connection_request);
166  		}
167  		return -EINVAL;
168  	}
169  	
170  	void
171  	qb_ipcs_request_rate_limit(struct qb_ipcs_service *s,
172  				   enum qb_ipcs_rate_limit rl)
173  	{
174  		struct qb_ipcs_connection *c;
175  		enum qb_loop_priority old_p = s->poll_priority;
176  		struct qb_list_head *pos;
177  		struct qb_list_head *n;
178  	
179  		switch (rl) {
180  		case QB_IPCS_RATE_FAST:
181  			s->poll_priority = QB_LOOP_HIGH;
182  			break;
183  		case QB_IPCS_RATE_SLOW:
184  		case QB_IPCS_RATE_OFF:
185  		case QB_IPCS_RATE_OFF_2:
186  			s->poll_priority = QB_LOOP_LOW;
187  			break;
188  		default:
189  		case QB_IPCS_RATE_NORMAL:
190  			s->poll_priority = QB_LOOP_MED;
191  			break;
192  		}
193  	
194  		qb_list_for_each_safe(pos, n, &s->connections) {
195  	
196  			c = qb_list_entry(pos, struct qb_ipcs_connection, list);
197  			qb_ipcs_connection_ref(c);
198  	
199  			if (rl == QB_IPCS_RATE_OFF) {
200  				qb_ipcs_flowcontrol_set(c, 1);
201  			} else if (rl == QB_IPCS_RATE_OFF_2) {
202  				qb_ipcs_flowcontrol_set(c, 2);
203  			} else {
204  				qb_ipcs_flowcontrol_set(c, QB_FALSE);
205  			}
206  			if (old_p != s->poll_priority) {
207  				(void)_modify_dispatch_descriptor_(c);
208  			}
209  			qb_ipcs_connection_unref(c);
210  		}
211  	}
212  	
213  	void
214  	qb_ipcs_ref(struct qb_ipcs_service *s)
215  	{
216  		qb_atomic_int_inc(&s->ref_count);
217  	}
218  	
219  	void
220  	qb_ipcs_unref(struct qb_ipcs_service *s)
221  	{
222  		int32_t free_it;
223  	
224  		assert(s->ref_count > 0);
225  		free_it = qb_atomic_int_dec_and_test(&s->ref_count);
226  		if (free_it) {
227  			qb_util_log(LOG_DEBUG, "%s() - destroying", __func__);
228  			free(s);
229  		}
230  	}
231  	
232  	void
233  	qb_ipcs_destroy(struct qb_ipcs_service *s)
234  	{
235  		struct qb_ipcs_connection *c = NULL;
236  		struct qb_list_head *pos;
237  		struct qb_list_head *n;
238  	
239  		if (s == NULL) {
240  			return;
241  		}
242  		qb_list_for_each_safe(pos, n, &s->connections) {
243  			c = qb_list_entry(pos, struct qb_ipcs_connection, list);
244  			if (c == NULL) {
245  				continue;
246  			}
247  			qb_ipcs_disconnect(c);
248  		}
249  		(void)qb_ipcs_us_withdraw(s);
250  	
251  		/* service destroyed, remove initial alloc ref */
252  		qb_ipcs_unref(s);
253  	}
254  	
255  	/*
256  	 * connection API
257  	 */
258  	static struct qb_ipc_one_way *
259  	_event_sock_one_way_get(struct qb_ipcs_connection * c)
260  	{
261  		if (c->service->needs_sock_for_poll) {
262  			return &c->setup;
263  		}
264  		if (c->event.type == QB_IPC_SOCKET) {
265  			return &c->event;
266  		}
267  		return NULL;
268  	}
269  	
270  	static struct qb_ipc_one_way *
271  	_response_sock_one_way_get(struct qb_ipcs_connection * c)
272  	{
273  		if (c->service->needs_sock_for_poll) {
274  			return &c->setup;
275  		}
276  		if (c->response.type == QB_IPC_SOCKET) {
277  			return &c->response;
278  		}
279  		return NULL;
280  	}
281  	
282  	ssize_t
283  	qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
284  			      size_t size)
285  	{
286  		ssize_t res;
287  	
288  		if (c == NULL) {
289  			return -EINVAL;
290  		}
291  		qb_ipcs_connection_ref(c);
292  		res = c->service->funcs.send(&c->response, data, size);
293  		if (res == size) {
294  			c->stats.responses++;
295  		} else if (res == -EAGAIN || res == -ETIMEDOUT) {
296  			struct qb_ipc_one_way *ow = _response_sock_one_way_get(c);
297  			if (ow) {
298  				ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
299  				if (res2 < 0) {
300  					res = res2;
301  				}
302  			}
303  			c->stats.send_retries++;
304  		}
305  		qb_ipcs_connection_unref(c);
306  	
307  		return res;
308  	}
309  	
310  	ssize_t
311  	qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * iov,
312  			       size_t iov_len)
313  	{
314  		ssize_t res;
315  	
316  		if (c == NULL) {
317  			return -EINVAL;
318  		}
319  		qb_ipcs_connection_ref(c);
320  		res = c->service->funcs.sendv(&c->response, iov, iov_len);
321  		if (res > 0) {
322  			c->stats.responses++;
323  		} else if (res == -EAGAIN || res == -ETIMEDOUT) {
324  			struct qb_ipc_one_way *ow = _response_sock_one_way_get(c);
325  			if (ow) {
326  				ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
327  				if (res2 < 0) {
328  					res = res2;
329  				}
330  			}
331  			c->stats.send_retries++;
332  		}
333  		qb_ipcs_connection_unref(c);
334  	
335  		return res;
336  	}
337  	
338  	static int32_t
339  	resend_event_notifications(struct qb_ipcs_connection *c)
340  	{
341  		ssize_t res = 0;
342  	
343  		if (!c->service->needs_sock_for_poll) {
344  			return res;
345  		}
346  	
347  		if (c->outstanding_notifiers > 0) {
348  			res = qb_ipc_us_send(&c->setup, c->receive_buf,
349  					     c->outstanding_notifiers);
350  		}
351  		if (res > 0) {
352  			c->outstanding_notifiers -= res;
353  		}
354  	
355  		assert(c->outstanding_notifiers >= 0);
356  		if (c->outstanding_notifiers == 0) {
357  			c->poll_events = POLLIN | POLLPRI | POLLNVAL;
358  			(void)_modify_dispatch_descriptor_(c);
359  		}
360  		return res;
361  	}
362  	
363  	static int32_t
364  	new_event_notification(struct qb_ipcs_connection * c)
365  	{
366  		ssize_t res = 0;
367  	
368  		if (!c->service->needs_sock_for_poll) {
369  			return res;
370  		}
371  	
372  		assert(c->outstanding_notifiers >= 0);
373  		if (c->outstanding_notifiers > 0) {
374  			c->outstanding_notifiers++;
375  			res = resend_event_notifications(c);
376  		} else {
377  			res = qb_ipc_us_send(&c->setup, &c->outstanding_notifiers, 1);
378  			if (res == -EAGAIN) {
379  				/*
380  				 * notify the client later, when we can.
381  				 */
382  				c->outstanding_notifiers++;
383  				c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNVAL;
384  				(void)_modify_dispatch_descriptor_(c);
385  			}
386  		}
387  		return res;
388  	}
389  	
390  	ssize_t
391  	qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size)
392  	{
393  		ssize_t res;
394  		ssize_t resn;
395  	
396  		if (c == NULL) {
397  			return -EINVAL;
398  		} else if (size > c->event.max_msg_size) {
399  			return -EMSGSIZE;
400  		}
401  	
402  		qb_ipcs_connection_ref(c);
403  		res = c->service->funcs.send(&c->event, data, size);
404  		if (res == size) {
405  			c->stats.events++;
406  			resn = new_event_notification(c);
407  			if (resn < 0 && resn != -EAGAIN && resn != -ENOBUFS) {
408  				errno = -resn;
409  				qb_util_perror(LOG_DEBUG,
410  					       "new_event_notification (%s)",
411  					       c->description);
412  				res = resn;
413  			}
414  		} else if (res == -EAGAIN || res == -ETIMEDOUT) {
415  			struct qb_ipc_one_way *ow = _event_sock_one_way_get(c);
416  	
417  			if (c->outstanding_notifiers > 0) {
CID (unavailable; MK=2974ff1a6b4c3b62d11cbc7947603490) (#1 of 1): Unused value (UNUSED_VALUE):
(2) Event returned_value: Assigning value from "resend_event_notifications(c)" to "resn" here, but that stored value is overwritten before it can be used.
Also see events: [value_overwrite]
418  				resn = resend_event_notifications(c);
419  			}
420  			if (ow) {
(1) Event value_overwrite: Overwriting previous write to "resn" with value from "qb_ipc_us_ready(ow, &c->setup, 0, 4)".
Also see events: [returned_value]
421  				resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
422  				if (resn < 0) {
423  					res = resn;
424  				}
425  			}
426  			c->stats.send_retries++;
427  		}
428  	
429  		qb_ipcs_connection_unref(c);
430  		return res;
431  	}
432  	
433  	ssize_t
434  	qb_ipcs_event_sendv(struct qb_ipcs_connection * c,
435  			    const struct iovec * iov, size_t iov_len)
436  	{
437  		ssize_t res;
438  		ssize_t resn;
439  	
440  		if (c == NULL) {
441  			return -EINVAL;
442  		}
443  		qb_ipcs_connection_ref(c);
444  	
445  		res = c->service->funcs.sendv(&c->event, iov, iov_len);
446  		if (res > 0) {
447  			c->stats.events++;
448  			resn = new_event_notification(c);
449  			if (resn < 0 && resn != -EAGAIN) {
450  				errno = -resn;
451  				qb_util_perror(LOG_DEBUG,
452  					       "new_event_notification (%s)",
453  					       c->description);
454  				res = resn;
455  			}
456  		} else if (res == -EAGAIN || res == -ETIMEDOUT) {
457  			struct qb_ipc_one_way *ow = _event_sock_one_way_get(c);
458  	
459  			if (c->outstanding_notifiers > 0) {
460  				resn = resend_event_notifications(c);
461  			}
462  			if (ow) {
463  				resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
464  				if (resn < 0) {
465  					res = resn;
466  				}
467  			}
468  			c->stats.send_retries++;
469  		}
470  	
471  		qb_ipcs_connection_unref(c);
472  		return res;
473  	}
474  	
475  	qb_ipcs_connection_t *
476  	qb_ipcs_connection_first_get(struct qb_ipcs_service * s)
477  	{
478  		struct qb_ipcs_connection *c;
479  	
480  		if (qb_list_empty(&s->connections)) {
481  			return NULL;
482  		}
483  	
484  		c = qb_list_first_entry(&s->connections, struct qb_ipcs_connection,
485  					list);
486  		qb_ipcs_connection_ref(c);
487  	
488  		return c;
489  	}
490  	
491  	qb_ipcs_connection_t *
492  	qb_ipcs_connection_next_get(struct qb_ipcs_service * s,
493  				    struct qb_ipcs_connection * current)
494  	{
495  		struct qb_ipcs_connection *c;
496  	
497  		if (current == NULL ||
498  		    qb_list_is_last(&current->list, &s->connections)) {
499  			return NULL;
500  		}
501  	
502  		c = qb_list_first_entry(&current->list, struct qb_ipcs_connection,
503  					list);
504  		qb_ipcs_connection_ref(c);
505  	
506  		return c;
507  	}
508  	
509  	int32_t
510  	qb_ipcs_service_id_get(struct qb_ipcs_connection * c)
511  	{
512  		if (c == NULL) {
513  			return -EINVAL;
514  		}
515  		return c->service->service_id;
516  	}
517  	
518  	struct qb_ipcs_connection *
519  	qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
520  	{
521  		struct qb_ipcs_connection *c =
522  		    calloc(1, sizeof(struct qb_ipcs_connection));
523  	
524  		if (c == NULL) {
525  			return NULL;
526  		}
527  	
528  		c->pid = 0;
529  		c->euid = -1;
530  		c->egid = -1;
531  		c->receive_buf = NULL;
532  		c->context = NULL;
533  		c->fc_enabled = QB_FALSE;
534  		c->state = QB_IPCS_CONNECTION_INACTIVE;
535  		c->poll_events = POLLIN | POLLPRI | POLLNVAL;
536  	
537  		c->setup.type = s->type;
538  		c->request.type = s->type;
539  		c->response.type = s->type;
540  		c->event.type = s->type;
541  		(void)strlcpy(c->description, "not set yet", CONNECTION_DESCRIPTION);
542  	
543  		/* initial alloc ref */
544  		qb_ipcs_connection_ref(c);
545  	
546  		/*
547  		 * The connection makes use of the service object. Give the connection
548  		 * a reference to the service so we know the service can never be destroyed
549  		 * until the connection is done with it.
550  		 */
551  		qb_ipcs_ref(s);
552  		c->service = s;
553  		qb_list_init(&c->list);
554  	
555  		return c;
556  	}
557  	
558  	void
559  	qb_ipcs_connection_ref(struct qb_ipcs_connection *c)
560  	{
561  		if (c) {
562  			qb_atomic_int_inc(&c->refcount);
563  		}
564  	}
565  	
566  	void
567  	qb_ipcs_connection_unref(struct qb_ipcs_connection *c)
568  	{
569  		int32_t free_it;
570  	
571  		if (c == NULL) {
572  			return;
573  		}
574  		if (c->refcount < 1) {
575  			qb_util_log(LOG_ERR, "ref:%d state:%d (%s)",
576  				    c->refcount, c->state, c->description);
577  			assert(0);
578  		}
579  		free_it = qb_atomic_int_dec_and_test(&c->refcount);
580  		if (free_it) {
581  			qb_list_del(&c->list);
582  			if (c->service->serv_fns.connection_destroyed) {
583  				c->service->serv_fns.connection_destroyed(c);
584  			}
585  			c->service->funcs.disconnect(c);
586  			/* Let go of the connection's reference to the service */
587  			qb_ipcs_unref(c->service);
588  			free(c->receive_buf);
589  			free(c);
590  		}
591  	}
592  	
593  	void
594  	qb_ipcs_disconnect(struct qb_ipcs_connection *c)
595  	{
596  		int32_t res = 0;
597  		qb_loop_job_dispatch_fn rerun_job;
598  	
599  		if (c == NULL) {
600  			return;
601  		}
602  		qb_util_log(LOG_DEBUG, "%s(%s) state:%d",
603  			    __func__, c->description, c->state);
604  	
605  		if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
606  			c->service->funcs.disconnect(c);
607  			c->state = QB_IPCS_CONNECTION_INACTIVE;
608  			c->service->stats.closed_connections++;
609  	
610  			/* This removes the initial alloc ref */
611  			qb_ipcs_connection_unref(c);
612  	
613  			/* return early as it's an incomplete connection.
614  			 */
615  			return;
616  		}
617  		if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) {
618  			c->service->funcs.disconnect(c);
619  			c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN;
620  			c->service->stats.active_connections--;
621  			c->service->stats.closed_connections++;
622  		}
623  		if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) {
624  			int scheduled_retry = 0;
625  			res = 0;
626  			if (c->service->serv_fns.connection_closed) {
627  				res = c->service->serv_fns.connection_closed(c);
628  			}
629  			if (res != 0) {
630  				/* OK, so they want the connection_closed
631  				 * function re-run */
632  				rerun_job =
633  				    (qb_loop_job_dispatch_fn) qb_ipcs_disconnect;
634  				res = c->service->poll_fns.job_add(QB_LOOP_LOW,
635  								   c, rerun_job);
636  				if (res == 0) {
637  					/* this function is going to be called again.
638  					 * so hold off on the unref */
639  					scheduled_retry = 1;
640  				}
641  			}
642  			remove_tempdir(c->description);
643  			if (scheduled_retry == 0) {
644  				/* This removes the initial alloc ref */
645  				qb_ipcs_connection_unref(c);
646  			}
647  		}
648  	
649  	}
650  	
651  	static void
652  	qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable)
653  	{
654  		if (c == NULL) {
655  			return;
656  		}
657  		if (c->fc_enabled != fc_enable) {
658  			c->service->funcs.fc_set(&c->request, fc_enable);
659  			c->fc_enabled = fc_enable;
660  			c->stats.flow_control_state = fc_enable;
661  			c->stats.flow_control_count++;
662  		}
663  	}
664  	
665  	static int32_t
666  	_process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout)
667  	{
668  		int32_t res = 0;
669  		ssize_t size;
670  		struct qb_ipc_request_header *hdr;
671  	
672  		if (c->service->funcs.peek && c->service->funcs.reclaim) {
673  			size = c->service->funcs.peek(&c->request, (void **)&hdr,
674  						      ms_timeout);
675  		} else {
676  			hdr = c->receive_buf;
677  			size = c->service->funcs.recv(&c->request,
678  						      hdr,
679  						      c->request.max_msg_size,
680  						      ms_timeout);
681  		}
682  		if (size < 0) {
683  			if (size != -EAGAIN && size != -ETIMEDOUT) {
684  				qb_util_perror(LOG_DEBUG,
685  					       "recv from client connection failed (%s)",
686  					       c->description);
687  			} else {
688  				c->stats.recv_retries++;
689  			}
690  			res = size;
691  			goto cleanup;
692  		} else if (size == 0 || hdr->id == QB_IPC_MSG_DISCONNECT) {
693  			qb_util_log(LOG_DEBUG, "client requesting a disconnect (%s)",
694  				    c->description);
695  			res = -ESHUTDOWN;
696  			goto cleanup;
697  		} else {
698  			/* Validate message size to prevent integer overflow attacks */
699  			if (hdr->size <= 0 || hdr->size > c->request.max_msg_size) {
700  				qb_util_log(LOG_WARNING,
701  					    "invalid message size %d (max: %zu) from client %s",
702  					    hdr->size, c->request.max_msg_size, c->description);
703  				res = -EINVAL;
704  				goto cleanup;
705  			}
706  			c->stats.requests++;
707  			res = c->service->serv_fns.msg_process(c, hdr, hdr->size);
708  			/* 0 == good, negative == backoff */
709  			if (res < 0) {
710  				res = -ENOBUFS;
711  			} else {
712  				res = size;
713  			}
714  		}
715  	
716  		if (c->service->funcs.peek && c->service->funcs.reclaim) {
717  			c->service->funcs.reclaim(&c->request);
718  		}
719  	
720  	cleanup:
721  		return res;
722  	}
723  	
724  	#define IPC_REQUEST_TIMEOUT 10
725  	#define MAX_RECV_MSGS 50
726  	
727  	static ssize_t
728  	_request_q_len_get(struct qb_ipcs_connection *c)
729  	{
730  		ssize_t q_len;
731  		if (c->service->funcs.q_len_get) {
732  			q_len = c->service->funcs.q_len_get(&c->request);
733  			if (q_len <= 0) {
734  				return q_len;
735  			}
736  			if (c->service->poll_priority == QB_LOOP_MED) {
737  				q_len = QB_MIN(q_len, 5);
738  			} else if (c->service->poll_priority == QB_LOOP_LOW) {
739  				q_len = 1;
740  			} else {
741  				q_len = QB_MIN(q_len, MAX_RECV_MSGS);
742  			}
743  		} else {
744  			q_len = 1;
745  		}
746  		return q_len;
747  	}
748  	
749  	int32_t
750  	qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data)
751  	{
752  		struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
753  		char bytes[MAX_RECV_MSGS];
754  		int32_t res = 0;
755  		int32_t res2;
756  		int32_t recvd = 0;
757  		ssize_t avail;
758  	
759  		if (c == NULL) {
760  			res = -EINVAL;
761  			goto dispatch_cleanup;
762  		}
763  	
764  		if (revents & POLLNVAL) {
765  			qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
766  			res = -EINVAL;
767  			goto dispatch_cleanup;
768  		}
769  		if (revents & POLLHUP) {
770  			qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
771  			res = -ESHUTDOWN;
772  			goto dispatch_cleanup;
773  		}
774  	
775  		if (revents & POLLOUT) {
776  			/* try resend events now that fd can write */
777  			res = resend_event_notifications(c);
778  			if (res < 0 && res != -EAGAIN) {
779  				errno = -res;
780  				qb_util_perror(LOG_WARNING,
781  					       "resend_event_notifications (%s)",
782  					       c->description);
783  			}
784  			/* nothing to read */
785  			if ((revents & POLLIN) == 0) {
786  				res = 0;
787  				goto dispatch_cleanup;
788  			}
789  		}
790  		if (c->fc_enabled) {
791  			res = 0;
792  			goto dispatch_cleanup;
793  		}
794  		avail = _request_q_len_get(c);
795  	
796  		if (c->service->needs_sock_for_poll && avail == 0) {
797  			res2 = qb_ipc_us_recv(&c->setup, bytes, 1, 0);
798  			if (qb_ipc_us_sock_error_is_disconnected(res2)) {
799  				errno = -res2;
800  				qb_util_perror(LOG_WARNING, "conn (%s) disconnected",
801  					       c->description);
802  				res = -ESHUTDOWN;
803  				goto dispatch_cleanup;
804  			} else {
805  				qb_util_log(LOG_WARNING,
806  					    "conn (%s) Nothing in q but got POLLIN on fd:%d (res2:%d)",
807  					    c->description, fd, res2);
808  				res = 0;
809  				goto dispatch_cleanup;
810  			}
811  		}
812  	
813  		do {
814  			res = _process_request_(c, IPC_REQUEST_TIMEOUT);
815  	
816  			if (res == -ESHUTDOWN) {
817  				goto dispatch_cleanup;
818  			}
819  	
820  			if (res > 0 || res == -ENOBUFS || res == -EINVAL) {
821  				recvd++;
822  			}
823  			if (res > 0) {
824  				avail--;
825  			}
826  		} while (avail > 0 && res > 0 && !c->fc_enabled);
827  	
828  		if (c->service->needs_sock_for_poll && recvd > 0) {
829  			res2 = qb_ipc_us_recv(&c->setup, bytes, recvd, -1);
830  			if (qb_ipc_us_sock_error_is_disconnected(res2)) {
831  				errno = -res2;
832  				qb_util_perror(LOG_ERR, "error receiving from setup sock (%s)", c->description);
833  	
834  				res = -ESHUTDOWN;
835  				goto dispatch_cleanup;
836  			}
837  		}
838  	
839  		res = QB_MIN(0, res);
840  		if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) {
841  			res = 0;
842  		}
843  		if (res != 0) {
844  			if (res != -ENOTCONN) {
845  				/*
846  				 * Abnormal state (ENOTCONN is normal shutdown).
847  				 */
848  				errno = -res;
849  				qb_util_perror(LOG_ERR, "request returned error (%s)",
850  					       c->description);
851  			}
852  		}
853  	
854  	dispatch_cleanup:
855  		if (res != 0) {
856  			qb_ipcs_disconnect(c);
857  		}
858  		return res;
859  	}
860  	
861  	void
862  	qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context)
863  	{
864  		if (c == NULL) {
865  			return;
866  		}
867  		c->context = context;
868  	}
869  	
870  	void *
871  	qb_ipcs_context_get(struct qb_ipcs_connection *c)
872  	{
873  		if (c == NULL) {
874  			return NULL;
875  		}
876  		return c->context;
877  	}
878  	
879  	void *
880  	qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c)
881  	{
882  		if (c == NULL || c->service == NULL) {
883  			return NULL;
884  		}
885  		return c->service->context;
886  	}
887  	
888  	int32_t
889  	qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c,
890  				     struct qb_ipcs_connection_stats * stats,
891  				     int32_t clear_after_read)
892  	{
893  		if (c == NULL) {
894  			return -EINVAL;
895  		}
896  		memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats));
897  		if (clear_after_read) {
898  			memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
899  			c->stats.client_pid = c->pid;
900  		}
901  		return 0;
902  	}
903  	
904  	struct qb_ipcs_connection_stats_2*
905  	qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c,
906  				       int32_t clear_after_read)
907  	{
908  		struct qb_ipcs_connection_stats_2 * stats;
909  	
910  		if (c == NULL) {
911  			errno = EINVAL;
912  			return NULL;
913  		}
914  		stats = calloc(1, sizeof(struct qb_ipcs_connection_stats_2));
915  		if (stats == NULL) {
916  			return NULL;
917  		}
918  	
919  		memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats_2));
920  	
921  		if (c->service->funcs.q_len_get) {
922  			stats->event_q_length = c->service->funcs.q_len_get(&c->event);
923  		} else {
924  			stats->event_q_length = 0;
925  		}
926  		if (clear_after_read) {
927  			memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
928  			c->stats.client_pid = c->pid;
929  		}
930  		return stats;
931  	}
932  	
933  	int32_t
934  	qb_ipcs_stats_get(struct qb_ipcs_service * s,
935  			  struct qb_ipcs_stats * stats, int32_t clear_after_read)
936  	{
937  		if (s == NULL) {
938  			return -EINVAL;
939  		}
940  		memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats));
941  		if (clear_after_read) {
942  			memset(&s->stats, 0, sizeof(struct qb_ipcs_stats));
943  		}
944  		return 0;
945  	}
946  	
947  	void
948  	qb_ipcs_connection_auth_set(qb_ipcs_connection_t *c, uid_t uid,
949  				    gid_t gid, mode_t mode)
950  	{
951  		if (c) {
952  			c->auth.uid = uid;
953  			c->auth.gid = gid;
954  			c->auth.mode = mode;
955  		}
956  	}
957  	
958  	int32_t
959  	qb_ipcs_connection_get_buffer_size(qb_ipcs_connection_t *c)
960  	{
961  		if (c == NULL) {
962  			return -EINVAL;
963  		}
964  	
965  		/* request, response, and event shoud all have the same
966  		 * buffer size allocated. It doesn't matter which we return
967  		 * here. */
968  		return c->response.max_msg_size;
969  	}
970  	
971  	void qb_ipcs_enforce_buffer_size(qb_ipcs_service_t *s, uint32_t buf_size)
972  	{
973  		if (s == NULL) {
974  			return;
975  		}
976  		s->max_buffer_size = buf_size;
977  	}
978