1 /*
2 * Copyright (C) 2012-2025 Red Hat, Inc. All rights reserved.
3 *
4 * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
5 * Federico Simoncelli <fsimon@kronosnet.org>
6 *
7 * This software licensed under LGPL-2.0+
8 */
9
10 #include "config.h"
11
12 #include <math.h>
13 #include <string.h>
14 #include <pthread.h>
15 #include <unistd.h>
16 #include <sys/uio.h>
17 #include <errno.h>
18
19 #include "compat.h"
20 #include "compress.h"
21 #include "crypto.h"
22 #include "host.h"
23 #include "link.h"
24 #include "logging.h"
25 #include "transports.h"
26 #include "transport_common.h"
27 #include "threads_common.h"
28 #include "threads_heartbeat.h"
29 #include "threads_tx.h"
30 #include "netutils.h"
31
32 /*
33 * SEND
34 */
35
/*
 * Deliver a prepared batch of mmsg buffers (msg[0..msgs_to_send-1]) to
 * dst_host, walking its active links in priority order.
 *
 * Per link: take the link stats mutex, stamp each message with the link's
 * destination address, account TX bytes/packets, then push the batch out
 * with _sendmmsg(), retrying partial sends from where they left off.
 * With the round-robin policy the active_links array is rotated by one
 * after a send so the next packet uses the next link.
 *
 * Returns 0 on success, -1 on unrecoverable error (errno set from the
 * last saved errno). Stats-lock failures on a single link are logged and
 * that link is skipped rather than failing the whole dispatch.
 */
