1    	/*
2    	 * Copyright (C) 2012-2025 Red Hat, Inc.  All rights reserved.
3    	 *
4    	 * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
5    	 *          Federico Simoncelli <fsimon@kronosnet.org>
6    	 *
7    	 * This software licensed under LGPL-2.0+
8    	 */
9    	
10   	#include "config.h"
11   	
12   	#include <stdio.h>
13   	#include <string.h>
14   	#include <errno.h>
15   	#include <sys/uio.h>
16   	#include <pthread.h>
17   	
18   	#include "compat.h"
19   	#include "compress.h"
20   	#include "crypto.h"
21   	#include "host.h"
22   	#include "links.h"
23   	#include "links_acl.h"
24   	#include "logging.h"
25   	#include "transports.h"
26   	#include "transport_common.h"
27   	#include "threads_common.h"
28   	#include "threads_heartbeat.h"
29   	#include "threads_rx.h"
30   	#include "netutils.h"
31   	
32   	/*
33   	 * RECV
34   	 */
35   	
36   	/*
37   	 *  return 1 if a > b
38   	 *  return -1 if b > a
39   	 *  return 0 if they are equal
40   	 */
41   	static inline int timecmp(struct timespec a, struct timespec b)
42   	{
43   		if (a.tv_sec != b.tv_sec) {
44   			if (a.tv_sec > b.tv_sec) {
45   				return 1;
46   			} else {
47   				return -1;
48   			}
49   		} else {
50   			if (a.tv_nsec > b.tv_nsec) {
51   				return 1;
52   			} else if (a.tv_nsec < b.tv_nsec) {
53   				return -1;
54   			} else {
55   				return 0;
56   			}
57   		}
58   	}
59   	
60   	/*
61   	 * this functions needs to return an index (0 to 7)
62   	 * to a knet_host_defrag_buf. (-1 on errors)
63   	 */
64   	
65   	static int find_pckt_defrag_buf(knet_handle_t knet_h, struct knet_header *inbuf)
66   	{
67   		struct knet_host *src_host = knet_h->host_index[inbuf->kh_node];
68   		int i, oldest;
69   	
70   		/*
71   		 * check if there is a buffer already in use handling the same seq_num
72   		 */
73   		for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
74   			if (src_host->defrag_buf[i].in_use) {
75   				if (src_host->defrag_buf[i].pckt_seq == inbuf->khp_data_seq_num) {
76   					return i;
77   				}
78   			}
79   		}
80   	
81   		/*
82   		 * If there is no buffer that's handling the current seq_num
83   		 * either it's new or it's been reclaimed already.
84   		 * check if it's been reclaimed/seen before using the defrag circular
85   		 * buffer. If the pckt has been seen before, the buffer expired (ETIME)
86   		 * and there is no point to try to defrag it again.
87   		 */
88   		if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 1, 0)) {
89   			errno = ETIME;
90   			return -1;
91   		}
92   	
93   		/*
94   		 * register the pckt as seen
95   		 */
96   		_seq_num_set(src_host, inbuf->khp_data_seq_num, 1);
97   	
98   		/*
99   		 * see if there is a free buffer
100  		 */
101  		for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
102  			if (!src_host->defrag_buf[i].in_use) {
103  				return i;
104  			}
105  		}
106  	
107  		/*
108  		 * at this point, there are no free buffers, the pckt is new
109  		 * and we need to reclaim a buffer, and we will take the one
110  		 * with the oldest timestamp. It's as good as any.
111  		 */
112  	
113  		oldest = 0;
114  	
115  		for (i = 0; i < KNET_DEFRAG_BUFFERS; i++) {
116  			if (timecmp(src_host->defrag_buf[i].last_update, src_host->defrag_buf[oldest].last_update) < 0) {
117  				oldest = i;
118  			}
119  		}
120  		src_host->defrag_buf[oldest].in_use = 0;
121  		return oldest;
122  	}
123  	
/*
 * Feed one fragment into the per-host defrag machinery.
 *
 * Returns 0 when the packet is complete: the reassembled payload has been
 * copied back into inbuf->khp_data_userdata and *len updated to the total
 * payload length. Returns 1 when more fragments are still needed, the
 * fragment is a duplicate, or no defrag buffer is available (expired pckt).
 */
