1 /*
2 * Copyright (C) 2016-2025 Red Hat, Inc. All rights reserved.
3 *
4 * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
5 *
6 * This software licensed under GPL-2.0+
7 */
8
9 #include "config.h"
10
11 #include <errno.h>
12 #include <stdio.h>
13 #include <stdlib.h>
14 #include <string.h>
15 #include <unistd.h>
16 #include <signal.h>
17 #include <pthread.h>
18 #include <sys/types.h>
19 #include <inttypes.h>
20
21 #include "libknet.h"
22
23 #include "compat.h"
24 #include "internals.h"
25 #include "netutils.h"
26 #include "transport_common.h"
27 #include "threads_common.h"
28 #include "test-common.h"
29
30 #define MAX_NODES 128 /* upper bound on the number of -n node entries accepted on the command line */
31
32 static int senderid = -1; /* -s: nodeid that generates benchmark traffic; -1 until set */
33 static int thisnodeid = -1; /* -t: nodeid of the local node; -1 until set */
34 static knet_handle_t knet_h; /* handle returned by knet_handle_new() in setup_knet() */
35 static int datafd = 0; /* data socket filled in by knet_handle_add_datafd() */
36 static int8_t channel = 0; /* data channel filled in by knet_handle_add_datafd() */
37 static int globallistener = 0; /* -l: use the wildcard addresses as link source (incompatible with -o) */
38 static int continous = 0; /* -C: repeat the test; consumer is outside this part of the file */
39 static int show_stats = 0; /* -X: statistics detail level; 0 when -X was not given */
40 static struct sockaddr_storage allv4; /* 0.0.0.0:<baseport>, built by parse_nodes() */
41 static struct sockaddr_storage allv6; /* [::]:<baseport>, built by parse_nodes() */
42 static int broadcast_test = 1; /* non-zero: ping_dst_host_filter() returns 1 without filling destinations (presumably "broadcast" - confirm against libknet) */
43 static pthread_t rx_thread = {0}; /* receive thread created by setup_data_txrx_common(); tested for zero to mean "not started" */
44 static char *rx_buf[PCKT_FRAG_MAX]; /* receive buffers: allocated in _rx_thread(), freed in stop_rx_thread() */
45 static int wait_for_perf_rx = 0; /* set to 1 by the rx thread when a TEST_COMPLETE message arrives */
46 static char *compresscfg = NULL; /* -z argument; split in place by strtok() in setup_knet() */
47 static char *cryptocfg = NULL; /* -c argument; split in place by strtok() in setup_knet() */
48 static int machine_output = 0; /* -a: print perf results as comma separated values */
49 static int use_access_lists = 0; /* -f: value passed to knet_handle_enable_access_lists() */
50 static int use_pckt_verification = 0; /* -v: prepend and verify a struct pckt_ver on perf packets */
51
52 static int bench_shutdown_in_progress = 0; /* non-zero ends the receive loop in _rx_thread() */
53 static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER; /* NOTE(review): never taken in this part of the file; the embedded analyzer note says writers of bench_shutdown_in_progress hold it - confirm */
54
55 #define TEST_PING 0 /* -T ping */
56 #define TEST_PING_AND_DATA 1 /* -T ping_data */
57 #define TEST_PERF_BY_SIZE 2 /* -T perf-by-size */
58 #define TEST_PERF_BY_TIME 3 /* -T perf-by-time */
59
60 static int test_type = TEST_PING; /* selected by -T; one of the TEST_* values above */
61
62 #define TEST_START 2 /* control messages are told apart by their length in bytes; the receiver treats anything shorter than 64 bytes as control */
63 #define TEST_STOP 4 /* length of the "end of one measurement" control message */
64 #define TEST_COMPLETE 6 /* length of the "whole benchmark finished" control message */
65
66 #define ONE_GIGABYTE 1073741824
67
68 static uint64_t perf_by_size_size = 1 * ONE_GIGABYTE; /* bytes to send per packet size in perf-by-size; -S sets it in GB */
69 static uint64_t perf_by_time_secs = 10; /* -S also sets this; its consumer is outside this part of the file */
70
71 static uint32_t force_packet_size = 0; /* -x: fixed packet size for perf tests; 0 lets the test step through sizes */
72
73 struct node { /* one -n entry as parsed by parse_nodes() */
74 int nodeid; /* numeric id taken from the first comma separated field */
75 int links; /* number of entries used in transport[] and address[] */
76 uint8_t transport[KNET_MAX_LINK]; /* per link transport id; KNET_TRANSPORT_UDP when no "proto/" prefix was given */
77 struct sockaddr_storage address[KNET_MAX_LINK]; /* per link address, port initialised to the base port */
78 };
79
80 struct pckt_ver { /* header at the start of a perf data packet when -v is active */
81 uint32_t len; /* total packet length; the receiver compares it with the received message length */
82 uint32_t chksum; /* compute_chsum() over the bytes that follow this header */
83 };
84
85 static void print_help(void)
86 {
87 printf("knet_bench usage:\n");
88 printf(" -h print this help (no really)\n");
89 printf(" -d enable debug logs (default INFO)\n");
90 printf(" -f enable use of access lists (default: off)\n");
91 printf(" -c [implementation]:[crypto]:[hashing] crypto configuration. (default disabled)\n");
92 printf(" Example: -c nss:aes128:sha1\n");
93 printf(" -z [implementation]:[level]:[threshold] compress configuration. (default disabled)\n");
94 printf(" Example: -z zlib:5:100\n");
95 printf(" -p [active|passive|rr] (default: passive)\n");
96 printf(" -P [UDP|SCTP] (default: UDP) protocol (transport) to use for all links\n");
97 printf(" -t [nodeid] This nodeid (required)\n");
98 printf(" -n [nodeid],[proto]/[link1_ip],[link2_..] Other nodes information (at least one required)\n");
99 printf(" Example: -n 1,192.168.8.1,SCTP/3ffe::8:1,UDP/172...\n");
100 printf(" can be repeated up to %d and should contain also the localnode info\n", MAX_NODES);
101 printf(" -b [port] baseport (default: 50000)\n");
102 printf(" -l enable global listener on 0.0.0.0/:: (default: off, incompatible with -o)\n");
103 printf(" -o enable baseport offset per nodeid\n");
104 printf(" -m change PMTUd interval in seconds (default: 60)\n");
105 printf(" -w dont wait for all nodes to be up before starting the test (default: wait)\n");
106 printf(" -T [ping|ping_data|perf-by-size|perf-by-time]\n");
107 printf(" test type (default: ping)\n");
108 printf(" ping: will wait for all hosts to join the knet network, sleep 5 seconds and quit\n");
109 printf(" ping_data: will wait for all hosts to join the knet network, sends some data to all nodes and quit\n");
110 printf(" perf-by-size: will wait for all hosts to join the knet network,\n");
111 printf(" perform a series of benchmarks by transmitting a known\n");
112 printf(" size/quantity of packets and measuring the time, then quit\n");
113 printf(" perf-by-time: will wait for all hosts to join the knet network,\n");
114 printf(" perform a series of benchmarks by transmitting a known\n");
115 printf(" size of packets for a given amount of time (10 seconds)\n");
116 printf(" and measuring the quantity of data transmitted, then quit\n");
117 printf(" -s nodeid that will generate traffic for benchmarks\n");
118 printf(" -S [size|seconds] when used in combination with -T perf-by-size it indicates how many GB of traffic to generate for the test. (default: 1GB)\n");
119 printf(" when used in combination with -T perf-by-time it indicates how many Seconds of traffic to generate for the test. (default: 10 seconds)\n");
120 printf(" -x force packet size for perf-by-time or perf-by-size\n");
121 printf(" -C repeat the test continously (default: off)\n");
122 printf(" -X[XX] show stats at the end of the run (default: 1)\n");
123 printf(" 1: show handle stats, 2: show summary link stats\n");
124 printf(" 3: show detailed link stats\n");
125 printf(" -a enable machine parsable output (default: off).\n");
126 printf(" -v enable packet verification for performance tests (default: off).\n");
127 }
128
129 static void parse_nodes(char *nodesinfo[MAX_NODES], int onidx, int port, struct node nodes[MAX_NODES], int *thisidx)
130 {
131 int i;
132 char *temp = NULL;
133 char port_str[11];
134
135 memset(port_str, 0, sizeof(port_str));
136 snprintf(port_str, sizeof(port_str), "%d", port);
137
138 for (i = 0; i < onidx; i++) {
139 nodes[i].nodeid = atoi(strtok(nodesinfo[i], ","));
140 if ((nodes[i].nodeid < 0) || (nodes[i].nodeid > KNET_MAX_HOST)) {
141 printf("Invalid nodeid: %d (0 - %d)\n", nodes[i].nodeid, KNET_MAX_HOST);
142 exit(FAIL);
143 }
144 if (thisnodeid == nodes[i].nodeid) {
145 *thisidx = i;
146 }
147 while((temp = strtok(NULL, ","))) {
148 char *slash = NULL;
149 uint8_t transport;
150
151 if (nodes[i].links == KNET_MAX_LINK) {
152 printf("Too many links configured. Max %d\n", KNET_MAX_LINK);
153 exit(FAIL);
154 }
155
156 slash = strstr(temp, "/");
157 if (slash) {
158 memset(slash, 0, 1);
159 transport = knet_get_transport_id_by_name(temp);
160 if (transport == KNET_MAX_TRANSPORTS) {
161 printf("Unknown transport: %s\n", temp);
162 exit(FAIL);
163 }
164 nodes[i].transport[nodes[i].links] = transport;
165 temp = slash + 1;
166 } else {
167 nodes[i].transport[nodes[i].links] = KNET_TRANSPORT_UDP;
168 }
169
170 if (knet_strtoaddr(temp, port_str,
171 &nodes[i].address[nodes[i].links],
172 sizeof(struct sockaddr_storage)) < 0) {
173 printf("Unable to convert %s to sockaddress\n", temp);
174 exit(FAIL);
175 }
176 nodes[i].links++;
177 }
178 }
179
180 if (knet_strtoaddr("0.0.0.0", port_str, &allv4, sizeof(struct sockaddr_storage)) < 0) {
181 printf("Unable to convert 0.0.0.0 to sockaddress\n");
182 exit(FAIL);
183 }
184
185 if (knet_strtoaddr("::", port_str, &allv6, sizeof(struct sockaddr_storage)) < 0) {
186 printf("Unable to convert :: to sockaddress\n");
187 exit(FAIL);
188 }
189
190 for (i = 1; i < onidx; i++) {
191 if (nodes[0].links != nodes[i].links) {
192 printf("knet_bench does not support unbalanced link configuration\n");
193 exit(FAIL);
194 }
195 }
196
197 return;
198 }
199
/*
 * Opaque cookie registered together with sock_notify(); only its address is used.
 */
