1 /*
2 * Copyright (c) 2010 Red Hat, Inc.
3 *
4 * All rights reserved.
5 *
6 * Author: Angus Salkeld <asalkeld@redhat.com>
7 *
8 * This file is part of libqb.
9 *
10 * libqb is free software: you can redistribute it and/or modify
11 * it under the terms of the GNU Lesser General Public License as published by
12 * the Free Software Foundation, either version 2.1 of the License, or
13 * (at your option) any later version.
14 *
15 * libqb is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public License
21 * along with libqb. If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "os_base.h"
25 #include <sys/wait.h>
26 #include <sys/un.h>
27 #include <signal.h>
28 #include <stdbool.h>
29 #include <fcntl.h>
30
31 #ifdef HAVE_GLIB
32 #include <glib.h>
33 #endif
34
35 #include "check_common.h"
36
37 #include <qb/qbdefs.h>
38 #include <qb/qblog.h>
39 #include <qb/qbipcc.h>
40 #include <qb/qbipcs.h>
41 #include <qb/qbloop.h>
42
43 #ifdef HAVE_FAILURE_INJECTION
44 #include "_failure_injection.h"
45 #endif
46
#define NUM_STRESS_CONNECTIONS 5000

/* name of the IPC service under test; filled in by set_ipc_name() */
static char ipc_name[256];

#define DEFAULT_MAX_MSG_SIZE (8192*16)
#ifndef __clang__
/* cached result of qb_ipcc_verify_dgram_max_msg_size(); 0 = not probed yet */
static int CALCULATED_DGRAM_MAX_MSG_SIZE = 0;

/* lazily probe (once) the largest datagram the socket transport can carry */
#define DGRAM_MAX_MSG_SIZE \
	(CALCULATED_DGRAM_MAX_MSG_SIZE == 0 ? \
	CALCULATED_DGRAM_MAX_MSG_SIZE = qb_ipcc_verify_dgram_max_msg_size(DEFAULT_MAX_MSG_SIZE) : \
	CALCULATED_DGRAM_MAX_MSG_SIZE)

#define MAX_MSG_SIZE (ipc_type == QB_IPC_SOCKET ? DGRAM_MAX_MSG_SIZE : DEFAULT_MAX_MSG_SIZE)

#else
/* because of clang's
   'variable length array in structure' extension will never be supported;
   assign default for SHM as we'll skip test that would use run-time
   established value (via qb_ipcc_verify_dgram_max_msg_size), anyway */
#define MAX_MSG_SIZE DEFAULT_MAX_MSG_SIZE
#endif

/* The size the giant msg's data field needs to be to make
 * this the largests msg we can successfully send. */
#define GIANT_MSG_DATA_SIZE (MAX_MSG_SIZE - sizeof(struct qb_ipc_response_header) - 8)

/* test configuration shared between the forked server and client side */
static int enforce_server_buffer;
static qb_ipcc_connection_t *conn;
static enum qb_ipc_type ipc_type;
static enum qb_loop_priority global_loop_prio = QB_LOOP_MED;
static bool global_use_glib;
/* self-feed pipe used by the IPC_MSG_REQ_SELF_FEED scenario */
static int global_pipefd[2];
80
/* Request/response message ids.  Each request is immediately followed
 * by its matching response id, so "response id == request id + 1"
 * holds (send_and_check() relies on this). */
enum my_msg_ids {
	IPC_MSG_REQ_TX_RX,
	IPC_MSG_RES_TX_RX,
	IPC_MSG_REQ_DISPATCH,
	IPC_MSG_RES_DISPATCH,
	IPC_MSG_REQ_BULK_EVENTS,
	IPC_MSG_RES_BULK_EVENTS,
	IPC_MSG_REQ_STRESS_EVENT,
	IPC_MSG_RES_STRESS_EVENT,
	IPC_MSG_REQ_SELF_FEED,
	IPC_MSG_RES_SELF_FEED,
	IPC_MSG_REQ_SERVER_FAIL,
	IPC_MSG_RES_SERVER_FAIL,
	IPC_MSG_REQ_SERVER_DISCONNECT,
	IPC_MSG_RES_SERVER_DISCONNECT,
};
97
98
99 /* these 2 functions from pacemaker code */
100 static enum qb_ipcs_rate_limit
101 conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
102 {
103 /* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */
104 enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL;
105 switch (prio) {
106 case QB_LOOP_LOW:
107 ret = QB_IPCS_RATE_SLOW;
108 break;
109 case QB_LOOP_HIGH:
110 ret = QB_IPCS_RATE_FAST;
111 break;
112 default:
113 qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d,"
114 " assuming QB_LOOP_MED", prio);
115 /* fall-through */
116 case QB_LOOP_MED:
117 break;
118 }
119 return ret;
120 }
121 #ifdef HAVE_GLIB
122 static gint
123 conv_prio_libqb2glib(enum qb_loop_priority prio)
124 {
125 gint ret = G_PRIORITY_DEFAULT;
126 switch (prio) {
127 case QB_LOOP_LOW:
128 ret = G_PRIORITY_LOW;
129 break;
130 case QB_LOOP_HIGH:
131 ret = G_PRIORITY_HIGH;
132 break;
133 default:
134 qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d,"
135 " assuming QB_LOOP_MED", prio);
136 /* fall-through */
137 case QB_LOOP_MED:
138 break;
139 }
140 return ret;
141 }
142
143 /* these 3 glue functions inspired from pacemaker, too */
/* prepare: never ready without polling; bound the poll timeout at 500ms */
static gboolean
gio_source_prepare(GSource *source, gint *timeout)
{
	qb_enter();
	*timeout = 500;
	return FALSE;
}
/* check: always claim readiness so dispatch runs after every poll */
static gboolean
gio_source_check(GSource *source)
{
	qb_enter();
	return TRUE;
}
/* dispatch: invoke the user callback; keep the source unless it says stop */
static gboolean
gio_source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
{
	gboolean ret = G_SOURCE_CONTINUE;
	qb_enter();
	if (callback) {
		ret = callback(user_data);
	}
	return ret;
}
/* vtable for the hand-rolled GSources used by the self-feed test */
static GSourceFuncs gio_source_funcs = {
	.prepare = gio_source_prepare,
	.check = gio_source_check,
	.dispatch = gio_source_dispatch,
};
172
173 #endif
174
175
176 /* Test Cases
177 *
178 * 1) basic send & recv different message sizes
179 *
180 * 2) send message to start dispatch (confirm receipt)
181 *
182 * 3) flow control
183 *
184 * 4) authentication
185 *
186 * 5) thread safety
187 *
188 * 6) cleanup
189 *
190 * 7) service availability
191 *
192 * 8) multiple services
193 *
194 * 9) setting perms on the sockets
195 */
/* server-side loop and service handles */
static qb_loop_t *my_loop;
static qb_ipcs_service_t* s1;
/* knobs and results shared between the test bodies and server callbacks */
static int32_t turn_on_fc = QB_FALSE;
static int32_t fc_enabled = 89; /* 89 = sentinel for "not yet decided" */
static int32_t send_event_on_created = QB_FALSE;
static int32_t disconnect_after_created = QB_FALSE;
static int32_t num_bulk_events = 10;
static int32_t num_stress_events = 30000;
static int32_t reference_count_test = QB_FALSE;
static int32_t multiple_connections = QB_FALSE;
static int32_t set_perms_on_socket = QB_FALSE;
208
/* SIGTERM handler for the server loop: tear the service down and exit
 * cleanly so verify_graceful_stop() sees status 0. */
static int32_t
exit_handler(int32_t rsignal, void *data)
{
	qb_log(LOG_DEBUG, "caught signal %d", rsignal);
	qb_ipcs_destroy(s1);
	exit(0);
}
216
217 static void
218 set_ipc_name(const char *prefix)
219 {
220 FILE *f;
221 char process_name[256];
222
223 /* The process-unique part of the IPC name has already been decided
224 * and stored in the file IPC_TEST_NAME_FILE
225 */
226 f = fopen(IPC_TEST_NAME_FILE, "r");
227 if (f) {
228 fgets(process_name, sizeof(process_name), f);
229 /* Remove any trailing LF that might be lurking */
230 if (process_name[strlen(process_name)-1] == '\n') {
231 process_name[strlen(process_name)-1] = '\0';
232 }
233 fclose(f);
234 snprintf(ipc_name, sizeof(ipc_name), "%.44s%s", prefix, process_name);
235 } else {
236 /* This is the old code, use only as a fallback */
237 static char t_sec[3] = "";
238 if (t_sec[0] == '\0') {
239 const char *const found = strrchr(__TIME__, ':');
240 strncpy(t_sec, found ? found + 1 : "-", sizeof(t_sec) - 1);
241 t_sec[sizeof(t_sec) - 1] = '\0';
242 }
243
244 snprintf(ipc_name, sizeof(ipc_name), "%.44s%s%lX%.4x", prefix, t_sec,
245 (unsigned long)getpid(), (unsigned) ((long) time(NULL) % (0x10000)));
246 }
247 }
248
249 static int
250 pipe_writer(int fd, int revents, void *data) {
251 qb_enter();
252 static const char buf[8] = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h' };
253
254 ssize_t wbytes = 0, wbytes_sum = 0;
255
256 //for (size_t i = 0; i < SIZE_MAX; i++) {
257 for (size_t i = 0; i < 4096; i++) {
258 wbytes_sum += wbytes;
259 if ((wbytes = write(fd, buf, sizeof(buf))) == -1) {
260 if (errno != EAGAIN) {
261 perror("write");
262 exit(-1);
263 }
264 break;
265 }
266 }
267 if (wbytes_sum > 0) {
268 qb_log(LOG_DEBUG, "written %zd bytes", wbytes_sum);
269 }
270 qb_leave();
271 return 1;
272 }
273
/* Poll callback: drain the self-feed pipe.  Competes with IPC dispatch
 * at high priority.  Returns 1 so the poll source stays installed. */
static int
pipe_reader(int fd, int revents, void *data) {
	qb_enter();
	ssize_t rbytes, rbytes_sum = 0;
	/* cnt starts at SIZE_MAX, so the loop is effectively bounded only
	 * by EAGAIN/EOF from the non-blocking read */
	size_t cnt = SIZE_MAX;
	char buf[4096] = { '\0' };
	while ((rbytes = read(fd, buf, sizeof(buf))) > 0 && rbytes < cnt) {
		cnt -= rbytes;
		rbytes_sum += rbytes;
	}
	if (rbytes_sum > 0) {
		ck_assert(buf[0] != '\0'); /* avoid dead store elimination */
		qb_log(LOG_DEBUG, "read %zd bytes", rbytes_sum);
		/* throttle so the writer can refill and the main IPC work
		 * still gets loop time */
		sleep(1);
	}
	qb_leave();
	return 1;
}
292
#if HAVE_GLIB
/* GLib-callback shims around the pipe handlers: data is a pointer to
 * the pipe fd; the GLib source stays installed while the underlying
 * handler returns a positive value. */
