1 /*
2 * Copyright (C) 2015-2025 Red Hat, Inc. All rights reserved.
3 *
4 * Authors: Fabio M. Di Nitto <fabbione@kronosnet.org>
5 * Federico Simoncelli <fsimon@kronosnet.org>
6 *
7 * This software licensed under LGPL-2.0+
8 */
9
10 #include "config.h"
11
12 #include <unistd.h>
13 #include <errno.h>
14 #include <string.h>
15 #include <pthread.h>
16 #include <time.h>
17
18 #include "crypto.h"
19 #include "links.h"
20 #include "logging.h"
21 #include "transports.h"
22 #include "threads_common.h"
23 #include "threads_heartbeat.h"
24
25 static void _link_down(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link)
26 {
27 memset(&dst_link->pmtud_last, 0, sizeof(struct timespec));
28 dst_link->received_pong = 0;
29 dst_link->status.pong_last.tv_nsec = 0;
30 dst_link->pong_timeout_backoff = KNET_LINK_PONG_TIMEOUT_BACKOFF;
31 if (dst_link->status.connected == 1) {
32 log_info(knet_h, KNET_SUB_LINK, "host: %u link: %u is down",
33 dst_host->host_id, dst_link->link_id);
34 _link_updown(knet_h, dst_host->host_id, dst_link->link_id, dst_link->status.enabled, 0, 1);
35 }
36 }
37
38 static void _handle_check_each(knet_handle_t knet_h, struct knet_host *dst_host, struct knet_link *dst_link, int timed)
39 {
40 int err = 0, savederrno = 0, stats_err = 0;
41 int len;
42 ssize_t outlen = KNET_HEADER_PING_SIZE;
43 struct timespec clock_now, pong_last;
44 unsigned long long diff_ping;
45 unsigned char *outbuf = (unsigned char *)knet_h->pingbuf;
46
47 if (dst_link->transport_connected == 0) {
48 _link_down(knet_h, dst_host, dst_link);
49 return;
50 }
51
52 /* caching last pong to avoid race conditions */
53 pong_last = dst_link->status.pong_last;
54
55 if (clock_gettime(CLOCK_MONOTONIC, &clock_now) != 0) {
56 log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get monotonic clock");
57 return;
58 }
59
60 timespec_diff(dst_link->ping_last, clock_now, &diff_ping);
61
62 if ((diff_ping >= (dst_link->ping_interval * 1000llu)) || (!timed)) {
63 memmove(&knet_h->pingbuf->khp_ping_time[0], &clock_now, sizeof(struct timespec));
64 knet_h->pingbuf->khp_ping_link = dst_link->link_id;
65 if (pthread_mutex_lock(&knet_h->tx_seq_num_mutex)) {
66 log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get seq mutex lock");
67 return;
68 }
69 knet_h->pingbuf->khp_ping_seq_num = htons(knet_h->tx_seq_num);
70 pthread_mutex_unlock(&knet_h->tx_seq_num_mutex);
71 knet_h->pingbuf->khp_ping_timed = timed;
72
73 if (knet_h->crypto_in_use_config) {
74 if (crypto_encrypt_and_sign(knet_h,
75 (const unsigned char *)knet_h->pingbuf,
76 outlen,
77 knet_h->pingbuf_crypt,
78 &outlen) < 0) {
79 log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to crypto ping packet");
80 return;
81 }
82
83 outbuf = knet_h->pingbuf_crypt;
84 if (pthread_mutex_lock(&knet_h->handle_stats_mutex) < 0) {
85 log_err(knet_h, KNET_SUB_HEARTBEAT, "Unable to get mutex lock");
86 return;
87 }
88 knet_h->stats_extra.tx_crypt_ping_packets++;
89 pthread_mutex_unlock(&knet_h->handle_stats_mutex);
90 }
91
92 stats_err = pthread_mutex_lock(&dst_link->link_stats_mutex);
93 if (stats_err) {
94 log_err(knet_h, KNET_SUB_HEARTBEAT, "Unable to get stats mutex lock for host %u link %u: %s",
95 dst_host->host_id, dst_link->link_id, strerror(stats_err));
96 return;
97 }
98
99 retry:
100 if (transport_get_connection_oriented(knet_h, dst_link->transport) == TRANSPORT_PROTO_NOT_CONNECTION_ORIENTED) {
101 len = sendto(dst_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL,
102 (struct sockaddr *) &dst_link->dst_addr,
103 knet_h->knet_transport_fd_tracker[dst_link->outsock].sockaddr_len);
104 } else {
105 len = sendto(dst_link->outsock, outbuf, outlen, MSG_DONTWAIT | MSG_NOSIGNAL, NULL, 0);
106 }
107 savederrno = errno;
108
109 dst_link->ping_last = clock_now;
110 dst_link->status.stats.tx_ping_packets++;
111 dst_link->status.stats.tx_ping_bytes += outlen;
112
113 if (len != outlen) {
114 err = transport_tx_sock_error(knet_h, dst_link->transport, dst_link->outsock, KNET_SUB_HEARTBEAT, len, savederrno);
115 switch(err) {
116 case -1: /* unrecoverable error */
117 log_debug(knet_h, KNET_SUB_HEARTBEAT,
118 "Unable to send ping (sock: %d) packet (sendto): %d %s. recorded src ip: %s src port: %s dst ip: %s dst port: %s",
119 dst_link->outsock, savederrno, strerror(savederrno),
120 dst_link->status.src_ipaddr, dst_link->status.src_port,
121 dst_link->status.dst_ipaddr, dst_link->status.dst_port);
122 dst_link->status.stats.tx_ping_errors++;
123 break;
124 case 0:
125 break;
126 case 1:
127 dst_link->status.stats.tx_ping_retries++;
128 goto retry;
129 break;
130 }
131 } else {
132 dst_link->last_ping_size = outlen;
133 }
134 pthread_mutex_unlock(&dst_link->link_stats_mutex);
135 }
136
137 timespec_diff(pong_last, clock_now, &diff_ping);
138 if ((pong_last.tv_nsec) &&
139 (diff_ping >= (dst_link->pong_timeout_adj * 1000llu))) {
140 _link_down(knet_h, dst_host, dst_link);
141 }
142 }
143
144 void _send_pings(knet_handle_t knet_h, int timed)
145 {
146 struct knet_host *dst_host;
147 int link_idx;
148
149 if (pthread_mutex_lock(&knet_h->hb_mutex)) {
150 log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get hb mutex lock");
151 return;
152 }
153
154 for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
155 for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
156 if ((dst_host->link[link_idx].status.enabled != 1) ||
157 (dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK ) ||
158 ((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
159 (dst_host->link[link_idx].status.dynconnected != 1)))
160 continue;
161
162 _handle_check_each(knet_h, dst_host, &dst_host->link[link_idx], timed);
163 }
164 }
165
166 pthread_mutex_unlock(&knet_h->hb_mutex);
167 }
168
169 static void _adjust_pong_timeouts(knet_handle_t knet_h)
170 {
171 struct knet_host *dst_host;
172 struct knet_link *dst_link;
173 int link_idx;
174
|
(1) Event cond_false: |
Condition "pthread_mutex_lock(&knet_h->backoff_mutex)", taking false branch. |
175 if (pthread_mutex_lock(&knet_h->backoff_mutex)) {
176 log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get backoff_mutex");
177 return;
|
(2) Event if_end: |
End of if statement. |
178 }
179
|
(3) Event cond_true: |
Condition "dst_host != NULL", taking true branch. |
180 for (dst_host = knet_h->host_head; dst_host != NULL; dst_host = dst_host->next) {
|
(4) Event cond_true: |
Condition "link_idx < 8", taking true branch. |
|
(8) Event cond_true: |
Condition "link_idx < 8", taking true branch. |
|
(12) Event cond_true: |
Condition "link_idx < 8", taking true branch. |
181 for (link_idx = 0; link_idx < KNET_MAX_LINK; link_idx++) {
|
(5) Event cond_true: |
Condition "dst_host->link[link_idx].status.enabled != 1", taking true branch. |
|
(9) Event cond_true: |
Condition "dst_host->link[link_idx].status.enabled != 1", taking true branch. |
|
(13) Event cond_false: |
Condition "dst_host->link[link_idx].status.enabled != 1", taking false branch. |
|
(14) Event cond_false: |
Condition "dst_host->link[link_idx].transport == 0", taking false branch. |
|
(15) Event cond_true: |
Condition "dst_host->link[link_idx].dynamic == 1", taking true branch. |
|
(16) Event cond_false: |
Condition "dst_host->link[link_idx].status.dynconnected != 1", taking false branch. |
182 if ((dst_host->link[link_idx].status.enabled != 1) ||
183 (dst_host->link[link_idx].transport == KNET_TRANSPORT_LOOPBACK ) ||
184 ((dst_host->link[link_idx].dynamic == KNET_LINK_DYNIP) &&
185 (dst_host->link[link_idx].status.dynconnected != 1)))
|
(6) Event continue: |
Continuing loop. |
|
(10) Event continue: |
Continuing loop. |
|
(17) Event if_end: |
End of if statement. |
186 continue;
187
188 dst_link = &dst_host->link[link_idx];
189
|
(18) Event cond_true: |
Condition "dst_link->pong_timeout_backoff > 1", taking true branch. |
190 if (dst_link->pong_timeout_backoff > 1) {
191 dst_link->pong_timeout_backoff--;
192 }
193
|
(19) Event missing_lock: |
Accessing "dst_link->status.latency" without holding lock "knet_link.link_stats_mutex". Elsewhere, "knet_link_status.latency" is written to with "knet_link.link_stats_mutex" held 1 out of 1 times. |
| Also see events: |
[lock_acquire][example_access] |
194 dst_link->pong_timeout_adj = (dst_link->pong_timeout * dst_link->pong_timeout_backoff) + (dst_link->status.latency * KNET_LINK_PONG_TIMEOUT_LAT_MUL);
|
(7) Event loop: |
Looping back. |
|
(11) Event loop: |
Looping back. |
195 }
196 }
197
198 pthread_mutex_unlock(&knet_h->backoff_mutex);
199 }
200
201 void *_handle_heartbt_thread(void *data)
202 {
203 knet_handle_t knet_h = (knet_handle_t) data;
204 int i = 1;
205
206 set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_STARTED);
207
208 /* preparing ping buffer */
209 knet_h->pingbuf->kh_version = KNET_HEADER_VERSION;
210 knet_h->pingbuf->kh_type = KNET_HEADER_TYPE_PING;
211 knet_h->pingbuf->kh_node = htons(knet_h->host_id);
212
213 while (!shutdown_in_progress(knet_h)) {
214 usleep(KNET_THREADS_TIMERES);
215
216 if (pthread_rwlock_rdlock(&knet_h->global_rwlock) != 0) {
217 log_debug(knet_h, KNET_SUB_HEARTBEAT, "Unable to get read lock");
218 continue;
219 }
220
221 /*
222 * _adjust_pong_timeouts should execute approx once a second.
223 */
224 if ((i % (1000000 / KNET_THREADS_TIMERES)) == 0) {
225 _adjust_pong_timeouts(knet_h);
226 i = 1;
227 } else {
228 i++;
229 }
230
231 _send_pings(knet_h, 1);
232
233 pthread_rwlock_unlock(&knet_h->global_rwlock);
234 }
235
236 set_thread_status(knet_h, KNET_THREAD_HB, KNET_THREAD_STOPPED);
237
238 return NULL;
239 }
240