1    	/*
2    	 * Copyright (C) 2010 Red Hat, Inc.
3    	 *
4    	 * Author: Angus Salkeld <asalkeld@redhat.com>
5    	 *
6    	 * This file is part of libqb.
7    	 *
8    	 * libqb is free software: you can redistribute it and/or modify
9    	 * it under the terms of the GNU Lesser General Public License as published by
10   	 * the Free Software Foundation, either version 2.1 of the License, or
11   	 * (at your option) any later version.
12   	 *
13   	 * libqb is distributed in the hope that it will be useful,
14   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   	 * GNU Lesser General Public License for more details.
17   	 *
18   	 * You should have received a copy of the GNU Lesser General Public License
19   	 * along with libqb.  If not, see <http://www.gnu.org/licenses/>.
20   	 */
21   	#include "os_base.h"
22   	#include <poll.h>
23   	
24   	#include "util_int.h"
25   	#include "ipc_int.h"
26   	#include <qb/qbdefs.h>
27   	#include <qb/qbatomic.h>
28   	#include <qb/qbipcs.h>
29   	
30   	static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c,
31   					    int32_t fc_enable);
32   	static int32_t
33   	new_event_notification(struct qb_ipcs_connection * c);
34   	
35   	
36   	qb_ipcs_service_t *
37   	qb_ipcs_create(const char *name,
38   		       int32_t service_id,
39   		       enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers)
40   	{
41   		struct qb_ipcs_service *s;
42   	
43   		s = calloc(1, sizeof(struct qb_ipcs_service));
44   		if (s == NULL) {
45   			return NULL;
46   		}
47   		if (type == QB_IPC_NATIVE) {
48   	#ifdef DISABLE_IPC_SHM
49   			s->type = QB_IPC_SOCKET;
50   	#else
51   			s->type = QB_IPC_SHM;
52   	#endif /* DISABLE_IPC_SHM */
53   		} else {
54   			s->type = type;
55   		}
56   	
57   		s->pid = getpid();
58   		s->needs_sock_for_poll = QB_FALSE;
59   		s->poll_priority = QB_LOOP_MED;
60   	
61   		/* Initial alloc ref */
62   		qb_ipcs_ref(s);
63   	
64   		s->service_id = service_id;
65   		(void)strlcpy(s->name, name, NAME_MAX);
66   	
67   		s->serv_fns.connection_accept = handlers->connection_accept;
68   		s->serv_fns.connection_created = handlers->connection_created;
69   		s->serv_fns.msg_process = handlers->msg_process;
70   		s->serv_fns.connection_closed = handlers->connection_closed;
71   		s->serv_fns.connection_destroyed = handlers->connection_destroyed;
72   	
73   		qb_list_init(&s->connections);
74   	
75   		return s;
76   	}
77   	
78   	void
79   	qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s,
80   				  struct qb_ipcs_poll_handlers *handlers)
81   	{
82   		s->poll_fns.job_add = handlers->job_add;
83   		s->poll_fns.dispatch_add = handlers->dispatch_add;
84   		s->poll_fns.dispatch_mod = handlers->dispatch_mod;
85   		s->poll_fns.dispatch_del = handlers->dispatch_del;
86   	}
87   	
88   	void
89   	qb_ipcs_service_context_set(qb_ipcs_service_t* s,
90   				    void *context)
91   	{
92   		s->context = context;
93   	}
94   	
95   	void *
96   	qb_ipcs_service_context_get(qb_ipcs_service_t* s)
97   	{
98   		return s->context;
99   	}
100  	
101  	int32_t
102  	qb_ipcs_run(struct qb_ipcs_service *s)
103  	{
104  		int32_t res = 0;
105  	
106  		if (s->poll_fns.dispatch_add == NULL ||
107  		    s->poll_fns.dispatch_mod == NULL ||
108  		    s->poll_fns.dispatch_del == NULL) {
109  	
110  			res = -EINVAL;
111  			goto run_cleanup;
112  		}
113  	
114  		switch (s->type) {
115  		case QB_IPC_SOCKET:
116  			qb_ipcs_us_init((struct qb_ipcs_service *)s);
117  			break;
118  		case QB_IPC_SHM:
119  	#ifdef DISABLE_IPC_SHM
120  			res = -ENOTSUP;
121  	#else
122  			qb_ipcs_shm_init((struct qb_ipcs_service *)s);
123  	#endif /* DISABLE_IPC_SHM */
124  			break;
125  		case QB_IPC_POSIX_MQ:
126  		case QB_IPC_SYSV_MQ:
127  			res = -ENOTSUP;
128  			break;
129  		default:
130  			res = -EINVAL;
131  			break;
132  		}
133  	
134  		if (res == 0) {
135  			res = qb_ipcs_us_publish(s);
136  			if (res < 0) {
137  				(void)qb_ipcs_us_withdraw(s);
138  				goto run_cleanup;
139  			}
140  		}
141  	
142  	run_cleanup:
143  		if (res < 0) {
144  			/* Failed to run services, removing initial alloc reference. */
145  			qb_ipcs_unref(s);
146  		}
147  	
148  		return res;
149  	}
150  	
151  	static int32_t
152  	_modify_dispatch_descriptor_(struct qb_ipcs_connection *c)
153  	{
154  		qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod;
155  	
156  		if (c->service->type == QB_IPC_SOCKET) {
157  			return disp_mod(c->service->poll_priority,
158  					c->event.u.us.sock,
159  					c->poll_events, c,
160  					qb_ipcs_dispatch_connection_request);
161  		} else {
162  			return disp_mod(c->service->poll_priority,
163  					c->setup.u.us.sock,
164  					c->poll_events, c,
165  					qb_ipcs_dispatch_connection_request);
166  		}
167  		return -EINVAL;
168  	}
169  	
170  	void
171  	qb_ipcs_request_rate_limit(struct qb_ipcs_service *s,
172  				   enum qb_ipcs_rate_limit rl)
173  	{
174  		struct qb_ipcs_connection *c;
175  		enum qb_loop_priority old_p = s->poll_priority;
176  		struct qb_list_head *pos;
177  		struct qb_list_head *n;
178  	
179  		switch (rl) {
180  		case QB_IPCS_RATE_FAST:
181  			s->poll_priority = QB_LOOP_HIGH;
182  			break;
183  		case QB_IPCS_RATE_SLOW:
184  		case QB_IPCS_RATE_OFF:
185  		case QB_IPCS_RATE_OFF_2:
186  			s->poll_priority = QB_LOOP_LOW;
187  			break;
188  		default:
189  		case QB_IPCS_RATE_NORMAL:
190  			s->poll_priority = QB_LOOP_MED;
191  			break;
192  		}
193  	
194  		qb_list_for_each_safe(pos, n, &s->connections) {
195  	
196  			c = qb_list_entry(pos, struct qb_ipcs_connection, list);
197  			qb_ipcs_connection_ref(c);
198  	
199  			if (rl == QB_IPCS_RATE_OFF) {
200  				qb_ipcs_flowcontrol_set(c, 1);
201  			} else if (rl == QB_IPCS_RATE_OFF_2) {
202  				qb_ipcs_flowcontrol_set(c, 2);
203  			} else {
204  				qb_ipcs_flowcontrol_set(c, QB_FALSE);
205  			}
206  			if (old_p != s->poll_priority) {
207  				(void)_modify_dispatch_descriptor_(c);
208  			}
209  			qb_ipcs_connection_unref(c);
210  		}
211  	}
212  	
213  	void
214  	qb_ipcs_ref(struct qb_ipcs_service *s)
215  	{
216  		qb_atomic_int_inc(&s->ref_count);
217  	}
218  	
219  	void
220  	qb_ipcs_unref(struct qb_ipcs_service *s)
221  	{
222  		int32_t free_it;
223  	
224  		assert(s->ref_count > 0);
225  		free_it = qb_atomic_int_dec_and_test(&s->ref_count);
226  		if (free_it) {
227  			qb_util_log(LOG_DEBUG, "%s() - destroying", __func__);
228  			free(s);
229  		}
230  	}
231  	
232  	void
233  	qb_ipcs_destroy(struct qb_ipcs_service *s)
234  	{
235  		struct qb_ipcs_connection *c = NULL;
236  		struct qb_list_head *pos;
237  		struct qb_list_head *n;
238  	
239  		if (s == NULL) {
240  			return;
241  		}
242  		qb_list_for_each_safe(pos, n, &s->connections) {
243  			c = qb_list_entry(pos, struct qb_ipcs_connection, list);
244  			if (c == NULL) {
245  				continue;
246  			}
247  			qb_ipcs_disconnect(c);
248  		}
249  		(void)qb_ipcs_us_withdraw(s);
250  	
251  		/* service destroyed, remove initial alloc ref */
252  		qb_ipcs_unref(s);
253  	}
254  	
255  	/*
256  	 * connection API
257  	 */
258  	static struct qb_ipc_one_way *
259  	_event_sock_one_way_get(struct qb_ipcs_connection * c)
260  	{
261  		if (c->service->needs_sock_for_poll) {
262  			return &c->setup;
263  		}
264  		if (c->event.type == QB_IPC_SOCKET) {
265  			return &c->event;
266  		}
267  		return NULL;
268  	}
269  	
270  	static struct qb_ipc_one_way *
271  	_response_sock_one_way_get(struct qb_ipcs_connection * c)
272  	{
273  		if (c->service->needs_sock_for_poll) {
274  			return &c->setup;
275  		}
276  		if (c->response.type == QB_IPC_SOCKET) {
277  			return &c->response;
278  		}
279  		return NULL;
280  	}
281  	
282  	ssize_t
283  	qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
284  			      size_t size)
285  	{
286  		ssize_t res;
287  	
288  		if (c == NULL) {
289  			return -EINVAL;
290  		}
291  		qb_ipcs_connection_ref(c);
292  		res = c->service->funcs.send(&c->response, data, size);
293  		if (res == size) {
294  			c->stats.responses++;
295  		} else if (res == -EAGAIN || res == -ETIMEDOUT) {
296  			struct qb_ipc_one_way *ow = _response_sock_one_way_get(c);
297  			if (ow) {
298  				ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
299  				if (res2 < 0) {
300  					res = res2;
301  				}
302  			}
303  			c->stats.send_retries++;
304  		}
305  		qb_ipcs_connection_unref(c);
306  	
307  		return res;
308  	}
309  	
310  	ssize_t
311  	qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * iov,
312  			       size_t iov_len)
313  	{
314  		ssize_t res;
315  	
316  		if (c == NULL) {
317  			return -EINVAL;
318  		}
319  		qb_ipcs_connection_ref(c);
320  		res = c->service->funcs.sendv(&c->response, iov, iov_len);
321  		if (res > 0) {
322  			c->stats.responses++;
323  		} else if (res == -EAGAIN || res == -ETIMEDOUT) {
324  			struct qb_ipc_one_way *ow = _response_sock_one_way_get(c);
325  			if (ow) {
326  				ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
327  				if (res2 < 0) {
328  					res = res2;
329  				}
330  			}
331  			c->stats.send_retries++;
332  		}
333  		qb_ipcs_connection_unref(c);
334  	
335  		return res;
336  	}
337  	
338  	static int32_t
339  	resend_event_notifications(struct qb_ipcs_connection *c)
340  	{
341  		ssize_t res = 0;
342  	
343  		if (!c->service->needs_sock_for_poll) {
344  			return res;
345  		}
346  	
347  		if (c->outstanding_notifiers > 0) {
348  			res = qb_ipc_us_send(&c->setup, c->receive_buf,
349  					     c->outstanding_notifiers);
350  		}
351  		if (res > 0) {
352  			c->outstanding_notifiers -= res;
353  		}
354  	
355  		assert(c->outstanding_notifiers >= 0);
356  		if (c->outstanding_notifiers == 0) {
357  			c->poll_events = POLLIN | POLLPRI | POLLNVAL;
358  			(void)_modify_dispatch_descriptor_(c);
359  		}
360  		return res;
361  	}
362  	
363  	static int32_t
364  	new_event_notification(struct qb_ipcs_connection * c)
365  	{
366  		ssize_t res = 0;
367  	
368  		if (!c->service->needs_sock_for_poll) {
369  			return res;
370  		}
371  	
372  		assert(c->outstanding_notifiers >= 0);
373  		if (c->outstanding_notifiers > 0) {
374  			c->outstanding_notifiers++;
375  			res = resend_event_notifications(c);
376  		} else {
377  			res = qb_ipc_us_send(&c->setup, &c->outstanding_notifiers, 1);
378  			if (res == -EAGAIN) {
379  				/*
380  				 * notify the client later, when we can.
381  				 */
382  				c->outstanding_notifiers++;
383  				c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNVAL;
384  				(void)_modify_dispatch_descriptor_(c);
385  			}
386  		}
387  		return res;
388  	}
389  	
390  	ssize_t
391  	qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size)
392  	{
393  		ssize_t res;
394  		ssize_t resn;
395  	
396  		if (c == NULL) {
397  			return -EINVAL;
398  		} else if (size > c->event.max_msg_size) {
399  			return -EMSGSIZE;
400  		}
401  	
402  		qb_ipcs_connection_ref(c);
403  		res = c->service->funcs.send(&c->event, data, size);
404  		if (res == size) {
405  			c->stats.events++;
406  			resn = new_event_notification(c);
407  			if (resn < 0 && resn != -EAGAIN && resn != -ENOBUFS) {
408  				errno = -resn;
409  				qb_util_perror(LOG_DEBUG,
410  					       "new_event_notification (%s)",
411  					       c->description);
412  				res = resn;
413  			}
414  		} else if (res == -EAGAIN || res == -ETIMEDOUT) {
415  			struct qb_ipc_one_way *ow = _event_sock_one_way_get(c);
416  	
417  			if (c->outstanding_notifiers > 0) {
418  				resn = resend_event_notifications(c);
419  			}
420  			if (ow) {
421  				resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
422  				if (resn < 0) {
423  					res = resn;
424  				}
425  			}
426  			c->stats.send_retries++;
427  		}
428  	
429  		qb_ipcs_connection_unref(c);
430  		return res;
431  	}
432  	
433  	ssize_t
434  	qb_ipcs_event_sendv(struct qb_ipcs_connection * c,
435  			    const struct iovec * iov, size_t iov_len)
436  	{
437  		ssize_t res;
438  		ssize_t resn;
439  	
440  		if (c == NULL) {
441  			return -EINVAL;
442  		}
443  		qb_ipcs_connection_ref(c);
444  	
445  		res = c->service->funcs.sendv(&c->event, iov, iov_len);
446  		if (res > 0) {
447  			c->stats.events++;
448  			resn = new_event_notification(c);
449  			if (resn < 0 && resn != -EAGAIN) {
450  				errno = -resn;
451  				qb_util_perror(LOG_DEBUG,
452  					       "new_event_notification (%s)",
453  					       c->description);
454  				res = resn;
455  			}
456  		} else if (res == -EAGAIN || res == -ETIMEDOUT) {
457  			struct qb_ipc_one_way *ow = _event_sock_one_way_get(c);
458  	
459  			if (c->outstanding_notifiers > 0) {
460  				resn = resend_event_notifications(c);
461  			}
462  			if (ow) {
463  				resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
464  				if (resn < 0) {
465  					res = resn;
466  				}
467  			}
468  			c->stats.send_retries++;
469  		}
470  	
471  		qb_ipcs_connection_unref(c);
472  		return res;
473  	}
474  	
475  	qb_ipcs_connection_t *
476  	qb_ipcs_connection_first_get(struct qb_ipcs_service * s)
477  	{
478  		struct qb_ipcs_connection *c;
479  	
480  		if (qb_list_empty(&s->connections)) {
481  			return NULL;
482  		}
483  	
484  		c = qb_list_first_entry(&s->connections, struct qb_ipcs_connection,
485  					list);
486  		qb_ipcs_connection_ref(c);
487  	
488  		return c;
489  	}
490  	
491  	qb_ipcs_connection_t *
492  	qb_ipcs_connection_next_get(struct qb_ipcs_service * s,
493  				    struct qb_ipcs_connection * current)
494  	{
495  		struct qb_ipcs_connection *c;
496  	
497  		if (current == NULL ||
498  		    qb_list_is_last(&current->list, &s->connections)) {
499  			return NULL;
500  		}
501  	
502  		c = qb_list_first_entry(&current->list, struct qb_ipcs_connection,
503  					list);
504  		qb_ipcs_connection_ref(c);
505  	
506  		return c;
507  	}
508  	
509  	int32_t
510  	qb_ipcs_service_id_get(struct qb_ipcs_connection * c)
511  	{
512  		if (c == NULL) {
513  			return -EINVAL;
514  		}
515  		return c->service->service_id;
516  	}
517  	
518  	struct qb_ipcs_connection *
519  	qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
520  	{
521  		struct qb_ipcs_connection *c =
522  		    calloc(1, sizeof(struct qb_ipcs_connection));
523  	
524  		if (c == NULL) {
525  			return NULL;
526  		}
527  	
528  		c->pid = 0;
529  		c->euid = -1;
530  		c->egid = -1;
531  		c->receive_buf = NULL;
532  		c->context = NULL;
533  		c->fc_enabled = QB_FALSE;
534  		c->state = QB_IPCS_CONNECTION_INACTIVE;
535  		c->poll_events = POLLIN | POLLPRI | POLLNVAL;
536  	
537  		c->setup.type = s->type;
538  		c->request.type = s->type;
539  		c->response.type = s->type;
540  		c->event.type = s->type;
541  		(void)strlcpy(c->description, "not set yet", CONNECTION_DESCRIPTION);
542  	
543  		/* initial alloc ref */
544  		qb_ipcs_connection_ref(c);
545  	
546  		/*
547  		 * The connection makes use of the service object. Give the connection
548  		 * a reference to the service so we know the service can never be destroyed
549  		 * until the connection is done with it.
550  		 */
551  		qb_ipcs_ref(s);
552  		c->service = s;
553  		qb_list_init(&c->list);
554  	
555  		return c;
556  	}
557  	
558  	void
559  	qb_ipcs_connection_ref(struct qb_ipcs_connection *c)
560  	{
561  		if (c) {
562  			qb_atomic_int_inc(&c->refcount);
563  		}
564  	}
565  	
566  	void
567  	qb_ipcs_connection_unref(struct qb_ipcs_connection *c)
568  	{
569  		int32_t free_it;
570  	
571  		if (c == NULL) {
572  			return;
573  		}
574  		if (c->refcount < 1) {
575  			qb_util_log(LOG_ERR, "ref:%d state:%d (%s)",
576  				    c->refcount, c->state, c->description);
577  			assert(0);
578  		}
579  		free_it = qb_atomic_int_dec_and_test(&c->refcount);
580  		if (free_it) {
581  			qb_list_del(&c->list);
582  			if (c->service->serv_fns.connection_destroyed) {
583  				c->service->serv_fns.connection_destroyed(c);
584  			}
585  			c->service->funcs.disconnect(c);
586  			/* Let go of the connection's reference to the service */
587  			qb_ipcs_unref(c->service);
588  			free(c->receive_buf);
589  			free(c);
590  		}
591  	}
592  	
593  	void
594  	qb_ipcs_disconnect(struct qb_ipcs_connection *c)
595  	{
596  		int32_t res = 0;
597  		qb_loop_job_dispatch_fn rerun_job;
598  	
599  		if (c == NULL) {
600  			return;
601  		}
602  		qb_util_log(LOG_DEBUG, "%s(%s) state:%d",
603  			    __func__, c->description, c->state);
604  	
605  		if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
606  			c->service->funcs.disconnect(c);
607  			c->state = QB_IPCS_CONNECTION_INACTIVE;
608  			c->service->stats.closed_connections++;
609  	
610  			/* This removes the initial alloc ref */
611  			qb_ipcs_connection_unref(c);
612  	
613  			/* return early as it's an incomplete connection.
614  			 */
615  			return;
616  		}
617  		if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) {
618  			c->service->funcs.disconnect(c);
619  			c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN;
620  			c->service->stats.active_connections--;
621  			c->service->stats.closed_connections++;
622  		}
623  		if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) {
624  			int scheduled_retry = 0;
625  			res = 0;
626  			if (c->service->serv_fns.connection_closed) {
627  				res = c->service->serv_fns.connection_closed(c);
628  			}
629  			if (res != 0) {
630  				/* OK, so they want the connection_closed
631  				 * function re-run */
632  				rerun_job =
633  				    (qb_loop_job_dispatch_fn) qb_ipcs_disconnect;
634  				res = c->service->poll_fns.job_add(QB_LOOP_LOW,
635  								   c, rerun_job);
636  				if (res == 0) {
637  					/* this function is going to be called again.
638  					 * so hold off on the unref */
639  					scheduled_retry = 1;
640  				}
641  			}
642  			remove_tempdir(c->description);
643  			if (scheduled_retry == 0) {
644  				/* This removes the initial alloc ref */
645  				qb_ipcs_connection_unref(c);
646  			}
647  		}
648  	
649  	}
650  	
651  	static void
652  	qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable)
653  	{
654  		if (c == NULL) {
655  			return;
656  		}
657  		if (c->fc_enabled != fc_enable) {
658  			c->service->funcs.fc_set(&c->request, fc_enable);
659  			c->fc_enabled = fc_enable;
660  			c->stats.flow_control_state = fc_enable;
661  			c->stats.flow_control_count++;
662  		}
663  	}
664  	
665  	static int32_t
666  	_process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout)
667  	{
668  		int32_t res = 0;
669  		ssize_t size;
670  		struct qb_ipc_request_header *hdr;
671  	
(1) Event cond_true: Condition "c->service->funcs.peek", taking true branch.
(2) Event cond_true: Condition "c->service->funcs.reclaim", taking true branch.
672  		if (c->service->funcs.peek && c->service->funcs.reclaim) {
673  			size = c->service->funcs.peek(&c->request, (void **)&hdr,
674  						      ms_timeout);
(3) Event if_fallthrough: Falling through to end of if statement.
675  		} else {
676  			hdr = c->receive_buf;
677  			size = c->service->funcs.recv(&c->request,
678  						      hdr,
679  						      c->request.max_msg_size,
680  						      ms_timeout);
(4) Event if_end: End of if statement.
681  		}
(5) Event cond_true: Condition "size < 0", taking true branch.
682  		if (size < 0) {
(6) Event cond_true: Condition "size != -11", taking true branch.
(7) Event cond_true: Condition "size != -110", taking true branch.
683  			if (size != -EAGAIN && size != -ETIMEDOUT) {
684  				qb_util_perror(LOG_DEBUG,
685  					       "recv from client connection failed (%s)",
686  					       c->description);
(8) Event if_fallthrough: Falling through to end of if statement.
687  			} else {
688  				c->stats.recv_retries++;
(9) Event if_end: End of if statement.
689  			}
(10) Event cast_overflow: Truncation due to cast operation on "size" from 64 to 32 bits.
(11) Event overflow_assign: "res" is assigned from "size".
Also see events: [return_overflow]
690  			res = size;
(12) Event goto: Jumping to label "cleanup".
691  			goto cleanup;
692  		} else if (size == 0 || hdr->id == QB_IPC_MSG_DISCONNECT) {
693  			qb_util_log(LOG_DEBUG, "client requesting a disconnect (%s)",
694  				    c->description);
695  			res = -ESHUTDOWN;
696  			goto cleanup;
697  		} else {
698  			c->stats.requests++;
699  			res = c->service->serv_fns.msg_process(c, hdr, hdr->size);
700  			/* 0 == good, negative == backoff */
701  			if (res < 0) {
702  				res = -ENOBUFS;
703  			} else {
704  				res = size;
705  			}
706  		}
707  	
708  		if (c->service->funcs.peek && c->service->funcs.reclaim) {
709  			c->service->funcs.reclaim(&c->request);
710  		}
711  	
(13) Event label: Reached label "cleanup".
712  	cleanup:
(14) Event return_overflow: "res", which might have overflowed, is returned from the function.
Also see events: [cast_overflow][overflow_assign]
713  		return res;
714  	}
715  	
716  	#define IPC_REQUEST_TIMEOUT 10
717  	#define MAX_RECV_MSGS 50
718  	
719  	static ssize_t
720  	_request_q_len_get(struct qb_ipcs_connection *c)
721  	{
722  		ssize_t q_len;
723  		if (c->service->funcs.q_len_get) {
724  			q_len = c->service->funcs.q_len_get(&c->request);
725  			if (q_len <= 0) {
726  				return q_len;
727  			}
728  			if (c->service->poll_priority == QB_LOOP_MED) {
729  				q_len = QB_MIN(q_len, 5);
730  			} else if (c->service->poll_priority == QB_LOOP_LOW) {
731  				q_len = 1;
732  			} else {
733  				q_len = QB_MIN(q_len, MAX_RECV_MSGS);
734  			}
735  		} else {
736  			q_len = 1;
737  		}
738  		return q_len;
739  	}
740  	
741  	int32_t
742  	qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data)
743  	{
744  		struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
745  		char bytes[MAX_RECV_MSGS];
746  		int32_t res = 0;
747  		int32_t res2;
748  		int32_t recvd = 0;
749  		ssize_t avail;
750  	
751  		if (c == NULL) {
752  			res = -EINVAL;
753  			goto dispatch_cleanup;
754  		}
755  	
756  		if (revents & POLLNVAL) {
757  			qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
758  			res = -EINVAL;
759  			goto dispatch_cleanup;
760  		}
761  		if (revents & POLLHUP) {
762  			qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
763  			res = -ESHUTDOWN;
764  			goto dispatch_cleanup;
765  		}
766  	
767  		if (revents & POLLOUT) {
768  			/* try resend events now that fd can write */
769  			res = resend_event_notifications(c);
770  			if (res < 0 && res != -EAGAIN) {
771  				errno = -res;
772  				qb_util_perror(LOG_WARNING,
773  					       "resend_event_notifications (%s)",
774  					       c->description);
775  			}
776  			/* nothing to read */
777  			if ((revents & POLLIN) == 0) {
778  				res = 0;
779  				goto dispatch_cleanup;
780  			}
781  		}
782  		if (c->fc_enabled) {
783  			res = 0;
784  			goto dispatch_cleanup;
785  		}
786  		avail = _request_q_len_get(c);
787  	
788  		if (c->service->needs_sock_for_poll && avail == 0) {
789  			res2 = qb_ipc_us_recv(&c->setup, bytes, 1, 0);
790  			if (qb_ipc_us_sock_error_is_disconnected(res2)) {
791  				errno = -res2;
792  				qb_util_perror(LOG_WARNING, "conn (%s) disconnected",
793  					       c->description);
794  				res = -ESHUTDOWN;
795  				goto dispatch_cleanup;
796  			} else {
797  				qb_util_log(LOG_WARNING,
798  					    "conn (%s) Nothing in q but got POLLIN on fd:%d (res2:%d)",
799  					    c->description, fd, res2);
800  				res = 0;
801  				goto dispatch_cleanup;
802  			}
803  		}
804  	
805  		do {
806  			res = _process_request_(c, IPC_REQUEST_TIMEOUT);
807  	
808  			if (res == -ESHUTDOWN) {
809  				goto dispatch_cleanup;
810  			}
811  	
812  			if (res > 0 || res == -ENOBUFS || res == -EINVAL) {
813  				recvd++;
814  			}
815  			if (res > 0) {
816  				avail--;
817  			}
818  		} while (avail > 0 && res > 0 && !c->fc_enabled);
819  	
820  		if (c->service->needs_sock_for_poll && recvd > 0) {
821  			res2 = qb_ipc_us_recv(&c->setup, bytes, recvd, -1);
822  			if (qb_ipc_us_sock_error_is_disconnected(res2)) {
823  				errno = -res2;
824  				qb_util_perror(LOG_ERR, "error receiving from setup sock (%s)", c->description);
825  	
826  				res = -ESHUTDOWN;
827  				goto dispatch_cleanup;
828  			}
829  		}
830  	
831  		res = QB_MIN(0, res);
832  		if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) {
833  			res = 0;
834  		}
835  		if (res != 0) {
836  			if (res != -ENOTCONN) {
837  				/*
838  				 * Abnormal state (ENOTCONN is normal shutdown).
839  				 */
840  				errno = -res;
841  				qb_util_perror(LOG_ERR, "request returned error (%s)",
842  					       c->description);
843  			}
844  		}
845  	
846  	dispatch_cleanup:
847  		if (res != 0) {
848  			qb_ipcs_disconnect(c);
849  		}
850  		return res;
851  	}
852  	
853  	void
854  	qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context)
855  	{
856  		if (c == NULL) {
857  			return;
858  		}
859  		c->context = context;
860  	}
861  	
862  	void *
863  	qb_ipcs_context_get(struct qb_ipcs_connection *c)
864  	{
865  		if (c == NULL) {
866  			return NULL;
867  		}
868  		return c->context;
869  	}
870  	
871  	void *
872  	qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c)
873  	{
874  		if (c == NULL || c->service == NULL) {
875  			return NULL;
876  		}
877  		return c->service->context;
878  	}
879  	
880  	int32_t
881  	qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c,
882  				     struct qb_ipcs_connection_stats * stats,
883  				     int32_t clear_after_read)
884  	{
885  		if (c == NULL) {
886  			return -EINVAL;
887  		}
888  		memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats));
889  		if (clear_after_read) {
890  			memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
891  			c->stats.client_pid = c->pid;
892  		}
893  		return 0;
894  	}
895  	
896  	struct qb_ipcs_connection_stats_2*
897  	qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c,
898  				       int32_t clear_after_read)
899  	{
900  		struct qb_ipcs_connection_stats_2 * stats;
901  	
902  		if (c == NULL) {
903  			errno = EINVAL;
904  			return NULL;
905  		}
906  		stats = calloc(1, sizeof(struct qb_ipcs_connection_stats_2));
907  		if (stats == NULL) {
908  			return NULL;
909  		}
910  	
911  		memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats_2));
912  	
913  		if (c->service->funcs.q_len_get) {
914  			stats->event_q_length = c->service->funcs.q_len_get(&c->event);
915  		} else {
916  			stats->event_q_length = 0;
917  		}
918  		if (clear_after_read) {
919  			memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
920  			c->stats.client_pid = c->pid;
921  		}
922  		return stats;
923  	}
924  	
925  	int32_t
926  	qb_ipcs_stats_get(struct qb_ipcs_service * s,
927  			  struct qb_ipcs_stats * stats, int32_t clear_after_read)
928  	{
929  		if (s == NULL) {
930  			return -EINVAL;
931  		}
932  		memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats));
933  		if (clear_after_read) {
934  			memset(&s->stats, 0, sizeof(struct qb_ipcs_stats));
935  		}
936  		return 0;
937  	}
938  	
939  	void
940  	qb_ipcs_connection_auth_set(qb_ipcs_connection_t *c, uid_t uid,
941  				    gid_t gid, mode_t mode)
942  	{
943  		if (c) {
944  			c->auth.uid = uid;
945  			c->auth.gid = gid;
946  			c->auth.mode = mode;
947  		}
948  	}
949  	
950  	int32_t
951  	qb_ipcs_connection_get_buffer_size(qb_ipcs_connection_t *c)
952  	{
953  		if (c == NULL) {
954  			return -EINVAL;
955  		}
956  	
957  		/* request, response, and event shoud all have the same
958  		 * buffer size allocated. It doesn't matter which we return
959  		 * here. */
960  		return c->response.max_msg_size;
961  	}
962  	
963  	void qb_ipcs_enforce_buffer_size(qb_ipcs_service_t *s, uint32_t buf_size)
964  	{
965  		if (s == NULL) {
966  			return;
967  		}
968  		s->max_buffer_size = buf_size;
969  	}
970