static gboolean
gio_pipe_reader(void *data) {
	return (pipe_reader(*((int *) data), 0, NULL) > 0);
}
static gboolean
gio_pipe_writer(void *data) {
	return (pipe_writer(*((int *) data), 0, NULL) > 0);
}
#endif
303
/* Server-side request dispatcher.  Decodes the request id and drives
 * the scenario under test: echo, event-channel reply, bulk/stress event
 * streams, the self-feed pipe setup, or a forced server exit or client
 * disconnect.  Returns 0 to keep the connection open. */
static int32_t
s1_msg_process_fn(qb_ipcs_connection_t *c,
		void *data, size_t size)
{
	struct qb_ipc_request_header *req_pt = (struct qb_ipc_request_header *)data;
	struct qb_ipc_response_header response = { 0, };
	ssize_t res;

	if (req_pt->id == IPC_MSG_REQ_TX_RX) {
		/* simple request/response round trip */
		response.size = sizeof(struct qb_ipc_response_header);
		response.id = IPC_MSG_RES_TX_RX;
		response.error = 0;
		res = qb_ipcs_response_send(c, &response, response.size);
		if (res < 0) {
			qb_perror(LOG_INFO, "qb_ipcs_response_send");
		} else if (res != response.size) {
			qb_log(LOG_DEBUG, "qb_ipcs_response_send %zd != %d",
			       res, response.size);
		}
		if (turn_on_fc) {
			/* flow-control test: stop accepting further requests */
			qb_ipcs_request_rate_limit(s1, QB_IPCS_RATE_OFF);
		}
	} else if (req_pt->id == IPC_MSG_REQ_DISPATCH) {
		/* reply over the event channel instead of the response one */
		response.size = sizeof(struct qb_ipc_response_header);
		response.id = IPC_MSG_RES_DISPATCH;
		response.error = 0;
		res = qb_ipcs_event_send(c, &response,
					 sizeof(response));
		if (res < 0) {
			qb_perror(LOG_INFO, "qb_ipcs_event_send");
		}
	} else if (req_pt->id == IPC_MSG_REQ_BULK_EVENTS) {
		int32_t m;
		int32_t num;
		struct qb_ipcs_connection_stats_2 *stats;
		uint32_t max_size = MAX_MSG_SIZE;

		response.size = sizeof(struct qb_ipc_response_header);
		response.error = 0;

		/* remember the current event queue depth */
		stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE);
		num = stats->event_q_length;
		free(stats);

		/* crazy large message */
		res = qb_ipcs_event_send(c, &response, max_size*10);
		ck_assert_int_eq(res, -EMSGSIZE);

		/* send one event before responding */
		res = qb_ipcs_event_send(c, &response, sizeof(response));
		ck_assert_int_eq(res, sizeof(response));
		response.id++;

		/* There should be one more item in the event queue now. */
		stats = qb_ipcs_connection_stats_get_2(c, QB_FALSE);
		ck_assert_int_eq(stats->event_q_length - num, 1);
		free(stats);

		/* send response */
		response.id = IPC_MSG_RES_BULK_EVENTS;
		res = qb_ipcs_response_send(c, &response, response.size);
		ck_assert_int_eq(res, sizeof(response));

		/* send the rest of the events after the response */
		for (m = 1; m < num_bulk_events; m++) {
			res = qb_ipcs_event_send(c, &response, sizeof(response));

			if (res == -EAGAIN || res == -ENOBUFS) {
				/* retry */
				usleep(1000);
				m--;
				continue;
			}
			ck_assert_int_eq(res, sizeof(response));
			response.id++;
		}

	} else if (req_pt->id == IPC_MSG_REQ_STRESS_EVENT) {
		/* stream num_stress_events events; every 1000th one is a
		 * "giant" buffer-filling message carrying the running count
		 * so the client can verify no events were dropped */
		struct {
			struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
			char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
			uint32_t sent_msgs __attribute__ ((aligned(8)));
		} __attribute__ ((aligned(8))) giant_event_send;
		int32_t m;

		response.size = sizeof(struct qb_ipc_response_header);
		response.error = 0;

		response.id = IPC_MSG_RES_STRESS_EVENT;
		res = qb_ipcs_response_send(c, &response, response.size);
		ck_assert_int_eq(res, sizeof(response));

		giant_event_send.hdr.error = 0;
		giant_event_send.hdr.id = IPC_MSG_RES_STRESS_EVENT;
		for (m = 0; m < num_stress_events; m++) {
			size_t sent_len = sizeof(struct qb_ipc_response_header);

			if (((m+1) % 1000) == 0) {
				sent_len = sizeof(giant_event_send);
				giant_event_send.sent_msgs = m + 1;
			}
			giant_event_send.hdr.size = sent_len;

			res = qb_ipcs_event_send(c, &giant_event_send, sent_len);
			if (res < 0) {
				if (res == -EAGAIN || res == -ENOBUFS) {
					/* yield to the receive process */
					usleep(1000);
					m--;
					continue;
				} else {
					qb_perror(LOG_DEBUG, "sending stress events");
					ck_assert_int_eq(res, sent_len);
				}
			} else if (((m+1) % 1000) == 0) {
				qb_log(LOG_DEBUG, "SENT: %d stress events sent", m+1);
			}
			giant_event_send.hdr.id++;
		}

	} else if (req_pt->id == IPC_MSG_REQ_SELF_FEED) {
		/* wire a non-blocking pipe into the server's own loop so the
		 * reader/writer compete with IPC dispatching for loop time */
		if (pipe(global_pipefd) != 0) {
			perror("pipefd");
			ck_assert(0);
		}
		fcntl(global_pipefd[0], F_SETFL, O_NONBLOCK);
		fcntl(global_pipefd[1], F_SETFL, O_NONBLOCK);
		if (global_use_glib) {
#ifdef HAVE_GLIB
			GSource *source_r, *source_w;
			source_r = g_source_new(&gio_source_funcs, sizeof(GSource));
			source_w = g_source_new(&gio_source_funcs, sizeof(GSource));
			ck_assert(source_r != NULL && source_w != NULL);
			g_source_set_priority(source_r, conv_prio_libqb2glib(QB_LOOP_HIGH));
			g_source_set_priority(source_w, conv_prio_libqb2glib(QB_LOOP_HIGH));
			g_source_set_can_recurse(source_r, FALSE);
			g_source_set_can_recurse(source_w, FALSE);
			g_source_set_callback(source_r, gio_pipe_reader, &global_pipefd[0], NULL);
			g_source_set_callback(source_w, gio_pipe_writer, &global_pipefd[1], NULL);
			g_source_add_unix_fd(source_r, global_pipefd[0], G_IO_IN);
			g_source_add_unix_fd(source_w, global_pipefd[1], G_IO_OUT);
			g_source_attach(source_r, NULL);
			g_source_attach(source_w, NULL);
#else
			ck_assert(0);
#endif
		} else {
			qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[1],
					 POLLOUT|POLLERR, NULL, pipe_writer);
			qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[0],
					 POLLIN|POLLERR, NULL, pipe_reader);
		}

	} else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) {
		/* simulate a server crash/clean exit without replying */
		exit(0);
	} else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) {
		multiple_connections = QB_FALSE;
		qb_ipcs_disconnect(c);
	}
	return 0;
}
465
/* qb_ipcs_poll_handlers glue: delegate job and poll management to the
 * server's qb loop (my_loop). */
static int32_t
my_job_add(enum qb_loop_priority p,
	   void *data,
	   qb_loop_job_dispatch_fn fn)
{
	return qb_loop_job_add(my_loop, p, data, fn);
}

static int32_t
my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t events,
		void *data, qb_ipcs_dispatch_fn_t fn)
{
	return qb_loop_poll_add(my_loop, p, fd, events, data, fn);
}

static int32_t
my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t events,
		void *data, qb_ipcs_dispatch_fn_t fn)
{
	return qb_loop_poll_mod(my_loop, p, fd, events, data, fn);
}

static int32_t
my_dispatch_del(int32_t fd)
{
	return qb_loop_poll_del(my_loop, fd);
}
493
494
495 /* taken from examples/ipcserver.c, with s/my_g/gio/ */
496 #ifdef HAVE_GLIB
497
498 #include <qb/qbarray.h>
499
/* fd -> adaptor lookup table and the GLib loop used in GLib mode */
static qb_array_t *gio_map;
static GMainLoop *glib_loop;

/* per-fd bridge between a GLib io watch and a qb_ipcs dispatch callback */
struct gio_to_qb_poll {
	int32_t is_used;	/* reference count; 0 = slot free */
	int32_t events;
	int32_t source;		/* GLib source id; 0 = no watch installed */
	int32_t fd;
	void *data;		/* opaque argument passed back to fn */
	qb_ipcs_dispatch_fn_t fn;
	enum qb_loop_priority p;
};
512
/* GLib io-watch callback: forward readiness to the stored qb dispatch
 * function.  Returning FALSE (dispatch result != 0) removes the watch,
 * which triggers gio_poll_destroy(). */
static gboolean
gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
{
	struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
	gint fd = g_io_channel_unix_get_fd(gio);

	qb_enter();

	return (adaptor->fn(fd, condition, adaptor->data) == 0);
}
523
524 static void
525 gio_poll_destroy(gpointer data)
526 {
527 struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
528
529 adaptor->is_used--;
530 if (adaptor->is_used == 0) {
531 qb_log(LOG_DEBUG, "fd %d adaptor destroyed\n", adaptor->fd);
532 adaptor->fd = 0;
533 adaptor->source = 0;
534 }
535 }
536
/* Install (is_new) or replace (!is_new) a GLib io watch forwarding
 * readiness for fd to the qb_ipcs dispatch function fn.
 * Returns 0, -EEXIST when adding over a live watch, or a negative
 * errno from the adaptor lookup / channel creation. */
static int32_t
gio_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
		    void *data, qb_ipcs_dispatch_fn_t fn, gboolean is_new)
{
	struct gio_to_qb_poll *adaptor;
	GIOChannel *channel;
	int32_t res = 0;

	qb_enter();

	res = qb_array_index(gio_map, fd, (void **)&adaptor);
	if (res < 0) {
		return res;
	}
	if (adaptor->is_used && adaptor->source) {
		if (is_new) {
			return -EEXIST;
		}
		/* modifying: tear the old watch down first */
		g_source_remove(adaptor->source);
		adaptor->source = 0;
	}

	channel = g_io_channel_unix_new(fd);
	if (!channel) {
		return -ENOMEM;
	}

	adaptor->fn = fn;
	adaptor->events = evts;
	adaptor->data = data;
	adaptor->p = p;
	adaptor->is_used++;
	adaptor->fd = fd;

	/* gio_poll_destroy() drops the is_used reference taken above */
	adaptor->source = g_io_add_watch_full(channel, conv_prio_libqb2glib(p),
					      evts, gio_read_socket, adaptor,
					      gio_poll_destroy);

	/* we are handing the channel off to be managed by mainloop now.
	 * remove our reference. */
	g_io_channel_unref(channel);

	return 0;
}
581
/* qb_ipcs_poll_handlers entry points, both backed by
 * gio_dispatch_update(); add fails on an existing watch, mod replaces it */
