1 /*
2 * Copyright (C) 2011 Red Hat, Inc.
3 *
4 * All rights reserved.
5 *
6 * Author: Angus Salkeld <asalkeld@redhat.com>
7 *
8 * libqb is free software: you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation, either version 2.1 of the License, or
11 * (at your option) any later version.
12 *
13 * libqb is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with libqb. If not, see <http://www.gnu.org/licenses/>.
20 */
21 #include "os_base.h"
22
23 #include <pthread.h>
24 #include <semaphore.h>
25
26 #include <qb/qbdefs.h>
27 #include <qb/qblist.h>
28 #include <qb/qbutil.h>
29 #include "log_int.h"
30
31 static int wthread_active = QB_FALSE;
32
33 static int wthread_should_exit = QB_FALSE;
34
35 static qb_thread_lock_t *logt_wthread_lock = NULL;
36
37 static QB_LIST_DECLARE(logt_print_finished_records);
38
39 static int logt_memory_used = 0;
40
41 static int logt_dropped_messages = 0;
42
43 static sem_t logt_thread_start;
44
45 static sem_t logt_print_finished;
46
47 static int logt_sched_param_queued = QB_FALSE;
48
49 static int logt_sched_policy;
50
51 #if defined(HAVE_PTHREAD_SETSCHEDPARAM)
52 static struct sched_param logt_sched_param;
53 #endif /* HAVE_PTHREAD_SETSCHEDPARAM */
54
55 static pthread_t logt_thread_id = 0;
56
57 static void *qb_logt_worker_thread(void *data) __attribute__ ((noreturn));
58 static void *
59 qb_logt_worker_thread(void *data)
60 {
61 struct qb_log_record *rec;
62 int dropped = 0;
63 int res;
64
65 /* Signal qb_log_thread_start that the initialization may continue */
66 sem_post(&logt_thread_start);
67 for (;;) {
68 retry_sem_wait:
69 res = sem_wait(&logt_print_finished);
70 if (res == -1 && errno == EINTR) {
71 goto retry_sem_wait;
72 } else if (res == -1) {
73 /* This case shouldn't happen */
74 pthread_exit(NULL);
75 }
76
77 (void)qb_thread_lock(logt_wthread_lock);
78 if (wthread_should_exit) {
79 int value = -1;
80
81 (void)sem_getvalue(&logt_print_finished, &value);
82 if (value == 0) {
83 (void)qb_thread_unlock(logt_wthread_lock);
84 pthread_exit(NULL);
85 }
86 }
87
88 rec =
89 qb_list_first_entry(&logt_print_finished_records,
90 struct qb_log_record, list);
91 qb_list_del(&rec->list);
92 logt_memory_used = logt_memory_used - strlen(rec->buffer) -
93 sizeof(struct qb_log_record) - 1;
94 dropped = logt_dropped_messages;
95 logt_dropped_messages = 0;
96 if (dropped) {
97 printf("%d messages lost\n", dropped);
98 }
99
100 qb_log_thread_log_write(rec->cs, &rec->timestamp, rec->buffer);
101
102 (void)qb_thread_unlock(logt_wthread_lock);
103 free(rec->buffer);
104 free(rec);
105 }
106 }
107
/*
 * Choose the scheduling policy and priority of the logging thread.
 *
 * The request is always recorded in logt_sched_policy/logt_sched_param.
 * For SCHED_OTHER (and SCHED_IDLE / SCHED_BATCH where those are defined)
 * the stored priority is forced to 0; for any other policy the caller's
 * priority is stored as given.
 *
 * If no worker thread is active the request is only queued, to be applied
 * by qb_log_thread_start().  Otherwise it is applied immediately with
 * pthread_setschedparam().
 *
 * Returns 0 on success or when queued, otherwise the negated error number
 * reported by pthread_setschedparam().  Without
 * HAVE_PTHREAD_SETSCHEDPARAM the function does nothing and returns 0.
 */
