1    	
2    	/*
3    	 * Copyright (C) 2016-2026 Red Hat, Inc.  All rights reserved.
4    	 *
5    	 * Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
6    	 *
7    	 * This software licensed under GPL-2.0+
8    	 */
9    	
10   	#include "config.h"
11   	
12   	#include <errno.h>
13   	#include <stdio.h>
14   	#include <string.h>
15   	#include <unistd.h>
16   	#include <stdlib.h>
17   	#include <sys/types.h>
18   	#include <sys/wait.h>
19   	#include <sys/time.h>
20   	#include <fcntl.h>
21   	#include <pthread.h>
22   	#include <dirent.h>
23   	#include <sys/select.h>
24   	#include <poll.h>
25   	
26   	#include "libknet.h"
27   	#include "internals.h"
28   	#include "test-common.h"
29   	
/* Taken by start_logging() and stop_logging() around log_init and the pipe setup/teardown. */
static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Non-zero between a successful start_logging() and the matching stop_logging(). */
static int log_init = 0;
/* Taken by start_logthread() and stop_logthread() around log_thread_init. */
static pthread_mutex_t log_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Thread running _logthread(); only meaningful while log_thread_init is non-zero. */
static pthread_t log_thread;
static int log_thread_init = 0;
/* Logging pipe: [0] is drained by flush_logs(), [1] is what start_logging() returns. */
static int log_fds[2];
/* Arguments handed to _logthread() through the file-scope `data` object. */
struct log_thread_data {
	int logfd;	/* fd the logging thread select()s on and reads from */
	FILE *std;	/* stream the formatted log lines are printed to */
};
static struct log_thread_data data;
/* Backing storage for the string returned by find_plugins_path(). */
static char plugin_path[PATH_MAX];
/* Reference time for the elapsed-time stamps printed by flush_logs(); set in setup_logpipes(). */
static struct timeval log_start_time;
static int log_start_time_init = 0;

/*
 * Log filter state for runtime pattern matching.
 * All four variables below are read and written with log_filter_mutex held.
 */
static pthread_mutex_t log_filter_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Callback run by flush_logs() on every formatted line; NULL means no filter. */
static log_filter_fn log_filter_callback = NULL;
/* logfd and private data passed back to the callback as given to install_log_filter(). */
static int log_filter_logfd = -1;
static void *log_filter_private_data = NULL;
/* Set to 1 when the callback returns non-zero; cleared by check_log_pattern_found(). */
static int log_pattern_found = 0;
51   	
/*
 * Tell whether the KNETMEMCHECK environment variable is set to a value
 * that begins with "yes".
 *
 * Returns 1 in that case, 0 when the variable is unset or holds
 * anything else.
 */
int is_memcheck(void)
{
	const char *setting = getenv("KNETMEMCHECK");

	if (setting == NULL) {
		return 0;
	}

	/* Only the first three characters are compared. */
	return (strncmp(setting, "yes", 3) == 0) ? 1 : 0;
}
66   	
/*
 * Tell whether the KNETHELGRIND environment variable is set to a value
 * that begins with "yes".
 *
 * Returns 1 in that case, 0 when the variable is unset or holds
 * anything else.
 */
int is_helgrind(void)
{
	const char *setting = getenv("KNETHELGRIND");

	if (setting == NULL) {
		return 0;
	}

	/* Only the first three characters are compared. */
	return (strncmp(setting, "yes", 3) == 0) ? 1 : 0;
}
81   	
/*
 * Scale a timeout for slow runs.
 *
 * When either is_memcheck() or is_helgrind() reports true the timeout is
 * multiplied by 16 and the change is logged on logfd; otherwise `seconds`
 * is returned untouched.
 */
static int adjust_timeout_for_valgrind(int seconds, int logfd)
{
	int scaled;

	if (!is_memcheck() && !is_helgrind()) {
		return seconds;
	}

	scaled = seconds * 16;
	log_test(logfd, "Running under valgrind, adjusting timeout from %d to %d seconds",
		 seconds, scaled);

	return scaled;
}
92   	
93   	static int setup_logpipes(int *logfds)
94   	{
95   		if (pipe2(logfds, O_CLOEXEC | O_NONBLOCK) < 0) {
96   			printf("Unable to setup logging pipe\n");
97   			exit(FAIL);
98   		}
99   	
100  		if (!log_start_time_init) {
101  			gettimeofday(&log_start_time, NULL);
102  			log_start_time_init = 1;
103  		}
104  	
105  		// coverity[ORDER_REVERSAL:SUPPRESS] - it's a test, get over it
106  		return PASS;
107  	}
108  	
/*
 * Close both ends of the logging pipe and overwrite each slot with 0.
 * Errors from close() are ignored.
 */