static int32_t
gio_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
		 void *data, qb_ipcs_dispatch_fn_t fn)
{
	return gio_dispatch_update(p, fd, evts, data, fn, TRUE);
}

static int32_t
gio_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
		 void *data, qb_ipcs_dispatch_fn_t fn)
{
	return gio_dispatch_update(p, fd, evts, data, fn, FALSE);
}
595
596 static int32_t
597 gio_dispatch_del(int32_t fd)
598 {
599 struct gio_to_qb_poll *adaptor;
600 if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
601 g_source_remove(adaptor->source);
602 adaptor->source = 0;
603 }
604 return 0;
605 }
606
607 #endif /* HAVE_GLIB */
608
609
/* connection_closed handler.  Returning non-zero tells libqb not to
 * free the connection yet (used once by the disconnect-after-created
 * test to survive qb_ipcs_disconnect() from inside a callback). */
static int32_t
s1_connection_closed(qb_ipcs_connection_t *c)
{
	if (multiple_connections) {
		return 0;
	}
	/* Stop the connection being freed when we call qb_ipcs_disconnect()
	   in the callback */
	if (disconnect_after_created == QB_TRUE) {
		disconnect_after_created = QB_FALSE;
		return 1;
	}

	qb_enter();
	qb_leave();
	return 0;
}
627
/* Loop job for the reference-count test: repeatedly sends an event on a
 * connection whose owning service is destroyed mid-way (iteration 2).
 * The sends must keep working while the extra connection reference
 * taken in s1_connection_created() is held; the final unref (i == 5)
 * lets everything clean up and stops the loop. */
static void
outq_flush (void *data)
{
	static int i = 0;
	struct cs_ipcs_conn_context *cnx;
	cnx = qb_ipcs_context_get(data);

	qb_log(LOG_DEBUG,"iter %u\n", i);
	i++;
	if (i == 2) {
		qb_ipcs_destroy(s1);
		s1 = NULL;
	}
	/* if the reference counting is not working, this should fail
	 * for i > 1.
	 */
	qb_ipcs_event_send(data, "test", 4);
	/* context was seeded with "test" in s1_connection_created() */
	assert(memcmp(cnx, "test", 4) == 0);
	if (i < 5) {
		qb_loop_job_add(my_loop, QB_LOOP_HIGH, data, outq_flush);
	} else {
		/* this single unref should clean everything up.
		 */
		qb_ipcs_connection_unref(data);
		qb_log(LOG_INFO, "end of test, stopping loop");
		qb_loop_stop(my_loop);
	}
}
656
657
/* connection_destroyed handler: free the per-connection context in the
 * refcount test, otherwise stop the server loop (a destroyed connection
 * ends the single-connection test scenarios). */
static void
s1_connection_destroyed(qb_ipcs_connection_t *c)
{
	if (multiple_connections) {
		return;
	}

	qb_enter();
	if (reference_count_test) {
		struct cs_ipcs_conn_context *cnx;
		cnx = qb_ipcs_context_get(c);
		free(cnx);
	} else {
		qb_loop_stop(my_loop);
	}
	qb_leave();
}
675
676 static int32_t
677 s1_connection_accept(qb_ipcs_connection_t *c, uid_t uid, gid_t gid)
678 {
679 if (set_perms_on_socket) {
680 qb_ipcs_connection_auth_set(c, 555, 741, S_IRWXU|S_IRWXG|S_IROTH|S_IWOTH);
681 }
682 return 0;
683 }
684
685
686 static void
687 s1_connection_created(qb_ipcs_connection_t *c)
688 {
689 uint32_t max = MAX_MSG_SIZE;
690 if (multiple_connections) {
691 return;
692 }
693
694 if (send_event_on_created) {
695 struct qb_ipc_response_header response;
696 int32_t res;
697
698 response.size = sizeof(struct qb_ipc_response_header);
699 response.id = IPC_MSG_RES_DISPATCH;
700 response.error = 0;
701 res = qb_ipcs_event_send(c, &response,
702 sizeof(response));
703 ck_assert_int_eq(res, response.size);
704 }
705 if (reference_count_test) {
706 struct cs_ipcs_conn_context *context;
707
708 qb_ipcs_connection_ref(c);
709 qb_loop_job_add(my_loop, QB_LOOP_HIGH, c, outq_flush);
710
711 context = calloc(1, 20);
712 memcpy(context, "test", 4);
713 qb_ipcs_context_set(c, context);
714 }
715
716
717 ck_assert_int_le(max, qb_ipcs_connection_get_buffer_size(c));
718
719 }
720
/* set by the SIGUSR1 handler; polled via sigsuspend() in
 * run_function_in_new_process() to learn the server is ready */
static volatile sig_atomic_t usr1_bit;

/* async-signal-safe handler: only flips the sig_atomic_t flag */
static void usr1_bit_setter(int signal) {
	if (signal == SIGUSR1) {
		usr1_bit = 1;
	}
}
728
/* A "ready signaller" notifies the parent that the child process has
 * finished initializing; data_arg carries whom/how to notify. */
#define READY_SIGNALLER(name, data_arg) void (name)(void *data_arg)
typedef READY_SIGNALLER(ready_signaller_fn, );

static
READY_SIGNALLER(usr1_signaller, parent_target)
{
	/* wake the parent blocked in sigsuspend() */
	kill(*((pid_t *) parent_target), SIGUSR1);
}

/* signature of a function run inside a freshly forked process */
#define NEW_PROCESS_RUNNER(name, ready_signaller_arg, signaller_data_arg, data_arg) \
	void (name)(ready_signaller_fn ready_signaller_arg, \
		    void *signaller_data_arg, void *data_arg)
typedef NEW_PROCESS_RUNNER(new_process_runner_fn, , , );
742
/* Server main: create the IPC service named ipc_name, wire its poll
 * handlers to either the qb loop or a GLib main loop, signal readiness
 * to the parent, then run until SIGTERM (exit_handler) or a
 * IPC_MSG_REQ_SERVER_FAIL request. */
static
NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data, data)
{
	int32_t res;
	qb_loop_signal_handle handle;

	struct qb_ipcs_service_handlers sh = {
		.connection_accept = s1_connection_accept,
		.connection_created = s1_connection_created,
		.msg_process = s1_msg_process_fn,
		.connection_destroyed = s1_connection_destroyed,
		.connection_closed = s1_connection_closed,
	};

	struct qb_ipcs_poll_handlers ph;
	uint32_t max_size = MAX_MSG_SIZE;

	my_loop = qb_loop_create();
	/* let kill_server()'s SIGTERM tear the service down cleanly */
	qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM,
			   NULL, exit_handler, &handle);


	s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh);
	ck_assert(s1 != 0);

	if (global_loop_prio != QB_LOOP_MED) {
		qb_ipcs_request_rate_limit(s1,
					   conv_libqb_prio2ratelimit(global_loop_prio));
	}
	if (global_use_glib) {
#ifdef HAVE_GLIB
		/* drive the service from a GLib main loop instead */
		ph = (struct qb_ipcs_poll_handlers) {
			.job_add = NULL,
			.dispatch_add = gio_dispatch_add,
			.dispatch_mod = gio_dispatch_mod,
			.dispatch_del = gio_dispatch_del,
		};
		glib_loop = g_main_loop_new(NULL, FALSE);
		gio_map = qb_array_create_2(16, sizeof(struct gio_to_qb_poll), 1);
		ck_assert(gio_map != NULL);
#else
		ck_assert(0);
#endif
	} else {
		ph = (struct qb_ipcs_poll_handlers) {
			.job_add = my_job_add,
			.dispatch_add = my_dispatch_add,
			.dispatch_mod = my_dispatch_mod,
			.dispatch_del = my_dispatch_del,
		};
	}

	if (enforce_server_buffer) {
		qb_ipcs_enforce_buffer_size(s1, max_size);
	}
	qb_ipcs_poll_handlers_set(s1, &ph);

	res = qb_ipcs_run(s1);
	ck_assert_int_eq(res, 0);

	/* tell the parent (blocked in sigsuspend) that we are listening */
	if (ready_signaller != NULL) {
		ready_signaller(signaller_data);
	}

	if (global_use_glib) {
#ifdef HAVE_GLIB
		g_main_loop_run(glib_loop);
#endif
	} else {
		qb_loop_run(my_loop);
	}
	qb_log(LOG_DEBUG, "loop finished - done ...");
}
816
817 static pid_t
818 run_function_in_new_process(const char *role,
819 new_process_runner_fn new_process_runner,
820 void *data)
821 {
822 char formatbuf[1024];
823 pid_t parent_target, pid1, pid2;
824 struct sigaction orig_sa, purpose_sa;
825 sigset_t orig_mask, purpose_mask, purpose_clear_mask;
826
827 sigemptyset(&purpose_mask);
828 sigaddset(&purpose_mask, SIGUSR1);
829
830 sigprocmask(SIG_BLOCK, &purpose_mask, &orig_mask);
831 purpose_clear_mask = orig_mask;
832 sigdelset(&purpose_clear_mask, SIGUSR1);
833
834 purpose_sa.sa_handler = usr1_bit_setter;
835 purpose_sa.sa_mask = purpose_mask;
836 purpose_sa.sa_flags = SA_RESTART;
837
838 /* Double-fork so the servers can be reaped in a timely manner */
839 parent_target = getpid();
840 pid1 = fork();
841 if (pid1 == 0) {
842 pid2 = fork();
843 if (pid2 == -1) {
844 fprintf (stderr, "Can't fork twice\n");
845 exit(0);
846 }
847 if (pid2 == 0) {
848 sigprocmask(SIG_SETMASK, &orig_mask, NULL);
849
850 if (role == NULL) {
851 qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l[%P] %b");
852 } else {
853 snprintf(formatbuf, sizeof(formatbuf),
854 "lib/%%f|%%l|%s[%%P] %%b", role);
855 qb_log_format_set(QB_LOG_STDERR, formatbuf);
856 }
857
858 new_process_runner(usr1_signaller, &parent_target, data);
859 exit(0);
860 } else {
861 waitpid(pid2, NULL, 0);
862 exit(0);
863 }
864 }
865
866 usr1_bit = 0;
867 /* XXX assume never fails */
868 sigaction(SIGUSR1, &purpose_sa, &orig_sa);
869
870 do {
871 /* XXX assume never fails with EFAULT */
872 sigsuspend(&purpose_clear_mask);
873 } while (usr1_bit != 1);
874 usr1_bit = 0;
875 sigprocmask(SIG_SETMASK, &orig_mask, NULL);
876 /* give children a slight/non-strict scheduling advantage */
877 sched_yield();
878
879 return pid1;
880 }
881
/* Ask the server to exit(0) via IPC_MSG_REQ_SERVER_FAIL and verify the
 * connection is then reported as dropped (-ENOTCONN or -ECONNRESET). */
