1    	/*
2    	 * Copyright (C) 2016-2025 Red Hat, Inc.  All rights reserved.
3    	 *
4    	 * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
5    	 *
6    	 * This software licensed under GPL-2.0+
7    	 */
8    	
9    	#include "config.h"
10   	
11   	#include <errno.h>
12   	#include <stdio.h>
13   	#include <stdlib.h>
14   	#include <string.h>
15   	#include <unistd.h>
16   	#include <signal.h>
17   	#include <pthread.h>
18   	#include <sys/types.h>
19   	#include <inttypes.h>
20   	
21   	#include "libknet.h"
22   	
23   	#include "compat.h"
24   	#include "internals.h"
25   	#include "netutils.h"
26   	#include "transport_common.h"
27   	#include "threads_common.h"
28   	#include "test-common.h"
29   	
30   	#define MAX_NODES 128	/* maximum number of -n node descriptions accepted */
31   	
32   	static int senderid = -1;	/* nodeid that generates benchmark traffic (-s); -1 until set */
33   	static int thisnodeid = -1;	/* nodeid of the local node (-t); -1 until set */
34   	static knet_handle_t knet_h;	/* handle created by knet_handle_new() in setup_knet() */
35   	static int datafd = 0;	/* data socket filled in by knet_handle_add_datafd() */
36   	static int8_t channel = 0;	/* channel filled in together with datafd */
37   	static int globallistener = 0;	/* -l: use allv4/allv6 as link source address */
38   	static int continous = 0;	/* -C: repeat the test (consumer not visible in this chunk) */
39   	static int show_stats = 0;	/* -X: stats level (consumer not visible in this chunk) */
40   	static struct sockaddr_storage allv4;	/* "0.0.0.0" at the base port, filled by parse_nodes() */
41   	static struct sockaddr_storage allv6;	/* "::" at the base port, filled by parse_nodes() */
42   	static int broadcast_test = 1;	/* non-zero: ping_dst_host_filter() returns 1 without picking a destination */
43   	static pthread_t rx_thread = {0};	/* thread running _rx_thread(); zero while not started */
44   	static char *rx_buf[PCKT_FRAG_MAX];	/* receive buffers allocated by _rx_thread(), freed by stop_rx_thread() */
45   	static int wait_for_perf_rx = 0;	/* set to 1 by _rx_thread() when a TEST_COMPLETE marker arrives */
46   	static char *compresscfg = NULL;	/* raw -z argument, NULL when compression was not requested */
47   	static char *cryptocfg = NULL;	/* raw -c argument, NULL when crypto was not requested */
48   	static int machine_output = 0;	/* -a: print perf results as comma separated values */
49   	static int use_access_lists = 0;	/* -f: value passed to knet_handle_enable_access_lists() */
50   	static int use_pckt_verification = 0;	/* -v: verify length and checksum of received perf packets */
51   	
     	/*
     	 * Shutdown flag polled by the _rx_thread() main loop.
     	 * NOTE(review): presumably every writer updates it with shutdown_mutex
     	 * held; the writer is not visible in this chunk -- confirm.
     	 */
52   	static int bench_shutdown_in_progress = 0;
53   	static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER;
54   	
     	/* test types selectable with -T, stored in test_type */
55   	#define TEST_PING 0
56   	#define TEST_PING_AND_DATA 1
57   	#define TEST_PERF_BY_SIZE 2
58   	#define TEST_PERF_BY_TIME 3
59   	
60   	static int test_type = TEST_PING;
61   	
     	/*
     	 * Message lengths that _rx_thread() treats as control markers during
     	 * perf tests (messages shorter than 64 bytes are never counted as data).
     	 */
62   	#define TEST_START 2
63   	#define TEST_STOP 4
64   	#define TEST_COMPLETE 6
65   	
66   	#define ONE_GIGABYTE 1073741824	/* 2^30 bytes */
67   	
68   	static uint64_t perf_by_size_size = 1 * ONE_GIGABYTE;	/* perf-by-size: bytes to transmit (-S, given in GB) */
69   	static uint64_t perf_by_time_secs = 10;	/* perf-by-time: test duration in seconds (-S) */
70   	
71   	static uint32_t force_packet_size = 0;	/* -x: forced packet size; 0 when -x was not given */
72   	
73   	struct node {	/* one node described by a -n option, filled by parse_nodes() */
74   		int nodeid;	/* node id (first comma separated field of -n) */
75   		int links;	/* number of valid entries in transport[] and address[] */
76   		uint8_t transport[KNET_MAX_LINK];	/* per link transport id; KNET_TRANSPORT_UDP when no "proto/" prefix was given */
77   		struct sockaddr_storage address[KNET_MAX_LINK];	/* per link address, converted together with the base port */
78   	};
79   	
80   	struct pckt_ver {	/* header _rx_thread() expects at the start of perf packets when -v is used */
81   		uint32_t len;	/* compared against the length of the received message */
82   		uint32_t chksum;	/* compared against compute_chsum() of the bytes following this header */
83   	};
84   	
85   	static void print_help(void)
86   	{
87   		printf("knet_bench usage:\n");
88   		printf(" -h                                        print this help (no really)\n");
89   		printf(" -d                                        enable debug logs (default INFO)\n");
90   		printf(" -f                                        enable use of access lists (default: off)\n");
91   		printf(" -c [implementation]:[crypto]:[hashing]    crypto configuration. (default disabled)\n");
92   		printf("                                           Example: -c nss:aes128:sha1\n");
93   		printf(" -z [implementation]:[level]:[threshold]   compress configuration. (default disabled)\n");
94   		printf("                                           Example: -z zlib:5:100\n");
95   		printf(" -p [active|passive|rr]                    (default: passive)\n");
96   		printf(" -P [UDP|SCTP]                             (default: UDP) protocol (transport) to use for all links\n");
97   		printf(" -t [nodeid]                               This nodeid (required)\n");
98   		printf(" -n [nodeid],[proto]/[link1_ip],[link2_..] Other nodes information (at least one required)\n");
99   		printf("                                           Example: -n 1,192.168.8.1,SCTP/3ffe::8:1,UDP/172...\n");
100  		printf("                                           can be repeated up to %d and should contain also the localnode info\n", MAX_NODES);
101  		printf(" -b [port]                                 baseport (default: 50000)\n");
102  		printf(" -l                                        enable global listener on 0.0.0.0/:: (default: off, incompatible with -o)\n");
103  		printf(" -o                                        enable baseport offset per nodeid\n");
104  		printf(" -m                                        change PMTUd interval in seconds (default: 60)\n");
105  		printf(" -w                                        dont wait for all nodes to be up before starting the test (default: wait)\n");
106  		printf(" -T [ping|ping_data|perf-by-size|perf-by-time]\n");
107  		printf("                                           test type (default: ping)\n");
108  		printf("                                           ping: will wait for all hosts to join the knet network, sleep 5 seconds and quit\n");
109  		printf("                                           ping_data: will wait for all hosts to join the knet network, sends some data to all nodes and quit\n");
110  		printf("                                           perf-by-size: will wait for all hosts to join the knet network,\n");
111  		printf("                                                         perform a series of benchmarks by transmitting a known\n");
112  		printf("                                                         size/quantity of packets and measuring the time, then quit\n");
113  		printf("                                           perf-by-time: will wait for all hosts to join the knet network,\n");
114  		printf("                                                         perform a series of benchmarks by transmitting a known\n");
115  		printf("                                                         size of packets for a given amount of time (10 seconds)\n");
116  		printf("                                                         and measuring the quantity of data transmitted, then quit\n");
117  		printf(" -s                                        nodeid that will generate traffic for benchmarks\n");
118  		printf(" -S [size|seconds]                         when used in combination with -T perf-by-size it indicates how many GB of traffic to generate for the test. (default: 1GB)\n");
119  		printf("                                           when used in combination with -T perf-by-time it indicates how many Seconds of traffic to generate for the test. (default: 10 seconds)\n");
120  		printf(" -x                                        force packet size for perf-by-time or perf-by-size\n");
121  		printf(" -C                                        repeat the test continously (default: off)\n");
122  		printf(" -X[XX]                                    show stats at the end of the run (default: 1)\n");
123  		printf("                                           1: show handle stats, 2: show summary link stats\n");
124  		printf("                                           3: show detailed link stats\n");
125  		printf(" -a                                        enable machine parsable output (default: off).\n");
126  		printf(" -v                                        enable packet verification for performance tests (default: off).\n");
127  	}
128  	
129  	static void parse_nodes(char *nodesinfo[MAX_NODES], int onidx, int port, struct node nodes[MAX_NODES], int *thisidx)
130  	{
131  		int i;
132  		char *temp = NULL;
133  		char port_str[11];
134  	
135  		memset(port_str, 0, sizeof(port_str));
136  		snprintf(port_str, sizeof(port_str), "%d", port);
137  	
138  		for (i = 0; i < onidx; i++) {
139  			nodes[i].nodeid = atoi(strtok(nodesinfo[i], ","));
140  			if ((nodes[i].nodeid < 0) || (nodes[i].nodeid > KNET_MAX_HOST)) {
141  				printf("Invalid nodeid: %d (0 - %d)\n", nodes[i].nodeid, KNET_MAX_HOST);
142  				exit(FAIL);
143  			}
144  			if (thisnodeid == nodes[i].nodeid) {
145  				*thisidx = i;
146  			}
147  			while((temp = strtok(NULL, ","))) {
148  				char *slash = NULL;
149  				uint8_t transport;
150  	
151  				if (nodes[i].links == KNET_MAX_LINK) {
152  					printf("Too many links configured. Max %d\n", KNET_MAX_LINK);
153  					exit(FAIL);
154  				}
155  	
156  				slash = strstr(temp, "/");
157  				if (slash) {
158  					memset(slash, 0, 1);
159  					transport = knet_get_transport_id_by_name(temp);
160  					if (transport == KNET_MAX_TRANSPORTS) {
161  						printf("Unknown transport: %s\n", temp);
162  						exit(FAIL);
163  					}
164  					nodes[i].transport[nodes[i].links] = transport;
165  					temp = slash + 1;
166  				} else {
167  					nodes[i].transport[nodes[i].links] = KNET_TRANSPORT_UDP;
168  				}
169  	
170  				if (knet_strtoaddr(temp, port_str,
171  						   &nodes[i].address[nodes[i].links],
172  						   sizeof(struct sockaddr_storage)) < 0) {
173  					printf("Unable to convert %s to sockaddress\n", temp);
174  					exit(FAIL);
175  				}
176  				nodes[i].links++;
177  			}
178  		}
179  	
180  		if (knet_strtoaddr("0.0.0.0", port_str, &allv4, sizeof(struct sockaddr_storage)) < 0) {
181  			printf("Unable to convert 0.0.0.0 to sockaddress\n");
182  			exit(FAIL);
183  		}
184  	
185  		if (knet_strtoaddr("::", port_str, &allv6, sizeof(struct sockaddr_storage)) < 0) {
186  			printf("Unable to convert :: to sockaddress\n");
187  			exit(FAIL);
188  		}
189  	
190  		for (i = 1; i < onidx; i++) {
191  			if (nodes[0].links != nodes[i].links) {
192  				printf("knet_bench does not support unbalanced link configuration\n");
193  				exit(FAIL);
194  			}
195  		}
196  	
197  		return;
198  	}
199  	
/* opaque cookie handed to knet_handle_enable_sock_notify() by setup_knet() */
static int private_data;

