1 /*
2 * Copyright 2004-2012 Red Hat, Inc.
3 *
4 * This copyrighted material is made available to anyone wishing to use,
5 * modify, copy, or redistribute it subject to the terms and conditions
6 * of the GNU General Public License v2 or (at your option) any later version.
7 */
8
9 #include "dlm_daemon.h"
10 #include <linux/dlm_plock.h>
11
12 #ifndef DLM_PLOCK_OP_CANCEL
13 #define DLM_PLOCK_OP_CANCEL 4
14 #endif
15
16 static uint32_t plock_read_count;
17 static uint32_t plock_recv_count;
18 static uint32_t plock_rate_delays;
19 static struct timeval plock_read_time;
20 static struct timeval plock_recv_time;
21 static struct timeval plock_rate_last;
22
23 static int plock_device_fd = -1;
24
25 #define RD_CONTINUE 0x00000001
26
/* NOTE(review): appears to be the serialized per-resource record used when
   plock state is exchanged between nodes (fixed-width fields, explicit
   padding) — confirm against the checkpoint/sync code elsewhere in the
   file. */
struct resource_data {
	uint64_t number;	/* resource (inode) number */
	int owner;		/* nodeid, 0 = unowned, -1 = undetermined */
	uint32_t lock_count;	/* presumably the number of plock_data
				   records that follow — verify */
	uint32_t flags;
	uint32_t pad;		/* explicit padding, keeps layout fixed */
};
34
/* NOTE(review): per-lock record that pairs with resource_data in the
   synced plock state — confirm against the sync/checkpoint code. */
struct plock_data {
	uint64_t start;		/* first byte of range (inclusive) */
	uint64_t end;		/* last byte of range (inclusive) */
	uint64_t owner;		/* owner identity, matches posix_lock.owner */
	uint32_t pid;		/* locking process id */
	uint32_t nodeid;	/* node the lock belongs to */
	uint8_t ex;		/* 1 = exclusive (write), 0 = shared (read) */
	uint8_t waiter;		/* 1 = waiting request, not yet granted */
	uint16_t pad1;		/* explicit padding */
	uint32_t pad;
};
46
47 #define R_GOT_UNOWN 0x00000001 /* have received owner=0 message */
48 #define R_SEND_UNOWN 0x00000002 /* have sent owner=0 message */
49 #define R_SEND_OWN 0x00000004 /* have sent owner=our_nodeid message */
50 #define R_PURGE_UNOWN 0x00000008 /* set owner=0 in purge */
51 #define R_SEND_DROP 0x00000010
52
/* per-inode plock state; one resource per file (keyed by inode number) */
struct resource {
	struct list_head list;	/* entry in ls->plock_resources */
	uint64_t number;	/* inode number, rb-tree key */
	int owner;		/* nodeid or 0 for unowned, -1 undetermined */
	uint32_t flags;		/* R_* flags above */
	struct timeval last_access;	/* refreshed by find_resource() */
	struct list_head locks;	/* posix_lock, one lock for each range */
	struct list_head waiters;	/* lock_waiter, blocked requests */
	struct list_head pending; /* ops deferred while discovering r owner */
	struct rb_node rb_node;	/* node in ls->plock_resources_root */
};
64
65 #define P_SYNCING 0x00000001 /* plock has been sent as part of sync but not
66 yet received */
67
/* one granted plock range on a resource */
struct posix_lock {
	struct list_head list;	/* resource locks or waiters list */
	uint32_t pid;		/* locking process id */
	uint64_t owner;		/* owner identity; locks match on
				   nodeid+owner (see is_conflict) */
	uint64_t start;		/* inclusive range start */
	uint64_t end;		/* inclusive range end */
	int ex;			/* 1 = exclusive, 0 = shared */
	int nodeid;		/* node that requested the lock */
	uint32_t flags;		/* P_SYNCING */
};
78
/* a queued plock request; used both for blocked requests on r->waiters
   and for ops parked on r->pending while ownership is resolved */
struct lock_waiter {
	struct list_head list;	/* r->waiters or r->pending */
	uint32_t flags;		/* P_SYNCING */
	struct dlm_plock_info info;	/* copy of the original request */
};
84
85 struct save_msg {
86 struct list_head list;
87 int nodeid;
88 int len;
89 int type;
90 char buf[0];
91 };
92
93
94 static void send_own(struct lockspace *ls, struct resource *r, int owner);
95 static void save_pending_plock(struct lockspace *ls, struct resource *r,
96 struct dlm_plock_info *in);
97
98
99 static int got_unown(struct resource *r)
100 {
101 return !!(r->flags & R_GOT_UNOWN);
102 }
103
/* Convert a dlm_plock_info from cpu to little-endian byte order, in
   place, before sending it on the wire.  Note this mutates the caller's
   struct. */
static void info_bswap_out(struct dlm_plock_info *i)
{
	i->version[0] = cpu_to_le32(i->version[0]);
	i->version[1] = cpu_to_le32(i->version[1]);
	i->version[2] = cpu_to_le32(i->version[2]);
	i->pid = cpu_to_le32(i->pid);
	i->nodeid = cpu_to_le32(i->nodeid);
	i->rv = cpu_to_le32(i->rv);
	i->fsid = cpu_to_le32(i->fsid);
	i->number = cpu_to_le64(i->number);
	i->start = cpu_to_le64(i->start);
	i->end = cpu_to_le64(i->end);
	i->owner = cpu_to_le64(i->owner);
}
118
/* Convert a received dlm_plock_info from little-endian wire order to
   cpu byte order, in place.  Inverse of info_bswap_out. */
static void info_bswap_in(struct dlm_plock_info *i)
{
	i->version[0] = le32_to_cpu(i->version[0]);
	i->version[1] = le32_to_cpu(i->version[1]);
	i->version[2] = le32_to_cpu(i->version[2]);
	i->pid = le32_to_cpu(i->pid);
	i->nodeid = le32_to_cpu(i->nodeid);
	i->rv = le32_to_cpu(i->rv);
	i->fsid = le32_to_cpu(i->fsid);
	i->number = le64_to_cpu(i->number);
	i->start = le64_to_cpu(i->start);
	i->end = le64_to_cpu(i->end);
	i->owner = le64_to_cpu(i->owner);
}
133
134 static const char *op_str(int optype)
135 {
136 switch (optype) {
137 case DLM_PLOCK_OP_LOCK:
138 return "LK";
139 case DLM_PLOCK_OP_CANCEL:
140 return "CL";
141 case DLM_PLOCK_OP_UNLOCK:
142 return "UN";
143 case DLM_PLOCK_OP_GET:
144 return "GET";
145 default:
146 return "??";
147 }
148 }
149
150 static const char *ex_str(int optype, int ex)
151 {
152 if (optype == DLM_PLOCK_OP_UNLOCK || optype == DLM_PLOCK_OP_GET)
153 return "-";
154 if (ex)
155 return "WR";
156 else
157 return "RD";
158 }
159
/* Initialize plock statistics counters and open the kernel plock misc
   device.  Returns the open fd (>= 0) for the caller to poll, or -1 on
   failure.  NOTE(review): when plock_minor is zero the open is skipped
   and the error path reports a stale errno — presumably plock_minor
   being zero means the kernel lacks plock support; confirm. */
int setup_plocks(void)
{
	plock_read_count = 0;
	plock_recv_count = 0;
	plock_rate_delays = 0;
	gettimeofday(&plock_read_time, NULL);
	gettimeofday(&plock_recv_time, NULL);
	gettimeofday(&plock_rate_last, NULL);

	if (plock_minor) {
		plock_device_fd = open("/dev/misc/dlm_plock", O_RDWR);
	}

	if (plock_device_fd < 0) {
		log_error("Failure to open plock device: %s", strerror(errno));
		return -1;
	}

	log_debug("plocks %d", plock_device_fd);

	return plock_device_fd;
}
182
183 void close_plocks(void)
184 {
185 if (plock_device_fd > 0)
186 close(plock_device_fd);
187 }
188
189 /* FIXME: unify these two */
190
/* elapsed milliseconds from begin to end (end is assumed later) */
static unsigned long time_diff_ms(struct timeval *begin, struct timeval *end)
{
	long sec = end->tv_sec - begin->tv_sec;
	long usec = end->tv_usec - begin->tv_usec;

	/* normalize a negative microsecond delta, as timersub() would */
	if (usec < 0) {
		sec--;
		usec += 1000000;
	}
	return (sec * 1000) + (usec / 1000);
}
197
198 static uint64_t dt_usec(const struct timeval *start, const struct timeval *stop)
199 {
200 uint64_t dt;
201
202 dt = stop->tv_sec - start->tv_sec;
203 dt *= 1000000;
204 dt += stop->tv_usec - start->tv_usec;
205 return dt;
206 }
207
/* Look up a resource by inode number in the lockspace rb-tree.
   Returns the resource or NULL if not present. */
static struct resource * rb_search_plock_resource(struct lockspace *ls, uint64_t number)
{
	struct rb_node *n = ls->plock_resources_root.rb_node;
	struct resource *r;

	while (n) {
		r = rb_entry(n, struct resource, rb_node);
		if (number < r->number)
			n = n->rb_left;
		else if (number > r->number)
			n = n->rb_right;
		else
			return r;
	}
	return NULL;
}
224
/* Insert r into the lockspace rb-tree keyed by r->number.  If a
   resource with the same number is already present, the insert is
   silently skipped (r is left unlinked from the tree). */
