1 /*
2 * Copyright (C) 2010 Red Hat, Inc.
3 *
4 * Author: Angus Salkeld <asalkeld@redhat.com>
5 *
6 * This file is part of libqb.
7 *
8 * libqb is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation, either version 2.1 of the License, or
11 * (at your option) any later version.
12 *
13 * libqb is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with libqb. If not, see <http://www.gnu.org/licenses/>.
20 */
21 #include "os_base.h"
22 #include <poll.h>
23
24 #include "util_int.h"
25 #include "ipc_int.h"
26 #include <qb/qbdefs.h>
27 #include <qb/qbatomic.h>
28 #include <qb/qbipcs.h>
29
30 static void qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c,
31 int32_t fc_enable);
32 static int32_t
33 new_event_notification(struct qb_ipcs_connection * c);
34
35
36 qb_ipcs_service_t *
37 qb_ipcs_create(const char *name,
38 int32_t service_id,
39 enum qb_ipc_type type, struct qb_ipcs_service_handlers *handlers)
40 {
41 struct qb_ipcs_service *s;
42
43 s = calloc(1, sizeof(struct qb_ipcs_service));
44 if (s == NULL) {
45 return NULL;
46 }
47 if (type == QB_IPC_NATIVE) {
48 #ifdef DISABLE_IPC_SHM
49 s->type = QB_IPC_SOCKET;
50 #else
51 s->type = QB_IPC_SHM;
52 #endif /* DISABLE_IPC_SHM */
53 } else {
54 s->type = type;
55 }
56
57 s->pid = getpid();
58 s->needs_sock_for_poll = QB_FALSE;
59 s->poll_priority = QB_LOOP_MED;
60
61 /* Initial alloc ref */
62 qb_ipcs_ref(s);
63
64 s->service_id = service_id;
65 (void)strlcpy(s->name, name, NAME_MAX);
66
67 s->serv_fns.connection_accept = handlers->connection_accept;
68 s->serv_fns.connection_created = handlers->connection_created;
69 s->serv_fns.msg_process = handlers->msg_process;
70 s->serv_fns.connection_closed = handlers->connection_closed;
71 s->serv_fns.connection_destroyed = handlers->connection_destroyed;
72
73 qb_list_init(&s->connections);
74
75 return s;
76 }
77
78 void
79 qb_ipcs_poll_handlers_set(struct qb_ipcs_service *s,
80 struct qb_ipcs_poll_handlers *handlers)
81 {
82 s->poll_fns.job_add = handlers->job_add;
83 s->poll_fns.dispatch_add = handlers->dispatch_add;
84 s->poll_fns.dispatch_mod = handlers->dispatch_mod;
85 s->poll_fns.dispatch_del = handlers->dispatch_del;
86 }
87
88 void
89 qb_ipcs_service_context_set(qb_ipcs_service_t* s,
90 void *context)
91 {
92 s->context = context;
93 }
94
95 void *
96 qb_ipcs_service_context_get(qb_ipcs_service_t* s)
97 {
98 return s->context;
99 }
100
101 int32_t
102 qb_ipcs_run(struct qb_ipcs_service *s)
103 {
104 int32_t res = 0;
105
106 if (s->poll_fns.dispatch_add == NULL ||
107 s->poll_fns.dispatch_mod == NULL ||
108 s->poll_fns.dispatch_del == NULL) {
109
110 res = -EINVAL;
111 goto run_cleanup;
112 }
113
114 switch (s->type) {
115 case QB_IPC_SOCKET:
116 qb_ipcs_us_init((struct qb_ipcs_service *)s);
117 break;
118 case QB_IPC_SHM:
119 #ifdef DISABLE_IPC_SHM
120 res = -ENOTSUP;
121 #else
122 qb_ipcs_shm_init((struct qb_ipcs_service *)s);
123 #endif /* DISABLE_IPC_SHM */
124 break;
125 case QB_IPC_POSIX_MQ:
126 case QB_IPC_SYSV_MQ:
127 res = -ENOTSUP;
128 break;
129 default:
130 res = -EINVAL;
131 break;
132 }
133
134 if (res == 0) {
135 res = qb_ipcs_us_publish(s);
136 if (res < 0) {
137 (void)qb_ipcs_us_withdraw(s);
138 goto run_cleanup;
139 }
140 }
141
142 run_cleanup:
143 if (res < 0) {
144 /* Failed to run services, removing initial alloc reference. */
145 qb_ipcs_unref(s);
146 }
147
148 return res;
149 }
150
151 static int32_t
152 _modify_dispatch_descriptor_(struct qb_ipcs_connection *c)
153 {
154 qb_ipcs_dispatch_mod_fn disp_mod = c->service->poll_fns.dispatch_mod;
155
156 if (c->service->type == QB_IPC_SOCKET) {
157 return disp_mod(c->service->poll_priority,
158 c->event.u.us.sock,
159 c->poll_events, c,
160 qb_ipcs_dispatch_connection_request);
161 } else {
162 return disp_mod(c->service->poll_priority,
163 c->setup.u.us.sock,
164 c->poll_events, c,
165 qb_ipcs_dispatch_connection_request);
166 }
167 return -EINVAL;
168 }
169
170 void
171 qb_ipcs_request_rate_limit(struct qb_ipcs_service *s,
172 enum qb_ipcs_rate_limit rl)
173 {
174 struct qb_ipcs_connection *c;
175 enum qb_loop_priority old_p = s->poll_priority;
176 struct qb_list_head *pos;
177 struct qb_list_head *n;
178
179 switch (rl) {
180 case QB_IPCS_RATE_FAST:
181 s->poll_priority = QB_LOOP_HIGH;
182 break;
183 case QB_IPCS_RATE_SLOW:
184 case QB_IPCS_RATE_OFF:
185 case QB_IPCS_RATE_OFF_2:
186 s->poll_priority = QB_LOOP_LOW;
187 break;
188 default:
189 case QB_IPCS_RATE_NORMAL:
190 s->poll_priority = QB_LOOP_MED;
191 break;
192 }
193
194 qb_list_for_each_safe(pos, n, &s->connections) {
195
196 c = qb_list_entry(pos, struct qb_ipcs_connection, list);
197 qb_ipcs_connection_ref(c);
198
199 if (rl == QB_IPCS_RATE_OFF) {
200 qb_ipcs_flowcontrol_set(c, 1);
201 } else if (rl == QB_IPCS_RATE_OFF_2) {
202 qb_ipcs_flowcontrol_set(c, 2);
203 } else {
204 qb_ipcs_flowcontrol_set(c, QB_FALSE);
205 }
206 if (old_p != s->poll_priority) {
207 (void)_modify_dispatch_descriptor_(c);
208 }
209 qb_ipcs_connection_unref(c);
210 }
211 }
212
213 void
214 qb_ipcs_ref(struct qb_ipcs_service *s)
215 {
216 qb_atomic_int_inc(&s->ref_count);
217 }
218
219 void
220 qb_ipcs_unref(struct qb_ipcs_service *s)
221 {
222 int32_t free_it;
223
224 assert(s->ref_count > 0);
225 free_it = qb_atomic_int_dec_and_test(&s->ref_count);
226 if (free_it) {
227 qb_util_log(LOG_DEBUG, "%s() - destroying", __func__);
228 free(s);
229 }
230 }
231
232 void
233 qb_ipcs_destroy(struct qb_ipcs_service *s)
234 {
235 struct qb_ipcs_connection *c = NULL;
236 struct qb_list_head *pos;
237 struct qb_list_head *n;
238
239 if (s == NULL) {
240 return;
241 }
242 qb_list_for_each_safe(pos, n, &s->connections) {
243 c = qb_list_entry(pos, struct qb_ipcs_connection, list);
244 if (c == NULL) {
245 continue;
246 }
247 qb_ipcs_disconnect(c);
248 }
249 (void)qb_ipcs_us_withdraw(s);
250
251 /* service destroyed, remove initial alloc ref */
252 qb_ipcs_unref(s);
253 }
254
255 /*
256 * connection API
257 */
258 static struct qb_ipc_one_way *
259 _event_sock_one_way_get(struct qb_ipcs_connection * c)
260 {
261 if (c->service->needs_sock_for_poll) {
262 return &c->setup;
263 }
264 if (c->event.type == QB_IPC_SOCKET) {
265 return &c->event;
266 }
267 return NULL;
268 }
269
270 static struct qb_ipc_one_way *
271 _response_sock_one_way_get(struct qb_ipcs_connection * c)
272 {
273 if (c->service->needs_sock_for_poll) {
274 return &c->setup;
275 }
276 if (c->response.type == QB_IPC_SOCKET) {
277 return &c->response;
278 }
279 return NULL;
280 }
281
282 ssize_t
283 qb_ipcs_response_send(struct qb_ipcs_connection *c, const void *data,
284 size_t size)
285 {
286 ssize_t res;
287
288 if (c == NULL) {
289 return -EINVAL;
290 }
291 qb_ipcs_connection_ref(c);
292 res = c->service->funcs.send(&c->response, data, size);
293 if (res == size) {
294 c->stats.responses++;
295 } else if (res == -EAGAIN || res == -ETIMEDOUT) {
296 struct qb_ipc_one_way *ow = _response_sock_one_way_get(c);
297 if (ow) {
298 ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
299 if (res2 < 0) {
300 res = res2;
301 }
302 }
303 c->stats.send_retries++;
304 }
305 qb_ipcs_connection_unref(c);
306
307 return res;
308 }
309
310 ssize_t
311 qb_ipcs_response_sendv(struct qb_ipcs_connection * c, const struct iovec * iov,
312 size_t iov_len)
313 {
314 ssize_t res;
315
316 if (c == NULL) {
317 return -EINVAL;
318 }
319 qb_ipcs_connection_ref(c);
320 res = c->service->funcs.sendv(&c->response, iov, iov_len);
321 if (res > 0) {
322 c->stats.responses++;
323 } else if (res == -EAGAIN || res == -ETIMEDOUT) {
324 struct qb_ipc_one_way *ow = _response_sock_one_way_get(c);
325 if (ow) {
326 ssize_t res2 = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
327 if (res2 < 0) {
328 res = res2;
329 }
330 }
331 c->stats.send_retries++;
332 }
333 qb_ipcs_connection_unref(c);
334
335 return res;
336 }
337
338 static int32_t
339 resend_event_notifications(struct qb_ipcs_connection *c)
340 {
341 ssize_t res = 0;
342
343 if (!c->service->needs_sock_for_poll) {
344 return res;
345 }
346
347 if (c->outstanding_notifiers > 0) {
348 res = qb_ipc_us_send(&c->setup, c->receive_buf,
349 c->outstanding_notifiers);
350 }
351 if (res > 0) {
352 c->outstanding_notifiers -= res;
353 }
354
355 assert(c->outstanding_notifiers >= 0);
356 if (c->outstanding_notifiers == 0) {
357 c->poll_events = POLLIN | POLLPRI | POLLNVAL;
358 (void)_modify_dispatch_descriptor_(c);
359 }
360 return res;
361 }
362
363 static int32_t
364 new_event_notification(struct qb_ipcs_connection * c)
365 {
366 ssize_t res = 0;
367
368 if (!c->service->needs_sock_for_poll) {
369 return res;
370 }
371
372 assert(c->outstanding_notifiers >= 0);
373 if (c->outstanding_notifiers > 0) {
374 c->outstanding_notifiers++;
375 res = resend_event_notifications(c);
376 } else {
377 res = qb_ipc_us_send(&c->setup, &c->outstanding_notifiers, 1);
378 if (res == -EAGAIN) {
379 /*
380 * notify the client later, when we can.
381 */
382 c->outstanding_notifiers++;
383 c->poll_events = POLLOUT | POLLIN | POLLPRI | POLLNVAL;
384 (void)_modify_dispatch_descriptor_(c);
385 }
386 }
387 return res;
388 }
389
390 ssize_t
391 qb_ipcs_event_send(struct qb_ipcs_connection * c, const void *data, size_t size)
392 {
393 ssize_t res;
394 ssize_t resn;
395
|
(1) Event cond_false: |
Condition "c == NULL", taking false branch. |
|
(1) Event cond_false: |
Condition "c == NULL", taking false branch. |
396 if (c == NULL) {
397 return -EINVAL;
|
(2) Event else_branch: |
Reached else branch. |
|
(3) Event cond_false: |
Condition "size > c->event.max_msg_size", taking false branch. |
|
(2) Event else_branch: |
Reached else branch. |
|
(3) Event cond_false: |
Condition "size > c->event.max_msg_size", taking false branch. |
398 } else if (size > c->event.max_msg_size) {
399 return -EMSGSIZE;
|
(4) Event if_end: |
End of if statement. |
|
(4) Event if_end: |
End of if statement. |
400 }
401
402 qb_ipcs_connection_ref(c);
403 res = c->service->funcs.send(&c->event, data, size);
|
(5) Event cond_true: |
Condition "res == size", taking true branch. |
|
(5) Event cond_true: |
Condition "res == size", taking true branch. |
404 if (res == size) {
405 c->stats.events++;
406 resn = new_event_notification(c);
|
(6) Event cond_false: |
Condition "resn < 0", taking false branch. |
|
(6) Event cond_false: |
Condition "resn < 0", taking false branch. |
407 if (resn < 0 && resn != -EAGAIN && resn != -ENOBUFS) {
408 errno = -resn;
409 qb_util_perror(LOG_DEBUG,
410 "new_event_notification (%s)",
411 c->description);
412 res = resn;
|
(7) Event if_end: |
End of if statement. |
|
(7) Event if_end: |
End of if statement. |
413 }
|
(8) Event if_fallthrough: |
Falling through to end of if statement. |
|
(8) Event if_fallthrough: |
Falling through to end of if statement. |
414 } else if (res == -EAGAIN || res == -ETIMEDOUT) {
415 struct qb_ipc_one_way *ow = _event_sock_one_way_get(c);
416
417 if (c->outstanding_notifiers > 0) {
418 resn = resend_event_notifications(c);
419 }
420 if (ow) {
421 resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
422 if (resn < 0) {
423 res = resn;
424 }
425 }
426 c->stats.send_retries++;
|
(9) Event if_end: |
End of if statement. |
|
(9) Event if_end: |
End of if statement. |
427 }
428
|
(10) Event freed_arg: |
"qb_ipcs_connection_unref" frees parameter "c". [details] |
|
(10) Event freed_arg: |
"qb_ipcs_connection_unref" frees parameter "c". [details] |
429 qb_ipcs_connection_unref(c);
430 return res;
431 }
432
433 ssize_t
434 qb_ipcs_event_sendv(struct qb_ipcs_connection * c,
435 const struct iovec * iov, size_t iov_len)
436 {
437 ssize_t res;
438 ssize_t resn;
439
440 if (c == NULL) {
441 return -EINVAL;
442 }
443 qb_ipcs_connection_ref(c);
444
445 res = c->service->funcs.sendv(&c->event, iov, iov_len);
446 if (res > 0) {
447 c->stats.events++;
448 resn = new_event_notification(c);
449 if (resn < 0 && resn != -EAGAIN) {
450 errno = -resn;
451 qb_util_perror(LOG_DEBUG,
452 "new_event_notification (%s)",
453 c->description);
454 res = resn;
455 }
456 } else if (res == -EAGAIN || res == -ETIMEDOUT) {
457 struct qb_ipc_one_way *ow = _event_sock_one_way_get(c);
458
459 if (c->outstanding_notifiers > 0) {
460 resn = resend_event_notifications(c);
461 }
462 if (ow) {
463 resn = qb_ipc_us_ready(ow, &c->setup, 0, POLLOUT);
464 if (resn < 0) {
465 res = resn;
466 }
467 }
468 c->stats.send_retries++;
469 }
470
471 qb_ipcs_connection_unref(c);
472 return res;
473 }
474
475 qb_ipcs_connection_t *
476 qb_ipcs_connection_first_get(struct qb_ipcs_service * s)
477 {
478 struct qb_ipcs_connection *c;
479
480 if (qb_list_empty(&s->connections)) {
481 return NULL;
482 }
483
484 c = qb_list_first_entry(&s->connections, struct qb_ipcs_connection,
485 list);
486 qb_ipcs_connection_ref(c);
487
488 return c;
489 }
490
491 qb_ipcs_connection_t *
492 qb_ipcs_connection_next_get(struct qb_ipcs_service * s,
493 struct qb_ipcs_connection * current)
494 {
495 struct qb_ipcs_connection *c;
496
497 if (current == NULL ||
498 qb_list_is_last(¤t->list, &s->connections)) {
499 return NULL;
500 }
501
502 c = qb_list_first_entry(¤t->list, struct qb_ipcs_connection,
503 list);
504 qb_ipcs_connection_ref(c);
505
506 return c;
507 }
508
509 int32_t
510 qb_ipcs_service_id_get(struct qb_ipcs_connection * c)
511 {
512 if (c == NULL) {
513 return -EINVAL;
514 }
515 return c->service->service_id;
516 }
517
518 struct qb_ipcs_connection *
519 qb_ipcs_connection_alloc(struct qb_ipcs_service *s)
520 {
521 struct qb_ipcs_connection *c =
522 calloc(1, sizeof(struct qb_ipcs_connection));
523
524 if (c == NULL) {
525 return NULL;
526 }
527
528 c->pid = 0;
529 c->euid = -1;
530 c->egid = -1;
531 c->receive_buf = NULL;
532 c->context = NULL;
533 c->fc_enabled = QB_FALSE;
534 c->state = QB_IPCS_CONNECTION_INACTIVE;
535 c->poll_events = POLLIN | POLLPRI | POLLNVAL;
536
537 c->setup.type = s->type;
538 c->request.type = s->type;
539 c->response.type = s->type;
540 c->event.type = s->type;
541 (void)strlcpy(c->description, "not set yet", CONNECTION_DESCRIPTION);
542
543 /* initial alloc ref */
544 qb_ipcs_connection_ref(c);
545
546 /*
547 * The connection makes use of the service object. Give the connection
548 * a reference to the service so we know the service can never be destroyed
549 * until the connection is done with it.
550 */
551 qb_ipcs_ref(s);
552 c->service = s;
553 qb_list_init(&c->list);
554
555 return c;
556 }
557
558 void
559 qb_ipcs_connection_ref(struct qb_ipcs_connection *c)
560 {
561 if (c) {
562 qb_atomic_int_inc(&c->refcount);
563 }
564 }
565
566 void
567 qb_ipcs_connection_unref(struct qb_ipcs_connection *c)
568 {
569 int32_t free_it;
570
|
(1) Event cond_false: |
Condition "c == NULL", taking false branch. |
|
(1) Event cond_false: |
Condition "c == NULL", taking false branch. |
571 if (c == NULL) {
572 return;
|
(2) Event if_end: |
End of if statement. |
|
(2) Event if_end: |
End of if statement. |
573 }
|
(3) Event cond_false: |
Condition "c->refcount < 1", taking false branch. |
|
(3) Event cond_false: |
Condition "c->refcount < 1", taking false branch. |
574 if (c->refcount < 1) {
575 qb_util_log(LOG_ERR, "ref:%d state:%d (%s)",
576 c->refcount, c->state, c->description);
577 assert(0);
|
(4) Event if_end: |
End of if statement. |
|
(4) Event if_end: |
End of if statement. |
578 }
|
(5) Event cond_true: |
Condition "qb_atomic_int_exchange_and_add(&c->refcount, -1) == 1", taking true branch. |
|
(5) Event cond_true: |
Condition "qb_atomic_int_exchange_and_add(&c->refcount, -1) == 1", taking true branch. |
579 free_it = qb_atomic_int_dec_and_test(&c->refcount);
|
(6) Event cond_true: |
Condition "free_it", taking true branch. |
|
(6) Event cond_true: |
Condition "free_it", taking true branch. |
580 if (free_it) {
581 qb_list_del(&c->list);
|
(7) Event cond_true: |
Condition "c->service->serv_fns.connection_destroyed", taking true branch. |
|
(7) Event cond_true: |
Condition "c->service->serv_fns.connection_destroyed", taking true branch. |
582 if (c->service->serv_fns.connection_destroyed) {
583 c->service->serv_fns.connection_destroyed(c);
584 }
585 c->service->funcs.disconnect(c);
586 /* Let go of the connection's reference to the service */
587 qb_ipcs_unref(c->service);
588 free(c->receive_buf);
|
(8) Event freed_arg: |
"free" frees parameter "c". |
|
(8) Event freed_arg: |
"free" frees parameter "c". |
589 free(c);
590 }
591 }
592
/*
 * Drive a connection through its teardown state machine.
 *
 * State transitions handled here:
 *   ACTIVE       -> INACTIVE  (incomplete connection; early return)
 *   ESTABLISHED  -> SHUTTING_DOWN, then the SHUTTING_DOWN branch runs
 *   SHUTTING_DOWN: run the user's connection_closed handler; a non-zero
 *     return asks for the handler to be re-run, which is arranged by
 *     scheduling this whole function again as a low-priority loop job.
 *
 * The initial allocation reference is dropped exactly once — either on
 * the ACTIVE early-return path, or on the SHUTTING_DOWN path once no
 * retry is scheduled.  Safe to call with NULL.
 */
