1    	/*
2    	 * Copyright (C) 2010,2013 Red Hat, Inc.
3    	 *
4    	 * Author: Angus Salkeld <asalkeld@redhat.com>
5    	 *
6    	 * This file is part of libqb.
7    	 *
8    	 * libqb is free software: you can redistribute it and/or modify
9    	 * it under the terms of the GNU Lesser General Public License as published by
10   	 * the Free Software Foundation, either version 2.1 of the License, or
11   	 * (at your option) any later version.
12   	 *
13   	 * libqb is distributed in the hope that it will be useful,
14   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   	 * GNU Lesser General Public License for more details.
17   	 *
18   	 * You should have received a copy of the GNU Lesser General Public License
19   	 * along with libqb.  If not, see <http://www.gnu.org/licenses/>.
20   	 */
21   	#include "os_base.h"
22   	#include <poll.h>
23   	
24   	#ifdef HAVE_SYS_UN_H
25   	#include <sys/un.h>
26   	#endif /* HAVE_SYS_UN_H */
27   	#ifdef HAVE_SYS_MMAN_H
28   	#include <sys/mman.h>
29   	#endif
30   	
31   	#include <qb/qbatomic.h>
32   	#include <qb/qbipcs.h>
33   	#include <qb/qbloop.h>
34   	#include <qb/qbdefs.h>
35   	
36   	#include "util_int.h"
37   	#include "ipc_int.h"
38   	
/*
 * Control record for one channel, stored in the shared control mapping.
 */
struct ipc_us_control {
	/* Counter updated atomically in this file: incremented after a
	 * successful send, decremented after a successful receive. */
	int32_t sent;
	/* Value stored by qb_ipc_us_fc_set() and read by qb_ipc_us_fc_get(). */
	int32_t flow_control;
};
/* Size of the shared mapping: three consecutive control records (this file
 * assigns them to the request, response and event channels in that order). */
#define SHM_CONTROL_SIZE (3 * sizeof(struct ipc_us_control))
44   	
/*
 * Report whether sockets should be bound to filesystem paths.
 *
 * The decision is made once and cached: on QB_LINUX / QB_CYGWIN builds it
 * is 1 only if FORCESOCKETSFILE can be stat()ed, on every other build it is
 * always 1.
 */
