1    	/*
2    	 * Copyright (C) 2012-2025 Red Hat, Inc.  All rights reserved.
3    	 *
4    	 * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
5    	 *          Federico Simoncelli <fsimon@kronosnet.org>
6    	 *
7    	 * This software licensed under LGPL-2.0+
8    	 */
9    	
10   	#include "config.h"
11   	
12   	#include <math.h>
13   	#include <string.h>
14   	#include <pthread.h>
15   	#include <unistd.h>
16   	#include <sys/uio.h>
17   	#include <errno.h>
18   	
19   	#include "compat.h"
20   	#include "compress.h"
21   	#include "crypto.h"
22   	#include "host.h"
23   	#include "link.h"
24   	#include "logging.h"
25   	#include "transports.h"
26   	#include "transport_common.h"
27   	#include "threads_common.h"
28   	#include "threads_heartbeat.h"
29   	#include "threads_tx.h"
30   	#include "netutils.h"
31   	
32   	/*
33   	 * SEND
34   	 */
35   	
/*
 * Send a batch of pre-built messages to one destination host.
 *
 * Walks the host's active links (honoring the active-link ordering) and
 * transmits all msgs_to_send entries of msg[] on each selected link via
 * _sendmmsg(), retrying partial sends until the whole batch is out or an
 * unrecoverable error occurs.
 *
 * Returns 0 on success or the last transport_tx_sock_error() verdict
 * (-1 on unrecoverable error), with errno set from the send path.
 */
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
	int link_idx, msg_idx, sent_msgs, prev_sent, progress;
	int err = 0, savederrno = 0, locked = 0;
	unsigned int i;
	struct knet_mmsghdr *cur;
	struct knet_link *cur_link;

	for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
		prev_sent = 0;
		progress = 1;
		locked = 0;

		cur_link = &dst_host->link[dst_host->active_links[link_idx]];

		/* loopback traffic is handled directly in _parse_recv_from_sock() */
		if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) {
			continue;
		}

		/*
		 * stats mutex is held for the whole send loop on this link:
		 * it protects both the counters below and the retry accounting
		 */
		savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex);
		if (savederrno) {
			log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s",
				dst_host->host_id, cur_link->link_id, strerror(savederrno));
			continue;
		}
		locked = 1;

		/*
		 * point every message at this link's destination address and
		 * account the payload bytes/packets up front
		 */
		msg_idx = 0;
		while (msg_idx < msgs_to_send) {
			msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;
			msg[msg_idx].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[cur_link->outsock].sockaddr_len;

			/* Cast for Linux/BSD compatibility */
			for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) {
				cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len;
			}
			cur_link->status.stats.tx_data_packets++;
			msg_idx++;
		}

retry:
		/* resume from the first message not yet sent */
		cur = &msg[prev_sent];

		sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
				      transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport),
				      &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
		savederrno = errno;

		/* let the transport classify the error: fatal, ignorable or retryable */
		err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, KNET_SUB_TX, sent_msgs, savederrno);
		switch(err) {
			case -1: /* unrecoverable error */
				cur_link->status.stats.tx_data_errors++;
				goto out_unlock;
				break;
			case 0: /* ignore error and continue */
				break;
			case 1: /* retry to send those same data */
				cur_link->status.stats.tx_data_retries++;
				goto retry;
				break;
		}

		prev_sent = prev_sent + sent_msgs;

		/*
		 * partial send: keep retrying as long as we make progress.
		 * two consecutive rounds with zero messages sent means the
		 * socket is stuck - give up with EAGAIN.
		 */
		if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
			if ((sent_msgs) || (progress)) {
				if (sent_msgs) {
					progress = 1;
				} else {
					progress = 0;
				}
				log_trace(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
					  sent_msgs, msg_idx,
					  dst_host->name, dst_host->host_id,
					  dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
					  dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
					  dst_host->link[dst_host->active_links[link_idx]].link_id);
				goto retry;
			}
			if (!progress) {
				savederrno = EAGAIN;
				err = -1;
				goto out_unlock;
			}
		}

		/*
		 * round-robin policy: rotate the active link list so the next
		 * packet goes out on a different link, then stop after the
		 * first link (only link policies other than RR use them all)
		 */
		if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
		    (dst_host->active_link_entries > 1)) {
			uint8_t cur_link_id = dst_host->active_links[0];

			memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
			dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;

			break;
		}
		pthread_mutex_unlock(&cur_link->link_stats_mutex);
		locked = 0;
	}

