1 /*
2 * Copyright 2004-2012 Red Hat, Inc.
3 *
4 * This copyrighted material is made available to anyone wishing to use,
5 * modify, copy, or redistribute it subject to the terms and conditions
6 * of the GNU General Public License v2 or (at your option) any later version.
7 */
8
9 #include "dlm_daemon.h"
10 #include <linux/dlm_plock.h>
11
12 #ifndef DLM_PLOCK_OP_CANCEL
13 #define DLM_PLOCK_OP_CANCEL 4
14 #endif
15
/* Counters and timestamps for periodic plock throughput logging
   (a rate line is logged every 1000 received ops). */
static uint32_t plock_read_count;	/* ops read from the kernel device */
static uint32_t plock_recv_count;	/* plock messages received from dlm */
static uint32_t plock_rate_delays;	/* NOTE(review): not referenced in this chunk */
static struct timeval plock_read_time;	/* last read-rate report time */
static struct timeval plock_recv_time;	/* last recv-rate report time */
static struct timeval plock_rate_last;	/* NOTE(review): not referenced in this chunk */

/* fd of the kernel plock misc device; -1 until setup_plocks() opens it */
static int plock_device_fd = -1;
24
25 #define RD_CONTINUE 0x00000001
26
/* Per-resource header record; fields mirror struct resource.
   NOTE(review): not referenced in this chunk — presumably the
   checkpoint/sync serialization of a resource; confirm against the
   rest of the file. */
struct resource_data {
	uint64_t number;	/* resource (inode) number */
	int owner;		/* nodeid, 0 = unowned, -1 = unknown */
	uint32_t lock_count;	/* assumed count of lock records that follow — TODO confirm */
	uint32_t flags;		/* R_* flags */
	uint32_t pad;		/* explicit padding for a stable layout */
};
34
/* One lock or waiter record; fields mirror struct posix_lock.
   NOTE(review): not referenced in this chunk — presumably the
   checkpoint/sync serialization of a lock; confirm. */
struct plock_data {
	uint64_t start;		/* first byte of the range */
	uint64_t end;		/* last byte of the range (inclusive) */
	uint64_t owner;		/* lock-owner identity */
	uint32_t pid;		/* requesting process id */
	uint32_t nodeid;	/* node the request came from */
	uint8_t ex;		/* 1 = exclusive/write, 0 = shared/read */
	uint8_t waiter;		/* 1 = blocked request, not a held lock */
	uint16_t pad1;		/* explicit padding for a stable layout */
	uint32_t pad;
};
46
47 #define R_GOT_UNOWN 0x00000001 /* have received owner=0 message */
48 #define R_SEND_UNOWN 0x00000002 /* have sent owner=0 message */
49 #define R_SEND_OWN 0x00000004 /* have sent owner=our_nodeid message */
50 #define R_PURGE_UNOWN 0x00000008 /* set owner=0 in purge */
51 #define R_SEND_DROP 0x00000010
52
/* One plocked file; lives on ls->plock_resources and in the rb-tree
   ls->plock_resources_root, keyed by number. */
struct resource {
	struct list_head list;	    /* list of resources */
	uint64_t number;	    /* inode number; lookup key */
	int owner;		    /* nodeid, 0 for unowned, -1 for unknown */
	uint32_t flags;		    /* R_* flags */
	struct timeval last_access; /* refreshed by find_resource() */
	struct list_head locks;	    /* one lock for each range */
	struct list_head waiters;   /* blocked requests (struct lock_waiter) */
	struct list_head pending;   /* discovering r owner */
	struct rb_node rb_node;	    /* linkage in ls->plock_resources_root */
};
64
65 #define P_SYNCING 0x00000001 /* plock has been sent as part of sync but not
66 yet received */
67
/* One granted posix lock range held on a resource. */
struct posix_lock {
	struct list_head list;	/* resource locks or waiters list */
	uint32_t pid;		/* requesting process id */
	uint64_t owner;		/* lock-owner identity from the kernel */
	uint64_t start;		/* first byte (inclusive) */
	uint64_t end;		/* last byte (inclusive) */
	int ex;			/* 1 = exclusive/write, 0 = shared/read */
	int nodeid;		/* node the lock was requested from */
	uint32_t flags;		/* P_SYNCING */
};
78
/* A blocked plock request (r->waiters) or an op saved until resource
   ownership is established (r->pending). */
struct lock_waiter {
	struct list_head list;		/* r->waiters or r->pending */
	uint32_t flags;			/* P_SYNCING */
	struct dlm_plock_info info;	/* the original request */
};
84
85 struct save_msg {
86 struct list_head list;
87 int nodeid;
88 int len;
89 int type;
90 char buf[0];
91 };
92
93
94 static void send_own(struct lockspace *ls, struct resource *r, int owner);
95 static void save_pending_plock(struct lockspace *ls, struct resource *r,
96 struct dlm_plock_info *in);
97
98
99 static int got_unown(struct resource *r)
100 {
101 return !!(r->flags & R_GOT_UNOWN);
102 }
103
/* Convert *i from host to little-endian byte order, in place, before it
   is put on the wire.  Mutates the caller's struct. */
static void info_bswap_out(struct dlm_plock_info *i)
{
	i->version[0]	= cpu_to_le32(i->version[0]);
	i->version[1]	= cpu_to_le32(i->version[1]);
	i->version[2]	= cpu_to_le32(i->version[2]);
	i->pid		= cpu_to_le32(i->pid);
	i->nodeid	= cpu_to_le32(i->nodeid);
	i->rv		= cpu_to_le32(i->rv);
	i->fsid		= cpu_to_le32(i->fsid);
	i->number	= cpu_to_le64(i->number);
	i->start	= cpu_to_le64(i->start);
	i->end		= cpu_to_le64(i->end);
	i->owner	= cpu_to_le64(i->owner);
}
118
/* Convert *i from little-endian wire order to host byte order, in
   place; inverse of info_bswap_out(). */
static void info_bswap_in(struct dlm_plock_info *i)
{
	i->version[0]	= le32_to_cpu(i->version[0]);
	i->version[1]	= le32_to_cpu(i->version[1]);
	i->version[2]	= le32_to_cpu(i->version[2]);
	i->pid		= le32_to_cpu(i->pid);
	i->nodeid	= le32_to_cpu(i->nodeid);
	i->rv		= le32_to_cpu(i->rv);
	i->fsid		= le32_to_cpu(i->fsid);
	i->number	= le64_to_cpu(i->number);
	i->start	= le64_to_cpu(i->start);
	i->end		= le64_to_cpu(i->end);
	i->owner	= le64_to_cpu(i->owner);
}
133
134 static const char *op_str(int optype)
135 {
136 switch (optype) {
137 case DLM_PLOCK_OP_LOCK:
138 return "LK";
139 case DLM_PLOCK_OP_CANCEL:
140 return "CL";
141 case DLM_PLOCK_OP_UNLOCK:
142 return "UN";
143 case DLM_PLOCK_OP_GET:
144 return "GET";
145 default:
146 return "??";
147 }
148 }
149
150 static const char *ex_str(int optype, int ex)
151 {
152 if (optype == DLM_PLOCK_OP_UNLOCK || optype == DLM_PLOCK_OP_GET)
153 return "-";
154 if (ex)
155 return "WR";
156 else
157 return "RD";
158 }
159
160 int setup_plocks(void)
161 {
162 plock_read_count = 0;
163 plock_recv_count = 0;
164 plock_rate_delays = 0;
165 gettimeofday(&plock_read_time, NULL);
166 gettimeofday(&plock_recv_time, NULL);
167 gettimeofday(&plock_rate_last, NULL);
168
169 if (plock_minor) {
170 plock_device_fd = open("/dev/misc/dlm_plock", O_RDWR);
171 }
172
173 if (plock_device_fd < 0) {
174 log_error("Failure to open plock device: %s", strerror(errno));
175 return -1;
176 }
177
178 log_debug("plocks %d", plock_device_fd);
179
180 return plock_device_fd;
181 }
182
183 void close_plocks(void)
184 {
185 if (plock_device_fd > 0)
186 close(plock_device_fd);
187 }
188
189 /* FIXME: unify these two */
190
/* milliseconds elapsed from *begin to *end */
static unsigned long time_diff_ms(struct timeval *begin, struct timeval *end)
{
	long sec = end->tv_sec - begin->tv_sec;
	long usec = end->tv_usec - begin->tv_usec;

	/* normalize exactly as timersub() does */
	if (usec < 0) {
		sec -= 1;
		usec += 1000000;
	}
	return (sec * 1000) + (usec / 1000);
}
197
/* microseconds elapsed from *start to *stop (caller ensures stop >= start) */
static uint64_t dt_usec(const struct timeval *start, const struct timeval *stop)
{
	uint64_t sec_diff = (uint64_t)stop->tv_sec - (uint64_t)start->tv_sec;

	return sec_diff * 1000000 + (stop->tv_usec - start->tv_usec);
}
207
208 static struct resource * rb_search_plock_resource(struct lockspace *ls, uint64_t number)
209 {
210 struct rb_node *n = ls->plock_resources_root.rb_node;
211 struct resource *r;
212
213 while (n) {
214 r = rb_entry(n, struct resource, rb_node);
215 if (number < r->number)
216 n = n->rb_left;
217 else if (number > r->number)
218 n = n->rb_right;
219 else
220 return r;
221 }
222 return NULL;
223 }
224
225 static void rb_insert_plock_resource(struct lockspace *ls, struct resource *r)
226 {
227 struct resource *entry;
228 struct rb_node **p;
229 struct rb_node *parent = NULL;
230
231 p = &ls->plock_resources_root.rb_node;
232 while (*p) {
233 parent = *p;
234 entry = rb_entry(parent, struct resource, rb_node);
235 if (r->number < entry->number)
236 p = &parent->rb_left;
237 else if (r->number > entry->number)
238 p = &parent->rb_right;
239 else
240 return;
241 }
242 rb_link_node(&r->rb_node, parent, p);
243 rb_insert_color(&r->rb_node, &ls->plock_resources_root);
244 }
245
246 static void rb_del_plock_resource(struct lockspace *ls, struct resource *r)
247 {
248 if (!RB_EMPTY_NODE(&r->rb_node)) {
249 rb_erase(&r->rb_node, &ls->plock_resources_root);
250 RB_CLEAR_NODE(&r->rb_node);
251 }
252 }
253
/* Linear scan of the lockspace resource list for number.
   NOTE(review): appears superseded by rb_search_plock_resource() and is
   not called in this chunk — confirm remaining callers before removal. */