/*
 * Socket notification callback registered by setup_knet().
 *
 * Only logs the event. The error description is derived from the errorno
 * argument supplied by the caller; the global errno of the calling thread
 * is unrelated to the event being reported.
 */
static void sock_notify(void *pvt_data,
			int local_datafd,
			int8_t local_channel,
			uint8_t tx_rx,
			int error,
			int errorno)
{
	printf("[info]: error (%d - %d - %s) from socket: %d\n", error, errorno, strerror(errorno), local_datafd);
	return;
}
212  	
213  	static int ping_dst_host_filter(void *pvt_data,
214  					const unsigned char *outdata,
215  					ssize_t outdata_len,
216  					uint8_t tx_rx,
217  					knet_node_id_t this_host_id,
218  					knet_node_id_t src_host_id,
219  					int8_t *dst_channel,
220  					knet_node_id_t *dst_host_ids,
221  					size_t *dst_host_ids_entries)
222  	{
223  		if (broadcast_test) {
224  			return 1;
225  		}
226  	
227  		if (tx_rx == KNET_NOTIFY_TX) {
228  			memmove(&dst_host_ids[0], outdata, 2);
229  		} else {
230  			dst_host_ids[0] = this_host_id;
231  		}
232  		*dst_host_ids_entries = 1;
233  		return 0;
234  	}
235  	
236  	static void setup_knet(int argc, char *argv[])
237  	{
238  		int logfd = 0;
239  		int rv;
240  		char *policystr = NULL, *protostr = NULL;
241  		char *othernodeinfo[MAX_NODES];
242  		struct node nodes[MAX_NODES];
243  		int thisidx = -1;
244  		int onidx = 0;
245  		int debug = KNET_LOG_INFO;
246  		int port = 50000, portoffset = 0;
247  		int thisport = 0, otherport = 0;
248  		int thisnewport = 0, othernewport = 0;
249  		struct sockaddr_in *so_in;
250  		struct sockaddr_in6 *so_in6;
251  		struct sockaddr_storage *src;
252  		int i, link_idx, allnodesup = 0;
253  		int policy = KNET_LINK_POLICY_PASSIVE, policyfound = 0;
254  		int protocol = KNET_TRANSPORT_UDP, protofound = 0;
255  		int wait = 1;
256  		int pmtud_interval = 60;
257  		struct knet_handle_crypto_cfg knet_handle_crypto_cfg;
258  		char *cryptomodel = NULL, *cryptotype = NULL, *cryptohash = NULL;
259  		struct knet_handle_compress_cfg knet_handle_compress_cfg;
260  	
261  		memset(nodes, 0, sizeof(nodes));
262  	
263  		while ((rv = getopt(argc, argv, "aCT:S:s:lvdfom:wb:t:n:c:p:x:X::P:z:h")) != EOF) {
264  			switch(rv) {
265  				case 'h':
266  					print_help();
267  					exit(PASS);
268  					break;
269  				case 'a':
270  					machine_output = 1;
271  					break;
272  				case 'd':
273  					debug = KNET_LOG_DEBUG;
274  					break;
275  				case 'f':
276  					use_access_lists = 1;
277  					break;
278  				case 'c':
279  					if (cryptocfg) {
280  						printf("Error: -c can only be specified once\n");
281  						exit(FAIL);
282  					}
283  					cryptocfg = optarg;
284  					break;
285  				case 'p':
286  					if (policystr) {
287  						printf("Error: -p can only be specified once\n");
288  						exit(FAIL);
289  					}
290  					if (optarg) {
291  						policystr = optarg;
292  						if (!strcmp(policystr, "active")) {
293  							policy = KNET_LINK_POLICY_ACTIVE;
294  							policyfound = 1;
295  						}
296  						/*
297  						 * we can't use rr because clangs can't compile
298  						 * an array of 3 strings, one of which is 2 bytes long
299  						 */
300  						if (!strcmp(policystr, "round-robin")) {
301  							policy = KNET_LINK_POLICY_RR;
302  							policyfound = 1;
303  						}
304  						if (!strcmp(policystr, "passive")) {
305  							policy = KNET_LINK_POLICY_PASSIVE;
306  							policyfound = 1;
307  						}
308  					}
309  					if (!policyfound) {
310  						printf("Error: invalid policy %s specified. -p accepts active|passive|rr\n", policystr);
311  						exit(FAIL);
312  					}
313  					break;
314  			        case 'P':
315  					if (protostr) {
316  						printf("Error: -P can only be specified once\n");
317  						exit(FAIL);
318  					}
319  					if (optarg) {
320  						protostr = optarg;
321  						if (!strcmp(protostr, "UDP")) {
322  							protocol = KNET_TRANSPORT_UDP;
323  							protofound = 1;
324  						}
325  						if (!strcmp(protostr, "SCTP")) {
326  							protocol = KNET_TRANSPORT_SCTP;
327  							protofound = 1;
328  						}
329  					}
330  					if (!protofound) {
331  						printf("Error: invalid protocol %s specified. -P accepts udp|sctp\n", policystr);
332  						exit(FAIL);
333  					}
334  					break;
335  				case 't':
336  					if (thisnodeid >= 0) {
337  						printf("Error: -t can only be specified once\n");
338  						exit(FAIL);
339  					}
340  					thisnodeid = atoi(optarg);
341  					if ((thisnodeid < 0) || (thisnodeid > 65536)) {
342  						printf("Error: -t nodeid out of range %d (1 - 65536)\n", thisnodeid);
343  	                                        exit(FAIL);
344  					}
345  					break;
346  				case 'n':
347  					if (onidx == MAX_NODES) {
348  						printf("Error: too many other nodes. Max %d\n", MAX_NODES);
349  						exit(FAIL);
350  					}
351  					othernodeinfo[onidx] = optarg;
352  					onidx++;
353  					break;
354  				case 'b':
355  					port = atoi(optarg);
356  					if ((port < 1) || (port > 65536)) {
357  						printf("Error: port %d out of range (1 - 65536)\n", port);
358  						exit(FAIL);
359  					}
360  					break;
361  				case 'o':
362  					if (globallistener) {
363  						printf("Error: -l cannot be used with -o\n");
364  						exit(FAIL);
365  					}
366  					portoffset = 1;
367  					break;
368  				case 'm':
369  					pmtud_interval = atoi(optarg);
370  					if (pmtud_interval < 1) {
371  						printf("Error: pmtud interval %d out of range (> 0)\n", pmtud_interval);
372  						exit(FAIL);
373  					}
374  					break;
375  				case 'l':
376  					if (portoffset) {
377  						printf("Error: -o cannot be used with -l\n");
378  						exit(FAIL);
379  					}
380  					globallistener = 1;
381  					break;
382  				case 'w':
383  					wait = 0;
384  					break;
385  				case 's':
386  					if (senderid >= 0) {
387  						printf("Error: -s can only be specified once\n");
388  						exit(FAIL);
389  					}
390  					senderid = atoi(optarg);
391  					if ((senderid < 0) || (senderid > 65536)) {
392  						printf("Error: -s nodeid out of range %d (1 - 65536)\n", senderid);
393  	                                        exit(FAIL);
394  					}
395  					break;
396  				case 'T':
397  					if (optarg) {
398  						if (!strcmp("ping", optarg)) {
399  							test_type = TEST_PING;
400  						}
401  						if (!strcmp("ping_data", optarg)) {
402  							test_type = TEST_PING_AND_DATA;
403  						}
404  						if (!strcmp("perf-by-size", optarg)) {
405  							test_type = TEST_PERF_BY_SIZE;
406  						}
407  						if (!strcmp("perf-by-time", optarg)) {
408  							test_type = TEST_PERF_BY_TIME;
409  						}
410  					} else {
411  						printf("Error: -T requires an option\n");
412  						exit(FAIL);
413  					}
414  					break;
415  				case 'S':
416  					perf_by_size_size = (uint64_t)atoi(optarg) * ONE_GIGABYTE;
417  					perf_by_time_secs = (uint64_t)atoi(optarg);
418  					break;
419  				case 'x':
420  					force_packet_size = (uint32_t)atoi(optarg);
421  					if ((force_packet_size < 64) || (force_packet_size > 65536)) {
422  						printf("Unsupported packet size %u (accepted 64 - 65536)\n", force_packet_size);
423  						exit(FAIL);
424  					}
425  					break;
426  				case 'v':
427  					use_pckt_verification = 1;
428  					break;
429  				case 'C':
430  					continous = 1;
431  					break;
432  				case 'X':
433  					if (optarg) {
434  						show_stats = atoi(optarg);
435  					} else {
436  						show_stats = 1;
437  					}
438  					break;
439  				case 'z':
440  					if (compresscfg) {
441  						printf("Error: -c can only be specified once\n");
442  						exit(FAIL);
443  					}
444  					compresscfg = optarg;
445  					break;
446  				default:
447  					break;
448  			}
449  		}
450  	
451  		if (thisnodeid < 0) {
452  			printf("Who am I?!? missing -t from command line?\n");
453  			exit(FAIL);
454  		}
455  	
456  		if (onidx < 1) {
457  			printf("no other nodes configured?!? missing -n from command line\n");
458  			exit(FAIL);
459  		}
460  	
461  		parse_nodes(othernodeinfo, onidx, port, nodes, &thisidx);
462  	
463  		if (thisidx < 0) {
464  			printf("no config for this node found\n");
465  			exit(FAIL);
466  		}
467  	
468  		if (senderid >= 0) {
469  			for (i=0; i < onidx; i++) {
470  				if (senderid == nodes[i].nodeid) {
471  					break;
472  				}
473  			}
474  			if (i == onidx) {
475  				printf("Unable to find senderid in nodelist\n");
476  				exit(FAIL);
477  			}
478  		}
479  	
480  		if (((test_type == TEST_PERF_BY_SIZE) || (test_type == TEST_PERF_BY_TIME)) && (senderid < 0)) {
481  			printf("Error: performance test requires -s to be set (for now)\n");
482  			exit(FAIL);
483  		}
484  	
485  		logfd = start_logging(stdout);
486  	
487  		knet_h = knet_handle_new(thisnodeid, logfd, debug);
488  		if (!knet_h) {
489  			printf("Unable to knet_handle_new: %s\n", strerror(errno));
490  			exit(FAIL);
491  		}
492  	
493  		if (knet_handle_enable_access_lists(knet_h, use_access_lists) < 0) {
494  			printf("Unable to knet_handle_enable_access_lists: %s\n", strerror(errno));
495  			exit(FAIL);
496  		}
497  	
498  		if (cryptocfg) {
499  			memset(&knet_handle_crypto_cfg, 0, sizeof(knet_handle_crypto_cfg));
500  			cryptomodel = strtok(cryptocfg, ":");
501  			cryptotype = strtok(NULL, ":");
502  			cryptohash = strtok(NULL, ":");
503  			if (cryptomodel) {
504  				strncpy(knet_handle_crypto_cfg.crypto_model, cryptomodel, sizeof(knet_handle_crypto_cfg.crypto_model) - 1);
505  			}
506  			if (cryptotype) {
507  				strncpy(knet_handle_crypto_cfg.crypto_cipher_type, cryptotype, sizeof(knet_handle_crypto_cfg.crypto_cipher_type) - 1);
508  			}
509  			if (cryptohash) {
510  				strncpy(knet_handle_crypto_cfg.crypto_hash_type, cryptohash, sizeof(knet_handle_crypto_cfg.crypto_hash_type) - 1);
511  			}
512  			knet_handle_crypto_cfg.private_key_len = KNET_MAX_KEY_LEN;
513  			if (knet_handle_crypto(knet_h, &knet_handle_crypto_cfg)) {
514  				printf("Unable to init crypto\n");
515  				exit(FAIL);
516  			}
517  		}
518  	
519  		if (compresscfg) {
520  			memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg));
521  			snprintf(knet_handle_compress_cfg.compress_model, 16, "%s", strtok(compresscfg, ":"));
522  			knet_handle_compress_cfg.compress_level = atoi(strtok(NULL, ":"));
523  			knet_handle_compress_cfg.compress_threshold = atoi(strtok(NULL, ":"));
524  			if (knet_handle_compress(knet_h, &knet_handle_compress_cfg)) {
525  				printf("Unable to configure compress\n");
526  				exit(FAIL);
527  			}
528  		}
529  	
530  		if (knet_handle_enable_sock_notify(knet_h, &private_data, sock_notify) < 0) {
531  			printf("knet_handle_enable_sock_notify failed: %s\n", strerror(errno));
532  			knet_handle_free(knet_h);
533  			exit(FAIL);
534  	        }
535  	
536  		datafd = 0;
537  		channel = -1;
538  	
539  		if (knet_handle_add_datafd(knet_h, &datafd, &channel) < 0) {
540  			printf("knet_handle_add_datafd failed: %s\n", strerror(errno));
541  			knet_handle_free(knet_h);
542  			exit(FAIL);
543  		}
544  	
545  		if (knet_handle_pmtud_setfreq(knet_h, pmtud_interval) < 0) {
546  			printf("knet_handle_pmtud_setfreq failed: %s\n", strerror(errno));
547  			knet_handle_free(knet_h);
548  			exit(FAIL);
549  		}
550  	
551  		for (i=0; i < onidx; i++) {
552  			if (i == thisidx) {
553  				continue;
554  			}
555  	
556  			if (knet_host_add(knet_h, nodes[i].nodeid) < 0) {
557  				printf("knet_host_add failed: %s\n", strerror(errno));
558  				exit(FAIL);
559  			}
560  	
561  			if (knet_host_set_policy(knet_h, nodes[i].nodeid, policy) < 0) {
562  				printf("knet_host_set_policy failed: %s\n", strerror(errno));
563  				exit(FAIL);
564  			}
565  	
566  			for (link_idx = 0; link_idx < nodes[i].links; link_idx++) {
567  				if (portoffset) {
568  					if (nodes[thisidx].address[link_idx].ss_family == AF_INET) {
569  						so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx];
570  						thisport = ntohs(so_in->sin_port);
571  						thisnewport = thisport + nodes[i].nodeid;
572  						so_in->sin_port = (htons(thisnewport));
573  						so_in = (struct sockaddr_in *)&nodes[i].address[link_idx];
574  						otherport = ntohs(so_in->sin_port);
575  						othernewport = otherport + nodes[thisidx].nodeid;
576  						so_in->sin_port = (htons(othernewport));
577  					} else {
578  						so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx];
579  						thisport = ntohs(so_in6->sin6_port);
580  						thisnewport = thisport + nodes[i].nodeid;
581  						so_in6->sin6_port = (htons(thisnewport));
582  						so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx];
583  						otherport = ntohs(so_in6->sin6_port);
584  						othernewport = otherport + nodes[thisidx].nodeid;
585  						so_in6->sin6_port = (htons(othernewport));
586  					}
587  				}
588  				if (!globallistener) {
589  					src = &nodes[thisidx].address[link_idx];
590  				} else {
591  					if (nodes[thisidx].address[link_idx].ss_family == AF_INET) {
592  						src = &allv4;
593  					} else {
594  						src = &allv6;
595  					}
596  				}
597  				/*
598  				 * -P overrides per link protocol configuration
599  				 */
600  				if (protofound) {
601  					nodes[i].transport[link_idx] = protocol;
602  				}
603  				if (knet_link_set_config(knet_h, nodes[i].nodeid, link_idx,
604  							 nodes[i].transport[link_idx], src,
605  							 &nodes[i].address[link_idx], 0) < 0) {
606  					printf("Unable to configure link: %s\n", strerror(errno));
607  					exit(FAIL);
608  				}
609  				if (portoffset) {
610  					if (nodes[thisidx].address[link_idx].ss_family == AF_INET) {
611  						so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx];
612  						so_in->sin_port = (htons(thisport));
613  						so_in = (struct sockaddr_in *)&nodes[i].address[link_idx];
614  						so_in->sin_port = (htons(otherport));
615  					} else {
616  						so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx];
617  						so_in6->sin6_port = (htons(thisport));
618  						so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx];
619  						so_in6->sin6_port = (htons(otherport));
620  					}
621  				}
622  				if (knet_link_set_enable(knet_h, nodes[i].nodeid, link_idx, 1) < 0) {
623  					printf("knet_link_set_enable failed: %s\n", strerror(errno));
624  					exit(FAIL);
625  				}
626  				if (knet_link_set_ping_timers(knet_h, nodes[i].nodeid, link_idx, 1000, 10000, 2048) < 0) {
627  					printf("knet_link_set_ping_timers failed: %s\n", strerror(errno));
628  					exit(FAIL);
629  				}
630  				if (knet_link_set_pong_count(knet_h, nodes[i].nodeid, link_idx, 2) < 0) {
631  					printf("knet_link_set_pong_count failed: %s\n", strerror(errno));
632  					exit(FAIL);
633  				}
634  			}
635  		}
636  	
637  		if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) {
638  			printf("Unable to enable dst_host_filter: %s\n", strerror(errno));
639  			exit(FAIL);
640  		}
641  	
642  		if (knet_handle_setfwd(knet_h, 1) < 0) {
643  			printf("knet_handle_setfwd failed: %s\n", strerror(errno));
644  			exit(FAIL);
645  		}
646  	
647  		if (wait) {
648  			while(!allnodesup) {
649  				allnodesup = 1;
650  				for (i=0; i < onidx; i++) {
651  					if (i == thisidx) {
652  						continue;
653  					}
654  					if (knet_h->host_index[nodes[i].nodeid]->status.reachable != 1) {
655  						printf("[info]: waiting host %d to be reachable\n", nodes[i].nodeid);
656  						allnodesup = 0;
657  					}
658  				}
659  				if (!allnodesup) {
660  					sleep(1);
661  				}
662  			}
663  			sleep(1);
664  		}
665  	}
666  	
/*
 * Weak 16 bit checksum used to verify perf packets (borrowed from
 * corosync for debugging purposes).
 *
 * For every input byte the running value is rotated right by one bit
 * within 16 bits, the byte is added and the result is truncated to
 * 16 bits again. Returns 0 for an empty buffer.
 */