void
qb_ipcs_disconnect(struct qb_ipcs_connection *c)
{
	int32_t res = 0;
	qb_loop_job_dispatch_fn rerun_job;

	if (c == NULL) {
		return;
	}
	qb_util_log(LOG_DEBUG, "%s(%s) state:%d",
		    __func__, c->description, c->state);

	if (c->state == QB_IPCS_CONNECTION_ACTIVE) {
		c->service->funcs.disconnect(c);
		c->state = QB_IPCS_CONNECTION_INACTIVE;
		c->service->stats.closed_connections++;

		/* This removes the initial alloc ref */
		qb_ipcs_connection_unref(c);

		/* return early as it's an incomplete connection.
		 */
		return;
	}
	if (c->state == QB_IPCS_CONNECTION_ESTABLISHED) {
		c->service->funcs.disconnect(c);
		c->state = QB_IPCS_CONNECTION_SHUTTING_DOWN;
		c->service->stats.active_connections--;
		c->service->stats.closed_connections++;
	}
	/* Note: deliberately not "else if" — the ESTABLISHED branch above
	 * moves the connection into SHUTTING_DOWN so this runs next. */
	if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN) {
		int scheduled_retry = 0;
		res = 0;
		if (c->service->serv_fns.connection_closed) {
			res = c->service->serv_fns.connection_closed(c);
		}
		if (res != 0) {
			/* OK, so they want the connection_closed
			 * function re-run */
			rerun_job =
			    (qb_loop_job_dispatch_fn) qb_ipcs_disconnect;
			res = c->service->poll_fns.job_add(QB_LOOP_LOW,
							   c, rerun_job);
			if (res == 0) {
				/* this function is going to be called again.
				 * so hold off on the unref */
				scheduled_retry = 1;
			}
		}
		remove_tempdir(c->description);
		if (scheduled_retry == 0) {
			/* This removes the initial alloc ref */
			qb_ipcs_connection_unref(c);
		}
	}

}
650
651 static void
652 qb_ipcs_flowcontrol_set(struct qb_ipcs_connection *c, int32_t fc_enable)
653 {
654 if (c == NULL) {
655 return;
656 }
657 if (c->fc_enabled != fc_enable) {
658 c->service->funcs.fc_set(&c->request, fc_enable);
659 c->fc_enabled = fc_enable;
660 c->stats.flow_control_state = fc_enable;
661 c->stats.flow_control_count++;
662 }
663 }
664
/*
 * Receive and dispatch one request message from the connection.
 *
 * If the transport supports zero-copy (peek + reclaim), the message is
 * peeked in place; otherwise it is copied into the connection's
 * receive buffer.  A successfully received message is handed to the
 * user's msg_process handler.
 *
 * Returns:
 *   > 0        bytes consumed (message handled)
 *   -ENOBUFS   msg_process asked for backoff (returned negative)
 *   -ESHUTDOWN client asked to disconnect (zero-size read or an
 *              explicit QB_IPC_MSG_DISCONNECT message)
 *   other < 0  receive error (-EAGAIN/-ETIMEDOUT count as retries)
 */