int use_filesystem_sockets(void)
{
	static int decided = 0;
	static int use_fs = 0;

	if (decided) {
		return use_fs;
	}

#if defined(QB_LINUX) || defined(QB_CYGWIN)
	{
		struct stat sb;

		if (stat(FORCESOCKETSFILE, &sb) == 0) {
			use_fs = 1;
		}
	}
#else
	use_fs = 1;
#endif
	decided = 1;

	return use_fs;
}
64   	
65   	static void
66   	set_sock_addr(struct sockaddr_un *address, const char *socket_name)
67   	{
68   		memset(address, 0, sizeof(struct sockaddr_un));
69   		address->sun_family = AF_UNIX;
70   	#ifdef HAVE_STRUCT_SOCKADDR_UN_SUN_LEN
71   		address->sun_len = QB_SUN_LEN(address);
72   	#endif
73   	
74   		if (socket_name[0] == '/' || !use_filesystem_sockets()) {
75   			snprintf(address->sun_path + 1, UNIX_PATH_MAX - 1, "%s", socket_name);
76   		} else {
77   			snprintf(address->sun_path, sizeof(address->sun_path), "%s/%s", SOCKETDIR,
78   				 socket_name);
79   		}
80   	}
81   	
82   	static int32_t
83   	qb_ipc_dgram_sock_setup(const char *base_name,
84   				const char *service_name, int32_t * sock_pt,
85   				gid_t gid)
86   	{
87   		int32_t request_fd;
88   		struct sockaddr_un local_address;
89   		int32_t res = 0;
90   		char sock_path[PATH_MAX];
91   	
92   		request_fd = socket(PF_UNIX, SOCK_DGRAM, 0);
93   		if (request_fd == -1) {
94   			return -errno;
95   		}
96   	
97   		qb_socket_nosigpipe(request_fd);
98   		res = qb_sys_fd_nonblock_cloexec_set(request_fd);
99   		if (res < 0) {
100  			goto error_connect;
101  		}
102  		snprintf(sock_path, PATH_MAX, "%s-%s", base_name, service_name);
103  		set_sock_addr(&local_address, sock_path);
104  		if (use_filesystem_sockets()) {
105  			(void)unlink(local_address.sun_path);
106  		}
107  		res = bind(request_fd, (struct sockaddr *)&local_address,
108  			   sizeof(local_address));
109  	
110  		if (use_filesystem_sockets()) {
111  			(void)chmod(local_address.sun_path, 0660);
112  			(void)chown(local_address.sun_path, -1, gid);
113  		}
114  		if (res < 0) {
115  			goto error_connect;
116  		}
117  	
118  		*sock_pt = request_fd;
119  		return 0;
120  	
121  	error_connect:
122  		close(request_fd);
123  		*sock_pt = -1;
124  	
125  		return res;
126  	}
127  	
128  	static int32_t
129  	set_sock_size(int sockfd, size_t max_msg_size)
130  	{
131  		int32_t rc;
132  		unsigned int optval;
133  		socklen_t optlen = sizeof(optval);
134  	
135  		rc = getsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, &optlen);
136  	
137  		qb_util_log(LOG_TRACE, "%d: getsockopt(%d, SO_SNDBUF, needed:%d) actual:%d",
138  			rc, sockfd, max_msg_size, optval);
139  	
140  		/* The optval <= max_msg_size check is weird...
141  		 * during testing it was discovered in some instances if the
142  		 * default optval is exactly equal to our max_msg_size, we couldn't
143  		 * actually send a message that large unless we explicitly set
144  		 * it using setsockopt... there is no good explaination for this. Most
145  		 * likely this is hitting some sort of "off by one" error in the kernel. */
146  		if (rc == 0 && optval <= max_msg_size) {
147  			optval = max_msg_size;
148  			optlen = sizeof(optval);
149  			rc = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &optval, optlen);
150  		}
151  	
152  		if (rc != 0) {
153  			return -errno;
154  		}
155  	
156  		rc = getsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, &optlen);
157  	
158  		qb_util_log(LOG_TRACE, "%d: getsockopt(%d, SO_RCVBUF, needed:%d) actual:%d",
159  			rc, sockfd, max_msg_size, optval);
160  	
161  		/* Set the sockets receive buffer size to match the send buffer.  On
162  		 * FreeBSD without this calls to sendto() will result in an ENOBUFS error
163  		 * if the message is larger than net.local.dgram.recvspace sysctl. */
164  		if (rc == 0 && optval <= max_msg_size) {
165  			optval = max_msg_size;
166  			optlen = sizeof(optval);
167  			rc = setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &optval, optlen);
168  		}
169  	
170  		if (rc != 0) {
171  			return -errno;
172  		}
173  	
174  		return rc;
175  	}
176  	
177  	static int32_t
178  	dgram_verify_msg_size(size_t max_msg_size)
179  	{
180  		int32_t rc = -1;
181  		int32_t sockets[2];
182  		int32_t tries = 0;
183  		int32_t write_passed = 0;
184  		int32_t read_passed = 0;
185  		char buf[max_msg_size];
186  		memset (buf, 0, max_msg_size);
187  	
188  		if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sockets) < 0) {
189  			qb_util_perror(LOG_DEBUG, "error calling socketpair()");
190  			goto cleanup_socks;
191  		}
192  	
193  		if (set_sock_size(sockets[0], max_msg_size) != 0) {
194  			qb_util_log(LOG_DEBUG, "error set_sock_size(sockets[0],%#x)",
195  				    max_msg_size);
196  			goto cleanup_socks;
197  		}
198  		if (set_sock_size(sockets[1], max_msg_size) != 0) {
199  			qb_util_log(LOG_DEBUG, "error set_sock_size(sockets[1],%#x)",
200  				    max_msg_size);
201  			goto cleanup_socks;
202  		}
203  	
204  		for (tries = 0; tries < 3; tries++) {
205  	
206  			if (write_passed == 0) {
207  				rc = write(sockets[1], buf, max_msg_size);
208  	
209  				if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
210  					continue;
211  				} else if (rc == max_msg_size) {
212  					write_passed = 1;
213  				} else {
214  					break;
215  				}
216  			}
217  	
218  			if (read_passed == 0) {
219  				rc = read(sockets[0], buf, max_msg_size);
220  	
221  				if (rc < 0 && (errno == EAGAIN || errno == EINTR)) {
222  					continue;
223  				} else if (rc == max_msg_size) {
224  					read_passed = 1;
225  				} else {
226  					break;
227  				}
228  			}
229  	
230  			if (read_passed && write_passed) {
231  				rc = 0;
232  				break;
233  			}
234  		}
235  	
236  	
237  	cleanup_socks:
238  		close(sockets[0]);
239  		close(sockets[1]);
240  		return rc;
241  	}
242  	
/*
 * Find a datagram size, not larger than max_msg_size, that
 * dgram_verify_msg_size() accepts.
 *
 * If max_msg_size itself verifies it is returned directly.  Otherwise sizes
 * are probed upwards from 2048; after a failed probe the step is halved and
 * the probe retried from the last position, until the step drops below 512.
 *
 * Returns the largest size that verified, or -1 when no probed size did
 * (which is also the result when max_msg_size is not above the first probe).
 */