static void
request_server_exit(void)
{
	struct qb_ipc_request_header req_header;
	struct qb_ipc_response_header res_header;
	struct iovec iov[1];
	int32_t res;

	/*
	 * tell the server to exit
	 */
	req_header.id = IPC_MSG_REQ_SERVER_FAIL;
	req_header.size = sizeof(struct qb_ipc_request_header);

	iov[0].iov_len = req_header.size;
	iov[0].iov_base = (void*)&req_header;

	ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn));

	res = qb_ipcc_sendv_recv(conn, iov, 1,
				 &res_header,
				 sizeof(struct qb_ipc_response_header), -1);
	/*
	 * confirm we get -ENOTCONN or ECONNRESET
	 */
	if (res != -ECONNRESET && res != -ENOTCONN) {
		qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
		ck_assert_int_eq(res, -ENOTCONN);
	}
}
912
/* forcefully terminate the server process and (blockingly) reap it */
static void
kill_server(pid_t pid)
{
	kill(pid, SIGTERM);
	waitpid(pid, NULL, 0);
}
919
/* Wait up to ~10s for pid to exit on its own and assert it exited
 * normally with status 0.  Always returns 0 (failures abort via
 * ck_assert). */
static int32_t
verify_graceful_stop(pid_t pid)
{
	int wait_rc = 0;
	int status = 0;
	int rc = 0;
	int tries;

	/* We need the server to be able to exit by itself */
	for (tries = 10; tries >= 0; tries--) {
		sleep(1);
		wait_rc = waitpid(pid, &status, WNOHANG);
		if (wait_rc > 0) {
			break;
		}
	}

	ck_assert_int_eq(wait_rc, pid);
	rc = WIFEXITED(status);
	if (rc) {
		rc = WEXITSTATUS(status);
		ck_assert_int_eq(rc, 0);
	} else {
		/* not a normal exit: rc is 0 here, so this always fails */
		ck_assert(rc != 0);
	}

	return 0;
}
948
/* request buffer large enough for every size send_and_check() uses */
struct my_req {
	struct qb_ipc_request_header hdr;
	char message[1024 * 1024];
};

/* file-scope: keeps the 1MiB payload off the stack */
static struct my_req request;
/* Send a request of header+size bytes with id req_id and wait for the
 * matching reply (event channel for IPC_MSG_REQ_DISPATCH, response
 * channel otherwise).  Retries -EAGAIN sends up to 10 times and sets
 * fc_enabled when flow control appears active.  When expect_perfection,
 * asserts the exact reply id ("request id + 1") and size.  Returns the
 * bytes received or a negative errno. */
static int32_t
send_and_check(int32_t req_id, uint32_t size,
	       int32_t ms_timeout, int32_t expect_perfection)
{
	struct qb_ipc_response_header res_header;
	int32_t res;
	int32_t try_times = 0;
	uint32_t max_size = MAX_MSG_SIZE;

	request.hdr.id = req_id;
	request.hdr.size = sizeof(struct qb_ipc_request_header) + size;

	/* check that we can't send a message that is too big
	 * and we get the right return code.
	 */
	res = qb_ipcc_send(conn, &request, max_size*2);
	ck_assert_int_eq(res, -EMSGSIZE);

repeat_send:
	res = qb_ipcc_send(conn, &request, request.hdr.size);
	try_times++;
	if (res < 0) {
		if (res == -EAGAIN && try_times < 10) {
			goto repeat_send;
		} else {
			if (res == -EAGAIN && try_times >= 10) {
				/* still blocked after 10 tries: flow control */
				fc_enabled = QB_TRUE;
			}
			errno = -res;
			qb_perror(LOG_INFO, "qb_ipcc_send");
			return res;
		}
	}

	if (req_id == IPC_MSG_REQ_DISPATCH) {
		res = qb_ipcc_event_recv(conn, &res_header,
					 sizeof(struct qb_ipc_response_header),
					 ms_timeout);
	} else {
		res = qb_ipcc_recv(conn, &res_header,
				   sizeof(struct qb_ipc_response_header),
				   ms_timeout);
	}
	if (res == -EINTR) {
		return -1;
	}
	if (res == -EAGAIN || res == -ETIMEDOUT) {
		fc_enabled = QB_TRUE;
		qb_perror(LOG_DEBUG, "qb_ipcc_recv");
		return res;
	}
	if (expect_perfection) {
		ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
		ck_assert_int_eq(res_header.id, req_id + 1);
		ck_assert_int_eq(res_header.size, sizeof(struct qb_ipc_response_header));
	}
	return res;
}
1013
1014
1015 static int32_t
1016 process_async_connect(int32_t fd, int32_t revents, void *data)
1017 {
1018 qb_loop_t *cl = (qb_loop_t *)data;
1019 int res;
1020
1021 res = qb_ipcc_connect_continue(conn);
1022 ck_assert_int_eq(res, 0);
1023 qb_loop_stop(cl);
1024 return 0;
1025 }
/* Exercise qb_ipcc_connect_async(): wait (in a private loop) for the
 * connect fd to become readable, complete the handshake, do one
 * round-trip, then stop the server gracefully. */
static void test_ipc_connect_async(void)
{
	struct qb_ipc_request_header req_header;
	struct qb_ipc_response_header res_header;
	int32_t res;
	pid_t pid;
	uint32_t max_size = MAX_MSG_SIZE;
	int connect_fd;
	struct iovec iov[1];
	static qb_loop_t *cl;

	pid = run_function_in_new_process("server", run_ipc_server, NULL);
	ck_assert(pid != -1);

	conn = qb_ipcc_connect_async(ipc_name, max_size, &connect_fd);
	ck_assert(conn != NULL);

	/* process_async_connect() finishes the handshake and stops cl */
	cl = qb_loop_create();
	res = qb_loop_poll_add(cl, QB_LOOP_MED,
			       connect_fd, POLLIN,
			       cl, process_async_connect);
	ck_assert_int_eq(res, 0);
	qb_loop_run(cl);

	/* Send some data */
	req_header.id = IPC_MSG_REQ_TX_RX;
	req_header.size = sizeof(struct qb_ipc_request_header);

	iov[0].iov_len = req_header.size;
	iov[0].iov_base = (void*)&req_header;

	res = qb_ipcc_sendv_recv(conn, iov, 1,
				 &res_header,
				 sizeof(struct qb_ipc_response_header), 5000);

	ck_assert_int_ge(res, 0);

	request_server_exit();
	verify_graceful_stop(pid);


	qb_ipcc_disconnect(conn);
}
1069
/* Verify that qb_ipcc_sendv_recv() times out when the server replies
 * only on the event channel (IPC_MSG_REQ_DISPATCH) and never on the
 * response channel. */
static void
test_ipc_txrx_timeout(void)
{
	struct qb_ipc_request_header req_header;
	struct qb_ipc_response_header res_header;
	struct iovec iov[1];
	int32_t res;
	int32_t c = 0;
	int32_t j = 0;
	pid_t pid;
	uint32_t max_size = MAX_MSG_SIZE;

	pid = run_function_in_new_process("server", run_ipc_server, NULL);
	ck_assert(pid != -1);

	/* retry the connect for ~2s; fail fast if the server died */
	do {
		conn = qb_ipcc_connect(ipc_name, max_size);
		if (conn == NULL) {
			j = waitpid(pid, NULL, WNOHANG);
			ck_assert_int_eq(j, 0);
			poll(NULL, 0, 400);
			c++;
		}
	} while (conn == NULL && c < 5);
	ck_assert(conn != NULL);

	/* The dispatch response will only come over
	 * the event channel, we want to verify the receive times
	 * out when an event is returned with no response */
	req_header.id = IPC_MSG_REQ_DISPATCH;
	req_header.size = sizeof(struct qb_ipc_request_header);

	iov[0].iov_len = req_header.size;
	iov[0].iov_base = (void*)&req_header;

	res = qb_ipcc_sendv_recv(conn, iov, 1,
				 &res_header,
				 sizeof(struct qb_ipc_response_header), 5000);

	ck_assert_int_eq(res, -ETIMEDOUT);

	request_server_exit();
	verify_graceful_stop(pid);

	/*
	 * this needs to free up the shared mem
	 */
	qb_ipcc_disconnect(conn);
}
1119
1120 static int32_t recv_timeout = -1;
1121 static void
1122 test_ipc_txrx(void)
1123 {
1124 int32_t j;
1125 int32_t c = 0;
1126 size_t size;
1127 pid_t pid;
1128 uint32_t max_size = MAX_MSG_SIZE;
1129
1130 pid = run_function_in_new_process("server", run_ipc_server, NULL);
|
(1) Event cond_true: |
Condition "pid != -1", taking true branch. |
1131 ck_assert(pid != -1);
1132
1133 do {
1134 conn = qb_ipcc_connect(ipc_name, max_size);
|
(2) Event cond_true: |
Condition "conn == NULL", taking true branch. |
1135 if (conn == NULL) {
1136 j = waitpid(pid, NULL, WNOHANG);
|
(3) Event cond_true: |
Condition "_ck_x == _ck_y", taking true branch. |
1137 ck_assert_int_eq(j, 0);
|
(4) Event check_return: |
Calling "poll(NULL, 0UL, 400)" without checking return value. This library function may fail and return an error code. |
1138 poll(NULL, 0, 400);
1139 c++;
1140 }
1141 } while (conn == NULL && c < 5);
1142 ck_assert(conn != NULL);
1143
1144 size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
1145 for (j = 1; j < 19; j++) {
1146 size *= 2;
1147 if (size >= max_size)
1148 break;
1149 if (send_and_check(IPC_MSG_REQ_TX_RX, size,
1150 recv_timeout, QB_TRUE) < 0) {
1151 break;
1152 }
1153 }
1154 if (turn_on_fc) {
1155 /* can't signal server to shutdown if flow control is on */
1156 ck_assert_int_eq(fc_enabled, QB_TRUE);
1157 qb_ipcc_disconnect(conn);
1158 /* TODO - figure out why this sleep is necessary */
1159 sleep(1);
1160 kill_server(pid);
1161 } else {
1162 request_server_exit();
1163 qb_ipcc_disconnect(conn);
1164 verify_graceful_stop(pid);
1165 }
1166 }
1167
/* Verify qb_ipcc_auth_get(): rejects a NULL connection and reports the
 * server's pid/uid/gid (pid is unavailable on getpeereid platforms). */
