1    	/*
2    	 * Copyright (C) 2011 Red Hat, Inc.
3    	 *
4    	 * All rights reserved.
5    	 *
6    	 * Author: Angus Salkeld <asalkeld@redhat.com>
7    	 *
8    	 * libqb is free software: you can redistribute it and/or modify
9    	 * it under the terms of the GNU Lesser General Public License as published by
10   	 * the Free Software Foundation, either version 2.1 of the License, or
11   	 * (at your option) any later version.
12   	 *
13   	 * libqb is distributed in the hope that it will be useful,
14   	 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15   	 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16   	 * GNU Lesser General Public License for more details.
17   	 *
18   	 * You should have received a copy of the GNU Lesser General Public License
19   	 * along with libqb.  If not, see <http://www.gnu.org/licenses/>.
20   	 */
21   	#include "os_base.h"
22   	
23   	#include <pthread.h>
24   	#include <semaphore.h>
25   	
26   	#include <qb/qbdefs.h>
27   	#include <qb/qblist.h>
28   	#include <qb/qbutil.h>
29   	#include "log_int.h"
30   	
/* QB_TRUE once qb_log_thread_start() has begun bringing up the worker thread. */
static int wthread_active = QB_FALSE;

/*
 * Set to QB_TRUE to ask the worker thread to terminate; the worker checks it
 * (while holding logt_wthread_lock) each time it wakes up.
 */
static int wthread_should_exit = QB_FALSE;

/*
 * Lock held around accesses to the record list and the counters below in the
 * worker, qb_log_thread_log_post() and qb_log_thread_stop().  It is also taken
 * by qb_log_thread_pause() and released by qb_log_thread_resume().
 */
static qb_thread_lock_t *logt_wthread_lock = NULL;

/* Queue of log records posted by qb_log_thread_log_post(), consumed in order. */
static QB_LIST_DECLARE(logt_print_finished_records);

/*
 * Bytes accounted to the records currently queued: for each record,
 * sizeof(struct qb_log_record) plus the message length including its NUL.
 */
static int logt_memory_used = 0;

/* Messages discarded because of the memory limit since the last report. */
static int logt_dropped_messages = 0;

/* Posted by the worker thread on entry; qb_log_thread_start() waits on it. */
static sem_t logt_thread_start;

/*
 * Posted once for every queued record, and once more to wake the worker up
 * when it is being asked to exit.
 */
static sem_t logt_print_finished;

/*
 * QB_TRUE when qb_log_thread_priority_set() was called while the worker was
 * not active; qb_log_thread_start() then applies the saved settings.
 */
static int logt_sched_param_queued = QB_FALSE;

/* Scheduling policy most recently passed to qb_log_thread_priority_set(). */
static int logt_sched_policy;

#if defined(HAVE_PTHREAD_SETSCHEDPARAM)
/* Scheduling parameters derived by qb_log_thread_priority_set(). */
static struct sched_param logt_sched_param;
#endif /* HAVE_PTHREAD_SETSCHEDPARAM */

/* Identifier of the worker thread, filled in by pthread_create(). */
static pthread_t logt_thread_id = 0;
56   	
57   	static void *qb_logt_worker_thread(void *data) __attribute__ ((noreturn));
58   	static void *
59   	qb_logt_worker_thread(void *data)
60   	{
61   		struct qb_log_record *rec;
62   		int dropped = 0;
63   		int res;
64   	
65   		/* Signal qb_log_thread_start that the initialization may continue */
66   		sem_post(&logt_thread_start);
67   		for (;;) {
68   	retry_sem_wait:
69   			res = sem_wait(&logt_print_finished);
70   			if (res == -1 && errno == EINTR) {
71   				goto retry_sem_wait;
72   			} else if (res == -1) {
73   				/* This case shouldn't happen */
74   				pthread_exit(NULL);
75   			}
76   	
77   			(void)qb_thread_lock(logt_wthread_lock);
78   			if (wthread_should_exit) {
79   				int value = -1;
80   	
81   				(void)sem_getvalue(&logt_print_finished, &value);
82   				if (value == 0) {
83   					(void)qb_thread_unlock(logt_wthread_lock);
84   					pthread_exit(NULL);
85   				}
86   			}
87   	
88   			rec =
89   			    qb_list_first_entry(&logt_print_finished_records,
90   					  struct qb_log_record, list);
91   			qb_list_del(&rec->list);
92   			logt_memory_used = logt_memory_used - strlen(rec->buffer) -
93   			    sizeof(struct qb_log_record) - 1;
94   			dropped = logt_dropped_messages;
95   			logt_dropped_messages = 0;
96   			if (dropped) {
97   				printf("%d messages lost\n", dropped);
98   			}
99   	
100  			qb_log_thread_log_write(rec->cs, &rec->timestamp, rec->buffer);
101  	
102  			(void)qb_thread_unlock(logt_wthread_lock);
103  			free(rec->buffer);
104  			free(rec);
105  		}
106  	}
107  	
/*
 * Record, and if possible apply, the scheduling policy and priority of the
 * logging worker thread.
 *
 * For SCHED_OTHER (and SCHED_IDLE / SCHED_BATCH where those are compiled in)
 * the stored priority is forced to 0; for every other policy the supplied
 * priority is stored as is.
 *
 * When the worker is not active the request is only marked as queued.
 * Otherwise pthread_setschedparam() is called on the worker.
 *
 * Returns 0 on success (or when nothing was applied), otherwise the negated
 * error number reported by pthread_setschedparam().  Without
 * HAVE_PTHREAD_SETSCHEDPARAM the function does nothing and returns 0.
 */