static void rb_insert_plock_resource(struct lockspace *ls, struct resource *r)
{
	struct resource *entry;
	struct rb_node **p;		/* link slot where r will be placed */
	struct rb_node *parent = NULL;

	p = &ls->plock_resources_root.rb_node;
	while (*p) {
		parent = *p;
		entry = rb_entry(parent, struct resource, rb_node);
		if (r->number < entry->number)
			p = &parent->rb_left;
		else if (r->number > entry->number)
			p = &parent->rb_right;
		else
			return;	/* duplicate key: do not insert */
	}
	rb_link_node(&r->rb_node, parent, p);
	rb_insert_color(&r->rb_node, &ls->plock_resources_root);
}
245
/* Remove r from the lockspace rb-tree if it is linked; safe to call on
   a resource that was never inserted (e.g. a duplicate-key insert). */
static void rb_del_plock_resource(struct lockspace *ls, struct resource *r)
{
	if (!RB_EMPTY_NODE(&r->rb_node)) {
		rb_erase(&r->rb_node, &ls->plock_resources_root);
		RB_CLEAR_NODE(&r->rb_node);
	}
}
253
254 static struct resource *search_resource(struct lockspace *ls, uint64_t number)
255 {
256 struct resource *r;
257
258 list_for_each_entry(r, &ls->plock_resources, list) {
259 if (r->number == number)
260 return r;
261 }
262 return NULL;
263 }
264
265 static int find_resource(struct lockspace *ls, uint64_t number, int create,
266 struct resource **r_out)
267 {
268 struct resource *r = NULL;
269 int rv = 0;
270
271 r = rb_search_plock_resource(ls, number);
272 if (r)
273 goto out;
274
275 if (create == 0) {
276 rv = -ENOENT;
277 goto out;
278 }
279
280 r = malloc(sizeof(struct resource));
281 if (!r) {
282 log_elock(ls, "find_resource no memory %d", errno);
283 rv = -ENOMEM;
284 goto out;
285 }
286
287 memset(r, 0, sizeof(struct resource));
288 r->number = number;
289 INIT_LIST_HEAD(&r->locks);
290 INIT_LIST_HEAD(&r->waiters);
291 INIT_LIST_HEAD(&r->pending);
292
293 if (opt(plock_ownership_ind))
294 r->owner = -1;
295 else
296 r->owner = 0;
297
298 list_add_tail(&r->list, &ls->plock_resources);
299 rb_insert_plock_resource(ls, r);
300 out:
301 if (r)
302 gettimeofday(&r->last_access, NULL);
303 *r_out = r;
304 return rv;
305 }
306
/* Release a resource after an operation: free it once it holds no
   locks and no waiters.  With ownership enabled, freeing only happens
   via drop messages, so this is a no-op. */
static void put_resource(struct lockspace *ls, struct resource *r)
{
	/* with ownership, resources are only freed via drop messages */
	if (opt(plock_ownership_ind))
		return;

	if (list_empty(&r->locks) && list_empty(&r->waiters)) {
		rb_del_plock_resource(ls, r);
		list_del(&r->list);
		free(r);
	}
}
319
320 static inline int ranges_overlap(uint64_t start1, uint64_t end1,
321 uint64_t start2, uint64_t end2)
322 {
323 if (end1 < start2 || start1 > end2)
324 return 0;
325 return 1;
326 }
327
328 /**
329 * overlap_type - returns a value based on the type of overlap
330 * @s1 - start of new lock range
331 * @e1 - end of new lock range
332 * @s2 - start of existing lock range
333 * @e2 - end of existing lock range
334 *
335 */
336
337 static int overlap_type(uint64_t s1, uint64_t e1, uint64_t s2, uint64_t e2)
338 {
339 int ret;
340
341 /*
342 * ---r1---
343 * ---r2---
344 */
345
346 if (s1 == s2 && e1 == e2)
347 ret = 0;
348
349 /*
350 * --r1--
351 * ---r2---
352 */
353
354 else if (s1 == s2 && e1 < e2)
355 ret = 1;
356
357 /*
358 * --r1--
359 * ---r2---
360 */
361
362 else if (s1 > s2 && e1 == e2)
363 ret = 1;
364
365 /*
366 * --r1--
367 * ---r2---
368 */
369
370 else if (s1 > s2 && e1 < e2)
371 ret = 2;
372
373 /*
374 * ---r1--- or ---r1--- or ---r1---
375 * --r2-- --r2-- --r2--
376 */
377
378 else if (s1 <= s2 && e1 >= e2)
379 ret = 3;
380
381 /*
382 * ---r1---
383 * ---r2---
384 */
385
386 else if (s1 > s2 && e1 > e2)
387 ret = 4;
388
389 /*
390 * ---r1---
391 * ---r2---
392 */
393
394 else if (s1 < s2 && e1 < e2)
395 ret = 4;
396
397 else
398 ret = -1;
399
400 return ret;
401 }
402
403 /* shrink the range start2:end2 by the partially overlapping start:end */
404
405 static int shrink_range2(uint64_t *start2, uint64_t *end2,
406 uint64_t start, uint64_t end)
407 {
408 int error = 0;
409
410 if (*start2 < start)
411 *end2 = start - 1;
412 else if (*end2 > end)
413 *start2 = end + 1;
414 else
415 error = -1;
416 return error;
417 }
418
/* shrink po's range by the partially overlapping start:end; returns -1
   if start:end fully covers po (nothing left to keep) */
static int shrink_range(struct posix_lock *po, uint64_t start, uint64_t end)
{
	return shrink_range2(&po->start, &po->end, start, end);
}
423
/* Does the request in conflict with any granted lock on r held by a
   different nodeid/owner?  Locks held by the same owner never conflict.
   When get is set (GETLK), the first conflicting lock's details are
   copied back into *in for reporting.  Returns 1 on conflict, else 0. */
static int is_conflict(struct resource *r, struct dlm_plock_info *in, int get)
{
	struct posix_lock *po;

	list_for_each_entry(po, &r->locks, list) {
		if (po->nodeid == in->nodeid && po->owner == in->owner)
			continue;	/* own locks never conflict */
		if (!ranges_overlap(po->start, po->end, in->start, in->end))
			continue;

		/* overlapping ranges conflict unless both are shared */
		if (in->ex || po->ex) {
			if (get) {
				in->ex = po->ex;
				in->pid = po->pid;
				in->start = po->start;
				in->end = po->end;
			}
			return 1;
		}
	}
	return 0;
}
446
447 static int add_lock(struct resource *r, uint32_t nodeid, uint64_t owner,
448 uint32_t pid, int ex, uint64_t start, uint64_t end)
449 {
450 struct posix_lock *po;
451
452 po = malloc(sizeof(struct posix_lock));
453 if (!po)
454 return -ENOMEM;
455
456 po->start = start;
457 po->end = end;
458 po->nodeid = nodeid;
459 po->owner = owner;
460 po->pid = pid;
461 po->ex = ex;
462 po->flags = 0;
463 list_add_tail(&po->list, &r->locks);
464
465 return 0;
466 }
467
468 /* RN within RE (and starts or ends on RE boundary)
469 1. add new lock for non-overlap area of RE, orig mode
470 2. convert RE to RN range and mode */
471
static int lock_case1(struct posix_lock *po, struct resource *r,
		      struct dlm_plock_info *in)
{
	uint64_t start2, end2;
	int rv;

	/* non-overlapping area start2:end2 */
	start2 = po->start;
	end2 = po->end;
	rv = shrink_range2(&start2, &end2, in->start, in->end);
	if (rv)
		goto out;

	/* convert RE in place to the new range and mode */
	po->start = in->start;
	po->end = in->end;
	po->ex = in->ex;

	/* the remainder keeps RE's original mode; callers only reach
	   this case when po->ex != in->ex, so !in->ex is the old mode */
	rv = add_lock(r, in->nodeid, in->owner, in->pid, !in->ex, start2, end2);
 out:
	return rv;
}
493
494 /* RN within RE (RE overlaps RN on both sides)
495 1. add new lock for front fragment, orig mode
496 2. add new lock for back fragment, orig mode
497 3. convert RE to RN range and mode */
498
static int lock_case2(struct posix_lock *po, struct resource *r,
		      struct dlm_plock_info *in)

{
	int rv;

	/* both fragments keep RE's original mode; callers only reach
	   this case when po->ex != in->ex, so !in->ex is the old mode */
	rv = add_lock(r, in->nodeid, in->owner, in->pid,
		      !in->ex, po->start, in->start - 1);
	if (rv)
		goto out;

	rv = add_lock(r, in->nodeid, in->owner, in->pid,
		      !in->ex, in->end + 1, po->end);
	if (rv)
		goto out;

	/* convert RE in place to the new range and mode */
	po->start = in->start;
	po->end = in->end;
	po->ex = in->ex;
 out:
	return rv;
}
521
/* Grant a lock request on r: resolve overlaps with this owner's own
   existing locks (convert, split, shrink or remove them per the
   overlap_type cases), then add one lock covering the requested range
   and mode.  Conflicts with other owners were checked by the caller. */
static int lock_internal(struct lockspace *ls, struct resource *r,
			 struct dlm_plock_info *in)
{
	struct posix_lock *po, *safe;
	int rv = 0;