static struct resource *search_resource(struct lockspace *ls, uint64_t number)
{
	struct resource *r;

	list_for_each_entry(r, &ls->plock_resources, list) {
		if (r->number == number)
			return r;
	}
	return NULL;
}
264
265 static int find_resource(struct lockspace *ls, uint64_t number, int create,
266 struct resource **r_out)
267 {
268 struct resource *r = NULL;
269 int rv = 0;
270
271 r = rb_search_plock_resource(ls, number);
272 if (r)
273 goto out;
274
275 if (create == 0) {
276 rv = -ENOENT;
277 goto out;
278 }
279
280 r = malloc(sizeof(struct resource));
281 if (!r) {
282 log_elock(ls, "find_resource no memory %d", errno);
283 rv = -ENOMEM;
284 goto out;
285 }
286
287 memset(r, 0, sizeof(struct resource));
288 r->number = number;
289 INIT_LIST_HEAD(&r->locks);
290 INIT_LIST_HEAD(&r->waiters);
291 INIT_LIST_HEAD(&r->pending);
292
293 if (opt(plock_ownership_ind))
294 r->owner = -1;
295 else
296 r->owner = 0;
297
298 list_add_tail(&r->list, &ls->plock_resources);
299 rb_insert_plock_resource(ls, r);
300 out:
301 if (r)
302 gettimeofday(&r->last_access, NULL);
303 *r_out = r;
304 return rv;
305 }
306
307 static void put_resource(struct lockspace *ls, struct resource *r)
308 {
309 /* with ownership, resources are only freed via drop messages */
310 if (opt(plock_ownership_ind))
311 return;
312
313 if (list_empty(&r->locks) && list_empty(&r->waiters)) {
314 rb_del_plock_resource(ls, r);
315 list_del(&r->list);
316 free(r);
317 }
318 }
319
/* 1 if the closed byte ranges [start1,end1] and [start2,end2] intersect */
static inline int ranges_overlap(uint64_t start1, uint64_t end1,
				 uint64_t start2, uint64_t end2)
{
	/* they intersect iff each range starts before the other ends */
	return (start1 <= end2) && (end1 >= start2);
}
327
328 /**
329 * overlap_type - returns a value based on the type of overlap
330 * @s1 - start of new lock range
331 * @e1 - end of new lock range
332 * @s2 - start of existing lock range
333 * @e2 - end of existing lock range
334 *
335 */
336
static int overlap_type(uint64_t s1, uint64_t e1, uint64_t s2, uint64_t e2)
{
	/*
	 * ---r1---
	 * ---r2---    identical ranges
	 */
	if (s1 == s2 && e1 == e2)
		return 0;

	/*
	 * --r1--        or      --r1--
	 * ---r2---            ---r2---
	 * r1 inside r2, sharing one boundary
	 */
	if ((s1 == s2 && e1 < e2) || (s1 > s2 && e1 == e2))
		return 1;

	/*
	 *  --r1--
	 * ---r2---    r1 strictly inside r2
	 */
	if (s1 > s2 && e1 < e2)
		return 2;

	/*
	 * ---r1---   or  ---r1---  or  ---r1---
	 *  --r2--        --r2--          --r2--
	 * r1 covers r2
	 */
	if (s1 <= s2 && e1 >= e2)
		return 3;

	/*
	 *   ---r1---        ---r1---
	 * ---r2---    or      ---r2---
	 * partial overlap on one side
	 */
	if ((s1 > s2 && e1 > e2) || (s1 < s2 && e1 < e2))
		return 4;

	return -1;
}
402
403 /* shrink the range start2:end2 by the partially overlapping start:end */
404
static int shrink_range2(uint64_t *start2, uint64_t *end2,
			 uint64_t start, uint64_t end)
{
	if (*start2 < start) {
		/* keep the front fragment */
		*end2 = start - 1;
		return 0;
	}
	if (*end2 > end) {
		/* keep the back fragment */
		*start2 = end + 1;
		return 0;
	}
	/* start:end fully covers start2:end2 - nothing left to keep */
	return -1;
}
418
/* shrink the lock po's range by the partially overlapping start:end;
   returns -1 when start:end fully covers po (nothing to keep) */
static int shrink_range(struct posix_lock *po, uint64_t start, uint64_t end)
{
	return shrink_range2(&po->start, &po->end, start, end);
}
423
/* Return 1 if request in conflicts with an existing lock on r: a lock
   held by a different holder (nodeid/owner), overlapping the requested
   range, with at least one side exclusive.  When get is set, copy the
   first conflicting lock's details into *in (F_GETLK-style report). */
static int is_conflict(struct resource *r, struct dlm_plock_info *in, int get)
{
	struct posix_lock *po;

	list_for_each_entry(po, &r->locks, list) {
		/* a holder never conflicts with its own locks */
		if (po->nodeid == in->nodeid && po->owner == in->owner)
			continue;
		if (!ranges_overlap(po->start, po->end, in->start, in->end))
			continue;

		if (in->ex || po->ex) {
			if (get) {
				in->ex = po->ex;
				in->pid = po->pid;
				in->start = po->start;
				in->end = po->end;
			}
			return 1;
		}
	}
	return 0;
}
446
447 static int add_lock(struct resource *r, uint32_t nodeid, uint64_t owner,
448 uint32_t pid, int ex, uint64_t start, uint64_t end)
449 {
450 struct posix_lock *po;
451
452 po = malloc(sizeof(struct posix_lock));
453 if (!po)
454 return -ENOMEM;
455
456 po->start = start;
457 po->end = end;
458 po->nodeid = nodeid;
459 po->owner = owner;
460 po->pid = pid;
461 po->ex = ex;
462 po->flags = 0;
463 list_add_tail(&po->list, &r->locks);
464
465 return 0;
466 }
467
468 /* RN within RE (and starts or ends on RE boundary)
469 1. add new lock for non-overlap area of RE, orig mode
470 2. convert RE to RN range and mode */
471
static int lock_case1(struct posix_lock *po, struct resource *r,
		      struct dlm_plock_info *in)
{
	uint64_t start2, end2;
	int rv;

	/* non-overlapping area start2:end2 */
	start2 = po->start;
	end2 = po->end;
	rv = shrink_range2(&start2, &end2, in->start, in->end);
	if (rv)
		goto out;

	/* convert RE to the new range and mode */
	po->start = in->start;
	po->end = in->end;
	po->ex = in->ex;

	/* caller guarantees po->ex != in->ex, so !in->ex is the original
	   mode for the leftover fragment */
	rv = add_lock(r, in->nodeid, in->owner, in->pid, !in->ex, start2, end2);
 out:
	return rv;
}
493
494 /* RN within RE (RE overlaps RN on both sides)
495 1. add new lock for front fragment, orig mode
496 2. add new lock for back fragment, orig mode
497 3. convert RE to RN range and mode */
498
static int lock_case2(struct posix_lock *po, struct resource *r,
		      struct dlm_plock_info *in)

{
	int rv;

	/* caller guarantees po->ex != in->ex, so !in->ex is the original
	   mode; the fragments are added before po is modified */
	rv = add_lock(r, in->nodeid, in->owner, in->pid,
		      !in->ex, po->start, in->start - 1);
	if (rv)
		goto out;

	rv = add_lock(r, in->nodeid, in->owner, in->pid,
		      !in->ex, in->end + 1, po->end);
	if (rv)
		goto out;

	/* convert RE to the new range and mode */
	po->start = in->start;
	po->end = in->end;
	po->ex = in->ex;
 out:
	return rv;
}
521
/* Apply lock request in to resource r.  For each of the holder's own
   existing locks that overlap the requested range, merge or split it
   according to overlap_type(); conflicts with other holders are checked
   by the caller.  Returns 0 or a negative error. */
static int lock_internal(struct lockspace *ls, struct resource *r,
			 struct dlm_plock_info *in)
{
	struct posix_lock *po, *safe;
	int rv = 0;