int32_t
qb_log_thread_priority_set(int32_t policy, int32_t priority)
{
	int res = 0;

#if defined(HAVE_PTHREAD_SETSCHEDPARAM)
	int effective_priority = priority;

	logt_sched_policy = policy;

	/* These policies get a priority of 0 regardless of what was asked for. */
	if (policy == SCHED_OTHER) {
		effective_priority = 0;
	}
#ifdef SCHED_IDLE
	if (policy == SCHED_IDLE) {
		effective_priority = 0;
	}
#endif
#if defined(SCHED_BATCH) && !defined(QB_DARWIN)
	if (policy == SCHED_BATCH) {
		effective_priority = 0;
	}
#endif
	logt_sched_param.sched_priority = effective_priority;

	if (wthread_active != QB_FALSE) {
		/* pthread_setschedparam() returns a positive errno; hand back -errno. */
		res = -pthread_setschedparam(logt_thread_id, policy,
					     &logt_sched_param);
	} else {
		/* No worker yet: remember the request for qb_log_thread_start(). */
		logt_sched_param_queued = QB_TRUE;
	}
#endif
	return res;
}
141  	
142  	int32_t
143  	qb_log_thread_start(void)
144  	{
145  		int res;
146  		qb_thread_lock_t *wthread_lock;
147  	
(1) Event thread1_checks_field: The thread 1 uses the value read from static field "wthread_active" in the condition "wthread_active". It sees that the condition is false. Control is switched to the thread 2.
(2) Event thread2_checks_field: The thread 2 uses the value read from static field "wthread_active" in the condition "wthread_active". It sees that the condition is false.
(7) Event use_same_locks_for_read_and_modify: Guard the modification of "wthread_active" and the read used to decide whether to modify "wthread_active" with the same set of locks.
Also see events: [thread2_acquires_lock][thread2_modifies_field][thread1_acquires_lock][thread1_overwrites_value_in_field]
148  		if (wthread_active) {
149  			return 0;
150  		}
151  	
152  		wthread_active = QB_TRUE;
153  		sem_init(&logt_thread_start, 0, 0);
154  		sem_init(&logt_print_finished, 0, 0);
155  		errno = 0;
156  		logt_wthread_lock = qb_thread_lock_create(QB_THREAD_LOCK_SHORT);
157  		if (logt_wthread_lock == NULL) {
158  			return errno ? -errno : -1;
159  		}
160  		res = pthread_create(&logt_thread_id, NULL,
161  				     qb_logt_worker_thread, NULL);
162  		if (res != 0) {
163  			wthread_active = QB_FALSE;
164  			(void)qb_thread_lock_destroy(logt_wthread_lock);
165  			return -res;
166  		}
(3) Event thread2_acquires_lock: The thread 2 acquires lock "logt_thread_start".
(5) Event thread1_acquires_lock: The thread 1 acquires lock "logt_thread_start".
Also see events: [thread1_checks_field][thread2_checks_field][thread2_modifies_field][thread1_overwrites_value_in_field][use_same_locks_for_read_and_modify]
167  		sem_wait(&logt_thread_start);
168  	
169  		if (logt_sched_param_queued) {
170  			res = qb_log_thread_priority_set(logt_sched_policy,
171  	#if defined(HAVE_PTHREAD_SETSCHEDPARAM)
172  			                                 logt_sched_param.sched_priority);
173  	#else
174  			                                 0);
175  	#endif
176  			if (res != 0) {
177  				goto cleanup_pthread;
178  			}
179  			logt_sched_param_queued = QB_FALSE;
180  		}
181  	
182  		return 0;
183  	
184  	cleanup_pthread:
185  		wthread_should_exit = QB_TRUE;
186  		sem_post(&logt_print_finished);
187  		pthread_join(logt_thread_id, NULL);
188  	
(4) Event thread2_modifies_field: The thread 2 sets "wthread_active" to a new value. Note that this write can be reordered at runtime to occur before instructions that do not access this field within this locked region. After the thread 2 leaves the critical section, control is switched back to the thread 1.
(6) Event thread1_overwrites_value_in_field: The thread 1 sets "wthread_active" to a new value. Now the two threads have an inconsistent view of "wthread_active" and updates to fields correlated with "wthread_active" may be lost.
Also see events: [thread1_checks_field][thread2_checks_field][thread2_acquires_lock][thread1_acquires_lock][use_same_locks_for_read_and_modify]
189  		wthread_active = QB_FALSE;
190  		wthread_lock = logt_wthread_lock;
191  		logt_wthread_lock = NULL;
192  		(void)qb_thread_lock_destroy(wthread_lock);
193  	
194  		sem_destroy(&logt_print_finished);
195  		sem_destroy(&logt_thread_start);
196  	
197  		return res;
198  	}
199  	
200  	
201  	void
202  	qb_log_thread_pause(struct qb_log_target *t)
203  	{
204  		if (t->threaded) {
205  			(void)qb_thread_lock(logt_wthread_lock);
206  		}
207  	}
208  	
209  	void
210  	qb_log_thread_resume(struct qb_log_target *t)
211  	{
212  		if (t->threaded) {
213  			(void)qb_thread_unlock(logt_wthread_lock);
214  		}
215  	}
216  	
217  	void
218  	qb_log_thread_log_post(struct qb_log_callsite *cs,
219  			       struct timespec *timestamp, const char *buffer)
220  	{
221  		struct qb_log_record *rec;
222  		size_t buf_size;
223  		size_t total_size;
224  	
225  		rec = malloc(sizeof(struct qb_log_record));
226  		if (rec == NULL) {
227  			return;
228  		}
229  	
230  		buf_size = strlen(buffer) + 1;
231  		total_size = sizeof(struct qb_log_record) + buf_size;
232  	
233  		rec->cs = cs;
234  		rec->buffer = malloc(buf_size);
235  		if (rec->buffer == NULL) {
236  			goto free_record;
237  		}
238  		memcpy(rec->buffer, buffer, buf_size);
239  	
240  		memcpy(&rec->timestamp, timestamp, sizeof(struct timespec));
241  	
242  		qb_list_init(&rec->list);
243  		(void)qb_thread_lock(logt_wthread_lock);
244  		logt_memory_used += total_size;
245  		if (logt_memory_used > 512000) {
246  			free(rec->buffer);
247  			free(rec);
248  			logt_memory_used = logt_memory_used - total_size;
249  			logt_dropped_messages += 1;
250  			(void)qb_thread_unlock(logt_wthread_lock);
251  			return;
252  	
253  		} else {
254  			qb_list_add_tail(&rec->list, &logt_print_finished_records);
255  		}
256  		(void)qb_thread_unlock(logt_wthread_lock);
257  	
258  		sem_post(&logt_print_finished);
259  		return;
260  	
261  	free_record:
262  		free(rec);
263  	}
264  	
265  	void
266  	qb_log_thread_stop(void)
267  	{
268  		int res;
269  		int value;
270  		struct qb_log_record *rec;
271  	
272  		if (wthread_active == QB_FALSE && logt_wthread_lock == NULL) {
273  			return;
274  		}
275  		if (wthread_active == QB_FALSE) {
276  			for (;;) {
277  				res = sem_getvalue(&logt_print_finished, &value);
278  				if (res != 0 || value == 0) {
279  					break;
280  				}
281  				sem_wait(&logt_print_finished);
282  	
283  				(void)qb_thread_lock(logt_wthread_lock);
284  	
285  				rec = qb_list_first_entry(&logt_print_finished_records,
286  						    struct qb_log_record, list);
287  				qb_list_del(&rec->list);
288  				logt_memory_used = logt_memory_used -
289  						   strlen(rec->buffer) -
290  						   sizeof(struct qb_log_record) - 1;
291  	
292  				qb_log_thread_log_write(rec->cs, &rec->timestamp,
293  							rec->buffer);
294  				(void)qb_thread_unlock(logt_wthread_lock);
295  				free(rec->buffer);
296  				free(rec);
297  			}
298  		} else {
299  			(void)qb_thread_lock(logt_wthread_lock);
300  			wthread_should_exit = QB_TRUE;
301  			(void)qb_thread_unlock(logt_wthread_lock);
302  			sem_post(&logt_print_finished);
303  			pthread_join(logt_thread_id, NULL);
304  		}
305  		(void)qb_thread_lock_destroy(logt_wthread_lock);
306  		sem_destroy(&logt_print_finished);
307  		sem_destroy(&logt_thread_start);
308  	}
309