	list_for_each_entry_safe(po, safe, &r->locks, list) {
		if (po->nodeid != in->nodeid || po->owner != in->owner)
			continue;
		if (!ranges_overlap(po->start, po->end, in->start, in->end))
			continue;

		/* existing range (RE) overlaps new range (RN) */

		switch(overlap_type(in->start, in->end, po->start, po->end)) {

		case 0:
			if (po->ex == in->ex)
				goto out;

			/* ranges the same - just update the existing lock */
			po->ex = in->ex;
			goto out;

		case 1:
			if (po->ex == in->ex)
				goto out;

			rv = lock_case1(po, r, in);
			goto out;

		case 2:
			if (po->ex == in->ex)
				goto out;

			rv = lock_case2(po, r, in);
			goto out;

		case 3:
			/* RE within RN - remove RE; keep scanning since
			   RN may cover other locks too */
			list_del(&po->list);
			free(po);
			break;

		case 4:
			/* partial overlap - shrink RE away from RN and
			   keep scanning */
			if (po->start < in->start)
				po->end = in->start - 1;
			else
				po->start = in->end + 1;
			break;

		default:
			rv = -1;
			goto out;
		}
	}

	rv = add_lock(r, in->nodeid, in->owner, in->pid,
		      in->ex, in->start, in->end);
 out:
	return rv;

}
584
585 static int unlock_internal(struct lockspace *ls, struct resource *r,
586 struct dlm_plock_info *in)
587 {
588 struct posix_lock *po, *safe;
589 int rv = 0;
590
591 list_for_each_entry_safe(po, safe, &r->locks, list) {
592 if (po->nodeid != in->nodeid || po->owner != in->owner)
593 continue;
594 if (!ranges_overlap(po->start, po->end, in->start, in->end))
595 continue;
596
597 /* existing range (RE) overlaps new range (RN) */
598
599 switch (overlap_type(in->start, in->end, po->start, po->end)) {
600
601 case 0:
602 /* ranges the same - just remove the existing lock */
603
604 list_del(&po->list);
605 free(po);
606 goto out;
607
608 case 1:
609 /* RN within RE and starts or ends on RE boundary -
610 * shrink and update RE */
611
612 rv = shrink_range(po, in->start, in->end);
613 goto out;
614
615 case 2:
616 /* RN within RE - shrink and update RE to be front
617 * fragment, and add a new lock for back fragment */
618
619 rv = add_lock(r, in->nodeid, in->owner, in->pid,
620 po->ex, in->end + 1, po->end);
621 po->end = in->start - 1;
622 goto out;
623
624 case 3:
625 /* RE within RN - remove RE, then continue checking
626 * because RN could cover other locks */
627
628 list_del(&po->list);
629 free(po);
630 continue;
631
632 case 4:
633 /* front of RE in RN, or end of RE in RN - shrink and
634 * update RE, then continue because RN could cover
635 * other locks */
636
637 rv = shrink_range(po, in->start, in->end);
638 continue;
639
640 default:
641 rv = -1;
642 goto out;
643 }
644 }
645 out:
646 return rv;
647 }
648
/* Discard all queued waiters on r belonging to in's nodeid/owner; used
   on unlock-close when the owning file is going away. */
static void clear_waiters(struct lockspace *ls, struct resource *r,
			  struct dlm_plock_info *in)
{
	struct lock_waiter *w, *safe;

	list_for_each_entry_safe(w, safe, &r->waiters, list) {
		if (w->info.nodeid != in->nodeid || w->info.owner != in->owner)
			continue;

		list_del(&w->list);

		log_dlock(ls, "clear waiter %llx %llx-%llx %d/%u/%llx",
			  (unsigned long long)in->number,
			  (unsigned long long)in->start,
			  (unsigned long long)in->end,
			  in->nodeid, in->pid,
			  (unsigned long long)in->owner);
		free(w);
	}
}
669
670 static int add_waiter(struct lockspace *ls, struct resource *r,
671 struct dlm_plock_info *in)
672
673 {
674 struct lock_waiter *w;
675
676 w = malloc(sizeof(struct lock_waiter));
677 if (!w)
678 return -ENOMEM;
679 memcpy(&w->info, in, sizeof(struct dlm_plock_info));
680 w->flags = 0;
681 list_add_tail(&w->list, &r->waiters);
682 return 0;
683 }
684
/* Report an operation's result to the kernel by writing the info
   struct (with rv filled in) back to the plock misc device. */
static void write_result(struct dlm_plock_info *in, int rv)
{
	int write_rv;

	in->rv = rv;
	write_rv = write(plock_device_fd, in, sizeof(struct dlm_plock_info));
	if (write_rv < 0)
		log_debug("write_result: write error %d fd %d\n",
			  errno, plock_device_fd);
}
695
/* Retry queued waiters on r: grant every waiter that no longer
   conflicts, replying to the kernel for requests that originated
   locally.  Called after any state change that may have removed a
   conflict. */
static void do_waiters(struct lockspace *ls, struct resource *r)
{
	struct lock_waiter *w, *safe;
	struct dlm_plock_info *in;
	int rv;

	list_for_each_entry_safe(w, safe, &r->waiters, list) {
		in = &w->info;

		if (is_conflict(r, in, 0))
			continue;

		list_del(&w->list);

		/*
		log_group(ls, "take waiter %llx %llx-%llx %d/%u/%llx",
			  in->number, in->start, in->end,
			  in->nodeid, in->pid, in->owner);
		*/

		rv = lock_internal(ls, r, in);

		/* only answer the kernel for requests we originated */
		if (in->nodeid == our_nodeid)
			write_result(in, rv);

		free(w);
	}
}
724
/* Process a LOCK op on r: grant it if there is no conflict; otherwise
   either fail with -EAGAIN (non-blocking) or queue a waiter and report
   nothing yet (-EINPROGRESS suppresses the kernel reply). */
static void do_lock(struct lockspace *ls, struct dlm_plock_info *in,
		    struct resource *r)
{
	int rv;

	if (is_conflict(r, in, 0)) {
		if (!in->wait)
			rv = -EAGAIN;
		else {
			rv = add_waiter(ls, r, in);
			if (rv)
				goto out;
			rv = -EINPROGRESS;
		}
	} else
		rv = lock_internal(ls, r, in);

 out:
	/* blocked waiters get their reply later, from do_waiters() */
	if (in->nodeid == our_nodeid && rv != -EINPROGRESS)
		write_result(in, rv);

	do_waiters(ls, r);
	put_resource(ls, r);
}
749
/* Remove the waiter that exactly matches every identifying field of
   in (used by cancel).  Returns 0 if found and freed, else -ENOENT. */
static int remove_waiter(const struct resource *r, const struct dlm_plock_info *in)
{
	struct lock_waiter *w;

	list_for_each_entry(w, &r->waiters, list) {
		if (w->info.nodeid == in->nodeid &&
		    w->info.fsid == in->fsid &&
		    w->info.number == in->number &&
		    w->info.owner == in->owner &&
		    w->info.pid == in->pid &&
		    w->info.start == in->start &&
		    w->info.end == in->end &&
		    w->info.ex == in->ex) {
			list_del(&w->list);
			free(w);
			return 0;
		}
	}

	return -ENOENT;
}
771
/* Process a CANCEL op: drop the matching queued waiter, if any, and
   reply locally with the result (-ENOENT if it was already granted). */
static void do_cancel(struct lockspace *ls, struct dlm_plock_info *in,
		      struct resource *r)
{
	int rv;

	rv = remove_waiter(r, in);
	if (in->nodeid == our_nodeid)
		write_result(in, rv);

	put_resource(ls, r);
}
783
/* Process an UNLOCK op: drop the range from this owner's locks; on a
   close-triggered unlock also discard the owner's waiters and send no
   reply.  Retries waiters afterward since a conflict may be gone. */
static void do_unlock(struct lockspace *ls, struct dlm_plock_info *in,
		      struct resource *r)
{
	int rv;

	rv = unlock_internal(ls, r, in);

	if (in->flags & DLM_PLOCK_FL_CLOSE) {
		clear_waiters(ls, r, in);
		/* no replies for unlock-close ops */
		goto skip_result;
	}

	if (in->nodeid == our_nodeid)
		write_result(in, rv);

 skip_result:
	do_waiters(ls, r);
	put_resource(ls, r);
}
804
805 /* we don't even get to this function if the getlk isn't from us */
806
/* Process a GET (getlk) op: report 1 if a conflicting lock exists
   (is_conflict fills in with the blocker's details), else 0.  GETs are
   always local, so the reply is unconditional. */
static void do_get(struct lockspace *ls, struct dlm_plock_info *in,
		   struct resource *r)
{
	int rv;

	if (is_conflict(r, in, 1))
		rv = 1;
	else
		rv = 0;

	write_result(in, rv);
	put_resource(ls, r);
}
820
821 static void save_message(struct lockspace *ls, struct dlm_header *hd, int len,
822 int from, int type)
823 {
824 struct save_msg *sm;
825
826 sm = malloc(sizeof(struct save_msg) + len);
827 if (!sm)
828 return;
829 memset(sm, 0, sizeof(struct save_msg) + len);
830
831 memcpy(&sm->buf, hd, len);
832 sm->type = type;
833 sm->len = len;
834 sm->nodeid = from;
835
836 log_plock(ls, "save %s from %d len %d", msg_name(type), from, len);
837
838 list_add_tail(&sm->list, &ls->saved_messages);
839 }
840
/* Dispatch a plock op to its handler once the target resource (and its
   ownership) has been resolved.  Each handler consumes the resource
   reference via put_resource(). */