static int private_data;

/*
 * Socket notification callback registered with knet_handle_enable_sock_notify().
 *
 * Logs the reported error for local_datafd. The error text is derived from
 * the errorno argument: the global errno may have been overwritten by the
 * time the callback runs. pvt_data, local_channel and tx_rx are unused.
 */
static void sock_notify(void *pvt_data,
			int local_datafd,
			int8_t local_channel,
			uint8_t tx_rx,
			int error,
			int errorno)
{
	printf("[info]: error (%d - %d - %s) from socket: %d\n", error, errorno, strerror(errorno), local_datafd);
	return;
}
212
213 static int ping_dst_host_filter(void *pvt_data,
214 const unsigned char *outdata,
215 ssize_t outdata_len,
216 uint8_t tx_rx,
217 knet_node_id_t this_host_id,
218 knet_node_id_t src_host_id,
219 int8_t *dst_channel,
220 knet_node_id_t *dst_host_ids,
221 size_t *dst_host_ids_entries)
222 {
223 if (broadcast_test) {
224 return 1;
225 }
226
227 if (tx_rx == KNET_NOTIFY_TX) {
228 memmove(&dst_host_ids[0], outdata, 2);
229 } else {
230 dst_host_ids[0] = this_host_id;
231 }
232 *dst_host_ids_entries = 1;
233 return 0;
234 }
235
236 static void setup_knet(int argc, char *argv[])
237 {
238 int logfd = 0;
239 int rv;
240 char *policystr = NULL, *protostr = NULL;
241 char *othernodeinfo[MAX_NODES];
242 struct node nodes[MAX_NODES];
243 int thisidx = -1;
244 int onidx = 0;
245 int debug = KNET_LOG_INFO;
246 int port = 50000, portoffset = 0;
247 int thisport = 0, otherport = 0;
248 int thisnewport = 0, othernewport = 0;
249 struct sockaddr_in *so_in;
250 struct sockaddr_in6 *so_in6;
251 struct sockaddr_storage *src;
252 int i, link_idx, allnodesup = 0;
253 int policy = KNET_LINK_POLICY_PASSIVE, policyfound = 0;
254 int protocol = KNET_TRANSPORT_UDP, protofound = 0;
255 int wait = 1;
256 int pmtud_interval = 60;
257 struct knet_handle_crypto_cfg knet_handle_crypto_cfg;
258 char *cryptomodel = NULL, *cryptotype = NULL, *cryptohash = NULL;
259 struct knet_handle_compress_cfg knet_handle_compress_cfg;
260
261 memset(nodes, 0, sizeof(nodes));
262
263 while ((rv = getopt(argc, argv, "aCT:S:s:lvdfom:wb:t:n:c:p:x:X::P:z:h")) != EOF) {
264 switch(rv) {
265 case 'h':
266 print_help();
267 exit(PASS);
268 break;
269 case 'a':
270 machine_output = 1;
271 break;
272 case 'd':
273 debug = KNET_LOG_DEBUG;
274 break;
275 case 'f':
276 use_access_lists = 1;
277 break;
278 case 'c':
279 if (cryptocfg) {
280 printf("Error: -c can only be specified once\n");
281 exit(FAIL);
282 }
283 cryptocfg = optarg;
284 break;
285 case 'p':
286 if (policystr) {
287 printf("Error: -p can only be specified once\n");
288 exit(FAIL);
289 }
290 if (optarg) {
291 policystr = optarg;
292 if (!strcmp(policystr, "active")) {
293 policy = KNET_LINK_POLICY_ACTIVE;
294 policyfound = 1;
295 }
296 /*
297 * we can't use rr because clangs can't compile
298 * an array of 3 strings, one of which is 2 bytes long
299 */
300 if (!strcmp(policystr, "round-robin")) {
301 policy = KNET_LINK_POLICY_RR;
302 policyfound = 1;
303 }
304 if (!strcmp(policystr, "passive")) {
305 policy = KNET_LINK_POLICY_PASSIVE;
306 policyfound = 1;
307 }
308 }
309 if (!policyfound) {
310 printf("Error: invalid policy %s specified. -p accepts active|passive|rr\n", policystr);
311 exit(FAIL);
312 }
313 break;
314 case 'P':
315 if (protostr) {
316 printf("Error: -P can only be specified once\n");
317 exit(FAIL);
318 }
319 if (optarg) {
320 protostr = optarg;
321 if (!strcmp(protostr, "UDP")) {
322 protocol = KNET_TRANSPORT_UDP;
323 protofound = 1;
324 }
325 if (!strcmp(protostr, "SCTP")) {
326 protocol = KNET_TRANSPORT_SCTP;
327 protofound = 1;
328 }
329 }
330 if (!protofound) {
331 printf("Error: invalid protocol %s specified. -P accepts udp|sctp\n", policystr);
332 exit(FAIL);
333 }
334 break;
335 case 't':
336 if (thisnodeid >= 0) {
337 printf("Error: -t can only be specified once\n");
338 exit(FAIL);
339 }
340 thisnodeid = atoi(optarg);
341 if ((thisnodeid < 0) || (thisnodeid > 65536)) {
342 printf("Error: -t nodeid out of range %d (1 - 65536)\n", thisnodeid);
343 exit(FAIL);
344 }
345 break;
346 case 'n':
347 if (onidx == MAX_NODES) {
348 printf("Error: too many other nodes. Max %d\n", MAX_NODES);
349 exit(FAIL);
350 }
351 othernodeinfo[onidx] = optarg;
352 onidx++;
353 break;
354 case 'b':
355 port = atoi(optarg);
356 if ((port < 1) || (port > 65536)) {
357 printf("Error: port %d out of range (1 - 65536)\n", port);
358 exit(FAIL);
359 }
360 break;
361 case 'o':
362 if (globallistener) {
363 printf("Error: -l cannot be used with -o\n");
364 exit(FAIL);
365 }
366 portoffset = 1;
367 break;
368 case 'm':
369 pmtud_interval = atoi(optarg);
370 if (pmtud_interval < 1) {
371 printf("Error: pmtud interval %d out of range (> 0)\n", pmtud_interval);
372 exit(FAIL);
373 }
374 break;
375 case 'l':
376 if (portoffset) {
377 printf("Error: -o cannot be used with -l\n");
378 exit(FAIL);
379 }
380 globallistener = 1;
381 break;
382 case 'w':
383 wait = 0;
384 break;
385 case 's':
386 if (senderid >= 0) {
387 printf("Error: -s can only be specified once\n");
388 exit(FAIL);
389 }
390 senderid = atoi(optarg);
391 if ((senderid < 0) || (senderid > 65536)) {
392 printf("Error: -s nodeid out of range %d (1 - 65536)\n", senderid);
393 exit(FAIL);
394 }
395 break;
396 case 'T':
397 if (optarg) {
398 if (!strcmp("ping", optarg)) {
399 test_type = TEST_PING;
400 }
401 if (!strcmp("ping_data", optarg)) {
402 test_type = TEST_PING_AND_DATA;
403 }
404 if (!strcmp("perf-by-size", optarg)) {
405 test_type = TEST_PERF_BY_SIZE;
406 }
407 if (!strcmp("perf-by-time", optarg)) {
408 test_type = TEST_PERF_BY_TIME;
409 }
410 } else {
411 printf("Error: -T requires an option\n");
412 exit(FAIL);
413 }
414 break;
415 case 'S':
416 perf_by_size_size = (uint64_t)atoi(optarg) * ONE_GIGABYTE;
417 perf_by_time_secs = (uint64_t)atoi(optarg);
418 break;
419 case 'x':
420 force_packet_size = (uint32_t)atoi(optarg);
421 if ((force_packet_size < 64) || (force_packet_size > 65536)) {
422 printf("Unsupported packet size %u (accepted 64 - 65536)\n", force_packet_size);
423 exit(FAIL);
424 }
425 break;
426 case 'v':
427 use_pckt_verification = 1;
428 break;
429 case 'C':
430 continous = 1;
431 break;
432 case 'X':
433 if (optarg) {
434 show_stats = atoi(optarg);
435 } else {
436 show_stats = 1;
437 }
438 break;
439 case 'z':
440 if (compresscfg) {
441 printf("Error: -c can only be specified once\n");
442 exit(FAIL);
443 }
444 compresscfg = optarg;
445 break;
446 default:
447 break;
448 }
449 }
450
451 if (thisnodeid < 0) {
452 printf("Who am I?!? missing -t from command line?\n");
453 exit(FAIL);
454 }
455
456 if (onidx < 1) {
457 printf("no other nodes configured?!? missing -n from command line\n");
458 exit(FAIL);
459 }
460
461 parse_nodes(othernodeinfo, onidx, port, nodes, &thisidx);
462
463 if (thisidx < 0) {
464 printf("no config for this node found\n");
465 exit(FAIL);
466 }
467
468 if (senderid >= 0) {
469 for (i=0; i < onidx; i++) {
470 if (senderid == nodes[i].nodeid) {
471 break;
472 }
473 }
474 if (i == onidx) {
475 printf("Unable to find senderid in nodelist\n");
476 exit(FAIL);
477 }
478 }
479
480 if (((test_type == TEST_PERF_BY_SIZE) || (test_type == TEST_PERF_BY_TIME)) && (senderid < 0)) {
481 printf("Error: performance test requires -s to be set (for now)\n");
482 exit(FAIL);
483 }
484
485 logfd = start_logging(stdout);
486
487 knet_h = knet_handle_new(thisnodeid, logfd, debug);
488 if (!knet_h) {
489 printf("Unable to knet_handle_new: %s\n", strerror(errno));
490 exit(FAIL);
491 }
492
493 if (knet_handle_enable_access_lists(knet_h, use_access_lists) < 0) {
494 printf("Unable to knet_handle_enable_access_lists: %s\n", strerror(errno));
495 exit(FAIL);
496 }
497
498 if (cryptocfg) {
499 memset(&knet_handle_crypto_cfg, 0, sizeof(knet_handle_crypto_cfg));
500 cryptomodel = strtok(cryptocfg, ":");
501 cryptotype = strtok(NULL, ":");
502 cryptohash = strtok(NULL, ":");
503 if (cryptomodel) {
504 strncpy(knet_handle_crypto_cfg.crypto_model, cryptomodel, sizeof(knet_handle_crypto_cfg.crypto_model) - 1);
505 }
506 if (cryptotype) {
507 strncpy(knet_handle_crypto_cfg.crypto_cipher_type, cryptotype, sizeof(knet_handle_crypto_cfg.crypto_cipher_type) - 1);
508 }
509 if (cryptohash) {
510 strncpy(knet_handle_crypto_cfg.crypto_hash_type, cryptohash, sizeof(knet_handle_crypto_cfg.crypto_hash_type) - 1);
511 }
512 knet_handle_crypto_cfg.private_key_len = KNET_MAX_KEY_LEN;
513 if (knet_handle_crypto(knet_h, &knet_handle_crypto_cfg)) {
514 printf("Unable to init crypto\n");
515 exit(FAIL);
516 }
517 }
518
519 if (compresscfg) {
520 memset(&knet_handle_compress_cfg, 0, sizeof(struct knet_handle_compress_cfg));
521 snprintf(knet_handle_compress_cfg.compress_model, 16, "%s", strtok(compresscfg, ":"));
522 knet_handle_compress_cfg.compress_level = atoi(strtok(NULL, ":"));
523 knet_handle_compress_cfg.compress_threshold = atoi(strtok(NULL, ":"));
524 if (knet_handle_compress(knet_h, &knet_handle_compress_cfg)) {
525 printf("Unable to configure compress\n");
526 exit(FAIL);
527 }
528 }
529
530 if (knet_handle_enable_sock_notify(knet_h, &private_data, sock_notify) < 0) {
531 printf("knet_handle_enable_sock_notify failed: %s\n", strerror(errno));
532 knet_handle_free(knet_h);
533 exit(FAIL);
534 }
535
536 datafd = 0;
537 channel = -1;
538
539 if (knet_handle_add_datafd(knet_h, &datafd, &channel) < 0) {
540 printf("knet_handle_add_datafd failed: %s\n", strerror(errno));
541 knet_handle_free(knet_h);
542 exit(FAIL);
543 }
544
545 if (knet_handle_pmtud_setfreq(knet_h, pmtud_interval) < 0) {
546 printf("knet_handle_pmtud_setfreq failed: %s\n", strerror(errno));
547 knet_handle_free(knet_h);
548 exit(FAIL);
549 }
550
551 for (i=0; i < onidx; i++) {
552 if (i == thisidx) {
553 continue;
554 }
555
556 if (knet_host_add(knet_h, nodes[i].nodeid) < 0) {
557 printf("knet_host_add failed: %s\n", strerror(errno));
558 exit(FAIL);
559 }
560
561 if (knet_host_set_policy(knet_h, nodes[i].nodeid, policy) < 0) {
562 printf("knet_host_set_policy failed: %s\n", strerror(errno));
563 exit(FAIL);
564 }
565
566 for (link_idx = 0; link_idx < nodes[i].links; link_idx++) {
567 if (portoffset) {
568 if (nodes[thisidx].address[link_idx].ss_family == AF_INET) {
569 so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx];
570 thisport = ntohs(so_in->sin_port);
571 thisnewport = thisport + nodes[i].nodeid;
572 so_in->sin_port = (htons(thisnewport));
573 so_in = (struct sockaddr_in *)&nodes[i].address[link_idx];
574 otherport = ntohs(so_in->sin_port);
575 othernewport = otherport + nodes[thisidx].nodeid;
576 so_in->sin_port = (htons(othernewport));
577 } else {
578 so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx];
579 thisport = ntohs(so_in6->sin6_port);
580 thisnewport = thisport + nodes[i].nodeid;
581 so_in6->sin6_port = (htons(thisnewport));
582 so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx];
583 otherport = ntohs(so_in6->sin6_port);
584 othernewport = otherport + nodes[thisidx].nodeid;
585 so_in6->sin6_port = (htons(othernewport));
586 }
587 }
588 if (!globallistener) {
589 src = &nodes[thisidx].address[link_idx];
590 } else {
591 if (nodes[thisidx].address[link_idx].ss_family == AF_INET) {
592 src = &allv4;
593 } else {
594 src = &allv6;
595 }
596 }
597 /*
598 * -P overrides per link protocol configuration
599 */
600 if (protofound) {
601 nodes[i].transport[link_idx] = protocol;
602 }
603 if (knet_link_set_config(knet_h, nodes[i].nodeid, link_idx,
604 nodes[i].transport[link_idx], src,
605 &nodes[i].address[link_idx], 0) < 0) {
606 printf("Unable to configure link: %s\n", strerror(errno));
607 exit(FAIL);
608 }
609 if (portoffset) {
610 if (nodes[thisidx].address[link_idx].ss_family == AF_INET) {
611 so_in = (struct sockaddr_in *)&nodes[thisidx].address[link_idx];
612 so_in->sin_port = (htons(thisport));
613 so_in = (struct sockaddr_in *)&nodes[i].address[link_idx];
614 so_in->sin_port = (htons(otherport));
615 } else {
616 so_in6 = (struct sockaddr_in6 *)&nodes[thisidx].address[link_idx];
617 so_in6->sin6_port = (htons(thisport));
618 so_in6 = (struct sockaddr_in6 *)&nodes[i].address[link_idx];
619 so_in6->sin6_port = (htons(otherport));
620 }
621 }
622 if (knet_link_set_enable(knet_h, nodes[i].nodeid, link_idx, 1) < 0) {
623 printf("knet_link_set_enable failed: %s\n", strerror(errno));
624 exit(FAIL);
625 }
626 if (knet_link_set_ping_timers(knet_h, nodes[i].nodeid, link_idx, 1000, 10000, 2048) < 0) {
627 printf("knet_link_set_ping_timers failed: %s\n", strerror(errno));
628 exit(FAIL);
629 }
630 if (knet_link_set_pong_count(knet_h, nodes[i].nodeid, link_idx, 2) < 0) {
631 printf("knet_link_set_pong_count failed: %s\n", strerror(errno));
632 exit(FAIL);
633 }
634 }
635 }
636
637 if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) {
638 printf("Unable to enable dst_host_filter: %s\n", strerror(errno));
639 exit(FAIL);
640 }
641
642 if (knet_handle_setfwd(knet_h, 1) < 0) {
643 printf("knet_handle_setfwd failed: %s\n", strerror(errno));
644 exit(FAIL);
645 }
646
647 if (wait) {
648 while(!allnodesup) {
649 allnodesup = 1;
650 for (i=0; i < onidx; i++) {
651 if (i == thisidx) {
652 continue;
653 }
654 if (knet_h->host_index[nodes[i].nodeid]->status.reachable != 1) {
655 printf("[info]: waiting host %d to be reachable\n", nodes[i].nodeid);
656 allnodesup = 0;
657 }
658 }
659 if (!allnodesup) {
660 sleep(1);
661 }
662 }
663 sleep(1);
664 }
665 }
666
/*
 * Weak 16 bit checksum (borrowed from corosync for debugging purposes).
 *
 * For every input byte the running value is rotated right by one bit
 * within 16 bits, the byte is added and the result is truncated to
 * 16 bits again.
 *
 * data:     bytes to checksum; not accessed when data_len is 0
 * data_len: number of bytes
 *
 * Returns the checksum, always in the range 0 - 0xffff.
 */