int32_t
qb_ipcc_verify_dgram_max_msg_size(size_t max_msg_size)
{
	int32_t step = 2048;
	int32_t best = -1;
	int32_t candidate;

	if (dgram_verify_msg_size(max_msg_size) == 0) {
		return max_msg_size;
	}

	candidate = step;
	while (candidate < max_msg_size) {
		if (dgram_verify_msg_size(candidate) == 0) {
			best = candidate;
		} else {
			if (step < 512) {
				break;
			}
			/* back up to the previous position, then retry with
			 * half the step */
			candidate -= step;
			step = step / 2;
		}
		candidate += step;
	}

	return best;
}
267  	
268  	/*
269  	 * bind to "base_name-local_name"
270  	 * connect to "base_name-remote_name"
271  	 * output sock_pt
272  	 */
273  	static int32_t
274  	qb_ipc_dgram_sock_connect(const char *base_name,
275  				  const char *local_name,
276  				  const char *remote_name,
277  				  int32_t max_msg_size, int32_t * sock_pt, gid_t gid)
278  	{
279  		char sock_path[PATH_MAX];
280  		struct sockaddr_un remote_address;
281  		int32_t res = qb_ipc_dgram_sock_setup(base_name, local_name,
282  						      sock_pt, gid);
283  		if (res < 0) {
284  			return res;
285  		}
286  	
287  		snprintf(sock_path, PATH_MAX, "%s-%s", base_name, remote_name);
288  		set_sock_addr(&remote_address, sock_path);
289  		if (connect(*sock_pt, (struct sockaddr *)&remote_address,
290  			    QB_SUN_LEN(&remote_address)) == -1) {
291  			res = -errno;
292  			goto error_connect;
293  		}
294  	
295  		return set_sock_size(*sock_pt, max_msg_size);
296  	
297  	error_connect:
298  		close(*sock_pt);
299  		*sock_pt = -1;
300  	
301  		return res;
302  	}
303  	
304  	static int32_t
305  	_finish_connecting(struct qb_ipc_one_way *one_way)
306  	{
307  		struct sockaddr_un remote_address;
308  		int res;
309  		int error;
310  		int retry = 0;
311  	
312  		set_sock_addr(&remote_address, one_way->u.us.sock_name);
313  	
314  		/* this retry loop is here to help connecting when trying to send
315  		 * an event right after connection setup.
316  		 */
317  		do {
318  			errno = 0;
319  			res = connect(one_way->u.us.sock,
320  				      (struct sockaddr *)&remote_address,
321  				      QB_SUN_LEN(&remote_address));
322  			if (res == -1) {
323  				error = -errno;
324  				qb_util_perror(LOG_DEBUG, "error calling connect()");
325  				retry++;
326  				usleep(100000);
327  			}
328  		} while (res == -1 && retry < 10);
329  		if (res == -1) {
330  			return error;
331  		}
332  	
333  		/* Beside disposing no longer needed value, this also signals that
334  		   we are done with connect-on-send arrangement at the server side
335  		   (i.e. for response and event channels). */
336  		free(one_way->u.us.sock_name);
337  		one_way->u.us.sock_name = NULL;
338  	
339  		return set_sock_size(one_way->u.us.sock, one_way->max_msg_size);
340  	}
341  	
342  	/*
343  	 * client functions
344  	 * --------------------------------------------------------
345  	 */
346  	static void
347  	qb_ipcc_us_disconnect(struct qb_ipcc_connection *c)
348  	{
349  		munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
350  		unlink(c->request.u.us.shared_file_name);
351  	
352  		if (use_filesystem_sockets()) {
353  			struct sockaddr_un un_addr;
354  			socklen_t un_addr_len = sizeof(struct sockaddr_un);
355  			char *base_name;
356  			char sock_name[PATH_MAX];
357  			size_t length;
358  			if (getsockname(c->response.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
359  				length = strlen(un_addr.sun_path);
360  				base_name = strndup(un_addr.sun_path,
361  						    length - /* strlen("-response") */ 9);
362  				qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,length);
363  				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"request");
364  				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
365  				unlink(sock_name);
366  				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event");
367  				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
368  				unlink(sock_name);
369  				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event-tx");
370  				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
371  				unlink(sock_name);
372  				snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"response");
373  				qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
374  				unlink(sock_name);
375  				free(base_name);
376  			}
377  		}
378  	
379  		/* Last-ditch attempt to tidy up after ourself */
380  		remove_tempdir(c->request.u.us.shared_file_name);
381  	
382  		qb_ipcc_us_sock_close(c->event.u.us.sock);
383  		qb_ipcc_us_sock_close(c->request.u.us.sock);
384  		qb_ipcc_us_sock_close(c->setup.u.us.sock);
385  	}
386  	
387  	static ssize_t
388  	qb_ipc_socket_send(struct qb_ipc_one_way *one_way,
389  			   const void *msg_ptr, size_t msg_len)
390  	{
391  		ssize_t rc = 0;
392  		struct ipc_us_control *ctl;
393  		ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
394  	
395  		if (one_way->u.us.sock_name) {
396  			rc = _finish_connecting(one_way);
397  			if (rc < 0) {
398  				qb_util_log(LOG_ERR, "socket connect-on-send");
399  				return rc;
400  			}
401  		}
402  	
403  		qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
404  		rc = send(one_way->u.us.sock, msg_ptr, msg_len, MSG_NOSIGNAL);
405  		if (rc == -1) {
406  			rc = -errno;
407  			if (errno != EAGAIN && errno != ENOBUFS) {
408  				qb_util_perror(LOG_DEBUG, "socket_send:send");
409  			}
410  		}
411  		qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
412  	
413  		if (ctl && rc == msg_len) {
414  			qb_atomic_int_inc(&ctl->sent);
415  		}
416  	
417  		return rc;
418  	}
419  	
420  	static ssize_t
421  	qb_ipc_socket_sendv(struct qb_ipc_one_way *one_way, const struct iovec *iov,
422  			    size_t iov_len)
423  	{
424  		int32_t rc;
425  		struct ipc_us_control *ctl;
426  		ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
427  	
428  		qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
429  	
430  		if (one_way->u.us.sock_name) {
431  			rc = _finish_connecting(one_way);
432  			if (rc < 0) {
433  				qb_util_perror(LOG_ERR, "socket connect-on-sendv");
434  				qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
435  				return rc;
436  			}
437  		}
438  	
439  		rc = writev(one_way->u.us.sock, iov, iov_len);
440  	
441  		if (rc == -1) {
442  			rc = -errno;
443  			if (errno != EAGAIN && errno != ENOBUFS) {
444  				qb_util_perror(LOG_DEBUG, "socket_sendv:writev %d",
445  					       one_way->u.us.sock);
446  			}
447  		}
448  	
449  		qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
450  	
451  		if (ctl && rc > 0) {
452  			qb_atomic_int_inc(&ctl->sent);
453  		}
454  		return rc;
455  	}
456  	
457  	/*
458  	 * recv a message of unknown size.
459  	 */
460  	static ssize_t
461  	qb_ipc_us_recv_at_most(struct qb_ipc_one_way *one_way,
462  			       void *msg, size_t len, int32_t timeout)
463  	{
464  		int32_t result;
465  		int32_t final_rc = 0;
466  		int32_t to_recv = 0;
467  		char *data = msg;
468  		struct ipc_us_control *ctl = NULL;
469  		int32_t time_waited = 0;
470  		int32_t time_to_wait = timeout;
471  	
472  		if (timeout == -1) {
473  			time_to_wait = 1000;
474  		}
475  	
476  		qb_sigpipe_ctl(QB_SIGPIPE_IGNORE);
477  	
478  	retry_peek:
479  		result = recv(one_way->u.us.sock, data,
480  			      sizeof(struct qb_ipc_request_header),
481  			      MSG_NOSIGNAL | MSG_PEEK);
482  	
483  		if (result == -1) {
484  	
485  			if (errno != EAGAIN) {
486  				final_rc = -errno;
487  				if (use_filesystem_sockets()) {
488  					if (errno == ECONNRESET || errno == EPIPE) {
489  						final_rc = -ENOTCONN;
490  					}
491  				}
492  				goto cleanup_sigpipe;
493  			}
494  	
495  			/* check to see if we have enough time left to try again */
496  			if (time_waited < timeout || timeout == -1) {
497  				result = qb_ipc_us_ready(one_way, NULL, time_to_wait, POLLIN);
498  				if (qb_ipc_us_sock_error_is_disconnected(result)) {
499  					final_rc = result;
500  					goto cleanup_sigpipe;
501  				}
502  				time_waited += time_to_wait;
503  				goto retry_peek;
504  			} else if (time_waited >= timeout) {
505  				final_rc = -ETIMEDOUT;
506  				goto cleanup_sigpipe;
507  			}
508  		}
509  		if (result >= sizeof(struct qb_ipc_request_header)) {
510  			struct qb_ipc_request_header *hdr = NULL;
511  			hdr = (struct qb_ipc_request_header *)msg;
512  			to_recv = hdr->size;
513  		}
514  	
515  		result = recv(one_way->u.us.sock, data, to_recv,
516  			      MSG_NOSIGNAL | MSG_WAITALL);
517  		if (result == -1) {
518  			final_rc = -errno;
519  			goto cleanup_sigpipe;
520  		} else if (result == 0) {
521  			qb_util_log(LOG_DEBUG, "recv == 0 -> ENOTCONN");
522  	
523  			final_rc = -ENOTCONN;
524  			goto cleanup_sigpipe;
525  		}
526  	
527  		final_rc = result;
528  	
529  		ctl = (struct ipc_us_control *)one_way->u.us.shared_data;
530  		if (ctl) {
531  			(void)qb_atomic_int_dec_and_test(&ctl->sent);
532  		}
533  	
534  	cleanup_sigpipe:
535  		qb_sigpipe_ctl(QB_SIGPIPE_DEFAULT);
536  		return final_rc;
537  	}
538  	
539  	static void
540  	qb_ipc_us_fc_set(struct qb_ipc_one_way *one_way, int32_t fc_enable)
541  	{
542  		struct ipc_us_control *ctl =
543  		    (struct ipc_us_control *)one_way->u.us.shared_data;
544  	
545  		qb_util_log(LOG_TRACE, "setting fc to %d", fc_enable);
546  		qb_atomic_int_set(&ctl->flow_control, fc_enable);
547  	}
548  	
549  	static int32_t
550  	qb_ipc_us_fc_get(struct qb_ipc_one_way *one_way)
551  	{
552  		struct ipc_us_control *ctl =
553  		    (struct ipc_us_control *)one_way->u.us.shared_data;
554  	
555  		return qb_atomic_int_get(&ctl->flow_control);
556  	}
557  	
558  	static ssize_t
559  	qb_ipc_us_q_len_get(struct qb_ipc_one_way *one_way)
560  	{
561  		struct ipc_us_control *ctl =
562  		    (struct ipc_us_control *)one_way->u.us.shared_data;
563  		return qb_atomic_int_get(&ctl->sent);
564  	}
565  	
566  	int32_t
567  	qb_ipcc_us_connect(struct qb_ipcc_connection * c,
568  			   struct qb_ipc_connection_response * r)
569  	{
570  		int32_t res;
571  		char path[PATH_MAX];
572  		int32_t fd_hdr;
573  		char *shm_ptr;
574  	
575  		qb_atomic_init();
576  	
577  		c->needs_sock_for_poll = QB_FALSE;
578  		c->funcs.send = qb_ipc_socket_send;
579  		c->funcs.sendv = qb_ipc_socket_sendv;
580  		c->funcs.recv = qb_ipc_us_recv_at_most;
581  		c->funcs.fc_get = qb_ipc_us_fc_get;
582  		c->funcs.disconnect = qb_ipcc_us_disconnect;
583  	
584  		fd_hdr = qb_sys_mmap_file_open(path, r->request,
585  					       SHM_CONTROL_SIZE, O_RDWR);
586  		if (fd_hdr < 0) {
587  			res = fd_hdr;
588  			errno = -fd_hdr;
589  			qb_util_perror(LOG_ERR, "couldn't open file for mmap");
590  			return res;
591  		}
592  		(void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);
593  		shm_ptr = mmap(0, SHM_CONTROL_SIZE,
594  			       PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
595  	
596  		if (shm_ptr == MAP_FAILED) {
597  			res = -errno;
598  			qb_util_perror(LOG_ERR, "couldn't create mmap for header");
599  			goto cleanup_hdr;
600  		}
601  		c->request.u.us.shared_data = shm_ptr;
602  		c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
603  		c->event.u.us.shared_data =  shm_ptr + (2 * sizeof(struct ipc_us_control));
604  	
605  		close(fd_hdr);
606  		fd_hdr = -1;
607  	
608  		res = qb_ipc_dgram_sock_connect(r->response, "response", "request",
609  						r->max_msg_size, &c->request.u.us.sock, c->egid);
610  		if (res != 0) {
611  			goto cleanup_hdr;
612  		}
613  		c->response.u.us.sock = c->request.u.us.sock;
614  	
615  		res = qb_ipc_dgram_sock_connect(r->response, "event", "event-tx",
616  						r->max_msg_size, &c->event.u.us.sock, c->egid);
617  		if (res != 0) {
618  			goto cleanup_hdr;
619  		}
620  	
621  		return 0;
622  	
623  	cleanup_hdr:
624  		if (fd_hdr >= 0) {
625  			close(fd_hdr);
626  		}
627  		close(c->event.u.us.sock);
628  		close(c->request.u.us.sock);
629  		unlink(r->request);
630  		munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
631  		return res;
632  	}
633  	
634  	/*
635  	 * service functions
636  	 * --------------------------------------------------------
637  	 */
638  	static int32_t
639  	_sock_connection_liveliness(int32_t fd, int32_t revents, void *data)
640  	{
641  		struct qb_ipcs_connection *c = (struct qb_ipcs_connection *)data;
642  	
643  		qb_util_log(LOG_DEBUG, "LIVENESS: fd %d event %d conn (%s)",
644  			    fd, revents, c->description);
645  		if (revents & POLLNVAL) {
646  			qb_util_log(LOG_DEBUG, "NVAL conn (%s)", c->description);
647  			qb_ipcs_disconnect(c);
648  			return -EINVAL;
649  		}
650  		if (revents & POLLHUP) {
651  			qb_util_log(LOG_DEBUG, "HUP conn (%s)", c->description);
652  			qb_ipcs_disconnect(c);
653  			return -ESHUTDOWN;
654  		}
655  	
656  		/* If we actually get POLLIN for some reason here, it most
657  		 * certainly means EOF. Do a recv on the fd to detect eof and
658  		 * then disconnect */
659  		if (revents & POLLIN) {
660  			char buf[10];
661  			int res;
662  	
663  			res = recv(fd, buf, sizeof(buf), MSG_DONTWAIT);
664  			if (res < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
665  				res = -errno;
666  			} else if (res == 0) {
667  				qb_util_log(LOG_DEBUG, "EOF conn (%s)", c->description);
668  				res = -ESHUTDOWN;
669  			}
670  	
671  			if (res < 0) {
672  				qb_ipcs_disconnect(c);
673  				return res;
674  			}
675  		}
676  	
677  		return 0;
678  	}
679  	
680  	static int32_t
681  	_sock_add_to_mainloop(struct qb_ipcs_connection *c)
682  	{
683  		int res;
684  	
685  		res = c->service->poll_fns.dispatch_add(c->service->poll_priority,
686  							c->request.u.us.sock,
687  							POLLIN | POLLPRI | POLLNVAL,
688  							c,
689  							qb_ipcs_dispatch_connection_request);
690  	
691  		if (res < 0) {
692  			qb_util_log(LOG_ERR,
693  				    "Error adding socket to mainloop (%s).",
694  				    c->description);
695  			return res;
696  		}
697  	
698  		res = c->service->poll_fns.dispatch_add(c->service->poll_priority,
699  							c->setup.u.us.sock,
700  							POLLIN | POLLPRI | POLLNVAL,
701  							c, _sock_connection_liveliness);
702  		qb_util_log(LOG_DEBUG, "added %d to poll loop (liveness)",
703  			    c->setup.u.us.sock);
704  		if (res < 0) {
705  			qb_util_perror(LOG_ERR, "Error adding setupfd to mainloop");
706  			(void)c->service->poll_fns.dispatch_del(c->request.u.us.sock);
707  			return res;
708  		}
709  		return res;
710  	}
711  	
712  	static void
713  	_sock_rm_from_mainloop(struct qb_ipcs_connection *c)
714  	{
715  		(void)c->service->poll_fns.dispatch_del(c->request.u.us.sock);
716  		(void)c->service->poll_fns.dispatch_del(c->setup.u.us.sock);
717  	}
718  	
719  	static void
720  	qb_ipcs_us_disconnect(struct qb_ipcs_connection *c)
721  	{
722  		qb_enter();
723  	
724  		if (c->state == QB_IPCS_CONNECTION_ESTABLISHED ||
725  		    c->state == QB_IPCS_CONNECTION_ACTIVE) {
726  			_sock_rm_from_mainloop(c);
727  	
728  			/* Free the temporaries denoting which respective socket
729  			   name on the client's side to connect upon the first
730  			   send operation -- normally the variable is free'd once
731  			   the connection is established but there may have been
732  			   no chance for that.  */
733  			free(c->response.u.us.sock_name);
734  			c->response.u.us.sock_name = NULL;
735  	
736  			free(c->event.u.us.sock_name);
737  			c->event.u.us.sock_name = NULL;
738  	
739  			if (use_filesystem_sockets()) {
740  				struct sockaddr_un un_addr;
741  				socklen_t un_addr_len = sizeof(struct sockaddr_un);
742  				char *base_name;
743  				char sock_name[PATH_MAX];
744  				size_t length;
745  				if (getsockname(c->request.u.us.sock, (struct sockaddr *)&un_addr, &un_addr_len) == 0) {
746  					length = strlen(un_addr.sun_path);
747  					base_name = strndup(un_addr.sun_path,
748  							    length - /* strlen("-request") */ 8);
749  					qb_util_log(LOG_DEBUG, "unlinking socket bound files with base_name=%s length=%d",base_name,length);
750  					snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"request");
751  					qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
752  					unlink(sock_name);
753  					snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event");
754  					qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
755  					unlink(sock_name);
756  					snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"event-tx");
757  					qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
758  					unlink(sock_name);
759  					snprintf(sock_name,PATH_MAX,"%s-%s",base_name,"response");
760  					qb_util_log(LOG_DEBUG, "unlink sock_name=%s",sock_name);
761  					unlink(sock_name);
762  					free(base_name);
763  				}
764  			}
765  			qb_ipcc_us_sock_close(c->setup.u.us.sock);
766  			qb_ipcc_us_sock_close(c->request.u.us.sock);
767  			qb_ipcc_us_sock_close(c->event.u.us.sock);
768  		}
769  		if (c->state == QB_IPCS_CONNECTION_SHUTTING_DOWN ||
770  		    c->state == QB_IPCS_CONNECTION_ACTIVE) {
771  			munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
772  			unlink(c->request.u.us.shared_file_name);
773  	
774  	
775  		}
776  		remove_tempdir(c->description);
777  	}
778  	
779  	static int32_t
780  	qb_ipcs_us_connect(struct qb_ipcs_service *s,
781  			   struct qb_ipcs_connection *c,
782  			   struct qb_ipc_connection_response *r)
783  	{
784  		char path[PATH_MAX];
785  		int32_t fd_hdr;
786  		int32_t res = 0;
787  		struct ipc_us_control *ctl;
788  		char *shm_ptr;
789  	
790  		qb_util_log(LOG_DEBUG, "connecting to client (%s)", c->description);
791  	
792  		c->request.u.us.sock = c->setup.u.us.sock;
793  		c->response.u.us.sock = c->setup.u.us.sock;
794  	
795  		snprintf(r->request, NAME_MAX, "%s-control-%s",
796  			 c->description, s->name);
797  		snprintf(r->response, NAME_MAX, "%s-%s", c->description, s->name);
798  	
799  		fd_hdr = qb_sys_mmap_file_open(path, r->request,
800  					       SHM_CONTROL_SIZE,
801  					       O_CREAT | O_TRUNC | O_RDWR | O_EXCL);
802  		if (fd_hdr < 0) {
803  			res = fd_hdr;
804  			errno = -fd_hdr;
805  			qb_util_perror(LOG_ERR, "couldn't create file for mmap (%s)",
806  				       c->description);
807  			return res;
808  		}
809  		(void)strlcpy(r->request, path, PATH_MAX);
810  		(void)strlcpy(c->request.u.us.shared_file_name, r->request, NAME_MAX);
811  		res = chown(r->request, c->auth.uid, c->auth.gid);
812  		if (res != 0) {
813  			/* ignore res, this is just for the compiler warnings.
814  			 */
815  			res = 0;
816  		}
817  		res = chmod(r->request, c->auth.mode);
818  		if (res != 0) {
819  			/* ignore res, this is just for the compiler warnings.
820  			 */
821  			res = 0;
822  		}
823  	
824  		shm_ptr = mmap(0, SHM_CONTROL_SIZE,
825  			       PROT_READ | PROT_WRITE, MAP_SHARED, fd_hdr, 0);
826  	
827  		if (shm_ptr == MAP_FAILED) {
828  			res = -errno;
829  			qb_util_perror(LOG_ERR, "couldn't create mmap for header (%s)",
830  				       c->description);
831  			goto cleanup_hdr;
832  		}
833  		c->request.u.us.shared_data = shm_ptr;
834  		c->response.u.us.shared_data = shm_ptr + sizeof(struct ipc_us_control);
835  		c->event.u.us.shared_data =  shm_ptr + (2 * sizeof(struct ipc_us_control));
836  	
837  		ctl = (struct ipc_us_control *)c->request.u.us.shared_data;
838  		ctl->sent = 0;
839  		ctl->flow_control = 0;
840  		ctl = (struct ipc_us_control *)c->response.u.us.shared_data;
841  		ctl->sent = 0;
842  		ctl->flow_control = 0;
843  		ctl = (struct ipc_us_control *)c->event.u.us.shared_data;
844  		ctl->sent = 0;
845  		ctl->flow_control = 0;
846  	
847  		close(fd_hdr);
848  		fd_hdr = -1;
849  	
850  		/* request channel */
851  		res = qb_ipc_dgram_sock_setup(r->response, "request",
852  					      &c->request.u.us.sock, c->egid);
853  		if (res < 0) {
854  			goto cleanup_hdr;
855  		}
856  	
857  		res = set_sock_size(c->request.u.us.sock, c->request.max_msg_size);
858  		if (res != 0) {
859  			goto cleanup_hdr;
860  		}
861  	
862  		c->setup.u.us.sock_name = NULL;
863  		c->request.u.us.sock_name = NULL;
864  	
865  		/* response channel */
866  		c->response.u.us.sock = c->request.u.us.sock;
867  		snprintf(path, PATH_MAX, "%s-%s", r->response, "response");
868  		c->response.u.us.sock_name = strdup(path);
869  	
870  		/* event channel */
871  		res = qb_ipc_dgram_sock_setup(r->response, "event-tx",
872  					      &c->event.u.us.sock, c->egid);
873  		if (res < 0) {
874  			goto cleanup_hdr;
875  		}
876  	
877  		res = set_sock_size(c->event.u.us.sock, c->event.max_msg_size);
878  		if (res != 0) {
879  			goto cleanup_hdr;
880  		}
881  	
882  		snprintf(path, PATH_MAX, "%s-%s", r->response, "event");
883  		c->event.u.us.sock_name = strdup(path);
884  	
885  		res = _sock_add_to_mainloop(c);
886  		if (res < 0) {
887  			goto cleanup_hdr;
888  		}
889  	
890  		return res;
891  	
892  	cleanup_hdr:
893  		free(c->response.u.us.sock_name);
894  		free(c->event.u.us.sock_name);
895  	
896  		if (fd_hdr >= 0) {
897  			close(fd_hdr);
898  		}
899  		unlink(r->request);
900  		munmap(c->request.u.us.shared_data, SHM_CONTROL_SIZE);
901  		return res;
902  	}
903  	
904  	void
905  	qb_ipcs_us_init(struct qb_ipcs_service *s)
906  	{
907  		s->funcs.connect = qb_ipcs_us_connect;
908  		s->funcs.disconnect = qb_ipcs_us_disconnect;
909  	
910  		s->funcs.recv = qb_ipc_us_recv_at_most;
911  		s->funcs.peek = NULL;
912  		s->funcs.reclaim = NULL;
913  		s->funcs.send = qb_ipc_socket_send;
914  		s->funcs.sendv = qb_ipc_socket_sendv;
915  	
916  		s->funcs.fc_set = qb_ipc_us_fc_set;
917  		s->funcs.q_len_get = qb_ipc_us_q_len_get;
918  	
919  		s->needs_sock_for_poll = QB_FALSE;
920  	
921  		qb_atomic_init();
922  	}
923