static void __receive_plock(struct lockspace *ls, struct dlm_plock_info *in,
			    int from, struct resource *r)
{
	switch (in->optype) {
	case DLM_PLOCK_OP_LOCK:
		ls->last_plock_time = monotime();
		do_lock(ls, in, r);
		break;
	case DLM_PLOCK_OP_CANCEL:
		ls->last_plock_time = monotime();
		do_cancel(ls, in, r);
		break;
	case DLM_PLOCK_OP_UNLOCK:
		ls->last_plock_time = monotime();
		do_unlock(ls, in, r);
		break;
	case DLM_PLOCK_OP_GET:
		/* gets don't count as plock activity */
		do_get(ls, in, r);
		break;
	default:
		log_elock(ls, "receive_plock error from %d optype %d",
			  from, in->optype);
		if (from == our_nodeid)
			write_result(in, -EINVAL);
	}
}
867
868 /* When ls members receive our options message (for our mount), one of them
869 saves all plock state received to that point in a checkpoint and then sends
870 us our journals message. We know to retrieve the plock state from the
871 checkpoint when we receive our journals message. Any plocks messages that
872 arrive between seeing our options message and our journals message needs to
873 be saved and processed after we synchronize our plock state from the
874 checkpoint. Any plock message received while we're mounting but before we
875 set save_plocks (when we see our options message) can be ignored because it
876 should be reflected in the checkpointed state. */
877
/* Handle a plock message delivered by the cluster: validate it, find
   or create its resource, and either apply it now or defer it on the
   pending list until resource ownership is known.  See the long
   comments inline for the races with drop/own messages. */
static void _receive_plock(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct dlm_plock_info info;
	struct resource *r = NULL;
	struct timeval now;
	uint64_t usec;
	int from = hd->nodeid;
	int rv, create;

	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
	info_bswap_in(&info);

	log_plock(ls, "receive plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
		  (unsigned long long)info.number,
		  op_str(info.optype),
		  ex_str(info.optype, info.ex),
		  (unsigned long long)info.start, (unsigned long long)info.end,
		  info.nodeid, info.pid, (unsigned long long)info.owner,
		  info.wait);

	/* rate statistics, logged every 1000 messages */
	plock_recv_count++;
	if (!(plock_recv_count % 1000)) {
		gettimeofday(&now, NULL);
		usec = dt_usec(&plock_recv_time, &now);
		log_plock(ls, "plock_recv_count %u time %.3f s",
			  plock_recv_count, usec * 1.e-6);
		plock_recv_time = now;
	}

	/* getlk results only matter to the node that asked */
	if (info.optype == DLM_PLOCK_OP_GET && from != our_nodeid)
		return;

	if (from != hd->nodeid || from != info.nodeid) {
		log_elock(ls, "receive_plock error from %d header %d info %d",
			  from, hd->nodeid, info.nodeid);
		return;
	}

	/* without ownership every node creates resources on demand */
	create = !opt(plock_ownership_ind);

	rv = find_resource(ls, info.number, create, &r);

	if (rv && opt(plock_ownership_ind)) {
		/* There must have been a race with a drop, so we need to
		   ignore this plock op which will be resent.  If we're the one
		   who sent the plock, we need to send_own() and put it on the
		   pending list to resend once the owner is established. */

		log_plock(ls, "receive_plock from %d no r %llx", from,
			  (unsigned long long)info.number);

		if (from != our_nodeid)
			return;

		rv = find_resource(ls, info.number, 1, &r);
		if (rv)
			return;
		send_own(ls, r, our_nodeid);
		save_pending_plock(ls, r, &info);
		return;
	}
	if (rv) {
		/* r not found, rv is -ENOENT, this shouldn't happen because
		   process_plocks() creates a resource for every op */

		log_elock(ls, "receive_plock error from %d no r %llx %d",
			  from, (unsigned long long)info.number, rv);
		return;
	}

	/* The owner should almost always be 0 here, but other owners may
	   be possible given odd combinations of races with drop.  Odd races to
	   worry about (some seem pretty improbable):

	   - A sends drop, B sends plock, receive drop, receive plock.
	     This is addressed above.

	   - A sends drop, B sends plock, receive drop, B reads plock
	     and sends own, receive plock, on B we find owner of -1.

	   - A sends drop, B sends two plocks, receive drop, receive plocks.
	     Receiving the first plock is the previous case, receiving the
	     second plock will find r with owner of -1.

	   - A sends drop, B sends two plocks, receive drop, C sends own,
	     receive plock, B sends own, receive own (C), receive plock,
	     receive own (B).

	   Haven't tried to cook up a scenario that would lead to the
	   last case below; receiving a plock from ourself and finding
	   we're the owner of r. */

	if (!r->owner) {
		__receive_plock(ls, &info, from, r);

	} else if (r->owner == -1) {
		log_plock(ls, "receive_plock from %d r %llx owner %d", from,
			  (unsigned long long)info.number, r->owner);

		if (from == our_nodeid)
			save_pending_plock(ls, r, &info);

	} else if (r->owner != our_nodeid) {
		log_plock(ls, "receive_plock from %d r %llx owner %d", from,
			  (unsigned long long)info.number, r->owner);

		if (from == our_nodeid)
			save_pending_plock(ls, r, &info);

	} else if (r->owner == our_nodeid) {
		log_plock(ls, "receive_plock from %d r %llx owner %d", from,
			  (unsigned long long)info.number, r->owner);

		if (from == our_nodeid)
			__receive_plock(ls, &info, from, r);
	}
}
995
996 void receive_plock(struct lockspace *ls, struct dlm_header *hd, int len)
997 {
998 if (ls->save_plocks) {
999 save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK);
1000 return;
1001 }
1002
1003 _receive_plock(ls, hd, len);
1004 }
1005
1006 static int send_struct_info(struct lockspace *ls, struct dlm_plock_info *in,
1007 int msg_type)
1008 {
1009 struct dlm_header *hd;
1010 int rv = 0, len;
1011 char *buf;
1012
1013 len = sizeof(struct dlm_header) + sizeof(struct dlm_plock_info);
1014 buf = malloc(len);
1015 if (!buf) {
1016 rv = -ENOMEM;
1017 goto out;
1018 }
1019 memset(buf, 0, len);
1020
1021 info_bswap_out(in);
1022
1023 hd = (struct dlm_header *)buf;
1024 hd->type = msg_type;
1025
1026 memcpy(buf + sizeof(struct dlm_header), in, sizeof(*in));
1027
1028 dlm_send_message(ls, buf, len);
1029
1030 free(buf);
1031 out:
1032 if (rv)
1033 log_elock(ls, "send_struct_info error %d", rv);
1034 return rv;
1035 }
1036
/* broadcast a plock op for r to the lockspace (result is best-effort;
   failures are logged by send_struct_info) */
static void send_plock(struct lockspace *ls, struct resource *r,
		       struct dlm_plock_info *in)
{
	send_struct_info(ls, in, DLM_MSG_PLOCK);
}
1042
/* Broadcast an ownership change for r: owner is our_nodeid to claim
   it, or 0 to relinquish it.  Suppressed if an own message for r is
   already outstanding (pending ops queued). */
static void send_own(struct lockspace *ls, struct resource *r, int owner)
{
	struct dlm_plock_info info;

	/* if we've already sent an own message for this resource,
	   (pending list is not empty), then we shouldn't send another */

	if (!list_empty(&r->pending)) {
		log_plock(ls, "send_own %llx already pending",
			  (unsigned long long)r->number);
		return;
	}

	if (!owner)
		r->flags |= R_SEND_UNOWN;
	else
		r->flags |= R_SEND_OWN;

	memset(&info, 0, sizeof(info));
	info.number = r->number;
	info.nodeid = owner;	/* the proposed owner rides in nodeid */

	send_struct_info(ls, &info, DLM_MSG_PLOCK_OWN);
}
1067
/* Broadcast all of r's local state (granted locks and waiters) to the
   other nodes when we relinquish ownership.  Each entry is flagged
   P_SYNCING until our own copy of the message comes back
   (clear_syncing_flag).  Stops at the first send failure. */
static void send_syncs(struct lockspace *ls, struct resource *r)
{
	struct dlm_plock_info info;
	struct posix_lock *po;
	struct lock_waiter *w;
	int rv;

	list_for_each_entry(po, &r->locks, list) {
		memset(&info, 0, sizeof(info));
		info.number = r->number;
		info.start = po->start;
		info.end = po->end;
		info.nodeid = po->nodeid;
		info.owner = po->owner;
		info.pid = po->pid;
		info.ex = po->ex;

		rv = send_struct_info(ls, &info, DLM_MSG_PLOCK_SYNC_LOCK);
		if (rv)
			goto out;

		po->flags |= P_SYNCING;
	}

	list_for_each_entry(w, &r->waiters, list) {
		memcpy(&info, &w->info, sizeof(info));

		rv = send_struct_info(ls, &info, DLM_MSG_PLOCK_SYNC_WAITER);
		if (rv)
			goto out;

		w->flags |= P_SYNCING;
	}
 out:
	return;
}
1104
/* Broadcast a drop message for r (asks all nodes to forget the
   resource); R_SEND_DROP records that we initiated it. */
static void send_drop(struct lockspace *ls, struct resource *r)
{
	struct dlm_plock_info info;

	/* memset (not an initializer) so padding bytes on the wire are
	   deterministically zero */
	memset(&info, 0, sizeof(info));
	info.number = r->number;
	r->flags |= R_SEND_DROP;

	send_struct_info(ls, &info, DLM_MSG_PLOCK_DROP);
}
1115
1116 /* plock op can't be handled until we know the owner value of the resource,
1117 so the op is saved on the pending list until the r owner is established */
1118
/* plock op can't be handled until we know the owner value of the resource,
   so the op is saved on the pending list until the r owner is established */