static uint32_t compute_chsum(const unsigned char *data, uint32_t data_len)
{
	const unsigned char *cursor = data;
	uint32_t remaining = data_len;
	unsigned int sum = 0;

	while (remaining--) {
		/*
		 * park the low bit at position 16 so that the shift
		 * below moves it to position 15
		 */
		if (sum & 1) {
			sum |= 0x10000;
		}

		sum = ((sum >> 1) + *cursor++) & 0xffff;
	}

	return sum;
}
684
685 static void *_rx_thread(void *args)
686 {
687 int rx_epoll;
688 struct epoll_event ev;
689 struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
690
691 struct sockaddr_storage address[PCKT_FRAG_MAX];
692 struct knet_mmsghdr msg[PCKT_FRAG_MAX];
693 struct iovec iov_in[PCKT_FRAG_MAX];
694 int i, msg_recv;
695 struct timespec clock_start, clock_end;
696 unsigned long long time_diff = 0;
697 uint64_t rx_pkts = 0;
698 uint64_t rx_bytes = 0;
699 unsigned int current_pckt_size = 0;
700
|
(1) Event cond_true: |
Condition "(unsigned int)i < 255", taking true branch. |
|
(5) Event loop_begin: |
Jumped back to beginning of loop. |
|
(6) Event cond_true: |
Condition "(unsigned int)i < 255", taking true branch. |
|
(10) Event loop_begin: |
Jumped back to beginning of loop. |
|
(11) Event cond_false: |
Condition "(unsigned int)i < 255", taking false branch. |
701 for (i = 0; (unsigned int)i < PCKT_FRAG_MAX; i++) {
702 rx_buf[i] = malloc(KNET_MAX_PACKET_SIZE);
|
(2) Event cond_false: |
Condition "!rx_buf[i]", taking false branch. |
|
(7) Event cond_false: |
Condition "!rx_buf[i]", taking false branch. |
703 if (!rx_buf[i]) {
704 printf("RXT: Unable to malloc!\nHALTING RX THREAD!\n");
705 return NULL;
|
(3) Event if_end: |
End of if statement. |
|
(8) Event if_end: |
End of if statement. |
706 }
707 memset(rx_buf[i], 0, KNET_MAX_PACKET_SIZE);
708 iov_in[i].iov_base = (void *)rx_buf[i];
709 iov_in[i].iov_len = KNET_MAX_PACKET_SIZE;
710 memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
711 msg[i].msg_hdr.msg_name = &address[i];
712 msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage);
713 msg[i].msg_hdr.msg_iov = &iov_in[i];
714 msg[i].msg_hdr.msg_iovlen = 1;
|
(4) Event loop: |
Jumping back to the beginning of the loop. |
|
(9) Event loop: |
Jumping back to the beginning of the loop. |
|
(12) Event loop_end: |
Reached end of loop. |
715 }
716
717 rx_epoll = epoll_create(KNET_EPOLL_MAX_EVENTS + 1);
|
(13) Event cond_false: |
Condition "rx_epoll < 0", taking false branch. |
718 if (rx_epoll < 0) {
719 printf("RXT: Unable to create epoll!\nHALTING RX THREAD!\n");
720 return NULL;
|
(14) Event if_end: |
End of if statement. |
721 }
722
723 memset(&ev, 0, sizeof(struct epoll_event));
724 ev.events = EPOLLIN;
725 ev.data.fd = datafd;
726
|
(15) Event cond_false: |
Condition "epoll_ctl(rx_epoll, 1, datafd, &ev)", taking false branch. |
727 if (epoll_ctl(rx_epoll, EPOLL_CTL_ADD, datafd, &ev)) {
728 printf("RXT: Unable to add datafd to epoll\nHALTING RX THREAD!\n");
729 return NULL;
|
(16) Event if_end: |
End of if statement. |
730 }
731
732 memset(&clock_start, 0, sizeof(clock_start));
733 memset(&clock_end, 0, sizeof(clock_start));
734
|
(17) Event missing_lock: |
Accessing "bench_shutdown_in_progress" without holding lock "shutdown_mutex". Elsewhere, "bench_shutdown_in_progress" is written to with "shutdown_mutex" held 1 out of 1 times (1 of these accesses strongly imply that it is necessary). |
| Also see events: |
[lock_acquire][example_access] |
735 while (!bench_shutdown_in_progress) {
736 if (epoll_wait(rx_epoll, events, KNET_EPOLL_MAX_EVENTS, 1) >= 1) {
737 msg_recv = _recvmmsg(datafd, &msg[0], PCKT_FRAG_MAX, MSG_DONTWAIT | MSG_NOSIGNAL);
738 if (msg_recv < 0) {
739 printf("[info]: RXT: error from recvmmsg: %s\n", strerror(errno));
740 }
741 switch(test_type) {
742 case TEST_PING_AND_DATA:
743 for (i = 0; i < msg_recv; i++) {
744 if (msg[i].msg_len == 0) {
745 printf("[info]: RXT: received 0 bytes message?\n");
746 }
747 printf("[info]: received %u bytes message: %s\n", msg[i].msg_len, (char *)msg[i].msg_hdr.msg_iov->iov_base);
748 }
749 break;
750 case TEST_PERF_BY_TIME:
751 case TEST_PERF_BY_SIZE:
752 for (i = 0; i < msg_recv; i++) {
753 if (msg[i].msg_len < 64) {
754 if (msg[i].msg_len == 0) {
755 printf("[info]: RXT: received 0 bytes message?\n");
756 }
757 if (msg[i].msg_len == TEST_START) {
758 if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) {
759 printf("[info]: unable to get start time!\n");
760 }
761 }
762 if (msg[i].msg_len == TEST_STOP) {
763 double average_rx_mbytes;
764 double average_rx_pkts;
765 double time_diff_sec;
766 if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) {
767 printf("[info]: unable to get end time!\n");
768 }
769 timespec_diff(clock_start, clock_end, &time_diff);
770 /*
771 * adjust for sleep(2) between sending the last data and TEST_STOP
772 */
773 time_diff = time_diff - 2000000000llu;
774
775 /*
776 * convert to seconds
777 */
778 time_diff_sec = (double)time_diff / 1000000000llu;
779
780 average_rx_mbytes = (double)((rx_bytes / time_diff_sec) / (1024 * 1024));
781 average_rx_pkts = (double)(rx_pkts / time_diff_sec);
782 if (!machine_output) {
783 printf("[perf] execution time: %8.4f secs Average speed: %8.4f MB/sec %8.4f pckts/sec (size: %u total: %" PRIu64 ")\n",
784 time_diff_sec, average_rx_mbytes, average_rx_pkts, current_pckt_size, rx_pkts);
785 } else {
786 printf("[perf],%.4f,%u,%" PRIu64 ",%.4f,%.4f\n", time_diff_sec, current_pckt_size, rx_pkts, average_rx_mbytes, average_rx_pkts);
787 }
788 rx_pkts = 0;
789 rx_bytes = 0;
790 current_pckt_size = 0;
791 }
792 if (msg[i].msg_len == TEST_COMPLETE) {
793 wait_for_perf_rx = 1;
794 }
795 continue;
796 }
797 if (use_pckt_verification) {
798 struct pckt_ver *recv_pckt = (struct pckt_ver *)msg[i].msg_hdr.msg_iov->iov_base;
799 uint32_t chksum;
800
801 if (msg[i].msg_len != recv_pckt->len) {
802 printf("Wrong packet len received: %u expected: %u!\n", msg[i].msg_len, recv_pckt->len);
803 exit(FAIL);
804 }
805 chksum = compute_chsum((const unsigned char *)msg[i].msg_hdr.msg_iov->iov_base + sizeof(struct pckt_ver), msg[i].msg_len - sizeof(struct pckt_ver));
806 if (recv_pckt->chksum != chksum){
807 printf("Wrong packet checksum received: %u expected: %u!\n", recv_pckt->chksum, chksum);
808 exit(FAIL);
809 }
810 }
811 rx_pkts++;
812 rx_bytes = rx_bytes + msg[i].msg_len;
813 current_pckt_size = msg[i].msg_len;
814 }
815 break;
816 }
817 }
818 }
819
820 epoll_ctl(rx_epoll, EPOLL_CTL_DEL, datafd, &ev);
821 close(rx_epoll);
822
823 return NULL;
824 }
825
826 static void setup_data_txrx_common(void)
827 {
828 if (!rx_thread) {
829 if (knet_handle_enable_filter(knet_h, NULL, ping_dst_host_filter)) {
830 printf("Unable to enable dst_host_filter: %s\n", strerror(errno));
831 exit(FAIL);
832 }
833 printf("[info]: setting up rx thread\n");
834 if (pthread_create(&rx_thread, 0, _rx_thread, NULL)) {
835 printf("Unable to start rx thread\n");
836 exit(FAIL);
837 }
838 }
839 }
840
841 static void stop_rx_thread(void)
842 {
843 void *retval;
844 unsigned int i;
845
846 if (rx_thread) {
847 printf("[info]: shutting down rx thread\n");
848 sleep(2);
849 pthread_cancel(rx_thread);
850 pthread_join(rx_thread, &retval);
851 for (i = 0; i < PCKT_FRAG_MAX; i ++) {
852 free(rx_buf[i]);
853 }
854 }
855 }
856
857 static void send_ping_data(void)
858 {
859 char buf[65535];
860 ssize_t len;
861
862 memset(&buf, 0, sizeof(buf));
863 snprintf(buf, sizeof(buf), "Hello world!");
864
865 if (compresscfg) {
866 len = sizeof(buf);
867 } else {
868 len = strlen(buf);
869 }
870
871 if (knet_send(knet_h, buf, len, channel) != len) {
872 printf("[info]: Error sending hello world: %s\n", strerror(errno));
873 }
874 sleep(1);
875 }
876
877 static int send_messages(struct knet_mmsghdr *msg, int msgs_to_send)
878 {
879 int sent_msgs, prev_sent, progress, total_sent;
880
881 total_sent = 0;
882 sent_msgs = 0;
883 prev_sent = 0;
884 progress = 1;
885
886 retry:
887 errno = 0;
888 sent_msgs = _sendmmsg(datafd, 0, &msg[0], msgs_to_send, MSG_NOSIGNAL);
889
890 if (sent_msgs < 0) {
891 if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
892 usleep(KNET_THREADS_TIMERES / 16);
893 goto retry;
894 }
895 printf("[info]: Unable to send messages: %s\n", strerror(errno));
896 return -1;
897 }
898
899 total_sent = total_sent + sent_msgs;
900
901 if ((sent_msgs >= 0) && (sent_msgs < msgs_to_send)) {
902 if ((sent_msgs) || (progress)) {
903 msgs_to_send = msgs_to_send - sent_msgs;
904 prev_sent = prev_sent + sent_msgs;
905 if (sent_msgs) {
906 progress = 1;
907 } else {
908 progress = 0;
909 }
910 goto retry;
911 }
912 if (!progress) {
913 printf("[info]: Unable to send more messages after retry\n");
914 }
915 }
916 return total_sent;
917 }
918
919 static int setup_send_buffers_common(struct knet_mmsghdr *msg, struct iovec *iov_out, char *tx_buf[])
920 {
921 unsigned int i;
922
923 for (i = 0; i < PCKT_FRAG_MAX; i++) {
924 tx_buf[i] = malloc(KNET_MAX_PACKET_SIZE);
925 if (!tx_buf[i]) {
926 printf("TXT: Unable to malloc!\n");
927 return -1;
928 }
929 memset(tx_buf[i], i, KNET_MAX_PACKET_SIZE);
930 iov_out[i].iov_base = (void *)tx_buf[i];
931 memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
932 msg[i].msg_hdr.msg_iov = &iov_out[i];
933 msg[i].msg_hdr.msg_iovlen = 1;
934 }
935 return 0;
936 }
937
938 static void send_perf_data_by_size(void)
939 {
940 char *tx_buf[PCKT_FRAG_MAX];
941 struct knet_mmsghdr msg[PCKT_FRAG_MAX];
942 struct iovec iov_out[PCKT_FRAG_MAX];
943 char ctrl_message[16];
944 int sent_msgs;
945 unsigned int i;
946 uint64_t total_pkts_to_tx;
947 uint64_t packets_to_send;
948 uint32_t packetsize = 64;
949
950 setup_send_buffers_common(msg, iov_out, tx_buf);
951
952 while (packetsize <= KNET_MAX_PACKET_SIZE) {
953 if (force_packet_size) {
954 packetsize = force_packet_size;
955 }
956 for (i = 0; i < PCKT_FRAG_MAX; i++) {
957 iov_out[i].iov_len = packetsize;
958 if (use_pckt_verification) {
959 struct pckt_ver *tx_pckt = (struct pckt_ver *)&iov_out[i].iov_base;
960 tx_pckt->len = iov_out[i].iov_len;
961 tx_pckt->chksum = compute_chsum((const unsigned char *)iov_out[i].iov_base + sizeof(struct pckt_ver), iov_out[i].iov_len - sizeof(struct pckt_ver));
962 }
963 }
964
965 total_pkts_to_tx = perf_by_size_size / packetsize;
966 printf("[info]: testing with %u packet size. total bytes to transfer: %" PRIu64 " (%" PRIu64 " packets)\n", packetsize, perf_by_size_size, total_pkts_to_tx);
967
968 memset(ctrl_message, 0, sizeof(ctrl_message));
969 knet_send(knet_h, ctrl_message, TEST_START, channel);
970
971 while (total_pkts_to_tx > 0) {
972 if (total_pkts_to_tx >= PCKT_FRAG_MAX) {
973 packets_to_send = PCKT_FRAG_MAX;
974 } else {
975 packets_to_send = total_pkts_to_tx;
976 }
977 sent_msgs = send_messages(&msg[0], packets_to_send);
978 if (sent_msgs < 0) {
979 printf("Something went wrong, aborting\n");
980 exit(FAIL);
981 }
982 total_pkts_to_tx = total_pkts_to_tx - sent_msgs;
983 }
984
985 sleep(2);
986
987 knet_send(knet_h, ctrl_message, TEST_STOP, channel);
988
989 if ((packetsize == KNET_MAX_PACKET_SIZE) || (force_packet_size)) {
990 break;
991 }
992
993 /*
994 * Use a multiplier that can always divide properly a GB
995 * into smaller chunks without worry about boundaries
996 */
997 packetsize *= 4;
998
999 if (packetsize > KNET_MAX_PACKET_SIZE) {
1000 packetsize = KNET_MAX_PACKET_SIZE;
1001 }
1002 }
1003
1004 knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel);
1005
1006 for (i = 0; i < PCKT_FRAG_MAX; i++) {
1007 free(tx_buf[i]);
1008 }
1009 }
1010
/*
 * qsort() comparator for sorting the node list into ascending order.
 *
 * aptr and bptr point to 16 bit node ids.
 * Returns a negative value, zero or a positive value when the first id is
 * lower than, equal to or higher than the second one, as qsort() requires.
 */