	list_for_each_entry_safe(po, safe, &r->locks, list) {
		if (po->nodeid != in->nodeid || po->owner != in->owner)
			continue;
		if (!ranges_overlap(po->start, po->end, in->start, in->end))
			continue;

		/* existing range (RE) overlaps new range (RN) */

		switch(overlap_type(in->start, in->end, po->start, po->end)) {

		case 0:
			if (po->ex == in->ex)
				goto out;

			/* ranges the same - just update the existing lock */
			po->ex = in->ex;
			goto out;

		case 1:
			if (po->ex == in->ex)
				goto out;

			rv = lock_case1(po, r, in);
			goto out;

		case 2:
			if (po->ex == in->ex)
				goto out;

			rv = lock_case2(po, r, in);
			goto out;

		case 3:
			/* RE within RN - remove RE; RN is added below and
			   may cover further locks, so keep scanning */
			list_del(&po->list);
			free(po);
			break;

		case 4:
			/* partial overlap - shrink RE away from RN */
			if (po->start < in->start)
				po->end = in->start - 1;
			else
				po->start = in->end + 1;
			break;

		default:
			rv = -1;
			goto out;
		}
	}

	rv = add_lock(r, in->nodeid, in->owner, in->pid,
		      in->ex, in->start, in->end);
 out:
	return rv;

}
584
/* Remove the holder's locks covering in->start:in->end from r, shrinking
   or splitting partially covered ranges per overlap_type().  Returns 0
   or a negative error. */
static int unlock_internal(struct lockspace *ls, struct resource *r,
			   struct dlm_plock_info *in)
{
	struct posix_lock *po, *safe;
	int rv = 0;

	list_for_each_entry_safe(po, safe, &r->locks, list) {
		if (po->nodeid != in->nodeid || po->owner != in->owner)
			continue;
		if (!ranges_overlap(po->start, po->end, in->start, in->end))
			continue;

		/* existing range (RE) overlaps new range (RN) */

		switch (overlap_type(in->start, in->end, po->start, po->end)) {

		case 0:
			/* ranges the same - just remove the existing lock */

			list_del(&po->list);
			free(po);
			goto out;

		case 1:
			/* RN within RE and starts or ends on RE boundary -
			 * shrink and update RE */

			rv = shrink_range(po, in->start, in->end);
			goto out;

		case 2:
			/* RN within RE - shrink and update RE to be front
			 * fragment, and add a new lock for back fragment */

			rv = add_lock(r, in->nodeid, in->owner, in->pid,
				      po->ex, in->end + 1, po->end);
			po->end = in->start - 1;
			goto out;

		case 3:
			/* RE within RN - remove RE, then continue checking
			 * because RN could cover other locks */

			list_del(&po->list);
			free(po);
			continue;

		case 4:
			/* front of RE in RN, or end of RE in RN - shrink and
			 * update RE, then continue because RN could cover
			 * other locks */

			rv = shrink_range(po, in->start, in->end);
			continue;

		default:
			rv = -1;
			goto out;
		}
	}
 out:
	return rv;
}
648
/* Drop every waiter on r that belongs to the holder (nodeid/owner) in;
   used by do_unlock() when a DLM_PLOCK_FL_CLOSE unlock cancels all of a
   closing process's outstanding waits. */
static void clear_waiters(struct lockspace *ls, struct resource *r,
			  struct dlm_plock_info *in)
{
	struct lock_waiter *w, *safe;

	list_for_each_entry_safe(w, safe, &r->waiters, list) {
		if (w->info.nodeid != in->nodeid || w->info.owner != in->owner)
			continue;

		list_del(&w->list);

		log_dlock(ls, "clear waiter %llx %llx-%llx %d/%u/%llx",
			  (unsigned long long)in->number,
			  (unsigned long long)in->start,
			  (unsigned long long)in->end,
			  in->nodeid, in->pid,
			  (unsigned long long)in->owner);
		free(w);
	}
}
669
670 static int add_waiter(struct lockspace *ls, struct resource *r,
671 struct dlm_plock_info *in)
672
673 {
674 struct lock_waiter *w;
675
676 w = malloc(sizeof(struct lock_waiter));
677 if (!w)
678 return -ENOMEM;
679 memcpy(&w->info, in, sizeof(struct dlm_plock_info));
680 w->flags = 0;
681 list_add_tail(&w->list, &r->waiters);
682 return 0;
683 }
684
/* Report the result rv of a locally-originated op back to the kernel
   through the plock misc device.  A failed or short write is only
   logged; there is no retry. */
static void write_result(struct dlm_plock_info *in, int rv)
{
	int write_rv;

	in->rv = rv;
	write_rv = write(plock_device_fd, in, sizeof(struct dlm_plock_info));
	if (write_rv < 0)
		log_debug("write_result: write error %d fd %d\n",
			  errno, plock_device_fd);
}
695
/* Grant every waiter on r that no longer conflicts, and reply to the
   kernel for waiters that originated on this node.  Called after any
   operation that may have released a conflicting lock. */
static void do_waiters(struct lockspace *ls, struct resource *r)
{
	struct lock_waiter *w, *safe;
	struct dlm_plock_info *in;
	int rv;

	list_for_each_entry_safe(w, safe, &r->waiters, list) {
		in = &w->info;

		if (is_conflict(r, in, 0))
			continue;

		list_del(&w->list);

		/*
		log_group(ls, "take waiter %llx %llx-%llx %d/%u/%llx",
			  in->number, in->start, in->end,
			  in->nodeid, in->pid, in->owner);
		*/

		rv = lock_internal(ls, r, in);

		if (in->nodeid == our_nodeid)
			write_result(in, rv);

		free(w);
	}
}
724
/* Grant, queue, or reject a lock request on r.  -EINPROGRESS means the
   request was queued as a waiter: the kernel gets no reply now, only
   later from do_waiters() when the lock is granted. */
static void do_lock(struct lockspace *ls, struct dlm_plock_info *in,
		    struct resource *r)
{
	int rv;

	if (is_conflict(r, in, 0)) {
		if (!in->wait)
			rv = -EAGAIN;
		else {
			rv = add_waiter(ls, r, in);
			if (rv)
				goto out;
			rv = -EINPROGRESS;
		}
	} else
		rv = lock_internal(ls, r, in);

 out:
	if (in->nodeid == our_nodeid && rv != -EINPROGRESS)
		write_result(in, rv);

	do_waiters(ls, r);
	put_resource(ls, r);
}
749
750 static int remove_waiter(const struct resource *r, const struct dlm_plock_info *in)
751 {
752 struct lock_waiter *w;
753
754 list_for_each_entry(w, &r->waiters, list) {
755 if (w->info.nodeid == in->nodeid &&
756 w->info.fsid == in->fsid &&
757 w->info.number == in->number &&
758 w->info.owner == in->owner &&
759 w->info.pid == in->pid &&
760 w->info.start == in->start &&
761 w->info.end == in->end &&
762 w->info.ex == in->ex) {
763 list_del(&w->list);
764 free(w);
765 return 0;
766 }
767 }
768
769 return -ENOENT;
770 }
771
/* Cancel a queued lock request.  -ENOENT means no matching waiter was
   found (it may already have been granted by do_waiters). */
static void do_cancel(struct lockspace *ls, struct dlm_plock_info *in,
		      struct resource *r)
{
	int rv;

	rv = remove_waiter(r, in);
	if (in->nodeid == our_nodeid)
		write_result(in, rv);

	put_resource(ls, r);
}
783
/* Apply an unlock to r.  A DLM_PLOCK_FL_CLOSE unlock (issued when a
   file is closed) also clears the closer's queued waiters and never
   sends a reply to the kernel. */
static void do_unlock(struct lockspace *ls, struct dlm_plock_info *in,
		      struct resource *r)
{
	int rv;

	rv = unlock_internal(ls, r, in);

	if (in->flags & DLM_PLOCK_FL_CLOSE) {
		clear_waiters(ls, r, in);
		/* no replies for unlock-close ops */
		goto skip_result;
	}

	if (in->nodeid == our_nodeid)
		write_result(in, rv);

 skip_result:
	do_waiters(ls, r);
	put_resource(ls, r);
}
804
805 /* we don't even get to this function if the getlk isn't from us */
806
/* Answer a local F_GETLK: 1 = a conflicting lock exists (its details
   are copied into *in by is_conflict), 0 = none. */
static void do_get(struct lockspace *ls, struct dlm_plock_info *in,
		   struct resource *r)
{
	write_result(in, is_conflict(r, in, 1) ? 1 : 0);
	put_resource(ls, r);
}
820
821 static void save_message(struct lockspace *ls, struct dlm_header *hd, int len,
822 int from, int type)
823 {
824 struct save_msg *sm;
825
826 sm = malloc(sizeof(struct save_msg) + len);
827 if (!sm)
828 return;
829 memset(sm, 0, sizeof(struct save_msg) + len);
830
831 memcpy(&sm->buf, hd, len);
832 sm->type = type;
833 sm->len = len;
834 sm->nodeid = from;
835
836 log_plock(ls, "save %s from %d len %d", msg_name(type), from, len);
837
838 list_add_tail(&sm->list, &ls->saved_messages);
839 }
840
/* Dispatch a plock op to its handler.  'from' is used only for error
   reporting and to decide whether to reply to the local kernel. */