static int32_t
_process_request_(struct qb_ipcs_connection *c, int32_t ms_timeout)
{
	int32_t res = 0;
	ssize_t size;
	struct qb_ipc_request_header *hdr;

	if (c->service->funcs.peek && c->service->funcs.reclaim) {
		/* zero-copy path: hdr points into the transport's buffer */
		size = c->service->funcs.peek(&c->request, (void **)&hdr,
					      ms_timeout);
	} else {
		/* copy path: receive into the connection's own buffer */
		hdr = c->receive_buf;
		size = c->service->funcs.recv(&c->request,
					      hdr,
					      c->request.max_msg_size,
					      ms_timeout);
	}
	if (size < 0) {
		if (size != -EAGAIN && size != -ETIMEDOUT) {
			qb_util_perror(LOG_DEBUG,
				       "recv from client connection failed (%s)",
				       c->description);
		} else {
			c->stats.recv_retries++;
		}
		res = size;
		/* NOTE: the error paths skip reclaim — nothing was peeked */
		goto cleanup;
	} else if (size == 0 || hdr->id == QB_IPC_MSG_DISCONNECT) {
		qb_util_log(LOG_DEBUG, "client requesting a disconnect (%s)",
			    c->description);
		res = -ESHUTDOWN;
		goto cleanup;
	} else {
		c->stats.requests++;
		res = c->service->serv_fns.msg_process(c, hdr, hdr->size);
		/* 0 == good, negative == backoff */
		if (res < 0) {
			res = -ENOBUFS;
		} else {
			res = size;
		}
	}

	/* release the peeked message back to the transport (zero-copy path) */
	if (c->service->funcs.peek && c->service->funcs.reclaim) {
		c->service->funcs.reclaim(&c->request);
	}

cleanup:
	return res;
}
715
716 #define IPC_REQUEST_TIMEOUT 10
717 #define MAX_RECV_MSGS 50
718
719 static ssize_t
720 _request_q_len_get(struct qb_ipcs_connection *c)
721 {
722 ssize_t q_len;
723 if (c->service->funcs.q_len_get) {
724 q_len = c->service->funcs.q_len_get(&c->request);
725 if (q_len <= 0) {
726 return q_len;
727 }
728 if (c->service->poll_priority == QB_LOOP_MED) {
729 q_len = QB_MIN(q_len, 5);
730 } else if (c->service->poll_priority == QB_LOOP_LOW) {
731 q_len = 1;
732 } else {
733 q_len = QB_MIN(q_len, MAX_RECV_MSGS);
734 }
735 } else {
736 q_len = 1;
737 }
738 return q_len;
739 }
740
/*
 * Main-loop callback for activity on a connection's pollable fd.
 *
 * Handles, in order: fatal poll conditions (POLLNVAL, POLLHUP),
 * POLLOUT (flush queued event-notification bytes), flow control,
 * draining up to _request_q_len_get() queued requests, and — for
 * transports that signal via the setup socket — consuming one
 * notification byte per request processed.
 *
 * Returns 0 on success or a negative errno value; any non-zero return
 * also disconnects the connection before returning.
 */