out_unlock:
	/* error paths arrive here with the stats mutex still held */
	if (locked) {
		pthread_mutex_unlock(&cur_link->link_stats_mutex);
	}
	errno = savederrno;
	return err;
}
142  	
143  	static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync)
144  	{
145  		size_t outlen, frag_len;
146  		struct knet_host *dst_host;
147  		knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];
148  		size_t dst_host_ids_entries_temp = 0;
149  		knet_node_id_t dst_host_ids[KNET_MAX_HOST];
150  		size_t dst_host_ids_entries = 0;
151  		int bcast = 1;
152  		struct iovec iov_out[PCKT_FRAG_MAX][2];
153  		int iovcnt_out = 2;
154  		uint8_t frag_idx;
155  		unsigned int temp_data_mtu;
156  		size_t host_idx;
157  		int send_mcast = 0;
158  		struct knet_header *inbuf;
159  		int savederrno = 0;
160  		int err = 0;
161  		seq_num_t tx_seq_num;
162  		struct knet_mmsghdr msg[PCKT_FRAG_MAX];
163  		int msgs_to_send, msg_idx;
164  		unsigned int i;
165  		int j;
166  		int send_local = 0;
167  		int data_compressed = 0;
168  		size_t uncrypted_frag_size;
169  		int stats_locked = 0, stats_err = 0;
170  	
171  		inbuf = knet_h->recv_from_sock_buf;
172  	
173  		if (knet_h->enabled != 1) {
174  			log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
175  			savederrno = ECANCELED;
176  			err = -1;
177  			goto out_unlock;
178  		}
179  	
180  		/*
181  		 * move this into a separate function to expand on
182  		 * extra switching rules
183  		 */
184  		switch(inbuf->kh_type) {
185  			case KNET_HEADER_TYPE_DATA:
186  				if (knet_h->dst_host_filter_fn) {
187  					bcast = knet_h->dst_host_filter_fn(
188  							knet_h->dst_host_filter_fn_private_data,
189  							(const unsigned char *)inbuf->khp_data_userdata,
190  							inlen,
191  							KNET_NOTIFY_TX,
192  							knet_h->host_id,
193  							knet_h->host_id,
194  							&channel,
195  							dst_host_ids_temp,
196  							&dst_host_ids_entries_temp);
197  					if (bcast < 0) {
198  						log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
199  						savederrno = EFAULT;
200  						err = -1;
201  						goto out_unlock;
202  					}
203  	
204  					if ((!bcast) && (!dst_host_ids_entries_temp)) {
205  						log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
206  						savederrno = EINVAL;
207  						err = -1;
208  						goto out_unlock;
209  					}
210  	
211  					if ((!bcast) &&
212  					    (dst_host_ids_entries_temp > KNET_MAX_HOST)) {
213  						log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations");
214  						savederrno = EINVAL;
215  						err = -1;
216  						goto out_unlock;
217  					}
218  				}
219  	
220  				/* Send to localhost if appropriate and enabled */
221  				if (knet_h->has_loop_link) {
222  					send_local = 0;
223  					if (bcast) {
224  						send_local = 1;
225  					} else {
226  						for (i=0; i< dst_host_ids_entries_temp; i++) {
227  							if (dst_host_ids_temp[i] == knet_h->host_id) {
228  								send_local = 1;
229  							}
230  						}
231  					}
232  					if (send_local) {
233  						const unsigned char *buf = inbuf->khp_data_userdata;
234  						ssize_t buflen = inlen;
235  						struct knet_link *local_link;
236  	
237  						local_link = knet_h->host_index[knet_h->host_id]->link;
238  	
239  					local_retry:
240  						err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen);
241  						if (err < 0) {
242  							log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno));
243  							local_link->status.stats.tx_data_errors++;
244  						}
245  						if (err > 0 && err < buflen) {
246  							log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen);
247  							local_link->status.stats.tx_data_retries++;
248  							buf += err;
249  							buflen -= err;
250  							goto local_retry;
251  						}
252  						if (err == buflen) {
253  							local_link->status.stats.tx_data_packets++;
254  							local_link->status.stats.tx_data_bytes += inlen;
255  						}
256  					}
257  				}
258  				break;
259  			default:
260  				log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
261  				savederrno = ENOMSG;
262  				err = -1;
263  				goto out_unlock;
264  				break;
265  		}
266  	
267  		if (is_sync) {
268  			if ((bcast) ||
269  			    ((!bcast) && (dst_host_ids_entries_temp > 1))) {
270  				log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
271  				savederrno = E2BIG;
272  				err = -1;
273  				goto out_unlock;
274  			}
275  		}
276  	
277  		/*
278  		 * check destinations hosts before spending time
279  		 * in fragmenting/encrypting packets to save
280  		 * time processing data for unreachable hosts.
281  		 * for unicast, also remap the destination data
282  		 * to skip unreachable hosts.
283  		 */
284  	
285  		if (!bcast) {
286  			dst_host_ids_entries = 0;
287  			for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
288  				dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
289  				if (!dst_host) {
290  					continue;
291  				}
292  				if (!(dst_host->host_id == knet_h->host_id &&
293  				     knet_h->has_loop_link) &&
294  				    dst_host->status.reachable) {
295  					dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
296  					dst_host_ids_entries++;
297  				}
298  			}
299  			if (!dst_host_ids_entries) {
300  				savederrno = EHOSTDOWN;
301  				err = -1;
302  				goto out_unlock;
303  			}
304  		} else {
305  			send_mcast = 0;
306  			for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
307  				if (!(dst_host->host_id == knet_h->host_id &&
308  				      knet_h->has_loop_link) &&
309  				    dst_host->status.reachable) {
310  					send_mcast = 1;
311  					break;
312  				}
313  			}
314  			if (!send_mcast) {
315  				savederrno = EHOSTDOWN;
316  				err = -1;
317  				goto out_unlock;
318  			}
319  		}
320  	
321  		if (!knet_h->data_mtu) {
322  			/*
323  			 * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
324  			 */
325  			log_debug(knet_h, KNET_SUB_TX,
326  				  "Received data packet but data MTU is still unknown."
327  				  " Packet might not be delivered."
328  				  " Assuming minimum IPv4 MTU (%d)",
329  				  KNET_PMTUD_MIN_MTU_V4);
330  			temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
331  		} else {
332  			/*
333  			 * take a copy of the mtu to avoid value changing under
334  			 * our feet while we are sending a fragmented pckt
335  			 */
336  			temp_data_mtu = knet_h->data_mtu;
337  		}
338  	
339  		/*
340  		 * compress data
341  		 */
342  		if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) {
343  			size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
344  			struct timespec start_time;
345  			struct timespec end_time;
346  			uint64_t compress_time;
347  	
348  			clock_gettime(CLOCK_MONOTONIC, &start_time);
349  			err = compress(knet_h,
350  				       (const unsigned char *)inbuf->khp_data_userdata, inlen,
351  				       knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen);
352  	
353  			savederrno = errno;
354  	
355  			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
356  			if (stats_err < 0) {
357  				log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
358  				err = -1;
359  				savederrno = stats_err;
360  				goto out_unlock;
361  			}
362  			stats_locked = 1;
363  			/* Collect stats */
364  			clock_gettime(CLOCK_MONOTONIC, &end_time);
365  			timespec_diff(start_time, end_time, &compress_time);
366  	
367  			if (compress_time < knet_h->stats.tx_compress_time_min) {
368  				knet_h->stats.tx_compress_time_min = compress_time;
369  			}
370  			if (compress_time > knet_h->stats.tx_compress_time_max) {
371  				knet_h->stats.tx_compress_time_max = compress_time;
372  			}
373  			knet_h->stats.tx_compress_time_ave =
374  				(unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets +
375  				 compress_time) / (knet_h->stats.tx_compressed_packets+1);
376  			if (err < 0) {
377  				log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno));
378  			} else {
379  				knet_h->stats.tx_compressed_packets++;
380  				knet_h->stats.tx_compressed_original_bytes += inlen;
381  				knet_h->stats.tx_compressed_size_bytes += cmp_outlen;
382  	
383  				if (cmp_outlen < inlen) {
384  					memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen);
385  					inlen = cmp_outlen;
386  					data_compressed = 1;
387  				}
388  			}
389  		}
390  		if (!stats_locked) {
391  			stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
392  			if (stats_err < 0) {
393  				log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
394  				err = -1;
395  				savederrno = stats_err;
396  				goto out_unlock;
397  			}
398  		}
399  		if (knet_h->compress_model > 0 && !data_compressed) {
400  			knet_h->stats.tx_uncompressed_packets++;
401  		}
402  		pthread_mutex_unlock(&knet_h->handle_stats_mutex);
403  		stats_locked = 0;
404  	
405  		/*
406  		 * prepare the outgoing buffers
407  		 */
408  	
409  		frag_len = inlen;
410  		frag_idx = 0;
411  	
412  		inbuf->khp_data_bcast = bcast;
413  		inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
414  		inbuf->khp_data_channel = channel;
415  		if (data_compressed) {
416  			inbuf->khp_data_compress = knet_h->compress_model;
417  		} else {
418  			inbuf->khp_data_compress = 0;
419  		}
420  	
421  		if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
422  			log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
423  			goto out_unlock;
424  		}
425  		knet_h->tx_seq_num++;
426  		/*
427  		 * force seq_num 0 to detect a node that has crashed and rejoining
428  		 * the knet instance. seq_num 0 will clear the buffers in the RX
429  		 * thread
430  		 */
431  		if (knet_h->tx_seq_num == 0) {
432  			knet_h->tx_seq_num++;
433  		}
434  		/*
435  		 * cache the value in locked context
436  		 */
437  		tx_seq_num = knet_h->tx_seq_num;
438  		inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
439  		pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
440  	
441  		/*
442  		 * forcefully broadcast a ping to all nodes every SEQ_MAX / 8
443  		 * pckts.
444  		 * this solves 2 problems:
445  		 * 1) on TX socket overloads we generate extra pings to keep links alive
446  		 * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
447  		 *    node 3+ will be able to keep in sync on the TX seq_num even without
448  		 *    receiving traffic or pings in betweens. This avoids issues with
449  		 *    rollover of the circular buffer
450  		 */
451  	
452  		if (tx_seq_num % (SEQ_MAX / 8) == 0) {
453  			_send_pings(knet_h, 0);
454  		}
455  	
456  		if (inbuf->khp_data_frag_num > 1) {
457  			while (frag_idx < inbuf->khp_data_frag_num) {
458  				/*
459  				 * set the iov_base
460  				 */
461  				iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
462  				iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE;
463  				iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx);
464  	
465  				/*
466  				 * set the len
467  				 */
468  				if (frag_len > temp_data_mtu) {
469  					iov_out[frag_idx][1].iov_len = temp_data_mtu;
470  				} else {
471  					iov_out[frag_idx][1].iov_len = frag_len;
472  				}
473  	
474  				/*
475  				 * copy the frag info on all buffers
476  				 */
477  				knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
478  				knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
479  				knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
480  				knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
481  				knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
482  				knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress;
483  	
484  				frag_len = frag_len - temp_data_mtu;
485  				frag_idx++;
486  			}
487  			iovcnt_out = 2;
488  		} else {
489  			iov_out[frag_idx][0].iov_base = (void *)inbuf;
490  			iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
491  			iovcnt_out = 1;
492  		}
493  	
494  		if (knet_h->crypto_in_use_config) {
495  			struct timespec start_time;
496  			struct timespec end_time;
497  			uint64_t crypt_time;
498  	
499  			frag_idx = 0;
500  			while (frag_idx < inbuf->khp_data_frag_num) {
501  				clock_gettime(CLOCK_MONOTONIC, &start_time);
502  				if (crypto_encrypt_and_signv(
503  						knet_h,
504  						iov_out[frag_idx], iovcnt_out,
505  						knet_h->send_to_links_buf_crypt[frag_idx],
506  						(ssize_t *)&outlen) < 0) {
507  					log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
508  					savederrno = ECHILD;
509  					err = -1;
510  					goto out_unlock;
511  				}
512  				clock_gettime(CLOCK_MONOTONIC, &end_time);
513  				timespec_diff(start_time, end_time, &crypt_time);
514  	
515  				stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
516  				if (stats_err < 0) {
517  					log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
518  					err = -1;
519  					savederrno = stats_err;
520  					goto out_unlock;
521  				}
522  	
523  				if (crypt_time < knet_h->stats.tx_crypt_time_min) {
524  					knet_h->stats.tx_crypt_time_min = crypt_time;
525  				}
526  				if (crypt_time > knet_h->stats.tx_crypt_time_max) {
527  					knet_h->stats.tx_crypt_time_max = crypt_time;
528  				}
529  				knet_h->stats.tx_crypt_time_ave =
530  					(knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets +
531  					 crypt_time) / (knet_h->stats.tx_crypt_packets+1);
532  	
533  				uncrypted_frag_size = 0;
534  				for (j=0; j < iovcnt_out; j++) {
535  					uncrypted_frag_size += iov_out[frag_idx][j].iov_len;
536  				}
537  				knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size);
538  				knet_h->stats.tx_crypt_packets++;
539  				pthread_mutex_unlock(&knet_h->handle_stats_mutex);
540  	
541  				iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
542  				iov_out[frag_idx][0].iov_len = outlen;
543  				frag_idx++;
544  			}
545  			iovcnt_out = 1;
546  		}
547  	
548  		memset(&msg, 0, sizeof(msg));
549  	
550  		msgs_to_send = inbuf->khp_data_frag_num;
551  	
552  		msg_idx = 0;
553  	
554  		while (msg_idx < msgs_to_send) {
555  			msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* this will set properly in _dispatch_to_links() */
556  			msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0];
557  			msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out;
558  			msg_idx++;
559  		}
560  	
561  		if (!bcast) {
562  			for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
563  				dst_host = knet_h->host_index[dst_host_ids[host_idx]];
564  	
565  				err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
566  				savederrno = errno;
567  				if (err) {
568  					goto out_unlock;
569  				}
570  			}
571  		} else {
572  			for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
573  				if (dst_host->status.reachable) {
574  					err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
575  					savederrno = errno;
576  					if (err) {
577  						goto out_unlock;
578  					}
579  				}
580  			}
581  		}
582  	
583  	out_unlock:
584  		errno = savederrno;
585  		return err;
586  	}
587  	
588  	static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type)
589  	{
590  		ssize_t inlen = 0;
591  		int savederrno = 0, docallback = 0;
592  	
593  		/*
594  		 * make sure BSD gets the right size
595  		 */
596  		msg->msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len;
597  	
598  		if ((channel >= 0) &&
599  		    (channel < KNET_DATAFD_MAX) &&
600  		    (!knet_h->sockfd[channel].is_socket)) {
601  			inlen = readv(sockfd, msg->msg_iov, 1);
602  		} else {
603  			inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL);
604  			if (msg->msg_flags & MSG_TRUNC) {
605  				log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd);
606  				return;
607  			}
608  		}
609  	
610  		if (inlen == 0) {
611  			savederrno = 0;
612  			docallback = 1;
613  		} else if (inlen < 0) {
614  			struct epoll_event ev;
615  	
616  			savederrno = errno;
617  			docallback = 1;
618  			memset(&ev, 0, sizeof(struct epoll_event));
619  	
620  			if (epoll_ctl(knet_h->send_to_links_epollfd,
621  				      EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
622  				log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
623  					knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
624  			} else {
625  				knet_h->sockfd[channel].has_error = 1;
626  			}
627  		} else {
628  			knet_h->recv_from_sock_buf->kh_type = type;
629  			_parse_recv_from_sock(knet_h, inlen, channel, 0);
630  		}
631  	
632  		if (docallback) {
633  			knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
634  					       knet_h->sockfd[channel].sockfd[0],
635  					       channel,
636  					       KNET_NOTIFY_TX,
637  					       inlen,
638  					       savederrno);
639  		}
640  	}
641  	
/*
 * TX thread main loop: waits on the send_to_links epoll set for data
 * on any datafd channel and forwards each payload through
 * _handle_send_to_links(). Also participates in the thread
 * flush-queue handshake used during reconfiguration/shutdown.
 */