static int _dispatch_to_links(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_mmsghdr *msg, int msgs_to_send)
{
	int link_idx, msg_idx, sent_msgs, prev_sent, progress;
	int err = 0, savederrno = 0, locked = 0;
	unsigned int i;
	struct knet_mmsghdr *cur;
	struct knet_link *cur_link;

	for (link_idx = 0; link_idx < dst_host->active_link_entries; link_idx++) {
		prev_sent = 0;
		progress = 1;
		locked = 0;

		cur_link = &dst_host->link[dst_host->active_links[link_idx]];

		/* loopback delivery is handled directly in the TX parse path */
		if (cur_link->transport == KNET_TRANSPORT_LOOPBACK) {
			continue;
		}

		savederrno = pthread_mutex_lock(&cur_link->link_stats_mutex);
		if (savederrno) {
			log_err(knet_h, KNET_SUB_TX, "Unable to get stats mutex lock for host %u link %u: %s",
				dst_host->host_id, cur_link->link_id, strerror(savederrno));
			continue;
		}
		locked = 1;

		/*
		 * point every message at this link's destination address and
		 * account the traffic against this link's stats up front
		 */
		msg_idx = 0;
		while (msg_idx < msgs_to_send) {
			msg[msg_idx].msg_hdr.msg_name = &cur_link->dst_addr;
			msg[msg_idx].msg_hdr.msg_namelen = knet_h->knet_transport_fd_tracker[cur_link->outsock].sockaddr_len;

			/* Cast for Linux/BSD compatibility */
			for (i=0; i<(unsigned int)msg[msg_idx].msg_hdr.msg_iovlen; i++) {
				cur_link->status.stats.tx_data_bytes += msg[msg_idx].msg_hdr.msg_iov[i].iov_len;
			}
			cur_link->status.stats.tx_data_packets++;
			msg_idx++;
		}

retry:
		/* resume from the first message not yet sent */
		cur = &msg[prev_sent];

		sent_msgs = _sendmmsg(dst_host->link[dst_host->active_links[link_idx]].outsock,
				      transport_get_connection_oriented(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport),
				      &cur[0], msgs_to_send - prev_sent, MSG_DONTWAIT | MSG_NOSIGNAL);
		savederrno = errno;

		/* let the transport classify the send result */
		err = transport_tx_sock_error(knet_h, dst_host->link[dst_host->active_links[link_idx]].transport, dst_host->link[dst_host->active_links[link_idx]].outsock, KNET_SUB_TX, sent_msgs, savederrno);
		switch(err) {
			case -1: /* unrecoverable error */
				cur_link->status.stats.tx_data_errors++;
				goto out_unlock;
				break;
			case 0: /* ignore error and continue */
				break;
			case 1: /* retry to send those same data */
				cur_link->status.stats.tx_data_retries++;
				goto retry;
				break;
		}

		prev_sent = prev_sent + sent_msgs;

		/*
		 * partial send: keep retrying as long as we are making
		 * progress; one zero-progress round is tolerated before
		 * giving up with EAGAIN
		 */
		if ((sent_msgs >= 0) && (prev_sent < msgs_to_send)) {
			if ((sent_msgs) || (progress)) {
				if (sent_msgs) {
					progress = 1;
				} else {
					progress = 0;
				}
				log_trace(knet_h, KNET_SUB_TX, "Unable to send all (%d/%d) data packets to host %s (%u) link %s:%s (%u)",
					  sent_msgs, msg_idx,
					  dst_host->name, dst_host->host_id,
					  dst_host->link[dst_host->active_links[link_idx]].status.dst_ipaddr,
					  dst_host->link[dst_host->active_links[link_idx]].status.dst_port,
					  dst_host->link[dst_host->active_links[link_idx]].link_id);
				goto retry;
			}
			if (!progress) {
				savederrno = EAGAIN;
				err = -1;
				goto out_unlock;
			}
		}

		/*
		 * round-robin: rotate the active link list by one so the
		 * next dispatch starts on the following link; only the
		 * first active link is used per call, hence the break
		 */
		if ((dst_host->link_handler_policy == KNET_LINK_POLICY_RR) &&
		    (dst_host->active_link_entries > 1)) {
			uint8_t cur_link_id = dst_host->active_links[0];

			memmove(&dst_host->active_links[0], &dst_host->active_links[1], KNET_MAX_LINK - 1);
			dst_host->active_links[dst_host->active_link_entries - 1] = cur_link_id;

			break;
		}
		pthread_mutex_unlock(&cur_link->link_stats_mutex);
		locked = 0;
	}

out_unlock:
	/* cur_link's stats mutex may still be held on the error paths */
	if (locked) {
		pthread_mutex_unlock(&cur_link->link_stats_mutex);
	}
	errno = savederrno;
	return err;
}
142
143 static int _parse_recv_from_sock(knet_handle_t knet_h, size_t inlen, int8_t channel, int is_sync)
144 {
145 size_t outlen, frag_len;
146 struct knet_host *dst_host;
147 knet_node_id_t dst_host_ids_temp[KNET_MAX_HOST];
148 size_t dst_host_ids_entries_temp = 0;
149 knet_node_id_t dst_host_ids[KNET_MAX_HOST];
150 size_t dst_host_ids_entries = 0;
151 int bcast = 1;
152 struct iovec iov_out[PCKT_FRAG_MAX][2];
153 int iovcnt_out = 2;
154 uint8_t frag_idx;
155 unsigned int temp_data_mtu;
156 size_t host_idx;
157 int send_mcast = 0;
158 struct knet_header *inbuf;
159 int savederrno = 0;
160 int err = 0;
161 seq_num_t tx_seq_num;
162 struct knet_mmsghdr msg[PCKT_FRAG_MAX];
163 int msgs_to_send, msg_idx;
164 unsigned int i;
165 int j;
166 int send_local = 0;
167 int data_compressed = 0;
168 size_t uncrypted_frag_size;
169 int stats_locked = 0, stats_err = 0;
170
171 inbuf = knet_h->recv_from_sock_buf;
172
173 if (knet_h->enabled != 1) {
174 log_debug(knet_h, KNET_SUB_TX, "Received data packet but forwarding is disabled");
175 savederrno = ECANCELED;
176 err = -1;
177 goto out_unlock;
178 }
179
180 /*
181 * move this into a separate function to expand on
182 * extra switching rules
183 */
184 switch(inbuf->kh_type) {
185 case KNET_HEADER_TYPE_DATA:
186 if (knet_h->dst_host_filter_fn) {
187 bcast = knet_h->dst_host_filter_fn(
188 knet_h->dst_host_filter_fn_private_data,
189 (const unsigned char *)inbuf->khp_data_userdata,
190 inlen,
191 KNET_NOTIFY_TX,
192 knet_h->host_id,
193 knet_h->host_id,
194 &channel,
195 dst_host_ids_temp,
196 &dst_host_ids_entries_temp);
197 if (bcast < 0) {
198 log_debug(knet_h, KNET_SUB_TX, "Error from dst_host_filter_fn: %d", bcast);
199 savederrno = EFAULT;
200 err = -1;
201 goto out_unlock;
202 }
203
204 if ((!bcast) && (!dst_host_ids_entries_temp)) {
205 log_debug(knet_h, KNET_SUB_TX, "Message is unicast but no dst_host_ids_entries");
206 savederrno = EINVAL;
207 err = -1;
208 goto out_unlock;
209 }
210
211 if ((!bcast) &&
212 (dst_host_ids_entries_temp > KNET_MAX_HOST)) {
213 log_debug(knet_h, KNET_SUB_TX, "dst_host_filter_fn returned too many destinations");
214 savederrno = EINVAL;
215 err = -1;
216 goto out_unlock;
217 }
218 }
219
220 /* Send to localhost if appropriate and enabled */
221 if (knet_h->has_loop_link) {
222 send_local = 0;
223 if (bcast) {
224 send_local = 1;
225 } else {
226 for (i=0; i< dst_host_ids_entries_temp; i++) {
227 if (dst_host_ids_temp[i] == knet_h->host_id) {
228 send_local = 1;
229 }
230 }
231 }
232 if (send_local) {
233 const unsigned char *buf = inbuf->khp_data_userdata;
234 ssize_t buflen = inlen;
235 struct knet_link *local_link;
236
237 local_link = knet_h->host_index[knet_h->host_id]->link;
238
239 local_retry:
240 err = write(knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], buf, buflen);
241 if (err < 0) {
242 log_err(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local failed. error=%s\n", strerror(errno));
243 local_link->status.stats.tx_data_errors++;
244 }
245 if (err > 0 && err < buflen) {
246 log_debug(knet_h, KNET_SUB_TRANSP_LOOPBACK, "send local incomplete=%d bytes of %zu\n", err, inlen);
247 local_link->status.stats.tx_data_retries++;
248 buf += err;
249 buflen -= err;
250 goto local_retry;
251 }
252 if (err == buflen) {
253 local_link->status.stats.tx_data_packets++;
254 local_link->status.stats.tx_data_bytes += inlen;
255 }
256 }
257 }
258 break;
259 default:
260 log_warn(knet_h, KNET_SUB_TX, "Receiving unknown messages from socket");
261 savederrno = ENOMSG;
262 err = -1;
263 goto out_unlock;
264 break;
265 }
266
267 if (is_sync) {
268 if ((bcast) ||
269 ((!bcast) && (dst_host_ids_entries_temp > 1))) {
270 log_debug(knet_h, KNET_SUB_TX, "knet_send_sync is only supported with unicast packets for one destination");
271 savederrno = E2BIG;
272 err = -1;
273 goto out_unlock;
274 }
275 }
276
277 /*
278 * check destinations hosts before spending time
279 * in fragmenting/encrypting packets to save
280 * time processing data for unreachable hosts.
281 * for unicast, also remap the destination data
282 * to skip unreachable hosts.
283 */
284
285 if (!bcast) {
286 dst_host_ids_entries = 0;
287 for (host_idx = 0; host_idx < dst_host_ids_entries_temp; host_idx++) {
288 dst_host = knet_h->host_index[dst_host_ids_temp[host_idx]];
289 if (!dst_host) {
290 continue;
291 }
292 if (!(dst_host->host_id == knet_h->host_id &&
293 knet_h->has_loop_link) &&
294 dst_host->status.reachable) {
295 dst_host_ids[dst_host_ids_entries] = dst_host_ids_temp[host_idx];
296 dst_host_ids_entries++;
297 }
298 }
299 if (!dst_host_ids_entries) {
300 savederrno = EHOSTDOWN;
301 err = -1;
302 goto out_unlock;
303 }
304 } else {
305 send_mcast = 0;
306 for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
307 if (!(dst_host->host_id == knet_h->host_id &&
308 knet_h->has_loop_link) &&
309 dst_host->status.reachable) {
310 send_mcast = 1;
311 break;
312 }
313 }
314 if (!send_mcast) {
315 savederrno = EHOSTDOWN;
316 err = -1;
317 goto out_unlock;
318 }
319 }
320
321 if (!knet_h->data_mtu) {
322 /*
323 * using MIN_MTU_V4 for data mtu is not completely accurate but safe enough
324 */
325 log_debug(knet_h, KNET_SUB_TX,
326 "Received data packet but data MTU is still unknown."
327 " Packet might not be delivered."
328 " Assuming minimum IPv4 MTU (%d)",
329 KNET_PMTUD_MIN_MTU_V4);
330 temp_data_mtu = KNET_PMTUD_MIN_MTU_V4;
331 } else {
332 /*
333 * take a copy of the mtu to avoid value changing under
334 * our feet while we are sending a fragmented pckt
335 */
336 temp_data_mtu = knet_h->data_mtu;
337 }
338
339 /*
340 * compress data
341 */
342 if ((knet_h->compress_model > 0) && (inlen > knet_h->compress_threshold)) {
343 size_t cmp_outlen = KNET_DATABUFSIZE_COMPRESS;
344 struct timespec start_time;
345 struct timespec end_time;
346 uint64_t compress_time;
347
348 clock_gettime(CLOCK_MONOTONIC, &start_time);
349 err = compress(knet_h,
350 (const unsigned char *)inbuf->khp_data_userdata, inlen,
351 knet_h->send_to_links_buf_compress, (ssize_t *)&cmp_outlen);
352
353 savederrno = errno;
354
355 stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
356 if (stats_err < 0) {
357 log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
358 err = -1;
359 savederrno = stats_err;
360 goto out_unlock;
361 }
362 stats_locked = 1;
363 /* Collect stats */
364 clock_gettime(CLOCK_MONOTONIC, &end_time);
365 timespec_diff(start_time, end_time, &compress_time);
366
367 if (compress_time < knet_h->stats.tx_compress_time_min) {
368 knet_h->stats.tx_compress_time_min = compress_time;
369 }
370 if (compress_time > knet_h->stats.tx_compress_time_max) {
371 knet_h->stats.tx_compress_time_max = compress_time;
372 }
373 knet_h->stats.tx_compress_time_ave =
374 (unsigned long long)(knet_h->stats.tx_compress_time_ave * knet_h->stats.tx_compressed_packets +
375 compress_time) / (knet_h->stats.tx_compressed_packets+1);
376 if (err < 0) {
377 log_warn(knet_h, KNET_SUB_COMPRESS, "Compression failed (%d): %s", err, strerror(savederrno));
378 } else {
379 knet_h->stats.tx_compressed_packets++;
380 knet_h->stats.tx_compressed_original_bytes += inlen;
381 knet_h->stats.tx_compressed_size_bytes += cmp_outlen;
382
383 if (cmp_outlen < inlen) {
384 memmove(inbuf->khp_data_userdata, knet_h->send_to_links_buf_compress, cmp_outlen);
385 inlen = cmp_outlen;
386 data_compressed = 1;
387 }
388 }
389 }
390 if (!stats_locked) {
391 stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
392 if (stats_err < 0) {
393 log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
394 err = -1;
395 savederrno = stats_err;
396 goto out_unlock;
397 }
398 }
399 if (knet_h->compress_model > 0 && !data_compressed) {
400 knet_h->stats.tx_uncompressed_packets++;
401 }
402 pthread_mutex_unlock(&knet_h->handle_stats_mutex);
403 stats_locked = 0;
404
405 /*
406 * prepare the outgoing buffers
407 */
408
409 frag_len = inlen;
410 frag_idx = 0;
411
412 inbuf->khp_data_bcast = bcast;
413 inbuf->khp_data_frag_num = ceil((float)inlen / temp_data_mtu);
414 inbuf->khp_data_channel = channel;
415 if (data_compressed) {
416 inbuf->khp_data_compress = knet_h->compress_model;
417 } else {
418 inbuf->khp_data_compress = 0;
419 }
420
421 if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
422 log_debug(knet_h, KNET_SUB_TX, "Unable to get seq mutex lock");
423 goto out_unlock;
424 }
425 knet_h->tx_seq_num++;
426 /*
427 * force seq_num 0 to detect a node that has crashed and rejoining
428 * the knet instance. seq_num 0 will clear the buffers in the RX
429 * thread
430 */
431 if (knet_h->tx_seq_num == 0) {
432 knet_h->tx_seq_num++;
433 }
434 /*
435 * cache the value in locked context
436 */
437 tx_seq_num = knet_h->tx_seq_num;
438 inbuf->khp_data_seq_num = htons(knet_h->tx_seq_num);
439 pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
440
441 /*
442 * forcefully broadcast a ping to all nodes every SEQ_MAX / 8
443 * pckts.
444 * this solves 2 problems:
445 * 1) on TX socket overloads we generate extra pings to keep links alive
446 * 2) in 3+ nodes setup, where all the traffic is flowing between node 1 and 2,
447 * node 3+ will be able to keep in sync on the TX seq_num even without
448 * receiving traffic or pings in betweens. This avoids issues with
449 * rollover of the circular buffer
450 */
451
452 if (tx_seq_num % (SEQ_MAX / 8) == 0) {
453 _send_pings(knet_h, 0);
454 }
455
456 if (inbuf->khp_data_frag_num > 1) {
457 while (frag_idx < inbuf->khp_data_frag_num) {
458 /*
459 * set the iov_base
460 */
461 iov_out[frag_idx][0].iov_base = (void *)knet_h->send_to_links_buf[frag_idx];
462 iov_out[frag_idx][0].iov_len = KNET_HEADER_DATA_SIZE;
463 iov_out[frag_idx][1].iov_base = inbuf->khp_data_userdata + (temp_data_mtu * frag_idx);
464
465 /*
466 * set the len
467 */
468 if (frag_len > temp_data_mtu) {
469 iov_out[frag_idx][1].iov_len = temp_data_mtu;
470 } else {
471 iov_out[frag_idx][1].iov_len = frag_len;
472 }
473
474 /*
475 * copy the frag info on all buffers
476 */
477 knet_h->send_to_links_buf[frag_idx]->kh_type = inbuf->kh_type;
478 knet_h->send_to_links_buf[frag_idx]->khp_data_seq_num = inbuf->khp_data_seq_num;
479 knet_h->send_to_links_buf[frag_idx]->khp_data_frag_num = inbuf->khp_data_frag_num;
480 knet_h->send_to_links_buf[frag_idx]->khp_data_bcast = inbuf->khp_data_bcast;
481 knet_h->send_to_links_buf[frag_idx]->khp_data_channel = inbuf->khp_data_channel;
482 knet_h->send_to_links_buf[frag_idx]->khp_data_compress = inbuf->khp_data_compress;
483
484 frag_len = frag_len - temp_data_mtu;
485 frag_idx++;
486 }
487 iovcnt_out = 2;
488 } else {
489 iov_out[frag_idx][0].iov_base = (void *)inbuf;
490 iov_out[frag_idx][0].iov_len = frag_len + KNET_HEADER_DATA_SIZE;
491 iovcnt_out = 1;
492 }
493
494 if (knet_h->crypto_in_use_config) {
495 struct timespec start_time;
496 struct timespec end_time;
497 uint64_t crypt_time;
498
499 frag_idx = 0;
500 while (frag_idx < inbuf->khp_data_frag_num) {
501 clock_gettime(CLOCK_MONOTONIC, &start_time);
502 if (crypto_encrypt_and_signv(
503 knet_h,
504 iov_out[frag_idx], iovcnt_out,
505 knet_h->send_to_links_buf_crypt[frag_idx],
506 (ssize_t *)&outlen) < 0) {
507 log_debug(knet_h, KNET_SUB_TX, "Unable to encrypt packet");
508 savederrno = ECHILD;
509 err = -1;
510 goto out_unlock;
511 }
512 clock_gettime(CLOCK_MONOTONIC, &end_time);
513 timespec_diff(start_time, end_time, &crypt_time);
514
515 stats_err = pthread_mutex_lock(&knet_h->handle_stats_mutex);
516 if (stats_err < 0) {
517 log_err(knet_h, KNET_SUB_TX, "Unable to get mutex lock: %s", strerror(stats_err));
518 err = -1;
519 savederrno = stats_err;
520 goto out_unlock;
521 }
522
523 if (crypt_time < knet_h->stats.tx_crypt_time_min) {
524 knet_h->stats.tx_crypt_time_min = crypt_time;
525 }
526 if (crypt_time > knet_h->stats.tx_crypt_time_max) {
527 knet_h->stats.tx_crypt_time_max = crypt_time;
528 }
529 knet_h->stats.tx_crypt_time_ave =
530 (knet_h->stats.tx_crypt_time_ave * knet_h->stats.tx_crypt_packets +
531 crypt_time) / (knet_h->stats.tx_crypt_packets+1);
532
533 uncrypted_frag_size = 0;
534 for (j=0; j < iovcnt_out; j++) {
535 uncrypted_frag_size += iov_out[frag_idx][j].iov_len;
536 }
537 knet_h->stats.tx_crypt_byte_overhead += (outlen - uncrypted_frag_size);
538 knet_h->stats.tx_crypt_packets++;
539 pthread_mutex_unlock(&knet_h->handle_stats_mutex);
540
541 iov_out[frag_idx][0].iov_base = knet_h->send_to_links_buf_crypt[frag_idx];
542 iov_out[frag_idx][0].iov_len = outlen;
543 frag_idx++;
544 }
545 iovcnt_out = 1;
546 }
547
548 memset(&msg, 0, sizeof(msg));
549
550 msgs_to_send = inbuf->khp_data_frag_num;
551
552 msg_idx = 0;
553
554 while (msg_idx < msgs_to_send) {
555 msg[msg_idx].msg_hdr.msg_namelen = sizeof(struct sockaddr_storage); /* this will set properly in _dispatch_to_links() */
556 msg[msg_idx].msg_hdr.msg_iov = &iov_out[msg_idx][0];
557 msg[msg_idx].msg_hdr.msg_iovlen = iovcnt_out;
558 msg_idx++;
559 }
560
561 if (!bcast) {
562 for (host_idx = 0; host_idx < dst_host_ids_entries; host_idx++) {
563 dst_host = knet_h->host_index[dst_host_ids[host_idx]];
564
565 err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
566 savederrno = errno;
567 if (err) {
568 goto out_unlock;
569 }
570 }
571 } else {
572 for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
573 if (dst_host->status.reachable) {
574 err = _dispatch_to_links(knet_h, dst_host, &msg[0], msgs_to_send);
575 savederrno = errno;
576 if (err) {
577 goto out_unlock;
578 }
579 }
580 }
581 }
582
583 out_unlock:
584 errno = savederrno;
585 return err;
586 }
587
/*
 * Read one pending payload from a data channel fd and hand it to
 * _parse_recv_from_sock() for forwarding.
 *
 * Non-socket channels (pipes/files) are drained with readv(); sockets use
 * recvmsg() so truncated datagrams can be detected and discarded.
 * A zero-length read or a read error triggers the application's
 * sock_notify callback; on error the fd is also removed from the TX
 * epoll set and the channel is flagged as faulty.
 */