static void close_logpipes(int *logfds)
{
	int end;

	for (end = 0; end < 2; end++) {
		close(logfds[end]);
		logfds[end] = 0;
	}
}
116  	
117  	static void flush_logs(int logfd, FILE *std)
118  	{
119  		struct knet_log_msg msg;
120  		int len;
121  		struct timeval now, elapsed;
122  		long elapsed_sec, elapsed_usec;
123  		char log_line[1024];
124  	
125  		while (1) {
126  			len = read(logfd, &msg, sizeof(msg));
127  			if (len != sizeof(msg)) {
128  				/*
129  				 * clear errno to avoid incorrect propagation
130  				 */
131  				errno = 0;
132  				return;
133  			}
134  	
135  			msg.msg[sizeof(msg.msg) - 1] = 0;
136  	
137  			gettimeofday(&now, NULL);
138  			timersub(&now, &log_start_time, &elapsed);
139  			elapsed_sec = elapsed.tv_sec;
140  			elapsed_usec = elapsed.tv_usec / 1000; /* convert to milliseconds */
141  	
142  			if (msg.subsystem == (KNET_SUB_UNKNOWN - 1) && msg.msglevel == 0) {
143  				snprintf(log_line, sizeof(log_line),
144  					 "[%6ld.%03ld] [testsuite]: %.*s",
145  					 elapsed_sec, elapsed_usec,
146  					 KNET_MAX_LOG_MSG_SIZE, msg.msg);
147  			} else {
148  				snprintf(log_line, sizeof(log_line),
149  					 "[%6ld.%03ld] [%s] %s: %.*s",
150  					 elapsed_sec, elapsed_usec,
151  					 knet_log_get_loglevel_name(msg.msglevel),
152  					 knet_log_get_subsystem_name(msg.subsystem),
153  					 KNET_MAX_LOG_MSG_SIZE, msg.msg);
154  			}
155  	
156  			fprintf(std, "%s\n", log_line);
157  	
158  			/* Check log filter if installed */
159  			pthread_mutex_lock(&log_filter_mutex);
160  			if (log_filter_callback != NULL) {
161  				if (log_filter_callback(log_filter_logfd, log_line, log_filter_private_data)) {
162  					log_pattern_found = 1;
163  				}
164  			}
165  			pthread_mutex_unlock(&log_filter_mutex);
166  		}
167  	}
168  	
169  	static void *_logthread(void *args)
170  	{
171  		while (1) {
172  			int num;
173  			struct timeval tv = { 60, 0 };
174  			fd_set rfds;
175  	
176  			FD_ZERO(&rfds);
177  			FD_SET(data.logfd, &rfds);
178  	
179  			num = select(FD_SETSIZE, &rfds, NULL, NULL, &tv);
180  			if (num < 0) {
181  				fprintf(data.std, "Unable select over logfd!\nHALTING LOGTHREAD!\n");
182  				return NULL;
183  			}
184  			if (num == 0) {
185  				fprintf(data.std, "[knet]: No logs in the last 60 seconds\n");
186  				continue;
187  			}
188  			if (FD_ISSET(data.logfd, &rfds)) {
189  				flush_logs(data.logfd, data.std);
190  			}
191  		}
192  	}
193  	
194  	static int start_logthread(int logfd, FILE *std)
195  	{
196  		int savederrno = 0;
197  	
198  		savederrno = pthread_mutex_lock(&log_thread_mutex);
199  		if (savederrno) {
200  			printf("Unable to get log_thread mutex lock\n");
201  			return -1;
202  		}
203  	
204  		if (!log_thread_init) {
205  			data.logfd = logfd;
206  			data.std = std;
207  	
208  			savederrno = pthread_create(&log_thread, 0, _logthread, NULL);
209  			if (savederrno) {
210  				printf("Unable to start logging thread: %s\n", strerror(savederrno));
211  				pthread_mutex_unlock(&log_thread_mutex);
212  				return -1;
213  			}
214  			log_thread_init = 1;
215  		}
216  	
217  		pthread_mutex_unlock(&log_thread_mutex);
218  		return 0;
219  	}
220  	
221  	static int stop_logthread(void)
222  	{
223  		int savederrno = 0;
224  		void *retval;
225  	
226  		savederrno = pthread_mutex_lock(&log_thread_mutex);
227  		if (savederrno) {
228  			printf("Unable to get log_thread mutex lock\n");
229  			return -1;
230  		}
231  	
232  		if (log_thread_init) {
233  			pthread_cancel(log_thread);
234  			pthread_join(log_thread, &retval);
235  			log_thread_init = 0;
236  		}
237  	
238  		pthread_mutex_unlock(&log_thread_mutex);
239  		return 0;
240  	}
241  	
242  	void stop_logging(void)
243  	{
244  		int savederrno = 0;
245  	
246  		savederrno = pthread_mutex_lock(&log_mutex);
247  		if (savederrno) {
248  			printf("Unable to get log_mutex lock\n");
249  			return;
250  		}
251  	
252  		if (log_init) {
253  			stop_logthread();
254  			flush_logs(log_fds[0], stdout);
255  			close_logpipes(log_fds);
256  			log_start_time_init = 0;
257  			log_init = 0;
258  		}
259  	
260  		pthread_mutex_unlock(&log_mutex);
261  	}
262  	
263  	int start_logging(FILE *std)
264  	{
265  		int savederrno = 0;
266  	
267  		savederrno = pthread_mutex_lock(&log_mutex);
268  		if (savederrno) {
269  			printf("Unable to get log_mutex lock\n");
270  			return -1;
271  		}
272  	
273  		if (!log_init) {
274  			setup_logpipes(log_fds);
275  	
276  			if (atexit(&stop_logging) != 0) {
277  				printf("Unable to register atexit handler to stop logging: %s\n",
278  				       strerror(errno));
279  				exit(FAIL);
280  			}
281  	
282  			if (start_logthread(log_fds[0], std) < 0) {
283  				exit(FAIL);
284  			}
285  	
286  			log_init = 1;
287  		}
288  	
289  		pthread_mutex_unlock(&log_mutex);
290  	
291  		// coverity[MISSING_LOCK:SUPPRESS] - log_fds[1] is set while holding lock and doesn't change after init
292  		return log_fds[1];
293  	}
294  	
/*
 * scandir() filter: accept entries whose name ends in ".so" and starts
 * with "crypto" or "compress".
 *
 * Names shorter than the suffix (".", "..", ...) are rejected up front so
 * the suffix comparison never reads before the start of d_name.
 *
 * Returns 1 to keep the entry, 0 to drop it.
 */
