/*
 * Copyright (C) 2012-2025 Red Hat, Inc. All rights reserved.
 *
 * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
 *          Federico Simoncelli <fsimon@kronosnet.org>
 *
 * This software is licensed under LGPL-2.0+
 */

#include "config.h"

#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <sys/uio.h>
#include <pthread.h>

#include "compat.h"
#include "compress.h"
#include "crypto.h"
#include "host.h"
#include "links.h"
#include "links_acl.h"
#include "logging.h"
#include "transports.h"
#include "transport_common.h"
#include "threads_common.h"
#include "threads_heartbeat.h"
#include "threads_rx.h"
#include "netutils.h"

/*
 * RECV
 */

/*
 * return 1 if a > b
 * return -1 if b > a
 * return 0 if they are equal
 */
static inline int timecmp(struct timespec a, struct timespec b)
{
	if (a.tv_sec != b.tv_sec) {
		if (a.tv_sec > b.tv_sec) {
			return 1;
		} else {
			return -1;
		}
	} else {
		if (a.tv_nsec > b.tv_nsec) {
			return 1;
		} else if (a.tv_nsec < b.tv_nsec) {
			return -1;
		} else {
			return 0;
		}
	}
}
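
/*
 * Note: a timecmp() result < 0 means "a is older than b"; the defrag
 * buffer reclaim in find_pckt_defrag_buf() below relies on this to
 * find the least recently updated buffer.
 */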

/*
 * This function needs to return an index (0 to KNET_DEFRAG_BUFFERS - 1)
 * into the knet_host_defrag_buf array (-1 on errors).
 */
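/*
 * Selection order, as implemented below:
 * 1) reuse the buffer already tracking this seq_num, if any
 * 2) reject the packet (ETIME) if its seq_num has already been seen/reclaimed
 * 3) otherwise take any free buffer
 * 4) as a last resort, reclaim the in-use buffer with the oldest
 *    last_update timestamp
 */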

static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
{
	struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
	int i, oldest;

	/*
	 * check if there is a buffer already in use handling the same seq_num
	 */
	for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
		if (src_host->defrag_buf[i].in_use) {
			if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
				return i;
			}
		}
	}

	/*
	 * If there is no buffer handling the current seq_num,
	 * either it's new or it's been reclaimed already.
	 * check if it's been reclaimed/seen before using the defrag circular
	 * buffer. If the pckt has been seen before, the buffer expired (ETIME)
	 * and there is no point in trying to defrag it again.
	 */
	if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
		errno = ETIME;
		return -1;
	}

	/*
	 * register the pckt as seen
	 */
	_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);

	/*
	 * see if there is a free buffer
	 */
	for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
		if (!src_host->defrag_buf[i].in_use) {
			return i;
		}
	}

	/*
	 * at this point, there are no free buffers, the pckt is new
	 * and we need to reclaim a buffer: we will take the one
	 * with the oldest timestamp. It's as good as any.
	 */

	oldest = 0;

	for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
		if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
			oldest = i;
		}
	}
	src_host->defrag_buf[oldest].in_use = 0;
	return oldest;
}

static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
	struct knet_host_defrag_buf *defrag_buf;
	int defrag_buf_idx;

	defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
	if (defrag_buf_idx < 0) {
		return 1;
	}

	defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];

	/*
	 * if the buf is not in use, then make sure it's clean
	 */
	if (!defrag_buf->in_use) {
		memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
		defrag_buf->in_use = 1;
		defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
	}

	/*
	 * update timestamp on the buffer
	 */
	clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);

	/*
	 * check if we already received this fragment
	 */
	if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
		/*
		 * if we have received this fragment and we didn't clear the buffer
		 * it means that we don't have all fragments yet
		 */
		return 1;
	}

	/*
	 * we need to handle the last fragment with kid gloves, because its
	 * size can differ from all the others
	 */

	if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
		defrag_buf->last_frag_size = *len;

		/*
		 * in the event that the last fragment arrives first,
		 * we still don't know its offset vs the other fragments (based on MTU),
		 * so we store the fragment at the end of the buffer where it's safe
		 * and take a copy of the len so that we can restore its offset later.
		 * remember we can't use the local MTU for this calculation because pMTU
		 * can be asymmetric between the same hosts.
		 */
		if (!defrag_buf->frag_size) {
			defrag_buf->last_first = 1;
			memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
				inbuf->khp_data_userdata,
				*len);
		}
	} else {
		defrag_buf->frag_size = *len;
	}

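	/*
	 * Fragment placement below is purely offset based: khp_data_frag_seq
	 * is 1-based, so fragment N lands at offset (N - 1) * frag_size.
	 * For example, with frag_size = 1300, fragment 3 is copied at
	 * offset 2600.
	 */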
	if (defrag_buf->frag_size) {
		memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
			inbuf->khp_data_userdata, *len);
	}

	defrag_buf->frag_recv++;
	defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;

	/*
	 * check if we received all the fragments
	 */
	if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
		/*
		 * special case the last pckt
		 */

		if (defrag_buf->last_first) {
			memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
				defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
				defrag_buf->last_frag_size);
		}

		/*
		 * recalculate packet length
		 */

		*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;
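		/*
		 * worked example: with khp_data_frag_num = 3, frag_size = 1300
		 * and last_frag_size = 500, the reassembled payload length is
		 * ((3 - 1) * 1300) + 500 = 3100 bytes.
		 */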

		/*
		 * copy the pckt back in the user data
		 */
		memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);

		/*
		 * free this buffer
		 */
		defrag_buf->in_use = 0;
		return 0;
	}

	return 1;
}

/*
 * process incoming packets against the configured access lists:
 * returns 1 if the packet is accepted (or access lists are disabled),
 * 0 if it has been rejected.
 */
static int _check_rx_acl(knet_handle_t knet_h, struct knet_link *src_link, const struct knet_mmsghdr *msg)
{
	if (knet_h->use_access_lists) {
		if (!check_validate(knet_h, src_link, msg->msg_hdr.msg_name)) {
			char src_ipaddr[KNET_MAX_HOST_LEN];
			char src_port[KNET_MAX_PORT_LEN];

			memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
			memset(src_port, 0, KNET_MAX_PORT_LEN);
			if (knet_addrtostr(msg->msg_hdr.msg_name, sockaddr_len(msg->msg_hdr.msg_name),
					   src_ipaddr, KNET_MAX_HOST_LEN,
					   src_port, KNET_MAX_PORT_LEN) < 0) {
				log_warn(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port");
			} else {
				log_warn(knet_h, KNET_SUB_RX, "Packet rejected from %s:%s", src_ipaddr, src_port);
			}
			return 0;
		}
	}
	return 1;
}

static int _fast_data_up(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link)
{
	if (src_link->received_pong) {
		log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received data during valid ping/pong activity. Force link up.", src_host->host_id, src_link->link_id);
		_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
		return 1;
	}
	/* host is not eligible for fast data up */
	return 0;
}

static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
{
	int err = 0, savederrno = 0, stats_err = 0;
	ssize_t outlen;
	struct knet_host *src_host;
	struct knet_link *src_link;
	unsigned long long latency_last;
	knet_node_id_t dst_host_ids[KNET_MAX_HOST];
	size_t dst_host_ids_entries = 0;
	int bcast = 1;
	uint64_t decrypt_time = 0;
	struct timespec recvtime;
	struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
	unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
	ssize_t len = msg->msg_len;
	struct iovec iov_out[1];
	int8_t channel;
	seq_num_t recv_seq_num;
	int wipe_bufs = 0;
	int try_decrypt = 0, decrypted = 0, i, found_link = 0;

	for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
		if (knet_h->crypto_instance[i]) {
			try_decrypt = 1;
			break;
		}
	}

	if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) {
		log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configuration is set!");
		return;
	}

	if (try_decrypt) {
		struct timespec start_time;
		struct timespec end_time;

		clock_gettime(CLOCK_MONOTONIC, &start_time);
		if (crypto_authenticate_and_decrypt(knet_h,
						    (unsigned char *)inbuf,
						    len,
						    knet_h->recv_from_links_buf_decrypt,
						    &outlen) < 0) {
			log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
			if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
				char src_ipaddr[KNET_MAX_HOST_LEN];
				char src_port[KNET_MAX_PORT_LEN];

				memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
				memset(src_port, 0, KNET_MAX_PORT_LEN);
				if (knet_addrtostr(msg->msg_hdr.msg_name, sockaddr_len(msg->msg_hdr.msg_name),
						   src_ipaddr, KNET_MAX_HOST_LEN,
						   src_port, KNET_MAX_PORT_LEN) < 0) {
					log_err(knet_h, KNET_SUB_RX, "Unable to decrypt packet from unknown host/port (size %zu)!", len);
				} else {
					log_err(knet_h, KNET_SUB_RX, "Unable to decrypt packet from %s:%s (size %zu)!", src_ipaddr, src_port, len);
				}
				return;
			}
			log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data");
		} else {
			clock_gettime(CLOCK_MONOTONIC, &end_time);
			timespec_diff(start_time, end_time, &decrypt_time);

			len = outlen;
			inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
			decrypted = 1;
		}
	}

	if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
		log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
		return;
	}

	if (inbuf->kh_version != KNET_HEADER_VERSION) {
		log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
		return;
	}

	inbuf->kh_node = ntohs(inbuf->kh_node);
	src_host = knet_h->host_index[inbuf->kh_node];
	if (src_host == NULL) { /* host not found */
		log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
		return;
	}

	if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
		/* be aware this works only for PING / PONG and PMTUd packets! */
		src_link = src_host->link +
			   (inbuf->khp_ping_link % KNET_MAX_LINK);
		if (!_check_rx_acl(knet_h, src_link, msg)) {
			return;
		}
		if (src_link->dynamic == KNET_LINK_DYNIP) {
			if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) {
				log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
					  src_host->host_id, src_link->link_id);
				memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage));
				if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
						   src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
						   src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
					log_debug(knet_h, KNET_SUB_RX, "Unable to resolve the new source address");
					snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
					snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
				} else {
					log_info(knet_h, KNET_SUB_RX,
						 "host: %u link: %u new connection established from: %s:%s",
						 src_host->host_id, src_link->link_id,
						 src_link->status.dst_ipaddr, src_link->status.dst_port);
				}
			}
			/*
			 * transport has already accepted the connection here
			 * otherwise we would not be receiving packets
			 */
			transport_link_dyn_connect(knet_h, sockfd, src_link);
		}
	} else { /* data packet */
		for (i = 0; i < KNET_MAX_LINK; i++) {
			src_link = &src_host->link[i];
			if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) {
				found_link = 1;
				break;
			}
		}
		if (found_link) {
			/*
			 * this check is currently redundant, but keep it here for now
			 */
			if (!_check_rx_acl(knet_h, src_link, msg)) {
				return;
			}
		} else {
			log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet.");
			return;
		}
	}

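	/*
	 * From here on, per-link RX stats are updated under link_stats_mutex;
	 * note the explicit unlocks below before taking tx_mutex or
	 * pmtud_mutex (lock ordering).
	 */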
	stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
	if (stats_err) {
		log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
			src_host->host_id, src_link->link_id, strerror(stats_err));
		return;
	}

	switch (inbuf->kh_type) {
	case KNET_HEADER_TYPE_DATA:
		/* data stats at the top for consistency with TX */
		src_link->status.stats.rx_data_packets++;
		src_link->status.stats.rx_data_bytes += len;

		if (decrypted) {
			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
			if (stats_err) {
				pthread_mutex_unlock(&src_link->link_stats_mutex);
				log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
				return;
			}
			/*
			 * Only update the crypto overhead for data packets, mainly to be
			 * consistent with TX
			 */
			if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
				knet_h->stats.rx_crypt_time_min = decrypt_time;
			}
			if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
				knet_h->stats.rx_crypt_time_max = decrypt_time;
			}
			knet_h->stats.rx_crypt_time_ave =
				(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
				 decrypt_time) / (knet_h->stats.rx_crypt_packets + 1);
			knet_h->stats.rx_crypt_packets++;
			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
		}

		if (!src_host->status.reachable) {
			if (!_fast_data_up(knet_h, src_host, src_link)) {
				pthread_mutex_unlock(&src_link->link_stats_mutex);
				log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
				return;
			}
		}

		inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
		channel = inbuf->khp_data_channel;
		src_host->got_data = 1;

		if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
			pthread_mutex_unlock(&src_link->link_stats_mutex);
			if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
				log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
			}
			return;
		}

		if (inbuf->khp_data_frag_num > 1) {
			/*
			 * len as received from the socket also includes the knet
			 * data header, which the defrag code doesn't care about.
			 * So strip it here and re-add it once we are done
			 * defragging.
			 */
			len = len - KNET_HEADER_DATA_SIZE;
			if (pckt_defrag(knet_h, inbuf, &len)) {
				pthread_mutex_unlock(&src_link->link_stats_mutex);
				return;
			}
			len = len + KNET_HEADER_DATA_SIZE;
		}

		if (inbuf->khp_data_compress) {
			ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
			struct timespec start_time;
			struct timespec end_time;
			uint64_t compress_time;

			clock_gettime(CLOCK_MONOTONIC, &start_time);
			err = decompress(knet_h, inbuf->khp_data_compress,
					 (const unsigned char *)inbuf->khp_data_userdata,
					 len - KNET_HEADER_DATA_SIZE,
					 knet_h->recv_from_links_buf_decompress,
					 &decmp_outlen);

			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
			if (stats_err) {
				pthread_mutex_unlock(&src_link->link_stats_mutex);
				log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
				return;
			}

			clock_gettime(CLOCK_MONOTONIC, &end_time);
			timespec_diff(start_time, end_time, &compress_time);

			if (!err) {
				/* Collect stats */
				if (compress_time < knet_h->stats.rx_compress_time_min) {
					knet_h->stats.rx_compress_time_min = compress_time;
				}
				if (compress_time > knet_h->stats.rx_compress_time_max) {
					knet_h->stats.rx_compress_time_max = compress_time;
				}
				knet_h->stats.rx_compress_time_ave =
					(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
					 compress_time) / (knet_h->stats.rx_compressed_packets + 1);

				knet_h->stats.rx_compressed_packets++;
				knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
				knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;

				memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
				len = decmp_outlen + KNET_HEADER_DATA_SIZE;
			} else {
				pthread_mutex_unlock(&knet_h->handle_stats_mutex);
				pthread_mutex_unlock(&src_link->link_stats_mutex);
				log_err(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
					err, strerror(errno));
				return;
			}
			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
		}

		if (knet_h->enabled != 1) /* data forward is disabled */
			break;

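		/*
		 * dst_host_filter_fn classifies the payload: a negative return
		 * is an error, 0 means unicast (with the destination node ids
		 * reported in dst_host_ids), any other value means broadcast.
		 */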
		if (knet_h->dst_host_filter_fn) {
			size_t host_idx;
			int found = 0;

			bcast = knet_h->dst_host_filter_fn(
					knet_h->dst_host_filter_fn_private_data,
					(const unsigned char *)inbuf->khp_data_userdata,
					len - KNET_HEADER_DATA_SIZE,
					KNET_NOTIFY_RX,
					knet_h->host_id,
					inbuf->kh_node,
					&channel,
					dst_host_ids,
					&dst_host_ids_entries);
			if (bcast < 0) {
				pthread_mutex_unlock(&src_link->link_stats_mutex);
				log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
				return;
			}

			if ((!bcast) && (!dst_host_ids_entries)) {
				pthread_mutex_unlock(&src_link->link_stats_mutex);
				log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
				return;
			}

			/* check if we are dst for this packet */
			if (!bcast) {
				if (dst_host_ids_entries > KNET_MAX_HOST) {
					pthread_mutex_unlock(&src_link->link_stats_mutex);
					log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
					return;
				}
				for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
					if (dst_host_ids[host_idx] == knet_h->host_id) {
						found = 1;
						break;
					}
				}
				if (!found) {
					pthread_mutex_unlock(&src_link->link_stats_mutex);
					log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
					return;
				}
			}
		}

		if (!knet_h->sockfd[channel].in_use) {
			pthread_mutex_unlock(&src_link->link_stats_mutex);
			log_debug(knet_h, KNET_SUB_RX,
				  "received packet for channel %d but there is no local sock connected",
				  channel);
			return;
		}

		outlen = 0;
		memset(iov_out, 0, sizeof(iov_out));

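		/*
		 * writev below may deliver only part of the payload to the
		 * application fd; on a short write, advance iov_base by the
		 * bytes already written and retry the remainder.
		 */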
retry:
		iov_out[0].iov_base = (void *)inbuf->khp_data_userdata + outlen;
		iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);

		outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
		if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
			log_debug(knet_h, KNET_SUB_RX,
				  "Unable to send all data to the application in one go. Expected: %zu Sent: %zd",
				  iov_out[0].iov_len, outlen);
			goto retry;
		}

		if (outlen <= 0) {
			knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
					       knet_h->sockfd[channel].sockfd[0],
					       channel,
					       KNET_NOTIFY_RX,
					       outlen,
					       errno);
			pthread_mutex_unlock(&src_link->link_stats_mutex);
			return;
		}
		if ((size_t)outlen == iov_out[0].iov_len) {
			_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
		}
		break;
	case KNET_HEADER_TYPE_PING:
		outlen = KNET_HEADER_PING_SIZE;
		inbuf->kh_type = KNET_HEADER_TYPE_PONG;
		inbuf->kh_node = htons(knet_h->host_id);
		recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
		src_link->status.stats.rx_ping_packets++;
		src_link->status.stats.rx_ping_bytes += len;

		wipe_bufs = 0;

		if (!inbuf->khp_ping_timed) {
			/*
			 * we might be receiving this message from all links, but we want
			 * to process it only the first time
			 */
			if (recv_seq_num != src_host->untimed_rx_seq_num) {
				/*
				 * cache the untimed seq num
				 */
				src_host->untimed_rx_seq_num = recv_seq_num;
				/*
				 * if the host has received data in between
				 * untimed pings, then we don't need to wipe the bufs
				 */
				if (src_host->got_data) {
					src_host->got_data = 0;
					wipe_bufs = 0;
				} else {
					wipe_bufs = 1;
				}
			}
			_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
		} else {
			/*
			 * pings always arrive in bursts over all the links;
			 * catch the first of them to cache the seq num and
			 * avoid duplicate processing
			 */
			if (recv_seq_num != src_host->timed_rx_seq_num) {
				src_host->timed_rx_seq_num = recv_seq_num;

				if (recv_seq_num == 0) {
					_seq_num_lookup(src_host, recv_seq_num, 0, 1);
				}
			}
		}

		if (knet_h->crypto_in_use_config) {
			if (crypto_encrypt_and_sign(knet_h,
						    (const unsigned char *)inbuf,
						    outlen,
						    knet_h->recv_from_links_buf_crypt,
						    &outlen) < 0) {
				log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
				break;
			}
			outbuf = knet_h->recv_from_links_buf_crypt;
			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
			if (stats_err) {
				log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
				break;
			}
			knet_h->stats_extra.tx_crypt_pong_packets++;
			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
		}

retry_pong:
		if (src_link->transport_connected) {
			if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
				len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
					     (struct sockaddr *)&src_link->dst_addr, knet_h->knet_transport_fd_tracker[src_link->outsock].sockaddr_len);
			} else {
				len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
			}
			savederrno = errno;
			if (len != outlen) {
				err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, KNET_SUB_RX, len, savederrno);
				switch(err) {
				case -1: /* unrecoverable error */
					log_debug(knet_h, KNET_SUB_RX,
						  "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
						  src_link->outsock, errno, strerror(errno),
						  src_link->status.src_ipaddr, src_link->status.src_port,
						  src_link->status.dst_ipaddr, src_link->status.dst_port);
					src_link->status.stats.tx_pong_errors++;
					break;
				case 0: /* ignore error and continue */
					break;
				case 1: /* retry to send the same data */
					src_link->status.stats.tx_pong_retries++;
					goto retry_pong;
				}
			}
			src_link->status.stats.tx_pong_packets++;
			src_link->status.stats.tx_pong_bytes += outlen;
		}
		break;
	case KNET_HEADER_TYPE_PONG:
		src_link->status.stats.rx_pong_packets++;
		src_link->status.stats.rx_pong_bytes += len;
		clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);

		memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
		timespec_diff(recvtime,
			      src_link->status.pong_last, &latency_last);

		if ((latency_last / 1000llu) > src_link->pong_timeout) {
			log_debug(knet_h, KNET_SUB_RX,
				  "Incoming pong packet from host: %u link: %u has higher latency than pong_timeout. Discarding",
				  src_host->host_id, src_link->link_id);
		} else {
			/*
			 * in words: (('previous mean' * ('count' - 1)) + 'new value') / 'count'
			 */
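			/*
			 * e.g. with a previous mean of 100 over 4 samples and a
			 * new sample of 150: ((100 * 4) + 150) / 5 = 110.
			 */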

			src_link->latency_cur_samples++;

			/*
			 * limit to max_samples (precision)
			 */
			if (src_link->latency_cur_samples >= src_link->latency_max_samples) {
				src_link->latency_cur_samples = src_link->latency_max_samples;
			}
			src_link->status.latency =
				(((src_link->status.latency * (src_link->latency_cur_samples - 1)) + (latency_last / 1000llu)) / src_link->latency_cur_samples);

			if (src_link->status.latency < src_link->pong_timeout_adj) {
				if (!src_link->status.connected) {
					if (src_link->received_pong >= src_link->pong_count) {
						log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
							 src_host->host_id, src_link->link_id);
						_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
					} else {
						src_link->received_pong++;
						log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
							  src_host->host_id, src_link->link_id, src_link->received_pong);
					}
				}
			}
			/* Calculate latency stats */
			if (src_link->status.latency > src_link->status.stats.latency_max) {
				src_link->status.stats.latency_max = src_link->status.latency;
			}
			if (src_link->status.latency < src_link->status.stats.latency_min) {
				src_link->status.stats.latency_min = src_link->status.latency;
			}

			/*
			 * the 2 lines below keep all latency average calculations
			 * consistent and capped to the link precision. In the future
			 * we will kill the calculation above and keep only the one in
			 * the stats structure, but for now both stay around to avoid
			 * API/ABI breakage as we backport the fixes to stable.
			 */
			src_link->status.stats.latency_ave = src_link->status.latency;
			src_link->status.stats.latency_samples = src_link->latency_cur_samples;
		}
		break;
	case KNET_HEADER_TYPE_PMTUD:
		src_link->status.stats.rx_pmtu_packets++;
		src_link->status.stats.rx_pmtu_bytes += len;
		outlen = KNET_HEADER_PMTUD_SIZE;
		inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
		inbuf->kh_node = htons(knet_h->host_id);

		if (knet_h->crypto_in_use_config) {
			if (crypto_encrypt_and_sign(knet_h,
						    (const unsigned char *)inbuf,
						    outlen,
						    knet_h->recv_from_links_buf_crypt,
						    &outlen) < 0) {
				log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
				break;
			}
			outbuf = knet_h->recv_from_links_buf_crypt;
			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
			if (stats_err) {
				log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
				break;
			}
			knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
			pthread_mutex_unlock(&knet_h->handle_stats_mutex);
		}

		/* Unlock so we don't deadlock with tx_mutex */
		pthread_mutex_unlock(&src_link->link_stats_mutex);

		savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
		if (savederrno) {
			log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
			goto out_pmtud;
		}
retry_pmtud:
		if (src_link->transport_connected) {
			if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
				len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
					     (struct sockaddr *)&src_link->dst_addr, knet_h->knet_transport_fd_tracker[src_link->outsock].sockaddr_len);
			} else {
				len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
			}
			savederrno = errno;
			if (len != outlen) {
				err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, KNET_SUB_RX, len, savederrno);
				stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
				if (stats_err) {
					log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
					pthread_mutex_unlock(&knet_h->tx_mutex);
					goto out_pmtud;
				}
				switch(err) {
				case -1: /* unrecoverable error */
					log_debug(knet_h, KNET_SUB_RX,
						  "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
						  src_link->outsock, errno, strerror(errno),
						  src_link->status.src_ipaddr, src_link->status.src_port,
						  src_link->status.dst_ipaddr, src_link->status.dst_port);
					src_link->status.stats.tx_pmtu_errors++;
					break;
				case 0: /* ignore error and continue */
					src_link->status.stats.tx_pmtu_errors++;
					break;
				case 1: /* retry to send the same data */
					src_link->status.stats.tx_pmtu_retries++;
					pthread_mutex_unlock(&src_link->link_stats_mutex);
					goto retry_pmtud;
				}
				pthread_mutex_unlock(&src_link->link_stats_mutex);
			}
		}
		pthread_mutex_unlock(&knet_h->tx_mutex);
out_pmtud:
		return; /* don't need to unlock link_stats_mutex, it's not held here */
	case KNET_HEADER_TYPE_PMTUD_REPLY:
		src_link->status.stats.rx_pmtu_packets++;
		src_link->status.stats.rx_pmtu_bytes += len;

		/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
		pthread_mutex_unlock(&src_link->link_stats_mutex);

		if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
			log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
			return; /* link_stats_mutex has already been released */
		}
		src_link->last_recv_mtu = inbuf->khp_pmtud_size;
		pthread_cond_signal(&knet_h->pmtud_cond);
		pthread_mutex_unlock(&knet_h->pmtud_mutex);
		return;
	default:
		pthread_mutex_unlock(&src_link->link_stats_mutex);
		return;
	}
	pthread_mutex_unlock(&src_link->link_stats_mutex);
}

static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
{
	int err, savederrno;
	int i, msg_recv, transport;

	if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
		log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
		return;
	}

	if (_is_valid_fd(knet_h, sockfd) < 1) {
		/*
		 * this is normal if a fd got an event, but the link was removed
		 * by another thread before we grabbed the read lock
		 */
		goto exit_unlock;
	}

	transport = knet_h->knet_transport_fd_tracker[sockfd].transport;

	/*
	 * reset msg_namelen to buffer size because after recvmmsg
	 * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
	 */

	for (i = 0; i < PCKT_RX_BUFS; i++) {
		msg[i].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len;
	}

	msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
	savederrno = errno;

	/*
	 * WARNING: the man page for recvmmsg is wrong. Based on the kernel
	 * implementation, recvmmsg can return:
	 * -1 on error
	 *  0 if the previous run of recvmmsg recorded an error on the socket
	 *  N number of messages (see exception below).
	 *
	 * If there is an error from recvmsg after receiving a frame or more, the
	 * recvmmsg loop is interrupted, the error is recorded on the socket
	 * (getsockopt(SO_ERROR)) and it will be visible in the next run.
	 *
	 * Need to be careful how we handle errors at this stage.
	 *
	 * error messages need to be handled on a per transport/protocol basis
	 * at this point we have different layers of error handling
	 * - msg_recv < 0 -> error from this run
	 *   msg_recv = 0 -> error from the previous run; the error on the socket
	 *   needs to be cleared
	 * - per-transport message data
	 *   example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
	 *   but for UDP it is perfectly legal to receive a 0-byte message... go figure
	 * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
	 *   errno set. That means the error api needs to be able to abort the loop below.
	 */
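
	/*
	 * Illustrative sketch only (the real handling lives in the
	 * per-transport error callbacks invoked below): on a msg_recv == 0
	 * run, the error recorded on the socket by the previous run could be
	 * fetched and cleared with something like:
	 *
	 *	int sockerr = 0;
	 *	socklen_t errlen = sizeof(sockerr);
	 *	getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen);
	 */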

	if (msg_recv <= 0) {
		transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno);
		goto exit_unlock;
	}

	for (i = 0; i < msg_recv; i++) {
		err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]);

		/*
		 * TODO: make this section silent once we are confident
		 * all protocol packet handlers are good
		 */

		switch(err) {
		case KNET_TRANSPORT_RX_ERROR: /* on error */
			log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
			goto exit_unlock;
		case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */
			log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
			break;
		case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */
			log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
			goto exit_unlock;
		case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */
			_parse_recv_from_links(knet_h, sockfd, &msg[i]);
			break;
		case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE:
			log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue");
			break;
		case KNET_TRANSPORT_RX_OOB_DATA_STOP:
			log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop");
			goto exit_unlock;
		}
	}

exit_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
}

void *_handle_recv_from_links_thread(void *data)
{
	int i, nev;
	knet_handle_t knet_h = (knet_handle_t) data;
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
	struct sockaddr_storage address[PCKT_RX_BUFS];
	struct knet_mmsghdr msg[PCKT_RX_BUFS];
	struct iovec iov_in[PCKT_RX_BUFS];
#if defined(IP_PKTINFO) || defined(IPV6_PKTINFO)
	unsigned char control_in[PCKT_RX_BUFS][CMSG_SPACE(sizeof(struct in6_pktinfo))];
#endif

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);

	memset(&msg, 0, sizeof(msg));
	memset(&events, 0, sizeof(events));

	for (i = 0; i < PCKT_RX_BUFS; i++) {
		iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
		iov_in[i].iov_len = KNET_DATABUFSIZE;

		memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));

		msg[i].msg_hdr.msg_name = &address[i];
		msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* Real value filled in before actual use */
		msg[i].msg_hdr.msg_iov = &iov_in[i];
		msg[i].msg_hdr.msg_iovlen = 1;
#if defined(IP_PKTINFO) || defined(IPV6_PKTINFO)
		msg[i].msg_hdr.msg_control = &control_in[i][0];
		msg[i].msg_hdr.msg_controllen = CMSG_SPACE(sizeof(struct in6_pktinfo)); /* Largest of the two pktinfo structs */
#endif
	}

	while (!shutdown_in_progress(knet_h)) {
		nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);

		/*
		 * the RX threads only need to notify that there has been at least
		 * one successful run after a queue flush has been requested.
		 * See setfwd in handle.c
		 */
		if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
			set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
		}

		/*
		 * we use the timeout to detect if the thread is shutting down
		 */
		if (nev == 0) {
			continue;
		}

		for (i = 0; i < nev; i++) {
			_handle_recv_from_links(knet_h, events[i].data.fd, msg);
		}
	}

	set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED);

	return NULL;
}

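/*
 * A minimal usage sketch for knet_recv(), assuming a fully configured
 * handle with a datafd attached to "channel" (illustrative only, not
 * part of this file's code paths):
 *
 *	char buf[KNET_MAX_PACKET_SIZE];
 *	ssize_t n = knet_recv(knet_h, buf, KNET_MAX_PACKET_SIZE, channel);
 *	if (n < 0) {
 *		// errno comes from parameter validation or from readv(2)
 *	}
 */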
ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
{
	int savederrno = 0;
	ssize_t err = 0;
	struct iovec iov_in;

	if (!_is_valid_handle(knet_h)) {
		return -1;
	}

	if (buff == NULL) {
		errno = EINVAL;
		return -1;
	}

	if (buff_len == 0) {
		errno = EINVAL;
		return -1;
	}

	if (buff_len > KNET_MAX_PACKET_SIZE) {
		errno = EINVAL;
		return -1;
	}

	if (channel < 0) {
		errno = EINVAL;
		return -1;
	}

	if (channel >= KNET_DATAFD_MAX) {
		errno = EINVAL;
		return -1;
	}

	savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
	if (savederrno) {
		log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
			strerror(savederrno));
		errno = savederrno;
		return -1;
	}

	if (!knet_h->sockfd[channel].in_use) {
		savederrno = EINVAL;
		err = -1;
		goto out_unlock;
	}

	memset(&iov_in, 0, sizeof(iov_in));
	iov_in.iov_base = (void *)buff;
	iov_in.iov_len = buff_len;

	err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
	savederrno = errno;

out_unlock:
	pthread_rwlock_unlock(&knet_h->global_rwlock);
	errno = err ? savederrno : 0;
	return err;
}