static void __receive_plock(struct lockspace *ls, struct dlm_plock_info *in,
			    int from, struct resource *r)
{
	switch (in->optype) {
	case DLM_PLOCK_OP_LOCK:
		ls->last_plock_time = monotime();
		do_lock(ls, in, r);
		break;
	case DLM_PLOCK_OP_CANCEL:
		ls->last_plock_time = monotime();
		do_cancel(ls, in, r);
		break;
	case DLM_PLOCK_OP_UNLOCK:
		ls->last_plock_time = monotime();
		do_unlock(ls, in, r);
		break;
	case DLM_PLOCK_OP_GET:
		/* GET does not modify state, so last_plock_time stays */
		do_get(ls, in, r);
		break;
	default:
		log_elock(ls, "receive_plock error from %d optype %d",
			  from, in->optype);
		if (from == our_nodeid)
			write_result(in, -EINVAL);
	}
}
867
/* When ls members receive our options message (for our mount), one of them
   saves all plock state received to that point in a checkpoint and then sends
   us our journals message.  We know to retrieve the plock state from the
   checkpoint when we receive our journals message.  Any plock messages that
   arrive between seeing our options message and our journals message need to
   be saved and processed after we synchronize our plock state from the
   checkpoint.  Any plock message received while we're mounting but before we
   set save_plocks (when we see our options message) can be ignored because it
   should be reflected in the checkpointed state. */
877
/* Handle a DLM_MSG_PLOCK message: a plock op broadcast by some node,
   possibly ourselves.  Without lock ownership ops are applied directly;
   with ownership, what happens depends on who currently owns r. */
static void _receive_plock(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct dlm_plock_info info;
	struct resource *r = NULL;
	struct timeval now;
	uint64_t usec;
	int from = hd->nodeid;
	int rv, create;

	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
	info_bswap_in(&info);

	log_plock(ls, "receive plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
		  (unsigned long long)info.number,
		  op_str(info.optype),
		  ex_str(info.optype, info.ex),
		  (unsigned long long)info.start, (unsigned long long)info.end,
		  info.nodeid, info.pid, (unsigned long long)info.owner,
		  info.wait);

	plock_recv_count++;
	if (!(plock_recv_count % 1000)) {
		gettimeofday(&now, NULL);
		usec = dt_usec(&plock_recv_time, &now);
		log_plock(ls, "plock_recv_count %u time %.3f s",
			  plock_recv_count, usec * 1.e-6);
		plock_recv_time = now;
	}

	/* GET results are only reported to our own kernel */
	if (info.optype == DLM_PLOCK_OP_GET && from != our_nodeid)
		return;

	/* NOTE(review): from == hd->nodeid by construction, so only the
	   info.nodeid comparison can ever fail here */
	if (from != hd->nodeid || from != info.nodeid) {
		log_elock(ls, "receive_plock error from %d header %d info %d",
			  from, hd->nodeid, info.nodeid);
		return;
	}

	create = !opt(plock_ownership_ind);

	rv = find_resource(ls, info.number, create, &r);

	if (rv && opt(plock_ownership_ind)) {
		/* There must have been a race with a drop, so we need to
		   ignore this plock op which will be resent.  If we're the one
		   who sent the plock, we need to send_own() and put it on the
		   pending list to resend once the owner is established. */

		log_plock(ls, "receive_plock from %d no r %llx", from,
			  (unsigned long long)info.number);

		if (from != our_nodeid)
			return;

		rv = find_resource(ls, info.number, 1, &r);
		if (rv)
			return;
		send_own(ls, r, our_nodeid);
		save_pending_plock(ls, r, &info);
		return;
	}
	if (rv) {
		/* r not found, rv is -ENOENT, this shouldn't happen because
		   process_plocks() creates a resource for every op */

		log_elock(ls, "receive_plock error from %d no r %llx %d",
			  from, (unsigned long long)info.number, rv);
		return;
	}

	/* The owner should almost always be 0 here, but other owners may
	   be possible given odd combinations of races with drop.  Odd races to
	   worry about (some seem pretty improbable):

	   - A sends drop, B sends plock, receive drop, receive plock.
	     This is addressed above.

	   - A sends drop, B sends plock, receive drop, B reads plock
	     and sends own, receive plock, on B we find owner of -1.

	   - A sends drop, B sends two plocks, receive drop, receive plocks.
	     Receiving the first plock is the previous case, receiving the
	     second plock will find r with owner of -1.

	   - A sends drop, B sends two plocks, receive drop, C sends own,
	     receive plock, B sends own, receive own (C), receive plock,
	     receive own (B).

	   Haven't tried to cook up a scenario that would lead to the
	   last case below; receiving a plock from ourself and finding
	   we're the owner of r. */

	if (!r->owner) {
		/* unowned: everyone applies every op */
		__receive_plock(ls, &info, from, r);

	} else if (r->owner == -1) {
		/* owner unknown: defer our own ops until it's established */
		log_plock(ls, "receive_plock from %d r %llx owner %d", from,
			  (unsigned long long)info.number, r->owner);

		if (from == our_nodeid)
			save_pending_plock(ls, r, &info);

	} else if (r->owner != our_nodeid) {
		/* owned elsewhere: defer our own ops until handoff completes */
		log_plock(ls, "receive_plock from %d r %llx owner %d", from,
			  (unsigned long long)info.number, r->owner);

		if (from == our_nodeid)
			save_pending_plock(ls, r, &info);

	} else if (r->owner == our_nodeid) {
		log_plock(ls, "receive_plock from %d r %llx owner %d", from,
			  (unsigned long long)info.number, r->owner);

		if (from == our_nodeid)
			__receive_plock(ls, &info, from, r);
	}
}
995
996 void receive_plock(struct lockspace *ls, struct dlm_header *hd, int len)
997 {
998 if (ls->save_plocks) {
999 save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK);
1000 return;
1001 }
1002
1003 _receive_plock(ls, hd, len);
1004 }
1005
/* Broadcast *in to the lockspace as a msg_type dlm message.
   NOTE: info_bswap_out() converts *in to little-endian in place, so the
   caller's struct is left byte-swapped.  rv reflects only local
   allocation failure; dlm_send_message()'s result is not checked. */
static int send_struct_info(struct lockspace *ls, struct dlm_plock_info *in,
			    int msg_type)
{
	struct dlm_header *hd;
	int rv = 0, len;
	char *buf;

	len = sizeof(struct dlm_header) + sizeof(struct dlm_plock_info);
	buf = malloc(len);
	if (!buf) {
		rv = -ENOMEM;
		goto out;
	}
	memset(buf, 0, len);

	info_bswap_out(in);

	hd = (struct dlm_header *)buf;
	hd->type = msg_type;

	memcpy(buf + sizeof(struct dlm_header), in, sizeof(*in));

	dlm_send_message(ls, buf, len);

	free(buf);
 out:
	if (rv)
		log_elock(ls, "send_struct_info error %d", rv);
	return rv;
}
1036
/* Broadcast a plock op.  r is unused here but keeps the signature
   parallel with the other send_* helpers. */
static void send_plock(struct lockspace *ls, struct resource *r,
		       struct dlm_plock_info *in)
{
	send_struct_info(ls, in, DLM_MSG_PLOCK);
}
1042
/* Broadcast a message proposing `owner` (our nodeid, or 0 for unowned)
   as the owner of r.  Ownership actually changes when the message is
   received back in _receive_own(). */
static void send_own(struct lockspace *ls, struct resource *r, int owner)
{
	struct dlm_plock_info info;

	/* if we've already sent an own message for this resource,
	   (pending list is not empty), then we shouldn't send another */

	if (!list_empty(&r->pending)) {
		log_plock(ls, "send_own %llx already pending",
			  (unsigned long long)r->number);
		return;
	}

	if (!owner)
		r->flags |= R_SEND_UNOWN;
	else
		r->flags |= R_SEND_OWN;

	memset(&info, 0, sizeof(info));
	info.number = r->number;
	info.nodeid = owner;	/* proposed owner rides in the nodeid field */

	send_struct_info(ls, &info, DLM_MSG_PLOCK_OWN);
}
1067
/* Ownership handoff: broadcast every lock and waiter on r so all nodes
   can reconstruct its state.  Each entry stays flagged P_SYNCING until
   the corresponding sync message is received back and cleared by
   clear_syncing_flag().  Stops at the first send failure. */
static void send_syncs(struct lockspace *ls, struct resource *r)
{
	struct dlm_plock_info info;
	struct posix_lock *po;
	struct lock_waiter *w;
	int rv;

	list_for_each_entry(po, &r->locks, list) {
		memset(&info, 0, sizeof(info));
		info.number = r->number;
		info.start = po->start;
		info.end = po->end;
		info.nodeid = po->nodeid;
		info.owner = po->owner;
		info.pid = po->pid;
		info.ex = po->ex;

		rv = send_struct_info(ls, &info, DLM_MSG_PLOCK_SYNC_LOCK);
		if (rv)
			goto out;

		po->flags |= P_SYNCING;
	}

	list_for_each_entry(w, &r->waiters, list) {
		memcpy(&info, &w->info, sizeof(info));

		rv = send_struct_info(ls, &info, DLM_MSG_PLOCK_SYNC_WAITER);
		if (rv)
			goto out;

		w->flags |= P_SYNCING;
	}
 out:
	return;
}
1104
/* Broadcast a drop request for r; R_SEND_DROP marks it as in flight.
   NOTE(review): the drop receive path is outside this chunk — confirm
   preconditions (presumably r is unowned and idle) against it. */