static int dir_filter(const struct dirent *dname)
{
	static const char suffix[] = ".so";
	const size_t suffix_len = sizeof(suffix) - 1;
	const char *name = dname->d_name;
	size_t name_len = strlen(name);

	if (name_len < suffix_len) {
		return 0;
	}

	if (strcmp(name + name_len - suffix_len, suffix) != 0) {
		return 0;
	}

	if ((strncmp(name, "crypto", 6) == 0) ||
	    (strncmp(name, "compress", 8) == 0)) {
		return 1;
	}

	return 0;
}
304  	
305  	/* Make sure the proposed plugin path has at least 1 of each plugin available
306  	   - just as a sanity check really */
307  	static int contains_plugins(char *path)
308  	{
309  		struct dirent **namelist;
310  		int n,i;
311  		size_t j;
312  		struct knet_compress_info compress_list[256];
313  		struct knet_crypto_info crypto_list[256];
314  		size_t num_compress, num_crypto;
315  		size_t compress_found = 0;
316  		size_t crypto_found = 0;
317  	
318  		if (knet_get_compress_list(compress_list, &num_compress) == -1) {
319  			return 0;
320  		}
321  		if (knet_get_crypto_list(crypto_list, &num_crypto) == -1) {
322  			return 0;
323  		}
324  	
325  		// coverity[UNINIT:SUPPRESS] - it's supposed to be...
326  		n = scandir(path, &namelist, dir_filter, alphasort);
327  		if (n == -1) {
328  			return 0;
329  		}
330  	
331  		/* Look for plugins in the list */
332  		for (i=0; i<n; i++) {
333  			for (j=0; j<num_crypto; j++) {
334  				if (strlen(namelist[i]->d_name) >= 7 &&
335  				    strncmp(crypto_list[j].name, namelist[i]->d_name+7,
336  					    strlen(crypto_list[j].name)) == 0) {
337  					crypto_found++;
338  				}
339  			}
340  			for (j=0; j<num_compress; j++) {
341  				if (strlen(namelist[i]->d_name) >= 9 &&
342  				    strncmp(compress_list[j].name, namelist[i]->d_name+9,
343  					    strlen(compress_list[j].name)) == 0) {
344  					compress_found++;
345  				}
346  			}
347  			free(namelist[i]);
348  		}
349  		free(namelist);
350  		/* If at least one plugin was found (or none were built) */
351  		if ((crypto_found || num_crypto == 0) &&
352  		    (compress_found || num_compress == 0)) {
353  			return 1;
354  		} else {
355  			return 0;
356  		}
357  	}
358  	
359  	
360  	/* libtool sets LD_LIBRARY_PATH to the build tree when running test in-tree */
361  	char *find_plugins_path(int logfd)
362  	{
363  		char *ld_libs_env = getenv("LD_LIBRARY_PATH");
364  		if (ld_libs_env) {
365  			char *ld_libs = strdup(ld_libs_env);
366  			char *str = strtok(ld_libs, ":");
367  			while (str) {
368  				if (contains_plugins(str)) {
369  					strncpy(plugin_path, str, sizeof(plugin_path)-1);
370  					free(ld_libs);
371  					log_test(logfd, "Using plugins from %.200s", plugin_path);
372  					return plugin_path;
373  				}
374  				str = strtok(NULL, ":");
375  			}
376  			free(ld_libs);
377  		}
378  		return NULL;
379  	}
380  	
381  	
382  	knet_handle_t _ts_knet_handle_start(int logfd, uint8_t log_level, knet_handle_t knet_h_array[])
383  	{
384  		knet_handle_t knet_h = knet_handle_new_ex(1, logfd, log_level, 0);
385  		char *plugins_path;
386  	
387  		if (knet_h) {
388  			log_test(logfd, "knet_handle_new at %p", knet_h);
389  			plugins_path = find_plugins_path(logfd);
390  			/* Use plugins from the build tree */
391  			if (plugins_path) {
392  				knet_h->plugin_path = plugins_path;
393  			}
394  			knet_h_array[1] = knet_h;
395  			return knet_h;
396  		} else {
397  			log_test(logfd, "knet_handle_new failed: %s", strerror(errno));
398  			stop_logging();
399  			exit(FAIL);
400  		}
401  	}
402  	
403  	int _ts_knet_handle_reconnect_links(knet_handle_t knet_h, int logfd)
404  	{
405  		size_t i, j;
406  		knet_node_id_t host_ids[KNET_MAX_HOST];
407  		uint8_t link_ids[KNET_MAX_LINK];
408  		size_t host_ids_entries = 0, link_ids_entries = 0;
409  		unsigned int enabled;
410  	
411  		if (!knet_h) {
412  			errno = EINVAL;
413  			return -1;
414  		}
415  	
416  		if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) {
417  			log_test(logfd, "knet_host_get_host_list failed: %s", strerror(errno));
418  			return -1;
419  		}
420  	
421  		for (i = 0; i < host_ids_entries; i++) {
422  			if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) {
423  				log_test(logfd, "knet_link_get_link_list failed: %s", strerror(errno));
424  				return -1;
425  			}
426  			for (j = 0; j < link_ids_entries; j++) {
427  				if (knet_link_get_enable(knet_h, host_ids[i], link_ids[j], &enabled)) {
428  					log_test(logfd, "knet_link_get_enable failed: %s", strerror(errno));
429  					return -1;
430  				}
431  				if (!enabled) {
432  					if (knet_link_set_enable(knet_h, host_ids[i], j, 1)) {
433  						log_test(logfd, "knet_link_set_enable failed: %s", strerror(errno));
434  						return -1;
435  					}
436  				}
437  			}
438  		}
439  	
440  		return 0;
441  	}
442  	
443  	int _ts_knet_handle_disconnect_links(knet_handle_t knet_h, int logfd)
444  	{
445  		size_t i, j;
446  		knet_node_id_t host_ids[KNET_MAX_HOST];
447  		uint8_t link_ids[KNET_MAX_LINK];
448  		size_t host_ids_entries = 0, link_ids_entries = 0;
449  		unsigned int enabled;
450  	
451  		if (!knet_h) {
452  			errno = EINVAL;
453  			return -1;
454  		}
455  	
456  		if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) {
457  			log_test(logfd, "knet_host_get_host_list failed: %s", strerror(errno));
458  			return -1;
459  		}
460  	
461  		for (i = 0; i < host_ids_entries; i++) {
462  			if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) {
463  				log_test(logfd, "knet_link_get_link_list failed: %s", strerror(errno));
464  				return -1;
465  			}
466  			for (j = 0; j < link_ids_entries; j++) {
467  				if (knet_link_get_enable(knet_h, host_ids[i], link_ids[j], &enabled)) {
468  					log_test(logfd, "knet_link_get_enable failed: %s", strerror(errno));
469  					return -1;
470  				}
471  				if (enabled) {
472  					if (knet_link_set_enable(knet_h, host_ids[i], j, 0)) {
473  						log_test(logfd, "knet_link_set_enable failed: %s", strerror(errno));
474  						return -1;
475  					}
476  				}
477  			}
478  		}
479  	
480  		return 0;
481  	}
482  	
483  	static int _make_local_sockaddr(struct sockaddr_storage *lo, int offset, int family, int logfd)
484  	{
485  		in_port_t port;
486  		char portstr[32];
487  	
488  		if (offset < 0) {
489  			/*
490  			 * api_knet_link_set_config needs to access the API directly, but
491  			 * it does not send any traffic, so it´s safe to ask the kernel
492  			 * for a random port.
493  			 */
494  			port = 0;
495  		} else {
496  			/* Use the pid if we can. but makes sure its in a sensible range */
497  			port = (getpid() + offset) % (TEST_PORT_MAX - TEST_PORT_BASE) + TEST_PORT_BASE;
498  		}
499  		sprintf(portstr, "%u", port);
500  		memset(lo, 0, sizeof(struct sockaddr_storage));
501  		log_test(logfd, "Using port %u", port);
502  	
503  		if (family == AF_INET6) {
504  			return knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage));
505  		}
506  		return knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage));
507  	}
508  	
/*
 * IPv4 front end for _make_local_sockaddr(): fills `lo` with a 127.0.0.1
 * address whose port is chosen from `offset`, and returns that helper's
 * result.
 */