static uint32_t compute_chsum(const unsigned char *data, uint32_t data_len)
{
	uint32_t sum = 0;
	uint32_t pos = 0;

	while (pos < data_len) {
		/* bit 0 wraps around into bit 15 */
		uint32_t rotated = (sum >> 1) | ((sum & 1) << 15);

		sum = (rotated + data[pos]) & 0xffff;
		pos++;
	}

	return sum;
}
684  	
685  	static void *_rx_thread(void *args)
686  	{
687  		int rx_epoll;
688  		struct epoll_event ev;
689  		struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
690  	
691  		struct sockaddr_storage address[PCKT_FRAG_MAX];
692  		struct knet_mmsghdr msg[PCKT_FRAG_MAX];
693  		struct iovec iov_in[PCKT_FRAG_MAX];
694  		int i, msg_recv;
695  		struct timespec clock_start, clock_end;
696  		unsigned long long time_diff = 0;
697  		uint64_t rx_pkts = 0;
698  		uint64_t rx_bytes = 0;
699  		unsigned int current_pckt_size = 0;
700  	
(1) Event cond_true: Condition "(unsigned int)i < 255", taking true branch.
(5) Event loop_begin: Jumped back to beginning of loop.
(6) Event cond_true: Condition "(unsigned int)i < 255", taking true branch.
(10) Event loop_begin: Jumped back to beginning of loop.
(11) Event cond_false: Condition "(unsigned int)i < 255", taking false branch.
701  		for (i = 0; (unsigned int)i < PCKT_FRAG_MAX; i++) {
702  			rx_buf[i] = malloc(KNET_MAX_PACKET_SIZE);
(2) Event cond_false: Condition "!rx_buf[i]", taking false branch.
(7) Event cond_false: Condition "!rx_buf[i]", taking false branch.
703  			if (!rx_buf[i]) {
704  				printf("RXT: Unable to malloc!\nHALTING RX THREAD!\n");
705  				return NULL;
(3) Event if_end: End of if statement.
(8) Event if_end: End of if statement.
706  			}
707  			memset(rx_buf[i], 0, KNET_MAX_PACKET_SIZE);
708  			iov_in[i].iov_base = (void *)rx_buf[i];
709  			iov_in[i].iov_len = KNET_MAX_PACKET_SIZE;
710  			memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
711  			msg[i].msg_hdr.msg_name = &address[i];
712  			msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
713  			msg[i].msg_hdr.msg_iov = &iov_in[i];
714  			msg[i].msg_hdr.msg_iovlen = 1;
(4) Event loop: Jumping back to the beginning of the loop.
(9) Event loop: Jumping back to the beginning of the loop.
(12) Event loop_end: Reached end of loop.
715  		}
716  	
717  		rx_epoll = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
(13) Event cond_false: Condition "rx_epoll < 0", taking false branch.
718  		if (rx_epoll < 0) {
719  			printf("RXT: Unable to create epoll!\nHALTING RX THREAD!\n");
720  			return NULL;
(14) Event if_end: End of if statement.
721  		}
722  	
723  		memset(&ev, 0, sizeof(struct epoll_event));
724  		ev.events = EPOLLIN;
725  		ev.data.fd = datafd;
726  	
(15) Event cond_false: Condition "epoll_ctl(rx_epoll, 1, datafd, &ev)", taking false branch.
727  		if (epoll_ctl(rx_epoll, EPOLL_CTL_ADD, datafd, &ev)) {
728  			printf("RXT: Unable to add datafd to epoll\nHALTING RX THREAD!\n");
729  			return NULL;
(16) Event if_end: End of if statement.
730  		}
731  	
732  		memset(&clock_start, 0, sizeof(clock_start));
733  		memset(&clock_end, 0, sizeof(clock_start));
734  	
(17) Event missing_lock: Accessing "bench_shutdown_in_progress" without holding lock "shutdown_mutex". Elsewhere, "bench_shutdown_in_progress" is written to with "shutdown_mutex" held 1 out of 1 times (1 of these accesses strongly imply that it is necessary).
Also see events: [lock_acquire][example_access]
735  		while (!bench_shutdown_in_progress) {
736  			if (epoll_wait(rx_epoll, events, KNET_EPOLL_MAX_EVENTS, 1) >= 1) {
737  				msg_recv = _recvmmsg(datafd, &msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL);
738  				if (msg_recv < 0) {
739  					printf("[info]: RXT: error from recvmmsg: %s\n", strerror(errno));
740  				}
741  				switch(test_type) {
742  					case TEST_PING_AND_DATA:
743  						for (i = 0; i < msg_recv; i++) {
744  							if (msg[i].msg_len == 0) {
745  								printf("[info]: RXT: received 0 bytes message?\n");
746  							}
747  							printf("[info]: received %u bytes message: %s\n", msg[i].msg_len, (char *)msg[i].msg_hdr.msg_iov->iov_base);
748  						}
749  						break;
750  					case TEST_PERF_BY_TIME:
751  					case TEST_PERF_BY_SIZE:
752  						for (i = 0; i < msg_recv; i++) {
753  							if (msg[i].msg_len < 64) {
754  								if (msg[i].msg_len == 0) {
755  									printf("[info]: RXT: received 0 bytes message?\n");
756  								}
757  								if (msg[i].msg_len == TEST_START) {
758  									if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) {
759  										printf("[info]: unable to get start time!\n");
760  									}
761  								}
762  								if (msg[i].msg_len == TEST_STOP) {
763  									double average_rx_mbytes;
764  									double average_rx_pkts;
765  									double time_diff_sec;
766  									if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) {
767  										printf("[info]: unable to get end time!\n");
768  									}
769  									timespec_diff(clock_start, clock_end, &time_diff);
770  									/*
771  									 * adjust for sleep(2) between sending the last data and TEST_STOP
772  									 */
773  									time_diff = time_diff - 2000000000llu;
774  	
775  									/*
776  									 * convert to seconds
777  									 */
778  									time_diff_sec = (double)time_diff / 1000000000llu;
779  	
780  									average_rx_mbytes = (double)((rx_bytes / time_diff_sec) / (1024 * 1024));
781  									average_rx_pkts = (double)(rx_pkts / time_diff_sec);
782  									if (!machine_output) {
783  										printf("[perf] execution time: %8.4f secs Average speed: %8.4f MB/sec %8.4f pckts/sec (size: %u total: %" PRIu64 ")\n",
784  										       time_diff_sec, average_rx_mbytes, average_rx_pkts, current_pckt_size, rx_pkts);
785  									} else {
786  										printf("[perf],%.4f,%u,%" PRIu64 ",%.4f,%.4f\n", time_diff_sec, current_pckt_size, rx_pkts, average_rx_mbytes, average_rx_pkts);
787  									}
788  									rx_pkts = 0;
789  									rx_bytes = 0;
790  									current_pckt_size = 0;
791  								}
792  								if (msg[i].msg_len == TEST_COMPLETE) {
793  									wait_for_perf_rx = 1;
794  								}
795  								continue;
796  							}
797  							if (use_pckt_verification) {
798  								struct pckt_ver *recv_pckt = (struct pckt_ver *)msg[i].msg_hdr.msg_iov->iov_base;
799  								uint32_t chksum;
800  	
801  								if (msg[i].msg_len != recv_pckt->len) {
802  									printf("Wrong packet len received: %u expected: %u!\n", msg[i].msg_len, recv_pckt->len);
803  									exit(FAIL);
804  								}
805  								chksum = compute_chsum((const unsigned char *)msg[i].msg_hdr.msg_iov->iov_base + sizeof(struct pckt_ver), msg[i].msg_len - sizeof(struct pckt_ver));
806  								if (recv_pckt->chksum != chksum){
807  									printf("Wrong packet checksum received: %u expected: %u!\n", recv_pckt->chksum, chksum);
808  									exit(FAIL);
809  								}
810  							}
811  							rx_pkts++;
812  							rx_bytes = rx_bytes + msg[i].msg_len;
813  							current_pckt_size = msg[i].msg_len;
814  						}
815  						break;
816  				}
817  			}
818  		}
819  	
820  		epoll_ctl(rx_epoll, EPOLL_CTL_DEL, datafd, &ev);
821  		close(rx_epoll);
822  	
823  		return NULL;
824  	}
825  	
826  	static void setup_data_txrx_common(void)
827  	{
828  		if (!rx_thread) {
829  			if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) {
830  				printf("Unable to enable dst_host_filter: %s\n", strerror(errno));
831  				exit(FAIL);
832  			}
833  			printf("[info]: setting up rx thread\n");
834  			if (pthread_create(&rx_thread, 0, _rx_thread, NULL)) {
835  				printf("Unable to start rx thread\n");
836  				exit(FAIL);
837  			}
838  		}
839  	}
840  	
841  	static void stop_rx_thread(void)
842  	{
843  		void *retval;
844  		unsigned int i;
845  	
846  		if (rx_thread) {
847  			printf("[info]: shutting down rx thread\n");
848  			sleep(2);
849  			pthread_cancel(rx_thread);
850  			pthread_join(rx_thread, &retval);
851  			for (i = 0; i < PCKT_FRAG_MAX; i ++) {
852  				free(rx_buf[i]);
853  			}
854  		}
855  	}
856  	
857  	static void send_ping_data(void)
858  	{
859  		char buf[65535];
860  		ssize_t len;
861  	
862  		memset(&buf, 0, sizeof(buf));
863  		snprintf(buf, sizeof(buf), "Hello world!");
864  	
865  		if (compresscfg) {
866  			len = sizeof(buf);
867  		} else {
868  			len = strlen(buf);
869  		}
870  	
871  		if (knet_send(knet_h, buf, len, channel) != len) {
872  			printf("[info]: Error sending hello world: %s\n", strerror(errno));
873  		}
874  		sleep(1);
875  	}
876  	
877  	static int send_messages(struct knet_mmsghdr *msg, int msgs_to_send)
878  	{
879  		int sent_msgs, prev_sent, progress, total_sent;
880  	
881  		total_sent = 0;
882  		sent_msgs = 0;
883  		prev_sent = 0;
884  		progress = 1;
885  	
886  	retry:
887  		errno = 0;
888  		sent_msgs = _sendmmsg(datafd, 0, &msg[0], msgs_to_send, MSG_NOSIGNAL);
889  	
890  		if (sent_msgs < 0) {
891  			if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
892  				usleep(KNET_THREADS_TIMERES / 16);
893  				goto retry;
894  			}
895  			printf("[info]: Unable to send messages: %s\n", strerror(errno));
896  			return -1;
897  		}
898  	
899  		total_sent = total_sent + sent_msgs;
900  	
901  		if ((sent_msgs >= 0) && (sent_msgs < msgs_to_send)) {
902  			if ((sent_msgs) || (progress)) {
903  				msgs_to_send = msgs_to_send - sent_msgs;
904  				prev_sent = prev_sent + sent_msgs;
905  				if (sent_msgs) {
906  					progress = 1;
907  				} else {
908  					progress = 0;
909  				}
910  				goto retry;
911  			}
912  			if (!progress) {
913  				printf("[info]: Unable to send more messages after retry\n");
914  			}
915  		}
916  		return total_sent;
917  	}
918  	
919  	static int setup_send_buffers_common(struct knet_mmsghdr *msg, struct iovec *iov_out, char *tx_buf[])
920  	{
921  		unsigned int i;
922  	
923  		for (i = 0; i < PCKT_FRAG_MAX; i++) {
924  			tx_buf[i] = malloc(KNET_MAX_PACKET_SIZE);
925  			if (!tx_buf[i]) {
926  				printf("TXT: Unable to malloc!\n");
927  				return -1;
928  			}
929  			memset(tx_buf[i], i, KNET_MAX_PACKET_SIZE);
930  			iov_out[i].iov_base = (void *)tx_buf[i];
931  			memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
932  			msg[i].msg_hdr.msg_iov = &iov_out[i];
933  			msg[i].msg_hdr.msg_iovlen = 1;
934  		}
935  		return 0;
936  	}
937  	
938  	static void send_perf_data_by_size(void)
939  	{
940  		char *tx_buf[PCKT_FRAG_MAX];
941  		struct knet_mmsghdr msg[PCKT_FRAG_MAX];
942  		struct iovec iov_out[PCKT_FRAG_MAX];
943  		char ctrl_message[16];
944  		int sent_msgs;
945  		unsigned int i;
946  		uint64_t total_pkts_to_tx;
947  		uint64_t packets_to_send;
948  		uint32_t packetsize = 64;
949  	
950  		setup_send_buffers_common(msg, iov_out, tx_buf);
951  	
952  		while (packetsize <= KNET_MAX_PACKET_SIZE) {
953  			if (force_packet_size) {
954  				packetsize = force_packet_size;
955  			}
956  			for (i = 0; i < PCKT_FRAG_MAX; i++) {
957  				iov_out[i].iov_len = packetsize;
958  				if (use_pckt_verification) {
959  					struct pckt_ver *tx_pckt = (struct pckt_ver *)&iov_out[i].iov_base;
960  					tx_pckt->len = iov_out[i].iov_len;
961  					tx_pckt->chksum = compute_chsum((const unsigned char *)iov_out[i].iov_base + sizeof(struct pckt_ver), iov_out[i].iov_len - sizeof(struct pckt_ver));
962  				}
963  			}
964  	
965  			total_pkts_to_tx = perf_by_size_size / packetsize;
966  			printf("[info]: testing with %u packet size. total bytes to transfer: %" PRIu64 " (%" PRIu64 " packets)\n", packetsize, perf_by_size_size, total_pkts_to_tx);
967  	
968  			memset(ctrl_message, 0, sizeof(ctrl_message));
969  			knet_send(knet_h, ctrl_message, TEST_START, channel);
970  	
971  			while (total_pkts_to_tx > 0) {
972  				if (total_pkts_to_tx >= PCKT_FRAG_MAX) {
973  					packets_to_send = PCKT_FRAG_MAX;
974  				} else {
975  					packets_to_send = total_pkts_to_tx;
976  				}
977  				sent_msgs = send_messages(&msg[0], packets_to_send);
978  				if (sent_msgs < 0) {
979  					printf("Something went wrong, aborting\n");
980  					exit(FAIL);
981  				}
982  				total_pkts_to_tx = total_pkts_to_tx - sent_msgs;
983  			}
984  	
985  			sleep(2);
986  	
987  			knet_send(knet_h, ctrl_message, TEST_STOP, channel);
988  	
989  			if ((packetsize == KNET_MAX_PACKET_SIZE) || (force_packet_size)) {
990  				break;
991  			}
992  	
993  			/*
994  			 * Use a multiplier that can always divide properly a GB
995  			 * into smaller chunks without worry about boundaries
996  			 */
997  			packetsize *= 4;
998  	
999  			if (packetsize > KNET_MAX_PACKET_SIZE) {
1000 				packetsize = KNET_MAX_PACKET_SIZE;
1001 			}
1002 		}
1003 	
1004 		knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel);
1005 	
1006 		for (i = 0; i < PCKT_FRAG_MAX; i++) {
1007 			free(tx_buf[i]);
1008 		}
1009 	}
1010 	
/*
 * qsort() comparator used to sort the node list into ascending order.
 *
 * Both arguments point at uint16_t node ids. Returns a negative value,
 * zero or a positive value when the first id is lower than, equal to or
 * greater than the second one, as the qsort() contract requires.
 */