static void send_drop(struct lockspace *ls, struct resource *r)
{
	struct dlm_plock_info info;

	memset(&info, 0, sizeof(info));
	info.number = r->number;
	r->flags |= R_SEND_DROP;

	send_struct_info(ls, &info, DLM_MSG_PLOCK_DROP);
}
1115
1116 /* plock op can't be handled until we know the owner value of the resource,
1117 so the op is saved on the pending list until the r owner is established */
1118
1119 static void save_pending_plock(struct lockspace *ls, struct resource *r,
1120 struct dlm_plock_info *in)
1121 {
1122 struct lock_waiter *w;
1123
1124 w = malloc(sizeof(struct lock_waiter));
1125 if (!w) {
1126 log_elock(ls, "save_pending_plock no mem");
1127 return;
1128 }
1129 memcpy(&w->info, in, sizeof(struct dlm_plock_info));
1130 w->flags = 0;
1131 list_add_tail(&w->list, &r->pending);
1132 }
1133
1134 /* plock ops are on pending list waiting for ownership to be established.
1135 owner has now become us, so add these plocks to r */
1136
static void add_pending_plocks(struct lockspace *ls, struct resource *r)
{
	struct lock_waiter *w, *safe;

	list_for_each_entry_safe(w, safe, &r->pending, list) {
		/* we are now the owner, so apply each saved op locally */
		__receive_plock(ls, &w->info, our_nodeid, r);
		list_del(&w->list);
		free(w);
	}
}
1147
1148 /* plock ops are on pending list waiting for ownership to be established.
1149 owner has now become 0, so send these plocks to everyone */
1150
static void send_pending_plocks(struct lockspace *ls, struct resource *r)
{
	struct lock_waiter *w, *safe;

	list_for_each_entry_safe(w, safe, &r->pending, list) {
		/* r is now unowned, so each saved op is broadcast and will
		   be applied when received back */
		send_plock(ls, r, &w->info);
		list_del(&w->list);
		free(w);
	}
}
1161
/* Handle a DLM_MSG_PLOCK_OWN message: a node announcing it is taking
   ownership of a resource (info.nodeid == sender) or relinquishing it
   (info.nodeid == 0).  Updates r->owner accordingly and flushes any ops
   saved on r->pending.  Impossible combinations are logged. */
static void _receive_own(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct dlm_plock_info info;
	struct resource *r;
	int should_not_happen = 0;
	int from = hd->nodeid;
	int rv;

	ls->last_plock_time = monotime();

	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
	info_bswap_in(&info);

	log_plock(ls, "receive_own %llx from %u owner %u",
		  (unsigned long long)info.number, hd->nodeid, info.nodeid);

	rv = find_resource(ls, info.number, 1, &r);
	if (rv)
		return;

	if (from == our_nodeid) {
		/*
		 * received our own own message
		 */

		if (info.nodeid == 0) {
			/* we are setting owner to 0 */

			if (r->owner == our_nodeid) {
				/* we set owner to 0 when we relinquish
				   ownership */
				should_not_happen = 1;
			} else if (r->owner == 0) {
				/* this happens when we relinquish ownership */
				r->flags |= R_GOT_UNOWN;
			} else {
				should_not_happen = 1;
			}

		} else if (info.nodeid == our_nodeid) {
			/* we are setting owner to ourself */

			if (r->owner == -1) {
				/* we have gained ownership */
				r->owner = our_nodeid;
				add_pending_plocks(ls, r);
			} else if (r->owner == our_nodeid) {
				should_not_happen = 1;
			} else if (r->owner == 0) {
				send_pending_plocks(ls, r);
			} else {
				/* resource is owned by other node;
				   they should set owner to 0 shortly */
			}

		} else {
			/* we should only ever set owner to 0 or ourself */
			should_not_happen = 1;
		}
	} else {
		/*
		 * received own message from another node
		 */

		if (info.nodeid == 0) {
			/* other node is setting owner to 0 */

			if (r->owner == -1) {
				/* we should have a record of the owner before
				   it relinquishes */
				should_not_happen = 1;
			} else if (r->owner == our_nodeid) {
				/* only the owner should relinquish */
				should_not_happen = 1;
			} else if (r->owner == 0) {
				should_not_happen = 1;
			} else {
				r->owner = 0;
				r->flags |= R_GOT_UNOWN;
				send_pending_plocks(ls, r);
			}

		} else if (info.nodeid == from) {
			/* other node is setting owner to itself */

			if (r->owner == -1) {
				/* normal path for a node becoming owner */
				r->owner = from;
			} else if (r->owner == our_nodeid) {
				/* we relinquish our ownership: sync our local
				   plocks to everyone, then set owner to 0 */
				send_syncs(ls, r);
				send_own(ls, r, 0);
				/* we need to set owner to 0 here because
				   local ops may arrive before we receive
				   our send_own message and can't be added
				   locally */
				r->owner = 0;
			} else if (r->owner == 0) {
				/* can happen because we set owner to 0 before
				   we receive our send_own sent just above */
			} else {
				/* do nothing, current owner should be
				   relinquishing its ownership */
			}

		} else if (info.nodeid == our_nodeid) {
			/* no one else should try to set the owner to us */
			should_not_happen = 1;
		} else {
			/* a node should only ever set owner to 0 or itself */
			should_not_happen = 1;
		}
	}

	if (should_not_happen) {
		log_elock(ls, "receive_own error from %u %llx "
			  "info nodeid %d r owner %d",
			  from, (unsigned long long)r->number,
			  info.nodeid, r->owner);
	}
}
1284
1285 void receive_own(struct lockspace *ls, struct dlm_header *hd, int len)
1286 {
1287 if (ls->save_plocks) {
1288 save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK_OWN);
1289 return;
1290 }
1291
1292 _receive_own(ls, hd, len);
1293 }
1294
1295 static void clear_syncing_flag(struct lockspace *ls, struct resource *r,
1296 struct dlm_plock_info *in)
1297 {
1298 struct posix_lock *po;
1299 struct lock_waiter *w;
1300
1301 list_for_each_entry(po, &r->locks, list) {
1302 if ((po->flags & P_SYNCING) &&
1303 in->start == po->start &&
1304 in->end == po->end &&
1305 in->nodeid == po->nodeid &&
1306 in->owner == po->owner &&
1307 in->pid == po->pid &&
1308 in->ex == po->ex) {
1309 po->flags &= ~P_SYNCING;
1310 return;
1311 }
1312 }
1313
1314 list_for_each_entry(w, &r->waiters, list) {
1315 if ((w->flags & P_SYNCING) &&
1316 in->start == w->info.start &&
1317 in->end == w->info.end &&
1318 in->nodeid == w->info.nodeid &&
1319 in->owner == w->info.owner &&
1320 in->pid == w->info.pid &&
1321 in->ex == w->info.ex) {
1322 w->flags &= ~P_SYNCING;
1323 return;
1324 }
1325 }
1326
1327 log_elock(ls, "clear_syncing error %llx no match %s %llx-%llx %d/%u/%llx",
1328 (unsigned long long)r->number,
1329 in->ex ? "WR" : "RD",
1330 (unsigned long long)in->start,
1331 (unsigned long long)in->end,
1332 in->nodeid, in->pid,
1333 (unsigned long long)in->owner);
1334 }
1335
1336 static void _receive_sync(struct lockspace *ls, struct dlm_header *hd, int len)
1337 {
1338 struct dlm_plock_info info;
1339 struct resource *r;
1340 int from = hd->nodeid;
1341 int rv;
1342
1343 ls->last_plock_time = monotime();
1344
1345 memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
1346 info_bswap_in(&info);
1347
1348 log_plock(ls, "receive sync %llx from %u %s %llx-%llx %d/%u/%llx",
1349 (unsigned long long)info.number, from, info.ex ? "WR" : "RD",
1350 (unsigned long long)info.start, (unsigned long long)info.end,
1351 info.nodeid, info.pid, (unsigned long long)info.owner);
1352
1353 rv = find_resource(ls, info.number, 0, &r);
1354 if (rv) {
1355 log_elock(ls, "receive_sync error no r %llx from %d",
1356 info.number, from);
1357 return;
1358 }
1359
1360 if (from == our_nodeid) {
1361 /* this plock now in sync on all nodes */
1362 clear_syncing_flag(ls, r, &info);
1363 return;
1364 }
1365
1366 if (hd->type == DLM_MSG_PLOCK_SYNC_LOCK)
1367 add_lock(r, info.nodeid, info.owner, info.pid, info.ex,
1368 info.start, info.end);
1369 else if (hd->type == DLM_MSG_PLOCK_SYNC_WAITER)
1370 add_waiter(ls, r, &info);
1371 }
1372
1373 void receive_sync(struct lockspace *ls, struct dlm_header *hd, int len)
1374 {
1375 if (ls->save_plocks) {
1376 save_message(ls, hd, len, hd->nodeid, hd->type);
1377 return;
1378 }
1379
1380 _receive_sync(ls, hd, len);
1381 }
1382
/* Process a drop message: if our copy of the resource is unowned,
   idle, and has no pending ops, free it.  The drop/keep decision must
   come out the same on every node, so it is based only on replicated
   state (owner, locks, waiters).  The non-error cases below can arise
   from racing drop/own/plock messages and are logged at plock level. */