int make_local_sockaddr(struct sockaddr_storage *lo, int offset, int logfd)
{
	int rc = _make_local_sockaddr(lo, offset, AF_INET, logfd);

	return rc;
}
513  	
/*
 * IPv6 front end for _make_local_sockaddr(): fills `lo` with a ::1
 * address whose port is chosen from `offset`, and returns that helper's
 * result.
 */
int make_local_sockaddr6(struct sockaddr_storage *lo, int offset, int logfd)
{
	int rc = _make_local_sockaddr(lo, offset, AF_INET6, logfd);

	return rc;
}
518  	
519  	int _ts_knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
520  				  uint8_t transport, uint64_t flags, int family, int dynamic,
521  				  struct sockaddr_storage *lo, int logfd)
522  	{
523  		int err = 0, savederrno = 0;
524  		uint32_t port;
525  		char portstr[32];
526  	
527  		for (port = TEST_PORT_MIN; port < TEST_PORT_MAX; port++) {
528  			sprintf(portstr, "%u", port);
529  			memset(lo, 0, sizeof(struct sockaddr_storage));
530  			if (family == AF_INET6) {
531  				err = knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage));
532  			} else {
533  				err = knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage));
534  			}
535  			if (err < 0) {
536  				log_test(logfd, "Unable to convert loopback to sockaddr: %s", strerror(errno));
537  				goto out;
538  			}
539  			errno = 0;
540  			if (dynamic) {
541  				err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, NULL, flags);
542  			} else {
543  				err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, lo, flags);
544  			}
545  			savederrno = errno;
546  			if ((err < 0)  && (savederrno != EADDRINUSE)) {
547  				log_test(logfd, "Unable to configure link: %s", strerror(savederrno));
548  				goto out;
549  			}
550  			if (!err) {
551  				log_test(logfd, "Using port %u", port);
552  				goto out;
553  			}
554  		}
555  	
556  		if (err) {
557  			log_test(logfd, "No more ports available");
558  		}
559  	out:
560  		errno = savederrno;
561  		return err;
562  	}
563  	
/*
 * Sleep for `seconds`, after scaling it through
 * adjust_timeout_for_valgrind() and logging the final duration on logfd.
 */
