1 /*
2 * Copyright (C) 2010,2013 Red Hat, Inc.
3 *
4 * Author: Angus Salkeld <asalkeld@redhat.com>
5 *
6 * This file is part of libqb.
7 *
8 * libqb is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation, either version 2.1 of the License, or
11 * (at your option) any later version.
12 *
13 * libqb is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with libqb. If not, see <http://www.gnu.org/licenses/>.
20 */
21 #include "os_base.h"
22 #include <poll.h>
23
24 #ifdef HAVE_SYS_UN_H
25 #include <sys/un.h>
26 #endif /* HAVE_SYS_UN_H */
27 #ifdef HAVE_SYS_MMAN_H
28 #include <sys/mman.h>
29 #endif
30
31 #include <qb/qbatomic.h>
32 #include <qb/qbipcs.h>
33 #include <qb/qbloop.h>
34 #include <qb/qbdefs.h>
35
36 #include "util_int.h"
37 #include "ipc_int.h"
38
/*
 * Per-channel bookkeeping kept in the shared, mmap()ed control file.
 *
 * sent         - bumped by the send paths after a successful send and
 *                dropped by the receive path after a successful recv;
 *                reported by qb_ipc_us_q_len_get().
 * flow_control - written by qb_ipc_us_fc_set(), read by qb_ipc_us_fc_get().
 */
struct ipc_us_control {
	int32_t sent;
	int32_t flow_control;
};

/* One control record each for the request, response and event channels. */
#define SHM_CONTROL_SIZE (sizeof(struct ipc_us_control) * 3)
44
/*
 * Tell whether sockets should be placed in the filesystem.
 *
 * On QB_LINUX / QB_CYGWIN builds the answer is 1 only when
 * FORCESOCKETSFILE can be stat()ed; on every other build it is always 1.
 * The decision is taken on the first call and cached in a function-local
 * static for all later calls.
 */