static void _receive_drop(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct dlm_plock_info info;
	struct resource *r;
	int from = hd->nodeid;
	int rv;

	ls->last_plock_time = monotime();

	/* plock info follows the dlm_header in the message buffer */
	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
	info_bswap_in(&info);

	log_plock(ls, "receive_drop %llx from %u",
		  (unsigned long long)info.number, from);

	/* create arg 0: don't recreate a resource just to drop it */
	rv = find_resource(ls, info.number, 0, &r);
	if (rv) {
		/* we'll find no r if two nodes sent drop at once */
		log_plock(ls, "receive_drop from %d no r %llx", from,
			  (unsigned long long)info.number);
		return;
	}

	if (r->owner != 0) {
		/* - A sent drop, B sent drop, receive drop A, C sent own,
		     receive drop B (this warning on C, owner -1)
		   - A sent drop, B sent drop, receive drop A, A sent own,
		     receive own A, receive drop B (this warning on all,
		     owner A) */
		log_plock(ls, "receive_drop from %d r %llx owner %d", from,
			  (unsigned long long)r->number, r->owner);
		return;
	}

	if (!list_empty(&r->pending)) {
		/* shouldn't happen */
		log_elock(ls, "receive_drop error from %d r %llx pending op",
			  from, (unsigned long long)r->number);
		return;
	}

	/* the decision to drop or not must be based on things that are
	   guaranteed to be the same on all nodes */

	if (list_empty(&r->locks) && list_empty(&r->waiters)) {
		rb_del_plock_resource(ls, r);
		list_del(&r->list);
		free(r);
	} else {
		/* A sent drop, B sent a plock, receive plock, receive drop */
		log_plock(ls, "receive_drop from %d r %llx in use", from,
			  (unsigned long long)r->number);
	}
}
1437
1438 void receive_drop(struct lockspace *ls, struct dlm_header *hd, int len)
1439 {
1440 if (ls->save_plocks) {
1441 save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK_DROP);
1442 return;
1443 }
1444
1445 _receive_drop(ls, hd, len);
1446 }
1447
1448 /* We only drop resources from the unowned state to simplify things.
1449 If we want to drop a resource we own, we unown/relinquish it first. */
1450
1451 /* FIXME: in the transition from owner = us, to owner = 0, to drop;
1452 we want the second period to be shorter than the first */
1453
/* Periodically release idle resources, at most drop_resources_count
   per pass and no more often than drop_resources_time.  A resource we
   own is first relinquished (send_own 0); once it is unowned everywhere
   (got_unown) a later pass proposes dropping it (send_drop).
   Returns 1 when the caller should keep polling (rate-limited or work
   may remain), 0 when ownership mode is off or there are no resources. */
static int drop_resources(struct lockspace *ls)
{
	struct resource *r;
	struct timeval now;
	int count = 0;

	/* dropping only applies when resource ownership is enabled */
	if (!opt(plock_ownership_ind))
		return 0;

	if (list_empty(&ls->plock_resources))
		return 0;

	gettimeofday(&now, NULL);

	/* too soon since the last pass; ask the caller to poll again */
	if (time_diff_ms(&ls->drop_resources_last, &now) <
	    opt(drop_resources_time_ind))
		return 1;

	ls->drop_resources_last = now;

	/* try to drop the oldest, unused resources */

	list_for_each_entry_reverse(r, &ls->plock_resources, list) {
		if (count >= opt(drop_resources_count_ind))
			break;
		/* skip resources owned by another node */
		if (r->owner && r->owner != our_nodeid)
			continue;
		/* skip recently accessed resources */
		if (time_diff_ms(&r->last_access, &now) <
		    opt(drop_resources_age_ind))
			continue;

		if (list_empty(&r->locks) && list_empty(&r->waiters)) {
			if (r->owner == our_nodeid) {
				/* step 1: relinquish ownership; the drop
				   happens on a later pass from owner 0 */
				send_own(ls, r, 0);
				r->owner = 0;
			} else if (r->owner == 0 && got_unown(r)) {
				/* step 2: unowned on all nodes, propose
				   the drop */
				send_drop(ls, r);
			}

			count++;
		}
	}

	return 1;
}
1499
1500 void drop_resources_all(void)
1501 {
1502 struct lockspace *ls;
1503 int rv = 0;
1504
1505 poll_drop_plock = 0;
1506
1507 list_for_each_entry(ls, &lockspaces, list) {
1508 rv = drop_resources(ls);
1509 if (rv)
1510 poll_drop_plock = 1;
1511 }
1512 }
1513
/* Rate-limit plock reads from the kernel.  Returns 0 when reading may
   proceed, nonzero (2) when the caller should delay reading.
   plock_read_count is incremented by process_plocks for each op read. */
int limit_plocks(void)
{
	struct timeval now;

	/* limiting disabled, or nothing has been read yet */
	if (!opt(plock_rate_limit_ind) || !plock_read_count)
		return 0;

	gettimeofday(&now, NULL);

	/* Every time a plock op is read from the kernel, we increment
	   plock_read_count. After every plock_rate_limit (N) reads,
	   we check the time it's taken to do those N; if the time is less than
	   a second, then we delay reading any more until a second is up.
	   This way we read a max of N ops from the kernel every second. */

	if (!(plock_read_count % opt(plock_rate_limit_ind))) {
		if (time_diff_ms(&plock_rate_last, &now) < 1000) {
			plock_rate_delays++;
			return 2;
		}
		plock_rate_last = now;
		/* bump the count off the N boundary so this check doesn't
		   fire again until another N ops have been read */
		plock_read_count++;
	}
	return 0;
}
1539
1540 void process_plocks(int ci)
1541 {
1542 struct lockspace *ls;
1543 struct resource *r;
1544 struct dlm_plock_info info;
1545 struct timeval now;
1546 uint64_t usec;
1547 int create, rv;
1548
1549 if (limit_plocks()) {
1550 poll_ignore_plock = 1;
1551 client_ignore(plock_ci, plock_fd);
1552 return;
1553 }
1554
1555 gettimeofday(&now, NULL);
1556
1557 rv = do_read(plock_device_fd, &info, sizeof(info));
1558 if (rv < 0) {
1559 log_debug("process_plocks: read error %d fd %d\n",
1560 errno, plock_device_fd);
1561 return;
1562 }
1563
1564 /* kernel doesn't set the nodeid field */
1565 info.nodeid = our_nodeid;
1566
1567 if (!opt(enable_plock_ind)) {
1568 rv = -ENOSYS;
1569 goto fail;
1570 }
1571
1572 ls = find_ls_id(info.fsid);
1573 if (!ls) {
1574 log_plock(ls, "process_plocks: no ls id %x", info.fsid);
1575 rv = -EEXIST;
1576 goto fail;
1577 }
1578
1579 if (ls->disable_plock) {
1580 rv = -ENOSYS;
1581 goto fail;
1582 }
1583
1584 log_plock(ls, "read plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
1585 (unsigned long long)info.number,
1586 op_str(info.optype),
1587 ex_str(info.optype, info.ex),
1588 (unsigned long long)info.start, (unsigned long long)info.end,
1589 info.nodeid, info.pid, (unsigned long long)info.owner,
1590 info.wait);
1591
1592 /* report plock rate and any delays since the last report */
1593 plock_read_count++;
1594 if (!(plock_read_count % 1000)) {
1595 usec = dt_usec(&plock_read_time, &now) ;
1596 log_plock(ls, "plock_read_count %u time %.3f s delays %u",
1597 plock_read_count, usec * 1.e-6, plock_rate_delays);
1598 plock_read_time = now;
1599 plock_rate_delays = 0;
1600 }
1601
1602 if (!opt(plock_ownership_ind)) {
1603 send_plock(ls, NULL, &info);
1604 return;
1605 }
1606
1607 create = (info.optype == DLM_PLOCK_OP_UNLOCK) ? 0 : 1;
1608
1609 rv = find_resource(ls, info.number, create, &r);
1610 if (rv)
1611 goto fail;
1612
1613 if (r->owner == 0) {
1614 /* plock state replicated on all nodes */
1615 send_plock(ls, r, &info);
1616
1617 } else if (r->owner == our_nodeid) {
1618 /* we are the owner of r, so our plocks are local */
1619 __receive_plock(ls, &info, our_nodeid, r);
1620
1621 } else {
1622 /* r owner is -1: r is new, try to become the owner;
1623 r owner > 0: tell other owner to give up ownership;
1624 both done with a message trying to set owner to ourself */
1625 send_own(ls, r, our_nodeid);
1626 save_pending_plock(ls, r, &info);
1627 }
1628
1629 if (opt(plock_ownership_ind) && !list_empty(&ls->plock_resources))
1630 poll_drop_plock = 1;
1631 return;
1632
1633 fail:
1634 if (!(info.flags & DLM_PLOCK_FL_CLOSE))
1635 write_result(&info, rv);
1636 }
1637
1638 void process_saved_plocks(struct lockspace *ls)
1639 {
1640 struct save_msg *sm, *sm2;
1641 struct dlm_header *hd;
1642 int count = 0;
1643
1644 log_plock(ls, "process_saved_plocks begin");
1645
1646 if (list_empty(&ls->saved_messages))
1647 goto out;
1648
1649 list_for_each_entry_safe(sm, sm2, &ls->saved_messages, list) {
1650 hd = (struct dlm_header *)sm->buf;
1651
1652 switch (sm->type) {
1653 case DLM_MSG_PLOCK:
1654 _receive_plock(ls, hd, sm->len);
1655 break;
1656 case DLM_MSG_PLOCK_OWN:
1657 _receive_own(ls, hd, sm->len);
1658 break;
1659 case DLM_MSG_PLOCK_DROP:
1660 _receive_drop(ls, hd, sm->len);
1661 break;
1662 case DLM_MSG_PLOCK_SYNC_LOCK:
1663 case DLM_MSG_PLOCK_SYNC_WAITER:
1664 _receive_sync(ls, hd, sm->len);
1665 break;
1666 default:
1667 continue;
1668 }
1669
1670 list_del(&sm->list);
1671 free(sm);
1672 count++;
1673 }
1674 out:
1675 log_plock(ls, "process_saved_plocks %d done", count);
1676 }
1677
1678 /* locks still marked SYNCING should not go into the ckpt; the new node
1679 will get those locks by receiving PLOCK_SYNC messages */
1680
1681 #define MAX_SEND_SIZE 1024 /* 1024 holds 24 plock_data */
1682
1683 static char send_buf[MAX_SEND_SIZE];
1684
1685 static int pack_send_buf(struct lockspace *ls, struct resource *r, int owner,
1686 int full, int *count_out, void **last)
1687 {
1688 struct resource_data *rd;
1689 struct plock_data *pp;
1690 struct posix_lock *po;
1691 struct lock_waiter *w;
1692 int count = 0;
1693 int find = 0;
1694 int len;
1695
1696 /* N.B. owner not always equal to r->owner */
1697 rd = (struct resource_data *)(send_buf + sizeof(struct dlm_header));
1698 rd->number = cpu_to_le64(r->number);
1699 rd->owner = cpu_to_le32(owner);
1700
1701 if (full) {
1702 rd->flags = RD_CONTINUE;
1703 find = 1;
1704 }
1705
1706 /* plocks not replicated for owned resources */
1707 if (opt(plock_ownership_ind) && (owner == our_nodeid))
1708 goto done;
1709
1710 len = sizeof(struct dlm_header) + sizeof(struct resource_data);
1711
1712 pp = (struct plock_data *)(send_buf + sizeof(struct dlm_header) + sizeof(struct resource_data));
1713
1714 list_for_each_entry(po, &r->locks, list) {
1715 if (find && *last != po)
1716 continue;
1717 find = 0;
1718
1719 if (po->flags & P_SYNCING)
1720 continue;
1721
1722 if (len + sizeof(struct plock_data) > sizeof(send_buf)) {
1723 *last = po;
1724 goto full;
1725 }
1726 len += sizeof(struct plock_data);
1727
1728 pp->start = cpu_to_le64(po->start);
1729 pp->end = cpu_to_le64(po->end);
1730 pp->owner = cpu_to_le64(po->owner);
1731 pp->pid = cpu_to_le32(po->pid);
1732 pp->nodeid = cpu_to_le32(po->nodeid);
1733 pp->ex = po->ex;
1734 pp->waiter = 0;
1735 pp++;
1736 count++;
1737 }
1738
1739 list_for_each_entry(w, &r->waiters, list) {
1740 if (find && *last != w)
1741 continue;
1742 find = 0;
1743
1744 if (w->flags & P_SYNCING)
1745 continue;
1746
1747 if (len + sizeof(struct plock_data) > sizeof(send_buf)) {
1748 *last = w;
1749 goto full;
1750 }
1751 len += sizeof(struct plock_data);
1752
1753 pp->start = cpu_to_le64(w->info.start);
1754 pp->end = cpu_to_le64(w->info.end);
1755 pp->owner = cpu_to_le64(w->info.owner);
1756 pp->pid = cpu_to_le32(w->info.pid);
1757 pp->nodeid = cpu_to_le32(w->info.nodeid);
1758 pp->ex = w->info.ex;
1759 pp->waiter = 1;
1760 pp++;
1761 count++;
1762 }
1763 done:
1764 rd->lock_count = cpu_to_le32(count);
1765 *count_out = count;
1766 *last = NULL;
1767 return 0;
1768
1769 full:
1770 rd->lock_count = cpu_to_le32(count);
1771 *count_out = count;
1772 return 1;
1773 }
1774
1775 /* Copy all plock state into a checkpoint so new node can retrieve it. The
1776 node creating the ckpt for the mounter needs to be the same node that's
1777 sending the mounter its journals message (i.e. the low nodeid). The new
1778 mounter knows the ckpt is ready to read only after it gets its journals
1779 message.
1780
1781 If the mounter is becoming the new low nodeid in the group, the node doing
1782 the store closes the ckpt and the new node unlinks the ckpt after reading
1783 it. The ckpt should then disappear and the new node can create a new ckpt
1784 for the next mounter. */
1785
1786 static int send_plocks_data(struct lockspace *ls, uint32_t seq, char *buf, int len)
1787 {
1788 struct dlm_header *hd;
1789
1790 hd = (struct dlm_header *)buf;
1791 hd->type = DLM_MSG_PLOCKS_DATA;
1792 hd->msgdata = seq;
1793
1794 dlm_send_message(ls, buf, len);
1795
1796 return 0;
1797 }
1798
/* Send the full plock state of this lockspace to a new mounter as a
   stream of PLOCKS_DATA messages (one or more per resource when a
   resource's locks overflow send_buf).  seq identifies this transfer;
   *plocks_data returns the number of messages sent so the receiver
   can be told how many to expect. */