void *_handle_send_to_links_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS + 1]; /* see _init_epolls for + 1 */
	int i, nev, type;
	int flush, flush_queue_limit;
	int8_t channel;
	struct iovec iov_in;
	struct msghdr msg;
	struct sockaddr_storage address;

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED);

	/*
	 * all reads land in the shared recv_from_sock_buf payload area;
	 * the msghdr below is reused for every recvmsg/readv
	 */
	memset(&events, 0, sizeof(events));
	memset(&iov_in, 0, sizeof(iov_in));
	iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
	iov_in.iov_len = KNET_MAX_PACKET_SIZE;

	memset(&msg, 0, sizeof(struct msghdr));
	msg.msg_name = &address;
	msg.msg_namelen = sizeof(struct sockaddr_storage);
	msg.msg_iov = &iov_in;
	msg.msg_iovlen = 1;

	/* one-time init of the constant header fields of the TX buffers */
	knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION;
	knet_h->recv_from_sock_buf->khp_data_frag_seq = 0;
	knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id);

	for (i = 0; i < (int)PCKT_FRAG_MAX; i++) {
		knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
		knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
		knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
	}

	flush_queue_limit = 0;

	while (!shutdown_in_progress(knet_h)) {
		nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, KNET_THREADS_TIMERES / 1000);

		flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);

		/*
		 * we use timeout to detect if thread is shutting down
		 */
		if (nev == 0) {
			/*
			 * ideally we want to communicate that we are done flushing
			 * the queue when we have an epoll timeout event
			 */
			if (flush == KNET_THREAD_QUEUE_FLUSH) {
				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
				flush_queue_limit = 0;
			}
			continue;
		}

		/*
		 * fall back in case the TX sockets will continue receive traffic
		 * and we do not hit an epoll timeout.
		 *
		 * allow up to a 100 loops to flush queues, then we give up.
		 * there might be more clean ways to do it by checking the buffer queue
		 * on each socket, but we have tons of sockets and calculations can go wrong.
		 * Also, why would you disable data forwarding and still send packets?
		 */
		if (flush == KNET_THREAD_QUEUE_FLUSH) {
			if (flush_queue_limit >= 100) {
				log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss");
				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
				flush_queue_limit = 0;
			} else {
				flush_queue_limit++;
			}
		} else {
			flush_queue_limit = 0;
		}

		/* global config read lock first, then per-event tx_mutex */
		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
			log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
			continue;
		}

		for (i = 0; i < nev; i++) {
			type = KNET_HEADER_TYPE_DATA;
			/* map the ready fd back to its datafd channel */
			for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
				if ((knet_h->sockfd[channel].in_use) &&
				    (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
					break;
				}
			}
			if (channel >= KNET_DATAFD_MAX) {
				log_debug(knet_h, KNET_SUB_TX, "No available channels");
				continue; /* channel not found */
			}
			if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
				log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
				continue;
			}
			_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
			pthread_mutex_unlock(&knet_h->tx_mutex);
		}

		pthread_rwlock_unlock(&knet_h->global_rwlock);
	}

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);

	return NULL;
}
751  	
752  	int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
753  	{
754  		int savederrno = 0, err = 0;
755  	
(1) Event cond_false: Condition "!_is_valid_handle(knet_h)", taking false branch.
756  		if (!_is_valid_handle(knet_h)) {
757  			return -1;
(2) Event if_end: End of if statement.
758  		}
759  	
(3) Event cond_false: Condition "buff == NULL", taking false branch.
760  		if (buff == NULL) {
761  			errno = EINVAL;
762  			return -1;
(4) Event if_end: End of if statement.
763  		}
764  	
(5) Event cond_false: Condition "buff_len <= 0", taking false branch.
765  		if (buff_len <= 0) {
766  			errno = EINVAL;
767  			return -1;
(6) Event if_end: End of if statement.
768  		}
769  	
(7) Event cond_false: Condition "buff_len > 65536", taking false branch.
770  		if (buff_len > KNET_MAX_PACKET_SIZE) {
771  			errno = EINVAL;
772  			return -1;
(8) Event if_end: End of if statement.
773  		}
774  	
(9) Event cond_false: Condition "channel < 0", taking false branch.
775  		if (channel < 0) {
776  			errno = EINVAL;
777  			return -1;
(10) Event if_end: End of if statement.
778  		}
779  	
(11) Event cond_false: Condition "channel >= 32", taking false branch.
780  		if (channel >= KNET_DATAFD_MAX) {
781  			errno = EINVAL;
782  			return -1;
(12) Event if_end: End of if statement.
783  		}
784  	
(13) Event lock: "pthread_rwlock_rdlock" locks "knet_h->global_rwlock".
785  		savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
(14) Event cond_false: Condition "savederrno", taking false branch.
786  		if (savederrno) {
787  			log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
788  				strerror(savederrno));
789  			errno = savederrno;
790  			return -1;
(15) Event if_end: End of if statement.
791  		}
792  	
(16) Event cond_false: Condition "!knet_h->dst_host_filter_fn", taking false branch.
793  		if (!knet_h->dst_host_filter_fn) {
794  			savederrno = ENETDOWN;
795  			err = -1;
796  			goto out;
(17) Event if_end: End of if statement.
797  		}
798  	
(18) Event cond_false: Condition "!knet_h->sockfd[channel].in_use", taking false branch.
799  		if (!knet_h->sockfd[channel].in_use) {
800  			savederrno = EINVAL;
801  			err = -1;
802  			goto out;
(19) Event if_end: End of if statement.
803  		}
804  	
805  		savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
(20) Event cond_false: Condition "savederrno", taking false branch.
806  		if (savederrno) {
807  			log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
808  				strerror(savederrno));
809  			err = -1;
810  			goto out;
(21) Event if_end: End of if statement.
811  		}
812  	
813  		knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
814  		memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
815  		err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
816  		savederrno = errno;
817  	
818  		pthread_mutex_unlock(&knet_h->tx_mutex);
819  	
820  	out:
821  		pthread_rwlock_unlock(&knet_h->global_rwlock);
822  	
(22) Event cond_false: Condition "err", taking false branch.
823  		errno = err ? savederrno : 0;
824  		return err;
825  	}
826  	
827  	ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
828  	{
829  		int savederrno = 0;
830  		ssize_t err = 0;
831  		struct iovec iov_out[1];
832  	
833  		if (!_is_valid_handle(knet_h)) {
834  			return -1;
835  		}
836  	
837  		if (buff == NULL) {
838  			errno = EINVAL;
839  			return -1;
840  		}
841  	
842  		if (buff_len <= 0) {
843  			errno = EINVAL;
844  			return -1;
845  		}
846  	
847  		if (buff_len > KNET_MAX_PACKET_SIZE) {
848  			errno = EINVAL;
849  			return -1;
850  		}
851  	
852  		if (channel < 0) {
853  			errno = EINVAL;
854  			return -1;
855  		}
856  	
857  		if (channel >= KNET_DATAFD_MAX) {
858  			errno = EINVAL;
859  			return -1;
860  		}
861  	
862  		savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
863  		if (savederrno) {
864  			log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
865  				strerror(savederrno));
866  			errno = savederrno;
867  			return -1;
868  		}
869  	
870  		if (!knet_h->sockfd[channel].in_use) {
871  			savederrno = EINVAL;
872  			err = -1;
873  			goto out_unlock;
874  		}
875  	
876  		memset(iov_out, 0, sizeof(iov_out));
877  	
878  		iov_out[0].iov_base = (void *)buff;
879  		iov_out[0].iov_len = buff_len;
880  	
881  		err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
882  		savederrno = errno;
883  	
884  	out_unlock:
885  		pthread_rwlock_unlock(&knet_h->global_rwlock);
886  		errno = err ? savederrno : 0;
887  		return err;
888  	}
889