int use_filesystem_sockets(void)
{
	static int decided = 0;
	static int fs_sockets = 0;

	if (decided) {
		return fs_sockets;
	}

#if defined(QB_LINUX) || defined(QB_CYGWIN)
	{
		struct stat st;

		if (stat(FORCESOCKETSFILE, &st) == 0) {
			fs_sockets = 1;
		}
	}
#else
	fs_sockets = 1;
#endif
	decided = 1;

	return fs_sockets;
}
64
65 static void
66 set_sock_addr(struct sockaddr_un *address, const char *socket_name)
67 {
68 memset(address, 0, sizeof(struct sockaddr_un));
69 address->sun_family = AF_UNIX;
70 #ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
71 address->sun_len = QB_SUN_LEN(address);
72 #endif
73
74 if (socket_name[0] == '/' || !use_filesystem_sockets()) {
75 snprintf(address->sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name);
76 } else {
77 snprintf(address->sun_path, sizeof(address->sun_path), "%s/%s", SOCKETDIR,
78 socket_name);
79 }
80 }
81
82 static int32_t
83 qb_ipc_dgram_sock_setup(const char *base_name,
84 const char *service_name, int32_t * sock_pt,
85 gid_t gid)
86 {
87 int32_t request_fd;
88 struct sockaddr_un local_address;
89 int32_t res = 0;
90 char sock_path[PATH_MAX];
91
92 request_fd = socket(PF_UNIX, SOCK_DGRAM, 0);
93 if (request_fd == -1) {
94 return -errno;
95 }
96
97 qb_socket_nosigpipe(request_fd);
98 res = qb_sys_fd_nonblock_cloexec_set(request_fd);
99 if (res < 0) {
100 goto error_connect;
101 }
102 snprintf(sock_path, PATH_MAX, "%s-%s", base_name, service_name);
103 set_sock_addr(&local_address, sock_path);
104 if (use_filesystem_sockets()) {
105 (void)unlink(local_address.sun_path);
106 }
107 res = bind(request_fd, (struct sockaddr *)&local_address,
108 sizeof(local_address));
109
110 if (use_filesystem_sockets()) {
111 (void)chmod(local_address.sun_path, 0660);
112 (void)chown(local_address.sun_path, -1, gid);
113 }
114 if (res < 0) {
115 goto error_connect;
116 }
117
118 *sock_pt = request_fd;
119 return 0;
120
121 error_connect:
122 close(request_fd);
123 *sock_pt = -1;
124
125 return res;
126 }
127
128 static int32_t
129 set_sock_size(int sockfd, size_t max_msg_size)
130 {
131 int32_t rc;
132 unsigned int optval;
133 socklen_t optlen = sizeof(optval);
134
135 rc = getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, &optlen);
136
137 qb_util_log(LOG_TRACE, "%d: getsockopt(%d, SO_SNDBUF, needed:%d) actual:%d",
138 rc, sockfd, max_msg_size, optval);
139
140 /* The optval <= max_msg_size check is weird...
141 * during testing it was discovered in some instances if the
142 * default optval is exactly equal to our max_msg_size, we couldn't
143 * actually send a message that large unless we explicitly set
144 * it using setsockopt... there is no good explaination for this. Most
145 * likely this is hitting some sort of "off by one" error in the kernel. */
146 if (rc == 0 && optval <= max_msg_size) {
147 optval = max_msg_size;
148 optlen = sizeof(optval);
149 rc = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, optlen);
150 }
151
152 if (rc != 0) {
153 return -errno;
154 }
155
156 rc = getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, &optlen);
157
158 qb_util_log(LOG_TRACE, "%d: getsockopt(%d, SO_RCVBUF, needed:%d) actual:%d",
159 rc, sockfd, max_msg_size, optval);
160
161 /* Set the sockets receive buffer size to match the send buffer. On
162 * FreeBSD without this calls to sendto() will result in an ENOBUFS error
163 * if the message is larger than net.local.dgram.recvspace sysctl. */
164 if (rc == 0 && optval <= max_msg_size) {
165 optval = max_msg_size;
166 optlen = sizeof(optval);
167 rc = setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, optlen);
168 }
169
170 if (rc != 0) {
171 return -errno;
172 }
173
174 return rc;
175 }
176
177 static int32_t
178 dgram_verify_msg_size(size_t max_msg_size)
179 {
180 int32_t rc = -1;
181 int32_t sockets[2];
182 int32_t tries = 0;
183 int32_t write_passed = 0;
184 int32_t read_passed = 0;
185 char buf[max_msg_size];
186 memset (buf, 0, max_msg_size);
187
|
(1) Event cond_false: |
Condition "socketpair(1, SOCK_DGRAM, 0, sockets) < 0", taking false branch. |
188 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets) < 0) {
189 qb_util_perror(LOG_DEBUG, "error calling socketpair()");
190 goto cleanup_socks;
|
(2) Event if_end: |
End of if statement. |
191 }
192
|
(3) Event cond_false: |
Condition "set_sock_size(sockets[0], max_msg_size) != 0", taking false branch. |
193 if (set_sock_size(sockets[0], max_msg_size) != 0) {
194 qb_util_log(LOG_DEBUG, "error set_sock_size(sockets[0],%#x)",
195 max_msg_size);
196 goto cleanup_socks;
|
(4) Event if_end: |
End of if statement. |
197 }
|
(5) Event cond_false: |
Condition "set_sock_size(sockets[1], max_msg_size) != 0", taking false branch. |
198 if (set_sock_size(sockets[1], max_msg_size) != 0) {
199 qb_util_log(LOG_DEBUG, "error set_sock_size(sockets[1],%#x)",
200 max_msg_size);
201 goto cleanup_socks;
|
(6) Event if_end: |
End of if statement. |
202 }
203
|
(7) Event cond_true: |
Condition "tries < 3", taking true branch. |
204 for (tries = 0; tries < 3; tries++) {
205
|
(8) Event cond_true: |
Condition "write_passed == 0", taking true branch. |
206 if (write_passed == 0) {
|
(9) Event return_constant: |
Function call "write(sockets[1], buf, max_msg_size)" may return -1. |
|
(10) Event assignment: |
Assigning: "rc" = "write(sockets[1], buf, max_msg_size)". The value of "rc" is now -1. |
| Also see events: |
[cond_const][overrun-buffer-arg] |
207 rc = write(sockets[1], buf, max_msg_size);
208
|
(11) Event cond_true: |
Condition "rc < 0", taking true branch. |
|
(12) Event cond_false: |
Condition "*__errno_location() == 11", taking false branch. |
|
(13) Event cond_false: |
Condition "*__errno_location() == 4", taking false branch. |
209 if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
210 continue;
|
(14) Event else_branch: |
Reached else branch. |
|
(15) Event cond_true: |
Condition "rc == max_msg_size", taking true branch. |
|
(16) Event cond_const: |
Checking "rc == max_msg_size" implies that "max_msg_size" is 18446744073709551615 on the true branch. |
| Also see events: |
[return_constant][assignment][overrun-buffer-arg] |
211 } else if (rc == max_msg_size) {
212 write_passed = 1;
|
(17) Event if_fallthrough: |
Falling through to end of if statement. |
213 } else {
214 break;
|
(18) Event if_end: |
End of if statement. |
215 }
216 }
217
|
(19) Event cond_true: |
Condition "read_passed == 0", taking true branch. |
218 if (read_passed == 0) {
|
(20) Event overrun-buffer-arg: |
Calling "read" with "buf" and "max_msg_size" is suspicious because of the very large index, 18446744073709551615. The index may be due to a negative parameter being interpreted as unsigned. |
| Also see events: |
[return_constant][assignment][cond_const] |
219 rc = read(sockets[0], buf, max_msg_size);
220
221 if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
222 continue;
223 } else if (rc == max_msg_size) {
224 read_passed = 1;
225 } else {
226 break;
227 }
228 }
229
230 if (read_passed && write_passed) {
231 rc = 0;
232 break;
233 }
234 }
235
236
237 cleanup_socks:
238 close(sockets[0]);
239 close(sockets[1]);
240 return rc;
241 }
242
/*
 * Find a datagram size that dgram_verify_msg_size() accepts.
 *
 * If max_msg_size itself verifies, it is returned.  Otherwise sizes are
 * probed upwards from 2048 in steps of 2048; after each failed probe the
 * position is moved back by the current step and the step is halved, and
 * the search stops at the first failure once the step is below 512.
 *
 * Returns the largest size that verified, or -1 when none did.
 */
