1    	/*
2    	 * Copyright (C) 2010,2013 Red Hat, Inc.
3    	 *
4    	 * Author: Angus Salkeld <asalkeld@redhat.com>
5    	 *
6    	 * This file is part of libqb.
7    	 *
8    	 * libqb is free software: you can redistribute it and/or modify
9    	 * it under the terms of the GNU Lesser General Public License as published by
10   	 * the Free Software Foundation, either version 2.1 of the License, or
11   	 * (at your option) any later version.
12   	 *
13   	 * libqb is distributed in the hope that it will be useful,
14   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   	 * GNU Lesser General Public License for more details.
17   	 *
18   	 * You should have received a copy of the GNU Lesser General Public License
19   	 * along with libqb.  If not, see <http://www.gnu.org/licenses/>.
20   	 */
21   	#include "os_base.h"
22   	#include <poll.h>
23   	
24   	#ifdef HAVE_SYS_UN_H
25   	#include <sys/un.h>
26   	#endif /* HAVE_SYS_UN_H */
27   	#ifdef HAVE_SYS_MMAN_H
28   	#include <sys/mman.h>
29   	#endif
30   	
31   	#include <qb/qbatomic.h>
32   	#include <qb/qbipcs.h>
33   	#include <qb/qbloop.h>
34   	#include <qb/qbdefs.h>
35   	
36   	#include "util_int.h"
37   	#include "ipc_int.h"
38   	
39   	struct ipc_us_control {
40   		int32_t sent;
41   		int32_t flow_control;
42   	};
43   	#define SHM_CONTROL_SIZE (3 * sizeof(struct ipc_us_control))
44   	
45   	int use_filesystem_sockets(void)
46   	{
47   		static int need_init = 1;
48   		static int filesystem_sockets = 0;
49   	
50   		if (need_init) {
51   	#if defined(QB_LINUX) || defined(QB_CYGWIN)
52   			struct stat buf;
53   	
54   			if (stat(FORCESOCKETSFILE, &buf) == 0) {
55   				filesystem_sockets = 1;
56   			}
57   	#else
58   			filesystem_sockets = 1;
59   	#endif
60   			need_init = 0;
61   		}
62   		return filesystem_sockets;
63   	}
64   	
65   	static void
66   	set_sock_addr(struct sockaddr_un *address, const char *socket_name)
67   	{
68   		memset(address, 0, sizeof(struct sockaddr_un));
69   		address->sun_family = AF_UNIX;
70   	#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
71   		address->sun_len = QB_SUN_LEN(address);
72   	#endif
73   	
74   		if (socket_name[0] == '/' || !use_filesystem_sockets()) {
75   			snprintf(address->sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name);
76   		} else {
77   			snprintf(address->sun_path, sizeof(address->sun_path), "%s/%s", SOCKETDIR,
78   				 socket_name);
79   		}
80   	}
81   	
82   	static int32_t
83   	qb_ipc_dgram_sock_setup(const char *base_name,
84   				const char *service_name, int32_t * sock_pt,
85   				gid_t gid)
86   	{
87   		int32_t request_fd;
88   		struct sockaddr_un local_address;
89   		int32_t res = 0;
90   		char sock_path[PATH_MAX];
91   	
92   		request_fd = socket(PF_UNIX, SOCK_DGRAM, 0);
93   		if (request_fd == -1) {
94   			return -errno;
95   		}
96   	
97   		qb_socket_nosigpipe(request_fd);
98   		res = qb_sys_fd_nonblock_cloexec_set(request_fd);
99   		if (res < 0) {
100  			goto error_connect;
101  		}
102  		snprintf(sock_path, PATH_MAX, "%s-%s", base_name, service_name);
103  		set_sock_addr(&local_address, sock_path);
104  		if (use_filesystem_sockets()) {
105  			(void)unlink(local_address.sun_path);
106  		}
107  		res = bind(request_fd, (struct sockaddr *)&local_address,
108  			   sizeof(local_address));
109  		if (res < 0) {
110  			goto error_connect;
111  		}
112  	
113  		if (use_filesystem_sockets()) {
114  			if (chmod(local_address.sun_path, 0660) != 0) {
115  				res = 0;
116  				qb_util_perror(LOG_ERR, "failed to chmod socket (%s)",
117  					       local_address.sun_path);
118  			}
119  			/* chown may fail if not root, but log it */
120  			if (chown(local_address.sun_path, -1, gid) != 0) {
121  				qb_util_perror(LOG_WARNING, "failed to chown socket (%s)",
122  					       local_address.sun_path);
123  			}
124  		}
125  	
126  		*sock_pt = request_fd;
127  		return 0;
128  	
129  	error_connect:
130  		close(request_fd);
131  		*sock_pt = -1;
132  	
133  		return res;
134  	}
135  	
136  	static int32_t
137  	set_sock_size(int sockfd, size_t max_msg_size)
138  	{
139  		int32_t rc;
140  		unsigned int optval;
141  		socklen_t optlen = sizeof(optval);
142  	
143  		rc = getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, &optlen);
144  	
145  		qb_util_log(LOG_TRACE, "%d: getsockopt(%d, SO_SNDBUF, needed:%d) actual:%d",
146  			rc, sockfd, max_msg_size, optval);
147  	
148  		/* The optval <= max_msg_size check is weird...
149  		 * during testing it was discovered in some instances if the
150  		 * default optval is exactly equal to our max_msg_size, we couldn't
151  		 * actually send a message that large unless we explicitly set
152  		 * it using setsockopt... there is no good explaination for this. Most
153  		 * likely this is hitting some sort of "off by one" error in the kernel. */
154  		if (rc == 0 && optval <= max_msg_size) {
155  			optval = max_msg_size;
156  			optlen = sizeof(optval);
157  			rc = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, optlen);
158  		}
159  	
160  		if (rc != 0) {
161  			return -errno;
162  		}
163  	
164  		rc = getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, &optlen);
165  	
166  		qb_util_log(LOG_TRACE, "%d: getsockopt(%d, SO_RCVBUF, needed:%d) actual:%d",
167  			rc, sockfd, max_msg_size, optval);
168  	
169  		/* Set the sockets receive buffer size to match the send buffer.  On
170  		 * FreeBSD without this calls to sendto() will result in an ENOBUFS error
171  		 * if the message is larger than net.local.dgram.recvspace sysctl. */
172  		if (rc == 0 && optval <= max_msg_size) {
173  			optval = max_msg_size;
174  			optlen = sizeof(optval);
175  			rc = setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, optlen);
176  		}
177  	
178  		if (rc != 0) {
179  			return -errno;
180  		}
181  	
182  		return rc;
183  	}
184  	
185  	static int32_t
186  	dgram_verify_msg_size(size_t max_msg_size)
187  	{
188  		int32_t rc = -1;
189  		int32_t sockets[2];
190  		int32_t tries = 0;
191  		int32_t write_passed = 0;
192  		int32_t read_passed = 0;
193  		char buf[max_msg_size];
194  		memset (buf, 0, max_msg_size);
195  	
196  		if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets) < 0) {
197  			qb_util_perror(LOG_DEBUG, "error calling socketpair()");
198  			goto cleanup_socks;
199  		}
200  	
201  		if (set_sock_size(sockets[0], max_msg_size) != 0) {
202  			qb_util_log(LOG_DEBUG, "error set_sock_size(sockets[0],%#x)",
203  				    max_msg_size);
204  			goto cleanup_socks;
205  		}
206  		if (set_sock_size(sockets[1], max_msg_size) != 0) {
207  			qb_util_log(LOG_DEBUG, "error set_sock_size(sockets[1],%#x)",
208  				    max_msg_size);
209  			goto cleanup_socks;
210  		}
211  	
212  		for (tries = 0; tries < 3; tries++) {
213  	
214  			if (write_passed == 0) {
215  				rc = write(sockets[1], buf, max_msg_size);
216  	
217  				if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
218  					continue;
219  				} else if (rc == max_msg_size) {
220  					write_passed = 1;
221  				} else {
222  					break;
223  				}
224  			}
225  	
226  			if (read_passed == 0) {
227  				rc = read(sockets[0], buf, max_msg_size);
228  	
229  				if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
230  					continue;
231  				} else if (rc == max_msg_size) {
232  					read_passed = 1;
233  				} else {
234  					break;
235  				}
236  			}
237  	
238  			if (read_passed && write_passed) {
239  				rc = 0;
240  				break;
241  			}
242  		}
243  	
244  	
245  	cleanup_socks:
246  		close(sockets[0]);
247  		close(sockets[1]);
248  		return rc;
249  	}
250  	
251  	int32_t
252  	qb_ipcc_verify_dgram_max_msg_size(size_t max_msg_size)
253  	{
254  		int32_t i;
(1) Event assign_neg_constant: Assigning: "last" = "-1", which is negative.
Also see events: [return_negative_variable]
255  		int32_t last = -1;
256  		int32_t inc = 2048;
257  	
(2) Event path: Condition "dgram_verify_msg_size(max_msg_size) == 0", taking false branch.
258  		if (dgram_verify_msg_size(max_msg_size) == 0) {
259  			return max_msg_size;
260  		}
261  	
(3) Event path: Condition "i < max_msg_size", taking false branch.
262  		for (i = inc; i < max_msg_size; i+=inc) {
263  			if (dgram_verify_msg_size(i) == 0) {
264  				last = i;
265  			} else if (inc >= 512) {
266  				i-=inc;
267  				inc = inc/2;
268  			} else {
269  				break;
270  			}
271  		}
272  	
(4) Event return_negative_variable: Explicitly returning negative variable "last".
Also see events: [assign_neg_constant]
273  		return last;
274  	}
275  	
276  	/*
277  	 * bind to "base_name-local_name"
278  	 * connect to "base_name-remote_name"
279  	 * output sock_pt
280  	 */
281  	static int32_t
282  	qb_ipc_dgram_sock_connect(const char *base_name,
283  				  const char *local_name,
284  				  const char *remote_name,
285  				  int32_t max_msg_size, int32_t * sock_pt, gid_t gid)
286  	{
287  		char sock_path[PATH_MAX];
288  		struct sockaddr_un remote_address;
289  		int32_t res = qb_ipc_dgram_sock_setup(base_name, local_name,
290  						      sock_pt, gid);
291  		if (res < 0) {
292  			return res;
293  		}
294  	
295  		snprintf(sock_path, PATH_MAX, "%s-%s", base_name, remote_name);
296  		set_sock_addr(&remote_address, sock_path);
297  		if (connect(*sock_pt, (struct sockaddr *)&remote_address,
298  			    QB_SUN_LEN(&remote_address)) == -1) {
299  			res = -errno;
300  			goto error_connect;
301  		}
302  	
303  		return set_sock_size(*sock_pt, max_msg_size);
304  	
305  	error_connect:
306  		close(*sock_pt);
307  		*sock_pt = -1;
308  	
309  		return res;
310  	}
311  	
312  	static int32_t
313  	_finish_connecting(struct qb_ipc_one_way *one_way)
314  	{
315  		struct sockaddr_un remote_address;
316  		int res;
317  		int error;
318  		int retry = 0;
319  	
320  		set_sock_addr(&remote_address, one_way->u.us.sock_name);
321  	
322  		/* this retry loop is here to help connecting when trying to send
323  		 * an event right after connection setup.
324  		 */
325  		do {
326  			errno = 0;
327  			res = connect(one_way->u.us.sock,
328  				      (struct sockaddr *)&remote_address,
329  				      QB_SUN_LEN(&remote_address));
330  			if (res == -1) {
331  				error = -errno;
332  				qb_util_perror(LOG_DEBUG, "error calling connect()");
333  				retry++;
334  				usleep(100000);
335  			}
336  		} while (res == -1 && retry < 10);
337  		if (res == -1) {
338  			return error;
339  		}
340  	
341  		/* Beside disposing no longer needed value, this also signals that
342  		   we are done with connect-on-send arrangement at the server side
343  		   (i.e. for response and event channels). */
344  		free(one_way->u.us.sock_name);
345  		one_way->u.us.sock_name = NULL;
346  	
347  		return set_sock_size(one_way->u.us.sock, one_way->max_msg_size);
348  	}
349  	
350  	/*
351  	 * client functions
352  	 * --------------------------------------------------------
353  	 */
354  	static void
355  	qb_ipcc_us_disconnect(struct qb_ipcc_connection *c)
356  	{
357  		munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
358  		unlink(c->request.u.us.shared_file_name);
359  	
360  		if (use_filesystem_sockets()) {
361  			struct sockaddr_un un_addr;
362  			socklen_t un_addr_len = sizeof(struct sockaddr_un);
363  			char *base_name;
364  			char sock_name[PATH_MAX];
365  			size_t length;
366  			if (getsockname(c->response.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
367  				length = strlen(un_addr.sun_path);
368  				base_name = strndup(un_addr.sun_path,
369  						    length - /* strlen("-response") */ 9);
370  				qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,length);
371  				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"request");
372  				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
373  				unlink(sock_name);
374  				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event");
375  				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
376  				unlink(sock_name);
377  				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event-tx");
378  				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
379  				unlink(sock_name);
380  				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"response");
381  				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
382  				unlink(sock_name);
383  				free(base_name);
384  			}
385  		}
386  	
387  		/* Last-ditch attempt to tidy up after ourself */
388  		remove_tempdir(c->request.u.us.shared_file_name);
389  	
390  		qb_ipcc_us_sock_close(c->event.u.us.sock);
391  		qb_ipcc_us_sock_close(c->request.u.us.sock);
392  		qb_ipcc_us_sock_close(c->setup.u.us.sock);
393  	}
394  	
395  	static ssize_t
396  	qb_ipc_socket_send(struct qb_ipc_one_way *one_way,
397  			   const void *msg_ptr, size_t msg_len)
398  	{
399  		ssize_t rc = 0;
400  		struct ipc_us_control *ctl;
401  		ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
402  	
403  		if (one_way->u.us.sock_name) {
404  			rc = _finish_connecting(one_way);
405  			if (rc < 0) {
406  				qb_util_log(LOG_ERR, "socket connect-on-send");
407  				return rc;
408  			}
409  		}
410  	
411  		qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
412  		rc = send(one_way->u.us.sock, msg_ptr, msg_len, MSG_NOSIGNAL);
413  		if (rc == -1) {
414  			rc = -errno;
415  			if (errno != EAGAIN && errno != ENOBUFS) {
416  				qb_util_perror(LOG_DEBUG, "socket_send:send");
417  			}
418  		}
419  		qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
420  	
421  		if (ctl && rc == msg_len) {
422  			qb_atomic_int_inc(&ctl->sent);
423  		}
424  	
425  		return rc;
426  	}
427  	
428  	static ssize_t
429  	qb_ipc_socket_sendv(struct qb_ipc_one_way *one_way, const struct iovec *iov,
430  			    size_t iov_len)
431  	{
432  		int32_t rc;
433  		struct ipc_us_control *ctl;
434  		ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
435  	
436  		qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
437  	
438  		if (one_way->u.us.sock_name) {
439  			rc = _finish_connecting(one_way);
440  			if (rc < 0) {
441  				qb_util_perror(LOG_ERR, "socket connect-on-sendv");
442  				qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
443  				return rc;
444  			}
445  		}
446  	
447  		rc = writev(one_way->u.us.sock, iov, iov_len);
448  	
449  		if (rc == -1) {
450  			rc = -errno;
451  			if (errno != EAGAIN && errno != ENOBUFS) {
452  				qb_util_perror(LOG_DEBUG, "socket_sendv:writev %d",
453  					       one_way->u.us.sock);
454  			}
455  		}
456  	
457  		qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
458  	
459  		if (ctl && rc > 0) {
460  			qb_atomic_int_inc(&ctl->sent);
461  		}
462  		return rc;
463  	}
464  	
465  	/*
466  	 * recv a message of unknown size.
467  	 */
468  	static ssize_t
469  	qb_ipc_us_recv_at_most(struct qb_ipc_one_way *one_way,
470  			       void *msg, size_t len, int32_t timeout)
471  	{
472  		int32_t result;
473  		int32_t final_rc = 0;
474  		int32_t to_recv = 0;
475  		char *data = msg;
476  		struct ipc_us_control *ctl = NULL;
477  		int32_t time_waited = 0;
478  		int32_t time_to_wait = timeout;
479  	
480  		if (timeout == -1) {
481  			time_to_wait = 1000;
482  		}
483  	
484  		qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
485  	
486  	retry_peek:
487  		result = recv(one_way->u.us.sock, data,
488  			      sizeof(struct qb_ipc_request_header),
489  			      MSG_NOSIGNAL | MSG_PEEK);
490  	
491  		if (result == -1) {
492  	
493  			if (errno != EAGAIN) {
494  				final_rc = -errno;
495  				if (use_filesystem_sockets()) {
496  					if (errno == ECONNRESET || errno == EPIPE) {
497  						final_rc = -ENOTCONN;
498  					}
499  				}
500  				goto cleanup_sigpipe;
501  			}
502  	
503  			/* check to see if we have enough time left to try again */
504  			if (time_waited < timeout || timeout == -1) {
505  				result = qb_ipc_us_ready(one_way, NULL, time_to_wait, POLLIN);
506  				if (qb_ipc_us_sock_error_is_disconnected(result)) {
507  					final_rc = result;
508  					goto cleanup_sigpipe;
509  				}
510  				time_waited += time_to_wait;
511  				goto retry_peek;
512  			} else if (time_waited >= timeout) {
513  				final_rc = -ETIMEDOUT;
514  				goto cleanup_sigpipe;
515  			}
516  		}
517  		if (result >= sizeof(struct qb_ipc_request_header)) {
518  			struct qb_ipc_request_header *hdr = NULL;
519  			hdr = (struct qb_ipc_request_header *)msg;
520  			to_recv = hdr->size;
521  		}
522  	
523  		result = recv(one_way->u.us.sock, data, to_recv,
524  			      MSG_NOSIGNAL | MSG_WAITALL);
525  		if (result == -1) {
526  			final_rc = -errno;
527  			goto cleanup_sigpipe;
528  		} else if (result == 0) {
529  			qb_util_log(LOG_DEBUG, "recv == 0 -> ENOTCONN");
530  	
531  			final_rc = -ENOTCONN;
532  			goto cleanup_sigpipe;
533  		}
534  	
535  		final_rc = result;
536  	
537  		ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
538  		if (ctl) {
539  			(void)qb_atomic_int_dec_and_test(&ctl->sent);
540  		}
541  	
542  	cleanup_sigpipe:
543  		qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
544  		return final_rc;
545  	}
546  	
547  	static void
548  	qb_ipc_us_fc_set(struct qb_ipc_one_way *one_way, int32_t fc_enable)
549  	{
550  		struct ipc_us_control *ctl =
551  		    (struct ipc_us_control *)one_way->u.us.shared_data;
552  	
553  		qb_util_log(LOG_TRACE, "setting fc to %d", fc_enable);
554  		qb_atomic_int_set(&ctl->flow_control, fc_enable);
555  	}
556  	
557  	static int32_t
558  	qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way)
559  	{
560  		struct ipc_us_control *ctl =
561  		    (struct ipc_us_control *)one_way->u.us.shared_data;
562  	
563  		return qb_atomic_int_get(&ctl->flow_control);
564  	}
565  	
566  	static ssize_t
567  	qb_ipc_us_q_len_get(struct qb_ipc_one_way *one_way)
568  	{
569  		struct ipc_us_control *ctl =
570  		    (struct ipc_us_control *)one_way->u.us.shared_data;
571  		return qb_atomic_int_get(&ctl->sent);
572  	}
573  	
574  	int32_t
575  	qb_ipcc_us_connect(struct qb_ipcc_connection * c,
576  			   struct qb_ipc_connection_response * r)
577  	{
578  		int32_t res;
579  		char path[PATH_MAX];
580  		int32_t fd_hdr;
581  		char *shm_ptr;
582  	
583  		qb_atomic_init();
584  	
585  		c->needs_sock_for_poll = QB_FALSE;
586  		c->funcs.send = qb_ipc_socket_send;
587  		c->funcs.sendv = qb_ipc_socket_sendv;
588  		c->funcs.recv = qb_ipc_us_recv_at_most;
589  		c->funcs.fc_get = qb_ipc_us_fc_get;
590  		c->funcs.disconnect = qb_ipcc_us_disconnect;
591  	
592  		fd_hdr = qb_sys_mmap_file_open(path, r->request,
593  					       SHM_CONTROL_SIZE, O_RDWR);
594  		if (fd_hdr < 0) {
595  			res = fd_hdr;
596  			errno = -fd_hdr;
597  			qb_util_perror(LOG_ERR, "couldn't open file for mmap");
598  			return res;
599  		}
600  		(void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);
601  		shm_ptr = mmap(0, SHM_CONTROL_SIZE,
602  			       PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
603  	
604  		if (shm_ptr == MAP_FAILED) {
605  			res = -errno;
606  			qb_util_perror(LOG_ERR, "couldn't create mmap for header");
607  			goto cleanup_hdr;
608  		}
609  		c->request.u.us.shared_data = shm_ptr;
610  		c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
611  		c->event.u.us.shared_data =  shm_ptr + (2 * sizeof(struct ipc_us_control));
612  	
613  		close(fd_hdr);
614  		fd_hdr = -1;
615  	
616  		res = qb_ipc_dgram_sock_connect(r->response, "response", "request",
617  						r->max_msg_size, &c->request.u.us.sock, c->egid);
618  		if (res != 0) {
619  			goto cleanup_hdr;
620  		}
621  		c->response.u.us.sock = c->request.u.us.sock;
622  	
623  		res = qb_ipc_dgram_sock_connect(r->response, "event", "event-tx",
624  						r->max_msg_size, &c->event.u.us.sock, c->egid);
625  		if (res != 0) {
626  			goto cleanup_hdr;
627  		}
628  	
629  		return 0;
630  	
631  	cleanup_hdr:
632  		if (fd_hdr >= 0) {
633  			close(fd_hdr);
634  		}
635  		close(c->event.u.us.sock);
636  		close(c->request.u.us.sock);
637  		unlink(r->request);
638  		munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
639  		return res;
640  	}
641  	
642  	/*
643  	 * service functions
644  	 * --------------------------------------------------------
645  	 */
646  	static int32_t
647  	_sock_connection_liveliness(int32_t fd, int32_t revents, void *data)
648  	{
649  		struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
650  	
651  		qb_util_log(LOG_DEBUG, "LIVENESS: fd %d event %d conn (%s)",
652  			    fd, revents, c->description);
653  		if (revents & POLLNVAL) {
654  			qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
655  			qb_ipcs_disconnect(c);
656  			return -EINVAL;
657  		}
658  		if (revents & POLLHUP) {
659  			qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
660  			qb_ipcs_disconnect(c);
661  			return -ESHUTDOWN;
662  		}
663  	
664  		/* If we actually get POLLIN for some reason here, it most
665  		 * certainly means EOF. Do a recv on the fd to detect eof and
666  		 * then disconnect */
667  		if (revents & POLLIN) {
668  			char buf[10];
669  			int res;
670  	
671  			res = recv(fd, buf, sizeof(buf), MSG_DONTWAIT);
672  			if (res < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
673  				res = -errno;
674  			} else if (res == 0) {
675  				qb_util_log(LOG_DEBUG, "EOF conn (%s)", c->description);
676  				res = -ESHUTDOWN;
677  			}
678  	
679  			if (res < 0) {
680  				qb_ipcs_disconnect(c);
681  				return res;
682  			}
683  		}
684  	
685  		return 0;
686  	}
687  	
688  	static int32_t
689  	_sock_add_to_mainloop(struct qb_ipcs_connection *c)
690  	{
691  		int res;
692  	
693  		res = c->service->poll_fns.dispatch_add(c->service->poll_priority,
694  							c->request.u.us.sock,
695  							POLLIN | POLLPRI | POLLNVAL,
696  							c,
697  							qb_ipcs_dispatch_connection_request);
698  	
699  		if (res < 0) {
700  			qb_util_log(LOG_ERR,
701  				    "Error adding socket to mainloop (%s).",
702  				    c->description);
703  			return res;
704  		}
705  	
706  		res = c->service->poll_fns.dispatch_add(c->service->poll_priority,
707  							c->setup.u.us.sock,
708  							POLLIN | POLLPRI | POLLNVAL,
709  							c, _sock_connection_liveliness);
710  		qb_util_log(LOG_DEBUG, "added %d to poll loop (liveness)",
711  			    c->setup.u.us.sock);
712  		if (res < 0) {
713  			qb_util_perror(LOG_ERR, "Error adding setupfd to mainloop");
714  			(void)c->service->poll_fns.dispatch_del(c->request.u.us.sock);
715  			return res;
716  		}
717  		return res;
718  	}
719  	
720  	static void
721  	_sock_rm_from_mainloop(struct qb_ipcs_connection *c)
722  	{
723  		(void)c->service->poll_fns.dispatch_del(c->request.u.us.sock);
724  		(void)c->service->poll_fns.dispatch_del(c->setup.u.us.sock);
725  	}
726  	
727  	static void
728  	qb_ipcs_us_disconnect(struct qb_ipcs_connection *c)
729  	{
730  		qb_enter();
731  	
732  		if (c->state == QB_IPCS_CONNECTION_ESTABLISHED ||
733  		    c->state == QB_IPCS_CONNECTION_ACTIVE) {
734  			_sock_rm_from_mainloop(c);
735  	
736  			/* Free the temporaries denoting which respective socket
737  			   name on the client's side to connect upon the first
738  			   send operation -- normally the variable is free'd once
739  			   the connection is established but there may have been
740  			   no chance for that.  */
741  			free(c->response.u.us.sock_name);
742  			c->response.u.us.sock_name = NULL;
743  	
744  			free(c->event.u.us.sock_name);
745  			c->event.u.us.sock_name = NULL;
746  	
747  			if (use_filesystem_sockets()) {
748  				struct sockaddr_un un_addr;
749  				socklen_t un_addr_len = sizeof(struct sockaddr_un);
750  				char *base_name;
751  				char sock_name[PATH_MAX];
752  				size_t length;
753  				if (getsockname(c->request.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
754  					length = strlen(un_addr.sun_path);
755  					base_name = strndup(un_addr.sun_path,
756  							    length - /* strlen("-request") */ 8);
757  					qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,length);
758  					snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"request");
759  					qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
760  					unlink(sock_name);
761  					snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event");
762  					qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
763  					unlink(sock_name);
764  					snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event-tx");
765  					qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
766  					unlink(sock_name);
767  					snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"response");
768  					qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
769  					unlink(sock_name);
770  					free(base_name);
771  				}
772  			}
773  			qb_ipcc_us_sock_close(c->setup.u.us.sock);
774  			qb_ipcc_us_sock_close(c->request.u.us.sock);
775  			qb_ipcc_us_sock_close(c->event.u.us.sock);
776  		}
777  		if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN ||
778  		    c->state == QB_IPCS_CONNECTION_ACTIVE) {
779  			munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
780  			unlink(c->request.u.us.shared_file_name);
781  	
782  	
783  		}
784  		remove_tempdir(c->description);
785  	}
786  	
787  	static int32_t
788  	qb_ipcs_us_connect(struct qb_ipcs_service *s,
789  			   struct qb_ipcs_connection *c,
790  			   struct qb_ipc_connection_response *r)
791  	{
792  		char path[PATH_MAX];
793  		int32_t fd_hdr;
794  		int32_t res = 0;
795  		struct ipc_us_control *ctl;
796  		char *shm_ptr;
797  	
798  		qb_util_log(LOG_DEBUG, "connecting to client (%s)", c->description);
799  	
800  		c->request.u.us.sock = c->setup.u.us.sock;
801  		c->response.u.us.sock = c->setup.u.us.sock;
802  	
803  		snprintf(r->request, NAME_MAX, "%s-control-%s",
804  			 c->description, s->name);
805  		snprintf(r->response, NAME_MAX, "%s-%s", c->description, s->name);
806  	
807  		fd_hdr = qb_sys_mmap_file_open(path, r->request,
808  					       SHM_CONTROL_SIZE,
809  					       O_CREAT | O_TRUNC | O_RDWR | O_EXCL);
810  		if (fd_hdr < 0) {
811  			res = fd_hdr;
812  			errno = -fd_hdr;
813  			qb_util_perror(LOG_ERR, "couldn't create file for mmap (%s)",
814  				       c->description);
815  			return res;
816  		}
817  		(void)strlcpy(r->request, path, PATH_MAX);
818  		(void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);
819  		/* These aren't failures, but are worth logging */
820  		res = chown(r->request, c->auth.uid, c->auth.gid);
821  		if (res != 0) {
822  			res = 0;
823  			qb_util_perror(LOG_ERR, "failed to chown shared memory file (%s)",
824  				       r->request);
825  			goto cleanup_hdr;
826  		}
827  		res = chmod(r->request, c->auth.mode);
828  		if (res != 0) {
829  			res = 0;
830  			qb_util_perror(LOG_ERR, "failed to chmod shared memory file (%s)",
831  				       r->request);
832  			goto cleanup_hdr;
833  		}
834  	
835  		shm_ptr = mmap(0, SHM_CONTROL_SIZE,
836  			       PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
837  	
838  		if (shm_ptr == MAP_FAILED) {
839  			res = -errno;
840  			qb_util_perror(LOG_ERR, "couldn't create mmap for header (%s)",
841  				       c->description);
842  			goto cleanup_hdr;
843  		}
844  		c->request.u.us.shared_data = shm_ptr;
845  		c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
846  		c->event.u.us.shared_data =  shm_ptr + (2 * sizeof(struct ipc_us_control));
847  	
848  		ctl = (struct ipc_us_control *)c->request.u.us.shared_data;
849  		ctl->sent = 0;
850  		ctl->flow_control = 0;
851  		ctl = (struct ipc_us_control *)c->response.u.us.shared_data;
852  		ctl->sent = 0;
853  		ctl->flow_control = 0;
854  		ctl = (struct ipc_us_control *)c->event.u.us.shared_data;
855  		ctl->sent = 0;
856  		ctl->flow_control = 0;
857  	
858  		close(fd_hdr);
859  		fd_hdr = -1;
860  	
861  		/* request channel */
862  		res = qb_ipc_dgram_sock_setup(r->response, "request",
863  					      &c->request.u.us.sock, c->egid);
864  		if (res < 0) {
865  			goto cleanup_hdr;
866  		}
867  	
868  		res = set_sock_size(c->request.u.us.sock, c->request.max_msg_size);
869  		if (res != 0) {
870  			goto cleanup_hdr;
871  		}
872  	
873  		c->setup.u.us.sock_name = NULL;
874  		c->request.u.us.sock_name = NULL;
875  	
876  		/* response channel */
877  		c->response.u.us.sock = c->request.u.us.sock;
878  		snprintf(path, PATH_MAX, "%s-%s", r->response, "response");
879  		c->response.u.us.sock_name = strdup(path);
880  		if (c->response.u.us.sock_name == NULL) {
881  			res = -ENOMEM;
882  			goto cleanup_hdr;
883  		}
884  	
885  		/* event channel */
886  		res = qb_ipc_dgram_sock_setup(r->response, "event-tx",
887  					      &c->event.u.us.sock, c->egid);
888  		if (res < 0) {
889  			goto cleanup_hdr;
890  		}
891  	
892  		res = set_sock_size(c->event.u.us.sock, c->event.max_msg_size);
893  		if (res != 0) {
894  			goto cleanup_hdr;
895  		}
896  	
897  		snprintf(path, PATH_MAX, "%s-%s", r->response, "event");
898  		c->event.u.us.sock_name = strdup(path);
899  		if (c->event.u.us.sock_name == NULL) {
900  			res = -ENOMEM;
901  			goto cleanup_hdr;
902  		}
903  	
904  		res = _sock_add_to_mainloop(c);
905  		if (res < 0) {
906  			goto cleanup_hdr;
907  		}
908  	
909  		return res;
910  	
911  	cleanup_hdr:
912  		free(c->response.u.us.sock_name);
913  		free(c->event.u.us.sock_name);
914  	
915  		if (fd_hdr >= 0) {
916  			close(fd_hdr);
917  		}
918  		unlink(r->request);
919  		munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
920  		return res;
921  	}
922  	
923  	void
924  	qb_ipcs_us_init(struct qb_ipcs_service *s)
925  	{
926  		s->funcs.connect = qb_ipcs_us_connect;
927  		s->funcs.disconnect = qb_ipcs_us_disconnect;
928  	
929  		s->funcs.recv = qb_ipc_us_recv_at_most;
930  		s->funcs.peek = NULL;
931  		s->funcs.reclaim = NULL;
932  		s->funcs.send = qb_ipc_socket_send;
933  		s->funcs.sendv = qb_ipc_socket_sendv;
934  	
935  		s->funcs.fc_set = qb_ipc_us_fc_set;
936  		s->funcs.q_len_get = qb_ipc_us_q_len_get;
937  	
938  		s->needs_sock_for_poll = QB_FALSE;
939  	
940  		qb_atomic_init();
941  	}
942