static void save_pending_plock(struct lockspace *ls, struct resource *r,
			       struct dlm_plock_info *in)
{
	struct lock_waiter *w;

	w = malloc(sizeof(struct lock_waiter));
	if (!w) {
		/* best effort: the op is lost but logged */
		log_elock(ls, "save_pending_plock no mem");
		return;
	}
	memcpy(&w->info, in, sizeof(struct dlm_plock_info));
	w->flags = 0;
	list_add_tail(&w->list, &r->pending);
}
1133
1134 /* plock ops are on pending list waiting for ownership to be established.
1135 owner has now become us, so add these plocks to r */
1136
/* plock ops are on pending list waiting for ownership to be established.
   owner has now become us, so apply these plocks to r locally */

static void add_pending_plocks(struct lockspace *ls, struct resource *r)
{
	struct lock_waiter *w, *safe;

	list_for_each_entry_safe(w, safe, &r->pending, list) {
		__receive_plock(ls, &w->info, our_nodeid, r);
		list_del(&w->list);
		free(w);
	}
}
1147
1148 /* plock ops are on pending list waiting for ownership to be established.
1149 owner has now become 0, so send these plocks to everyone */
1150
/* plock ops are on pending list waiting for ownership to be established.
   owner has now become 0, so broadcast these plocks to everyone */

static void send_pending_plocks(struct lockspace *ls, struct resource *r)
{
	struct lock_waiter *w, *safe;

	list_for_each_entry_safe(w, safe, &r->pending, list) {
		send_plock(ls, r, &w->info);
		list_del(&w->list);
		free(w);
	}
}
1161
/* Handle an ownership (own) message for a resource.  info.nodeid
   carries the proposed owner: a node only ever proposes itself or 0
   (relinquish).  The branch tree below enumerates every combination of
   sender, proposed owner and current local owner; combinations that
   the protocol should never produce are collected in
   should_not_happen and logged at the end. */
static void _receive_own(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct dlm_plock_info info;
	struct resource *r;
	int should_not_happen = 0;
	int from = hd->nodeid;
	int rv;

	ls->last_plock_time = monotime();

	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
	info_bswap_in(&info);

	log_plock(ls, "receive_own %llx from %u owner %u",
		  (unsigned long long)info.number, hd->nodeid, info.nodeid);

	rv = find_resource(ls, info.number, 1, &r);
	if (rv)
		return;

	if (from == our_nodeid) {
		/*
		 * received our own own message
		 */

		if (info.nodeid == 0) {
			/* we are setting owner to 0 */

			if (r->owner == our_nodeid) {
				/* we set owner to 0 when we relinquish
				   ownership */
				should_not_happen = 1;
			} else if (r->owner == 0) {
				/* this happens when we relinquish ownership */
				r->flags |= R_GOT_UNOWN;
			} else {
				should_not_happen = 1;
			}

		} else if (info.nodeid == our_nodeid) {
			/* we are setting owner to ourself */

			if (r->owner == -1) {
				/* we have gained ownership */
				r->owner = our_nodeid;
				add_pending_plocks(ls, r);
			} else if (r->owner == our_nodeid) {
				should_not_happen = 1;
			} else if (r->owner == 0) {
				send_pending_plocks(ls, r);
			} else {
				/* resource is owned by other node;
				   they should set owner to 0 shortly */
			}

		} else {
			/* we should only ever set owner to 0 or ourself */
			should_not_happen = 1;
		}
	} else {
		/*
		 * received own message from another node
		 */

		if (info.nodeid == 0) {
			/* other node is setting owner to 0 */

			if (r->owner == -1) {
				/* we should have a record of the owner before
				   it relinquishes */
				should_not_happen = 1;
			} else if (r->owner == our_nodeid) {
				/* only the owner should relinquish */
				should_not_happen = 1;
			} else if (r->owner == 0) {
				should_not_happen = 1;
			} else {
				r->owner = 0;
				r->flags |= R_GOT_UNOWN;
				send_pending_plocks(ls, r);
			}

		} else if (info.nodeid == from) {
			/* other node is setting owner to itself */

			if (r->owner == -1) {
				/* normal path for a node becoming owner */
				r->owner = from;
			} else if (r->owner == our_nodeid) {
				/* we relinquish our ownership: sync our local
				   plocks to everyone, then set owner to 0 */
				send_syncs(ls, r);
				send_own(ls, r, 0);
				/* we need to set owner to 0 here because
				   local ops may arrive before we receive
				   our send_own message and can't be added
				   locally */
				r->owner = 0;
			} else if (r->owner == 0) {
				/* can happen because we set owner to 0 before
				   we receive our send_own sent just above */
			} else {
				/* do nothing, current owner should be
				   relinquishing its ownership */
			}

		} else if (info.nodeid == our_nodeid) {
			/* no one else should try to set the owner to us */
			should_not_happen = 1;
		} else {
			/* a node should only ever set owner to 0 or itself */
			should_not_happen = 1;
		}
	}

	if (should_not_happen) {
		log_elock(ls, "receive_own error from %u %llx "
			  "info nodeid %d r owner %d",
			  from, (unsigned long long)r->number,
			  info.nodeid, r->owner);
	}
}
1284
1285 void receive_own(struct lockspace *ls, struct dlm_header *hd, int len)
1286 {
1287 if (ls->save_plocks) {
1288 save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK_OWN);
1289 return;
1290 }
1291
1292 _receive_own(ls, hd, len);
1293 }
1294
1295 static void clear_syncing_flag(struct lockspace *ls, struct resource *r,
1296 struct dlm_plock_info *in)
1297 {
1298 struct posix_lock *po;
1299 struct lock_waiter *w;
1300
1301 list_for_each_entry(po, &r->locks, list) {
1302 if ((po->flags & P_SYNCING) &&
1303 in->start == po->start &&
1304 in->end == po->end &&
1305 in->nodeid == po->nodeid &&
1306 in->owner == po->owner &&
1307 in->pid == po->pid &&
1308 in->ex == po->ex) {
1309 po->flags &= ~P_SYNCING;
1310 return;
1311 }
1312 }
1313
1314 list_for_each_entry(w, &r->waiters, list) {
1315 if ((w->flags & P_SYNCING) &&
1316 in->start == w->info.start &&
1317 in->end == w->info.end &&
1318 in->nodeid == w->info.nodeid &&
1319 in->owner == w->info.owner &&
1320 in->pid == w->info.pid &&
1321 in->ex == w->info.ex) {
1322 w->flags &= ~P_SYNCING;
1323 return;
1324 }
1325 }
1326
1327 log_elock(ls, "clear_syncing error %llx no match %s %llx-%llx %d/%u/%llx",
1328 (unsigned long long)r->number,
1329 in->ex ? "WR" : "RD",
1330 (unsigned long long)in->start,
1331 (unsigned long long)in->end,
1332 in->nodeid, in->pid,
1333 (unsigned long long)in->owner);
1334 }
1335
1336 static void _receive_sync(struct lockspace *ls, struct dlm_header *hd, int len)
1337 {
1338 struct dlm_plock_info info;
1339 struct resource *r;
1340 int from = hd->nodeid;
1341 int rv;
1342
1343 ls->last_plock_time = monotime();
1344
1345 memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
1346 info_bswap_in(&info);
1347
1348 log_plock(ls, "receive sync %llx from %u %s %llx-%llx %d/%u/%llx",
1349 (unsigned long long)info.number, from, info.ex ? "WR" : "RD",
1350 (unsigned long long)info.start, (unsigned long long)info.end,
1351 info.nodeid, info.pid, (unsigned long long)info.owner);
1352
1353 rv = find_resource(ls, info.number, 0, &r);
1354 if (rv) {
1355 log_elock(ls, "receive_sync error no r %llx from %d",
1356 info.number, from);
1357 return;
1358 }
1359
1360 if (from == our_nodeid) {
1361 /* this plock now in sync on all nodes */
1362 clear_syncing_flag(ls, r, &info);
1363 return;
1364 }
1365
1366 if (hd->type == DLM_MSG_PLOCK_SYNC_LOCK)
1367 add_lock(r, info.nodeid, info.owner, info.pid, info.ex,
1368 info.start, info.end);
1369 else if (hd->type == DLM_MSG_PLOCK_SYNC_WAITER)
1370 add_waiter(ls, r, &info);
1371 }
1372
1373 void receive_sync(struct lockspace *ls, struct dlm_header *hd, int len)
1374 {
1375 if (ls->save_plocks) {
1376 save_message(ls, hd, len, hd->nodeid, hd->type);
1377 return;
1378 }
1379
1380 _receive_sync(ls, hd, len);
1381 }
1382
1383 static void _receive_drop(struct lockspace *ls, struct dlm_header *hd, int len)
1384 {
1385 struct dlm_plock_info info;
1386 struct resource *r;
1387 int from = hd->nodeid;
1388 int rv;
1389
1390 ls->last_plock_time = monotime();
1391
1392 memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
1393 info_bswap_in(&info);
1394
1395 log_plock(ls, "receive_drop %llx from %u",
1396 (unsigned long long)info.number, from);
1397
1398 rv = find_resource(ls, info.number, 0, &r);
1399 if (rv) {
1400 /* we'll find no r if two nodes sent drop at once */
1401 log_plock(ls, "receive_drop from %d no r %llx", from,
1402 (unsigned long long)info.number);
1403 return;
1404 }
1405
1406 if (r->owner != 0) {
1407 /* - A sent drop, B sent drop, receive drop A, C sent own,
1408 receive drop B (this warning on C, owner -1)
1409 - A sent drop, B sent drop, receive drop A, A sent own,
1410 receive own A, receive drop B (this warning on all,
1411 owner A) */
1412 log_plock(ls, "receive_drop from %d r %llx owner %d", from,
1413 (unsigned long long)r->number, r->owner);
1414 return;
1415 }
1416
1417 if (!list_empty(&r->pending)) {
1418 /* shouldn't happen */
1419 log_elock(ls, "receive_drop error from %d r %llx pending op",
1420 from, (unsigned long long)r->number);
1421 return;
1422 }
1423
1424 /* the decision to drop or not must be based on things that are
1425 guaranteed to be the same on all nodes */
1426
1427 if (list_empty(&r->locks) && list_empty(&r->waiters)) {
1428 rb_del_plock_resource(ls, r);
1429 list_del(&r->list);
1430 free(r);
1431 } else {
1432 /* A sent drop, B sent a plock, receive plock, receive drop */
1433 log_plock(ls, "receive_drop from %d r %llx in use", from,
1434 (unsigned long long)r->number);
1435 }
1436 }
1437
1438 void receive_drop(struct lockspace *ls, struct dlm_header *hd, int len)
1439 {
1440 if (ls->save_plocks) {
1441 save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK_DROP);
1442 return;
1443 }
1444
1445 _receive_drop(ls, hd, len);
1446 }
1447
1448 /* We only drop resources from the unowned state to simplify things.
1449 If we want to drop a resource we own, we unown/relinquish it first. */
1450
1451 /* FIXME: in the transition from owner = us, to owner = 0, to drop;
1452 we want the second period to be shorter than the first */
1453
/* Periodically relinquish/drop aged, unused resources for one lockspace.
   Returns 0 when there is nothing to do (feature off or no resources),
   1 when the caller should keep polling and call again later. */

