1
2 /*
3 * Copyright (C) 2016-2026 Red Hat, Inc. All rights reserved.
4 *
5 * Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
6 *
7 * This software licensed under GPL-2.0+
8 */
9
10 #include "config.h"
11
12 #include <errno.h>
13 #include <stdio.h>
14 #include <string.h>
15 #include <unistd.h>
16 #include <stdlib.h>
17 #include <sys/types.h>
18 #include <sys/wait.h>
19 #include <sys/time.h>
20 #include <fcntl.h>
21 #include <pthread.h>
22 #include <dirent.h>
23 #include <sys/select.h>
24 #include <poll.h>
25
26 #include "libknet.h"
27 #include "internals.h"
28 #include "test-common.h"
29
30 static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
31 static int log_init = 0;
32 static pthread_mutex_t log_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
33 static pthread_t log_thread;
34 static int log_thread_init = 0;
35 static int log_fds[2];
36 struct log_thread_data {
37 int logfd;
38 FILE *std;
39 };
40 static struct log_thread_data data;
41 static char plugin_path[PATH_MAX];
42 static struct timeval log_start_time;
43 static int log_start_time_init = 0;
44
45 /* Log filter state for runtime pattern matching */
46 static pthread_mutex_t log_filter_mutex = PTHREAD_MUTEX_INITIALIZER;
47 static log_filter_fn log_filter_callback = NULL;
48 static int log_filter_logfd = -1;
49 static void *log_filter_private_data = NULL;
50 static int log_pattern_found = 0;
51
52 int is_memcheck(void)
53 {
54 char *val;
55
56 val = getenv("KNETMEMCHECK");
57
58 if (val) {
59 if (!strncmp(val, "yes", 3)) {
60 return 1;
61 }
62 }
63
64 return 0;
65 }
66
67 int is_helgrind(void)
68 {
69 char *val;
70
71 val = getenv("KNETHELGRIND");
72
73 if (val) {
74 if (!strncmp(val, "yes", 3)) {
75 return 1;
76 }
77 }
78
79 return 0;
80 }
81
82 static int adjust_timeout_for_valgrind(int seconds, int logfd)
83 {
84 if (is_memcheck() || is_helgrind()) {
85 int adjusted = seconds * 16;
86 log_test(logfd, "Running under valgrind, adjusting timeout from %d to %d seconds",
87 seconds, adjusted);
88 return adjusted;
89 }
90 return seconds;
91 }
92
93 static int setup_logpipes(int *logfds)
94 {
95 if (pipe2(logfds, O_CLOEXEC | O_NONBLOCK) < 0) {
96 printf("Unable to setup logging pipe\n");
97 exit(FAIL);
98 }
99
100 if (!log_start_time_init) {
101 gettimeofday(&log_start_time, NULL);
102 log_start_time_init = 1;
103 }
104
105 // coverity[ORDER_REVERSAL:SUPPRESS] - it's a test, get over it
106 return PASS;
107 }
108
109 static void close_logpipes(int *logfds)
110 {
111 close(logfds[0]);
112 logfds[0] = 0;
113 close(logfds[1]);
114 logfds[1] = 0;
115 }
116
117 static void flush_logs(int logfd, FILE *std)
118 {
119 struct knet_log_msg msg;
120 int len;
121 struct timeval now, elapsed;
122 long elapsed_sec, elapsed_usec;
123 char log_line[1024];
124
125 while (1) {
126 len = read(logfd, &msg, sizeof(msg));
127 if (len != sizeof(msg)) {
128 /*
129 * clear errno to avoid incorrect propagation
130 */
131 errno = 0;
132 return;
133 }
134
135 msg.msg[sizeof(msg.msg) - 1] = 0;
136
137 gettimeofday(&now, NULL);
138 timersub(&now, &log_start_time, &elapsed);
139 elapsed_sec = elapsed.tv_sec;
140 elapsed_usec = elapsed.tv_usec / 1000; /* convert to milliseconds */
141
142 if (msg.subsystem == (KNET_SUB_UNKNOWN - 1) && msg.msglevel == 0) {
143 snprintf(log_line, sizeof(log_line),
144 "[%6ld.%03ld] [testsuite]: %.*s",
145 elapsed_sec, elapsed_usec,
146 KNET_MAX_LOG_MSG_SIZE, msg.msg);
147 } else {
148 snprintf(log_line, sizeof(log_line),
149 "[%6ld.%03ld] [%s] %s: %.*s",
150 elapsed_sec, elapsed_usec,
151 knet_log_get_loglevel_name(msg.msglevel),
152 knet_log_get_subsystem_name(msg.subsystem),
153 KNET_MAX_LOG_MSG_SIZE, msg.msg);
154 }
155
156 fprintf(std, "%s\n", log_line);
157
158 /* Check log filter if installed */
159 pthread_mutex_lock(&log_filter_mutex);
160 if (log_filter_callback != NULL) {
161 if (log_filter_callback(log_filter_logfd, log_line, log_filter_private_data)) {
162 log_pattern_found = 1;
163 }
164 }
165 pthread_mutex_unlock(&log_filter_mutex);
166 }
167 }
168
169 static void *_logthread(void *args)
170 {
171 while (1) {
172 int num;
173 struct timeval tv = { 60, 0 };
174 fd_set rfds;
175
176 FD_ZERO(&rfds);
177 FD_SET(data.logfd, &rfds);
178
179 num = select(FD_SETSIZE, &rfds, NULL, NULL, &tv);
180 if (num < 0) {
181 fprintf(data.std, "Unable select over logfd!\nHALTING LOGTHREAD!\n");
182 return NULL;
183 }
184 if (num == 0) {
185 fprintf(data.std, "[knet]: No logs in the last 60 seconds\n");
186 continue;
187 }
188 if (FD_ISSET(data.logfd, &rfds)) {
189 flush_logs(data.logfd, data.std);
190 }
191 }
192 }
193
194 static int start_logthread(int logfd, FILE *std)
195 {
196 int savederrno = 0;
197
198 savederrno = pthread_mutex_lock(&log_thread_mutex);
199 if (savederrno) {
200 printf("Unable to get log_thread mutex lock\n");
201 return -1;
202 }
203
204 if (!log_thread_init) {
205 data.logfd = logfd;
206 data.std = std;
207
208 savederrno = pthread_create(&log_thread, 0, _logthread, NULL);
209 if (savederrno) {
210 printf("Unable to start logging thread: %s\n", strerror(savederrno));
211 pthread_mutex_unlock(&log_thread_mutex);
212 return -1;
213 }
214 log_thread_init = 1;
215 }
216
217 pthread_mutex_unlock(&log_thread_mutex);
218 return 0;
219 }
220
221 static int stop_logthread(void)
222 {
223 int savederrno = 0;
224 void *retval;
225
226 savederrno = pthread_mutex_lock(&log_thread_mutex);
227 if (savederrno) {
228 printf("Unable to get log_thread mutex lock\n");
229 return -1;
230 }
231
232 if (log_thread_init) {
233 pthread_cancel(log_thread);
234 pthread_join(log_thread, &retval);
235 log_thread_init = 0;
236 }
237
238 pthread_mutex_unlock(&log_thread_mutex);
239 return 0;
240 }
241
242 void stop_logging(void)
243 {
244 int savederrno = 0;
245
246 savederrno = pthread_mutex_lock(&log_mutex);
247 if (savederrno) {
248 printf("Unable to get log_mutex lock\n");
249 return;
250 }
251
252 if (log_init) {
253 stop_logthread();
254 flush_logs(log_fds[0], stdout);
255 close_logpipes(log_fds);
256 log_start_time_init = 0;
257 log_init = 0;
258 }
259
260 pthread_mutex_unlock(&log_mutex);
261 }
262
263 int start_logging(FILE *std)
264 {
265 int savederrno = 0;
266
267 savederrno = pthread_mutex_lock(&log_mutex);
268 if (savederrno) {
269 printf("Unable to get log_mutex lock\n");
270 return -1;
271 }
272
273 if (!log_init) {
274 setup_logpipes(log_fds);
275
276 if (atexit(&stop_logging) != 0) {
277 printf("Unable to register atexit handler to stop logging: %s\n",
278 strerror(errno));
279 exit(FAIL);
280 }
281
282 if (start_logthread(log_fds[0], std) < 0) {
283 exit(FAIL);
284 }
285
286 log_init = 1;
287 }
288
289 pthread_mutex_unlock(&log_mutex);
290
291 // coverity[MISSING_LOCK:SUPPRESS] - log_fds[1] is set while holding lock and doesn't change after init
292 return log_fds[1];
293 }
294
295 static int dir_filter(const struct dirent *dname)
296 {
297 if ( (strcmp(dname->d_name + strlen(dname->d_name)-3, ".so") == 0) &&
298 ((strncmp(dname->d_name,"crypto", 6) == 0) ||
299 (strncmp(dname->d_name,"compress", 8) == 0))) {
300 return 1;
301 }
302 return 0;
303 }
304
305 /* Make sure the proposed plugin path has at least 1 of each plugin available
306 - just as a sanity check really */
307 static int contains_plugins(char *path)
308 {
309 struct dirent **namelist;
310 int n,i;
311 size_t j;
312 struct knet_compress_info compress_list[256];
313 struct knet_crypto_info crypto_list[256];
314 size_t num_compress, num_crypto;
315 size_t compress_found = 0;
316 size_t crypto_found = 0;
317
318 if (knet_get_compress_list(compress_list, &num_compress) == -1) {
319 return 0;
320 }
321 if (knet_get_crypto_list(crypto_list, &num_crypto) == -1) {
322 return 0;
323 }
324
325 // coverity[UNINIT:SUPPRESS] - it's supposed to be...
326 n = scandir(path, &namelist, dir_filter, alphasort);
327 if (n == -1) {
328 return 0;
329 }
330
331 /* Look for plugins in the list */
332 for (i=0; i<n; i++) {
333 for (j=0; j<num_crypto; j++) {
334 if (strlen(namelist[i]->d_name) >= 7 &&
335 strncmp(crypto_list[j].name, namelist[i]->d_name+7,
336 strlen(crypto_list[j].name)) == 0) {
337 crypto_found++;
338 }
339 }
340 for (j=0; j<num_compress; j++) {
341 if (strlen(namelist[i]->d_name) >= 9 &&
342 strncmp(compress_list[j].name, namelist[i]->d_name+9,
343 strlen(compress_list[j].name)) == 0) {
344 compress_found++;
345 }
346 }
347 free(namelist[i]);
348 }
349 free(namelist);
350 /* If at least one plugin was found (or none were built) */
351 if ((crypto_found || num_crypto == 0) &&
352 (compress_found || num_compress == 0)) {
353 return 1;
354 } else {
355 return 0;
356 }
357 }
358
359
360 /* libtool sets LD_LIBRARY_PATH to the build tree when running test in-tree */
361 char *find_plugins_path(int logfd)
362 {
363 char *ld_libs_env = getenv("LD_LIBRARY_PATH");
364 if (ld_libs_env) {
365 char *ld_libs = strdup(ld_libs_env);
366 char *str = strtok(ld_libs, ":");
367 while (str) {
368 if (contains_plugins(str)) {
369 strncpy(plugin_path, str, sizeof(plugin_path)-1);
370 free(ld_libs);
371 log_test(logfd, "Using plugins from %.200s", plugin_path);
372 return plugin_path;
373 }
374 str = strtok(NULL, ":");
375 }
376 free(ld_libs);
377 }
378 return NULL;
379 }
380
381
382 knet_handle_t _ts_knet_handle_start(int logfd, uint8_t log_level, knet_handle_t knet_h_array[])
383 {
384 knet_handle_t knet_h = knet_handle_new_ex(1, logfd, log_level, 0);
385 char *plugins_path;
386
387 if (knet_h) {
388 log_test(logfd, "knet_handle_new at %p", knet_h);
389 plugins_path = find_plugins_path(logfd);
390 /* Use plugins from the build tree */
391 if (plugins_path) {
392 knet_h->plugin_path = plugins_path;
393 }
394 knet_h_array[1] = knet_h;
395 return knet_h;
396 } else {
397 log_test(logfd, "knet_handle_new failed: %s", strerror(errno));
398 stop_logging();
399 exit(FAIL);
400 }
401 }
402
403 int _ts_knet_handle_reconnect_links(knet_handle_t knet_h, int logfd)
404 {
405 size_t i, j;
406 knet_node_id_t host_ids[KNET_MAX_HOST];
407 uint8_t link_ids[KNET_MAX_LINK];
408 size_t host_ids_entries = 0, link_ids_entries = 0;
409 unsigned int enabled;
410
411 if (!knet_h) {
412 errno = EINVAL;
413 return -1;
414 }
415
416 if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) {
417 log_test(logfd, "knet_host_get_host_list failed: %s", strerror(errno));
418 return -1;
419 }
420
421 for (i = 0; i < host_ids_entries; i++) {
422 if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) {
423 log_test(logfd, "knet_link_get_link_list failed: %s", strerror(errno));
424 return -1;
425 }
426 for (j = 0; j < link_ids_entries; j++) {
427 if (knet_link_get_enable(knet_h, host_ids[i], link_ids[j], &enabled)) {
428 log_test(logfd, "knet_link_get_enable failed: %s", strerror(errno));
429 return -1;
430 }
431 if (!enabled) {
432 if (knet_link_set_enable(knet_h, host_ids[i], j, 1)) {
433 log_test(logfd, "knet_link_set_enable failed: %s", strerror(errno));
434 return -1;
435 }
436 }
437 }
438 }
439
440 return 0;
441 }
442
443 int _ts_knet_handle_disconnect_links(knet_handle_t knet_h, int logfd)
444 {
445 size_t i, j;
446 knet_node_id_t host_ids[KNET_MAX_HOST];
447 uint8_t link_ids[KNET_MAX_LINK];
448 size_t host_ids_entries = 0, link_ids_entries = 0;
449 unsigned int enabled;
450
451 if (!knet_h) {
452 errno = EINVAL;
453 return -1;
454 }
455
456 if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) {
457 log_test(logfd, "knet_host_get_host_list failed: %s", strerror(errno));
458 return -1;
459 }
460
461 for (i = 0; i < host_ids_entries; i++) {
462 if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) {
463 log_test(logfd, "knet_link_get_link_list failed: %s", strerror(errno));
464 return -1;
465 }
466 for (j = 0; j < link_ids_entries; j++) {
467 if (knet_link_get_enable(knet_h, host_ids[i], link_ids[j], &enabled)) {
468 log_test(logfd, "knet_link_get_enable failed: %s", strerror(errno));
469 return -1;
470 }
471 if (enabled) {
472 if (knet_link_set_enable(knet_h, host_ids[i], j, 0)) {
473 log_test(logfd, "knet_link_set_enable failed: %s", strerror(errno));
474 return -1;
475 }
476 }
477 }
478 }
479
480 return 0;
481 }
482
483 static int _make_local_sockaddr(struct sockaddr_storage *lo, int offset, int family, int logfd)
484 {
485 in_port_t port;
486 char portstr[32];
487
488 if (offset < 0) {
489 /*
490 * api_knet_link_set_config needs to access the API directly, but
491 * it does not send any traffic, so it´s safe to ask the kernel
492 * for a random port.
493 */
494 port = 0;
495 } else {
496 /* Use the pid if we can. but makes sure its in a sensible range */
497 port = (getpid() + offset) % (TEST_PORT_MAX - TEST_PORT_BASE) + TEST_PORT_BASE;
498 }
499 sprintf(portstr, "%u", port);
500 memset(lo, 0, sizeof(struct sockaddr_storage));
501 log_test(logfd, "Using port %u", port);
502
503 if (family == AF_INET6) {
504 return knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage));
505 }
506 return knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage));
507 }
508
509 int make_local_sockaddr(struct sockaddr_storage *lo, int offset, int logfd)
510 {
511 return _make_local_sockaddr(lo, offset, AF_INET, logfd);
512 }
513
514 int make_local_sockaddr6(struct sockaddr_storage *lo, int offset, int logfd)
515 {
516 return _make_local_sockaddr(lo, offset, AF_INET6, logfd);
517 }
518
519 int _ts_knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
520 uint8_t transport, uint64_t flags, int family, int dynamic,
521 struct sockaddr_storage *lo, int logfd)
522 {
523 int err = 0, savederrno = 0;
524 uint32_t port;
525 char portstr[32];
526
527 for (port = TEST_PORT_MIN; port < TEST_PORT_MAX; port++) {
528 sprintf(portstr, "%u", port);
529 memset(lo, 0, sizeof(struct sockaddr_storage));
530 if (family == AF_INET6) {
531 err = knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage));
532 } else {
533 err = knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage));
534 }
535 if (err < 0) {
536 log_test(logfd, "Unable to convert loopback to sockaddr: %s", strerror(errno));
537 goto out;
538 }
539 errno = 0;
540 if (dynamic) {
541 err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, NULL, flags);
542 } else {
543 err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, lo, flags);
544 }
545 savederrno = errno;
546 if ((err < 0) && (savederrno != EADDRINUSE)) {
547 log_test(logfd, "Unable to configure link: %s", strerror(savederrno));
548 goto out;
549 }
550 if (!err) {
551 log_test(logfd, "Using port %u", port);
552 goto out;
553 }
554 }
555
556 if (err) {
557 log_test(logfd, "No more ports available");
558 }
559 out:
560 errno = savederrno;
561 return err;
562 }
563
564 void test_sleep(int logfd, int seconds)
565 {
566 seconds = adjust_timeout_for_valgrind(seconds, logfd);
567 log_test(logfd, "Sleeping for %d second%s", seconds, seconds == 1 ? "" : "s");
568 sleep(seconds);
569 }
570
571 int wait_for_packet(knet_handle_t knet_h, int seconds, int datafd, int logfd)
572 {
573 fd_set rfds;
574 struct timeval tv;
575 int err = 0, i = 0;
576
577 seconds = adjust_timeout_for_valgrind(seconds, logfd);
578
579 try_again:
580 FD_ZERO(&rfds);
581 FD_SET(datafd, &rfds);
582
583 tv.tv_sec = 1;
584 tv.tv_usec = 0;
585
586 err = select(datafd+1, &rfds, NULL, NULL, &tv);
587 /*
588 * on slow arches the first call to select can return 0.
589 * pick an arbitrary 10 times loop (multiplied by waiting seconds)
590 * before failing.
591 */
592 if ((!err) && (i < seconds)) {
593 i++;
594 goto try_again;
595 }
596 if ((err > 0) && (FD_ISSET(datafd, &rfds))) {
597 return 0;
598 }
599
600 errno = ETIMEDOUT;
601 return -1;
602 }
603
604 /*
605 * functional tests helpers
606 */
607
608 void _ts_knet_handle_start_nodes(knet_handle_t knet_h[], uint8_t numnodes, int logfd, uint8_t log_level)
609 {
610 uint8_t i;
611 char *plugins_path = find_plugins_path(logfd);
612
613 for (i = 1; i <= numnodes; i++) {
614 knet_h[i] = knet_handle_new_ex(i, logfd, log_level, 0);
615 if (!knet_h[i]) {
616 log_test(logfd, "failed to create handle: %s", strerror(errno));
617 break;
618 } else {
619 log_test(logfd, "knet_h[%u] at %p", i, knet_h[i]);
620 }
621 /* Use plugins from the build tree */
622 if (plugins_path) {
623 knet_h[i]->plugin_path = plugins_path;
624 }
625 }
626
627 if (i < numnodes) {
628 _ts_knet_handle_stop_everything(knet_h, i, logfd);
629 exit(FAIL);
630 }
631
632 return;
633 }
634
635 void _ts_knet_handle_join_nodes(knet_handle_t knet_h[], uint8_t numnodes, uint8_t numlinks, int family, uint8_t transport, int logfd)
636 {
637 uint8_t i, x, j, tmp_peer;
638 struct sockaddr_storage lo;
639
640 /*
641 * Phase 1: Add all peers and allocate ports with temporary configurations
642 * This binds ports without races - ports stay bound throughout
643 */
644 log_test(logfd, "Phase 1: Adding peers and allocating ports for %u nodes with %u links each", numnodes, numlinks);
645
646 for (i = 1; i <= numnodes; i++) {
647 /* Add all peer nodes first */
648 for (j = 1; j <= numnodes; j++) {
649 if (j == i) {
650 continue;
651 }
652
653 log_test(logfd, "host %u adding host: %u", i, j);
654
655 if (knet_host_add(knet_h[i], j) < 0) {
656 log_test(logfd, "Unable to add host: %s", strerror(errno));
657 _ts_knet_handle_stop_everything(knet_h, numnodes, logfd);
658 exit(FAIL);
659 }
660 }
661
662 /* Configure links with temporary dst (use first peer as temporary dst) */
663 tmp_peer = (i == 1) ? 2 : 1;
664 for (x = 0; x < numlinks; x++) {
665 /*
666 * Use _ts_knet_link_set_config to find available port
667 * Configure to temporary peer to bind the src port
668 * Port stays bound - we'll update dst in Phase 2
669 */
670 if (_ts_knet_link_set_config(knet_h[i], tmp_peer, x, transport, 0, family, 0, &lo, logfd) < 0) {
671 log_test(logfd, "Unable to allocate port for node %u link %u: %s", i, x, strerror(errno));
672 _ts_knet_handle_stop_everything(knet_h, numnodes, logfd);
673 exit(FAIL);
674 }
675
676 /*
677 * Hack: Clear the configured flag to allow reconfiguration to actual peer
678 * Port/socket stays bound, but API will allow reconfiguring to different host_id
679 */
680 pthread_rwlock_wrlock(&knet_h[i]->global_rwlock);
681 knet_h[i]->host_index[tmp_peer]->link[x].configured = 0;
682 pthread_rwlock_unlock(&knet_h[i]->global_rwlock);
683 }
684 }
685
686 /*
687 * Phase 2: Update link configurations with correct dst addresses
688 * All ports are now allocated and bound, just update destinations
689 */
690 log_test(logfd, "Phase 2: Updating link destinations");
691
692 for (i = 1; i <= numnodes; i++) {
693 tmp_peer = (i == 1) ? 2 : 1;
694 for (j = 1; j <= numnodes; j++) {
695 if (j == i) {
696 continue;
697 }
698
699 for (x = 0; x < numlinks; x++) {
700 uint8_t tmp_peer_j = (j == 1) ? 2 : 1;
701 struct sockaddr_storage src, dst;
702
703 /*
704 * Copy addresses from link structures to local vars
705 * - src: node i's link x src_addr (bound in Phase 1)
706 * - dst: node j's link x src_addr (bound in Phase 1)
707 * Local copies required - passing pointers directly causes internal state issues
708 */
709 memcpy(&src, &knet_h[i]->host_index[tmp_peer]->link[x].src_addr, sizeof(struct sockaddr_storage));
710 memcpy(&dst, &knet_h[j]->host_index[tmp_peer_j]->link[x].src_addr, sizeof(struct sockaddr_storage));
711
712 /*
713 * Update link configuration with correct dst
714 * Second call to knet_link_set_config updates the existing link
715 */
716 if (knet_link_set_config(knet_h[i], j, x, transport, &src, &dst, 0) < 0) {
717 log_test(logfd, "Unable to configure link: %s", strerror(errno));
718 _ts_knet_handle_stop_everything(knet_h, numnodes, logfd);
719 exit(FAIL);
720 }
721
722 if (knet_link_set_enable(knet_h[i], j, x, 1) < 0) {
723 log_test(logfd, "unable to enable link: %s", strerror(errno));
724 _ts_knet_handle_stop_everything(knet_h, numnodes, logfd);
725 exit(FAIL);
726 }
727 }
728 }
729 }
730
731 for (i = 1; i <= numnodes; i++) {
732 wait_for_nodes_state(knet_h[i], numnodes, 1, TEST_TIMEOUT_LONG, logfd);
733 }
734
735 return;
736 }
737
738
739 static int target=0;
740
741 static int state_wait_pipe[2] = {0,0};
742 static int host_wait_pipe[2] = {0,0};
743 static int callback_logfd = -1;
744
745 static int count_nodes(knet_handle_t knet_h)
746 {
747 int nodes = 0;
748 int i;
749
750 for (i=0; i< KNET_MAX_HOST; i++) {
751 if (knet_h->host_index[i] && knet_h->host_index[i]->status.reachable == 1) {
752 nodes++;
753 }
754 }
755 return nodes;
756 }
757
758 static void nodes_notify_callback(void *private_data,
759 knet_node_id_t host_id,
760 uint8_t reachable, uint8_t remote, uint8_t external)
761 {
762 knet_handle_t knet_h = (knet_handle_t) private_data;
763 int nodes;
764 int res;
765
766 nodes = count_nodes(knet_h);
767
768 if (nodes == target) {
769 res = write(state_wait_pipe[1], ".", 1);
770 if (res != 1) {
771 log_test(callback_logfd, "***FAILed to signal wait_for_nodes_state: %s", strerror(errno));
772 }
773 }
774 }
775
776 /* Called atexit() */
777 static void finish_state_pipes()
778 {
779 if (state_wait_pipe[0] != 0) {
780 close(state_wait_pipe[0]);
781 close(state_wait_pipe[1]);
782 state_wait_pipe[0] = 0;
783 }
784 if (host_wait_pipe[0] != 0) {
785 close(host_wait_pipe[0]);
786 close(host_wait_pipe[1]);
787 host_wait_pipe[0] = 0;
788 }
789 }
790
791 static void host_notify_callback(void *private_data,
792 knet_node_id_t host_id,
793 uint8_t reachable, uint8_t remote, uint8_t external)
794 {
795 knet_handle_t knet_h = (knet_handle_t) private_data;
796 int res;
797
798 if (knet_h->host_index[host_id]->status.reachable == 1) {
799 res = write(host_wait_pipe[1], ".", 1);
800 if (res != 1) {
801 log_test(callback_logfd, "***FAILed to signal wait_for_host: %s", strerror(errno));
802 }
803 }
804 }
805
806 int wait_for_reply(int seconds, int pipefd, int logfd)
807 {
808 int res;
809 struct pollfd pfds;
810 char tmpbuf[32];
811
812 seconds = adjust_timeout_for_valgrind(seconds, logfd);
813
814 pfds.fd = pipefd;
815 pfds.events = POLLIN | POLLERR | POLLHUP;
816 pfds.revents = 0;
817
818 res = poll(&pfds, 1, seconds*1000);
819 if (res == 1) {
820 if (pfds.revents & POLLIN) {
821 res = read(pipefd, tmpbuf, sizeof(tmpbuf));
822 if (res > 0) {
823 return 0;
824 }
825 } else {
826 log_test(logfd, "Error on pipe poll revent = 0x%x", pfds.revents);
827 errno = EIO;
828 }
829 }
830 if (res == 0) {
831 errno = ETIMEDOUT;
832 return -1;
833 }
834
835 return -1;
836 }
837
838 /* Wait for a cluster of 'numnodes' to come up/go down */
839 int wait_for_nodes_state(knet_handle_t knet_h, size_t numnodes,
840 uint8_t state, uint32_t seconds,
841 int logfd)
842 {
843 int res, savederrno = 0;
844
845 callback_logfd = logfd;
846
847 if (state_wait_pipe[0] == 0) {
848 res = pipe(state_wait_pipe);
849 if (res == -1) {
850 savederrno = errno;
851 log_test(logfd, "Error creating host reply pipe: %s", strerror(errno));
852 errno = savederrno;
853 return -1;
854 }
855 if (atexit(finish_state_pipes)) {
856 log_test(logfd, "Unable to register atexit handler to close pipes: %s",
857 strerror(errno));
858 exit(FAIL);
859 }
860
861 }
862
863 if (state) {
864 target = numnodes-1; /* exclude us */
865 } else {
866 target = 0; /* Wait for all to go down */
867 }
868
869 /* Set this before checking existing status or there's a race condition */
870 knet_host_enable_status_change_notify(knet_h,
871 (void *)(long)knet_h,
872 nodes_notify_callback);
873
874 /* Check we haven't already got all the nodes in the correct state */
875 if (count_nodes(knet_h) == target) {
876 log_test(logfd, "target already reached");
877 knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
878 return 0;
879 }
880
881 res = wait_for_reply(seconds, state_wait_pipe[0], logfd);
882 if (res == -1) {
883 savederrno = errno;
884 log_test(logfd, "Error waiting for nodes status reply: %s", strerror(errno));
885 }
886
887 knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
888 errno = savederrno;
889 return res;
890 }
891
892 /* Wait for a single node to come up */
893 int wait_for_host(knet_handle_t knet_h, uint16_t host_id, int seconds, int logfd)
894 {
895 int res = 0;
896 int savederrno = 0;
897
898 callback_logfd = logfd;
899
900 if (host_wait_pipe[0] == 0) {
901 res = pipe(host_wait_pipe);
902 if (res == -1) {
903 savederrno = errno;
904 log_test(logfd, "Error creating host reply pipe: %s", strerror(errno));
905 errno = savederrno;
906 return -1;
907 }
908 if (atexit(finish_state_pipes)) {
909 log_test(logfd, "Unable to register atexit handler to close pipes: %s",
910 strerror(errno));
911 exit(FAIL);
912 }
913
914 }
915
916 /* Set this before checking existing status or there's a race condition */
917 knet_host_enable_status_change_notify(knet_h,
918 (void *)(long)knet_h,
919 host_notify_callback);
920
921 /* Check it's not already reachable */
922 if (knet_h->host_index[host_id]->status.reachable == 1) {
923 knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
924 return 0;
925 }
926
927 res = wait_for_reply(seconds, host_wait_pipe[0], logfd);
928 if (res == -1) {
929 savederrno = errno;
930 log_test(logfd, "Error waiting for host status reply: %s", strerror(errno));
931 }
932
933 knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
934
935 /* Still wait for it to settle */
936 test_sleep(logfd, 1);
937 errno = savederrno;
938 return res;
939 }
940
941 /* Shutdown all nodes and links attached to an array of knet handles.
942 * Mostly stolen from corosync code (that I wrote, before anyone complains about licences)
943 */
944 void _ts_knet_handle_stop_everything(knet_handle_t knet_h[], uint8_t numnodes, int logfd)
945 {
946 int res = 0;
947 int h;
948 size_t i,j;
949 static knet_node_id_t nodes[KNET_MAX_HOST]; /* static to save stack */
950 uint8_t links[KNET_MAX_LINK];
951 size_t num_nodes;
952 size_t num_links;
953
954 for (h=1; h<numnodes+1; h++) {
955 if (!knet_h[h]) {
956 continue;
957 }
958
959 res = knet_handle_setfwd(knet_h[h], 0);
960 if (res) {
961 log_test(logfd, "knet_handle_setfwd failed: %s", strerror(errno));
962 }
963
964 res = knet_host_get_host_list(knet_h[h], nodes, &num_nodes);
965 if (res) {
966 log_test(logfd, "Cannot get knet node list for shutdown: %s", strerror(errno));
967 continue;
968 }
969
970 /* Tidily shut down all nodes & links. */
971 for (i=0; i<num_nodes; i++) {
972
973 res = knet_link_get_link_list(knet_h[h], nodes[i], links, &num_links);
974 if (res) {
975 log_test(logfd, "Cannot get knet link list for node %u: %s", nodes[i], strerror(errno));
976 goto finalise_error;
977 }
978 for (j=0; j<num_links; j++) {
979 res = knet_link_set_enable(knet_h[h], nodes[i], links[j], 0);
980 if (res) {
981 log_test(logfd, "knet_link_set_enable(node %u, link %d) failed: %s", nodes[i], links[j], strerror(errno));
982 }
983 res = knet_link_clear_config(knet_h[h], nodes[i], links[j]);
984 if (res) {
985 log_test(logfd, "knet_link_clear_config(node %u, link %d) failed: %s", nodes[i], links[j], strerror(errno));
986 }
987 }
988 res = knet_host_remove(knet_h[h], nodes[i]);
989 if (res) {
990 log_test(logfd, "knet_host_remove(node %u) failed: %s", nodes[i], strerror(errno));
991 }
992 }
993
994 finalise_error:
995 res = knet_handle_free(knet_h[h]);
996 if (res) {
997 log_test(logfd, "knet_handle_free failed: %s", strerror(errno));
998 }
999 }
1000 }
1001
1002 /*
1003 * Packet injector: Create a packet and inject it into a link's socket
1004 *
1005 * This allows testing RX validation without network-level packet manipulation.
1006 * The caller provides a seq_num to avoid packet deduplication in the RX thread.
1007 *
1008 * Returns 0 on success, -1 on error
1009 */
1010 int inject_packet(knet_handle_t knet_h,
1011 uint8_t packet_type,
1012 knet_node_id_t src_host_id,
1013 uint8_t actual_link_id,
1014 uint8_t claimed_link_id,
1015 uint8_t frag_num,
1016 uint8_t frag_seq,
1017 seq_num_t seq_num,
1018 const char *payload,
1019 size_t payload_len)
1020 {
1021 struct knet_header *packet;
1022 size_t packet_len;
1023 struct knet_host *src_host;
1024 struct knet_link *src_link;
1025 ssize_t sent;
1026 socklen_t addrlen;
1027 struct timespec timestamp;
1028
1029 /* Determine packet size based on type */
1030 switch (packet_type) {
1031 case KNET_HEADER_TYPE_DATA:
1032 packet_len = KNET_HEADER_DATA_SIZE + payload_len;
1033 break;
1034 case KNET_HEADER_TYPE_PING:
1035 packet_len = KNET_HEADER_PING_SIZE;
1036 break;
1037 default:
1038 errno = EINVAL;
1039 return -1;
1040 }
1041
1042 packet = malloc(packet_len);
1043 if (!packet) {
1044 return -1;
1045 }
1046
1047 memset(packet, 0, packet_len);
1048
1049 /* Fill in common packet header */
1050 packet->kh_version = 0;
1051 packet->kh_type = packet_type;
1052 packet->kh_node = htons(src_host_id);
1053
1054 /* Fill in type-specific payload */
1055 switch (packet_type) {
1056 case KNET_HEADER_TYPE_DATA:
1057 packet->khp_data_seq_num = htons(seq_num);
1058 packet->khp_data_compress = 0;
1059 packet->khp_data_bcast = 0;
1060 packet->khp_data_channel = 0;
1061 packet->khp_data_frag_num = frag_num;
1062 packet->khp_data_frag_seq = frag_seq;
1063
1064 /* Copy payload */
1065 if (payload && payload_len > 0) {
1066 memcpy(packet->khp_data_userdata, payload, payload_len);
1067 }
1068 break;
1069 case KNET_HEADER_TYPE_PING:
1070 packet->khp_ping_link = claimed_link_id;
1071 clock_gettime(CLOCK_MONOTONIC, ×tamp);
1072 memmove(&packet->khp_ping_time[0], ×tamp, sizeof(struct timespec));
1073 packet->khp_ping_seq_num = htons(seq_num);
1074 packet->khp_ping_timed = 1;
1075 break;
1076 }
1077
1078 /* Get the source host and link to determine where to inject */
1079 src_host = knet_h->host_index[src_host_id];
1080 if (!src_host) {
1081 free(packet);
1082 return -1;
1083 }
1084
1085 src_link = &src_host->link[actual_link_id];
1086
1087 /* Check if link is properly configured */
1088 if (src_link->outsock < 0) {
1089 free(packet);
1090 return -1;
1091 }
1092
1093 /* Determine address length based on address family */
1094 switch (src_link->dst_addr.ss_family) {
1095 case AF_INET:
1096 addrlen = sizeof(struct sockaddr_in);
1097 break;
1098 case AF_INET6:
1099 addrlen = sizeof(struct sockaddr_in6);
1100 break;
1101 default:
1102 free(packet);
1103 return -1;
1104 }
1105
1106 /* Inject the packet by sending to ourselves (loopback) */
1107 sent = sendto(src_link->outsock, packet, packet_len, MSG_DONTWAIT | MSG_NOSIGNAL,
1108 (struct sockaddr *)&src_link->dst_addr,
1109 addrlen);
1110
1111 free(packet);
1112
1113 if (sent != (ssize_t)packet_len) {
1114 return -1;
1115 }
1116
1117 return 0;
1118 }
1119
1120 /*
1121 * Install a runtime log filter callback
1122 * Thread-safe via mutex protection
1123 */
1124 void install_log_filter(int logfd, log_filter_fn filter_fn, void *private_data)
1125 {
1126 pthread_mutex_lock(&log_filter_mutex);
1127 log_filter_callback = filter_fn;
1128 log_filter_logfd = logfd;
1129 log_filter_private_data = private_data;
1130 log_pattern_found = 0; /* Reset flag when installing new filter */
1131 pthread_mutex_unlock(&log_filter_mutex);
1132 }
1133
1134 /*
1135 * Check if log filter found a pattern match
1136 * Returns current value and resets the flag
1137 */
1138 int check_log_pattern_found(void)
1139 {
1140 int found;
1141
1142 pthread_mutex_lock(&log_filter_mutex);
1143 found = log_pattern_found;
1144 log_pattern_found = 0; /* Reset after reading */
1145 pthread_mutex_unlock(&log_filter_mutex);
1146
1147 return found;
1148 }
1149