static void
test_ipc_getauth(void)
{
	int32_t j;
	int32_t c = 0;
	pid_t pid;
	pid_t spid;
	uid_t suid;
	gid_t sgid;
	int res;
	uint32_t max_size = MAX_MSG_SIZE;

	pid = run_function_in_new_process("server", run_ipc_server, NULL);
	ck_assert(pid != -1);

	/* retry the connect for ~2s; fail fast if the server died */
	do {
		conn = qb_ipcc_connect(ipc_name, max_size);
		if (conn == NULL) {
			j = waitpid(pid, NULL, WNOHANG);
			ck_assert_int_eq(j, 0);
			poll(NULL, 0, 400);
			c++;
		}
	} while (conn == NULL && c < 5);
	ck_assert(conn != NULL);

	res = qb_ipcc_auth_get(NULL, NULL, NULL, NULL);
	ck_assert(res == -EINVAL);

	res = qb_ipcc_auth_get(conn, &spid, &suid, &sgid);
	ck_assert(res == 0);
#ifndef HAVE_GETPEEREID
	/* GETPEEREID doesn't return a PID */
	ck_assert(spid != 0);
#endif
	/* the server runs under the same credentials as this test */
	ck_assert(suid == getuid());
	ck_assert(sgid == getgid());

	request_server_exit();
	qb_ipcc_disconnect(conn);
	verify_graceful_stop(pid);
}
1210
1211 static void
1212 test_ipc_exit(void)
1213 {
1214 struct qb_ipc_request_header req_header;
1215 struct qb_ipc_response_header res_header;
1216 struct iovec iov[1];
1217 int32_t res;
1218 int32_t c = 0;
1219 int32_t j = 0;
1220 pid_t pid;
1221 uint32_t max_size = MAX_MSG_SIZE;
1222
1223 pid = run_function_in_new_process("server", run_ipc_server, NULL);
1224 ck_assert(pid != -1);
1225
1226 do {
1227 conn = qb_ipcc_connect(ipc_name, max_size);
1228 if (conn == NULL) {
1229 j = waitpid(pid, NULL, WNOHANG);
1230 ck_assert_int_eq(j, 0);
1231 poll(NULL, 0, 400);
1232 c++;
1233 }
1234 } while (conn == NULL && c < 5);
1235 ck_assert(conn != NULL);
1236
1237 req_header.id = IPC_MSG_REQ_TX_RX;
1238 req_header.size = sizeof(struct qb_ipc_request_header);
1239
1240 iov[0].iov_len = req_header.size;
1241 iov[0].iov_base = (void*)&req_header;
1242
1243 res = qb_ipcc_sendv_recv(conn, iov, 1,
1244 &res_header,
1245 sizeof(struct qb_ipc_response_header), -1);
1246 ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
1247
1248 request_server_exit();
1249 verify_graceful_stop(pid);
1250
1251 /*
1252 * this needs to free up the shared mem
1253 */
1254 qb_ipcc_disconnect(conn);
1255 }
1256
/*
 * Thin libcheck wrappers: each selects the IPC transport (socket or
 * shm), a per-test IPC name, and optionally the receive timeout and
 * flow-control flag, then calls the shared test body above.
 */
START_TEST(test_ipc_exit_us)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	recv_timeout = 5000;
	test_ipc_exit();
	qb_leave();
}
END_TEST

START_TEST(test_ipc_exit_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	recv_timeout = 1000;
	test_ipc_exit();
	qb_leave();
}
END_TEST

START_TEST(test_ipc_txrx_shm_timeout)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_txrx_timeout();
	qb_leave();
}
END_TEST


START_TEST(test_ipc_txrx_us_timeout)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_txrx_timeout();
	qb_leave();
}
END_TEST

START_TEST(test_ipc_shm_connect_async)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_connect_async();
	qb_leave();
}
END_TEST

START_TEST(test_ipc_us_connect_async)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_connect_async();
	qb_leave();
}
END_TEST

START_TEST(test_ipc_txrx_shm_getauth)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_getauth();
	qb_leave();
}
END_TEST

START_TEST(test_ipc_txrx_us_getauth)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_getauth();
	qb_leave();
}
END_TEST

/* tmo = bounded receive timeout; block = wait forever (-1) */
START_TEST(test_ipc_txrx_shm_tmo)
{
	qb_enter();
	turn_on_fc = QB_FALSE;
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	recv_timeout = 1000;
	test_ipc_txrx();
	qb_leave();
}
END_TEST

START_TEST(test_ipc_txrx_shm_block)
{
	qb_enter();
	turn_on_fc = QB_FALSE;
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	recv_timeout = -1;
	test_ipc_txrx();
	qb_leave();
}
END_TEST

/* fc = flow-control variants: server is expected to throttle us */
START_TEST(test_ipc_fc_shm)
{
	qb_enter();
	turn_on_fc = QB_TRUE;
	ipc_type = QB_IPC_SHM;
	recv_timeout = 500;
	set_ipc_name(__func__);
	test_ipc_txrx();
	qb_leave();
}
END_TEST

START_TEST(test_ipc_txrx_us_block)
{
	qb_enter();
	turn_on_fc = QB_FALSE;
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	recv_timeout = -1;
	test_ipc_txrx();
	qb_leave();
}
END_TEST

START_TEST(test_ipc_txrx_us_tmo)
{
	qb_enter();
	turn_on_fc = QB_FALSE;
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	recv_timeout = 1000;
	test_ipc_txrx();
	qb_leave();
}
END_TEST

START_TEST(test_ipc_fc_us)
{
	qb_enter();
	turn_on_fc = QB_TRUE;
	ipc_type = QB_IPC_SOCKET;
	recv_timeout = 500;
	set_ipc_name(__func__);
	test_ipc_txrx();
	qb_leave();
}
END_TEST
1411
/* Response shape large enough to carry a 1 MiB payload after the header. */
struct my_res {
	struct qb_ipc_response_header hdr;
	char message[1024 * 1024];
};

/* Arguments passed to client_dispatch(), possibly across a fork. */
struct dispatch_data {
	pid_t server_pid;		/* server process to monitor while connecting */
	enum my_msg_ids msg_type;	/* request id to send */
	uint32_t repetitions;		/* send rounds; 0 means connect only */
};
1422
1423 static inline
1424 NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data, data)
1425 {
1426 uint32_t max_size = MAX_MSG_SIZE;
1427 int32_t size;
1428 int32_t c = 0;
1429 int32_t j;
1430 pid_t server_pid = ((struct dispatch_data *) data)->server_pid;
1431 enum my_msg_ids msg_type = ((struct dispatch_data *) data)->msg_type;
1432
1433 do {
1434 conn = qb_ipcc_connect(ipc_name, max_size);
1435 if (conn == NULL) {
1436 j = waitpid(server_pid, NULL, WNOHANG);
1437 ck_assert_int_eq(j, 0);
1438 poll(NULL, 0, 400);
1439 c++;
1440 }
1441 } while (conn == NULL && c < 5);
1442 ck_assert(conn != NULL);
1443
1444 if (ready_signaller != NULL) {
1445 ready_signaller(signaller_data);
1446 }
1447
1448 size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
1449
1450 for (uint32_t r = ((struct dispatch_data *) data)->repetitions;
1451 r > 0; r--) {
1452 for (j = 1; j < 19; j++) {
1453 size *= 2;
1454 if (size >= max_size)
1455 break;
1456 if (send_and_check(msg_type, size,
1457 recv_timeout, QB_TRUE) < 0) {
1458 break;
1459 }
1460 }
1461 }
1462 }
1463
1464 static void
1465 test_ipc_dispatch(void)
1466 {
1467 pid_t pid;
1468 struct dispatch_data data;
1469
1470 pid = run_function_in_new_process(NULL, run_ipc_server, NULL);
1471 ck_assert(pid != -1);
1472 data = (struct dispatch_data){.server_pid = pid,
1473 .msg_type = IPC_MSG_REQ_DISPATCH,
1474 .repetitions = 1};
1475
1476 client_dispatch(NULL, NULL, (void *) &data);
1477
1478 request_server_exit();
1479 qb_ipcc_disconnect(conn);
1480 verify_graceful_stop(pid);
1481 }
1482
/* Socket-transport variant of the dispatch test. */
START_TEST(test_ipc_dispatch_us)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_dispatch();
	qb_leave();
}
END_TEST

/* Events counted so far by the poll callbacks below. */
static int32_t events_received;
1494
1495 static int32_t
1496 count_stress_events(int32_t fd, int32_t revents, void *data)
1497 {
1498 struct {
1499 struct qb_ipc_response_header hdr __attribute__ ((aligned(8)));
1500 char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
1501 uint32_t sent_msgs __attribute__ ((aligned(8)));
1502 } __attribute__ ((aligned(8))) giant_event_recv;
1503 qb_loop_t *cl = (qb_loop_t*)data;
1504 int32_t res;
1505
1506 res = qb_ipcc_event_recv(conn, &giant_event_recv,
1507 sizeof(giant_event_recv),
1508 -1);
1509 if (res > 0) {
1510 events_received++;
1511
1512 if ((events_received % 1000) == 0) {
1513 qb_log(LOG_DEBUG, "RECV: %d stress events processed", events_received);
1514 if (res != sizeof(giant_event_recv)) {
1515 qb_log(LOG_DEBUG, "Unexpected recv size, expected %d got %d",
1516 sizeof(giant_event_recv), res);
1517
1518 ck_assert_int_eq(res, sizeof(giant_event_recv));
1519 } else if (giant_event_recv.sent_msgs != events_received) {
1520 qb_log(LOG_DEBUG, "Server event mismatch. Server thinks we got %d msgs, but we only received %d",
1521 giant_event_recv.sent_msgs, events_received);
1522 /* This indicates that data corruption is occurring. Since the events
1523 * received is placed at the end of the giant msg, it is possible
1524 * that buffers were not allocated correctly resulting in us
1525 * reading/writing to uninitialized memeory at some point. */
1526 ck_assert_int_eq(giant_event_recv.sent_msgs, events_received);
1527 }
1528 }
1529 } else if (res != -EAGAIN) {
1530 qb_perror(LOG_DEBUG, "count_stress_events");
1531 qb_loop_stop(cl);
1532 return -1;
1533 }
1534
1535 if (events_received >= num_stress_events) {
1536 qb_loop_stop(cl);
1537 return -1;
1538 }
1539 return 0;
1540 }
1541
1542 static int32_t
1543 count_bulk_events(int32_t fd, int32_t revents, void *data)
1544 {
1545 qb_loop_t *cl = (qb_loop_t*)data;
1546 struct qb_ipc_response_header res_header;
1547 int32_t res;
1548
1549 res = qb_ipcc_event_recv(conn, &res_header,
1550 sizeof(struct qb_ipc_response_header),
1551 -1);
1552 if (res > 0) {
1553 events_received++;
1554 }
1555
1556 if (events_received >= num_bulk_events) {
1557 qb_loop_stop(cl);
1558 return -1;
1559 }
1560 return 0;
1561 }
1562
/*
 * Open and close NUM_STRESS_CONNECTIONS sequential connections against
 * one server, asserting the server stays alive throughout. Logging is
 * capped at LOG_INFO for the duration to keep the run fast.
 */