static int drop_resources(struct lockspace *ls)
{
	struct resource *r;
	struct timeval now;
	int count = 0;

	/* dropping only applies when resource ownership is enabled */
	if (!opt(plock_ownership_ind))
		return 0;

	if (list_empty(&ls->plock_resources))
		return 0;

	gettimeofday(&now, NULL);

	/* rate limit: run at most once per drop_resources_time interval;
	   returning 1 keeps the poll alive so we retry later */
	if (time_diff_ms(&ls->drop_resources_last, &now) <
	    opt(drop_resources_time_ind))
		return 1;

	ls->drop_resources_last = now;

	/* try to drop the oldest, unused resources */

	list_for_each_entry_reverse(r, &ls->plock_resources, list) {
		/* cap the work done per pass */
		if (count >= opt(drop_resources_count_ind))
			break;
		/* only resources we own or that are unowned are candidates */
		if (r->owner && r->owner != our_nodeid)
			continue;
		/* skip resources accessed more recently than the age limit */
		if (time_diff_ms(&r->last_access, &now) <
		    opt(drop_resources_age_ind))
			continue;

		if (list_empty(&r->locks) && list_empty(&r->waiters)) {
			if (r->owner == our_nodeid) {
				/* step 1: relinquish ownership; the drop
				   itself happens on a later pass once the
				   unown message has come back (got_unown) */
				send_own(ls, r, 0);
				r->owner = 0;
			} else if (r->owner == 0 && got_unown(r)) {
				/* step 2: fully unowned, ask all nodes to
				   forget the resource */
				send_drop(ls, r);
			}

			count++;
		}
	}

	return 1;
}
1499
1500 void drop_resources_all(void)
1501 {
1502 struct lockspace *ls;
1503 int rv = 0;
1504
1505 poll_drop_plock = 0;
1506
1507 list_for_each_entry(ls, &lockspaces, list) {
1508 rv = drop_resources(ls);
1509 if (rv)
1510 poll_drop_plock = 1;
1511 }
1512 }
1513
/* Rate-limit reads from the kernel plock device.
   Returns 0 when reading may continue, 2 when the caller should stop
   reading until roughly a second has elapsed. */

int limit_plocks(void)
{
	struct timeval now;

	if (!opt(plock_rate_limit_ind) || !plock_read_count)
		return 0;

	gettimeofday(&now, NULL);

	/* Every time a plock op is read from the kernel, we increment
	   plock_read_count.  After every plock_rate_limit (N) reads,
	   we check the time it's taken to do those N; if the time is less than
	   a second, then we delay reading any more until a second is up.
	   This way we read a max of N ops from the kernel every second. */

	if (!(plock_read_count % opt(plock_rate_limit_ind))) {
		if (time_diff_ms(&plock_rate_last, &now) < 1000) {
			plock_rate_delays++;
			return 2;
		}
		plock_rate_last = now;
		/* deliberately bump the count off the limit boundary so
		   this branch is not re-entered until another N reads have
		   occurred; do not "fix" this by removing the increment */
		plock_read_count++;
	}
	return 0;
}
1539
1540 void process_plocks(int ci)
1541 {
1542 struct lockspace *ls;
1543 struct resource *r;
1544 struct dlm_plock_info info;
1545 struct timeval now;
1546 uint64_t usec;
1547 int create, rv;
1548
1549 if (limit_plocks()) {
1550 poll_ignore_plock = 1;
1551 client_ignore(plock_ci, plock_fd);
1552 return;
1553 }
1554
1555 gettimeofday(&now, NULL);
1556
1557 rv = do_read(plock_device_fd, &info, sizeof(info));
1558 if (rv < 0) {
1559 log_debug("process_plocks: read error %d fd %d\n",
1560 errno, plock_device_fd);
1561 return;
1562 }
1563
1564 /* kernel doesn't set the nodeid field */
1565 info.nodeid = our_nodeid;
1566
1567 if (!opt(enable_plock_ind)) {
1568 rv = -ENOSYS;
1569 goto fail;
1570 }
1571
1572 ls = find_ls_id(info.fsid);
1573 if (!ls) {
1574 log_plock(ls, "process_plocks: no ls id %x", info.fsid);
1575 rv = -EEXIST;
1576 goto fail;
1577 }
1578
1579 if (ls->disable_plock) {
1580 rv = -ENOSYS;
1581 goto fail;
1582 }
1583
1584 log_plock(ls, "read plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
1585 (unsigned long long)info.number,
1586 op_str(info.optype),
1587 ex_str(info.optype, info.ex),
1588 (unsigned long long)info.start, (unsigned long long)info.end,
1589 info.nodeid, info.pid, (unsigned long long)info.owner,
1590 info.wait);
1591
1592 /* report plock rate and any delays since the last report */
1593 plock_read_count++;
1594 if (!(plock_read_count % 1000)) {
1595 usec = dt_usec(&plock_read_time, &now) ;
1596 log_plock(ls, "plock_read_count %u time %.3f s delays %u",
1597 plock_read_count, usec * 1.e-6, plock_rate_delays);
1598 plock_read_time = now;
1599 plock_rate_delays = 0;
1600 }
1601
1602 create = (info.optype == DLM_PLOCK_OP_UNLOCK) ? 0 : 1;
1603
1604 rv = find_resource(ls, info.number, create, &r);
1605 if (rv)
1606 goto fail;
1607
1608 if (r->owner == 0) {
1609 /* plock state replicated on all nodes */
1610 send_plock(ls, r, &info);
1611
1612 } else if (r->owner == our_nodeid) {
1613 /* we are the owner of r, so our plocks are local */
1614 __receive_plock(ls, &info, our_nodeid, r);
1615
1616 } else {
1617 /* r owner is -1: r is new, try to become the owner;
1618 r owner > 0: tell other owner to give up ownership;
1619 both done with a message trying to set owner to ourself */
1620 send_own(ls, r, our_nodeid);
1621 save_pending_plock(ls, r, &info);
1622 }
1623
1624 if (opt(plock_ownership_ind) && !list_empty(&ls->plock_resources))
1625 poll_drop_plock = 1;
1626 return;
1627
1628 fail:
1629 if (!(info.flags & DLM_PLOCK_FL_CLOSE))
1630 write_result(&info, rv);
1631 }
1632
1633 void process_saved_plocks(struct lockspace *ls)
1634 {
1635 struct save_msg *sm, *sm2;
1636 struct dlm_header *hd;
1637 int count = 0;
1638
1639 log_plock(ls, "process_saved_plocks begin");
1640
1641 if (list_empty(&ls->saved_messages))
1642 goto out;
1643
1644 list_for_each_entry_safe(sm, sm2, &ls->saved_messages, list) {
1645 hd = (struct dlm_header *)sm->buf;
1646
1647 switch (sm->type) {
1648 case DLM_MSG_PLOCK:
1649 _receive_plock(ls, hd, sm->len);
1650 break;
1651 case DLM_MSG_PLOCK_OWN:
1652 _receive_own(ls, hd, sm->len);
1653 break;
1654 case DLM_MSG_PLOCK_DROP:
1655 _receive_drop(ls, hd, sm->len);
1656 break;
1657 case DLM_MSG_PLOCK_SYNC_LOCK:
1658 case DLM_MSG_PLOCK_SYNC_WAITER:
1659 _receive_sync(ls, hd, sm->len);
1660 break;
1661 default:
1662 continue;
1663 }
1664
1665 list_del(&sm->list);
1666 free(sm);
1667 count++;
1668 }
1669 out:
1670 log_plock(ls, "process_saved_plocks %d done", count);
1671 }
1672
1673 /* locks still marked SYNCING should not go into the ckpt; the new node
1674 will get those locks by receiving PLOCK_SYNC messages */
1675
1676 #define MAX_SEND_SIZE 1024 /* 1024 holds 24 plock_data */
1677
1678 static char send_buf[MAX_SEND_SIZE];
1679
1680 static int pack_send_buf(struct lockspace *ls, struct resource *r, int owner,
1681 int full, int *count_out, void **last)
1682 {
1683 struct resource_data *rd;
1684 struct plock_data *pp;
1685 struct posix_lock *po;
1686 struct lock_waiter *w;
1687 int count = 0;
1688 int find = 0;
1689 int len;
1690
1691 /* N.B. owner not always equal to r->owner */
1692 rd = (struct resource_data *)(send_buf + sizeof(struct dlm_header));
1693 rd->number = cpu_to_le64(r->number);
1694 rd->owner = cpu_to_le32(owner);
1695
1696 if (full) {
1697 rd->flags = RD_CONTINUE;
1698 find = 1;
1699 }
1700
1701 /* plocks not replicated for owned resources */
1702 if (opt(plock_ownership_ind) && (owner == our_nodeid))
1703 goto done;
1704
1705 len = sizeof(struct dlm_header) + sizeof(struct resource_data);
1706
1707 pp = (struct plock_data *)(send_buf + sizeof(struct dlm_header) + sizeof(struct resource_data));
1708
1709 list_for_each_entry(po, &r->locks, list) {
1710 if (find && *last != po)
1711 continue;
1712 find = 0;
1713
1714 if (po->flags & P_SYNCING)
1715 continue;
1716
1717 if (len + sizeof(struct plock_data) > sizeof(send_buf)) {
1718 *last = po;
1719 goto full;
1720 }
1721 len += sizeof(struct plock_data);
1722
1723 pp->start = cpu_to_le64(po->start);
1724 pp->end = cpu_to_le64(po->end);
1725 pp->owner = cpu_to_le64(po->owner);
1726 pp->pid = cpu_to_le32(po->pid);
1727 pp->nodeid = cpu_to_le32(po->nodeid);
1728 pp->ex = po->ex;
1729 pp->waiter = 0;
1730 pp++;
1731 count++;
1732 }
1733
1734 list_for_each_entry(w, &r->waiters, list) {
1735 if (find && *last != w)
1736 continue;
1737 find = 0;
1738
1739 if (w->flags & P_SYNCING)
1740 continue;
1741
1742 if (len + sizeof(struct plock_data) > sizeof(send_buf)) {
1743 *last = w;
1744 goto full;
1745 }
1746 len += sizeof(struct plock_data);
1747
1748 pp->start = cpu_to_le64(w->info.start);
1749 pp->end = cpu_to_le64(w->info.end);
1750 pp->owner = cpu_to_le64(w->info.owner);
1751 pp->pid = cpu_to_le32(w->info.pid);
1752 pp->nodeid = cpu_to_le32(w->info.nodeid);
1753 pp->ex = w->info.ex;
1754 pp->waiter = 1;
1755 pp++;
1756 count++;
1757 }
1758 done:
1759 rd->lock_count = cpu_to_le32(count);
1760 *count_out = count;
1761 *last = NULL;
1762 return 0;
1763
1764 full:
1765 rd->lock_count = cpu_to_le32(count);
1766 *count_out = count;
1767 return 1;
1768 }
1769
1770 /* Copy all plock state into a checkpoint so new node can retrieve it. The
1771 node creating the ckpt for the mounter needs to be the same node that's
1772 sending the mounter its journals message (i.e. the low nodeid). The new
1773 mounter knows the ckpt is ready to read only after it gets its journals
1774 message.
1775
1776 If the mounter is becoming the new low nodeid in the group, the node doing
1777 the store closes the ckpt and the new node unlinks the ckpt after reading
1778 it. The ckpt should then disappear and the new node can create a new ckpt
1779 for the next mounter. */
1780
1781 static int send_plocks_data(struct lockspace *ls, uint32_t seq, char *buf, int len)
1782 {
1783 struct dlm_header *hd;
1784
1785 hd = (struct dlm_header *)buf;
1786 hd->type = DLM_MSG_PLOCKS_DATA;
1787 hd->msgdata = seq;
1788
1789 dlm_send_message(ls, buf, len);
1790
1791 return 0;
1792 }
1793
/* Send all plock state for ls to the cluster so a new mounter can build
   its copy; seq identifies the membership change.  *plocks_data returns
   the number of messages sent so the receiver can verify completeness. */