static int node_compare(const void *aptr, const void *bptr)
{
	uint16_t a,b;

	a = *(const uint16_t *)aptr;
	b = *(const uint16_t *)bptr;

	/* three-way result: -1, 0 or 1 */
	return (a > b) - (a < b);
}
1021 	
1022 	static void display_stats(int level)
1023 	{
1024 		struct knet_handle_stats handle_stats;
1025 		struct knet_link_status link_status;
1026 		struct knet_link_stats total_link_stats;
1027 		knet_node_id_t host_list[KNET_MAX_HOST];
1028 		uint8_t link_list[KNET_MAX_LINK];
1029 		unsigned int i,j;
1030 		size_t num_hosts, num_links;
1031 	
1032 		if (knet_handle_get_stats(knet_h, &handle_stats, sizeof(handle_stats)) < 0) {
1033 			perror("[info]: failed to get knet handle stats");
1034 			return;
1035 		}
1036 	
1037 		if (compresscfg || cryptocfg) {
1038 			printf("\n");
1039 			printf("[stat]: handle stats\n");
1040 			printf("[stat]: ------------\n");
1041 			if (compresscfg) {
1042 				printf("[stat]:  tx_uncompressed_packets: %" PRIu64 "\n", handle_stats.tx_uncompressed_packets);
1043 				printf("[stat]:  tx_compressed_packets: %" PRIu64 "\n", handle_stats.tx_compressed_packets);
1044 				printf("[stat]:  tx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_original_bytes);
1045 				printf("[stat]:  tx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_size_bytes );
1046 				printf("[stat]:  tx_compress_time_ave: %" PRIu64 "\n", handle_stats.tx_compress_time_ave);
1047 				printf("[stat]:  tx_compress_time_min: %" PRIu64 "\n", handle_stats.tx_compress_time_min);
1048 				printf("[stat]:  tx_compress_time_max: %" PRIu64 "\n", handle_stats.tx_compress_time_max);
1049 				printf("[stat]:  rx_compressed_packets: %" PRIu64 "\n", handle_stats.rx_compressed_packets);
1050 				printf("[stat]:  rx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_original_bytes);
1051 				printf("[stat]:  rx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_size_bytes);
1052 				printf("[stat]:  rx_compress_time_ave: %" PRIu64 "\n", handle_stats.rx_compress_time_ave);
1053 				printf("[stat]:  rx_compress_time_min: %" PRIu64 "\n", handle_stats.rx_compress_time_min);
1054 				printf("[stat]:  rx_compress_time_max: %" PRIu64 "\n", handle_stats.rx_compress_time_max);
1055 				printf("\n");
1056 			}
1057 			if (cryptocfg) {
1058 				printf("[stat]:  tx_crypt_packets: %" PRIu64 "\n", handle_stats.tx_crypt_packets);
1059 				printf("[stat]:  tx_crypt_byte_overhead: %" PRIu64 "\n", handle_stats.tx_crypt_byte_overhead);
1060 				printf("[stat]:  tx_crypt_time_ave: %" PRIu64 "\n", handle_stats.tx_crypt_time_ave);
1061 				printf("[stat]:  tx_crypt_time_min: %" PRIu64 "\n", handle_stats.tx_crypt_time_min);
1062 				printf("[stat]:  tx_crypt_time_max: %" PRIu64 "\n", handle_stats.tx_crypt_time_max);
1063 				printf("[stat]:  rx_crypt_packets: %" PRIu64 "\n", handle_stats.rx_crypt_packets);
1064 				printf("[stat]:  rx_crypt_time_ave: %" PRIu64 "\n", handle_stats.rx_crypt_time_ave);
1065 				printf("[stat]:  rx_crypt_time_min: %" PRIu64 "\n", handle_stats.rx_crypt_time_min);
1066 				printf("[stat]:  rx_crypt_time_max: %" PRIu64 "\n", handle_stats.rx_crypt_time_max);
1067 				printf("\n");
1068 			}
1069 		}
1070 		if (level < 2) {
1071 			return;
1072 		}
1073 	
1074 		memset(&total_link_stats, 0, sizeof(struct knet_link_stats));
1075 	
1076 		if (knet_host_get_host_list(knet_h, host_list, &num_hosts) < 0) {
1077 			perror("[info]: cannot get host list for stats");
1078 			return;
1079 		}
1080 	
1081 		/* Print in host ID order */
1082 		qsort(host_list, num_hosts, sizeof(uint16_t), node_compare);
1083 	
1084 		for (j=0; j<num_hosts; j++) {
1085 			if (knet_link_get_link_list(knet_h, host_list[j], link_list, &num_links) < 0) {
1086 				perror("[info]: cannot get link list for stats");
1087 				return;
1088 			}
1089 	
1090 			for (i=0; i < num_links; i++) {
1091 				if (knet_link_get_status(knet_h, host_list[j], link_list[i], &link_status, sizeof(link_status)) < 0) {
1092 					perror("[info]: cannot get link status");
1093 					return;
1094 				}
1095 	
1096 				total_link_stats.tx_data_packets += link_status.stats.tx_data_packets;
1097 				total_link_stats.rx_data_packets += link_status.stats.rx_data_packets;
1098 				total_link_stats.tx_data_bytes += link_status.stats.tx_data_bytes;
1099 				total_link_stats.rx_data_bytes += link_status.stats.rx_data_bytes;
1100 				total_link_stats.rx_ping_packets += link_status.stats.rx_ping_packets;
1101 				total_link_stats.tx_ping_packets += link_status.stats.tx_ping_packets;
1102 				total_link_stats.rx_ping_bytes += link_status.stats.rx_ping_bytes;
1103 				total_link_stats.tx_ping_bytes += link_status.stats.tx_ping_bytes;
1104 				total_link_stats.rx_pong_packets += link_status.stats.rx_pong_packets;
1105 				total_link_stats.tx_pong_packets += link_status.stats.tx_pong_packets;
1106 				total_link_stats.rx_pong_bytes += link_status.stats.rx_pong_bytes;
1107 				total_link_stats.tx_pong_bytes += link_status.stats.tx_pong_bytes;
1108 				total_link_stats.rx_pmtu_packets += link_status.stats.rx_pmtu_packets;
1109 				total_link_stats.tx_pmtu_packets += link_status.stats.tx_pmtu_packets;
1110 				total_link_stats.rx_pmtu_bytes += link_status.stats.rx_pmtu_bytes;
1111 				total_link_stats.tx_pmtu_bytes += link_status.stats.tx_pmtu_bytes;
1112 	
1113 				total_link_stats.tx_total_packets += link_status.stats.tx_total_packets;
1114 				total_link_stats.rx_total_packets += link_status.stats.rx_total_packets;
1115 				total_link_stats.tx_total_bytes += link_status.stats.tx_total_bytes;
1116 				total_link_stats.rx_total_bytes += link_status.stats.rx_total_bytes;
1117 				total_link_stats.tx_total_errors += link_status.stats.tx_total_errors;
1118 				total_link_stats.tx_total_retries += link_status.stats.tx_total_retries;
1119 	
1120 				total_link_stats.tx_pmtu_errors += link_status.stats.tx_pmtu_errors;
1121 				total_link_stats.tx_pmtu_retries += link_status.stats.tx_pmtu_retries;
1122 				total_link_stats.tx_ping_errors += link_status.stats.tx_ping_errors;
1123 				total_link_stats.tx_ping_retries += link_status.stats.tx_ping_retries;
1124 				total_link_stats.tx_pong_errors += link_status.stats.tx_pong_errors;
1125 				total_link_stats.tx_pong_retries += link_status.stats.tx_pong_retries;
1126 				total_link_stats.tx_data_errors += link_status.stats.tx_data_errors;
1127 				total_link_stats.tx_data_retries += link_status.stats.tx_data_retries;
1128 	
1129 				total_link_stats.down_count += link_status.stats.down_count;
1130 				total_link_stats.up_count += link_status.stats.up_count;
1131 	
1132 				if (level > 2) {
1133 					printf("\n");
1134 					printf("[stat]: Node %d Link %d\n", host_list[j], link_list[i]);
1135 	
1136 					printf("[stat]:   tx_data_packets:  %" PRIu64 "\n", link_status.stats.tx_data_packets);
1137 					printf("[stat]:   rx_data_packets:  %" PRIu64 "\n", link_status.stats.rx_data_packets);
1138 					printf("[stat]:   tx_data_bytes:    %" PRIu64 "\n", link_status.stats.tx_data_bytes);
1139 					printf("[stat]:   rx_data_bytes:    %" PRIu64 "\n", link_status.stats.rx_data_bytes);
1140 					printf("[stat]:   rx_ping_packets:  %" PRIu64 "\n", link_status.stats.rx_ping_packets);
1141 					printf("[stat]:   tx_ping_packets:  %" PRIu64 "\n", link_status.stats.tx_ping_packets);
1142 					printf("[stat]:   rx_ping_bytes:    %" PRIu64 "\n", link_status.stats.rx_ping_bytes);
1143 					printf("[stat]:   tx_ping_bytes:    %" PRIu64 "\n", link_status.stats.tx_ping_bytes);
1144 					printf("[stat]:   rx_pong_packets:  %" PRIu64 "\n", link_status.stats.rx_pong_packets);
1145 					printf("[stat]:   tx_pong_packets:  %" PRIu64 "\n", link_status.stats.tx_pong_packets);
1146 					printf("[stat]:   rx_pong_bytes:    %" PRIu64 "\n", link_status.stats.rx_pong_bytes);
1147 					printf("[stat]:   tx_pong_bytes:    %" PRIu64 "\n", link_status.stats.tx_pong_bytes);
1148 					printf("[stat]:   rx_pmtu_packets:  %" PRIu64 "\n", link_status.stats.rx_pmtu_packets);
1149 					printf("[stat]:   tx_pmtu_packets:  %" PRIu64 "\n", link_status.stats.tx_pmtu_packets);
1150 					printf("[stat]:   rx_pmtu_bytes:    %" PRIu64 "\n", link_status.stats.rx_pmtu_bytes);
1151 					printf("[stat]:   tx_pmtu_bytes:    %" PRIu64 "\n", link_status.stats.tx_pmtu_bytes);
1152 	
1153 					printf("[stat]:   tx_total_packets: %" PRIu64 "\n", link_status.stats.tx_total_packets);
1154 					printf("[stat]:   rx_total_packets: %" PRIu64 "\n", link_status.stats.rx_total_packets);
1155 					printf("[stat]:   tx_total_bytes:   %" PRIu64 "\n", link_status.stats.tx_total_bytes);
1156 					printf("[stat]:   rx_total_bytes:   %" PRIu64 "\n", link_status.stats.rx_total_bytes);
1157 					printf("[stat]:   tx_total_errors:  %" PRIu64 "\n", link_status.stats.tx_total_errors);
1158 					printf("[stat]:   tx_total_retries: %" PRIu64 "\n", link_status.stats.tx_total_retries);
1159 	
1160 					printf("[stat]:   tx_pmtu_errors:   %" PRIu32 "\n", link_status.stats.tx_pmtu_errors);
1161 					printf("[stat]:   tx_pmtu_retries:  %" PRIu32 "\n", link_status.stats.tx_pmtu_retries);
1162 					printf("[stat]:   tx_ping_errors:   %" PRIu32 "\n", link_status.stats.tx_ping_errors);
1163 					printf("[stat]:   tx_ping_retries:  %" PRIu32 "\n", link_status.stats.tx_ping_retries);
1164 					printf("[stat]:   tx_pong_errors:   %" PRIu32 "\n", link_status.stats.tx_pong_errors);
1165 					printf("[stat]:   tx_pong_retries:  %" PRIu32 "\n", link_status.stats.tx_pong_retries);
1166 					printf("[stat]:   tx_data_errors:   %" PRIu32 "\n", link_status.stats.tx_data_errors);
1167 					printf("[stat]:   tx_data_retries:  %" PRIu32 "\n", link_status.stats.tx_data_retries);
1168 	
1169 					printf("[stat]:   latency_min:      %" PRIu32 "\n", link_status.stats.latency_min);
1170 					printf("[stat]:   latency_max:      %" PRIu32 "\n", link_status.stats.latency_max);
1171 					printf("[stat]:   latency_ave:      %" PRIu32 "\n", link_status.stats.latency_ave);
1172 					printf("[stat]:   latency_samples:  %" PRIu32 "\n", link_status.stats.latency_samples);
1173 	
1174 					printf("[stat]:   down_count:       %" PRIu32 "\n", link_status.stats.down_count);
1175 					printf("[stat]:   up_count:         %" PRIu32 "\n", link_status.stats.up_count);
1176 				}
1177 			}
1178 		}
1179 		printf("\n");
1180 		printf("[stat]: Total link stats\n");
1181 		printf("[stat]: ----------------\n");
1182 		printf("[stat]: tx_data_packets:  %" PRIu64 "\n", total_link_stats.tx_data_packets);
1183 		printf("[stat]: rx_data_packets:  %" PRIu64 "\n", total_link_stats.rx_data_packets);
1184 		printf("[stat]: tx_data_bytes:    %" PRIu64 "\n", total_link_stats.tx_data_bytes);
1185 		printf("[stat]: rx_data_bytes:    %" PRIu64 "\n", total_link_stats.rx_data_bytes);
1186 		printf("[stat]: rx_ping_packets:  %" PRIu64 "\n", total_link_stats.rx_ping_packets);
1187 		printf("[stat]: tx_ping_packets:  %" PRIu64 "\n", total_link_stats.tx_ping_packets);
1188 		printf("[stat]: rx_ping_bytes:    %" PRIu64 "\n", total_link_stats.rx_ping_bytes);
1189 		printf("[stat]: tx_ping_bytes:    %" PRIu64 "\n", total_link_stats.tx_ping_bytes);
1190 		printf("[stat]: rx_pong_packets:  %" PRIu64 "\n", total_link_stats.rx_pong_packets);
1191 		printf("[stat]: tx_pong_packets:  %" PRIu64 "\n", total_link_stats.tx_pong_packets);
1192 		printf("[stat]: rx_pong_bytes:    %" PRIu64 "\n", total_link_stats.rx_pong_bytes);
1193 		printf("[stat]: tx_pong_bytes:    %" PRIu64 "\n", total_link_stats.tx_pong_bytes);
1194 		printf("[stat]: rx_pmtu_packets:  %" PRIu64 "\n", total_link_stats.rx_pmtu_packets);
1195 		printf("[stat]: tx_pmtu_packets:  %" PRIu64 "\n", total_link_stats.tx_pmtu_packets);
1196 		printf("[stat]: rx_pmtu_bytes:    %" PRIu64 "\n", total_link_stats.rx_pmtu_bytes);
1197 		printf("[stat]: tx_pmtu_bytes:    %" PRIu64 "\n", total_link_stats.tx_pmtu_bytes);
1198 	
1199 		printf("[stat]: tx_total_packets: %" PRIu64 "\n", total_link_stats.tx_total_packets);
1200 		printf("[stat]: rx_total_packets: %" PRIu64 "\n", total_link_stats.rx_total_packets);
1201 		printf("[stat]: tx_total_bytes:   %" PRIu64 "\n", total_link_stats.tx_total_bytes);
1202 		printf("[stat]: rx_total_bytes:   %" PRIu64 "\n", total_link_stats.rx_total_bytes);
1203 		printf("[stat]: tx_total_errors:  %" PRIu64 "\n", total_link_stats.tx_total_errors);
1204 		printf("[stat]: tx_total_retries: %" PRIu64 "\n", total_link_stats.tx_total_retries);
1205 	
1206 		printf("[stat]: tx_pmtu_errors:   %" PRIu32 "\n", total_link_stats.tx_pmtu_errors);
1207 		printf("[stat]: tx_pmtu_retries:  %" PRIu32 "\n", total_link_stats.tx_pmtu_retries);
1208 		printf("[stat]: tx_ping_errors:   %" PRIu32 "\n", total_link_stats.tx_ping_errors);
1209 		printf("[stat]: tx_ping_retries:  %" PRIu32 "\n", total_link_stats.tx_ping_retries);
1210 		printf("[stat]: tx_pong_errors:   %" PRIu32 "\n", total_link_stats.tx_pong_errors);
1211 		printf("[stat]: tx_pong_retries:  %" PRIu32 "\n", total_link_stats.tx_pong_retries);
1212 		printf("[stat]: tx_data_errors:   %" PRIu32 "\n", total_link_stats.tx_data_errors);
1213 		printf("[stat]: tx_data_retries:  %" PRIu32 "\n", total_link_stats.tx_data_retries);
1214 	
1215 		printf("[stat]: down_count:       %" PRIu32 "\n", total_link_stats.down_count);
1216 		printf("[stat]: up_count:         %" PRIu32 "\n", total_link_stats.up_count);
1217 	
1218 	}
1219 	
1220 	static void send_perf_data_by_time(void)
1221 	{
1222 		char *tx_buf[PCKT_FRAG_MAX];
1223 		struct knet_mmsghdr msg[PCKT_FRAG_MAX];
1224 		struct iovec iov_out[PCKT_FRAG_MAX];
1225 		char ctrl_message[16];
1226 		int sent_msgs;
1227 		unsigned int i;
1228 		uint32_t packetsize = 64;
1229 		struct timespec clock_start, clock_end;
1230 		unsigned long long time_diff = 0;
1231 	
1232 		setup_send_buffers_common(msg, iov_out, tx_buf);
1233 	
1234 		memset(&clock_start, 0, sizeof(clock_start));
1235 		memset(&clock_end, 0, sizeof(clock_start));
1236 	
1237 		while (packetsize <= KNET_MAX_PACKET_SIZE) {
1238 			if (force_packet_size) {
1239 				packetsize = force_packet_size;
1240 			}
1241 			for (i = 0; i < PCKT_FRAG_MAX; i++) {
1242 				iov_out[i].iov_len = packetsize;
1243 				if (use_pckt_verification) {
1244 					struct pckt_ver *tx_pckt = (struct pckt_ver *)iov_out[i].iov_base;
1245 					tx_pckt->len = iov_out[i].iov_len;
1246 					tx_pckt->chksum = compute_chsum((const unsigned char *)iov_out[i].iov_base + sizeof(struct pckt_ver), iov_out[i].iov_len - sizeof(struct pckt_ver));
1247 				}
1248 			}
1249 			printf("[info]: testing with %u bytes packet size for %" PRIu64 " seconds.\n", packetsize, perf_by_time_secs);
1250 	
1251 			memset(ctrl_message, 0, sizeof(ctrl_message));
1252 			knet_send(knet_h, ctrl_message, TEST_START, channel);
1253 	
1254 			if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) {
1255 				printf("[info]: unable to get start time!\n");
1256 			}
1257 	
1258 			time_diff = 0;
1259 	
1260 			while (time_diff < (perf_by_time_secs * 1000000000llu)) {
1261 				sent_msgs = send_messages(&msg[0], PCKT_FRAG_MAX);
1262 				if (sent_msgs < 0) {
1263 					printf("Something went wrong, aborting\n");
1264 					exit(FAIL);
1265 				}
1266 				if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) {
1267 					printf("[info]: unable to get end time!\n");
1268 				}
1269 				timespec_diff(clock_start, clock_end, &time_diff);
1270 			}
1271 	
1272 			sleep(2);
1273 	
1274 			knet_send(knet_h, ctrl_message, TEST_STOP, channel);
1275 	
1276 			if ((packetsize == KNET_MAX_PACKET_SIZE) || (force_packet_size)) {
1277 				break;
1278 			}
1279 	
1280 			/*
1281 			 * Use a multiplier that can always divide properly a GB
1282 			 * into smaller chunks without worry about boundaries
1283 			 */
1284 			packetsize *= 4;
1285 	
1286 			if (packetsize > KNET_MAX_PACKET_SIZE) {
1287 				packetsize = KNET_MAX_PACKET_SIZE;
1288 			}
1289 		}
1290 	
1291 		knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel);
1292 	
1293 		for (i = 0; i < PCKT_FRAG_MAX; i++) {
1294 			free(tx_buf[i]);
1295 		}
1296 	}
1297 	
1298 	static void cleanup_all(void)
1299 	{
1300 		knet_handle_t knet_h_tmp[2];
1301 	
(18) Event lock_acquire: Example 1: Calling "pthread_mutex_lock" acquires lock "shutdown_mutex".
Also see events: [missing_lock][example_access]
1302 		if (pthread_mutex_lock(&shutdown_mutex)) {
1303 			return;
1304 		}
1305 	
1306 		if (bench_shutdown_in_progress) {
1307 			pthread_mutex_unlock(&shutdown_mutex);
1308 			return;
1309 		}
1310 	
(19) Event example_access: Example 1 (cont.): "bench_shutdown_in_progress" is written to with lock "shutdown_mutex" held.
Also see events: [missing_lock][lock_acquire]
1311 		bench_shutdown_in_progress = 1;
1312 	
1313 		pthread_mutex_unlock(&shutdown_mutex);
1314 	
1315 		if (rx_thread) {
1316 			stop_rx_thread();
1317 		}
1318 		knet_h_tmp[1] = knet_h;
1319 		knet_handle_stop_everything(knet_h_tmp, 1);
1320 	}
1321 	
1322 	static void sigint_handler(int signum)
1323 	{
1324 		printf("[info]: cleaning up... got signal: %d\n", signum);
1325 		cleanup_all();
1326 		exit(PASS);
1327 	}
1328 	
1329 	int main(int argc, char *argv[])
1330 	{
1331 		if (signal(SIGINT, sigint_handler) == SIG_ERR) {
1332 			printf("Unable to configure SIGINT handler\n");
1333 			exit(FAIL);
1334 		}
1335 	
1336 		setup_knet(argc, argv);
1337 	
1338 		setup_data_txrx_common();
1339 	
1340 		sleep(5);
1341 	
1342 	restart:
1343 		switch(test_type) {
1344 			default:
1345 			case TEST_PING: /* basic ping, no data */
1346 				sleep(5);
1347 				break;
1348 			case TEST_PING_AND_DATA:
1349 				send_ping_data();
1350 				break;
1351 			case TEST_PERF_BY_SIZE:
1352 				if (senderid == thisnodeid) {
1353 					send_perf_data_by_size();
1354 				} else {
1355 					printf("[info]: waiting for perf rx thread to finish\n");
1356 					while(!wait_for_perf_rx) {
1357 						sleep(1);
1358 					}
1359 				}
1360 				break;
1361 			case TEST_PERF_BY_TIME:
1362 				if (senderid == thisnodeid) {
1363 					send_perf_data_by_time();
1364 				} else {
1365 					printf("[info]: waiting for perf rx thread to finish\n");
1366 					while(!wait_for_perf_rx) {
1367 						sleep(1);
1368 					}
1369 				}
1370 				break;
1371 		}
1372 		if (continous) {
1373 			goto restart;
1374 		}
1375 		if (show_stats) {
1376 			display_stats(show_stats);
1377 		}
1378 	
1379 		cleanup_all();
1380 	
1381 		return PASS;
1382 	}
1383