void test_sleep(int logfd, int seconds)
{
	int duration = adjust_timeout_for_valgrind(seconds, logfd);
	const char *plural = (duration == 1) ? "" : "s";

	log_test(logfd, "Sleeping for %d second%s", duration, plural);
	sleep(duration);
}
570  	
571  	int wait_for_packet(knet_handle_t knet_h, int seconds, int datafd, int logfd)
572  	{
573  		fd_set rfds;
574  		struct timeval tv;
575  		int err = 0, i = 0;
576  	
577  		seconds = adjust_timeout_for_valgrind(seconds, logfd);
578  	
579  	try_again:
580  		FD_ZERO(&rfds);
581  		FD_SET(datafd, &rfds);
582  	
583  		tv.tv_sec = 1;
584  		tv.tv_usec = 0;
585  	
586  		err = select(datafd+1, &rfds, NULL, NULL, &tv);
587  		/*
588  		 * on slow arches the first call to select can return 0.
589  		 * pick an arbitrary 10 times loop (multiplied by waiting seconds)
590  		 * before failing.
591  		 */
592  		if ((!err) && (i < seconds)) {
593  			i++;
594  			goto try_again;
595  		}
596  		if ((err > 0) && (FD_ISSET(datafd, &rfds))) {
597  			return 0;
598  		}
599  	
600  		errno = ETIMEDOUT;
601  		return -1;
602  	}
603  	
604  	/*
605  	 * functional tests helpers
606  	 */
607  	
608  	void _ts_knet_handle_start_nodes(knet_handle_t knet_h[], uint8_t numnodes, int logfd, uint8_t log_level)
609  	{
610  		uint8_t i;
611  		char *plugins_path = find_plugins_path(logfd);
612  	
613  		for (i = 1; i <= numnodes; i++) {
614  			knet_h[i] = knet_handle_new_ex(i, logfd, log_level, 0);
615  			if (!knet_h[i]) {
616  				log_test(logfd, "failed to create handle: %s", strerror(errno));
617  				break;
618  			} else {
619  				log_test(logfd, "knet_h[%u] at %p", i, knet_h[i]);
620  			}
621  			/* Use plugins from the build tree */
622  			if (plugins_path) {
623  				knet_h[i]->plugin_path = plugins_path;
624  			}
625  		}
626  	
627  		if (i < numnodes) {
628  			_ts_knet_handle_stop_everything(knet_h, i, logfd);
629  			exit(FAIL);
630  		}
631  	
632  		return;
633  	}
634  	
/*
 * Connect the handles knet_h[1]..knet_h[numnodes] to each other with
 * `numlinks` loopback links per peer, then wait for every node to see
 * its peers.
 *
 * Works in two phases (see the comments below): first every node adds
 * its peers and binds one source port per link against a temporary peer,
 * then every link is reconfigured with the real peer's bound address as
 * destination and enabled. Any failure logs the error, shuts all handles
 * down and exits with FAIL. The result of wait_for_nodes_state() is not
 * checked.
 *
 * NOTE(review): the temporary peer is host 2 for node 1 and host 1 for
 * everyone else, so this appears to assume numnodes >= 2 - confirm.
 */
void _ts_knet_handle_join_nodes(knet_handle_t knet_h[], uint8_t numnodes, uint8_t numlinks, int family, uint8_t transport, int logfd)
{
	uint8_t i, x, j, tmp_peer;
	struct sockaddr_storage lo;

	/*
	 * Phase 1: Add all peers and allocate ports with temporary configurations
	 * This binds ports without races - ports stay bound throughout
	 */
	log_test(logfd, "Phase 1: Adding peers and allocating ports for %u nodes with %u links each", numnodes, numlinks);

	for (i = 1; i <= numnodes; i++) {
		/* Add all peer nodes first */
		for (j = 1; j <= numnodes; j++) {
			if (j == i) {
				continue;
			}

			log_test(logfd, "host %u adding host: %u", i, j);

			if (knet_host_add(knet_h[i], j) < 0) {
				log_test(logfd, "Unable to add host: %s", strerror(errno));
				_ts_knet_handle_stop_everything(knet_h, numnodes, logfd);
				exit(FAIL);
			}
		}

		/* Configure links with temporary dst (use first peer as temporary dst) */
		tmp_peer = (i == 1) ? 2 : 1;
		for (x = 0; x < numlinks; x++) {
			/*
			 * Use _ts_knet_link_set_config to find available port
			 * Configure to temporary peer to bind the src port
			 * Port stays bound - we'll update dst in Phase 2
			 */
			if (_ts_knet_link_set_config(knet_h[i], tmp_peer, x, transport, 0, family, 0, &lo, logfd) < 0) {
				log_test(logfd, "Unable to allocate port for node %u link %u: %s", i, x, strerror(errno));
				_ts_knet_handle_stop_everything(knet_h, numnodes, logfd);
				exit(FAIL);
			}

			/*
			 * Hack: Clear the configured flag to allow reconfiguration to actual peer
			 * Port/socket stays bound, but API will allow reconfiguring to different host_id
			 */
			pthread_rwlock_wrlock(&knet_h[i]->global_rwlock);
			knet_h[i]->host_index[tmp_peer]->link[x].configured = 0;
			pthread_rwlock_unlock(&knet_h[i]->global_rwlock);
		}
	}

	/*
	 * Phase 2: Update link configurations with correct dst addresses
	 * All ports are now allocated and bound, just update destinations
	 */
	log_test(logfd, "Phase 2: Updating link destinations");

	for (i = 1; i <= numnodes; i++) {
		/* same temporary peer as in Phase 1: that is where node i's bound addresses live */
		tmp_peer = (i == 1) ? 2 : 1;
		for (j = 1; j <= numnodes; j++) {
			if (j == i) {
				continue;
			}

			for (x = 0; x < numlinks; x++) {
				/* temporary peer node j used in Phase 1 */
				uint8_t tmp_peer_j = (j == 1) ? 2 : 1;
				struct sockaddr_storage src, dst;

				/*
				 * Copy addresses from link structures to local vars
				 * - src: node i's link x src_addr (bound in Phase 1)
				 * - dst: node j's link x src_addr (bound in Phase 1)
				 * Local copies required - passing pointers directly causes internal state issues
				 */
				memcpy(&src, &knet_h[i]->host_index[tmp_peer]->link[x].src_addr, sizeof(struct sockaddr_storage));
				memcpy(&dst, &knet_h[j]->host_index[tmp_peer_j]->link[x].src_addr, sizeof(struct sockaddr_storage));

				/*
				 * Update link configuration with correct dst
				 * Second call to knet_link_set_config updates the existing link
				 */
				if (knet_link_set_config(knet_h[i], j, x, transport, &src, &dst, 0) < 0) {
					log_test(logfd, "Unable to configure link: %s", strerror(errno));
					_ts_knet_handle_stop_everything(knet_h, numnodes, logfd);
					exit(FAIL);
				}

				if (knet_link_set_enable(knet_h[i], j, x, 1) < 0) {
					log_test(logfd, "unable to enable link: %s", strerror(errno));
					_ts_knet_handle_stop_everything(knet_h, numnodes, logfd);
					exit(FAIL);
				}
			}
		}
	}

	/* Block until each node reports all of its peers reachable (or the wait times out) */
	for (i = 1; i <= numnodes; i++) {
		wait_for_nodes_state(knet_h[i], numnodes, 1, TEST_TIMEOUT_LONG, logfd);
	}

	return;
}
737  	
738  	
/* Reachable-host count nodes_notify_callback() waits for; set by wait_for_nodes_state(). */
static int target=0;

