1 /*
2 * Copyright (C) 2020-2025 Red Hat, Inc. All rights reserved.
3 *
4 * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
5 * Federico Simoncelli <fsimon@kronosnet.org>
6 *
7 * This software licensed under LGPL-2.0+
8 */
9
10 #include "config.h"
11
12 #include <stdlib.h>
13 #include <string.h>
14 #include <unistd.h>
15 #include <errno.h>
16 #include <pthread.h>
17 #include <sys/uio.h>
18
19 #include "internals.h"
20 #include "crypto.h"
21 #include "links.h"
22 #include "common.h"
23 #include "transport_common.h"
24 #include "logging.h"
25
26 int knet_handle_enable_sock_notify(knet_handle_t knet_h,
27 void *sock_notify_fn_private_data,
28 void (*sock_notify_fn) (
29 void *private_data,
30 int datafd,
31 int8_t channel,
32 uint8_t tx_rx,
33 int error,
34 int errorno))
35 {
36 int savederrno = 0;
37
38 if (!_is_valid_handle(knet_h)) {
39 return -1;
40 }
41
42 if (!sock_notify_fn) {
43 errno = EINVAL;
44 return -1;
45 }
46
47 savederrno = get_global_wrlock(knet_h);
48 if (savederrno) {
49 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
50 strerror(savederrno));
51 errno = savederrno;
52 return -1;
53 }
54
55 knet_h->sock_notify_fn_private_data = sock_notify_fn_private_data;
56 knet_h->sock_notify_fn = sock_notify_fn;
57 log_debug(knet_h, KNET_SUB_HANDLE, "sock_notify_fn enabled");
58
59 pthread_rwlock_unlock(&knet_h->global_rwlock);
60
61 return 0;
62 }
63
64 int knet_handle_add_datafd(knet_handle_t knet_h, int *datafd, int8_t *channel)
65 {
66 int err = 0, savederrno = 0;
67 int i;
68 struct epoll_event ev;
69
70 if (!_is_valid_handle(knet_h)) {
71 return -1;
72 }
73
74 if (datafd == NULL) {
75 errno = EINVAL;
76 return -1;
77 }
78
79 if (channel == NULL) {
80 errno = EINVAL;
81 return -1;
82 }
83
84 if (*channel >= KNET_DATAFD_MAX) {
85 errno = EINVAL;
86 return -1;
87 }
88
|
(4) Event lock_acquire: |
Example 1: Calling "get_global_wrlock" acquires lock "knet_handle.global_rwlock". [details] |
|
(6) Event lock_acquire: |
Example 2: Calling "get_global_wrlock" acquires lock "knet_handle.global_rwlock". [details] |
|
(8) Event lock_acquire: |
Example 3: Calling "get_global_wrlock" acquires lock "knet_handle.global_rwlock". [details] |
| Also see events: |
[missing_lock][example_access][example_access][example_access] |
89 savederrno = get_global_wrlock(knet_h);
90 if (savederrno) {
91 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
92 strerror(savederrno));
93 errno = savederrno;
94 return -1;
95 }
96
97 if (!knet_h->sock_notify_fn) {
98 log_err(knet_h, KNET_SUB_HANDLE, "Adding datafd requires sock notify callback enabled!");
99 savederrno = EINVAL;
100 err = -1;
101 goto out_unlock;
102 }
103
104 if (*datafd > 0) {
105 for (i = 0; i < KNET_DATAFD_MAX; i++) {
106 if ((knet_h->sockfd[i].in_use) && (knet_h->sockfd[i].sockfd[0] == *datafd)) {
107 log_err(knet_h, KNET_SUB_HANDLE, "requested datafd: %d already exist in index: %d", *datafd, i);
108 savederrno = EEXIST;
109 err = -1;
110 goto out_unlock;
111 }
112 }
113 }
114
115 /*
116 * auto allocate a channel
117 */
118 if (*channel < 0) {
119 for (i = 0; i < KNET_DATAFD_MAX; i++) {
120 if (!knet_h->sockfd[i].in_use) {
121 *channel = i;
122 break;
123 }
124 }
125 if (*channel < 0) {
126 savederrno = EBUSY;
127 err = -1;
128 goto out_unlock;
129 }
130 } else {
131 if (knet_h->sockfd[*channel].in_use) {
132 savederrno = EBUSY;
133 err = -1;
134 goto out_unlock;
135 }
136 }
137
138 knet_h->sockfd[*channel].is_created = 0;
139 knet_h->sockfd[*channel].is_socket = 0;
140 knet_h->sockfd[*channel].has_error = 0;
141
142 if (*datafd > 0) {
143 int sockopt;
144 socklen_t sockoptlen = sizeof(sockopt);
145
146 if (_fdset_cloexec(*datafd)) {
147 savederrno = errno;
148 err = -1;
149 log_err(knet_h, KNET_SUB_HANDLE, "Unable to set CLOEXEC on datafd: %s",
150 strerror(savederrno));
151 goto out_unlock;
152 }
153
154 if (_fdset_nonblock(*datafd)) {
155 savederrno = errno;
156 err = -1;
157 log_err(knet_h, KNET_SUB_HANDLE, "Unable to set NONBLOCK on datafd: %s",
158 strerror(savederrno));
159 goto out_unlock;
160 }
161
162 knet_h->sockfd[*channel].sockfd[0] = *datafd;
163 knet_h->sockfd[*channel].sockfd[1] = 0;
164
165 if (!getsockopt(knet_h->sockfd[*channel].sockfd[0], SOL_SOCKET, SO_TYPE, &sockopt, &sockoptlen)) {
166 knet_h->sockfd[*channel].is_socket = 1;
167 }
168 } else {
169 if (_init_socketpair(knet_h, knet_h->sockfd[*channel].sockfd)) {
170 savederrno = errno;
171 err = -1;
172 goto out_unlock;
173 }
174
175 knet_h->sockfd[*channel].is_created = 1;
176 knet_h->sockfd[*channel].is_socket = 1;
177 *datafd = knet_h->sockfd[*channel].sockfd[0];
178 }
179
180 memset(&ev, 0, sizeof(struct epoll_event));
181 ev.events = EPOLLIN;
182 ev.data.fd = knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created];
183
184 if (epoll_ctl(knet_h->send_to_links_epollfd,
185 EPOLL_CTL_ADD, knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], &ev)) {
186 savederrno = errno;
187 err = -1;
188 log_err(knet_h, KNET_SUB_HANDLE, "Unable to add datafd %d to linkfd epoll pool: %s",
189 knet_h->sockfd[*channel].sockfd[knet_h->sockfd[*channel].is_created], strerror(savederrno));
190 if (knet_h->sockfd[*channel].is_created) {
191 _close_socketpair(knet_h, knet_h->sockfd[*channel].sockfd);
192 }
193 goto out_unlock;
194 }
195
196 knet_h->sockfd[*channel].in_use = 1;
197
198 out_unlock:
199 pthread_rwlock_unlock(&knet_h->global_rwlock);
200 errno = err ? savederrno : 0;
201 return err;
202 }
203
204 int knet_handle_remove_datafd(knet_handle_t knet_h, int datafd)
205 {
206 int err = 0, savederrno = 0;
207 int8_t channel = -1;
208 int i;
209 struct epoll_event ev;
210
211 if (!_is_valid_handle(knet_h)) {
212 return -1;
213 }
214
215 if (datafd <= 0) {
216 errno = EINVAL;
217 return -1;
218 }
219
220 savederrno = get_global_wrlock(knet_h);
221 if (savederrno) {
222 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
223 strerror(savederrno));
224 errno = savederrno;
225 return -1;
226 }
227
228 for (i = 0; i < KNET_DATAFD_MAX; i++) {
229 if ((knet_h->sockfd[i].in_use) &&
230 (knet_h->sockfd[i].sockfd[0] == datafd)) {
231 channel = i;
232 break;
233 }
234 }
235
236 if (channel < 0) {
237 savederrno = EINVAL;
238 err = -1;
239 goto out_unlock;
240 }
241
242 if (!knet_h->sockfd[channel].has_error) {
243 memset(&ev, 0, sizeof(struct epoll_event));
244
245 if (epoll_ctl(knet_h->send_to_links_epollfd,
246 EPOLL_CTL_DEL, knet_h->sockfd[channel].sockfd[knet_h->sockfd[channel].is_created], &ev)) {
247 savederrno = errno;
248 err = -1;
249 log_err(knet_h, KNET_SUB_HANDLE, "Unable to del datafd %d from linkfd epoll pool: %s",
250 knet_h->sockfd[channel].sockfd[0], strerror(savederrno));
251 goto out_unlock;
252 }
253 }
254
255 if (knet_h->sockfd[channel].is_created) {
256 _close_socketpair(knet_h, knet_h->sockfd[channel].sockfd);
257 }
258
259 memset(&knet_h->sockfd[channel], 0, sizeof(struct knet_sock));
260
261 out_unlock:
262 pthread_rwlock_unlock(&knet_h->global_rwlock);
263 errno = err ? savederrno : 0;
264 return err;
265 }
266
267 int knet_handle_get_datafd(knet_handle_t knet_h, const int8_t channel, int *datafd)
268 {
269 int err = 0, savederrno = 0;
270
271 if (!_is_valid_handle(knet_h)) {
272 return -1;
273 }
274
275 if ((channel < 0) || (channel >= KNET_DATAFD_MAX)) {
276 errno = EINVAL;
277 return -1;
278 }
279
280 if (datafd == NULL) {
281 errno = EINVAL;
282 return -1;
283 }
284
285 savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
286 if (savederrno) {
287 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
288 strerror(savederrno));
289 errno = savederrno;
290 return -1;
291 }
292
293 if (!knet_h->sockfd[channel].in_use) {
294 savederrno = EINVAL;
295 err = -1;
296 goto out_unlock;
297 }
298
299 *datafd = knet_h->sockfd[channel].sockfd[0];
300
301 out_unlock:
302 pthread_rwlock_unlock(&knet_h->global_rwlock);
303 errno = err ? savederrno : 0;
304 return err;
305 }
306
307 int knet_handle_get_channel(knet_handle_t knet_h, const int datafd, int8_t *channel)
308 {
309 int err = 0, savederrno = 0;
310 int i;
311
312 if (!_is_valid_handle(knet_h)) {
313 return -1;
314 }
315
316 if (datafd <= 0) {
317 errno = EINVAL;
318 return -1;
319 }
320
321 if (channel == NULL) {
322 errno = EINVAL;
323 return -1;
324 }
325
326 savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
327 if (savederrno) {
328 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
329 strerror(savederrno));
330 errno = savederrno;
331 return -1;
332 }
333
334 *channel = -1;
335
336 for (i = 0; i < KNET_DATAFD_MAX; i++) {
337 if ((knet_h->sockfd[i].in_use) &&
338 (knet_h->sockfd[i].sockfd[0] == datafd)) {
339 *channel = i;
340 break;
341 }
342 }
343
344 if (*channel < 0) {
345 savederrno = EINVAL;
346 err = -1;
347 goto out_unlock;
348 }
349
350 out_unlock:
351 pthread_rwlock_unlock(&knet_h->global_rwlock);
352 errno = err ? savederrno : 0;
353 return err;
354 }
355
356 int knet_handle_enable_filter(knet_handle_t knet_h,
357 void *dst_host_filter_fn_private_data,
358 int (*dst_host_filter_fn) (
359 void *private_data,
360 const unsigned char *outdata,
361 ssize_t outdata_len,
362 uint8_t tx_rx,
363 knet_node_id_t this_host_id,
364 knet_node_id_t src_node_id,
365 int8_t *channel,
366 knet_node_id_t *dst_host_ids,
367 size_t *dst_host_ids_entries))
368 {
369 int savederrno = 0;
370
371 if (!_is_valid_handle(knet_h)) {
372 return -1;
373 }
374
375 savederrno = get_global_wrlock(knet_h);
376 if (savederrno) {
377 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
378 strerror(savederrno));
379 errno = savederrno;
380 return -1;
381 }
382
383 knet_h->dst_host_filter_fn_private_data = dst_host_filter_fn_private_data;
384 knet_h->dst_host_filter_fn = dst_host_filter_fn;
385 if (knet_h->dst_host_filter_fn) {
386 log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn enabled");
387 } else {
388 log_debug(knet_h, KNET_SUB_HANDLE, "dst_host_filter_fn disabled");
389 }
390
391 pthread_rwlock_unlock(&knet_h->global_rwlock);
392
393 errno = 0;
394 return 0;
395 }
396
397 int knet_handle_setfwd(knet_handle_t knet_h, unsigned int enabled)
398 {
399 int savederrno = 0;
400
401 if (!_is_valid_handle(knet_h)) {
402 return -1;
403 }
404
405 if (enabled > 1) {
406 errno = EINVAL;
407 return -1;
408 }
409
410 savederrno = get_global_wrlock(knet_h);
411 if (savederrno) {
412 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
413 strerror(savederrno));
414 errno = savederrno;
415 return -1;
416 }
417
418 if (enabled) {
419 knet_h->enabled = enabled;
420 log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is enabled");
421 } else {
422 /*
423 * notify TX and RX threads to flush the queues
424 */
425 if (set_thread_flush_queue(knet_h, KNET_THREAD_TX, KNET_THREAD_QUEUE_FLUSH) < 0) {
426 log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for TX thread");
427 }
428 if (set_thread_flush_queue(knet_h, KNET_THREAD_RX, KNET_THREAD_QUEUE_FLUSH) < 0) {
429 log_debug(knet_h, KNET_SUB_HANDLE, "Unable to request queue flushing for RX thread");
430 }
431 }
432
433 pthread_rwlock_unlock(&knet_h->global_rwlock);
434
435 /*
436 * when disabling data forward, we need to give time to TX and RX
437 * to flush the queues.
438 *
439 * the TX thread is the main leader here. When there is no more
440 * data in the TX queue, we will also close traffic for RX.
441 */
442 if (!enabled) {
443 /*
444 * this usleep might be unnecessary, but wait_all_threads_flush_queue
445 * adds extra locking delay.
446 *
447 * allow all threads to run free without extra locking interference
448 * and then we switch to a more active wait in case the scheduler
449 * has decided to delay one thread or another
450 */
451 usleep(KNET_THREADS_TIMERES * 2);
452 wait_all_threads_flush_queue(knet_h);
453
454 /*
455 * all threads have done flushing the queue, we can stop data forwarding
456 */
457 savederrno = get_global_wrlock(knet_h);
458 if (savederrno) {
459 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
460 strerror(savederrno));
461 errno = savederrno;
462 return -1;
463 }
464 knet_h->enabled = enabled;
465 log_debug(knet_h, KNET_SUB_HANDLE, "Data forwarding is disabled");
466 pthread_rwlock_unlock(&knet_h->global_rwlock);
467 }
468
469 errno = 0;
470 return 0;
471 }
472
473 int knet_handle_setprio_dscp(knet_handle_t knet_h, uint8_t dscp)
474 {
475 int savederrno = 0;
476
477 if (!_is_valid_handle(knet_h)) {
478 return -1;
479 }
480
481 if (dscp > 0x3f) {
482 errno = EINVAL;
483 return -1;
484 }
485
486 savederrno = get_global_wrlock(knet_h);
487 if (savederrno) {
488 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
489 strerror(savederrno));
490 errno = savederrno;
491 return -1;
492 }
493
494 knet_h->prio_dscp = dscp;
495
496 pthread_rwlock_unlock(&knet_h->global_rwlock);
497
498 errno = 0;
499 return 0;
500 }
501
502 int knet_handle_get_stats(knet_handle_t knet_h, struct knet_handle_stats *stats, size_t struct_size)
503 {
504 int err = 0, savederrno = 0;
505
506 if (!_is_valid_handle(knet_h)) {
507 return -1;
508 }
509
510 if (!stats) {
511 errno = EINVAL;
512 return -1;
513 }
514
515 savederrno = pthread_rwlock_rdlock(&knet_h->global_rwlock);
516 if (savederrno) {
517 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get read lock: %s",
518 strerror(savederrno));
519 errno = savederrno;
520 return -1;
521 }
522
523 savederrno = pthread_mutex_lock(&knet_h->handle_stats_mutex);
524 if (savederrno) {
525 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get mutex lock: %s",
526 strerror(savederrno));
527 err = -1;
528 goto out_unlock;
529 }
530
531 if (struct_size > sizeof(struct knet_handle_stats)) {
532 struct_size = sizeof(struct knet_handle_stats);
533 }
534
535 memmove(stats, &knet_h->stats, struct_size);
536
537 /*
538 * TX crypt stats only count the data packets sent, so add in the ping/pong/pmtud figures
539 * RX is OK as it counts them before they are sorted.
540 */
541
542 stats->tx_crypt_packets += knet_h->stats_extra.tx_crypt_ping_packets +
543 knet_h->stats_extra.tx_crypt_pong_packets +
544 knet_h->stats_extra.tx_crypt_pmtu_packets +
545 knet_h->stats_extra.tx_crypt_pmtu_reply_packets;
546
547 /* Tell the caller our full size in case they have an old version */
548 stats->size = sizeof(struct knet_handle_stats);
549
550 out_unlock:
551 pthread_mutex_unlock(&knet_h->handle_stats_mutex);
552 pthread_rwlock_unlock(&knet_h->global_rwlock);
553 return err;
554 }
555
556 int knet_handle_clear_stats(knet_handle_t knet_h, int clear_option)
557 {
558 int savederrno = 0;
559
560 if (!_is_valid_handle(knet_h)) {
561 return -1;
562 }
563
564 if (clear_option != KNET_CLEARSTATS_HANDLE_ONLY &&
565 clear_option != KNET_CLEARSTATS_HANDLE_AND_LINK) {
566 errno = EINVAL;
567 return -1;
568 }
569
570 savederrno = get_global_wrlock(knet_h);
571 if (savederrno) {
572 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
573 strerror(savederrno));
574 errno = savederrno;
575 return -1;
576 }
577
578 memset(&knet_h->stats, 0, sizeof(struct knet_handle_stats));
579 memset(&knet_h->stats_extra, 0, sizeof(struct knet_handle_stats_extra));
580 if (clear_option == KNET_CLEARSTATS_HANDLE_AND_LINK) {
581 _link_clear_stats(knet_h);
582 }
583
584 pthread_rwlock_unlock(&knet_h->global_rwlock);
585 return 0;
586 }
587
588 int knet_handle_enable_access_lists(knet_handle_t knet_h, unsigned int enabled)
589 {
590 int savederrno = 0;
591
592 if (!_is_valid_handle(knet_h)) {
593 return -1;
594 }
595
596 if (enabled > 1) {
597 errno = EINVAL;
598 return -1;
599 }
600
601 savederrno = get_global_wrlock(knet_h);
602 if (savederrno) {
603 log_err(knet_h, KNET_SUB_HANDLE, "Unable to get write lock: %s",
604 strerror(savederrno));
605 errno = savederrno;
606 return -1;
607 }
608
609 knet_h->use_access_lists = enabled;
610
611 if (enabled) {
612 log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are enabled");
613 } else {
614 log_debug(knet_h, KNET_SUB_HANDLE, "Links access lists are disabled");
615 }
616
617 pthread_rwlock_unlock(&knet_h->global_rwlock);
618
619 errno = 0;
620 return 0;
621 }
622