void send_all_plocks_data(struct lockspace *ls, uint32_t seq, uint32_t *plocks_data)
{
	struct resource *r;
	void *last;
	int owner, count, len, full;
	uint32_t send_count = 0;

	if (!opt(enable_plock_ind) || ls->disable_plock)
		return;

	log_dlock(ls, "send_all_plocks_data %d:%u", our_nodeid, seq);

	/* - If r owner is -1, ckpt nothing.
	   - If r owner is us, ckpt owner of us and no plocks.
	   - If r owner is other, ckpt that owner and any plocks we have on r
	     (they've just been synced but owner=0 msg not recved yet).
	   - If r owner is 0 and !got_unown, then we've just unowned r;
	     ckpt owner of us and any plocks that don't have SYNCING set
	     (plocks with SYNCING will be handled by our sync messages).
	   - If r owner is 0 and got_unown, then ckpt owner 0 and all plocks;
	     (there should be no SYNCING plocks) */

	list_for_each_entry(r, &ls->plock_resources, list) {
		/* decide what owner value to send; see comment above for
		   the case analysis */
		if (!opt(plock_ownership_ind))
			owner = 0;
		else if (r->owner == -1)
			continue;
		else if (r->owner == our_nodeid)
			owner = our_nodeid;
		else if (r->owner)
			owner = r->owner;
		else if (!r->owner && !got_unown(r))
			owner = our_nodeid;
		else if (!r->owner)
			owner = 0;
		else {
			/* unreachable given the cases above; kept defensively */
			log_elock(ls, "send_all_plocks_data error owner %d r %llx",
				  r->owner, (unsigned long long)r->number);
			continue;
		}

		memset(&send_buf, 0, sizeof(send_buf));
		count = 0;
		full = 0;
		last = NULL;

		/* pack_send_buf returns 1 (full) when a resource needs more
		   than one message; loop until the whole resource is sent */
		do {
			full = pack_send_buf(ls, r, owner, full, &count, &last);

			len = sizeof(struct dlm_header) +
			      sizeof(struct resource_data) +
			      sizeof(struct plock_data) * count;

			log_plock(ls, "send_plocks_data %d:%u n %llu o %d locks %d len %d",
				  our_nodeid, seq, (unsigned long long)r->number, r->owner,
				  count, len);

			send_plocks_data(ls, seq, send_buf, len);

			send_count++;

		} while (full);
	}

	*plocks_data = send_count;

	log_dlock(ls, "send_all_plocks_data %d:%u %u done",
		  our_nodeid, seq, send_count);
}
1868
1869 static void free_r_lists(struct resource *r)
1870 {
1871 struct posix_lock *po, *po2;
1872 struct lock_waiter *w, *w2;
1873
1874 list_for_each_entry_safe(po, po2, &r->locks, list) {
1875 list_del(&po->list);
1876 free(po);
1877 }
1878
1879 list_for_each_entry_safe(w, w2, &r->waiters, list) {
1880 list_del(&w->list);
1881 free(w);
1882 }
1883 }
1884
/* Unpack a PLOCKS_DATA message from the node sending us plock state:
   validate lengths, create (or, for RD_CONTINUE, find) the resource,
   and append the packed locks/waiters to it.  Only runs on a node that
   still needs plock state (need_plocks/save_plocks).  All on-wire
   fields are little-endian. */