static void _handle_send_to_links(knet_handle_t knet_h, struct msghdr *msg, int sockfd, int8_t channel, int type)
{
	ssize_t inlen = 0;
	int savederrno = 0, docallback = 0;

	/*
	 * make sure BSD gets the right size
	 */
	msg->msg_namelen = knet_h->knet_transport_fd_tracker[sockfd].sockaddr_len;

	if ((channel >= 0) &&
	    (channel < KNET_DATAFD_MAX) &&
	    (!knet_h->sockfd[channel].is_socket)) {
		/* non-socket fd: plain vector read of the single payload iovec */
		inlen = readv(sockfd, msg->msg_iov, 1);
	} else {
		inlen = recvmsg(sockfd, msg, MSG_DONTWAIT | MSG_NOSIGNAL);
		if (msg->msg_flags & MSG_TRUNC) {
			log_warn(knet_h, KNET_SUB_TX, "Received truncated message from sock %d. Discarding", sockfd);
			return;
		}
	}

	if (inlen == 0) {
		/* EOF on the channel: notify the application, nothing to forward */
		savederrno = 0;
		docallback = 1;
	} else if (inlen < 0) {
		struct epoll_event ev;

		savederrno = errno;
		docallback = 1;
		memset(&ev, 0, sizeof(struct epoll_event));

		/*
		 * NOTE(review): on epoll_ctl failure this logs strerror(savederrno),
		 * which is the errno from the failed read above, not from
		 * epoll_ctl itself — confirm whether that is intended.
		 */
		if (epoll_ctl(knet_h->send_to_links_epollfd,
			      EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
			log_err(knet_h, KNET_SUB_TX, "Unable to del datafd %d from linkfd epoll pool: %s",
				knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
		} else {
			knet_h->sockfd[channel].has_error = 1;
		}
	} else {
		/* payload landed in recv_from_sock_buf (msg_iov points there) */
		knet_h->recv_from_sock_buf->kh_type = type;
		_parse_recv_from_sock(knet_h, inlen, channel, 0);
	}

	if (docallback) {
		knet_h->sock_notify_fn(knet_h->sock_notify_fn_private_data,
				       knet_h->sockfd[channel].sockfd[0],
				       channel,
				       KNET_NOTIFY_TX,
				       inlen,
				       savederrno);
	}
}
641
/*
 * TX thread main loop.
 *
 * Waits on the send_to_links epoll set for application data channels to
 * become readable, maps each ready fd back to its knet channel, and
 * forwards the payload under tx_mutex via _handle_send_to_links().
 * Also participates in the thread flush-queue protocol: an epoll timeout
 * (or up to 100 busy iterations) acknowledges a pending flush request.
 * Runs until shutdown_in_progress() returns true.
 */
void *_handle_send_to_links_thread(void *data)
{
	knet_handle_t knet_h = (knet_handle_t) data;
	struct epoll_event events[KNET_EPOLL_MAX_EVENTS + 1]; /* see _init_epolls for + 1 */
	int i, nev, type;
	int flush, flush_queue_limit;
	int8_t channel;
	struct iovec iov_in;
	struct msghdr msg;
	struct sockaddr_storage address;

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STARTED);

	/* reusable recvmsg scaffolding: payload always lands in recv_from_sock_buf */
	memset(&events, 0, sizeof(events));
	memset(&iov_in, 0, sizeof(iov_in));
	iov_in.iov_base = (void *)knet_h->recv_from_sock_buf->khp_data_userdata;
	iov_in.iov_len = KNET_MAX_PACKET_SIZE;

	memset(&msg, 0, sizeof(struct msghdr));
	msg.msg_name = &address;
	msg.msg_namelen = sizeof(struct sockaddr_storage);
	msg.msg_iov = &iov_in;
	msg.msg_iovlen = 1;

	/* static header fields that never change for this node */
	knet_h->recv_from_sock_buf->kh_version = KNET_HEADER_VERSION;
	knet_h->recv_from_sock_buf->khp_data_frag_seq = 0;
	knet_h->recv_from_sock_buf->kh_node = htons(knet_h->host_id);

	/* pre-stamp the per-fragment TX buffers (frag_seq is 1-based) */
	for (i = 0; i < (int)PCKT_FRAG_MAX; i++) {
		knet_h->send_to_links_buf[i]->kh_version = KNET_HEADER_VERSION;
		knet_h->send_to_links_buf[i]->khp_data_frag_seq = i + 1;
		knet_h->send_to_links_buf[i]->kh_node = htons(knet_h->host_id);
	}

	flush_queue_limit = 0;

	while (!shutdown_in_progress(knet_h)) {
		nev = epoll_wait(knet_h->send_to_links_epollfd, events, KNET_EPOLL_MAX_EVENTS + 1, KNET_THREADS_TIMERES / 1000);

		flush = get_thread_flush_queue(knet_h, KNET_THREAD_TX);

		/*
		 * we use timeout to detect if thread is shutting down
		 */
		if (nev == 0) {
			/*
			 * ideally we want to communicate that we are done flushing
			 * the queue when we have an epoll timeout event
			 */
			if (flush == KNET_THREAD_QUEUE_FLUSH) {
				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
				flush_queue_limit = 0;
			}
			continue;
		}

		/*
		 * fall back in case the TX sockets will continue receive traffic
		 * and we do not hit an epoll timeout.
		 *
		 * allow up to a 100 loops to flush queues, then we give up.
		 * there might be more clean ways to do it by checking the buffer queue
		 * on each socket, but we have tons of sockets and calculations can go wrong.
		 * Also, why would you disable data forwarding and still send packets?
		 */
		if (flush == KNET_THREAD_QUEUE_FLUSH) {
			if (flush_queue_limit >= 100) {
				log_debug(knet_h, KNET_SUB_TX, "Timeout flushing the TX queue, expect packet loss");
				set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSHED);
				flush_queue_limit = 0;
			} else {
				flush_queue_limit++;
			}
		} else {
			flush_queue_limit = 0;
		}

		if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
			log_debug(knet_h, KNET_SUB_TX, "Unable to get read lock");
			continue;
		}

		for (i = 0; i < nev; i++) {
			type = KNET_HEADER_TYPE_DATA;
			/* map the ready fd back to its knet data channel */
			for (channel = 0; channel < KNET_DATAFD_MAX; channel++) {
				if ((knet_h->sockfd[channel].in_use) &&
				    (knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created] == events[i].data.fd)) {
					break;
				}
			}
			if (channel >= KNET_DATAFD_MAX) {
				log_debug(knet_h, KNET_SUB_TX, "No available channels");
				continue; /* channel not found */
			}
			/* tx_mutex serializes against knet_send_sync() users */
			if (pthread_mutex_lock(&knet_h->tx_mutex) != 0) {
				log_debug(knet_h, KNET_SUB_TX, "Unable to get mutex lock");
				continue;
			}
			_handle_send_to_links(knet_h, &msg, events[i].data.fd, channel, type);
			pthread_mutex_unlock(&knet_h->tx_mutex);
		}

		pthread_rwlock_unlock(&knet_h->global_rwlock);
	}

	set_thread_status(knet_h, KNET_THREAD_TX, KNET_THREAD_STOPPED);

	return NULL;
}
751
752 int knet_send_sync(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
753 {
754 int savederrno = 0, err = 0;
755
|
(1) Event cond_false: |
Condition "!_is_valid_handle(knet_h)", taking false branch. |
|
(1) Event cond_false: |
Condition "!_is_valid_handle(knet_h)", taking false branch. |
756 if (!_is_valid_handle(knet_h)) {
757 return -1;
|
(2) Event if_end: |
End of if statement. |
|
(2) Event if_end: |
End of if statement. |
758 }
759
|
(3) Event cond_false: |
Condition "buff == NULL", taking false branch. |
|
(3) Event cond_false: |
Condition "buff == NULL", taking false branch. |
760 if (buff == NULL) {
761 errno = EINVAL;
762 return -1;
|
(4) Event if_end: |
End of if statement. |
|
(4) Event if_end: |
End of if statement. |
763 }
764
|
(5) Event cond_false: |
Condition "buff_len <= 0", taking false branch. |
|
(5) Event cond_false: |
Condition "buff_len <= 0", taking false branch. |
765 if (buff_len <= 0) {
766 errno = EINVAL;
767 return -1;
|
(6) Event if_end: |
End of if statement. |
|
(6) Event if_end: |
End of if statement. |
768 }
769
|
(7) Event cond_false: |
Condition "buff_len > 65536", taking false branch. |
|
(7) Event cond_false: |
Condition "buff_len > 65536", taking false branch. |
770 if (buff_len > KNET_MAX_PACKET_SIZE) {
771 errno = EINVAL;
772 return -1;
|
(8) Event if_end: |
End of if statement. |
|
(8) Event if_end: |
End of if statement. |
773 }
774
|
(9) Event cond_false: |
Condition "channel < 0", taking false branch. |
|
(9) Event cond_false: |
Condition "channel < 0", taking false branch. |
775 if (channel < 0) {
776 errno = EINVAL;
777 return -1;
|
(10) Event if_end: |
End of if statement. |
|
(10) Event if_end: |
End of if statement. |
778 }
779
|
(11) Event cond_false: |
Condition "channel >= 32", taking false branch. |
|
(11) Event cond_false: |
Condition "channel >= 32", taking false branch. |
780 if (channel >= KNET_DATAFD_MAX) {
781 errno = EINVAL;
782 return -1;
|
(12) Event if_end: |
End of if statement. |
|
(12) Event if_end: |
End of if statement. |
783 }
784
785 savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
|
(13) Event cond_false: |
Condition "savederrno", taking false branch. |
|
(14) Event cond_false: |
Condition "savederrno", taking false branch. |
786 if (savederrno) {
787 log_err(knet_h, KNET_SUB_TX, "Unable to get read lock: %s",
788 strerror(savederrno));
789 errno = savederrno;
790 return -1;
|
(14) Event if_end: |
End of if statement. |
|
(15) Event if_end: |
End of if statement. |
791 }
792
|
(15) Event cond_false: |
Condition "!knet_h->dst_host_filter_fn", taking false branch. |
|
(16) Event cond_true: |
Condition "!knet_h->dst_host_filter_fn", taking true branch. |
793 if (!knet_h->dst_host_filter_fn) {
794 savederrno = ENETDOWN;
795 err = -1;
|
(17) Event goto: |
Jumping to label "out". |
796 goto out;
|
(16) Event if_end: |
End of if statement. |
797 }
798
|
(17) Event cond_false: |
Condition "!knet_h->sockfd[channel].in_use", taking false branch. |
799 if (!knet_h->sockfd[channel].in_use) {
800 savederrno = EINVAL;
801 err = -1;
802 goto out;
|
(18) Event if_end: |
End of if statement. |
803 }
804
805 savederrno = pthread_mutex_lock(&knet_h->tx_mutex);
|
(20) Event cond_false: |
Condition "savederrno", taking false branch. |
806 if (savederrno) {
807 log_err(knet_h, KNET_SUB_TX, "Unable to get TX mutex lock: %s",
808 strerror(savederrno));
809 err = -1;
810 goto out;
|
(21) Event if_end: |
End of if statement. |
811 }
812
813 knet_h->recv_from_sock_buf->kh_type = KNET_HEADER_TYPE_DATA;
814 memmove(knet_h->recv_from_sock_buf->khp_data_userdata, buff, buff_len);
815 err = _parse_recv_from_sock(knet_h, buff_len, channel, 1);
816 savederrno = errno;
817
818 pthread_mutex_unlock(&knet_h->tx_mutex);
819
|
(18) Event label: |
Reached label "out". |
820 out:
821 pthread_rwlock_unlock(&knet_h->global_rwlock);
822
|
(22) Event cond_false: |
Condition "err", taking false branch. |
|
(19) Event cond_true: |
Condition "err", taking true branch. |
823 errno = err ? savederrno : 0;
824 return err;
825 }
826
827 ssize_t knet_send(knet_handle_t knet_h, const char *buff, const size_t buff_len, const int8_t channel)
828 {
829 int savederrno = 0;
830 ssize_t err = 0;
831 struct iovec iov_out[1];
832
833 if (!_is_valid_handle(knet_h)) {
834 return -1;
835 }
836
837 if (buff == NULL) {
838 errno = EINVAL;
839 return -1;
840 }
841
842 if (buff_len <= 0) {
843 errno = EINVAL;
844 return -1;
845 }
846
847 if (buff_len > KNET_MAX_PACKET_SIZE) {
848 errno = EINVAL;
849 return -1;
850 }
851
852 if (channel < 0) {
853 errno = EINVAL;
854 return -1;
855 }
856
857 if (channel >= KNET_DATAFD_MAX) {
858 errno = EINVAL;
859 return -1;
860 }
861
862 savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
863 if (savederrno) {
864 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
865 strerror(savederrno));
866 errno = savederrno;
867 return -1;
868 }
869
870 if (!knet_h->sockfd[channel].in_use) {
871 savederrno = EINVAL;
872 err = -1;
873 goto out_unlock;
874 }
875
876 memset(iov_out, 0, sizeof(iov_out));
877
878 iov_out[0].iov_base = (void *)buff;
879 iov_out[0].iov_len = buff_len;
880
881 err = writev(knet_h->sockfd[channel].sockfd[0], iov_out, 1);
882 savederrno = errno;
883
884 out_unlock:
885 pthread_rwlock_unlock(&knet_h->global_rwlock);
886 errno = err ? savederrno : 0;
887 return err;
888 }
889