static void
test_ipc_stress_connections(void)
{
	int32_t c = 0;
	int32_t j = 0;
	uint32_t max_size = MAX_MSG_SIZE;
	int32_t connections = 0;
	pid_t pid;

	multiple_connections = QB_TRUE;

	/* quieten stderr logging while thousands of connections churn */
	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_CLEAR_ALL,
			  QB_LOG_FILTER_FILE, "*", LOG_TRACE);
	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
			  QB_LOG_FILTER_FILE, "*", LOG_INFO);
	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);

	pid = run_function_in_new_process("server", run_ipc_server, NULL);
	ck_assert(pid != -1);

	/* NOTE(review): retry counter 'c' is shared across all iterations,
	 * so at most 5 connect retries happen over the whole test */
	for (connections = 1; connections < NUM_STRESS_CONNECTIONS; connections++) {
		if (conn) {
			qb_ipcc_disconnect(conn);
			conn = NULL;
		}
		do {
			conn = qb_ipcc_connect(ipc_name, max_size);
			if (conn == NULL) {
				j = waitpid(pid, NULL, WNOHANG);
				ck_assert_int_eq(j, 0);
				sleep(1);
				c++;
			}
		} while (conn == NULL && c < 5);
		ck_assert(conn != NULL);

		if (((connections+1) % 1000) == 0) {
			qb_log(LOG_INFO, "%d ipc connections made", connections+1);
		}
	}
	multiple_connections = QB_FALSE;

	/* Re-enable logging here so we get the "Free'ing" message which allows
	   for resources.test to clear up after us if needed */
	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_CLEAR_ALL,
			  QB_LOG_FILTER_FILE, "*", LOG_TRACE);
	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
			  QB_LOG_FILTER_FILE, "*", LOG_TRACE);
	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);

	request_server_exit();
	qb_ipcc_disconnect(conn);
	verify_graceful_stop(pid);

}
1618
1619 static void
1620 test_ipc_bulk_events(void)
1621 {
1622 int32_t c = 0;
1623 int32_t j = 0;
1624 pid_t pid;
1625 int32_t res;
1626 qb_loop_t *cl;
1627 int32_t fd;
1628 uint32_t max_size = MAX_MSG_SIZE;
1629
1630 pid = run_function_in_new_process("server", run_ipc_server, NULL);
1631 ck_assert(pid != -1);
1632
1633 do {
1634 conn = qb_ipcc_connect(ipc_name, max_size);
1635 if (conn == NULL) {
1636 j = waitpid(pid, NULL, WNOHANG);
1637 ck_assert_int_eq(j, 0);
1638 poll(NULL, 0, 400);
1639 c++;
1640 }
1641 } while (conn == NULL && c < 5);
1642 ck_assert(conn != NULL);
1643
1644 events_received = 0;
1645 cl = qb_loop_create();
1646 res = qb_ipcc_fd_get(conn, &fd);
1647 ck_assert_int_eq(res, 0);
1648 res = qb_loop_poll_add(cl, QB_LOOP_MED,
1649 fd, POLLIN,
1650 cl, count_bulk_events);
1651 ck_assert_int_eq(res, 0);
1652
1653 res = send_and_check(IPC_MSG_REQ_BULK_EVENTS,
1654 0,
1655 recv_timeout, QB_TRUE);
1656 ck_assert_int_eq(res, sizeof(struct qb_ipc_response_header));
1657
1658 qb_loop_run(cl);
1659 ck_assert_int_eq(events_received, num_bulk_events);
1660
1661 request_server_exit();
1662 qb_ipcc_disconnect(conn);
1663 verify_graceful_stop(pid);
1664 }
1665
/*
 * Stress the event path with num_stress_events "giant" messages and
 * verify that the server-side buffer limit overrides a smaller client
 * limit; finally take the connection down with an oversized request.
 */
static void
test_ipc_stress_test(void)
{
	struct {
		struct qb_ipc_request_header hdr __attribute__ ((aligned(8)));
		char data[GIANT_MSG_DATA_SIZE] __attribute__ ((aligned(8)));
		uint32_t sent_msgs __attribute__ ((aligned(8)));
	} __attribute__ ((aligned(8))) giant_req;

	struct qb_ipc_response_header res_header;
	struct iovec iov[1];
	int32_t c = 0;
	int32_t j = 0;
	pid_t pid;
	int32_t res;
	qb_loop_t *cl;
	int32_t fd;
	uint32_t max_size = MAX_MSG_SIZE;
	/* This looks strange, but it serves an important purpose.
	 * This test forces the server to enforce the MAX_MSG_SIZE
	 * limit from the server side, which overrides the client's
	 * buffer limit. To verify this functionality is working
	 * we set the client limit lower than what the server
	 * is enforcing. */
	int32_t client_buf_size = max_size - 1024;
	int32_t real_buf_size;

	enforce_server_buffer = 1;
	pid = run_function_in_new_process("server", run_ipc_server, NULL);
	enforce_server_buffer = 0;
	ck_assert(pid != -1);

	/* retry the connect while the server child is still starting up */
	do {
		conn = qb_ipcc_connect(ipc_name, client_buf_size);
		if (conn == NULL) {
			j = waitpid(pid, NULL, WNOHANG);
			ck_assert_int_eq(j, 0);
			poll(NULL, 0, 400);
			c++;
		}
	} while (conn == NULL && c < 5);
	ck_assert(conn != NULL);

	/* the server-enforced buffer must win over client_buf_size */
	real_buf_size = qb_ipcc_get_buffer_size(conn);
	ck_assert_int_ge(real_buf_size, max_size);

	qb_log(LOG_DEBUG, "Testing %d iterations of EVENT msg passing.", num_stress_events);

	events_received = 0;
	cl = qb_loop_create();
	res = qb_ipcc_fd_get(conn, &fd);
	ck_assert_int_eq(res, 0);
	res = qb_loop_poll_add(cl, QB_LOOP_MED,
			       fd, POLLIN,
			       cl, count_stress_events);
	ck_assert_int_eq(res, 0);

	/* NOTE(review): this result is never asserted on, unlike the
	 * analogous call in test_ipc_bulk_events() -- confirm intent */
	res = send_and_check(IPC_MSG_REQ_STRESS_EVENT, 0, recv_timeout, QB_TRUE);

	qb_loop_run(cl);
	ck_assert_int_eq(events_received, num_stress_events);

	/* an oversized request must be rejected by dropping the client */
	giant_req.hdr.id = IPC_MSG_REQ_SERVER_FAIL;
	giant_req.hdr.size = sizeof(giant_req);

	if (giant_req.hdr.size <= client_buf_size) {
		ck_assert_int_eq(1, 0);
	}

	iov[0].iov_len = giant_req.hdr.size;
	iov[0].iov_base = (void*)&giant_req;
	res = qb_ipcc_sendv_recv(conn, iov, 1,
				 &res_header,
				 sizeof(struct qb_ipc_response_header), -1);
	/* either error code indicates the server dropped us, as expected */
	if (res != -ECONNRESET && res != -ENOTCONN) {
		qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
		ck_assert_int_eq(res, -ENOTCONN);
	}

	qb_ipcc_disconnect(conn);
	verify_graceful_stop(pid);
}
1748
#ifndef __clang__ /* see variable length array in structure' at the top */
/* Socket-transport stress test (not built under clang, see top of file). */
START_TEST(test_ipc_stress_test_us)
{
	qb_enter();
	send_event_on_created = QB_FALSE;
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_stress_test();
	qb_leave();
}
END_TEST
#endif

/* Socket-transport connection-churn stress test. */
START_TEST(test_ipc_stress_connections_us)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_stress_connections();
	qb_leave();
}
END_TEST

/* Socket-transport bulk-event test. */
START_TEST(test_ipc_bulk_events_us)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_bulk_events();
	qb_leave();
}
END_TEST
1781
/*
 * Ready-signaller used by the deadlock tests: once the client is
 * connected, immediately ask the server to exit.
 */
static
READY_SIGNALLER(connected_signaller, _)
{
	request_server_exit();
}
1787
/*
 * Deadlock-avoidance test (socket, native loop): a low-priority client
 * runs alongside a self-feeding "alphaclient"; the server must still
 * be able to stop gracefully.
 */
START_TEST(test_ipc_us_native_prio_dlock)
{
	pid_t server_pid, alphaclient_pid;
	struct dispatch_data data;

	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);

	/* this is to demonstrate that native event loop can deal even
	   with "extreme" priority disproportions */
	global_loop_prio = QB_LOOP_LOW;
	multiple_connections = QB_TRUE;
	recv_timeout = -1;

	server_pid = run_function_in_new_process("server", run_ipc_server,
						 NULL);
	ck_assert(server_pid != -1);
	data = (struct dispatch_data){.server_pid = server_pid,
				      .msg_type = IPC_MSG_REQ_SELF_FEED,
				      .repetitions = 1};
	alphaclient_pid = run_function_in_new_process("alphaclient",
						      client_dispatch,
						      (void *) &data);
	ck_assert(alphaclient_pid != -1);

	//sleep(1);
	sched_yield();

	/* connect only (0 repetitions); the signaller requests exit */
	data.repetitions = 0;
	client_dispatch(connected_signaller, NULL, (void *) &data);
	verify_graceful_stop(server_pid);

	multiple_connections = QB_FALSE;
	qb_leave();
}
END_TEST
1825
#if HAVE_GLIB
/*
 * Deadlock-avoidance test (socket, GLib loop): same shape as the
 * native variant but driven through the GLib integration.
 */