static int node_compare(const void *aptr, const void *bptr)
{
	uint16_t a,b;

	a = *(const uint16_t *)aptr;
	b = *(const uint16_t *)bptr;

	return (a > b) - (a < b);
}
1021
1022 static void display_stats(int level)
1023 {
1024 struct knet_handle_stats handle_stats;
1025 struct knet_link_status link_status;
1026 struct knet_link_stats total_link_stats;
1027 knet_node_id_t host_list[KNET_MAX_HOST];
1028 uint8_t link_list[KNET_MAX_LINK];
1029 unsigned int i,j;
1030 size_t num_hosts, num_links;
1031
1032 if (knet_handle_get_stats(knet_h, &handle_stats, sizeof(handle_stats)) < 0) {
1033 perror("[info]: failed to get knet handle stats");
1034 return;
1035 }
1036
1037 if (compresscfg || cryptocfg) {
1038 printf("\n");
1039 printf("[stat]: handle stats\n");
1040 printf("[stat]: ------------\n");
1041 if (compresscfg) {
1042 printf("[stat]: tx_uncompressed_packets: %" PRIu64 "\n", handle_stats.tx_uncompressed_packets);
1043 printf("[stat]: tx_compressed_packets: %" PRIu64 "\n", handle_stats.tx_compressed_packets);
1044 printf("[stat]: tx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_original_bytes);
1045 printf("[stat]: tx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.tx_compressed_size_bytes );
1046 printf("[stat]: tx_compress_time_ave: %" PRIu64 "\n", handle_stats.tx_compress_time_ave);
1047 printf("[stat]: tx_compress_time_min: %" PRIu64 "\n", handle_stats.tx_compress_time_min);
1048 printf("[stat]: tx_compress_time_max: %" PRIu64 "\n", handle_stats.tx_compress_time_max);
1049 printf("[stat]: rx_compressed_packets: %" PRIu64 "\n", handle_stats.rx_compressed_packets);
1050 printf("[stat]: rx_compressed_original_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_original_bytes);
1051 printf("[stat]: rx_compressed_size_bytes: %" PRIu64 "\n", handle_stats.rx_compressed_size_bytes);
1052 printf("[stat]: rx_compress_time_ave: %" PRIu64 "\n", handle_stats.rx_compress_time_ave);
1053 printf("[stat]: rx_compress_time_min: %" PRIu64 "\n", handle_stats.rx_compress_time_min);
1054 printf("[stat]: rx_compress_time_max: %" PRIu64 "\n", handle_stats.rx_compress_time_max);
1055 printf("\n");
1056 }
1057 if (cryptocfg) {
1058 printf("[stat]: tx_crypt_packets: %" PRIu64 "\n", handle_stats.tx_crypt_packets);
1059 printf("[stat]: tx_crypt_byte_overhead: %" PRIu64 "\n", handle_stats.tx_crypt_byte_overhead);
1060 printf("[stat]: tx_crypt_time_ave: %" PRIu64 "\n", handle_stats.tx_crypt_time_ave);
1061 printf("[stat]: tx_crypt_time_min: %" PRIu64 "\n", handle_stats.tx_crypt_time_min);
1062 printf("[stat]: tx_crypt_time_max: %" PRIu64 "\n", handle_stats.tx_crypt_time_max);
1063 printf("[stat]: rx_crypt_packets: %" PRIu64 "\n", handle_stats.rx_crypt_packets);
1064 printf("[stat]: rx_crypt_time_ave: %" PRIu64 "\n", handle_stats.rx_crypt_time_ave);
1065 printf("[stat]: rx_crypt_time_min: %" PRIu64 "\n", handle_stats.rx_crypt_time_min);
1066 printf("[stat]: rx_crypt_time_max: %" PRIu64 "\n", handle_stats.rx_crypt_time_max);
1067 printf("\n");
1068 }
1069 }
1070 if (level < 2) {
1071 return;
1072 }
1073
1074 memset(&total_link_stats, 0, sizeof(struct knet_link_stats));
1075
1076 if (knet_host_get_host_list(knet_h, host_list, &num_hosts) < 0) {
1077 perror("[info]: cannot get host list for stats");
1078 return;
1079 }
1080
1081 /* Print in host ID order */
1082 qsort(host_list, num_hosts, sizeof(uint16_t), node_compare);
1083
1084 for (j=0; j<num_hosts; j++) {
1085 if (knet_link_get_link_list(knet_h, host_list[j], link_list, &num_links) < 0) {
1086 perror("[info]: cannot get link list for stats");
1087 return;
1088 }
1089
1090 for (i=0; i < num_links; i++) {
1091 if (knet_link_get_status(knet_h, host_list[j], link_list[i], &link_status, sizeof(link_status)) < 0) {
1092 perror("[info]: cannot get link status");
1093 return;
1094 }
1095
1096 total_link_stats.tx_data_packets += link_status.stats.tx_data_packets;
1097 total_link_stats.rx_data_packets += link_status.stats.rx_data_packets;
1098 total_link_stats.tx_data_bytes += link_status.stats.tx_data_bytes;
1099 total_link_stats.rx_data_bytes += link_status.stats.rx_data_bytes;
1100 total_link_stats.rx_ping_packets += link_status.stats.rx_ping_packets;
1101 total_link_stats.tx_ping_packets += link_status.stats.tx_ping_packets;
1102 total_link_stats.rx_ping_bytes += link_status.stats.rx_ping_bytes;
1103 total_link_stats.tx_ping_bytes += link_status.stats.tx_ping_bytes;
1104 total_link_stats.rx_pong_packets += link_status.stats.rx_pong_packets;
1105 total_link_stats.tx_pong_packets += link_status.stats.tx_pong_packets;
1106 total_link_stats.rx_pong_bytes += link_status.stats.rx_pong_bytes;
1107 total_link_stats.tx_pong_bytes += link_status.stats.tx_pong_bytes;
1108 total_link_stats.rx_pmtu_packets += link_status.stats.rx_pmtu_packets;
1109 total_link_stats.tx_pmtu_packets += link_status.stats.tx_pmtu_packets;
1110 total_link_stats.rx_pmtu_bytes += link_status.stats.rx_pmtu_bytes;
1111 total_link_stats.tx_pmtu_bytes += link_status.stats.tx_pmtu_bytes;
1112
1113 total_link_stats.tx_total_packets += link_status.stats.tx_total_packets;
1114 total_link_stats.rx_total_packets += link_status.stats.rx_total_packets;
1115 total_link_stats.tx_total_bytes += link_status.stats.tx_total_bytes;
1116 total_link_stats.rx_total_bytes += link_status.stats.rx_total_bytes;
1117 total_link_stats.tx_total_errors += link_status.stats.tx_total_errors;
1118 total_link_stats.tx_total_retries += link_status.stats.tx_total_retries;
1119
1120 total_link_stats.tx_pmtu_errors += link_status.stats.tx_pmtu_errors;
1121 total_link_stats.tx_pmtu_retries += link_status.stats.tx_pmtu_retries;
1122 total_link_stats.tx_ping_errors += link_status.stats.tx_ping_errors;
1123 total_link_stats.tx_ping_retries += link_status.stats.tx_ping_retries;
1124 total_link_stats.tx_pong_errors += link_status.stats.tx_pong_errors;
1125 total_link_stats.tx_pong_retries += link_status.stats.tx_pong_retries;
1126 total_link_stats.tx_data_errors += link_status.stats.tx_data_errors;
1127 total_link_stats.tx_data_retries += link_status.stats.tx_data_retries;
1128
1129 total_link_stats.down_count += link_status.stats.down_count;
1130 total_link_stats.up_count += link_status.stats.up_count;
1131
1132 if (level > 2) {
1133 printf("\n");
1134 printf("[stat]: Node %d Link %d\n", host_list[j], link_list[i]);
1135
1136 printf("[stat]: tx_data_packets: %" PRIu64 "\n", link_status.stats.tx_data_packets);
1137 printf("[stat]: rx_data_packets: %" PRIu64 "\n", link_status.stats.rx_data_packets);
1138 printf("[stat]: tx_data_bytes: %" PRIu64 "\n", link_status.stats.tx_data_bytes);
1139 printf("[stat]: rx_data_bytes: %" PRIu64 "\n", link_status.stats.rx_data_bytes);
1140 printf("[stat]: rx_ping_packets: %" PRIu64 "\n", link_status.stats.rx_ping_packets);
1141 printf("[stat]: tx_ping_packets: %" PRIu64 "\n", link_status.stats.tx_ping_packets);
1142 printf("[stat]: rx_ping_bytes: %" PRIu64 "\n", link_status.stats.rx_ping_bytes);
1143 printf("[stat]: tx_ping_bytes: %" PRIu64 "\n", link_status.stats.tx_ping_bytes);
1144 printf("[stat]: rx_pong_packets: %" PRIu64 "\n", link_status.stats.rx_pong_packets);
1145 printf("[stat]: tx_pong_packets: %" PRIu64 "\n", link_status.stats.tx_pong_packets);
1146 printf("[stat]: rx_pong_bytes: %" PRIu64 "\n", link_status.stats.rx_pong_bytes);
1147 printf("[stat]: tx_pong_bytes: %" PRIu64 "\n", link_status.stats.tx_pong_bytes);
1148 printf("[stat]: rx_pmtu_packets: %" PRIu64 "\n", link_status.stats.rx_pmtu_packets);
1149 printf("[stat]: tx_pmtu_packets: %" PRIu64 "\n", link_status.stats.tx_pmtu_packets);
1150 printf("[stat]: rx_pmtu_bytes: %" PRIu64 "\n", link_status.stats.rx_pmtu_bytes);
1151 printf("[stat]: tx_pmtu_bytes: %" PRIu64 "\n", link_status.stats.tx_pmtu_bytes);
1152
1153 printf("[stat]: tx_total_packets: %" PRIu64 "\n", link_status.stats.tx_total_packets);
1154 printf("[stat]: rx_total_packets: %" PRIu64 "\n", link_status.stats.rx_total_packets);
1155 printf("[stat]: tx_total_bytes: %" PRIu64 "\n", link_status.stats.tx_total_bytes);
1156 printf("[stat]: rx_total_bytes: %" PRIu64 "\n", link_status.stats.rx_total_bytes);
1157 printf("[stat]: tx_total_errors: %" PRIu64 "\n", link_status.stats.tx_total_errors);
1158 printf("[stat]: tx_total_retries: %" PRIu64 "\n", link_status.stats.tx_total_retries);
1159
1160 printf("[stat]: tx_pmtu_errors: %" PRIu32 "\n", link_status.stats.tx_pmtu_errors);
1161 printf("[stat]: tx_pmtu_retries: %" PRIu32 "\n", link_status.stats.tx_pmtu_retries);
1162 printf("[stat]: tx_ping_errors: %" PRIu32 "\n", link_status.stats.tx_ping_errors);
1163 printf("[stat]: tx_ping_retries: %" PRIu32 "\n", link_status.stats.tx_ping_retries);
1164 printf("[stat]: tx_pong_errors: %" PRIu32 "\n", link_status.stats.tx_pong_errors);
1165 printf("[stat]: tx_pong_retries: %" PRIu32 "\n", link_status.stats.tx_pong_retries);
1166 printf("[stat]: tx_data_errors: %" PRIu32 "\n", link_status.stats.tx_data_errors);
1167 printf("[stat]: tx_data_retries: %" PRIu32 "\n", link_status.stats.tx_data_retries);
1168
1169 printf("[stat]: latency_min: %" PRIu32 "\n", link_status.stats.latency_min);
1170 printf("[stat]: latency_max: %" PRIu32 "\n", link_status.stats.latency_max);
1171 printf("[stat]: latency_ave: %" PRIu32 "\n", link_status.stats.latency_ave);
1172 printf("[stat]: latency_samples: %" PRIu32 "\n", link_status.stats.latency_samples);
1173
1174 printf("[stat]: down_count: %" PRIu32 "\n", link_status.stats.down_count);
1175 printf("[stat]: up_count: %" PRIu32 "\n", link_status.stats.up_count);
1176 }
1177 }
1178 }
1179 printf("\n");
1180 printf("[stat]: Total link stats\n");
1181 printf("[stat]: ----------------\n");
1182 printf("[stat]: tx_data_packets: %" PRIu64 "\n", total_link_stats.tx_data_packets);
1183 printf("[stat]: rx_data_packets: %" PRIu64 "\n", total_link_stats.rx_data_packets);
1184 printf("[stat]: tx_data_bytes: %" PRIu64 "\n", total_link_stats.tx_data_bytes);
1185 printf("[stat]: rx_data_bytes: %" PRIu64 "\n", total_link_stats.rx_data_bytes);
1186 printf("[stat]: rx_ping_packets: %" PRIu64 "\n", total_link_stats.rx_ping_packets);
1187 printf("[stat]: tx_ping_packets: %" PRIu64 "\n", total_link_stats.tx_ping_packets);
1188 printf("[stat]: rx_ping_bytes: %" PRIu64 "\n", total_link_stats.rx_ping_bytes);
1189 printf("[stat]: tx_ping_bytes: %" PRIu64 "\n", total_link_stats.tx_ping_bytes);
1190 printf("[stat]: rx_pong_packets: %" PRIu64 "\n", total_link_stats.rx_pong_packets);
1191 printf("[stat]: tx_pong_packets: %" PRIu64 "\n", total_link_stats.tx_pong_packets);
1192 printf("[stat]: rx_pong_bytes: %" PRIu64 "\n", total_link_stats.rx_pong_bytes);
1193 printf("[stat]: tx_pong_bytes: %" PRIu64 "\n", total_link_stats.tx_pong_bytes);
1194 printf("[stat]: rx_pmtu_packets: %" PRIu64 "\n", total_link_stats.rx_pmtu_packets);
1195 printf("[stat]: tx_pmtu_packets: %" PRIu64 "\n", total_link_stats.tx_pmtu_packets);
1196 printf("[stat]: rx_pmtu_bytes: %" PRIu64 "\n", total_link_stats.rx_pmtu_bytes);
1197 printf("[stat]: tx_pmtu_bytes: %" PRIu64 "\n", total_link_stats.tx_pmtu_bytes);
1198
1199 printf("[stat]: tx_total_packets: %" PRIu64 "\n", total_link_stats.tx_total_packets);
1200 printf("[stat]: rx_total_packets: %" PRIu64 "\n", total_link_stats.rx_total_packets);
1201 printf("[stat]: tx_total_bytes: %" PRIu64 "\n", total_link_stats.tx_total_bytes);
1202 printf("[stat]: rx_total_bytes: %" PRIu64 "\n", total_link_stats.rx_total_bytes);
1203 printf("[stat]: tx_total_errors: %" PRIu64 "\n", total_link_stats.tx_total_errors);
1204 printf("[stat]: tx_total_retries: %" PRIu64 "\n", total_link_stats.tx_total_retries);
1205
1206 printf("[stat]: tx_pmtu_errors: %" PRIu32 "\n", total_link_stats.tx_pmtu_errors);
1207 printf("[stat]: tx_pmtu_retries: %" PRIu32 "\n", total_link_stats.tx_pmtu_retries);
1208 printf("[stat]: tx_ping_errors: %" PRIu32 "\n", total_link_stats.tx_ping_errors);
1209 printf("[stat]: tx_ping_retries: %" PRIu32 "\n", total_link_stats.tx_ping_retries);
1210 printf("[stat]: tx_pong_errors: %" PRIu32 "\n", total_link_stats.tx_pong_errors);
1211 printf("[stat]: tx_pong_retries: %" PRIu32 "\n", total_link_stats.tx_pong_retries);
1212 printf("[stat]: tx_data_errors: %" PRIu32 "\n", total_link_stats.tx_data_errors);
1213 printf("[stat]: tx_data_retries: %" PRIu32 "\n", total_link_stats.tx_data_retries);
1214
1215 printf("[stat]: down_count: %" PRIu32 "\n", total_link_stats.down_count);
1216 printf("[stat]: up_count: %" PRIu32 "\n", total_link_stats.up_count);
1217
1218 }
1219
1220 static void send_perf_data_by_time(void)
1221 {
1222 char *tx_buf[PCKT_FRAG_MAX];
1223 struct knet_mmsghdr msg[PCKT_FRAG_MAX];
1224 struct iovec iov_out[PCKT_FRAG_MAX];
1225 char ctrl_message[16];
1226 int sent_msgs;
1227 unsigned int i;
1228 uint32_t packetsize = 64;
1229 struct timespec clock_start, clock_end;
1230 unsigned long long time_diff = 0;
1231
1232 setup_send_buffers_common(msg, iov_out, tx_buf);
1233
1234 memset(&clock_start, 0, sizeof(clock_start));
1235 memset(&clock_end, 0, sizeof(clock_start));
1236
1237 while (packetsize <= KNET_MAX_PACKET_SIZE) {
1238 if (force_packet_size) {
1239 packetsize = force_packet_size;
1240 }
1241 for (i = 0; i < PCKT_FRAG_MAX; i++) {
1242 iov_out[i].iov_len = packetsize;
1243 if (use_pckt_verification) {
1244 struct pckt_ver *tx_pckt = (struct pckt_ver *)iov_out[i].iov_base;
1245 tx_pckt->len = iov_out[i].iov_len;
1246 tx_pckt->chksum = compute_chsum((const unsigned char *)iov_out[i].iov_base + sizeof(struct pckt_ver), iov_out[i].iov_len - sizeof(struct pckt_ver));
1247 }
1248 }
1249 printf("[info]: testing with %u bytes packet size for %" PRIu64 " seconds.\n", packetsize, perf_by_time_secs);
1250
1251 memset(ctrl_message, 0, sizeof(ctrl_message));
1252 knet_send(knet_h, ctrl_message, TEST_START, channel);
1253
1254 if (clock_gettime(CLOCK_MONOTONIC, &clock_start) != 0) {
1255 printf("[info]: unable to get start time!\n");
1256 }
1257
1258 time_diff = 0;
1259
1260 while (time_diff < (perf_by_time_secs * 1000000000llu)) {
1261 sent_msgs = send_messages(&msg[0], PCKT_FRAG_MAX);
1262 if (sent_msgs < 0) {
1263 printf("Something went wrong, aborting\n");
1264 exit(FAIL);
1265 }
1266 if (clock_gettime(CLOCK_MONOTONIC, &clock_end) != 0) {
1267 printf("[info]: unable to get end time!\n");
1268 }
1269 timespec_diff(clock_start, clock_end, &time_diff);
1270 }
1271
1272 sleep(2);
1273
1274 knet_send(knet_h, ctrl_message, TEST_STOP, channel);
1275
1276 if ((packetsize == KNET_MAX_PACKET_SIZE) || (force_packet_size)) {
1277 break;
1278 }
1279
1280 /*
1281 * Use a multiplier that can always divide properly a GB
1282 * into smaller chunks without worry about boundaries
1283 */
1284 packetsize *= 4;
1285
1286 if (packetsize > KNET_MAX_PACKET_SIZE) {
1287 packetsize = KNET_MAX_PACKET_SIZE;
1288 }
1289 }
1290
1291 knet_send(knet_h, ctrl_message, TEST_COMPLETE, channel);
1292
1293 for (i = 0; i < PCKT_FRAG_MAX; i++) {
1294 free(tx_buf[i]);
1295 }
1296 }
1297
1298 static void cleanup_all(void)
1299 {
1300 knet_handle_t knet_h_tmp[2];
1301
|
(18) Event lock_acquire: |
Example 1: Calling "pthread_mutex_lock" acquires lock "shutdown_mutex". |
| Also see events: |
[missing_lock][example_access] |
1302 if (pthread_mutex_lock(&shutdown_mutex)) {
1303 return;
1304 }
1305
1306 if (bench_shutdown_in_progress) {
1307 pthread_mutex_unlock(&shutdown_mutex);
1308 return;
1309 }
1310
|
(19) Event example_access: |
Example 1 (cont.): "bench_shutdown_in_progress" is written to with lock "shutdown_mutex" held. |
| Also see events: |
[missing_lock][lock_acquire] |
1311 bench_shutdown_in_progress = 1;
1312
1313 pthread_mutex_unlock(&shutdown_mutex);
1314
1315 if (rx_thread) {
1316 stop_rx_thread();
1317 }
1318 knet_h_tmp[1] = knet_h;
1319 knet_handle_stop_everything(knet_h_tmp, 1);
1320 }
1321
1322 static void sigint_handler(int signum)
1323 {
1324 printf("[info]: cleaning up... got signal: %d\n", signum);
1325 cleanup_all();
1326 exit(PASS);
1327 }
1328
1329 int main(int argc, char *argv[])
1330 {
1331 if (signal(SIGINT, sigint_handler) == SIG_ERR) {
1332 printf("Unable to configure SIGINT handler\n");
1333 exit(FAIL);
1334 }
1335
1336 setup_knet(argc, argv);
1337
1338 setup_data_txrx_common();
1339
1340 sleep(5);
1341
1342 restart:
1343 switch(test_type) {
1344 default:
1345 case TEST_PING: /* basic ping, no data */
1346 sleep(5);
1347 break;
1348 case TEST_PING_AND_DATA:
1349 send_ping_data();
1350 break;
1351 case TEST_PERF_BY_SIZE:
1352 if (senderid == thisnodeid) {
1353 send_perf_data_by_size();
1354 } else {
1355 printf("[info]: waiting for perf rx thread to finish\n");
1356 while(!wait_for_perf_rx) {
1357 sleep(1);
1358 }
1359 }
1360 break;
1361 case TEST_PERF_BY_TIME:
1362 if (senderid == thisnodeid) {
1363 send_perf_data_by_time();
1364 } else {
1365 printf("[info]: waiting for perf rx thread to finish\n");
1366 while(!wait_for_perf_rx) {
1367 sleep(1);
1368 }
1369 }
1370 break;
1371 }
1372 if (continous) {
1373 goto restart;
1374 }
1375 if (show_stats) {
1376 display_stats(show_stats);
1377 }
1378
1379 cleanup_all();
1380
1381 return PASS;
1382 }
1383