void receive_plocks_data(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct resource_data *rd;
	struct plock_data *pp;
	struct posix_lock *po;
	struct lock_waiter *w;
	struct resource *r;
	uint64_t num;
	uint32_t count;
	uint32_t flags;
	int owner;
	int i;

	if (!opt(enable_plock_ind) || ls->disable_plock)
		return;

	/* only a node catching up on plock state consumes these */
	if (!ls->need_plocks)
		return;

	if (!ls->save_plocks)
		return;

	ls->recv_plocks_data_count++;

	/* must hold at least the header and the resource_data */
	if (len < sizeof(struct dlm_header) + sizeof(struct resource_data)) {
		log_elock(ls, "recv_plocks_data %d:%u bad len %d",
			  hd->nodeid, hd->msgdata, len);
		return;
	}

	rd = (struct resource_data *)((char *)hd + sizeof(struct dlm_header));
	num = le64_to_cpu(rd->number);
	owner = le32_to_cpu(rd->owner);
	count = le32_to_cpu(rd->lock_count);
	flags = le32_to_cpu(rd->flags);

	if (flags & RD_CONTINUE) {
		/* continuation: the resource was created by an earlier
		   message of this transfer and must already exist */
		r = search_resource(ls, num);
		if (!r) {
			log_elock(ls, "recv_plocks_data %d:%u n %llu not found",
				  hd->nodeid, hd->msgdata, (unsigned long long)num);
			return;
		}
		log_plock(ls, "recv_plocks_data %d:%u n %llu continue",
			  hd->nodeid, hd->msgdata, (unsigned long long)num);
		goto unpack;
	}

	r = malloc(sizeof(struct resource));
	if (!r) {
		log_elock(ls, "recv_plocks_data %d:%u n %llu no mem",
			  hd->nodeid, hd->msgdata, (unsigned long long)num);
		return;
	}
	memset(r, 0, sizeof(struct resource));
	INIT_LIST_HEAD(&r->locks);
	INIT_LIST_HEAD(&r->waiters);
	INIT_LIST_HEAD(&r->pending);

	if (!opt(plock_ownership_ind)) {
		/* without ownership mode the sender always packs owner 0 */
		if (owner) {
			log_elock(ls, "recv_plocks_data %d:%u n %llu bad owner %d",
				  hd->nodeid, hd->msgdata, (unsigned long long)num,
				  owner);
			goto fail_free;
		}
	} else {
		if (!owner)
			r->flags |= R_GOT_UNOWN;

		/* no locks should be included for owned resources */

		if (owner && count) {
			log_elock(ls, "recv_plocks_data %d:%u n %llu o %d bad count %" PRIu32,
				  hd->nodeid, hd->msgdata,
				  (unsigned long long)num, owner, count);
			goto fail_free;
		}
	}

	r->number = num;
	r->owner = owner;

 unpack:
	/* verify the message really carries count plock_data entries */
	if (len < sizeof(struct dlm_header) +
		  sizeof(struct resource_data) +
		  sizeof(struct plock_data) * count) {
		log_elock(ls, "recv_plocks_data %d:%u count %u bad len %d",
			  hd->nodeid, hd->msgdata, count, len);
		goto fail_free;
	}

	pp = (struct plock_data *)((char *)rd + sizeof(struct resource_data));

	for (i = 0; i < count; i++) {
		if (!pp->waiter) {
			/* held lock */
			po = malloc(sizeof(struct posix_lock));
			if (!po)
				goto fail_free;
			po->start = le64_to_cpu(pp->start);
			po->end = le64_to_cpu(pp->end);
			po->owner = le64_to_cpu(pp->owner);
			po->pid = le32_to_cpu(pp->pid);
			po->nodeid = le32_to_cpu(pp->nodeid);
			po->ex = pp->ex;
			po->flags = 0;
			list_add_tail(&po->list, &r->locks);
		} else {
			/* waiting lock */
			w = malloc(sizeof(struct lock_waiter));
			if (!w)
				goto fail_free;
			w->info.start = le64_to_cpu(pp->start);
			w->info.end = le64_to_cpu(pp->end);
			w->info.owner = le64_to_cpu(pp->owner);
			w->info.pid = le32_to_cpu(pp->pid);
			w->info.nodeid = le32_to_cpu(pp->nodeid);
			w->info.ex = pp->ex;
			w->flags = 0;
			list_add_tail(&w->list, &r->waiters);
		}
		pp++;
	}

	log_plock(ls, "recv_plocks_data %d:%u n %llu o %d locks %d len %d",
		  hd->nodeid, hd->msgdata, (unsigned long long)r->number,
		  r->owner, count, len);

	/* a new (non-continuation) resource is linked in only once all
	   its entries have been unpacked successfully */
	if (!(flags & RD_CONTINUE)) {
		list_add_tail(&r->list, &ls->plock_resources);
		rb_insert_plock_resource(ls, r);
	}
	return;

 fail_free:
	/* a continuation r is already linked into the lockspace, so it
	   must not be freed here; only a freshly allocated r is */
	if (!(flags & RD_CONTINUE)) {
		free_r_lists(r);
		free(r);
	}
	return;
}
2025
2026 void clear_plocks_data(struct lockspace *ls)
2027 {
2028 struct resource *r, *r2;
2029 uint32_t count = 0;
2030
2031 if (!opt(enable_plock_ind) || ls->disable_plock)
2032 return;
2033
2034 list_for_each_entry_safe(r, r2, &ls->plock_resources, list) {
2035 free_r_lists(r);
2036 rb_del_plock_resource(ls, r);
2037 list_del(&r->list);
2038 free(r);
2039 count++;
2040 }
2041
2042 log_dlock(ls, "clear_plocks_data done %u recv_plocks_data_count %u",
2043 count, ls->recv_plocks_data_count);
2044
2045 ls->recv_plocks_data_count = 0;
2046 }
2047
2048 /* Called when a node has failed, or we're unmounting. For a node failure, we
2049 need to call this when the cpg confchg arrives so that we're guaranteed all
2050 nodes do this in the same sequence wrt other messages. */
2051
/* Discard plock state belonging to nodeid (or all state when unmount
   is nonzero), fix up ownership of resources the dead node owned, and
   wake any waiters the purge unblocked. */
void purge_plocks(struct lockspace *ls, int nodeid, int unmount)
{
	struct posix_lock *po, *po2;
	struct lock_waiter *w, *w2;
	struct resource *r, *r2;
	int purged = 0;

	if (!opt(enable_plock_ind) || ls->disable_plock)
		return;

	list_for_each_entry_safe(r, r2, &ls->plock_resources, list) {
		/* drop held locks from the purged node */
		list_for_each_entry_safe(po, po2, &r->locks, list) {
			if (po->nodeid == nodeid || unmount) {
				list_del(&po->list);
				free(po);
				purged++;
			}
		}

		/* drop waiting locks from the purged node */
		list_for_each_entry_safe(w, w2, &r->waiters, list) {
			if (w->info.nodeid == nodeid || unmount) {
				list_del(&w->list);
				free(w);
				purged++;
			}
		}

		/* TODO: haven't thought carefully about how this transition
		   to owner 0 might interact with other owner messages in
		   progress. */

		if (r->owner == nodeid) {
			/* the purged node owned r; mark it unowned so the
			   remaining nodes can operate on it again */
			r->owner = 0;
			r->flags |= R_GOT_UNOWN;
			r->flags |= R_PURGE_UNOWN;
			send_pending_plocks(ls, r);
		}

		/* purged locks may have unblocked remaining waiters */
		do_waiters(ls, r);

		if (!opt(plock_ownership_ind) &&
		    list_empty(&r->locks) && list_empty(&r->waiters)) {
			rb_del_plock_resource(ls, r);
			list_del(&r->list);
			free(r);
		}
	}

	if (purged)
		ls->last_plock_time = monotime();

	log_dlock(ls, "purged %d plocks for %d", purged, nodeid);
}
2105
2106 int copy_plock_state(struct lockspace *ls, char *buf, int *len_out)
2107 {
2108 struct posix_lock *po;
2109 struct lock_waiter *w;
2110 struct resource *r;
2111 struct timeval now;
2112 int rv = 0;
2113 int len = DLMC_DUMP_SIZE, pos = 0, ret;
2114
2115 gettimeofday(&now, NULL);
2116
2117 list_for_each_entry(r, &ls->plock_resources, list) {
2118
2119 if (list_empty(&r->locks) &&
2120 list_empty(&r->waiters) &&
2121 list_empty(&r->pending)) {
2122 ret = snprintf(buf + pos, len - pos,
2123 "%llu rown %d unused_ms %llu\n",
2124 (unsigned long long)r->number, r->owner,
2125 (unsigned long long)time_diff_ms(&r->last_access,
2126 &now));
2127 if (ret >= len - pos) {
2128 rv = -ENOSPC;
2129 goto out;
2130 }
2131 pos += ret;
2132 continue;
2133 }
2134
2135 list_for_each_entry(po, &r->locks, list) {
2136 ret = snprintf(buf + pos, len - pos,
2137 "%llu %s %llu-%llu nodeid %d pid %u owner %llx rown %d\n",
2138 (unsigned long long)r->number,
2139 po->ex ? "WR" : "RD",
2140 (unsigned long long)po->start,
2141 (unsigned long long)po->end,
2142 po->nodeid, po->pid,
2143 (unsigned long long)po->owner, r->owner);
2144
2145 if (ret >= len - pos) {
2146 rv = -ENOSPC;
2147 goto out;
2148 }
2149 pos += ret;
2150 }
2151
2152 list_for_each_entry(w, &r->waiters, list) {
2153 ret = snprintf(buf + pos, len - pos,
2154 "%llu %s %llu-%llu nodeid %d pid %u owner %llx rown %d WAITING\n",
2155 (unsigned long long)r->number,
2156 w->info.ex ? "WR" : "RD",
2157 (unsigned long long)w->info.start,
2158 (unsigned long long)w->info.end,
2159 w->info.nodeid, w->info.pid,
2160 (unsigned long long)w->info.owner, r->owner);
2161
2162 if (ret >= len - pos) {
2163 rv = -ENOSPC;
2164 goto out;
2165 }
2166 pos += ret;
2167 }
2168
2169 list_for_each_entry(w, &r->pending, list) {
2170 ret = snprintf(buf + pos, len - pos,
2171 "%llu %s %llu-%llu nodeid %d pid %u owner %llx rown %d PENDING\n",
2172 (unsigned long long)r->number,
2173 w->info.ex ? "WR" : "RD",
2174 (unsigned long long)w->info.start,
2175 (unsigned long long)w->info.end,
2176 w->info.nodeid, w->info.pid,
2177 (unsigned long long)w->info.owner, r->owner);
2178
2179 if (ret >= len - pos) {
2180 rv = -ENOSPC;
2181 goto out;
2182 }
2183 pos += ret;
2184 }
2185 }
2186 out:
2187 *len_out = pos;
2188 return rv;
2189 }
2190
2191