START_TEST(test_ipc_us_glib_prio_dlock)
{
	pid_t server_pid, alphaclient_pid;
	struct dispatch_data data;

	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);

	global_use_glib = QB_TRUE;
	/* this is to make the test pass at all, since GLib is strict
	   on priorities -- QB_LOOP_MED or lower would fail for sure */
	global_loop_prio = QB_LOOP_HIGH;
	multiple_connections = QB_TRUE;
	recv_timeout = -1;

	server_pid = run_function_in_new_process("server", run_ipc_server,
						 NULL);
	ck_assert(server_pid != -1);
	data = (struct dispatch_data){.server_pid = server_pid,
				      .msg_type = IPC_MSG_REQ_SELF_FEED,
				      .repetitions = 1};
	alphaclient_pid = run_function_in_new_process("alphaclient",
						      client_dispatch,
						      (void *) &data);
	ck_assert(alphaclient_pid != -1);

	//sleep(1);
	sched_yield();

	/* connect only (0 repetitions); the signaller requests exit */
	data.repetitions = 0;
	client_dispatch(connected_signaller, NULL, (void *) &data);
	verify_graceful_stop(server_pid);

	/* restore globals for subsequent tests */
	multiple_connections = QB_FALSE;
	global_loop_prio = QB_LOOP_MED;
	global_use_glib = QB_FALSE;
	qb_leave();
}
END_TEST
#endif
1868
1869 static void
1870 test_ipc_event_on_created(void)
1871 {
1872 int32_t c = 0;
1873 int32_t j = 0;
1874 pid_t pid;
1875 int32_t res;
1876 qb_loop_t *cl;
1877 int32_t fd;
1878 uint32_t max_size = MAX_MSG_SIZE;
1879
1880 num_bulk_events = 1;
1881
1882 pid = run_function_in_new_process("server", run_ipc_server, NULL);
1883 ck_assert(pid != -1);
1884
1885 do {
1886 conn = qb_ipcc_connect(ipc_name, max_size);
1887 if (conn == NULL) {
1888 j = waitpid(pid, NULL, WNOHANG);
1889 ck_assert_int_eq(j, 0);
1890 poll(NULL, 0, 400);
1891 c++;
1892 }
1893 } while (conn == NULL && c < 5);
1894 ck_assert(conn != NULL);
1895
1896 events_received = 0;
1897 cl = qb_loop_create();
1898 res = qb_ipcc_fd_get(conn, &fd);
1899 ck_assert_int_eq(res, 0);
1900 res = qb_loop_poll_add(cl, QB_LOOP_MED,
1901 fd, POLLIN,
1902 cl, count_bulk_events);
1903 ck_assert_int_eq(res, 0);
1904
1905 qb_loop_run(cl);
1906 ck_assert_int_eq(events_received, num_bulk_events);
1907
1908 request_server_exit();
1909 qb_ipcc_disconnect(conn);
1910 verify_graceful_stop(pid);
1911 }
1912
/* Socket variant: server emits an event on connection creation. */
START_TEST(test_ipc_event_on_created_us)
{
	qb_enter();
	send_event_on_created = QB_TRUE;
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_event_on_created();
	qb_leave();
}
END_TEST
1923
1924 static void
1925 test_ipc_disconnect_after_created(void)
1926 {
1927 struct qb_ipc_request_header req_header;
1928 struct qb_ipc_response_header res_header;
1929 struct iovec iov[1];
1930 int32_t c = 0;
1931 int32_t j = 0;
1932 pid_t pid;
1933 int32_t res;
1934 uint32_t max_size = MAX_MSG_SIZE;
1935
1936 pid = run_function_in_new_process("server", run_ipc_server, NULL);
1937 ck_assert(pid != -1);
1938
1939 do {
1940 conn = qb_ipcc_connect(ipc_name, max_size);
1941 if (conn == NULL) {
1942 j = waitpid(pid, NULL, WNOHANG);
1943 ck_assert_int_eq(j, 0);
1944 poll(NULL, 0, 400);
1945 c++;
1946 }
1947 } while (conn == NULL && c < 5);
1948 ck_assert(conn != NULL);
1949
1950 ck_assert_int_eq(QB_TRUE, qb_ipcc_is_connected(conn));
1951
1952 req_header.id = IPC_MSG_REQ_SERVER_DISCONNECT;
1953 req_header.size = sizeof(struct qb_ipc_request_header);
1954
1955 iov[0].iov_len = req_header.size;
1956 iov[0].iov_base = (void*)&req_header;
1957
1958 res = qb_ipcc_sendv_recv(conn, iov, 1,
1959 &res_header,
1960 sizeof(struct qb_ipc_response_header), -1);
1961 /*
1962 * confirm we get -ENOTCONN or -ECONNRESET
1963 */
1964 if (res != -ECONNRESET && res != -ENOTCONN) {
1965 qb_log(LOG_ERR, "id:%d size:%d", res_header.id, res_header.size);
1966 ck_assert_int_eq(res, -ENOTCONN);
1967 }
1968 ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn));
1969 qb_ipcc_disconnect(conn);
1970 sleep(1); /* Give it time to stop */
1971 kill_server(pid);
1972 }
1973
/* Socket variant: server disconnects us straight after creation. */
START_TEST(test_ipc_disconnect_after_created_us)
{
	qb_enter();
	disconnect_after_created = QB_TRUE;
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_disconnect_after_created();
	qb_leave();
}
END_TEST
1984
1985 static void
1986 test_ipc_server_fail(void)
1987 {
1988 int32_t j;
1989 int32_t c = 0;
1990 pid_t pid;
1991 uint32_t max_size = MAX_MSG_SIZE;
1992
1993 pid = run_function_in_new_process("server", run_ipc_server, NULL);
1994 ck_assert(pid != -1);
1995
1996 do {
1997 conn = qb_ipcc_connect(ipc_name, max_size);
1998 if (conn == NULL) {
1999 j = waitpid(pid, NULL, WNOHANG);
2000 ck_assert_int_eq(j, 0);
2001 poll(NULL, 0, 400);
2002 c++;
2003 }
2004 } while (conn == NULL && c < 5);
2005 ck_assert(conn != NULL);
2006
2007 request_server_exit();
2008 if (_fi_unlink_inject_failure == QB_TRUE) {
2009 _fi_truncate_called = _fi_openat_called = 0;
2010 }
2011 ck_assert_int_eq(QB_FALSE, qb_ipcc_is_connected(conn));
2012 qb_ipcc_disconnect(conn);
2013 if (_fi_unlink_inject_failure == QB_TRUE) {
2014 ck_assert_int_ne(_fi_truncate_called + _fi_openat_called, 0);
2015 }
2016 verify_graceful_stop(pid);
2017 }
2018
/* Socket variant of the server-failure test. */
START_TEST(test_ipc_server_fail_soc)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_server_fail();
	qb_leave();
}
END_TEST

/* SHM variant of the dispatch test. */
START_TEST(test_ipc_dispatch_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_dispatch();
	qb_leave();
}
END_TEST

/* SHM variant of the giant-message stress test. */
START_TEST(test_ipc_stress_test_shm)
{
	qb_enter();
	send_event_on_created = QB_FALSE;
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_stress_test();
	qb_leave();
}
END_TEST

/* SHM variant of the connection-churn stress test. */
START_TEST(test_ipc_stress_connections_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_stress_connections();
	qb_leave();
}
END_TEST
2059
2060 // Check perms uses illegal access to libqb internals
2061 // DO NOT try this at home.
2062 #include "../lib/ipc_int.h"
2063 #include "../lib/ringbuffer_int.h"
2064 START_TEST(test_ipc_server_perms)
2065 {
2066 pid_t pid;
2067 struct stat st;
2068 int j;
2069 uint32_t max_size;
2070 int res;
2071 int c = 0;
2072
2073 // Can only test this if we are root
2074 if (getuid() != 0) {
2075 return;
2076 }
2077
2078 ipc_type = QB_IPC_SHM;
2079 set_perms_on_socket = QB_TRUE;
2080 max_size = MAX_MSG_SIZE;
2081
2082 pid = run_function_in_new_process("server", run_ipc_server, NULL);
2083 ck_assert(pid != -1);
2084
2085 do {
2086 conn = qb_ipcc_connect(ipc_name, max_size);
2087 if (conn == NULL) {
2088 j = waitpid(pid, NULL, WNOHANG);
2089 ck_assert_int_eq(j, 0);
2090 poll(NULL, 0, 400);
2091 c++;
2092 }
2093 } while (conn == NULL && c < 5);
2094 ck_assert(conn != NULL);
2095
2096 /* Check perms - uses illegal access to libqb internals */
2097
2098 /* BSD uses /var/run for sockets so we can't alter the perms on the
2099 directory */
2100 #ifdef __linux__
2101 char sockdir[PATH_MAX];
2102 strcpy(sockdir, conn->request.u.shm.rb->shared_hdr->hdr_path);
2103 *strrchr(sockdir, '/') = 0;
2104
2105 res = stat(sockdir, &st);
2106
2107 ck_assert_int_eq(res, 0);
2108 ck_assert(st.st_mode & S_IRWXG);
2109 ck_assert_int_eq(st.st_uid, 555);
2110 ck_assert_int_eq(st.st_gid, 741);
2111 #endif
2112
2113 res = stat(conn->request.u.shm.rb->shared_hdr->hdr_path, &st);
2114 ck_assert_int_eq(res, 0);
2115 ck_assert_int_eq(st.st_uid, 555);
2116 ck_assert_int_eq(st.st_gid, 741);
2117
2118 qb_ipcc_disconnect(conn);
2119 verify_graceful_stop(pid);
2120 }
2121 END_TEST
2122
/*
 * Deadlock-avoidance test (SHM, native loop): low-priority client plus
 * a self-feeding "alphaclient"; server must still stop gracefully.
 */