void send_all_plocks_data(struct lockspace *ls, uint32_t seq, uint32_t *plocks_data)
{
	struct resource *r;
	void *last;
	int owner, count, len, full;
	uint32_t send_count = 0;

	if (!opt(enable_plock_ind) || ls->disable_plock)
		return;

	log_dlock(ls, "send_all_plocks_data %d:%u", our_nodeid, seq);

	/* - If r owner is -1, ckpt nothing.
	   - If r owner is us, ckpt owner of us and no plocks.
	   - If r owner is other, ckpt that owner and any plocks we have on r
	     (they've just been synced but owner=0 msg not recved yet).
	   - If r owner is 0 and !got_unown, then we've just unowned r;
	     ckpt owner of us and any plocks that don't have SYNCING set
	     (plocks with SYNCING will be handled by our sync messages).
	   - If r owner is 0 and got_unown, then ckpt owner 0 and all plocks;
	     (there should be no SYNCING plocks) */

	list_for_each_entry(r, &ls->plock_resources, list) {
		if (!opt(plock_ownership_ind))
			owner = 0;
		else if (r->owner == -1)
			continue;
		else if (r->owner == our_nodeid)
			owner = our_nodeid;
		else if (r->owner)
			owner = r->owner;
		else if (!r->owner && !got_unown(r))
			owner = our_nodeid;
		else if (!r->owner)
			owner = 0;
		else {
			/* unreachable given the branches above; defensive */
			log_elock(ls, "send_all_plocks_data error owner %d r %llx",
				  r->owner, (unsigned long long)r->number);
			continue;
		}

		memset(&send_buf, 0, sizeof(send_buf));
		count = 0;
		full = 0;
		last = NULL;

		/* a resource whose locks don't all fit in one message is
		   sent in multiple chunks; continuations carry RD_CONTINUE
		   and resume from 'last' */
		do {
			full = pack_send_buf(ls, r, owner, full, &count, &last);

			len = sizeof(struct dlm_header) +
			      sizeof(struct resource_data) +
			      sizeof(struct plock_data) * count;

			log_plock(ls, "send_plocks_data %d:%u n %llu o %d locks %d len %d",
				  our_nodeid, seq, (unsigned long long)r->number, r->owner,
				  count, len);

			send_plocks_data(ls, seq, send_buf, len);

			send_count++;

		} while (full);
	}

	*plocks_data = send_count;

	log_dlock(ls, "send_all_plocks_data %d:%u %u done",
		  our_nodeid, seq, send_count);
}
1863
1864 static void free_r_lists(struct resource *r)
1865 {
1866 struct posix_lock *po, *po2;
1867 struct lock_waiter *w, *w2;
1868
1869 list_for_each_entry_safe(po, po2, &r->locks, list) {
1870 list_del(&po->list);
1871 free(po);
1872 }
1873
1874 list_for_each_entry_safe(w, w2, &r->waiters, list) {
1875 list_del(&w->list);
1876 free(w);
1877 }
1878 }
1879
/* Unpack one PLOCKS_DATA message (built by pack_send_buf on an existing
   node) describing one resource plus its locks and waiters.  Only acted
   on while we are a new mounter still collecting initial plock state. */