int32_t
qb_log_thread_priority_set(int32_t policy, int32_t priority)
{
	int res = 0;

#if defined(HAVE_PTHREAD_SETSCHEDPARAM)
	int fixed_zero_priority;

	logt_sched_policy = policy;

	fixed_zero_priority = (policy == SCHED_OTHER);
#ifdef SCHED_IDLE
	fixed_zero_priority = fixed_zero_priority || (policy == SCHED_IDLE);
#endif
#if defined(SCHED_BATCH) && !defined(QB_DARWIN)
	fixed_zero_priority = fixed_zero_priority || (policy == SCHED_BATCH);
#endif

	logt_sched_param.sched_priority = fixed_zero_priority ? 0 : priority;

	if (wthread_active != QB_FALSE) {
		/* pthread_setschedparam() reports a positive error number. */
		res = -pthread_setschedparam(logt_thread_id, policy,
					     &logt_sched_param);
	} else {
		logt_sched_param_queued = QB_TRUE;
	}
#endif
	return res;
}
141
142 int32_t
143 qb_log_thread_start(void)
144 {
145 int res;
146 qb_thread_lock_t *wthread_lock;
147
148 if (wthread_active) {
149 return 0;
150 }
151
152 wthread_active = QB_TRUE;
153 sem_init(&logt_thread_start, 0, 0);
154 sem_init(&logt_print_finished, 0, 0);
155 errno = 0;
156 logt_wthread_lock = qb_thread_lock_create(QB_THREAD_LOCK_SHORT);
|
(1) Event thread1_checks_field: |
The thread 1 uses the value read from static field "logt_wthread_lock" in the condition "logt_wthread_lock == NULL". It sees that the condition is false. Control is switched to the thread 2. |
|
(2) Event thread2_checks_field: |
The thread 2 uses the value read from static field "logt_wthread_lock" in the condition "logt_wthread_lock == NULL". It sees that the condition is false. |
|
(7) Event use_same_locks_for_read_and_modify: |
Guard the modification of "logt_wthread_lock" and the read used to decide whether to modify "logt_wthread_lock" with the same set of locks. |
| Also see events: |
[thread2_acquires_lock][thread2_modifies_field][thread1_acquires_lock][thread1_overwrites_value_in_field] |
157 if (logt_wthread_lock == NULL) {
158 return errno ? -errno : -1;
159 }
160 res = pthread_create(&logt_thread_id, NULL,
161 qb_logt_worker_thread, NULL);
162 if (res != 0) {
163 wthread_active = QB_FALSE;
164 (void)qb_thread_lock_destroy(logt_wthread_lock);
165 return -res;
166 }
167 sem_wait(&logt_thread_start);
168
169 if (logt_sched_param_queued) {
170 res = qb_log_thread_priority_set(logt_sched_policy,
171 #if defined(HAVE_PTHREAD_SETSCHEDPARAM)
172 logt_sched_param.sched_priority);
173 #else
174 0);
175 #endif
176 if (res != 0) {
177 goto cleanup_pthread;
178 }
179 logt_sched_param_queued = QB_FALSE;
180 }
181
182 return 0;
183
184 cleanup_pthread:
185 wthread_should_exit = QB_TRUE;
186 sem_post(&logt_print_finished);
187 pthread_join(logt_thread_id, NULL);
188
189 wthread_active = QB_FALSE;
190 wthread_lock = logt_wthread_lock;
|
(4) Event thread2_modifies_field: |
The thread 2 sets "logt_wthread_lock" to a new value. Note that this write can be reordered at runtime to occur before instructions that do not access this field within this locked region. After the thread 2 leaves the critical section, control is switched back to the thread 1. |
|
(6) Event thread1_overwrites_value_in_field: |
The thread 1 sets "logt_wthread_lock" to a new value. Now the two threads have an inconsistent view of "logt_wthread_lock" and updates to fields of "logt_wthread_lock" or fields correlated with "logt_wthread_lock" may be lost. |
| Also see events: |
[thread1_checks_field][thread2_checks_field][thread2_acquires_lock][thread1_acquires_lock][use_same_locks_for_read_and_modify] |
191 logt_wthread_lock = NULL;
192 (void)qb_thread_lock_destroy(wthread_lock);
193
194 sem_destroy(&logt_print_finished);
195 sem_destroy(&logt_thread_start);
196
197 return res;
198 }
199
200
201 void
202 qb_log_thread_pause(struct qb_log_target *t)
203 {
204 if (t->threaded) {
205 (void)qb_thread_lock(logt_wthread_lock);
206 }
207 }
208
209 void
210 qb_log_thread_resume(struct qb_log_target *t)
211 {
212 if (t->threaded) {
213 (void)qb_thread_unlock(logt_wthread_lock);
214 }
215 }
216
217 void
218 qb_log_thread_log_post(struct qb_log_callsite *cs,
219 struct timespec *timestamp, const char *buffer)
220 {
221 struct qb_log_record *rec;
222 size_t buf_size;
223 size_t total_size;
224
225 rec = malloc(sizeof(struct qb_log_record));
226 if (rec == NULL) {
227 return;
228 }
229
230 buf_size = strlen(buffer) + 1;
231 total_size = sizeof(struct qb_log_record) + buf_size;
232
233 rec->cs = cs;
234 rec->buffer = malloc(buf_size);
235 if (rec->buffer == NULL) {
236 goto free_record;
237 }
238 memcpy(rec->buffer, buffer, buf_size);
239
240 memcpy(&rec->timestamp, timestamp, sizeof(struct timespec));
241
242 qb_list_init(&rec->list);
243 (void)qb_thread_lock(logt_wthread_lock);
244 logt_memory_used += total_size;
245 if (logt_memory_used > 512000) {
246 free(rec->buffer);
247 free(rec);
248 logt_memory_used = logt_memory_used - total_size;
249 logt_dropped_messages += 1;
250 (void)qb_thread_unlock(logt_wthread_lock);
251 return;
252
253 } else {
254 qb_list_add_tail(&rec->list, &logt_print_finished_records);
255 }
256 (void)qb_thread_unlock(logt_wthread_lock);
257
258 sem_post(&logt_print_finished);
259 return;
260
261 free_record:
262 free(rec);
263 }
264
265 void
266 qb_log_thread_stop(void)
267 {
268 int res;
269 int value;
270 struct qb_log_record *rec;
271
272 if (wthread_active == QB_FALSE && logt_wthread_lock == NULL) {
273 return;
274 }
275 if (wthread_active == QB_FALSE) {
276 for (;;) {
277 res = sem_getvalue(&logt_print_finished, &value);
278 if (res != 0 || value == 0) {
279 break;
280 }
281 sem_wait(&logt_print_finished);
282
283 (void)qb_thread_lock(logt_wthread_lock);
284
285 rec = qb_list_first_entry(&logt_print_finished_records,
286 struct qb_log_record, list);
287 qb_list_del(&rec->list);
288 logt_memory_used = logt_memory_used -
289 strlen(rec->buffer) -
290 sizeof(struct qb_log_record) - 1;
291
292 qb_log_thread_log_write(rec->cs, &rec->timestamp,
293 rec->buffer);
294 (void)qb_thread_unlock(logt_wthread_lock);
295 free(rec->buffer);
296 free(rec);
297 }
298 } else {
299 (void)qb_thread_lock(logt_wthread_lock);
300 wthread_should_exit = QB_TRUE;
301 (void)qb_thread_unlock(logt_wthread_lock);
302 sem_post(&logt_print_finished);
303 pthread_join(logt_thread_id, NULL);
304 }
305 (void)qb_thread_lock_destroy(logt_wthread_lock);
306 sem_destroy(&logt_print_finished);
307 sem_destroy(&logt_thread_start);
308 }
309