int32_t
qb_ipcs_dispatch_connection_request(int32_t fd, int32_t revents, void *data)
{
	struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
	char bytes[MAX_RECV_MSGS];
	int32_t res = 0;
	int32_t res2;
	int32_t recvd = 0;
	ssize_t avail;

	if (c == NULL) {
		res = -EINVAL;
		goto dispatch_cleanup;
	}

	if (revents & POLLNVAL) {
		qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
		res = -EINVAL;
		goto dispatch_cleanup;
	}
	if (revents & POLLHUP) {
		qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
		res = -ESHUTDOWN;
		goto dispatch_cleanup;
	}

	if (revents & POLLOUT) {
		/* try resend events now that fd can write */
		res = resend_event_notifications(c);
		if (res < 0 && res != -EAGAIN) {
			errno = -res;
			qb_util_perror(LOG_WARNING,
				       "resend_event_notifications (%s)",
				       c->description);
		}
		/* nothing to read */
		if ((revents & POLLIN) == 0) {
			res = 0;
			goto dispatch_cleanup;
		}
	}
	/* flow control active: leave requests queued for later */
	if (c->fc_enabled) {
		res = 0;
		goto dispatch_cleanup;
	}
	avail = _request_q_len_get(c);

	/* POLLIN with an empty queue: either the client went away, or a
	 * stray notification byte — consume it (non-blocking) and decide. */
	if (c->service->needs_sock_for_poll && avail == 0) {
		res2 = qb_ipc_us_recv(&c->setup, bytes, 1, 0);
		if (qb_ipc_us_sock_error_is_disconnected(res2)) {
			errno = -res2;
			qb_util_perror(LOG_WARNING, "conn (%s) disconnected",
				       c->description);
			res = -ESHUTDOWN;
			goto dispatch_cleanup;
		} else {
			qb_util_log(LOG_WARNING,
				    "conn (%s) Nothing in q but got POLLIN on fd:%d (res2:%d)",
				    c->description, fd, res2);
			res = 0;
			goto dispatch_cleanup;
		}
	}

	/* drain up to `avail` requests, stopping on error or flow control */
	do {
		res = _process_request_(c, IPC_REQUEST_TIMEOUT);

		if (res == -ESHUTDOWN) {
			goto dispatch_cleanup;
		}

		/* count consumed notification bytes, including messages that
		 * were received but failed processing */
		if (res > 0 || res == -ENOBUFS || res == -EINVAL) {
			recvd++;
		}
		if (res > 0) {
			avail--;
		}
	} while (avail > 0 && res > 0 && !c->fc_enabled);

	/* eat one setup-socket notification byte per message processed */
	if (c->service->needs_sock_for_poll && recvd > 0) {
		res2 = qb_ipc_us_recv(&c->setup, bytes, recvd, -1);
		if (qb_ipc_us_sock_error_is_disconnected(res2)) {
			errno = -res2;
			qb_util_perror(LOG_ERR, "error receiving from setup sock (%s)", c->description);

			res = -ESHUTDOWN;
			goto dispatch_cleanup;
		}
	}

	/* normalize: positive byte counts and transient errors become 0 */
	res = QB_MIN(0, res);
	if (res == -EAGAIN || res == -ETIMEDOUT || res == -ENOBUFS) {
		res = 0;
	}
	if (res != 0) {
		if (res != -ENOTCONN) {
			/*
			 * Abnormal state (ENOTCONN is normal shutdown).
			 */
			errno = -res;
			qb_util_perror(LOG_ERR, "request returned error (%s)",
				       c->description);
		}
	}

dispatch_cleanup:
	if (res != 0) {
		qb_ipcs_disconnect(c);
	}
	return res;
}
852
853 void
854 qb_ipcs_context_set(struct qb_ipcs_connection *c, void *context)
855 {
856 if (c == NULL) {
857 return;
858 }
859 c->context = context;
860 }
861
862 void *
863 qb_ipcs_context_get(struct qb_ipcs_connection *c)
864 {
865 if (c == NULL) {
866 return NULL;
867 }
868 return c->context;
869 }
870
871 void *
872 qb_ipcs_connection_service_context_get(qb_ipcs_connection_t *c)
873 {
874 if (c == NULL || c->service == NULL) {
875 return NULL;
876 }
877 return c->service->context;
878 }
879
880 int32_t
881 qb_ipcs_connection_stats_get(qb_ipcs_connection_t * c,
882 struct qb_ipcs_connection_stats * stats,
883 int32_t clear_after_read)
884 {
885 if (c == NULL) {
886 return -EINVAL;
887 }
888 memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats));
889 if (clear_after_read) {
890 memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
891 c->stats.client_pid = c->pid;
892 }
893 return 0;
894 }
895
896 struct qb_ipcs_connection_stats_2*
897 qb_ipcs_connection_stats_get_2(qb_ipcs_connection_t *c,
898 int32_t clear_after_read)
899 {
900 struct qb_ipcs_connection_stats_2 * stats;
901
902 if (c == NULL) {
903 errno = EINVAL;
904 return NULL;
905 }
906 stats = calloc(1, sizeof(struct qb_ipcs_connection_stats_2));
907 if (stats == NULL) {
908 return NULL;
909 }
910
911 memcpy(stats, &c->stats, sizeof(struct qb_ipcs_connection_stats_2));
912
913 if (c->service->funcs.q_len_get) {
914 stats->event_q_length = c->service->funcs.q_len_get(&c->event);
915 } else {
916 stats->event_q_length = 0;
917 }
918 if (clear_after_read) {
919 memset(&c->stats, 0, sizeof(struct qb_ipcs_connection_stats_2));
920 c->stats.client_pid = c->pid;
921 }
922 return stats;
923 }
924
925 int32_t
926 qb_ipcs_stats_get(struct qb_ipcs_service * s,
927 struct qb_ipcs_stats * stats, int32_t clear_after_read)
928 {
929 if (s == NULL) {
930 return -EINVAL;
931 }
932 memcpy(stats, &s->stats, sizeof(struct qb_ipcs_stats));
933 if (clear_after_read) {
934 memset(&s->stats, 0, sizeof(struct qb_ipcs_stats));
935 }
936 return 0;
937 }
938
939 void
940 qb_ipcs_connection_auth_set(qb_ipcs_connection_t *c, uid_t uid,
941 gid_t gid, mode_t mode)
942 {
943 if (c) {
944 c->auth.uid = uid;
945 c->auth.gid = gid;
946 c->auth.mode = mode;
947 }
948 }
949
950 int32_t
951 qb_ipcs_connection_get_buffer_size(qb_ipcs_connection_t *c)
952 {
953 if (c == NULL) {
954 return -EINVAL;
955 }
956
957 /* request, response, and event shoud all have the same
958 * buffer size allocated. It doesn't matter which we return
959 * here. */
960 return c->response.max_msg_size;
961 }
962
963 void qb_ipcs_enforce_buffer_size(qb_ipcs_service_t *s, uint32_t buf_size)
964 {
965 if (s == NULL) {
966 return;
967 }
968 s->max_buffer_size = buf_size;
969 }
970