/*
 * Pipes the status-change callbacks write one byte to and wait_for_reply()
 * reads from. A 0 in slot [0] is used as the "not created yet" marker.
 */
static int state_wait_pipe[2] = {0,0};
static int host_wait_pipe[2] = {0,0};
/* Log fd used by the callbacks; refreshed by wait_for_nodes_state() and wait_for_host(). */
static int callback_logfd = -1;
744  	
745  	static int count_nodes(knet_handle_t knet_h)
746  	{
747  		int nodes = 0;
748  		int i;
749  	
750  		for (i=0; i< KNET_MAX_HOST; i++) {
751  			if (knet_h->host_index[i] && knet_h->host_index[i]->status.reachable == 1) {
752  				nodes++;
753  			}
754  		}
755  		return nodes;
756  	}
757  	
758  	static void nodes_notify_callback(void *private_data,
759  					  knet_node_id_t host_id,
760  					  uint8_t reachable, uint8_t remote, uint8_t external)
761  	{
762  		knet_handle_t knet_h = (knet_handle_t) private_data;
763  		int nodes;
764  		int res;
765  	
766  		nodes = count_nodes(knet_h);
767  	
768  		if (nodes == target) {
769  			res = write(state_wait_pipe[1], ".", 1);
770  			if (res != 1) {
771  				log_test(callback_logfd, "***FAILed to signal wait_for_nodes_state: %s", strerror(errno));
772  			}
773  		}
774  	}
775  	
776  	/* Called atexit() */
777  	static void finish_state_pipes()
778  	{
779  		if (state_wait_pipe[0] != 0) {
780  			close(state_wait_pipe[0]);
781  			close(state_wait_pipe[1]);
782  			state_wait_pipe[0] = 0;
783  		}
784  		if (host_wait_pipe[0] != 0) {
785  			close(host_wait_pipe[0]);
786  			close(host_wait_pipe[1]);
787  			host_wait_pipe[0] = 0;
788  		}
789  	}
790  	
791  	static void host_notify_callback(void *private_data,
792  					 knet_node_id_t host_id,
793  					 uint8_t reachable, uint8_t remote, uint8_t external)
794  	{
795  		knet_handle_t knet_h = (knet_handle_t) private_data;
796  		int res;
797  	
798  		if (knet_h->host_index[host_id]->status.reachable == 1) {
799  			res = write(host_wait_pipe[1], ".", 1);
800  			if (res != 1) {
801  				log_test(callback_logfd, "***FAILed to signal wait_for_host: %s", strerror(errno));
802  			}
803  		}
804  	}
805  	
/*
 * Wait up to `seconds` (scaled through adjust_timeout_for_valgrind())
 * for data on pipefd and consume it.
 *
 * Returns 0 when at least one byte was read. Returns -1 otherwise:
 * errno is ETIMEDOUT when poll() timed out or read() returned 0, EIO
 * when poll() reported an event other than POLLIN, and left as set by
 * poll()/read() when one of them failed.
 */