int32_t
qb_ipcc_verify_dgram_max_msg_size(size_t max_msg_size)
{
	int32_t probe;
	int32_t best = -1;
	int32_t step = 2048;

	if (dgram_verify_msg_size(max_msg_size) == 0) {
		return max_msg_size;
	}

	probe = step;
	while (probe < max_msg_size) {
		if (dgram_verify_msg_size(probe) == 0) {
			best = probe;
		} else if (step >= 512) {
			/* step back, then continue with a finer step */
			probe -= step;
			step = step / 2;
		} else {
			break;
		}
		probe += step;
	}

	return best;
}
267
268 /*
269 * bind to "base_name-local_name"
270 * connect to "base_name-remote_name"
271 * output sock_pt
272 */
273 static int32_t
274 qb_ipc_dgram_sock_connect(const char *base_name,
275 const char *local_name,
276 const char *remote_name,
277 int32_t max_msg_size, int32_t * sock_pt, gid_t gid)
278 {
279 char sock_path[PATH_MAX];
280 struct sockaddr_un remote_address;
281 int32_t res = qb_ipc_dgram_sock_setup(base_name, local_name,
282 sock_pt, gid);
283 if (res < 0) {
284 return res;
285 }
286
287 snprintf(sock_path, PATH_MAX, "%s-%s", base_name, remote_name);
288 set_sock_addr(&remote_address, sock_path);
289 if (connect(*sock_pt, (struct sockaddr *)&remote_address,
290 QB_SUN_LEN(&remote_address)) == -1) {
291 res = -errno;
292 goto error_connect;
293 }
294
295 return set_sock_size(*sock_pt, max_msg_size);
296
297 error_connect:
298 close(*sock_pt);
299 *sock_pt = -1;
300
301 return res;
302 }
303
304 static int32_t
305 _finish_connecting(struct qb_ipc_one_way *one_way)
306 {
307 struct sockaddr_un remote_address;
308 int res;
309 int error;
310 int retry = 0;
311
312 set_sock_addr(&remote_address, one_way->u.us.sock_name);
313
314 /* this retry loop is here to help connecting when trying to send
315 * an event right after connection setup.
316 */
317 do {
318 errno = 0;
319 res = connect(one_way->u.us.sock,
320 (struct sockaddr *)&remote_address,
321 QB_SUN_LEN(&remote_address));
322 if (res == -1) {
323 error = -errno;
324 qb_util_perror(LOG_DEBUG, "error calling connect()");
325 retry++;
326 usleep(100000);
327 }
328 } while (res == -1 && retry < 10);
329 if (res == -1) {
330 return error;
331 }
332
333 /* Beside disposing no longer needed value, this also signals that
334 we are done with connect-on-send arrangement at the server side
335 (i.e. for response and event channels). */
336 free(one_way->u.us.sock_name);
337 one_way->u.us.sock_name = NULL;
338
339 return set_sock_size(one_way->u.us.sock, one_way->max_msg_size);
340 }
341
342 /*
343 * client functions
344 * --------------------------------------------------------
345 */
346 static void
347 qb_ipcc_us_disconnect(struct qb_ipcc_connection *c)
348 {
349 munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
350 unlink(c->request.u.us.shared_file_name);
351
352 if (use_filesystem_sockets()) {
353 struct sockaddr_un un_addr;
354 socklen_t un_addr_len = sizeof(struct sockaddr_un);
355 char *base_name;
356 char sock_name[PATH_MAX];
357 size_t length;
358 if (getsockname(c->response.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
359 length = strlen(un_addr.sun_path);
360 base_name = strndup(un_addr.sun_path,
361 length - /* strlen("-response") */ 9);
362 qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,length);
363 snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"request");
364 qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
365 unlink(sock_name);
366 snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event");
367 qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
368 unlink(sock_name);
369 snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event-tx");
370 qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
371 unlink(sock_name);
372 snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"response");
373 qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
374 unlink(sock_name);
375 free(base_name);
376 }
377 }
378
379 /* Last-ditch attempt to tidy up after ourself */
380 remove_tempdir(c->request.u.us.shared_file_name);
381
382 qb_ipcc_us_sock_close(c->event.u.us.sock);
383 qb_ipcc_us_sock_close(c->request.u.us.sock);
384 qb_ipcc_us_sock_close(c->setup.u.us.sock);
385 }
386
387 static ssize_t
388 qb_ipc_socket_send(struct qb_ipc_one_way *one_way,
389 const void *msg_ptr, size_t msg_len)
390 {
391 ssize_t rc = 0;
392 struct ipc_us_control *ctl;
393 ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
394
395 if (one_way->u.us.sock_name) {
396 rc = _finish_connecting(one_way);
397 if (rc < 0) {
398 qb_util_log(LOG_ERR, "socket connect-on-send");
399 return rc;
400 }
401 }
402
403 qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
404 rc = send(one_way->u.us.sock, msg_ptr, msg_len, MSG_NOSIGNAL);
405 if (rc == -1) {
406 rc = -errno;
407 if (errno != EAGAIN && errno != ENOBUFS) {
408 qb_util_perror(LOG_DEBUG, "socket_send:send");
409 }
410 }
411 qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
412
413 if (ctl && rc == msg_len) {
414 qb_atomic_int_inc(&ctl->sent);
415 }
416
417 return rc;
418 }
419
420 static ssize_t
421 qb_ipc_socket_sendv(struct qb_ipc_one_way *one_way, const struct iovec *iov,
422 size_t iov_len)
423 {
424 int32_t rc;
425 struct ipc_us_control *ctl;
426 ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
427
428 qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
429
430 if (one_way->u.us.sock_name) {
431 rc = _finish_connecting(one_way);
432 if (rc < 0) {
433 qb_util_perror(LOG_ERR, "socket connect-on-sendv");
434 qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
435 return rc;
436 }
437 }
438
439 rc = writev(one_way->u.us.sock, iov, iov_len);
440
441 if (rc == -1) {
442 rc = -errno;
443 if (errno != EAGAIN && errno != ENOBUFS) {
444 qb_util_perror(LOG_DEBUG, "socket_sendv:writev %d",
445 one_way->u.us.sock);
446 }
447 }
448
449 qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
450
451 if (ctl && rc > 0) {
452 qb_atomic_int_inc(&ctl->sent);
453 }
454 return rc;
455 }
456
457 /*
458 * recv a message of unknown size.
459 */
460 static ssize_t
461 qb_ipc_us_recv_at_most(struct qb_ipc_one_way *one_way,
462 void *msg, size_t len, int32_t timeout)
463 {
464 int32_t result;
465 int32_t final_rc = 0;
466 int32_t to_recv = 0;
467 char *data = msg;
468 struct ipc_us_control *ctl = NULL;
469 int32_t time_waited = 0;
470 int32_t time_to_wait = timeout;
471
472 if (timeout == -1) {
473 time_to_wait = 1000;
474 }
475
476 qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
477
478 retry_peek:
479 result = recv(one_way->u.us.sock, data,
480 sizeof(struct qb_ipc_request_header),
481 MSG_NOSIGNAL | MSG_PEEK);
482
483 if (result == -1) {
484
485 if (errno != EAGAIN) {
486 final_rc = -errno;
487 if (use_filesystem_sockets()) {
488 if (errno == ECONNRESET || errno == EPIPE) {
489 final_rc = -ENOTCONN;
490 }
491 }
492 goto cleanup_sigpipe;
493 }
494
495 /* check to see if we have enough time left to try again */
496 if (time_waited < timeout || timeout == -1) {
497 result = qb_ipc_us_ready(one_way, NULL, time_to_wait, POLLIN);
498 if (qb_ipc_us_sock_error_is_disconnected(result)) {
499 final_rc = result;
500 goto cleanup_sigpipe;
501 }
502 time_waited += time_to_wait;
503 goto retry_peek;
504 } else if (time_waited >= timeout) {
505 final_rc = -ETIMEDOUT;
506 goto cleanup_sigpipe;
507 }
508 }
509 if (result >= sizeof(struct qb_ipc_request_header)) {
510 struct qb_ipc_request_header *hdr = NULL;
511 hdr = (struct qb_ipc_request_header *)msg;
512 to_recv = hdr->size;
513 }
514
515 result = recv(one_way->u.us.sock, data, to_recv,
516 MSG_NOSIGNAL | MSG_WAITALL);
517 if (result == -1) {
518 final_rc = -errno;
519 goto cleanup_sigpipe;
520 } else if (result == 0) {
521 qb_util_log(LOG_DEBUG, "recv == 0 -> ENOTCONN");
522
523 final_rc = -ENOTCONN;
524 goto cleanup_sigpipe;
525 }
526
527 final_rc = result;
528
529 ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
530 if (ctl) {
531 (void)qb_atomic_int_dec_and_test(&ctl->sent);
532 }
533
534 cleanup_sigpipe:
535 qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
536 return final_rc;
537 }
538
539 static void
540 qb_ipc_us_fc_set(struct qb_ipc_one_way *one_way, int32_t fc_enable)
541 {
542 struct ipc_us_control *ctl =
543 (struct ipc_us_control *)one_way->u.us.shared_data;
544
545 qb_util_log(LOG_TRACE, "setting fc to %d", fc_enable);
546 qb_atomic_int_set(&ctl->flow_control, fc_enable);
547 }
548
549 static int32_t
550 qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way)
551 {
552 struct ipc_us_control *ctl =
553 (struct ipc_us_control *)one_way->u.us.shared_data;
554
555 return qb_atomic_int_get(&ctl->flow_control);
556 }
557
558 static ssize_t
559 qb_ipc_us_q_len_get(struct qb_ipc_one_way *one_way)
560 {
561 struct ipc_us_control *ctl =
562 (struct ipc_us_control *)one_way->u.us.shared_data;
563 return qb_atomic_int_get(&ctl->sent);
564 }
565
566 int32_t
567 qb_ipcc_us_connect(struct qb_ipcc_connection * c,
568 struct qb_ipc_connection_response * r)
569 {
570 int32_t res;
571 char path[PATH_MAX];
572 int32_t fd_hdr;
573 char *shm_ptr;
574
575 qb_atomic_init();
576
577 c->needs_sock_for_poll = QB_FALSE;
578 c->funcs.send = qb_ipc_socket_send;
579 c->funcs.sendv = qb_ipc_socket_sendv;
580 c->funcs.recv = qb_ipc_us_recv_at_most;
581 c->funcs.fc_get = qb_ipc_us_fc_get;
582 c->funcs.disconnect = qb_ipcc_us_disconnect;
583
584 fd_hdr = qb_sys_mmap_file_open(path, r->request,
585 SHM_CONTROL_SIZE, O_RDWR);
586 if (fd_hdr < 0) {
587 res = fd_hdr;
588 errno = -fd_hdr;
589 qb_util_perror(LOG_ERR, "couldn't open file for mmap");
590 return res;
591 }
592 (void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);
593 shm_ptr = mmap(0, SHM_CONTROL_SIZE,
594 PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
595
596 if (shm_ptr == MAP_FAILED) {
597 res = -errno;
598 qb_util_perror(LOG_ERR, "couldn't create mmap for header");
599 goto cleanup_hdr;
600 }
601 c->request.u.us.shared_data = shm_ptr;
602 c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
603 c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control));
604
605 close(fd_hdr);
606 fd_hdr = -1;
607
608 res = qb_ipc_dgram_sock_connect(r->response, "response", "request",
609 r->max_msg_size, &c->request.u.us.sock, c->egid);
610 if (res != 0) {
611 goto cleanup_hdr;
612 }
613 c->response.u.us.sock = c->request.u.us.sock;
614
615 res = qb_ipc_dgram_sock_connect(r->response, "event", "event-tx",
616 r->max_msg_size, &c->event.u.us.sock, c->egid);
617 if (res != 0) {
618 goto cleanup_hdr;
619 }
620
621 return 0;
622
623 cleanup_hdr:
624 if (fd_hdr >= 0) {
625 close(fd_hdr);
626 }
627 close(c->event.u.us.sock);
628 close(c->request.u.us.sock);
629 unlink(r->request);
630 munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
631 return res;
632 }
633
634 /*
635 * service functions
636 * --------------------------------------------------------
637 */
638 static int32_t
639 _sock_connection_liveliness(int32_t fd, int32_t revents, void *data)
640 {
641 struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
642
643 qb_util_log(LOG_DEBUG, "LIVENESS: fd %d event %d conn (%s)",
644 fd, revents, c->description);
645 if (revents & POLLNVAL) {
646 qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
647 qb_ipcs_disconnect(c);
648 return -EINVAL;
649 }
650 if (revents & POLLHUP) {
651 qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
652 qb_ipcs_disconnect(c);
653 return -ESHUTDOWN;
654 }
655
656 /* If we actually get POLLIN for some reason here, it most
657 * certainly means EOF. Do a recv on the fd to detect eof and
658 * then disconnect */
659 if (revents & POLLIN) {
660 char buf[10];
661 int res;
662
663 res = recv(fd, buf, sizeof(buf), MSG_DONTWAIT);
664 if (res < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
665 res = -errno;
666 } else if (res == 0) {
667 qb_util_log(LOG_DEBUG, "EOF conn (%s)", c->description);
668 res = -ESHUTDOWN;
669 }
670
671 if (res < 0) {
672 qb_ipcs_disconnect(c);
673 return res;
674 }
675 }
676
677 return 0;
678 }
679
680 static int32_t
681 _sock_add_to_mainloop(struct qb_ipcs_connection *c)
682 {
683 int res;
684
685 res = c->service->poll_fns.dispatch_add(c->service->poll_priority,
686 c->request.u.us.sock,
687 POLLIN | POLLPRI | POLLNVAL,
688 c,
689 qb_ipcs_dispatch_connection_request);
690
691 if (res < 0) {
692 qb_util_log(LOG_ERR,
693 "Error adding socket to mainloop (%s).",
694 c->description);
695 return res;
696 }
697
698 res = c->service->poll_fns.dispatch_add(c->service->poll_priority,
699 c->setup.u.us.sock,
700 POLLIN | POLLPRI | POLLNVAL,
701 c, _sock_connection_liveliness);
702 qb_util_log(LOG_DEBUG, "added %d to poll loop (liveness)",
703 c->setup.u.us.sock);
704 if (res < 0) {
705 qb_util_perror(LOG_ERR, "Error adding setupfd to mainloop");
706 (void)c->service->poll_fns.dispatch_del(c->request.u.us.sock);
707 return res;
708 }
709 return res;
710 }
711
712 static void
713 _sock_rm_from_mainloop(struct qb_ipcs_connection *c)
714 {
715 (void)c->service->poll_fns.dispatch_del(c->request.u.us.sock);
716 (void)c->service->poll_fns.dispatch_del(c->setup.u.us.sock);
717 }
718
719 static void
720 qb_ipcs_us_disconnect(struct qb_ipcs_connection *c)
721 {
722 qb_enter();
723
724 if (c->state == QB_IPCS_CONNECTION_ESTABLISHED ||
725 c->state == QB_IPCS_CONNECTION_ACTIVE) {
726 _sock_rm_from_mainloop(c);
727
728 /* Free the temporaries denoting which respective socket
729 name on the client's side to connect upon the first
730 send operation -- normally the variable is free'd once
731 the connection is established but there may have been
732 no chance for that. */
733 free(c->response.u.us.sock_name);
734 c->response.u.us.sock_name = NULL;
735
736 free(c->event.u.us.sock_name);
737 c->event.u.us.sock_name = NULL;
738
739 if (use_filesystem_sockets()) {
740 struct sockaddr_un un_addr;
741 socklen_t un_addr_len = sizeof(struct sockaddr_un);
742 char *base_name;
743 char sock_name[PATH_MAX];
744 size_t length;
745 if (getsockname(c->request.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
746 length = strlen(un_addr.sun_path);
747 base_name = strndup(un_addr.sun_path,
748 length - /* strlen("-request") */ 8);
749 qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,length);
750 snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"request");
751 qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
752 unlink(sock_name);
753 snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event");
754 qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
755 unlink(sock_name);
756 snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event-tx");
757 qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
758 unlink(sock_name);
759 snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"response");
760 qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
761 unlink(sock_name);
762 free(base_name);
763 }
764 }
765 qb_ipcc_us_sock_close(c->setup.u.us.sock);
766 qb_ipcc_us_sock_close(c->request.u.us.sock);
767 qb_ipcc_us_sock_close(c->event.u.us.sock);
768 }
769 if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN ||
770 c->state == QB_IPCS_CONNECTION_ACTIVE) {
771 munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
772 unlink(c->request.u.us.shared_file_name);
773
774
775 }
776 remove_tempdir(c->description);
777 }
778
779 static int32_t
780 qb_ipcs_us_connect(struct qb_ipcs_service *s,
781 struct qb_ipcs_connection *c,
782 struct qb_ipc_connection_response *r)
783 {
784 char path[PATH_MAX];
785 int32_t fd_hdr;
786 int32_t res = 0;
787 struct ipc_us_control *ctl;
788 char *shm_ptr;
789
790 qb_util_log(LOG_DEBUG, "connecting to client (%s)", c->description);
791
792 c->request.u.us.sock = c->setup.u.us.sock;
793 c->response.u.us.sock = c->setup.u.us.sock;
794
795 snprintf(r->request, NAME_MAX, "%s-control-%s",
796 c->description, s->name);
797 snprintf(r->response, NAME_MAX, "%s-%s", c->description, s->name);
798
799 fd_hdr = qb_sys_mmap_file_open(path, r->request,
800 SHM_CONTROL_SIZE,
801 O_CREAT | O_TRUNC | O_RDWR | O_EXCL);
802 if (fd_hdr < 0) {
803 res = fd_hdr;
804 errno = -fd_hdr;
805 qb_util_perror(LOG_ERR, "couldn't create file for mmap (%s)",
806 c->description);
807 return res;
808 }
809 (void)strlcpy(r->request, path, PATH_MAX);
810 (void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);
811 res = chown(r->request, c->auth.uid, c->auth.gid);
812 if (res != 0) {
813 /* ignore res, this is just for the compiler warnings.
814 */
815 res = 0;
816 }
817 res = chmod(r->request, c->auth.mode);
818 if (res != 0) {
819 /* ignore res, this is just for the compiler warnings.
820 */
821 res = 0;
822 }
823
824 shm_ptr = mmap(0, SHM_CONTROL_SIZE,
825 PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
826
827 if (shm_ptr == MAP_FAILED) {
828 res = -errno;
829 qb_util_perror(LOG_ERR, "couldn't create mmap for header (%s)",
830 c->description);
831 goto cleanup_hdr;
832 }
833 c->request.u.us.shared_data = shm_ptr;
834 c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
835 c->event.u.us.shared_data = shm_ptr + (2 * sizeof(struct ipc_us_control));
836
837 ctl = (struct ipc_us_control *)c->request.u.us.shared_data;
838 ctl->sent = 0;
839 ctl->flow_control = 0;
840 ctl = (struct ipc_us_control *)c->response.u.us.shared_data;
841 ctl->sent = 0;
842 ctl->flow_control = 0;
843 ctl = (struct ipc_us_control *)c->event.u.us.shared_data;
844 ctl->sent = 0;
845 ctl->flow_control = 0;
846
847 close(fd_hdr);
848 fd_hdr = -1;
849
850 /* request channel */
851 res = qb_ipc_dgram_sock_setup(r->response, "request",
852 &c->request.u.us.sock, c->egid);
853 if (res < 0) {
854 goto cleanup_hdr;
855 }
856
857 res = set_sock_size(c->request.u.us.sock, c->request.max_msg_size);
858 if (res != 0) {
859 goto cleanup_hdr;
860 }
861
862 c->setup.u.us.sock_name = NULL;
863 c->request.u.us.sock_name = NULL;
864
865 /* response channel */
866 c->response.u.us.sock = c->request.u.us.sock;
867 snprintf(path, PATH_MAX, "%s-%s", r->response, "response");
868 c->response.u.us.sock_name = strdup(path);
869
870 /* event channel */
871 res = qb_ipc_dgram_sock_setup(r->response, "event-tx",
872 &c->event.u.us.sock, c->egid);
873 if (res < 0) {
874 goto cleanup_hdr;
875 }
876
877 res = set_sock_size(c->event.u.us.sock, c->event.max_msg_size);
878 if (res != 0) {
879 goto cleanup_hdr;
880 }
881
882 snprintf(path, PATH_MAX, "%s-%s", r->response, "event");
883 c->event.u.us.sock_name = strdup(path);
884
885 res = _sock_add_to_mainloop(c);
886 if (res < 0) {
887 goto cleanup_hdr;
888 }
889
890 return res;
891
892 cleanup_hdr:
893 free(c->response.u.us.sock_name);
894 free(c->event.u.us.sock_name);
895
896 if (fd_hdr >= 0) {
897 close(fd_hdr);
898 }
899 unlink(r->request);
900 munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
901 return res;
902 }
903
904 void
905 qb_ipcs_us_init(struct qb_ipcs_service *s)
906 {
907 s->funcs.connect = qb_ipcs_us_connect;
908 s->funcs.disconnect = qb_ipcs_us_disconnect;
909
910 s->funcs.recv = qb_ipc_us_recv_at_most;
911 s->funcs.peek = NULL;
912 s->funcs.reclaim = NULL;
913 s->funcs.send = qb_ipc_socket_send;
914 s->funcs.sendv = qb_ipc_socket_sendv;
915
916 s->funcs.fc_set = qb_ipc_us_fc_set;
917 s->funcs.q_len_get = qb_ipc_us_q_len_get;
918
919 s->needs_sock_for_poll = QB_FALSE;
920
921 qb_atomic_init();
922 }
923