void receive_plocks_data(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct resource_data *rd;
	struct plock_data *pp;
	struct posix_lock *po;
	struct lock_waiter *w;
	struct resource *r;
	uint64_t num;
	uint32_t count;
	uint32_t flags;
	int owner;
	int i;

	if (!opt(enable_plock_ind) || ls->disable_plock)
		return;

	/* only a node that still needs plock state consumes these */
	if (!ls->need_plocks)
		return;

	if (!ls->save_plocks)
		return;

	ls->recv_plocks_data_count++;

	/* must at least contain the fixed header + resource_data */
	if (len < sizeof(struct dlm_header) + sizeof(struct resource_data)) {
		log_elock(ls, "recv_plocks_data %d:%u bad len %d",
			  hd->nodeid, hd->msgdata, len);
		return;
	}

	/* all wire fields are little endian */
	rd = (struct resource_data *)((char *)hd + sizeof(struct dlm_header));
	num = le64_to_cpu(rd->number);
	owner = le32_to_cpu(rd->owner);
	count = le32_to_cpu(rd->lock_count);
	flags = le32_to_cpu(rd->flags);

	if (flags & RD_CONTINUE) {
		/* continuation of a resource started by a previous message;
		   it already exists in our lists, just append entries */
		r = search_resource(ls, num);
		if (!r) {
			log_elock(ls, "recv_plocks_data %d:%u n %llu not found",
				  hd->nodeid, hd->msgdata, (unsigned long long)num);
			return;
		}
		log_plock(ls, "recv_plocks_data %d:%u n %llu continue",
			  hd->nodeid, hd->msgdata, (unsigned long long)num);
		goto unpack;
	}

	r = malloc(sizeof(struct resource));
	if (!r) {
		log_elock(ls, "recv_plocks_data %d:%u n %llu no mem",
			  hd->nodeid, hd->msgdata, (unsigned long long)num);
		return;
	}
	memset(r, 0, sizeof(struct resource));
	INIT_LIST_HEAD(&r->locks);
	INIT_LIST_HEAD(&r->waiters);
	INIT_LIST_HEAD(&r->pending);

	if (!opt(plock_ownership_ind)) {
		/* without ownership, every resource must be sent unowned */
		if (owner) {
			log_elock(ls, "recv_plocks_data %d:%u n %llu bad owner %d",
				  hd->nodeid, hd->msgdata, (unsigned long long)num,
				  owner);
			goto fail_free;
		}
	} else {
		if (!owner)
			r->flags |= R_GOT_UNOWN;

		/* no locks should be included for owned resources */

		if (owner && count) {
			log_elock(ls, "recv_plocks_data %d:%u n %llu o %d bad count %" PRIu32,
				  hd->nodeid, hd->msgdata,
				  (unsigned long long)num, owner, count);
			goto fail_free;
		}
	}

	r->number = num;
	r->owner = owner;

unpack:
	/* verify the message really carries 'count' plock_data entries */
	if (len < sizeof(struct dlm_header) +
		  sizeof(struct resource_data) +
		  sizeof(struct plock_data) * count) {
		log_elock(ls, "recv_plocks_data %d:%u count %u bad len %d",
			  hd->nodeid, hd->msgdata, count, len);
		goto fail_free;
	}

	pp = (struct plock_data *)((char *)rd + sizeof(struct resource_data));

	for (i = 0; i < count; i++) {
		if (!pp->waiter) {
			/* granted lock entry */
			po = malloc(sizeof(struct posix_lock));
			if (!po)
				goto fail_free;
			po->start = le64_to_cpu(pp->start);
			po->end = le64_to_cpu(pp->end);
			po->owner = le64_to_cpu(pp->owner);
			po->pid = le32_to_cpu(pp->pid);
			po->nodeid = le32_to_cpu(pp->nodeid);
			po->ex = pp->ex;
			po->flags = 0;
			list_add_tail(&po->list, &r->locks);
		} else {
			/* blocked waiter entry */
			w = malloc(sizeof(struct lock_waiter));
			if (!w)
				goto fail_free;
			w->info.start = le64_to_cpu(pp->start);
			w->info.end = le64_to_cpu(pp->end);
			w->info.owner = le64_to_cpu(pp->owner);
			w->info.pid = le32_to_cpu(pp->pid);
			w->info.nodeid = le32_to_cpu(pp->nodeid);
			w->info.ex = pp->ex;
			w->flags = 0;
			list_add_tail(&w->list, &r->waiters);
		}
		pp++;
	}

	log_plock(ls, "recv_plocks_data %d:%u n %llu o %d locks %d len %d",
		  hd->nodeid, hd->msgdata, (unsigned long long)r->number,
		  r->owner, count, len);

	if (!(flags & RD_CONTINUE)) {
		/* first (or only) chunk: insert the new resource */
		list_add_tail(&r->list, &ls->plock_resources);
		rb_insert_plock_resource(ls, r);
	}
	return;

fail_free:
	/* only free r if it was allocated here; a continuation's r is
	   already linked into the lockspace lists and must survive */
	if (!(flags & RD_CONTINUE)) {
		free_r_lists(r);
		free(r);
	}
	return;
}
2020
2021 void clear_plocks_data(struct lockspace *ls)
2022 {
2023 struct resource *r, *r2;
2024 uint32_t count = 0;
2025
2026 if (!opt(enable_plock_ind) || ls->disable_plock)
2027 return;
2028
2029 list_for_each_entry_safe(r, r2, &ls->plock_resources, list) {
2030 free_r_lists(r);
2031 rb_del_plock_resource(ls, r);
2032 list_del(&r->list);
2033 free(r);
2034 count++;
2035 }
2036
2037 log_dlock(ls, "clear_plocks_data done %u recv_plocks_data_count %u",
2038 count, ls->recv_plocks_data_count);
2039
2040 ls->recv_plocks_data_count = 0;
2041 }
2042
2043 /* Called when a node has failed, or we're unmounting. For a node failure, we
2044 need to call this when the cpg confchg arrives so that we're guaranteed all
2045 nodes do this in the same sequence wrt other messages. */
2046
void purge_plocks(struct lockspace *ls, int nodeid, int unmount)
{
	struct posix_lock *po, *po2;
	struct lock_waiter *w, *w2;
	struct resource *r, *r2;
	int purged = 0;

	if (!opt(enable_plock_ind) || ls->disable_plock)
		return;

	list_for_each_entry_safe(r, r2, &ls->plock_resources, list) {
		/* discard locks held by the failed node (or all locks when
		   unmounting) */
		list_for_each_entry_safe(po, po2, &r->locks, list) {
			if (po->nodeid == nodeid || unmount) {
				list_del(&po->list);
				free(po);
				purged++;
			}
		}

		/* discard that node's waiters the same way */
		list_for_each_entry_safe(w, w2, &r->waiters, list) {
			if (w->info.nodeid == nodeid || unmount) {
				list_del(&w->list);
				free(w);
				purged++;
			}
		}

		/* TODO: haven't thought carefully about how this transition
		   to owner 0 might interact with other owner messages in
		   progress. */

		if (r->owner == nodeid) {
			/* the failed node owned r; mark it unowned so the
			   surviving nodes can operate on it again */
			r->owner = 0;
			r->flags |= R_GOT_UNOWN;
			r->flags |= R_PURGE_UNOWN;
			send_pending_plocks(ls, r);
		}

		/* purged locks/waiters may unblock remaining waiters */
		do_waiters(ls, r);

		/* without ownership there is no drop protocol, so free
		   now-empty resources immediately */
		if (!opt(plock_ownership_ind) &&
		    list_empty(&r->locks) && list_empty(&r->waiters)) {
			rb_del_plock_resource(ls, r);
			list_del(&r->list);
			free(r);
		}
	}

	if (purged)
		ls->last_plock_time = monotime();

	log_dlock(ls, "purged %d plocks for %d", purged, nodeid);
}
2100
2101 int copy_plock_state(struct lockspace *ls, char *buf, int *len_out)
2102 {
2103 struct posix_lock *po;
2104 struct lock_waiter *w;
2105 struct resource *r;
2106 struct timeval now;
2107 int rv = 0;
2108 int len = DLMC_DUMP_SIZE, pos = 0, ret;
2109
2110 gettimeofday(&now, NULL);
2111
2112 list_for_each_entry(r, &ls->plock_resources, list) {
2113
2114 if (list_empty(&r->locks) &&
2115 list_empty(&r->waiters) &&
2116 list_empty(&r->pending)) {
2117 ret = snprintf(buf + pos, len - pos,
2118 "%llu rown %d unused_ms %llu\n",
2119 (unsigned long long)r->number, r->owner,
2120 (unsigned long long)time_diff_ms(&r->last_access,
2121 &now));
2122 if (ret >= len - pos) {
2123 rv = -ENOSPC;
2124 goto out;
2125 }
2126 pos += ret;
2127 continue;
2128 }
2129
2130 list_for_each_entry(po, &r->locks, list) {
2131 ret = snprintf(buf + pos, len - pos,
2132 "%llu %s %llu-%llu nodeid %d pid %u owner %llx rown %d\n",
2133 (unsigned long long)r->number,
2134 po->ex ? "WR" : "RD",
2135 (unsigned long long)po->start,
2136 (unsigned long long)po->end,
2137 po->nodeid, po->pid,
2138 (unsigned long long)po->owner, r->owner);
2139
2140 if (ret >= len - pos) {
2141 rv = -ENOSPC;
2142 goto out;
2143 }
2144 pos += ret;
2145 }
2146
2147 list_for_each_entry(w, &r->waiters, list) {
2148 ret = snprintf(buf + pos, len - pos,
2149 "%llu %s %llu-%llu nodeid %d pid %u owner %llx rown %d WAITING\n",
2150 (unsigned long long)r->number,
2151 w->info.ex ? "WR" : "RD",
2152 (unsigned long long)w->info.start,
2153 (unsigned long long)w->info.end,
2154 w->info.nodeid, w->info.pid,
2155 (unsigned long long)w->info.owner, r->owner);
2156
2157 if (ret >= len - pos) {
2158 rv = -ENOSPC;
2159 goto out;
2160 }
2161 pos += ret;
2162 }
2163
2164 list_for_each_entry(w, &r->pending, list) {
2165 ret = snprintf(buf + pos, len - pos,
2166 "%llu %s %llu-%llu nodeid %d pid %u owner %llx rown %d PENDING\n",
2167 (unsigned long long)r->number,
2168 w->info.ex ? "WR" : "RD",
2169 (unsigned long long)w->info.start,
2170 (unsigned long long)w->info.end,
2171 w->info.nodeid, w->info.pid,
2172 (unsigned long long)w->info.owner, r->owner);
2173
2174 if (ret >= len - pos) {
2175 rv = -ENOSPC;
2176 goto out;
2177 }
2178 pos += ret;
2179 }
2180 }
2181 out:
2182 *len_out = pos;
2183 return rv;
2184 }
2185
2186