static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t *len)
{
	struct knet_host_defrag_buf *defrag_buf;
	int defrag_buf_idx;

	defrag_buf_idx = find_pckt_defrag_buf(knet_h, inbuf);
	if (defrag_buf_idx < 0) {
		return 1;
	}

	defrag_buf = &knet_h->host_index[inbuf->kh_node]->defrag_buf[defrag_buf_idx];

	/*
	 * if the buf is not in use, then make sure it's clean
	 * before claiming it for this seq_num
	 */
	if (!defrag_buf->in_use) {
		memset(defrag_buf, 0, sizeof(struct knet_host_defrag_buf));
		defrag_buf->in_use = 1;
		defrag_buf->pckt_seq = inbuf->khp_data_seq_num;
	}

	/*
	 * update timestamp on the buffer (used by find_pckt_defrag_buf
	 * to reclaim the oldest buffer when all slots are busy)
	 */
	clock_gettime(CLOCK_MONOTONIC, &defrag_buf->last_update);

	/*
	 * check if we already received this fragment
	 */
	if (defrag_buf->frag_map[inbuf->khp_data_frag_seq]) {
		/*
		 * if we have received this fragment and we didn't clear the buffer
		 * it means that we don't have all fragments yet
		 */
		return 1;
	}

	/*
	 *  we need to handle the last packet with gloves due to its different size
	 */

	if (inbuf->khp_data_frag_seq == inbuf->khp_data_frag_num) {
		defrag_buf->last_frag_size = *len;

		/*
		 * in the event when the last packet arrives first,
		 * we still don't know the offset vs the other fragments (based on MTU),
		 * so we store the fragment at the end of the buffer where it's safe
		 * and take a copy of the len so that we can restore its offset later.
		 * remember we can't use the local MTU for this calculation because pMTU
		 * can be asymmetric between the same hosts.
		 */
		if (!defrag_buf->frag_size) {
			defrag_buf->last_first = 1;
			memmove(defrag_buf->buf + (KNET_MAX_PACKET_SIZE - *len),
			       inbuf->khp_data_userdata,
			       *len);
		}
	} else {
		/* any non-last fragment fixes the common per-fragment size */
		defrag_buf->frag_size = *len;
	}

	/* frag_seq is 1-based, hence the -1 in the offset calculation */
	if (defrag_buf->frag_size) {
		memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size),
		       inbuf->khp_data_userdata, *len);
	}

	defrag_buf->frag_recv++;
	defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1;

	/*
	 * check if we received all the fragments
	 */
	if (defrag_buf->frag_recv == inbuf->khp_data_frag_num) {
		/*
		 * special case the last pckt: now that frag_size is known,
		 * move it from its temporary spot at the end of the buffer
		 * to its final offset
		 */

		if (defrag_buf->last_first) {
			memmove(defrag_buf->buf + ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size),
			        defrag_buf->buf + (KNET_MAX_PACKET_SIZE - defrag_buf->last_frag_size),
				defrag_buf->last_frag_size);
		}

		/*
		 * recalculate packet length
		 */

		*len = ((inbuf->khp_data_frag_num - 1) * defrag_buf->frag_size) + defrag_buf->last_frag_size;

		/*
		 * copy the pckt back in the user data
		 */
		memmove(inbuf->khp_data_userdata, defrag_buf->buf, *len);

		/*
		 * free this buffer
		 */
		defrag_buf->in_use = 0;
		return 0;
	}

	return 1;
}
228  	
229  	/*
230  	 * processing incoming packets vs access lists
231  	 */
232  	static int _check_rx_acl(knet_handle_t knet_h, struct knet_link *src_link, const struct knet_mmsghdr *msg)
233  	{
234  		if (knet_h->use_access_lists) {
235  			if (!check_validate(knet_h, src_link, msg->msg_hdr.msg_name)) {
236  				char src_ipaddr[KNET_MAX_HOST_LEN];
237  				char src_port[KNET_MAX_PORT_LEN];
238  				
239  				memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
240  				memset(src_port, 0, KNET_MAX_PORT_LEN);
241  				if (knet_addrtostr(msg->msg_hdr.msg_name, sockaddr_len(msg->msg_hdr.msg_name),
242  						   src_ipaddr, KNET_MAX_HOST_LEN,
243  						   src_port, KNET_MAX_PORT_LEN) < 0) {
244  	
245  					log_warn(knet_h, KNET_SUB_RX, "Packet rejected: unable to resolve host/port");
246  				} else {
247  					log_warn(knet_h, KNET_SUB_RX, "Packet rejected from %s:%s", src_ipaddr, src_port);
248  				}
249  				return 0;
250  			}
251  		}
252  		return 1;
253  	}
254  	
255  	static int _fast_data_up(knet_handle_t knet_h, struct knet_host *src_host, struct knet_link *src_link)
256  	{
257  		if (src_link->received_pong) {
258  			log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received data during valid ping/pong activity. Force link up.", src_host->host_id, src_link->link_id);
259  			_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
260  			return 1;
261  		}
262  		// host is not eligible for fast data up
263  		return 0;
264  	}
265  	
266  	static void _parse_recv_from_links(knet_handle_t knet_h, int sockfd, const struct knet_mmsghdr *msg)
267  	{
268  		int err = 0, savederrno = 0, stats_err = 0;
269  		ssize_t outlen;
270  		struct knet_host *src_host;
271  		struct knet_link *src_link;
272  		unsigned long long latency_last;
273  		knet_node_id_t dst_host_ids[KNET_MAX_HOST];
274  		size_t dst_host_ids_entries = 0;
275  		int bcast = 1;
276  		uint64_t decrypt_time = 0;
277  		struct timespec recvtime;
278  		struct knet_header *inbuf = msg->msg_hdr.msg_iov->iov_base;
279  		unsigned char *outbuf = (unsigned char *)msg->msg_hdr.msg_iov->iov_base;
280  		ssize_t len = msg->msg_len;
281  		struct iovec iov_out[1];
282  		int8_t channel;
283  		seq_num_t recv_seq_num;
284  		int wipe_bufs = 0;
285  		int try_decrypt = 0, decrypted = 0, i, found_link = 0;
286  	
287  		for (i = 1; i <= KNET_MAX_CRYPTO_INSTANCES; i++) {
288  			if (knet_h->crypto_instance[i]) {
289  				try_decrypt = 1;
290  				break;
291  			}
292  		}
293  	
294  		if ((!try_decrypt) && (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC)) {
295  			log_debug(knet_h, KNET_SUB_RX, "RX thread configured to accept only crypto packets, but no crypto configs are configured!");
296  			return;
297  		}
298  	
299  		if (try_decrypt) {
300  			struct timespec start_time;
301  			struct timespec end_time;
302  	
303  			clock_gettime(CLOCK_MONOTONIC, &start_time);
304  			if (crypto_authenticate_and_decrypt(knet_h,
305  							    (unsigned char *)inbuf,
306  							    len,
307  							    knet_h->recv_from_links_buf_decrypt,
308  							    &outlen) < 0) {
309  				log_debug(knet_h, KNET_SUB_RX, "Unable to decrypt/auth packet");
310  				if (knet_h->crypto_only == KNET_CRYPTO_RX_DISALLOW_CLEAR_TRAFFIC) {
311  					char src_ipaddr[KNET_MAX_HOST_LEN];
312  					char src_port[KNET_MAX_PORT_LEN];
313  	
314  					memset(src_ipaddr, 0, KNET_MAX_HOST_LEN);
315  					memset(src_port, 0, KNET_MAX_PORT_LEN);
316  					if (knet_addrtostr(msg->msg_hdr.msg_name, sockaddr_len(msg->msg_hdr.msg_name),
317  							   src_ipaddr, KNET_MAX_HOST_LEN,
318  							   src_port, KNET_MAX_PORT_LEN) < 0) {
319  						log_err(knet_h, KNET_SUB_RX, "Unable to decrypt packet from unknown host/port (size %zu)!", len);
320  					} else {
321  						log_err(knet_h, KNET_SUB_RX, "Unable to decrypt packet from %s:%s (size %zu)!", src_ipaddr, src_port, len);
322  					}
323  					return;
324  				}
325  				log_debug(knet_h, KNET_SUB_RX, "Attempting to process packet as clear data");
326  			} else {
327  				clock_gettime(CLOCK_MONOTONIC, &end_time);
328  				timespec_diff(start_time, end_time, &decrypt_time);
329  	
330  				len = outlen;
331  				inbuf = (struct knet_header *)knet_h->recv_from_links_buf_decrypt;
332  				decrypted = 1;
333  			}
334  		}
335  	
336  		if (len < (ssize_t)(KNET_HEADER_SIZE + 1)) {
337  			log_debug(knet_h, KNET_SUB_RX, "Packet is too short: %ld", (long)len);
338  			return;
339  		}
340  	
341  		if (inbuf->kh_version != KNET_HEADER_VERSION) {
342  			log_debug(knet_h, KNET_SUB_RX, "Packet version does not match");
343  			return;
344  		}
345  	
346  		inbuf->kh_node = ntohs(inbuf->kh_node);
347  		src_host = knet_h->host_index[inbuf->kh_node];
348  		if (src_host == NULL) {  /* host not found */
349  			log_debug(knet_h, KNET_SUB_RX, "Unable to find source host for this packet");
350  			return;
351  		}
352  	
353  		if ((inbuf->kh_type & KNET_HEADER_TYPE_PMSK) != 0) {
354  			/* be aware this works only for PING / PONG and PMTUd packets! */
355  			src_link = src_host->link +
356  				(inbuf->khp_ping_link % KNET_MAX_LINK);
357  			if (!_check_rx_acl(knet_h, src_link, msg)) {
358  				return;
359  			}
360  			if (src_link->dynamic == KNET_LINK_DYNIP) {
361  				if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) != 0) {
362  					log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u appears to have changed ip address",
363  						  src_host->host_id, src_link->link_id);
364  					memmove(&src_link->dst_addr, msg->msg_hdr.msg_name, sizeof(struct sockaddr_storage));
365  					if (knet_addrtostr(&src_link->dst_addr, sockaddr_len(&src_link->dst_addr),
366  							src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN,
367  							src_link->status.dst_port, KNET_MAX_PORT_LEN) != 0) {
368  						log_debug(knet_h, KNET_SUB_RX, "Unable to resolve ???");
369  						snprintf(src_link->status.dst_ipaddr, KNET_MAX_HOST_LEN - 1, "Unknown!!!");
370  						snprintf(src_link->status.dst_port, KNET_MAX_PORT_LEN - 1, "??");
371  					} else {
372  						log_info(knet_h, KNET_SUB_RX,
373  							 "host: %u link: %u new connection established from: %s:%s",
374  							 src_host->host_id, src_link->link_id,
375  							 src_link->status.dst_ipaddr, src_link->status.dst_port);
376  					}
377  				}
378  				/*
379  				 * transport has already accepted the connection here
380  				 * otherwise we would not be receiving packets
381  				 */
382  				transport_link_dyn_connect(knet_h, sockfd, src_link);
383  			}
384  		} else { /* data packet */
385  			for (i = 0; i < KNET_MAX_LINK; i++) {
386  				src_link = &src_host->link[i];
387  				if (cmpaddr(&src_link->dst_addr, msg->msg_hdr.msg_name) == 0) {
388  					found_link = 1;
389  					break;
390  				}
391  			}
392  			if (found_link) {
393  				/*
394  				 * this check is currently redundant.. Keep it here for now
395  				 */
396  				if (!_check_rx_acl(knet_h, src_link, msg)) {
397  					return;
398  				}
399  			} else {
400  				log_debug(knet_h, KNET_SUB_RX, "Unable to determine source link for data packet. Discarding packet.");
401  				return;
402  			}
403  		}
404  	
(47) Event lock_acquire: Example 1: Calling "pthread_mutex_lock" acquires lock "knet_link.link_stats_mutex".
(49) Event lock_acquire: Example 2: Calling "pthread_mutex_lock" acquires lock "knet_link.link_stats_mutex".
Also see events: [missing_lock][example_access][example_access]
405  		stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
406  		if (stats_err) {
407  			log_err(knet_h, KNET_SUB_RX, "Unable to get stats mutex lock for host %u link %u: %s",
408  				src_host->host_id, src_link->link_id, strerror(savederrno));
409  			return;
410  		}
411  	
412  		switch (inbuf->kh_type) {
413  		case KNET_HEADER_TYPE_DATA:
414  	
415  			/* data stats at the top for consistency with TX */
416  			src_link->status.stats.rx_data_packets++;
417  			src_link->status.stats.rx_data_bytes += len;
418  	
419  			if (decrypted) {
420  				stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
421  				if (stats_err < 0) {
422  					pthread_mutex_unlock(&src_link->link_stats_mutex);
423  					log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
424  					return;
425  				}
426  				/* Only update the crypto overhead for data packets. Mainly to be
427  				   consistent with TX */
428  				if (decrypt_time < knet_h->stats.rx_crypt_time_min) {
429  					knet_h->stats.rx_crypt_time_min = decrypt_time;
430  				}
431  				if (decrypt_time > knet_h->stats.rx_crypt_time_max) {
432  					knet_h->stats.rx_crypt_time_max = decrypt_time;
433  				}
434  				knet_h->stats.rx_crypt_time_ave =
435  					(knet_h->stats.rx_crypt_time_ave * knet_h->stats.rx_crypt_packets +
436  					 decrypt_time) / (knet_h->stats.rx_crypt_packets+1);
437  				knet_h->stats.rx_crypt_packets++;
438  				pthread_mutex_unlock(&knet_h->handle_stats_mutex);
439  			}
440  	
441  			if (!src_host->status.reachable) {
442  				if (!_fast_data_up(knet_h, src_host, src_link)) {
443  					pthread_mutex_unlock(&src_link->link_stats_mutex);
444  					log_debug(knet_h, KNET_SUB_RX, "Source host %u not reachable yet. Discarding packet.", src_host->host_id);
445  					return;
446  				}
447  			}
448  	
449  			inbuf->khp_data_seq_num = ntohs(inbuf->khp_data_seq_num);
450  			channel = inbuf->khp_data_channel;
451  			src_host->got_data = 1;
452  	
453  			if (!_seq_num_lookup(src_host, inbuf->khp_data_seq_num, 0, 0)) {
454  				pthread_mutex_unlock(&src_link->link_stats_mutex);
455  				if (src_host->link_handler_policy != KNET_LINK_POLICY_ACTIVE) {
456  					log_debug(knet_h, KNET_SUB_RX, "Packet has already been delivered");
457  				}
458  				return;
459  			}
460  	
461  			if (inbuf->khp_data_frag_num > 1) {
462  				/*
463  				 * len as received from the socket also includes extra stuff
464  				 * that the defrag code doesn't care about. So strip it
465  				 * here and readd only for repadding once we are done
466  				 * defragging
467  				 */
468  				len = len - KNET_HEADER_DATA_SIZE;
469  				if (pckt_defrag(knet_h, inbuf, &len)) {
470  					pthread_mutex_unlock(&src_link->link_stats_mutex);
471  					return;
472  				}
473  				len = len + KNET_HEADER_DATA_SIZE;
474  			}
475  	
476  			if (inbuf->khp_data_compress) {
477  				ssize_t decmp_outlen = KNET_DATABUFSIZE_COMPRESS;
478  				struct timespec start_time;
479  				struct timespec end_time;
480  				uint64_t compress_time;
481  	
482  				clock_gettime(CLOCK_MONOTONIC, &start_time);
483  				err = decompress(knet_h, inbuf->khp_data_compress,
484  						 (const unsigned char *)inbuf->khp_data_userdata,
485  						 len - KNET_HEADER_DATA_SIZE,
486  						 knet_h->recv_from_links_buf_decompress,
487  						 &decmp_outlen);
488  	
489  				stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
490  				if (stats_err < 0) {
491  					pthread_mutex_unlock(&src_link->link_stats_mutex);
492  					log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
493  					return;
494  				}
495  	
496  				clock_gettime(CLOCK_MONOTONIC, &end_time);
497  				timespec_diff(start_time, end_time, &compress_time);
498  	
499  				if (!err) {
500  					/* Collect stats */
501  					if (compress_time < knet_h->stats.rx_compress_time_min) {
502  						knet_h->stats.rx_compress_time_min = compress_time;
503  					}
504  					if (compress_time > knet_h->stats.rx_compress_time_max) {
505  						knet_h->stats.rx_compress_time_max = compress_time;
506  					}
507  					knet_h->stats.rx_compress_time_ave =
508  						(knet_h->stats.rx_compress_time_ave * knet_h->stats.rx_compressed_packets +
509  						 compress_time) / (knet_h->stats.rx_compressed_packets+1);
510  	
511  					knet_h->stats.rx_compressed_packets++;
512  					knet_h->stats.rx_compressed_original_bytes += decmp_outlen;
513  					knet_h->stats.rx_compressed_size_bytes += len - KNET_HEADER_SIZE;
514  	
515  					memmove(inbuf->khp_data_userdata, knet_h->recv_from_links_buf_decompress, decmp_outlen);
516  					len = decmp_outlen + KNET_HEADER_DATA_SIZE;
517  				} else {
518  					pthread_mutex_unlock(&knet_h->handle_stats_mutex);
519  					pthread_mutex_unlock(&src_link->link_stats_mutex);
520  					log_err(knet_h, KNET_SUB_COMPRESS, "Unable to decompress packet (%d): %s",
521  						err, strerror(errno));
522  					return;
523  				}
524  				pthread_mutex_unlock(&knet_h->handle_stats_mutex);
525  			}
526  	
527  			if (knet_h->enabled != 1) /* data forward is disabled */
528  				break;
529  	
530  			if (knet_h->dst_host_filter_fn) {
531  				size_t host_idx;
532  				int found = 0;
533  	
534  				bcast = knet_h->dst_host_filter_fn(
535  						knet_h->dst_host_filter_fn_private_data,
536  						(const unsigned char *)inbuf->khp_data_userdata,
537  						len - KNET_HEADER_DATA_SIZE,
538  						KNET_NOTIFY_RX,
539  						knet_h->host_id,
540  						inbuf->kh_node,
541  						&channel,
542  						dst_host_ids,
543  						&dst_host_ids_entries);
544  				if (bcast < 0) {
545  					pthread_mutex_unlock(&src_link->link_stats_mutex);
546  					log_debug(knet_h, KNET_SUB_RX, "Error from dst_host_filter_fn: %d", bcast);
547  					return;
548  				}
549  	
550  				if ((!bcast) && (!dst_host_ids_entries)) {
551  					pthread_mutex_unlock(&src_link->link_stats_mutex);
552  					log_debug(knet_h, KNET_SUB_RX, "Message is unicast but no dst_host_ids_entries");
553  					return;
554  				}
555  	
556  				/* check if we are dst for this packet */
557  				if (!bcast) {
558  					if (dst_host_ids_entries > KNET_MAX_HOST) {
559  						pthread_mutex_unlock(&src_link->link_stats_mutex);
560  						log_debug(knet_h, KNET_SUB_RX, "dst_host_filter_fn returned too many destinations");
561  						return;
562  					}
563  					for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
564  						if (dst_host_ids[host_idx] == knet_h->host_id) {
565  							found = 1;
566  							break;
567  						}
568  					}
569  					if (!found) {
570  						pthread_mutex_unlock(&src_link->link_stats_mutex);
571  						log_debug(knet_h, KNET_SUB_RX, "Packet is not for us");
572  						return;
573  					}
574  				}
575  			}
576  	
577  			if (!knet_h->sockfd[channel].in_use) {
578  				pthread_mutex_unlock(&src_link->link_stats_mutex);
579  				log_debug(knet_h, KNET_SUB_RX,
580  					  "received packet for channel %d but there is no local sock connected",
581  					  channel);
582  				return;
583  			}
584  	
585  			outlen = 0;
586  			memset(iov_out, 0, sizeof(iov_out));
587  	
588  	retry:
589  			iov_out[0].iov_base = (void *) inbuf->khp_data_userdata + outlen;
590  			iov_out[0].iov_len = len - (outlen + KNET_HEADER_DATA_SIZE);
591  	
592  			outlen = writev(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], iov_out, 1);
593  			if ((outlen > 0) && (outlen < (ssize_t)iov_out[0].iov_len)) {
594  				log_debug(knet_h, KNET_SUB_RX,
595  					  "Unable to send all data to the application in one go. Expected: %zu Sent: %zd\n",
596  					  iov_out[0].iov_len, outlen);
597  				goto retry;
598  			}
599  	
600  			if (outlen <= 0) {
601  				knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
602  						       knet_h->sockfd[channel].sockfd[0],
603  						       channel,
604  						       KNET_NOTIFY_RX,
605  						       outlen,
606  						       errno);
607  				pthread_mutex_unlock(&src_link->link_stats_mutex);
608  				return;
609  			}
610  			if ((size_t)outlen == iov_out[0].iov_len) {
611  				_seq_num_set(src_host, inbuf->khp_data_seq_num, 0);
612  			}
613  			break;
614  		case KNET_HEADER_TYPE_PING:
615  			outlen = KNET_HEADER_PING_SIZE;
616  			inbuf->kh_type = KNET_HEADER_TYPE_PONG;
617  			inbuf->kh_node = htons(knet_h->host_id);
618  			recv_seq_num = ntohs(inbuf->khp_ping_seq_num);
619  			src_link->status.stats.rx_ping_packets++;
620  			src_link->status.stats.rx_ping_bytes += len;
621  	
622  			wipe_bufs = 0;
623  	
624  			if (!inbuf->khp_ping_timed) {
625  				/*
626  				 * we might be receiving this message from all links, but we want
627  				 * to process it only the first time
628  				 */
629  				if (recv_seq_num != src_host->untimed_rx_seq_num) {
630  					/*
631  					 * cache the untimed seq num
632  					 */
633  					src_host->untimed_rx_seq_num = recv_seq_num;
634  					/*
635  					 * if the host has received data in between
636  					 * untimed ping, then we don't need to wipe the bufs
637  					 */
638  					if (src_host->got_data) {
639  						src_host->got_data = 0;
640  						wipe_bufs = 0;
641  					} else {
642  						wipe_bufs = 1;
643  					}
644  				}
645  				_seq_num_lookup(src_host, recv_seq_num, 0, wipe_bufs);
646  			} else {
647  				/*
648  				 * pings always arrives in bursts over all the link
649  				 * catch the first of them to cache the seq num and
650  				 * avoid duplicate processing
651  				 */
652  				if (recv_seq_num != src_host->timed_rx_seq_num) {
653  					src_host->timed_rx_seq_num = recv_seq_num;
654  	
655  					if (recv_seq_num == 0) {
656  						_seq_num_lookup(src_host, recv_seq_num, 0, 1);
657  					}
658  				}
659  			}
660  	
661  			if (knet_h->crypto_in_use_config) {
662  				if (crypto_encrypt_and_sign(knet_h,
663  							    (const unsigned char *)inbuf,
664  							    outlen,
665  							    knet_h->recv_from_links_buf_crypt,
666  							    &outlen) < 0) {
667  					log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt pong packet");
668  					break;
669  				}
670  				outbuf = knet_h->recv_from_links_buf_crypt;
671  				stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
672  				if (stats_err < 0) {
673  					log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
674  					break;
675  				}
676  				knet_h->stats_extra.tx_crypt_pong_packets++;
677  				pthread_mutex_unlock(&knet_h->handle_stats_mutex);
678  			}
679  	
680  	retry_pong:
681  			if (src_link->transport_connected) {
682  				if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
683  					len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
684  						     (struct sockaddr *) &src_link->dst_addr, knet_h->knet_transport_fd_tracker[src_link->outsock].sockaddr_len);
685  				} else {
686  					len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
687  				}
688  				savederrno = errno;
689  				if (len != outlen) {
690  					err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, KNET_SUB_RX, len, savederrno);
691  					switch(err) {
692  						case -1: /* unrecoverable error */
693  							log_debug(knet_h, KNET_SUB_RX,
694  								  "Unable to send pong reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
695  								  src_link->outsock, errno, strerror(errno),
696  								  src_link->status.src_ipaddr, src_link->status.src_port,
697  								  src_link->status.dst_ipaddr, src_link->status.dst_port);
698  							src_link->status.stats.tx_pong_errors++;
699  							break;
700  						case 0: /* ignore error and continue */
701  							break;
702  						case 1: /* retry to send those same data */
703  							src_link->status.stats.tx_pong_retries++;
704  							goto retry_pong;
705  							break;
706  					}
707  				}
708  				src_link->status.stats.tx_pong_packets++;
709  				src_link->status.stats.tx_pong_bytes += outlen;
710  			}
711  			break;
712  		case KNET_HEADER_TYPE_PONG:
713  			src_link->status.stats.rx_pong_packets++;
714  			src_link->status.stats.rx_pong_bytes += len;
715  			clock_gettime(CLOCK_MONOTONIC, &src_link->status.pong_last);
716  	
717  			memmove(&recvtime, &inbuf->khp_ping_time[0], sizeof(struct timespec));
718  			timespec_diff(recvtime,
719  					src_link->status.pong_last, &latency_last);
720  	
721  			if ((latency_last / 1000llu) > src_link->pong_timeout) {
722  				log_debug(knet_h, KNET_SUB_RX,
723  					  "Incoming pong packet from host: %u link: %u has higher latency than pong_timeout. Discarding",
724  					  src_host->host_id, src_link->link_id);
725  			} else {
726  	
727  				/*
728  				 * in words : ('previous mean' * '(count -1)') + 'new value') / 'count'
729  				 */
730  	
(48) Event example_access: Example 1 (cont.): "knet_link.latency_cur_samples" is written to with lock "knet_link.link_stats_mutex" held.
Also see events: [missing_lock][lock_acquire][lock_acquire][example_access]
731  				src_link->latency_cur_samples++;
732  	
733  				/*
734  				 * limit to max_samples (precision)
735  				 */
736  				if (src_link->latency_cur_samples >= src_link->latency_max_samples) {
(50) Event example_access: Example 2 (cont.): "knet_link.latency_cur_samples" is written to with lock "knet_link.link_stats_mutex" held.
Also see events: [missing_lock][lock_acquire][example_access][lock_acquire]
737  					src_link->latency_cur_samples = src_link->latency_max_samples;
738  				}
739  				src_link->status.latency =
740  					(((src_link->status.latency * (src_link->latency_cur_samples - 1)) + (latency_last / 1000llu)) / src_link->latency_cur_samples);
741  	
742  				if (src_link->status.latency < src_link->pong_timeout_adj) {
743  					if (!src_link->status.connected) {
744  						if (src_link->received_pong >= src_link->pong_count) {
745  							log_info(knet_h, KNET_SUB_RX, "host: %u link: %u is up",
746  								 src_host->host_id, src_link->link_id);
747  							_link_updown(knet_h, src_host->host_id, src_link->link_id, src_link->status.enabled, 1, 0);
748  						} else {
749  							src_link->received_pong++;
750  							log_debug(knet_h, KNET_SUB_RX, "host: %u link: %u received pong: %u",
751  								  src_host->host_id, src_link->link_id, src_link->received_pong);
752  						}
753  					}
754  				}
755  				/* Calculate latency stats */
756  				if (src_link->status.latency > src_link->status.stats.latency_max) {
757  					src_link->status.stats.latency_max = src_link->status.latency;
758  				}
759  				if (src_link->status.latency < src_link->status.stats.latency_min) {
760  					src_link->status.stats.latency_min = src_link->status.latency;
761  				}
762  	
763  				/*
764  				 * those 2 lines below make all latency average calculations consistent and capped to
765  				 * link precision. In future we will kill the one above to keep only this one in
766  				 * the stats structure, but for now we leave it around to avoid API/ABI
767  				 * breakage as we backport the fixes to stable
768  				 */
769  				src_link->status.stats.latency_ave = src_link->status.latency;
770  				src_link->status.stats.latency_samples = src_link->latency_cur_samples;
771  			}
772  			break;
773  		case KNET_HEADER_TYPE_PMTUD:
774  			src_link->status.stats.rx_pmtu_packets++;
775  			src_link->status.stats.rx_pmtu_bytes += len;
776  			outlen = KNET_HEADER_PMTUD_SIZE;
777  			inbuf->kh_type = KNET_HEADER_TYPE_PMTUD_REPLY;
778  			inbuf->kh_node = htons(knet_h->host_id);
779  	
780  			if (knet_h->crypto_in_use_config) {
781  				if (crypto_encrypt_and_sign(knet_h,
782  							    (const unsigned char *)inbuf,
783  							    outlen,
784  							    knet_h->recv_from_links_buf_crypt,
785  							    &outlen) < 0) {
786  					log_debug(knet_h, KNET_SUB_RX, "Unable to encrypt PMTUd reply packet");
787  					break;
788  				}
789  				outbuf = knet_h->recv_from_links_buf_crypt;
790  				stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
791  				if (stats_err < 0) {
792  					log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
793  					break;
794  				}
795  				knet_h->stats_extra.tx_crypt_pmtu_reply_packets++;
796  				pthread_mutex_unlock(&knet_h->handle_stats_mutex);
797  			}
798  	
799  			/* Unlock so we don't deadlock with tx_mutex */
800  			pthread_mutex_unlock(&src_link->link_stats_mutex);
801  	
802  			savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
803  			if (savederrno) {
804  				log_err(knet_h, KNET_SUB_RX, "Unable to get TX mutex lock: %s", strerror(savederrno));
805  				goto out_pmtud;
806  			}
807  	retry_pmtud:
808  			if (src_link->transport_connected) {
809  				if (transport_get_connection_oriented(knet_h, src_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
810  					len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
811  						     (struct sockaddr *) &src_link->dst_addr, knet_h->knet_transport_fd_tracker[src_link->outsock].sockaddr_len);
812  				} else {
813  					len = sendto(src_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
814  				}
815  				savederrno = errno;
816  				if (len != outlen) {
817  					err = transport_tx_sock_error(knet_h, src_link->transport, src_link->outsock, KNET_SUB_RX, len, savederrno);
818  					stats_err = pthread_mutex_lock(&src_link->link_stats_mutex);
819  					if (stats_err < 0) {
820  						log_err(knet_h, KNET_SUB_RX, "Unable to get mutex lock: %s", strerror(stats_err));
821  						break;
822  					}
823  					switch(err) {
824  						case -1: /* unrecoverable error */
825  							log_debug(knet_h, KNET_SUB_RX,
826  								  "Unable to send PMTUd reply (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
827  								  src_link->outsock, errno, strerror(errno),
828  								  src_link->status.src_ipaddr, src_link->status.src_port,
829  								  src_link->status.dst_ipaddr, src_link->status.dst_port);
830  	
831  							src_link->status.stats.tx_pmtu_errors++;
832  							break;
833  						case 0: /* ignore error and continue */
834  							src_link->status.stats.tx_pmtu_errors++;
835  							break;
836  						case 1: /* retry to send those same data */
837  							src_link->status.stats.tx_pmtu_retries++;
838  							pthread_mutex_unlock(&src_link->link_stats_mutex);
839  							goto retry_pmtud;
840  							break;
841  					}
842  					pthread_mutex_unlock(&src_link->link_stats_mutex);
843  				}
844  			}
845  			pthread_mutex_unlock(&knet_h->tx_mutex);
846  	out_pmtud:
847  			return; /* Don't need to unlock link_stats_mutex */
848  		case KNET_HEADER_TYPE_PMTUD_REPLY:
849  			src_link->status.stats.rx_pmtu_packets++;
850  			src_link->status.stats.rx_pmtu_bytes += len;
851  	
852  			/* pmtud_mutex can't be acquired while we hold a link_stats_mutex (ordering) */
853  			pthread_mutex_unlock(&src_link->link_stats_mutex);
854  	
855  			if (pthread_mutex_lock(&knet_h->pmtud_mutex) != 0) {
856  				log_debug(knet_h, KNET_SUB_RX, "Unable to get mutex lock");
857  				break;
858  			}
859  			src_link->last_recv_mtu = inbuf->khp_pmtud_size;
860  			pthread_cond_signal(&knet_h->pmtud_cond);
861  			pthread_mutex_unlock(&knet_h->pmtud_mutex);
862  			return;
863  		default:
864  			pthread_mutex_unlock(&src_link->link_stats_mutex);
865  			return;
866  		}
867  		pthread_mutex_unlock(&src_link->link_stats_mutex);
868  	}
869  	
870  	static void _handle_recv_from_links(knet_handle_t knet_h, int sockfd, struct knet_mmsghdr *msg)
871  	{
872  		int err, savederrno;
873  		int i, msg_recv, transport;
874  	
875  		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
876  			log_debug(knet_h, KNET_SUB_RX, "Unable to get global read lock");
877  			return;
878  		}
879  	
880  		if (_is_valid_fd(knet_h, sockfd) < 1) {
881  			/*
882  			 * this is normal if a fd got an event and before we grab the read lock
883  			 * and the link is removed by another thread
884  			 */
885  			goto exit_unlock;
886  		}
887  	
888  		transport = knet_h->knet_transport_fd_tracker[sockfd].transport;
889  	
890  		/*
891  		 * reset msg_namelen to buffer size because after recvmmsg
892  		 * each msg_namelen will contain sizeof sockaddr_in or sockaddr_in6
893  		 */
894  	
895  		for (i = 0; i < PCKT_RX_BUFS; i++) {
896  			msg[i].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len;
897  		}
898  	
899  		msg_recv = _recvmmsg(sockfd, &msg[0], PCKT_RX_BUFS, MSG_DONTWAIT | MSG_NOSIGNAL);
900  		savederrno = errno;
901  	
902  		/*
903  		 * WARNING: man page for recvmmsg is wrong. Kernel implementation here:
904  		 * recvmmsg can return:
905  		 * -1 on error
906  		 *  0 if the previous run of recvmmsg recorded an error on the socket
907  		 *  N number of messages (see exception below).
908  		 *
909  		 * If there is an error from recvmsg after receiving a frame or more, the recvmmsg
910  		 * loop is interrupted, error recorded in the socket (getsockopt(SO_ERROR) and
911  		 * it will be visibile in the next run.
912  		 *
913  		 * Need to be careful how we handle errors at this stage.
914  		 *
915  		 * error messages need to be handled on a per transport/protocol base
916  		 * at this point we have different layers of error handling
917  		 * - msg_recv < 0 -> error from this run
918  		 *   msg_recv = 0 -> error from previous run and error on socket needs to be cleared
919  		 * - per-transport message data
920  		 *   example: msg[i].msg_hdr.msg_flags & MSG_NOTIFICATION or msg_len for SCTP == EOF,
921  		 *            but for UDP it is perfectly legal to receive a 0 bytes message.. go figure
922  		 * - NOTE: on SCTP MSG_NOTIFICATION we get msg_recv == PCKT_FRAG_MAX messages and no
923  		 *         errno set. That means the error api needs to be able to abort the loop below.
924  		 */
925  	
926  		if (msg_recv <= 0) {
927  			transport_rx_sock_error(knet_h, transport, sockfd, msg_recv, savederrno);
928  			goto exit_unlock;
929  		}
930  	
931  		for (i = 0; i < msg_recv; i++) {
932  			err = transport_rx_is_data(knet_h, transport, sockfd, &msg[i]);
933  	
934  			/*
935  			 * TODO: make this section silent once we are confident
936  			 *       all protocols packet handlers are good
937  			 */
938  	
939  			switch(err) {
940  				case KNET_TRANSPORT_RX_ERROR: /* on error */
941  					log_debug(knet_h, KNET_SUB_RX, "Transport reported error parsing packet");
942  					goto exit_unlock;
943  					break;
944  				case KNET_TRANSPORT_RX_NOT_DATA_CONTINUE: /* packet is not data and we should continue the packet process loop */
945  					log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, continue");
946  					break;
947  				case KNET_TRANSPORT_RX_NOT_DATA_STOP: /* packet is not data and we should STOP the packet process loop */
948  					log_debug(knet_h, KNET_SUB_RX, "Transport reported no data, stop");
949  					goto exit_unlock;
950  					break;
951  				case KNET_TRANSPORT_RX_IS_DATA: /* packet is data and should be parsed as such */
952  					_parse_recv_from_links(knet_h, sockfd, &msg[i]);
953  					break;
954  				case KNET_TRANSPORT_RX_OOB_DATA_CONTINUE:
955  					log_debug(knet_h, KNET_SUB_RX, "Transport is processing sock OOB data, continue");
956  					break;
957  				case KNET_TRANSPORT_RX_OOB_DATA_STOP:
958  					log_debug(knet_h, KNET_SUB_RX, "Transport has completed processing sock OOB data, stop");
959  					goto exit_unlock;
960  					break;
961  			}
962  		}
963  	
964  	exit_unlock:
965  		pthread_rwlock_unlock(&knet_h->global_rwlock);
966  	}
967  	
968  	void *_handle_recv_from_links_thread(void *data)
969  	{
970  		int i, nev;
971  		knet_handle_t knet_h = (knet_handle_t) data;
972  		struct epoll_event events[KNET_EPOLL_MAX_EVENTS];
973  		struct sockaddr_storage address[PCKT_RX_BUFS];
974  		struct knet_mmsghdr msg[PCKT_RX_BUFS];
975  		struct iovec iov_in[PCKT_RX_BUFS];
976  	#if defined(IP_PKTINFO) || defined(IPV6_PKTINFO)
977  		unsigned char control_in[PCKT_RX_BUFS][CMSG_SPACE(sizeof(struct in6_pktinfo))];
978  	#endif
979  	
980  		set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STARTED);
981  	
982  		memset(&msg, 0, sizeof(msg));
983  		memset(&events, 0, sizeof(events));
984  	
985  		for (i = 0; i < PCKT_RX_BUFS; i++) {
986  			iov_in[i].iov_base = (void *)knet_h->recv_from_links_buf[i];
987  			iov_in[i].iov_len = KNET_DATABUFSIZE;
988  	
989  			memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr));
990  	
991  			msg[i].msg_hdr.msg_name = &address[i];
992  			msg[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* Real value filled in before actual use */
993  			msg[i].msg_hdr.msg_iov = &iov_in[i];
994  			msg[i].msg_hdr.msg_iovlen = 1;
995  	#if defined(IP_PKTINFO) || defined(IPV6_PKTINFO)
996  			msg[i].msg_hdr.msg_control = &control_in[i][0];
997  			msg[i].msg_hdr.msg_controllen = CMSG_SPACE(sizeof(struct in6_pktinfo)); /* Largest of the two pktinfo structs */
998  	#endif
999  		}
1000 	
1001 		while (!shutdown_in_progress(knet_h)) {
1002 			nev = epoll_wait(knet_h->recv_from_links_epollfd, events, KNET_EPOLL_MAX_EVENTS, KNET_THREADS_TIMERES / 1000);
1003 	
1004 			/*
1005 			 * the RX threads only need to notify that there has been at least
1006 			 * one successful run after queue flush has been requested.
1007 			 * See setfwd in handle.c
1008 			 */
1009 			if (get_thread_flush_queue(knet_h, KNET_THREAD_RX) == KNET_THREAD_QUEUE_FLUSH) {
1010 				set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSHED);
1011 			}
1012 	
1013 			/*
1014 			 * we use timeout to detect if thread is shutting down
1015 			 */
1016 			if (nev == 0) {
1017 				continue;
1018 			}
1019 	
1020 			for (i = 0; i < nev; i++) {
1021 				_handle_recv_from_links(knet_h, events[i].data.fd, msg);
1022 			}
1023 		}
1024 	
1025 		set_thread_status(knet_h, KNET_THREAD_RX, KNET_THREAD_STOPPED);
1026 	
1027 		return NULL;
1028 	}
1029 	
1030 	ssize_t knet_recv(knet_handle_t knet_h, char *buff, const size_t buff_len, const int8_t channel)
1031 	{
1032 		int savederrno = 0;
1033 		ssize_t err = 0;
1034 		struct iovec iov_in;
1035 	
1036 		if (!_is_valid_handle(knet_h)) {
1037 			return -1;
1038 		}
1039 	
1040 		if (buff == NULL) {
1041 			errno = EINVAL;
1042 			return -1;
1043 		}
1044 	
1045 		if (buff_len <= 0) {
1046 			errno = EINVAL;
1047 			return -1;
1048 		}
1049 	
1050 		if (buff_len > KNET_MAX_PACKET_SIZE) {
1051 			errno = EINVAL;
1052 			return -1;
1053 		}
1054 	
1055 		if (channel < 0) {
1056 			errno = EINVAL;
1057 			return -1;
1058 		}
1059 	
1060 		if (channel >= KNET_DATAFD_MAX) {
1061 			errno = EINVAL;
1062 			return -1;
1063 		}
1064 	
1065 		savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
1066 		if (savederrno) {
1067 			log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
1068 				strerror(savederrno));
1069 			errno = savederrno;
1070 			return -1;
1071 		}
1072 	
1073 		if (!knet_h->sockfd[channel].in_use) {
1074 			savederrno = EINVAL;
1075 			err = -1;
1076 			goto out_unlock;
1077 		}
1078 	
1079 		memset(&iov_in, 0, sizeof(iov_in));
1080 		iov_in.iov_base = (void *)buff;
1081 		iov_in.iov_len = buff_len;
1082 	
1083 		err = readv(knet_h->sockfd[channel].sockfd[0], &iov_in, 1);
1084 		savederrno = errno;
1085 	
1086 	out_unlock:
1087 		pthread_rwlock_unlock(&knet_h->global_rwlock);
1088 		errno = err ? savederrno : 0;
1089 		return err;
1090 	}
1091