1    	
2    	/*
3    	 * Copyright (C) 2016-2025 Red Hat, Inc.  All rights reserved.
4    	 *
5    	 * Author: Fabio M. Di Nitto <fabbione@kronosnet.org>
6    	 *
7    	 * This software licensed under GPL-2.0+
8    	 */
9    	
10   	#include "config.h"
11   	
12   	#include <errno.h>
13   	#include <stdio.h>
14   	#include <string.h>
15   	#include <unistd.h>
16   	#include <stdlib.h>
17   	#include <sys/types.h>
18   	#include <sys/wait.h>
19   	#include <fcntl.h>
20   	#include <pthread.h>
21   	#include <dirent.h>
22   	#include <sys/select.h>
23   	#include <poll.h>
24   	
25   	#include "libknet.h"
26   	#include "test-common.h"
27   	
28   	static pthread_mutex_t log_mutex = PTHREAD_MUTEX_INITIALIZER;
29   	static int log_init = 0;
30   	static pthread_mutex_t log_thread_mutex = PTHREAD_MUTEX_INITIALIZER;
31   	static pthread_t log_thread;
32   	static int log_thread_init = 0;
33   	static int log_fds[2];
34   	struct log_thread_data {
35   		int logfd;
36   		FILE *std;
37   	};
38   	static struct log_thread_data data;
39   	static char plugin_path[PATH_MAX];
40   	
41   	static int _read_pipe(int fd, char **file, size_t *length)
42   	{
43   		char buf[4096];
44   		int n;
45   		int done = 0;
46   	
47   		*file = NULL;
48   		*length = 0;
49   	
50   		memset(buf, 0, sizeof(buf));
51   	
52   		while (!done) {
53   	
54   			n = read(fd, buf, sizeof(buf));
55   	
56   			if (n < 0) {
57   				if (errno == EINTR)
58   					continue;
59   	
60   				if (*file)
61   					free(*file);
62   	
63   				return n;
64   			}
65   	
66   			if (n == 0 && (!*length))
67   				return 0;
68   	
69   			if (n == 0)
70   				done = 1;
71   	
72   			if (*file)
73   				*file = realloc(*file, (*length) + n + done);
74   			else
75   				*file = malloc(n + done);
76   	
77   			if (!*file)
78   				return -1;
79   	
80   			memmove((*file) + (*length), buf, n);
81   			*length += (done + n);
82   		}
83   	
84   		/* Null terminator */
85   		(*file)[(*length) - 1] = 0;
86   	
87   		return 0;
88   	}
89   	
90   	int execute_shell(const char *command, char **error_string)
91   	{
92   		pid_t pid;
93   		int status, err = 0;
94   		int fd[2];
95   		size_t size = 0;
96   	
97   		if ((command == NULL) || (!error_string)) {
98   			errno = EINVAL;
99   			return FAIL;
100  		}
101  	
102  		*error_string = NULL;
103  	
104  		err = pipe(fd);
105  		if (err)
106  			goto out_clean;
107  	
108  		pid = fork();
109  		if (pid < 0) {
110  			err = pid;
111  			goto out_clean;
112  		}
113  	
114  		if (pid) { /* parent */
115  	
116  			close(fd[1]);
117  			err = _read_pipe(fd[0], error_string, &size);
118  			if (err)
119  				goto out_clean0;
120  	
121  			waitpid(pid, &status, 0);
122  			if (!WIFEXITED(status)) {
123  				err = -1;
124  				goto out_clean0;
125  			}
126  			if (WIFEXITED(status) && WEXITSTATUS(status) != 0) {
127  				err = WEXITSTATUS(status);
128  				goto out_clean0;
129  			}
130  			goto out_clean0;
131  		} else { /* child */
132  			close(0);
133  			close(1);
134  			close(2);
135  	
136  			close(fd[0]);
137  			dup2(fd[1], 1);
138  			dup2(fd[1], 2);
139  			close(fd[1]);
140  	
141  			execlp("/bin/sh", "/bin/sh", "-c", command, NULL);
142  			exit(FAIL);
143  		}
144  	
145  	out_clean:
146  		close(fd[1]);
147  	out_clean0:
148  		close(fd[0]);
149  	
150  		return err;
151  	}
152  	
153  	int is_memcheck(void)
154  	{
155  		char *val;
156  	
157  		val = getenv("KNETMEMCHECK");
158  	
159  		if (val) {
160  			if (!strncmp(val, "yes", 3)) {
161  				return 1;
162  			}
163  		}
164  	
165  		return 0;
166  	}
167  	
168  	int is_helgrind(void)
169  	{
170  		char *val;
171  	
172  		val = getenv("KNETHELGRIND");
173  	
174  		if (val) {
175  			if (!strncmp(val, "yes", 3)) {
176  				return 1;
177  			}
178  		}
179  	
180  		return 0;
181  	}
182  	
183  	void set_scheduler(int policy)
184  	{
185  		struct sched_param sched_param;
186  		int err;
187  	
188  		err = sched_get_priority_max(policy);
189  		if (err < 0) {
190  			printf("Could not get maximum scheduler priority\n");
191  			exit(FAIL);
192  		}
193  		sched_param.sched_priority = err;
194  		err = sched_setscheduler(0, policy, &sched_param);
195  		if (err < 0) {
196  			printf("Could not set priority\n");
197  			exit(FAIL);
198  		}
199  		return;
200  	}
201  	
202  	int setup_logpipes(int *logfds)
203  	{
204  		if (pipe2(logfds, O_CLOEXEC | O_NONBLOCK) < 0) {
205  			printf("Unable to setup logging pipe\n");
206  			exit(FAIL);
207  		}
208  	
209  		return PASS;
210  	}
211  	
212  	void close_logpipes(int *logfds)
213  	{
214  		close(logfds[0]);
215  		logfds[0] = 0;
216  		close(logfds[1]);
217  		logfds[1] = 0;
218  	}
219  	
220  	void flush_logs(int logfd, FILE *std)
221  	{
222  		struct knet_log_msg msg;
223  		int len;
224  	
225  		while (1) {
226  			len = read(logfd, &msg, sizeof(msg));
227  			if (len != sizeof(msg)) {
228  				/*
229  				 * clear errno to avoid incorrect propagation
230  				 */
231  				errno = 0;
232  				return;
233  			}
234  	
235  			msg.msg[sizeof(msg.msg) - 1] = 0;
236  	
237  			fprintf(std, "[knet]: [%s] %s: %.*s\n",
238  				knet_log_get_loglevel_name(msg.msglevel),
239  				knet_log_get_subsystem_name(msg.subsystem),
240  				KNET_MAX_LOG_MSG_SIZE, msg.msg);
241  		}
242  	}
243  	
244  	static void *_logthread(void *args)
245  	{
246  		while (1) {
247  			int num;
248  			struct timeval tv = { 60, 0 };
249  			fd_set rfds;
250  	
251  			FD_ZERO(&rfds);
252  			FD_SET(data.logfd, &rfds);
253  	
254  			num = select(FD_SETSIZE, &rfds, NULL, NULL, &tv);
255  			if (num < 0) {
256  				fprintf(data.std, "Unable select over logfd!\nHALTING LOGTHREAD!\n");
257  				return NULL;
258  			}
259  			if (num == 0) {
260  				fprintf(data.std, "[knet]: No logs in the last 60 seconds\n");
261  				continue;
262  			}
263  			if (FD_ISSET(data.logfd, &rfds)) {
264  				flush_logs(data.logfd, data.std);
265  			}
266  		}
267  	}
268  	
269  	int start_logthread(int logfd, FILE *std)
270  	{
271  		int savederrno = 0;
272  	
273  		savederrno = pthread_mutex_lock(&log_thread_mutex);
274  		if (savederrno) {
275  			printf("Unable to get log_thread mutex lock\n");
276  			return -1;
277  		}
278  	
279  		if (!log_thread_init) {
280  			data.logfd = logfd;
281  			data.std = std;
282  	
283  			savederrno = pthread_create(&log_thread, 0, _logthread, NULL);
284  			if (savederrno) {
285  				printf("Unable to start logging thread: %s\n", strerror(savederrno));
286  				pthread_mutex_unlock(&log_thread_mutex);
287  				return -1;
288  			}
289  			log_thread_init = 1;
290  		}
291  	
292  		pthread_mutex_unlock(&log_thread_mutex);
293  		return 0;
294  	}
295  	
296  	int stop_logthread(void)
297  	{
298  		int savederrno = 0;
299  		void *retval;
300  	
301  		savederrno = pthread_mutex_lock(&log_thread_mutex);
302  		if (savederrno) {
303  			printf("Unable to get log_thread mutex lock\n");
304  			return -1;
305  		}
306  	
307  		if (log_thread_init) {
308  			pthread_cancel(log_thread);
309  			pthread_join(log_thread, &retval);
310  			log_thread_init = 0;
311  		}
312  	
313  		pthread_mutex_unlock(&log_thread_mutex);
314  		return 0;
315  	}
316  	
317  	static void stop_logging(void)
318  	{
319  		stop_logthread();
320  		flush_logs(log_fds[0], stdout);
321  		close_logpipes(log_fds);
322  	}
323  	
324  	int start_logging(FILE *std)
325  	{
326  		int savederrno = 0;
327  	
328  		savederrno = pthread_mutex_lock(&log_mutex);
329  		if (savederrno) {
330  			printf("Unable to get log_mutex lock\n");
331  			return -1;
332  		}
333  	
334  		if (!log_init) {
335  			setup_logpipes(log_fds);
336  	
337  			if (atexit(&stop_logging) != 0) {
338  				printf("Unable to register atexit handler to stop logging: %s\n",
339  				       strerror(errno));
340  				exit(FAIL);
341  			}
342  	
343  			if (start_logthread(log_fds[0], std) < 0) {
344  				exit(FAIL);
345  			}
346  	
347  			log_init = 1;
348  		}
349  	
350  		pthread_mutex_unlock(&log_mutex);
351  	
352  		return log_fds[1];
353  	}
354  	
355  	static int dir_filter(const struct dirent *dname)
356  	{
357  		if ( (strcmp(dname->d_name + strlen(dname->d_name)-3, ".so") == 0) &&
358  		    ((strncmp(dname->d_name,"crypto", 6) == 0) ||
359  		     (strncmp(dname->d_name,"compress", 8) == 0))) {
360  			return 1;
361  		}
362  		return 0;
363  	}
364  	
365  	/* Make sure the proposed plugin path has at least 1 of each plugin available
366  	   - just as a sanity check really */
367  	static int contains_plugins(char *path)
368  	{
(1) Event var_decl: Declaring variable "namelist" without initializer.
Also see events: [uninit_use_in_call]
369  		struct dirent **namelist;
370  		int n,i;
371  		size_t j;
372  		struct knet_compress_info compress_list[256];
373  		struct knet_crypto_info crypto_list[256];
374  		size_t num_compress, num_crypto;
375  		size_t compress_found = 0;
376  		size_t crypto_found = 0;
377  	
(2) Event cond_false: Condition "knet_get_compress_list(compress_list, &num_compress) == -1", taking false branch.
378  		if (knet_get_compress_list(compress_list, &num_compress) == -1) {
379  			return 0;
(3) Event if_end: End of if statement.
380  		}
(4) Event cond_false: Condition "knet_get_crypto_list(crypto_list, &num_crypto) == -1", taking false branch.
381  		if (knet_get_crypto_list(crypto_list, &num_crypto) == -1) {
382  			return 0;
(5) Event if_end: End of if statement.
383  		}
384  	
(6) Event uninit_use_in_call: Using uninitialized value "namelist" when calling "scandir".
Also see events: [var_decl]
385  		n = scandir(path, &namelist, dir_filter, alphasort);
386  		if (n == -1) {
387  			return 0;
388  		}
389  	
390  		/* Look for plugins in the list */
391  		for (i=0; i<n; i++) {
392  			for (j=0; j<num_crypto; j++) {
393  				if (strlen(namelist[i]->d_name) >= 7 &&
394  				    strncmp(crypto_list[j].name, namelist[i]->d_name+7,
395  					    strlen(crypto_list[j].name)) == 0) {
396  					crypto_found++;
397  				}
398  			}
399  			for (j=0; j<num_compress; j++) {
400  				if (strlen(namelist[i]->d_name) >= 9 &&
401  				    strncmp(compress_list[j].name, namelist[i]->d_name+9,
402  					    strlen(compress_list[j].name)) == 0) {
403  					compress_found++;
404  				}
405  			}
406  			free(namelist[i]);
407  		}
408  		free(namelist);
409  		/* If at least one plugin was found (or none were built) */
410  		if ((crypto_found || num_crypto == 0) &&
411  		    (compress_found || num_compress == 0)) {
412  			return 1;
413  		} else {
414  			return 0;
415  		}
416  	}
417  	
418  	
419  	/* libtool sets LD_LIBRARY_PATH to the build tree when running test in-tree */
420  	char *find_plugins_path(void)
421  	{
422  		char *ld_libs_env = getenv("LD_LIBRARY_PATH");
423  		if (ld_libs_env) {
424  			char *ld_libs = strdup(ld_libs_env);
425  			char *str = strtok(ld_libs, ":");
426  			while (str) {
427  				if (contains_plugins(str)) {
428  					strncpy(plugin_path, str, sizeof(plugin_path)-1);
429  					free(ld_libs);
430  					printf("Using plugins from %s\n", plugin_path);
431  					return plugin_path;
432  				}
433  				str = strtok(NULL, ":");
434  			}
435  			free(ld_libs);
436  		}
437  		return NULL;
438  	}
439  	
440  	
441  	knet_handle_t knet_handle_start(int logfds[2], uint8_t log_level, knet_handle_t knet_h_array[])
442  	{
443  		knet_handle_t knet_h = knet_handle_new_ex(1, logfds[1], log_level, 0);
444  		char *plugins_path;
445  	
446  		if (knet_h) {
447  			printf("knet_handle_new at %p\n", knet_h);
448  			plugins_path = find_plugins_path();
449  			/* Use plugins from the build tree */
450  			if (plugins_path) {
451  				knet_h->plugin_path = plugins_path;
452  			}
453  			knet_h_array[1] = knet_h;
454  			flush_logs(logfds[0], stdout);
455  			return knet_h;
456  		} else {
457  			printf("knet_handle_new failed: %s\n", strerror(errno));
458  			flush_logs(logfds[0], stdout);
459  			close_logpipes(logfds);
460  			exit(FAIL);
461  		}
462  	}
463  	
464  	int knet_handle_reconnect_links(knet_handle_t knet_h)
465  	{
466  		size_t i, j;
467  		knet_node_id_t host_ids[KNET_MAX_HOST];
468  		uint8_t link_ids[KNET_MAX_LINK];
469  		size_t host_ids_entries = 0, link_ids_entries = 0;
470  		unsigned int enabled;
471  	
472  		if (!knet_h) {
473  			errno = EINVAL;
474  			return -1;
475  		}
476  	
477  		if (knet_host_get_host_list(knet_h, host_ids, &host_ids_entries) < 0) {
478  			printf("knet_host_get_host_list failed: %s\n", strerror(errno));
479  			return -1;
480  		}
481  	
482  		for (i = 0; i < host_ids_entries; i++) {
483  			if (knet_link_get_link_list(knet_h, host_ids[i], link_ids, &link_ids_entries)) {
484  				printf("knet_link_get_link_list failed: %s\n", strerror(errno));
485  				return -1;
486  			}
487  			for (j = 0; j < link_ids_entries; j++) {
488  				if (knet_link_get_enable(knet_h, host_ids[i], link_ids[j], &enabled)) {
489  					printf("knet_link_get_enable failed: %s\n", strerror(errno));
490  					return -1;
491  				}
492  				if (!enabled) {
493  					if (knet_link_set_enable(knet_h, host_ids[i], j, 1)) {
494  						printf("knet_link_set_enable failed: %s\n", strerror(errno));
495  						return -1;
496  					}
497  				}
498  			}
499  		}
500  	
501  		return 0;
502  	}
503  	
504  	static int _make_local_sockaddr(struct sockaddr_storage *lo, int offset, int family)
505  	{
506  		in_port_t port;
507  		char portstr[32];
508  	
509  		if (offset < 0) {
510  			/*
511  			 * api_knet_link_set_config needs to access the API directly, but
512  			 * it does not send any traffic, so it´s safe to ask the kernel
513  			 * for a random port.
514  			 */
515  			port = 0;
516  		} else {
517  			/* Use the pid if we can. but makes sure its in a sensible range */
518  			port = (getpid() + offset) % (65536-1024) + 1024;
519  		}
520  		sprintf(portstr, "%u", port);
521  		memset(lo, 0, sizeof(struct sockaddr_storage));
522  		printf("Using port %u\n", port);
523  	
524  		if (family == AF_INET6) {
525  			return knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage));
526  		}
527  		return knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage));
528  	}
529  	
530  	int make_local_sockaddr(struct sockaddr_storage *lo, int offset)
531  	{
532  		return _make_local_sockaddr(lo, offset, AF_INET);
533  	}
534  	
535  	int make_local_sockaddr6(struct sockaddr_storage *lo, int offset)
536  	{
537  		return _make_local_sockaddr(lo, offset, AF_INET6);
538  	}
539  	
540  	int _knet_link_set_config(knet_handle_t knet_h, knet_node_id_t host_id, uint8_t link_id,
541  				  uint8_t transport, uint64_t flags, int family, int dynamic,
542  				  struct sockaddr_storage *lo)
543  	{
544  		int err = 0, savederrno = 0;
545  		uint32_t port;
546  		char portstr[32];
547  	
548  		for (port = 1025; port < 65536; port++) {
549  			sprintf(portstr, "%u", port);
550  			memset(lo, 0, sizeof(struct sockaddr_storage));
551  			if (family == AF_INET6) {
552  				err = knet_strtoaddr("::1", portstr, lo, sizeof(struct sockaddr_storage));
553  			} else {
554  				err = knet_strtoaddr("127.0.0.1", portstr, lo, sizeof(struct sockaddr_storage));
555  			}
556  			if (err < 0) {
557  				printf("Unable to convert loopback to sockaddr: %s\n", strerror(errno));
558  				goto out;
559  			}
560  			errno = 0;
561  			if (dynamic) {
562  				err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, NULL, flags);
563  			} else {
564  				err = knet_link_set_config(knet_h, host_id, link_id, transport, lo, lo, flags);
565  			}
566  			savederrno = errno;
567  			if ((err < 0)  && (savederrno != EADDRINUSE)) {
568  				printf("Unable to configure link: %s\n", strerror(savederrno));
569  				goto out;
570  			}
571  			if (!err) {
572  				printf("Using port %u\n", port);
573  				goto out;
574  			}
575  		}
576  	
577  		if (err) {
578  			printf("No more ports available\n");
579  		}
580  	out:
581  		errno = savederrno;
582  		return err;
583  	}
584  	
585  	void test_sleep(knet_handle_t knet_h, int seconds)
586  	{
587  		if (is_memcheck() || is_helgrind()) {
588  			printf("Test suite is running under valgrind, adjusting sleep timers\n");
589  			seconds = seconds * 16;
590  		}
591  		sleep(seconds);
592  	}
593  	
594  	
595  	int wait_for_packet(knet_handle_t knet_h, int seconds, int datafd, int logfd, FILE *std)
596  	{
597  		fd_set rfds;
598  		struct timeval tv;
599  		int err = 0, i = 0;
600  	
601  		if (is_memcheck() || is_helgrind()) {
602  			printf("Test suite is running under valgrind, adjusting wait_for_packet timeout\n");
603  			seconds = seconds * 16;
604  		}
605  	
606  	try_again:
607  		FD_ZERO(&rfds);
608  		FD_SET(datafd, &rfds);
609  	
610  		tv.tv_sec = 1;
611  		tv.tv_usec = 0;
612  	
613  		err = select(datafd+1, &rfds, NULL, NULL, &tv);
614  		/*
615  		 * on slow arches the first call to select can return 0.
616  		 * pick an arbitrary 10 times loop (multiplied by waiting seconds)
617  		 * before failing.
618  		 */
619  		if ((!err) && (i < seconds)) {
620  			flush_logs(logfd, std);
621  			i++;
622  			goto try_again;
623  		}
624  		if ((err > 0) && (FD_ISSET(datafd, &rfds))) {
625  			return 0;
626  		}
627  	
628  		errno = ETIMEDOUT;
629  		return -1;
630  	}
631  	
632  	/*
633  	 * functional tests helpers
634  	 */
635  	
636  	void knet_handle_start_nodes(knet_handle_t knet_h[], uint8_t numnodes, int logfds[2], uint8_t log_level)
637  	{
638  		uint8_t i;
639  		char *plugins_path = find_plugins_path();
640  	
641  		for (i = 1; i <= numnodes; i++) {
642  			knet_h[i] = knet_handle_new_ex(i, logfds[1], log_level, 0);
643  			if (!knet_h[i]) {
644  				printf("failed to create handle: %s\n", strerror(errno));
645  				break;
646  			} else {
647  				printf("knet_h[%u] at %p\n", i, knet_h[i]);
648  			}
649  			/* Use plugins from the build tree */
650  			if (plugins_path) {
651  				knet_h[i]->plugin_path = plugins_path;
652  			}
653  		}
654  	
655  		if (i < numnodes) {
656  			knet_handle_stop_everything(knet_h, i);
657  			exit(FAIL);
658  		}
659  	
660  		return;
661  	}
662  	
663  	void knet_handle_join_nodes(knet_handle_t knet_h[], uint8_t numnodes, uint8_t numlinks, int family, uint8_t transport)
664  	{
665  		uint8_t i, x, j;
666  		struct sockaddr_storage src, dst;
667  		int offset = 0;
668  		int res;
669  	
670  		for (i = 1; i <= numnodes; i++) {
671  			for (j = 1; j <= numnodes; j++) {
672  				/*
673  				 * don´t connect to itself
674  				 */
675  				if (j == i) {
676  					continue;
677  				}
678  	
679  				printf("host %u adding host: %u\n", i, j);
680  	
681  				if (knet_host_add(knet_h[i], j) < 0) {
682  					printf("Unable to add host: %s\n", strerror(errno));
683  					knet_handle_stop_everything(knet_h, numnodes);
684  					exit(FAIL);
685  				}
686  	
687  				for (x = 0; x < numlinks; x++) {
688  					res = -1;
689  					offset = 0;
690  					while (i + x + offset++ < 65535 && res != 0) {
691  						if (_make_local_sockaddr(&src, i + x + offset, family) < 0) {
692  							printf("Unable to convert src to sockaddr: %s\n", strerror(errno));
693  							knet_handle_stop_everything(knet_h, numnodes);
694  							exit(FAIL);
695  						}
696  	
697  						if (_make_local_sockaddr(&dst, j + x + offset, family) < 0) {
698  							printf("Unable to convert dst to sockaddr: %s\n", strerror(errno));
699  							knet_handle_stop_everything(knet_h, numnodes);
700  							exit(FAIL);
701  						}
702  	
703  						res = knet_link_set_config(knet_h[i], j, x, transport, &src, &dst, 0);
704  					}
705  					printf("joining node %u with node %u via link %u src offset: %u dst offset: %u\n", i, j, x, i+x, j+x);
706  					if (knet_link_set_enable(knet_h[i], j, x, 1) < 0) {
707  						printf("unable to enable link: %s\n", strerror(errno));
708  						knet_handle_stop_everything(knet_h, numnodes);
709  						exit(FAIL);
710  					}
711  				}
712  			}
713  		}
714  	
715  		for (i = 1; i <= numnodes; i++) {
716  			wait_for_nodes_state(knet_h[i], numnodes, 1, 600, knet_h[1]->logfd, stdout);
717  		}
718  		return;
719  	}
720  	
721  	
722  	static int target=0;
723  	
724  	static int state_wait_pipe[2] = {0,0};
725  	static int host_wait_pipe[2] = {0,0};
726  	
727  	static int count_nodes(knet_handle_t knet_h)
728  	{
729  		int nodes = 0;
730  		int i;
731  	
732  		for (i=0; i< KNET_MAX_HOST; i++) {
733  			if (knet_h->host_index[i] && knet_h->host_index[i]->status.reachable == 1) {
734  				nodes++;
735  			}
736  		}
737  		return nodes;
738  	}
739  	
740  	static void nodes_notify_callback(void *private_data,
741  					  knet_node_id_t host_id,
742  					  uint8_t reachable, uint8_t remote, uint8_t external)
743  	{
744  		knet_handle_t knet_h = (knet_handle_t) private_data;
745  		int nodes;
746  		int res;
747  	
748  		nodes = count_nodes(knet_h);
749  	
750  		if (nodes == target) {
751  			res = write(state_wait_pipe[1], ".", 1);
752  			if (res != 1) {
753  				printf("***FAILed to signal wait_for_nodes_state: %s\n", strerror(errno));
754  			}
755  		}
756  	}
757  	
758  	/* Called atexit() */
759  	static void finish_state_pipes()
760  	{
761  		if (state_wait_pipe[0] != 0) {
762  			close(state_wait_pipe[0]);
763  			close(state_wait_pipe[1]);
764  			state_wait_pipe[0] = 0;
765  		}
766  		if (host_wait_pipe[0] != 0) {
767  			close(host_wait_pipe[0]);
768  			close(host_wait_pipe[1]);
769  			host_wait_pipe[0] = 0;
770  		}
771  	}
772  	
773  	static void host_notify_callback(void *private_data,
774  					 knet_node_id_t host_id,
775  					 uint8_t reachable, uint8_t remote, uint8_t external)
776  	{
777  		knet_handle_t knet_h = (knet_handle_t) private_data;
778  		int res;
779  	
780  		if (knet_h->host_index[host_id]->status.reachable == 1) {
781  			res = write(host_wait_pipe[1], ".", 1);
782  			if (res != 1) {
783  				printf("***FAILed to signal wait_for_host: %s\n", strerror(errno));
784  			}
785  		}
786  	}
787  	
788  	static int wait_for_reply(int seconds, int pipefd)
789  	{
790  		int res;
791  		struct pollfd pfds;
792  		char tmpbuf[32];
793  	
794  		pfds.fd = pipefd;
795  		pfds.events = POLLIN | POLLERR | POLLHUP;
796  		pfds.revents = 0;
797  	
798  		res = poll(&pfds, 1, seconds*1000);
799  		if (res == 1) {
800  			if (pfds.revents & POLLIN) {
801  				res = read(pipefd, tmpbuf, sizeof(tmpbuf));
802  				if (res > 0) {
803  					return 0;
804  				}
805  			} else {
806  				printf("Error on pipe poll revent = 0x%x\n", pfds.revents);
807  				errno = EIO;
808  			}
809  		}
810  		if (res == 0) {
811  			errno = ETIMEDOUT;
812  			return -1;
813  		}
814  	
815  		return -1;
816  	}
817  	
818  	/* Wait for a cluster of 'numnodes' to come up/go down */
819  	int wait_for_nodes_state(knet_handle_t knet_h, size_t numnodes,
820  				 uint8_t state, uint32_t timeout,
821  				 int logfd, FILE *std)
822  	{
823  		int res, savederrno = 0;
824  	
825  		if (state_wait_pipe[0] == 0) {
826  			res = pipe(state_wait_pipe);
827  			if (res == -1) {
828  				savederrno = errno;
829  				printf("Error creating host reply pipe: %s\n", strerror(errno));
830  				errno = savederrno;
831  				return -1;
832  			}
833  			if (atexit(finish_state_pipes)) {
834  				printf("Unable to register atexit handler to close pipes: %s\n",
835  				       strerror(errno));
836  				exit(FAIL);
837  			}
838  	
839  		}
840  	
841  		if (state) {
842  			target = numnodes-1; /* exclude us */
843  		} else {
844  			target = 0; /* Wait for all to go down */
845  		}
846  	
847  		/* Set this before checking existing status or there's a race condition */
848  		knet_host_enable_status_change_notify(knet_h,
849  						      (void *)(long)knet_h,
850  						      nodes_notify_callback);
851  	
852  		/* Check we haven't already got all the nodes in the correct state */
853  		if (count_nodes(knet_h) == target) {
854  			fprintf(stderr, "target already reached\n");
855  			knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
856  			flush_logs(logfd, std);
857  			return 0;
858  		}
859  	
860  		res = wait_for_reply(timeout, state_wait_pipe[0]);
861  		if (res == -1) {
862  			savederrno = errno;
863  			printf("Error waiting for nodes status reply: %s\n", strerror(errno));
864  		}
865  	
866  		knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
867  		flush_logs(logfd, std);
868  		errno = savederrno;
869  		return res;
870  	}
871  	
872  	/* Wait for a single node to come up */
873  	int wait_for_host(knet_handle_t knet_h, uint16_t host_id, int seconds, int logfd, FILE *std)
874  	{
875  		int res = 0;
876  		int savederrno = 0;
877  	
878  		if (is_memcheck() || is_helgrind()) {
879  			printf("Test suite is running under valgrind, adjusting wait_for_host timeout\n");
880  			seconds = seconds * 16;
881  		}
882  	
883  		if (host_wait_pipe[0] == 0) {
884  			res = pipe(host_wait_pipe);
885  			if (res == -1) {
886  				savederrno = errno;
887  				printf("Error creating host reply pipe: %s\n", strerror(errno));
888  				errno = savederrno;
889  				return -1;
890  			}
891  			if (atexit(finish_state_pipes)) {
892  				printf("Unable to register atexit handler to close pipes: %s\n",
893  				       strerror(errno));
894  				exit(FAIL);
895  			}
896  	
897  		}
898  	
899  		/* Set this before checking existing status or there's a race condition */
900  		knet_host_enable_status_change_notify(knet_h,
901  						      (void *)(long)knet_h,
902  						      host_notify_callback);
903  	
904  		/* Check it's not already reachable */
905  		if (knet_h->host_index[host_id]->status.reachable == 1) {
906  			knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
907  			flush_logs(logfd, std);
908  			return 0;
909  		}
910  	
911  		res = wait_for_reply(seconds, host_wait_pipe[0]);
912  		if (res == -1) {
913  			savederrno = errno;
914  			printf("Error waiting for host status reply: %s\n", strerror(errno));
915  		}
916  	
917  		knet_host_enable_status_change_notify(knet_h, (void *)(long)0, NULL);
918  	
919  		/* Still wait for it to settle */
920  		flush_logs(logfd, std);
921  		test_sleep(knet_h, 1);
922  		errno = savederrno;
923  		return res;
924  	}
925  	
926  	void clean_exit(knet_handle_t *knet_h, int testnodes, int *logfds, int exit_status)
927  	{
928  		knet_handle_stop_everything(knet_h, testnodes);
929  		stop_logthread();
930  		flush_logs(logfds[0], stdout);
931  		close_logpipes(logfds);
932  		if (exit_status != CONTINUE) {
933  			exit(exit_status);
934  		}
935  	}
936  	
937  	/* Shutdown all nodes and links attached to an array of knet handles.
938  	 * Mostly stolen from corosync code (that I wrote, before anyone complains about licences)
939  	 */
940  	void knet_handle_stop_everything(knet_handle_t knet_h[], uint8_t numnodes)
941  	{
942  		int res = 0;
943  		int h;
944  		size_t i,j;
945  		static knet_node_id_t nodes[KNET_MAX_HOST]; /* static to save stack */
946  		uint8_t links[KNET_MAX_LINK];
947  		size_t num_nodes;
948  		size_t num_links;
949  	
950  		for (h=1; h<numnodes+1; h++) {
951  			res = knet_handle_setfwd(knet_h[h], 0);
952  			if (res) {
953  				perror("knet_handle_setfwd failed");
954  			}
955  	
956  			res = knet_host_get_host_list(knet_h[h], nodes, &num_nodes);
957  			if (res) {
958  				perror("Cannot get knet node list for shutdown");
959  				continue;
960  			}
961  	
962  			/* Tidily shut down all nodes & links. */
963  			for (i=0; i<num_nodes; i++) {
964  	
965  				res = knet_link_get_link_list(knet_h[h], nodes[i], links, &num_links);
966  				if (res) {
967  					fprintf(stderr, "Cannot get knet link list for node %u  %s\n", nodes[i], strerror(errno));
968  					goto finalise_error;
969  				}
970  				for (j=0; j<num_links; j++) {
971  					res = knet_link_set_enable(knet_h[h], nodes[i], links[j], 0);
972  					if (res) {
973  						fprintf(stderr, "knet_link_set_enable(node %u, link %d) failed: %s\n", nodes[i], links[j], strerror(errno));
974  					}
975  					res = knet_link_clear_config(knet_h[h], nodes[i], links[j]);
976  					if (res) {
977  						fprintf(stderr, "knet_link_clear_config(node %u, link %d) failed: %s\n", nodes[i], links[j], strerror(errno));
978  					}
979  				}
980  				res = knet_host_remove(knet_h[h], nodes[i]);
981  				if (res) {
982  					fprintf(stderr, "knet_host_remove(node %u) failed: %s\n", nodes[i], strerror(errno));
983  				}
984  			}
985  	
986  		finalise_error:
987  			res = knet_handle_free(knet_h[h]);
988  			if (res) {
989  				fprintf(stderr, "knet_handle_free failed: %s\n", strerror(errno));
990  			}
991  		}
992  	}
993