int wait_for_reply(int seconds, int pipefd, int logfd)
{
	struct pollfd waiter;
	char scratch[32];
	int rc;

	seconds = adjust_timeout_for_valgrind(seconds, logfd);

	waiter.fd = pipefd;
	waiter.events = POLLIN | POLLERR | POLLHUP;
	waiter.revents = 0;

	rc = poll(&waiter, 1, seconds*1000);
	if (rc == 1) {
		if (!(waiter.revents & POLLIN)) {
			log_test(logfd, "Error on pipe poll revent = 0x%x", waiter.revents);
			errno = EIO;
			return -1;
		}

		rc = read(pipefd, scratch, sizeof(scratch));
		if (rc > 0) {
			return 0;
		}
	}

	if (rc == 0) {
		errno = ETIMEDOUT;
	}

	return -1;
}
837  	
838  	/* Wait for a cluster of 'numnodes' to come up/go down */
839  	int wait_for_nodes_state(knet_handle_t knet_h, size_t numnodes,
840  				 uint8_t state, uint32_t seconds,
841  				 int logfd)
842  	{
843  		int res, savederrno = 0;
844  	
845  		callback_logfd = logfd;
846  	
847  		if (state_wait_pipe[0] == 0) {
848  			res = pipe(state_wait_pipe);
849  			if (res == -1) {
850  				savederrno = errno;
851  				log_test(logfd, "Error creating host reply pipe: %s", strerror(errno));
852  				errno = savederrno;
853  				return -1;
854  			}
855  			if (atexit(finish_state_pipes)) {
856  				log_test(logfd, "Unable to register atexit handler to close pipes: %s",
857  				       strerror(errno));
858  				exit(FAIL);
859  			}
860  	
861  		}
862  	
863  		if (state) {
864  			target = numnodes-1; /* exclude us */
865  		} else {
866  			target = 0; /* Wait for all to go down */
867  		}
868  	
869  		/* Set this before checking existing status or there's a race condition */
870  		knet_host_enable_status_change_notify(knet_h,
871  						      (void *)(long)knet_h,
872  						      nodes_notify_callback);
873  	
874  		/* Check we haven't already got all the nodes in the correct state */
875  		if (count_nodes(knet_h) == target) {
876  			log_test(logfd, "target already reached");
877  			knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
878  			return 0;
879  		}
880  	
881  		res = wait_for_reply(seconds, state_wait_pipe[0], logfd);
882  		if (res == -1) {
883  			savederrno = errno;
884  			log_test(logfd, "Error waiting for nodes status reply: %s", strerror(errno));
885  		}
886  	
887  		knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
888  		errno = savederrno;
889  		return res;
890  	}
891  	
892  	/* Wait for a single node to come up */
893  	int wait_for_host(knet_handle_t knet_h, uint16_t host_id, int seconds, int logfd)
894  	{
895  		int res = 0;
896  		int savederrno = 0;
897  	
898  		callback_logfd = logfd;
899  	
900  		if (host_wait_pipe[0] == 0) {
901  			res = pipe(host_wait_pipe);
902  			if (res == -1) {
903  				savederrno = errno;
904  				log_test(logfd, "Error creating host reply pipe: %s", strerror(errno));
905  				errno = savederrno;
906  				return -1;
907  			}
908  			if (atexit(finish_state_pipes)) {
909  				log_test(logfd, "Unable to register atexit handler to close pipes: %s",
910  				       strerror(errno));
911  				exit(FAIL);
912  			}
913  	
914  		}
915  	
916  		/* Set this before checking existing status or there's a race condition */
917  		knet_host_enable_status_change_notify(knet_h,
918  						      (void *)(long)knet_h,
919  						      host_notify_callback);
920  	
921  		/* Check it's not already reachable */
922  		if (knet_h->host_index[host_id]->status.reachable == 1) {
923  			knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
924  			return 0;
925  		}
926  	
927  		res = wait_for_reply(seconds, host_wait_pipe[0], logfd);
928  		if (res == -1) {
929  			savederrno = errno;
930  			log_test(logfd, "Error waiting for host status reply: %s", strerror(errno));
931  		}
932  	
933  		knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
934  	
935  		/* Still wait for it to settle */
936  		test_sleep(logfd, 1);
937  		errno = savederrno;
938  		return res;
939  	}
940  	
941  	/* Shutdown all nodes and links attached to an array of knet handles.
942  	 * Mostly stolen from corosync code (that I wrote, before anyone complains about licences)
943  	 */
944  	void _ts_knet_handle_stop_everything(knet_handle_t knet_h[], uint8_t numnodes, int logfd)
945  	{
946  		int res = 0;
947  		int h;
948  		size_t i,j;
949  		static knet_node_id_t nodes[KNET_MAX_HOST]; /* static to save stack */
950  		uint8_t links[KNET_MAX_LINK];
951  		size_t num_nodes;
952  		size_t num_links;
953  	
954  		for (h=1; h<numnodes+1; h++) {
955  			if (!knet_h[h]) {
956  				continue;
957  			}
958  	
959  			res = knet_handle_setfwd(knet_h[h], 0);
960  			if (res) {
961  				log_test(logfd, "knet_handle_setfwd failed: %s", strerror(errno));
962  			}
963  	
964  			res = knet_host_get_host_list(knet_h[h], nodes, &num_nodes);
965  			if (res) {
966  				log_test(logfd, "Cannot get knet node list for shutdown: %s", strerror(errno));
967  				continue;
968  			}
969  	
970  			/* Tidily shut down all nodes & links. */
971  			for (i=0; i<num_nodes; i++) {
972  	
973  				res = knet_link_get_link_list(knet_h[h], nodes[i], links, &num_links);
974  				if (res) {
975  					log_test(logfd, "Cannot get knet link list for node %u: %s", nodes[i], strerror(errno));
976  					goto finalise_error;
977  				}
978  				for (j=0; j<num_links; j++) {
979  					res = knet_link_set_enable(knet_h[h], nodes[i], links[j], 0);
980  					if (res) {
981  						log_test(logfd, "knet_link_set_enable(node %u, link %d) failed: %s", nodes[i], links[j], strerror(errno));
982  					}
983  					res = knet_link_clear_config(knet_h[h], nodes[i], links[j]);
984  					if (res) {
985  						log_test(logfd, "knet_link_clear_config(node %u, link %d) failed: %s", nodes[i], links[j], strerror(errno));
986  					}
987  				}
(36) Event example_assign: Example 5: Assigning: "res" = return value from "knet_host_remove(knet_h[h], nodes[i])".
Also see events: [check_return][example_checked][example_checked][example_checked][example_checked][example_checked]
988  				res = knet_host_remove(knet_h[h], nodes[i]);
(37) Event example_checked: Example 5 (cont.): "res" has its value checked in "res".
Also see events: [check_return][example_checked][example_checked][example_checked][example_checked][example_assign]
989  				if (res) {
990  					log_test(logfd, "knet_host_remove(node %u) failed: %s", nodes[i], strerror(errno));
991  				}
992  			}
993  	
994  		finalise_error:
995  			res = knet_handle_free(knet_h[h]);
996  			if (res) {
997  				log_test(logfd, "knet_handle_free failed: %s", strerror(errno));
998  			}
999  		}
1000 	}
1001 	
1002 	/*
1003 	 * Packet injector: Create a packet and inject it into a link's socket
1004 	 *
1005 	 * This allows testing RX validation without network-level packet manipulation.
1006 	 * The caller provides a seq_num to avoid packet deduplication in the RX thread.
1007 	 *
1008 	 * Returns 0 on success, -1 on error
1009 	 */
1010 	int inject_packet(knet_handle_t knet_h,
1011 			  uint8_t packet_type,
1012 			  knet_node_id_t src_host_id,
1013 			  uint8_t actual_link_id,
1014 			  uint8_t claimed_link_id,
1015 			  uint8_t frag_num,
1016 			  uint8_t frag_seq,
1017 			  seq_num_t seq_num,
1018 			  const char *payload,
1019 			  size_t payload_len)
1020 	{
1021 		struct knet_header *packet;
1022 		size_t packet_len;
1023 		struct knet_host *src_host;
1024 		struct knet_link *src_link;
1025 		ssize_t sent;
1026 		socklen_t addrlen;
1027 		struct timespec timestamp;
1028 	
1029 		/* Determine packet size based on type */
1030 		switch (packet_type) {
1031 		case KNET_HEADER_TYPE_DATA:
1032 			packet_len = KNET_HEADER_DATA_SIZE + payload_len;
1033 			break;
1034 		case KNET_HEADER_TYPE_PING:
1035 			packet_len = KNET_HEADER_PING_SIZE;
1036 			break;
1037 		default:
1038 			errno = EINVAL;
1039 			return -1;
1040 		}
1041 	
1042 		packet = malloc(packet_len);
1043 		if (!packet) {
1044 			return -1;
1045 		}
1046 	
1047 		memset(packet, 0, packet_len);
1048 	
1049 		/* Fill in common packet header */
1050 		packet->kh_version = 0;
1051 		packet->kh_type = packet_type;
1052 		packet->kh_node = htons(src_host_id);
1053 	
1054 		/* Fill in type-specific payload */
1055 		switch (packet_type) {
1056 		case KNET_HEADER_TYPE_DATA:
1057 			packet->khp_data_seq_num = htons(seq_num);
1058 			packet->khp_data_compress = 0;
1059 			packet->khp_data_bcast = 0;
1060 			packet->khp_data_channel = 0;
1061 			packet->khp_data_frag_num = frag_num;
1062 			packet->khp_data_frag_seq = frag_seq;
1063 	
1064 			/* Copy payload */
1065 			if (payload && payload_len > 0) {
1066 				memcpy(packet->khp_data_userdata, payload, payload_len);
1067 			}
1068 			break;
1069 		case KNET_HEADER_TYPE_PING:
1070 			packet->khp_ping_link = claimed_link_id;
1071 			clock_gettime(CLOCK_MONOTONIC, &timestamp);
1072 			memmove(&packet->khp_ping_time[0], &timestamp, sizeof(struct timespec));
1073 			packet->khp_ping_seq_num = htons(seq_num);
1074 			packet->khp_ping_timed = 1;
1075 			break;
1076 		}
1077 	
1078 		/* Get the source host and link to determine where to inject */
1079 		src_host = knet_h->host_index[src_host_id];
1080 		if (!src_host) {
1081 			free(packet);
1082 			return -1;
1083 		}
1084 	
1085 		src_link = &src_host->link[actual_link_id];
1086 	
1087 		/* Check if link is properly configured */
1088 		if (src_link->outsock < 0) {
1089 			free(packet);
1090 			return -1;
1091 		}
1092 	
1093 		/* Determine address length based on address family */
1094 		switch (src_link->dst_addr.ss_family) {
1095 		case AF_INET:
1096 			addrlen = sizeof(struct sockaddr_in);
1097 			break;
1098 		case AF_INET6:
1099 			addrlen = sizeof(struct sockaddr_in6);
1100 			break;
1101 		default:
1102 			free(packet);
1103 			return -1;
1104 		}
1105 	
1106 		/* Inject the packet by sending to ourselves (loopback) */
1107 		sent = sendto(src_link->outsock, packet, packet_len, MSG_DONTWAIT | MSG_NOSIGNAL,
1108 			      (struct sockaddr *)&src_link->dst_addr,
1109 			      addrlen);
1110 	
1111 		free(packet);
1112 	
1113 		if (sent != (ssize_t)packet_len) {
1114 			return -1;
1115 		}
1116 	
1117 		return 0;
1118 	}
1119 	
1120 	/*
1121 	 * Install a runtime log filter callback
1122 	 * Thread-safe via mutex protection
1123 	 */
1124 	void install_log_filter(int logfd, log_filter_fn filter_fn, void *private_data)
1125 	{
1126 		pthread_mutex_lock(&log_filter_mutex);
1127 		log_filter_callback = filter_fn;
1128 		log_filter_logfd = logfd;
1129 		log_filter_private_data = private_data;
1130 		log_pattern_found = 0; /* Reset flag when installing new filter */
1131 		pthread_mutex_unlock(&log_filter_mutex);
1132 	}
1133 	
1134 	/*
1135 	 * Check if log filter found a pattern match
1136 	 * Returns current value and resets the flag
1137 	 */
1138 	int check_log_pattern_found(void)
1139 	{
1140 		int found;
1141 	
1142 		pthread_mutex_lock(&log_filter_mutex);
1143 		found = log_pattern_found;
1144 		log_pattern_found = 0; /* Reset after reading */
1145 		pthread_mutex_unlock(&log_filter_mutex);
1146 	
1147 		return found;
1148 	}
1149