START_TEST(test_ipc_disp_shm_native_prio_dlock)
{
	pid_t server_pid, alphaclient_pid;
	struct dispatch_data data;

	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);

	/* this is to demonstrate that native event loop can deal even
	   with "extreme" priority disproportions */
	global_loop_prio = QB_LOOP_LOW;
	multiple_connections = QB_TRUE;
	recv_timeout = -1;

	server_pid = run_function_in_new_process("server", run_ipc_server,
						 NULL);
	ck_assert(server_pid != -1);
	data = (struct dispatch_data){.server_pid = server_pid,
				      .msg_type = IPC_MSG_REQ_SELF_FEED,
				      .repetitions = 1};
	alphaclient_pid = run_function_in_new_process("alphaclient",
						      client_dispatch,
						      (void *) &data);
	ck_assert(alphaclient_pid != -1);

	//sleep(1);
	sched_yield();

	/* connect only (0 repetitions); the signaller requests exit */
	data.repetitions = 0;
	client_dispatch(connected_signaller, NULL, (void *) &data);
	verify_graceful_stop(server_pid);

	multiple_connections = QB_FALSE;
	qb_leave();
}
END_TEST
2160
2161 #if HAVE_GLIB
2162 START_TEST(test_ipc_disp_shm_glib_prio_dlock)
2163 {
2164 pid_t server_pid, alphaclient_pid;
2165 struct dispatch_data data;
2166
2167 qb_enter();
2168 ipc_type = QB_IPC_SOCKET;
2169 set_ipc_name(__func__);
2170
2171 global_use_glib = QB_TRUE;
2172 /* this is to make the test pass at all, since GLib is strict
2173 on priorities -- QB_LOOP_MED or lower would fail for sure */
2174 global_loop_prio = QB_LOOP_HIGH;
2175 multiple_connections = QB_TRUE;
2176 recv_timeout = -1;
2177
2178 server_pid = run_function_in_new_process("server", run_ipc_server,
2179 NULL);
2180 ck_assert(server_pid != -1);
2181 data = (struct dispatch_data){.server_pid = server_pid,
2182 .msg_type = IPC_MSG_REQ_SELF_FEED,
2183 .repetitions = 1};
2184 alphaclient_pid = run_function_in_new_process("alphaclient",
2185 client_dispatch,
2186 (void *) &data);
2187 ck_assert(alphaclient_pid != -1);
2188
2189 //sleep(1);
2190 sched_yield();
2191
2192 data.repetitions = 0;
2193 client_dispatch(connected_signaller, NULL, (void *) &data);
2194 verify_graceful_stop(server_pid);
2195
2196 multiple_connections = QB_FALSE;
2197 global_loop_prio = QB_LOOP_MED;
2198 global_use_glib = QB_FALSE;
2199 qb_leave();
2200 }
2201 END_TEST
2202 #endif
2203
/* SHM variant of the bulk-event test. */
START_TEST(test_ipc_bulk_events_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_bulk_events();
	qb_leave();
}
END_TEST

/* SHM variant: server emits an event on connection creation. */
START_TEST(test_ipc_event_on_created_shm)
{
	qb_enter();
	send_event_on_created = QB_TRUE;
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_event_on_created();
	qb_leave();
}
END_TEST

/* SHM variant of the server-failure test. */
START_TEST(test_ipc_server_fail_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_server_fail();
	qb_leave();
}
END_TEST
2234
2235 #ifdef HAVE_FAILURE_INJECTION
2236 START_TEST(test_ipcc_truncate_when_unlink_fails_shm)
2237 {
2238 char sock_file[PATH_MAX];
2239 struct sockaddr_un socka;
2240
2241 qb_enter();
2242 ipc_type = QB_IPC_SHM;
2243 set_ipc_name(__func__);
2244
2245 sprintf(sock_file, "%s/%s", SOCKETDIR, ipc_name);
2246 sock_file[sizeof(socka.sun_path)] = '\0';
2247
2248 /* If there's an old socket left from a previous run this test will fail
2249 unexpectedly, so try to remove it first */
2250 unlink(sock_file);
2251
2252 _fi_unlink_inject_failure = QB_TRUE;
2253 test_ipc_server_fail();
2254 _fi_unlink_inject_failure = QB_FALSE;
2255 unlink(sock_file);
2256 qb_leave();
2257 }
2258 END_TEST
2259 #endif
2260
2261 static void
2262 test_ipc_service_ref_count(void)
2263 {
2264 int32_t c = 0;
2265 int32_t j = 0;
2266 pid_t pid;
2267 uint32_t max_size = MAX_MSG_SIZE;
2268
2269 reference_count_test = QB_TRUE;
2270
2271 pid = run_function_in_new_process("server", run_ipc_server, NULL);
2272 ck_assert(pid != -1);
2273
2274 do {
2275 conn = qb_ipcc_connect(ipc_name, max_size);
2276 if (conn == NULL) {
2277 j = waitpid(pid, NULL, WNOHANG);
2278 ck_assert_int_eq(j, 0);
2279 (void)poll(NULL, 0, 400);
2280 c++;
2281 }
2282 } while (conn == NULL && c < 5);
2283 ck_assert(conn != NULL);
2284
2285 sleep(5);
2286
2287 kill_server(pid);
2288 }
2289
2290
/* SHM variant of the service reference-count test. */
START_TEST(test_ipc_service_ref_count_shm)
{
	qb_enter();
	ipc_type = QB_IPC_SHM;
	set_ipc_name(__func__);
	test_ipc_service_ref_count();
	qb_leave();
}
END_TEST

/* Socket variant of the service reference-count test. */
START_TEST(test_ipc_service_ref_count_us)
{
	qb_enter();
	ipc_type = QB_IPC_SOCKET;
	set_ipc_name(__func__);
	test_ipc_service_ref_count();
	qb_leave();
}
END_TEST
2310
#if 0
/*
 * Disabled: flaky on CI machines because of /dev/shm space (see the
 * comment in make_soc_suite and libqb issue #234); kept for manual
 * troubleshooting.
 */
static void test_max_dgram_size(void)
{
	/* most implementations will not let you set a dgram buffer
	 * of 1 million bytes. This test verifies that the we can detect
	 * the max dgram buffersize regardless, and that the value we detect
	 * is consistent. */
	int32_t init;
	int32_t i;

	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_REMOVE,
			  QB_LOG_FILTER_FILE, "*", LOG_TRACE);

	init = qb_ipcc_verify_dgram_max_msg_size(1000000);
	ck_assert(init > 0);
	for (i = 0; i < 100; i++) {
		int try = qb_ipcc_verify_dgram_max_msg_size(1000000);
#if 0
		ck_assert_int_eq(init, try);
#else
		/* extra troubleshooting, report also on i and errno variables;
		   related: https://github.com/ClusterLabs/libqb/issues/234 */
		if (init != try) {
#ifdef ci_dump_shm_usage
			system("df -h | grep -e /shm >/tmp/_shm_usage");
#endif
			ck_abort_msg("Assertion 'init==try' failed:"
				     " init==%#x, try==%#x, i=%d, errno=%d",
				     init, try, i, errno);
		}
#endif
	}

	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
			  QB_LOG_FILTER_FILE, "*", LOG_TRACE);
}

START_TEST(test_ipc_max_dgram_size)
{
	qb_enter();
	test_max_dgram_size();
	qb_leave();
}
END_TEST
#endif
2356
/*
 * Build the "shm" suite. The number given to add_tcase() is the
 * per-test timeout in seconds.
 */
static Suite *
make_shm_suite(void)
{
	TCase *tc;
	Suite *s = suite_create("shm");

	add_tcase(s, tc, test_ipc_shm_connect_async, 7);

	add_tcase(s, tc, test_ipc_txrx_shm_getauth, 7);
	add_tcase(s, tc, test_ipc_txrx_shm_timeout, 28);
	add_tcase(s, tc, test_ipc_server_fail_shm, 7);
	add_tcase(s, tc, test_ipc_txrx_shm_block, 7);
	add_tcase(s, tc, test_ipc_txrx_shm_tmo, 7);
	add_tcase(s, tc, test_ipc_fc_shm, 7);
	add_tcase(s, tc, test_ipc_dispatch_shm, 15);
	add_tcase(s, tc, test_ipc_stress_test_shm, 15);
	add_tcase(s, tc, test_ipc_bulk_events_shm, 15);
	add_tcase(s, tc, test_ipc_exit_shm, 6);
	add_tcase(s, tc, test_ipc_event_on_created_shm, 9);
	add_tcase(s, tc, test_ipc_service_ref_count_shm, 9);
	add_tcase(s, tc, test_ipc_server_perms, 7);
	add_tcase(s, tc, test_ipc_stress_connections_shm, 3600 /* ? */);
	add_tcase(s, tc, test_ipc_disp_shm_native_prio_dlock, 15);
#if HAVE_GLIB
	add_tcase(s, tc, test_ipc_disp_shm_glib_prio_dlock, 15);
#endif
#ifdef HAVE_FAILURE_INJECTION
	add_tcase(s, tc, test_ipcc_truncate_when_unlink_fails_shm, 8);
#endif

	return s;
}
2389
/*
 * Build the "socket" suite. The number given to add_tcase() is the
 * per-test timeout in seconds.
 */
static Suite *
make_soc_suite(void)
{
	Suite *s = suite_create("socket");
	TCase *tc;

	add_tcase(s, tc, test_ipc_us_connect_async, 7);

	add_tcase(s, tc, test_ipc_txrx_us_getauth, 7);
	add_tcase(s, tc, test_ipc_txrx_us_timeout, 28);
	/* Commented out for the moment as space in /dev/shm on the CI machines
	   causes random failures */
	/* add_tcase(s, tc, test_ipc_max_dgram_size, 30); */
	add_tcase(s, tc, test_ipc_server_fail_soc, 7);
	add_tcase(s, tc, test_ipc_txrx_us_block, 7);
	add_tcase(s, tc, test_ipc_txrx_us_tmo, 7);
	add_tcase(s, tc, test_ipc_fc_us, 7);
	add_tcase(s, tc, test_ipc_exit_us, 6);
	add_tcase(s, tc, test_ipc_dispatch_us, 15);
#ifndef __clang__ /* see variable length array in structure' at the top */
	add_tcase(s, tc, test_ipc_stress_test_us, 58);
#endif
	add_tcase(s, tc, test_ipc_bulk_events_us, 15);
	add_tcase(s, tc, test_ipc_event_on_created_us, 9);
	add_tcase(s, tc, test_ipc_disconnect_after_created_us, 9);
	add_tcase(s, tc, test_ipc_service_ref_count_us, 9);
	add_tcase(s, tc, test_ipc_stress_connections_us, 3600 /* ? */);
	add_tcase(s, tc, test_ipc_us_native_prio_dlock, 15);
#if HAVE_GLIB
	add_tcase(s, tc, test_ipc_us_glib_prio_dlock, 15);
#endif

	return s;
}
2424
/*
 * Entry point: run the socket suite and, unless built with
 * DISABLE_IPC_SHM, the shm suite, under libcheck in verbose mode.
 * Exit status reflects whether any test failed.
 */
int32_t
main(void)
{
	int32_t number_failed;
	SRunner *sr;
	Suite *s;
	int32_t do_shm_tests = QB_TRUE;

	set_ipc_name("ipc_test");
#ifdef DISABLE_IPC_SHM
	do_shm_tests = QB_FALSE;
#endif /* DISABLE_IPC_SHM */

	s = make_soc_suite();
	sr = srunner_create(s);

	if (do_shm_tests) {
		srunner_add_suite(sr, make_shm_suite());
	}

	/* route all libqb logging to stderr so check captures it */
	qb_log_init("check", LOG_USER, LOG_EMERG);
	atexit(qb_log_fini);
	qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
	qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
			  QB_LOG_FILTER_FILE, "*", LOG_TRACE);
	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
	qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l| %b");

	srunner_run_all(sr, CK_VERBOSE);
	number_failed = srunner_ntests_failed(sr);
	srunner_free(sr);
	return (number_failed == 0) ? EXIT_SUCCESS : EXIT_FAILURE;
}
2458