1    	/*
2    	 * Copyright 2004-2012 Red Hat, Inc.
3    	 *
4    	 * This copyrighted material is made available to anyone wishing to use,
5    	 * modify, copy, or redistribute it subject to the terms and conditions
6    	 * of the GNU General Public License v2 or (at your option) any later version.
7    	 */
8    	
9    	#include "dlm_daemon.h"
10   	#include <linux/dlm_plock.h>
11   	
12   	#ifndef DLM_PLOCK_OP_CANCEL
13   	#define DLM_PLOCK_OP_CANCEL 4
14   	#endif
15   	
16   	static uint32_t plock_read_count;
17   	static uint32_t plock_recv_count;
18   	static uint32_t plock_rate_delays;
19   	static struct timeval plock_read_time;
20   	static struct timeval plock_recv_time;
21   	static struct timeval plock_rate_last;
22   	
23   	static int plock_device_fd = -1;
24   	
25   	#define RD_CONTINUE 0x00000001
26   	
/* Fixed-width per-resource record; fields mirror struct resource.
   NOTE(review): not referenced in this chunk — presumably used for
   checkpoint/sync serialization of plock state; confirm at call sites. */
struct resource_data {
	uint64_t number;	/* resource (inode) number */
	int      owner;		/* owning nodeid, or 0 for unowned */
	uint32_t lock_count;	/* number of lock records for this resource */
	uint32_t flags;
	uint32_t pad;		/* explicit padding for a stable layout */
};
34   	
/* Fixed-width per-lock record; fields mirror struct posix_lock.
   NOTE(review): not referenced in this chunk — presumably used for
   checkpoint/sync serialization of plock state; confirm at call sites. */
struct plock_data {
	uint64_t start;		/* first byte of the locked range */
	uint64_t end;		/* last byte of the locked range (inclusive) */
	uint64_t owner;		/* lock owner cookie */
	uint32_t pid;		/* owning process id */
	uint32_t nodeid;	/* node holding the lock */
	uint8_t ex;		/* nonzero for exclusive (write) lock */
	uint8_t waiter;		/* nonzero if this record is a blocked waiter */
	uint16_t pad1;		/* explicit padding for a stable layout */
	uint32_t pad;
};
46   	
47   	#define R_GOT_UNOWN   0x00000001 /* have received owner=0 message */
48   	#define R_SEND_UNOWN  0x00000002 /* have sent owner=0 message */
49   	#define R_SEND_OWN    0x00000004 /* have sent owner=our_nodeid message */
50   	#define R_PURGE_UNOWN 0x00000008 /* set owner=0 in purge */
51   	#define R_SEND_DROP   0x00000010
52   	
/* One plock resource (a file, keyed by inode number) and all plock
   state attached to it. */
struct resource {
	struct list_head	list;	   /* list of resources */
	uint64_t		number;    /* inode number identifying the file */
	int                     owner;     /* nodeid or 0 for unowned */
	uint32_t		flags;     /* R_* flags above */
	struct timeval          last_access; /* updated in find_resource() */
	struct list_head	locks;	   /* one lock for each range */
	struct list_head	waiters;   /* blocked lock requests */
	struct list_head        pending;   /* discovering r owner */
	struct rb_node		rb_node;   /* node in ls->plock_resources_root */
};
64   	
65   	#define P_SYNCING 0x00000001 /* plock has been sent as part of sync but not
66   					yet received */
67   	
/* One granted plock covering a byte range of a resource. */
struct posix_lock {
	struct list_head	list;	   /* resource locks or waiters list */
	uint32_t		pid;       /* owning process id */
	uint64_t		owner;     /* owner cookie from the kernel op */
	uint64_t		start;     /* first byte of the range */
	uint64_t		end;       /* last byte of the range (inclusive) */
	int			ex;        /* nonzero for exclusive (write) lock */
	int			nodeid;    /* node holding the lock */
	uint32_t		flags;     /* P_SYNCING */
};
78   	
/* A blocked (or pending) plock request queued on a resource's
   waiters or pending list; info holds the original kernel request. */
struct lock_waiter {
	struct list_head	list;
	uint32_t		flags;	/* P_SYNCING */
	struct dlm_plock_info	info;
};
84   	
85   	struct save_msg {
86   		struct list_head list;
87   		int nodeid;
88   		int len;
89   		int type;
90   		char buf[0];
91   	};
92   	
93   	
94   	static void send_own(struct lockspace *ls, struct resource *r, int owner);
95   	static void save_pending_plock(struct lockspace *ls, struct resource *r,
96   				       struct dlm_plock_info *in);
97   	
98   	
99   	static int got_unown(struct resource *r)
100  	{
101  		return !!(r->flags & R_GOT_UNOWN);
102  	}
103  	
/* Convert a dlm_plock_info to little-endian byte order before sending
   on the wire.  Fields not listed here are single bytes or padding —
   assumed; confirm against struct dlm_plock_info if it changes. */
static void info_bswap_out(struct dlm_plock_info *i)
{
	i->version[0]	= cpu_to_le32(i->version[0]);
	i->version[1]	= cpu_to_le32(i->version[1]);
	i->version[2]	= cpu_to_le32(i->version[2]);
	i->pid		= cpu_to_le32(i->pid);
	i->nodeid	= cpu_to_le32(i->nodeid);
	i->rv		= cpu_to_le32(i->rv);
	i->fsid		= cpu_to_le32(i->fsid);
	i->number	= cpu_to_le64(i->number);
	i->start	= cpu_to_le64(i->start);
	i->end		= cpu_to_le64(i->end);
	i->owner	= cpu_to_le64(i->owner);
}
118  	
/* Convert a received dlm_plock_info from little-endian wire order to
   host byte order; inverse of info_bswap_out(). */
static void info_bswap_in(struct dlm_plock_info *i)
{
	i->version[0]	= le32_to_cpu(i->version[0]);
	i->version[1]	= le32_to_cpu(i->version[1]);
	i->version[2]	= le32_to_cpu(i->version[2]);
	i->pid		= le32_to_cpu(i->pid);
	i->nodeid	= le32_to_cpu(i->nodeid);
	i->rv		= le32_to_cpu(i->rv);
	i->fsid		= le32_to_cpu(i->fsid);
	i->number	= le64_to_cpu(i->number);
	i->start	= le64_to_cpu(i->start);
	i->end		= le64_to_cpu(i->end);
	i->owner	= le64_to_cpu(i->owner);
}
133  	
134  	static const char *op_str(int optype)
135  	{
136  		switch (optype) {
137  		case DLM_PLOCK_OP_LOCK:
138  			return "LK";
139  		case DLM_PLOCK_OP_CANCEL:
140  			return "CL";
141  		case DLM_PLOCK_OP_UNLOCK:
142  			return "UN";
143  		case DLM_PLOCK_OP_GET:
144  			return "GET";
145  		default:
146  			return "??";
147  		}
148  	}
149  	
150  	static const char *ex_str(int optype, int ex)
151  	{
152  		if (optype == DLM_PLOCK_OP_UNLOCK || optype == DLM_PLOCK_OP_GET)
153  			return "-";
154  		if (ex)
155  			return "WR";
156  		else
157  			return "RD";
158  	}
159  	
160  	int setup_plocks(void)
161  	{
162  		plock_read_count = 0;
163  		plock_recv_count = 0;
164  		plock_rate_delays = 0;
165  		gettimeofday(&plock_read_time, NULL);
166  		gettimeofday(&plock_recv_time, NULL);
167  		gettimeofday(&plock_rate_last, NULL);
168  	
169  		if (plock_minor) {
170  			plock_device_fd = open("/dev/misc/dlm_plock", O_RDWR);
171  		}
172  	
173  		if (plock_device_fd < 0) {
174  			log_error("Failure to open plock device: %s", strerror(errno));
175  			return -1;
176  		}
177  	
178  		log_debug("plocks %d", plock_device_fd);
179  	
180  		return plock_device_fd;
181  	}
182  	
183  	void close_plocks(void)
184  	{
185  		if (plock_device_fd > 0)
186  			close(plock_device_fd);
187  	}
188  	
189  	/* FIXME: unify these two */
190  	
/* Milliseconds between two timestamps (end - begin), truncated. */
static unsigned long time_diff_ms(struct timeval *begin, struct timeval *end)
{
	long sec = end->tv_sec - begin->tv_sec;
	long usec = end->tv_usec - begin->tv_usec;

	/* normalize the same way timersub() does */
	if (usec < 0) {
		sec--;
		usec += 1000000;
	}
	return (sec * 1000) + (usec / 1000);
}
197  	
198  	static uint64_t dt_usec(const struct timeval *start, const struct timeval *stop)
199  	{
200  		uint64_t dt;
201  	
202  		dt = stop->tv_sec - start->tv_sec;
203  		dt *= 1000000;
204  		dt += stop->tv_usec - start->tv_usec;
205  		return dt;
206  	}
207  	
208  	static struct resource * rb_search_plock_resource(struct lockspace *ls, uint64_t number)
209  	{
210  		struct rb_node *n = ls->plock_resources_root.rb_node;
211  		struct resource *r;
212  	
(9) Event example_checked: Example 1: "n->rb_left" has its value checked in "n".
Also see events: [null_field][example_checked][example_checked][example_checked][example_checked][alias_transfer][dereference]
213  		while (n) {
214  			r = rb_entry(n, struct resource, rb_node);
215  			if (number < r->number)
216  				n = n->rb_left;
217  			else if (number > r->number)
218  				n = n->rb_right;
219  			else
220  				return r;
221  		}
222  		return NULL;
223  	}
224  	
225  	static void rb_insert_plock_resource(struct lockspace *ls, struct resource *r)
226  	{
227  		struct resource *entry;
228  		struct rb_node **p;
229  		struct rb_node *parent = NULL;
230  		
231  		p = &ls->plock_resources_root.rb_node;
232  		while (*p) {
233  			parent = *p;
234  			entry = rb_entry(parent, struct resource, rb_node);
235  			if (r->number < entry->number)
236  				p = &parent->rb_left;
237  			else if (r->number > entry->number)
238  				p = &parent->rb_right;
239  			else
240  				return; 
241  		}
242  		rb_link_node(&r->rb_node, parent, p);
243  		rb_insert_color(&r->rb_node, &ls->plock_resources_root);
244  	}
245  	
246  	static void rb_del_plock_resource(struct lockspace *ls, struct resource *r)
247  	{
248  		if (!RB_EMPTY_NODE(&r->rb_node)) {
249  			rb_erase(&r->rb_node, &ls->plock_resources_root);
250  			RB_CLEAR_NODE(&r->rb_node);
251  		}
252  	}
253  	
254  	static struct resource *search_resource(struct lockspace *ls, uint64_t number)
255  	{
256  		struct resource *r;
257  	
258  		list_for_each_entry(r, &ls->plock_resources, list) {
259  			if (r->number == number)
260  				return r;
261  		}
262  		return NULL;
263  	}
264  	
265  	static int find_resource(struct lockspace *ls, uint64_t number, int create,
266  				 struct resource **r_out)
267  	{
268  		struct resource *r = NULL;
269  		int rv = 0;
270  	
271  		r = rb_search_plock_resource(ls, number);
272  		if (r)
273  			goto out;
274  	
275  		if (create == 0) {
276  			rv = -ENOENT;
277  			goto out;
278  		}
279  	
280  		r = malloc(sizeof(struct resource));
281  		if (!r) {
282  			log_elock(ls, "find_resource no memory %d", errno);
283  			rv = -ENOMEM;
284  			goto out;
285  		}
286  	
287  		memset(r, 0, sizeof(struct resource));
288  		r->number = number;
289  		INIT_LIST_HEAD(&r->locks);
290  		INIT_LIST_HEAD(&r->waiters);
291  		INIT_LIST_HEAD(&r->pending);
292  	
293  		if (opt(plock_ownership_ind))
294  			r->owner = -1;
295  		else
296  			r->owner = 0;
297  	
298  		list_add_tail(&r->list, &ls->plock_resources);
299  		rb_insert_plock_resource(ls, r);
300  	 out:
301  		if (r)
302  			gettimeofday(&r->last_access, NULL);
303  		*r_out = r;
304  		return rv;
305  	}
306  	
307  	static void put_resource(struct lockspace *ls, struct resource *r)
308  	{
309  		/* with ownership, resources are only freed via drop messages */
310  		if (opt(plock_ownership_ind))
311  			return;
312  	
313  		if (list_empty(&r->locks) && list_empty(&r->waiters)) {
314  			rb_del_plock_resource(ls, r);
315  			list_del(&r->list);
316  			free(r);
317  		}
318  	}
319  	
320  	static inline int ranges_overlap(uint64_t start1, uint64_t end1,
321  					 uint64_t start2, uint64_t end2)
322  	{
323  		if (end1 < start2 || start1 > end2)
324  			return 0;
325  		return 1;
326  	}
327  	
328  	/**
329  	 * overlap_type - returns a value based on the type of overlap
330  	 * @s1 - start of new lock range
331  	 * @e1 - end of new lock range
332  	 * @s2 - start of existing lock range
333  	 * @e2 - end of existing lock range
334  	 *
335  	 */
336  	
337  	static int overlap_type(uint64_t s1, uint64_t e1, uint64_t s2, uint64_t e2)
338  	{
339  		int ret;
340  	
341  		/*
342  		 * ---r1---
343  		 * ---r2---
344  		 */
345  	
346  		if (s1 == s2 && e1 == e2)
347  			ret = 0;
348  	
349  		/*
350  		 * --r1--
351  		 * ---r2---
352  		 */
353  	
354  		else if (s1 == s2 && e1 < e2)
355  			ret = 1;
356  	
357  		/*
358  		 *   --r1--
359  		 * ---r2---
360  		 */
361  	
362  		else if (s1 > s2 && e1 == e2)
363  			ret = 1;
364  	
365  		/*
366  		 *  --r1--
367  		 * ---r2---
368  		 */
369  	
370  		else if (s1 > s2 && e1 < e2)
371  			ret = 2;
372  	
373  		/*
374  		 * ---r1---  or  ---r1---  or  ---r1---
375  		 * --r2--	  --r2--       --r2--
376  		 */
377  	
378  		else if (s1 <= s2 && e1 >= e2)
379  			ret = 3;
380  	
381  		/*
382  		 *   ---r1---
383  		 * ---r2---
384  		 */
385  	
386  		else if (s1 > s2 && e1 > e2)
387  			ret = 4;
388  	
389  		/*
390  		 * ---r1---
391  		 *   ---r2---
392  		 */
393  	
394  		else if (s1 < s2 && e1 < e2)
395  			ret = 4;
396  	
397  		else
398  			ret = -1;
399  	
400  		return ret;
401  	}
402  	
403  	/* shrink the range start2:end2 by the partially overlapping start:end */
404  	
405  	static int shrink_range2(uint64_t *start2, uint64_t *end2,
406  				 uint64_t start, uint64_t end)
407  	{
408  		int error = 0;
409  	
410  		if (*start2 < start)
411  			*end2 = start - 1;
412  		else if (*end2 > end)
413  			*start2 =  end + 1;
414  		else
415  			error = -1;
416  		return error;
417  	}
418  	
/* Shrink po's range in place by the partially overlapping start:end;
   returns -1 if start:end fully covers po's range. */
static int shrink_range(struct posix_lock *po, uint64_t start, uint64_t end)
{
	return shrink_range2(&po->start, &po->end, start, end);
}
423  	
/* Return 1 if request 'in' conflicts with an existing lock on r:
   a different owner, an overlapping range, and either side exclusive.
   When 'get' is set (GETLK path), the first conflicting lock's mode,
   pid and range are copied back into *in for the caller to report. */
static int is_conflict(struct resource *r, struct dlm_plock_info *in, int get)
{
	struct posix_lock *po;

	list_for_each_entry(po, &r->locks, list) {
		/* a holder never conflicts with its own locks */
		if (po->nodeid == in->nodeid && po->owner == in->owner)
			continue;
		if (!ranges_overlap(po->start, po->end, in->start, in->end))
			continue;

		if (in->ex || po->ex) {
			if (get) {
				/* report the blocking lock's details */
				in->ex = po->ex;
				in->pid = po->pid;
				in->start = po->start;
				in->end = po->end;
			}
			return 1;
		}
	}
	return 0;
}
446  	
447  	static int add_lock(struct resource *r, uint32_t nodeid, uint64_t owner,
448  			    uint32_t pid, int ex, uint64_t start, uint64_t end)
449  	{
450  		struct posix_lock *po;
451  	
452  		po = malloc(sizeof(struct posix_lock));
453  		if (!po)
454  			return -ENOMEM;
455  	
456  		po->start = start;
457  		po->end = end;
458  		po->nodeid = nodeid;
459  		po->owner = owner;
460  		po->pid = pid;
461  		po->ex = ex;
462  		po->flags = 0;
463  		list_add_tail(&po->list, &r->locks);
464  	
465  		return 0;
466  	}
467  	
/* RN within RE (and starts or ends on RE boundary)
   1. add new lock for non-overlap area of RE, orig mode
   2. convert RE to RN range and mode

   Only called when po->ex != in->ex, so RE's original mode is !in->ex
   (ex is always 0 or 1).  Returns 0, -1 if the shrink is impossible,
   or -ENOMEM from add_lock. */

static int lock_case1(struct posix_lock *po, struct resource *r,
		      struct dlm_plock_info *in)
{
	uint64_t start2, end2;
	int rv;

	/* non-overlapping area start2:end2 */
	start2 = po->start;
	end2 = po->end;
	rv = shrink_range2(&start2, &end2, in->start, in->end);
	if (rv)
		goto out;

	/* convert RE to the new range and mode */
	po->start = in->start;
	po->end = in->end;
	po->ex = in->ex;

	/* !in->ex == RE's original mode (see note above) */
	rv = add_lock(r, in->nodeid, in->owner, in->pid, !in->ex, start2, end2);
 out:
	return rv;
}
493  	
/* RN within RE (RE overlaps RN on both sides)
   1. add new lock for front fragment, orig mode
   2. add new lock for back fragment, orig mode
   3. convert RE to RN range and mode

   Only called when po->ex != in->ex, so RE's original mode is !in->ex.
   NOTE(review): if the second add_lock fails, the front fragment has
   already been added and state is left partially updated. */

static int lock_case2(struct posix_lock *po, struct resource *r,
		      struct dlm_plock_info *in)

{
	int rv;

	/* front fragment, original mode */
	rv = add_lock(r, in->nodeid, in->owner, in->pid,
		      !in->ex, po->start, in->start - 1);
	if (rv)
		goto out;

	/* back fragment, original mode */
	rv = add_lock(r, in->nodeid, in->owner, in->pid,
		      !in->ex, in->end + 1, po->end);
	if (rv)
		goto out;

	/* convert RE to the new range and mode */
	po->start = in->start;
	po->end = in->end;
	po->ex = in->ex;
 out:
	return rv;
}
521  	
/* Grant lock request 'in' on r: walk this owner's existing overlapping
   ranges and merge/split/convert them according to overlap_type(), then
   add a new range if no case consumed the request.  Returns 0 on
   success, -ENOMEM from add_lock, or -1 on an impossible overlap. */
static int lock_internal(struct lockspace *ls, struct resource *r,
			 struct dlm_plock_info *in)
{
	struct posix_lock *po, *safe;
	int rv = 0;

	list_for_each_entry_safe(po, safe, &r->locks, list) {
		/* only this owner's own locks are merged/converted */
		if (po->nodeid != in->nodeid || po->owner != in->owner)
			continue;
		if (!ranges_overlap(po->start, po->end, in->start, in->end))
			continue;

		/* existing range (RE) overlaps new range (RN) */

		switch(overlap_type(in->start, in->end, po->start, po->end)) {

		case 0:
			if (po->ex == in->ex)
				goto out;

			/* ranges the same - just update the existing lock */
			po->ex = in->ex;
			goto out;

		case 1:
			/* RN inside RE, sharing a boundary */
			if (po->ex == in->ex)
				goto out;

			rv = lock_case1(po, r, in);
			goto out;

		case 2:
			/* RN strictly inside RE */
			if (po->ex == in->ex)
				goto out;

			rv = lock_case2(po, r, in);
			goto out;

		case 3:
			/* RN covers RE - drop RE, keep scanning; RN is
			   added after the loop */
			list_del(&po->list);
			free(po);
			break;

		case 4:
			/* partial overlap - shrink RE away from RN */
			if (po->start < in->start)
				po->end = in->start - 1;
			else
				po->start = in->end + 1;
			break;

		default:
			rv = -1;
			goto out;
		}
	}

	rv = add_lock(r, in->nodeid, in->owner, in->pid,
		      in->ex, in->start, in->end);
 out:
	return rv;

}
584  	
/* Remove the range in->start..in->end from this owner's locks on r,
   shrinking or splitting existing ranges as needed.  Returns 0 on
   success, -ENOMEM from add_lock (case 2), or -1 on an impossible
   overlap. */
static int unlock_internal(struct lockspace *ls, struct resource *r,
			   struct dlm_plock_info *in)
{
	struct posix_lock *po, *safe;
	int rv = 0;

	list_for_each_entry_safe(po, safe, &r->locks, list) {
		if (po->nodeid != in->nodeid || po->owner != in->owner)
			continue;
		if (!ranges_overlap(po->start, po->end, in->start, in->end))
			continue;

		/* existing range (RE) overlaps new range (RN) */

		switch (overlap_type(in->start, in->end, po->start, po->end)) {

		case 0:
			/* ranges the same - just remove the existing lock */

			list_del(&po->list);
			free(po);
			goto out;

		case 1:
			/* RN within RE and starts or ends on RE boundary -
			 * shrink and update RE */

			rv = shrink_range(po, in->start, in->end);
			goto out;

		case 2:
			/* RN within RE - shrink and update RE to be front
			 * fragment, and add a new lock for back fragment */

			rv = add_lock(r, in->nodeid, in->owner, in->pid,
				      po->ex, in->end + 1, po->end);
			po->end = in->start - 1;
			goto out;

		case 3:
			/* RE within RN - remove RE, then continue checking
			 * because RN could cover other locks */

			list_del(&po->list);
			free(po);
			continue;

		case 4:
			/* front of RE in RN, or end of RE in RN - shrink and
			 * update RE, then continue because RN could cover
			 * other locks */

			rv = shrink_range(po, in->start, in->end);
			continue;

		default:
			rv = -1;
			goto out;
		}
	}
 out:
	return rv;
}
648  	
/* Discard every queued waiter on r belonging to the same owner as 'in'
   (used on unlock-close: the process is gone, nobody will reply). */
static void clear_waiters(struct lockspace *ls, struct resource *r,
			  struct dlm_plock_info *in)
{
	struct lock_waiter *w, *safe;

	list_for_each_entry_safe(w, safe, &r->waiters, list) {
		if (w->info.nodeid != in->nodeid || w->info.owner != in->owner)
			continue;

		list_del(&w->list);

		log_dlock(ls, "clear waiter %llx %llx-%llx %d/%u/%llx",
			  (unsigned long long)in->number,
			  (unsigned long long)in->start,
			  (unsigned long long)in->end,
			  in->nodeid, in->pid,
			  (unsigned long long)in->owner);
		free(w);
	}
}
669  	
670  	static int add_waiter(struct lockspace *ls, struct resource *r,
671  			      struct dlm_plock_info *in)
672  	
673  	{
674  		struct lock_waiter *w;
675  	
676  		w = malloc(sizeof(struct lock_waiter));
677  		if (!w)
678  			return -ENOMEM;
679  		memcpy(&w->info, in, sizeof(struct dlm_plock_info));
680  		w->flags = 0;
681  		list_add_tail(&w->list, &r->waiters);
682  		return 0;
683  	}
684  	
/* Send the result of an operation back to the kernel through the plock
   misc device; rv is filled in, the rest of *in identifies the request.
   NOTE(review): only write() < 0 is detected; a short write would go
   unnoticed — confirm the misc device never does partial writes. */
static void write_result(struct dlm_plock_info *in, int rv)
{
	int write_rv;

	in->rv = rv;
	write_rv = write(plock_device_fd, in, sizeof(struct dlm_plock_info));
	if (write_rv < 0)
		log_debug("write_result: write error %d fd %d\n",
			  errno, plock_device_fd);
}
695  	
/* After locks on r change, grant any queued waiters that no longer
   conflict; only the originating node replies to its kernel. */
static void do_waiters(struct lockspace *ls, struct resource *r)
{
	struct lock_waiter *w, *safe;
	struct dlm_plock_info *in;
	int rv;

	list_for_each_entry_safe(w, safe, &r->waiters, list) {
		in = &w->info;

		if (is_conflict(r, in, 0))
			continue;

		list_del(&w->list);

		/*
		log_group(ls, "take waiter %llx %llx-%llx %d/%u/%llx",
			  in->number, in->start, in->end,
			  in->nodeid, in->pid, in->owner);
		*/

		rv = lock_internal(ls, r, in);

		/* only the request's origin node answers its kernel */
		if (in->nodeid == our_nodeid)
			write_result(in, rv);

		free(w);
	}
}
724  	
/* Handle a LOCK op on r.  A conflicting non-blocking request fails with
   -EAGAIN; a blocking one is queued on r->waiters (-EINPROGRESS
   suppresses the immediate kernel reply — it is sent later from
   do_waiters).  Only the originating node replies to its kernel. */
static void do_lock(struct lockspace *ls, struct dlm_plock_info *in,
		    struct resource *r)
{
	int rv;

	if (is_conflict(r, in, 0)) {
		if (!in->wait)
			rv = -EAGAIN;
		else {
			rv = add_waiter(ls, r, in);
			if (rv)
				goto out;
			rv = -EINPROGRESS;
		}
	} else
		rv = lock_internal(ls, r, in);

 out:
	if (in->nodeid == our_nodeid && rv != -EINPROGRESS)
		write_result(in, rv);

	do_waiters(ls, r);
	put_resource(ls, r);
}
749  	
/* Find and free the queued waiter exactly matching the cancel request
   'in' (all identifying fields must match).  Returns 0 if removed,
   -ENOENT if no matching waiter exists (e.g. it was already granted). */
static int remove_waiter(const struct resource *r, const struct dlm_plock_info *in)
{
	struct lock_waiter *w;

	list_for_each_entry(w, &r->waiters, list) {
		if (w->info.nodeid == in->nodeid &&
		    w->info.fsid == in->fsid &&
		    w->info.number == in->number &&
		    w->info.owner == in->owner &&
		    w->info.pid == in->pid &&
		    w->info.start == in->start &&
		    w->info.end == in->end &&
		    w->info.ex == in->ex) {
			list_del(&w->list);
			free(w);
			return 0;
		}
	}

	return -ENOENT;
}
771  	
772  	static void do_cancel(struct lockspace *ls, struct dlm_plock_info *in,
773  			      struct resource *r)
774  	{
775  		int rv;
776  	
777  		rv = remove_waiter(r, in);
778  		if (in->nodeid == our_nodeid)
779  			write_result(in, rv);
780  	
781  		put_resource(ls, r);
782  	}
783  	
/* Handle an UNLOCK op on r.  For unlock-on-close the owner's waiters
   are also discarded and no reply is sent (the process is gone). */
static void do_unlock(struct lockspace *ls, struct dlm_plock_info *in,
		      struct resource *r)
{
	int rv;

	rv = unlock_internal(ls, r, in);

	if (in->flags & DLM_PLOCK_FL_CLOSE) {
		clear_waiters(ls, r, in);
		/* no replies for unlock-close ops */
		goto skip_result;
	}

	if (in->nodeid == our_nodeid)
		write_result(in, rv);

 skip_result:
	do_waiters(ls, r);
	put_resource(ls, r);
}
804  	
/* we don't even get to this function if the getlk isn't from us */

/* Handle a GET (F_GETLK) op: reply 1 if a conflicting lock exists
   (is_conflict copies the blocking lock's details into *in), else 0. */
static void do_get(struct lockspace *ls, struct dlm_plock_info *in,
		   struct resource *r)
{
	write_result(in, is_conflict(r, in, 1) ? 1 : 0);
	put_resource(ls, r);
}
820  	
821  	static void save_message(struct lockspace *ls, struct dlm_header *hd, int len,
822  				 int from, int type)
823  	{
824  		struct save_msg *sm;
825  	
826  		sm = malloc(sizeof(struct save_msg) + len);
827  		if (!sm)
828  			return;
829  		memset(sm, 0, sizeof(struct save_msg) + len);
830  	
831  		memcpy(&sm->buf, hd, len);
832  		sm->type = type;
833  		sm->len = len;
834  		sm->nodeid = from;
835  	
836  		log_plock(ls, "save %s from %d len %d", msg_name(type), from, len);
837  	
838  		list_add_tail(&sm->list, &ls->saved_messages);
839  	}
840  	
/* Dispatch a plock operation to its handler once the resource is known.
   GET does not update last_plock_time (it is read-only); an unknown
   optype is answered -EINVAL if we originated it. */
static void __receive_plock(struct lockspace *ls, struct dlm_plock_info *in,
			    int from, struct resource *r)
{
	switch (in->optype) {
	case DLM_PLOCK_OP_LOCK:
		ls->last_plock_time = monotime();
		do_lock(ls, in, r);
		break;
	case DLM_PLOCK_OP_CANCEL:
		ls->last_plock_time = monotime();
		do_cancel(ls, in, r);
		break;
	case DLM_PLOCK_OP_UNLOCK:
		ls->last_plock_time = monotime();
		do_unlock(ls, in, r);
		break;
	case DLM_PLOCK_OP_GET:
		do_get(ls, in, r);
		break;
	default:
		log_elock(ls, "receive_plock error from %d optype %d",
			  from, in->optype);
		if (from == our_nodeid)
			write_result(in, -EINVAL);
	}
}
867  	
868  	/* When ls members receive our options message (for our mount), one of them
869  	   saves all plock state received to that point in a checkpoint and then sends
870  	   us our journals message.  We know to retrieve the plock state from the
871  	   checkpoint when we receive our journals message.  Any plocks messages that
872  	   arrive between seeing our options message and our journals message needs to
873  	   be saved and processed after we synchronize our plock state from the
874  	   checkpoint.  Any plock message received while we're mounting but before we
875  	   set save_plocks (when we see our options message) can be ignored because it
876  	   should be reflected in the checkpointed state. */
877  	
/* Process a plock message from any node: unpack and validate it, find
   (or create) the resource, then either apply the op, queue it pending
   ownership discovery, or ignore it per the ownership rules below.
   'len' is currently unused here. */
static void _receive_plock(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct dlm_plock_info info;
	struct resource *r = NULL;
	struct timeval now;
	uint64_t usec;
	int from = hd->nodeid;
	int rv, create;

	/* payload follows the header; convert from wire (LE) byte order */
	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
	info_bswap_in(&info);

	log_plock(ls, "receive plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
		  (unsigned long long)info.number,
		  op_str(info.optype),
		  ex_str(info.optype, info.ex),
		  (unsigned long long)info.start, (unsigned long long)info.end,
		  info.nodeid, info.pid, (unsigned long long)info.owner,
		  info.wait);

	/* every 1000 messages, log the receive rate */
	plock_recv_count++;
	if (!(plock_recv_count % 1000)) {
		gettimeofday(&now, NULL);
		usec = dt_usec(&plock_recv_time, &now);
		log_plock(ls, "plock_recv_count %u time %.3f s",
			  plock_recv_count, usec * 1.e-6);
		plock_recv_time = now;
	}

	/* GET is answered locally; other nodes ignore it */
	if (info.optype == DLM_PLOCK_OP_GET && from != our_nodeid)
		return;

	/* sanity: sender must match the nodeid embedded in the op.
	   NOTE(review): the first clause is always false since 'from'
	   was just assigned hd->nodeid. */
	if (from != hd->nodeid || from != info.nodeid) {
		log_elock(ls, "receive_plock error from %d header %d info %d",
			  from, hd->nodeid, info.nodeid);
		return;
	}

	/* without ownership, create the resource on demand */
	create = !opt(plock_ownership_ind);

	rv = find_resource(ls, info.number, create, &r);

	if (rv && opt(plock_ownership_ind)) {
		/* There must have been a race with a drop, so we need to
		   ignore this plock op which will be resent.  If we're the one
		   who sent the plock, we need to send_own() and put it on the
		   pending list to resend once the owner is established. */

		log_plock(ls, "receive_plock from %d no r %llx", from,
			  (unsigned long long)info.number);

		if (from != our_nodeid)
			return;

		rv = find_resource(ls, info.number, 1, &r);
		if (rv)
			return;
		send_own(ls, r, our_nodeid);
		save_pending_plock(ls, r, &info);
		return;
	}
	if (rv) {
		/* r not found, rv is -ENOENT, this shouldn't happen because
		   process_plocks() creates a resource for every op */

		log_elock(ls, "receive_plock error from %d no r %llx %d",
			  from, (unsigned long long)info.number, rv);
		return;
	}

	/* The owner should almost always be 0 here, but other owners may
	   be possible given odd combinations of races with drop.  Odd races to
	   worry about (some seem pretty improbable):

	   - A sends drop, B sends plock, receive drop, receive plock.
	   This is addressed above.

	   - A sends drop, B sends plock, receive drop, B reads plock
	   and sends own, receive plock, on B we find owner of -1.

	   - A sends drop, B sends two plocks, receive drop, receive plocks.
	   Receiving the first plock is the previous case, receiving the
	   second plock will find r with owner of -1.

	   - A sends drop, B sends two plocks, receive drop, C sends own,
	   receive plock, B sends own, receive own (C), receive plock,
	   receive own (B).

	   Haven't tried to cook up a scenario that would lead to the
	   last case below; receiving a plock from ourself and finding
	   we're the owner of r. */

	if (!r->owner) {
		/* unowned: everyone processes every op */
		__receive_plock(ls, &info, from, r);

	} else if (r->owner == -1) {
		/* owner unknown: hold our own ops until it is resolved */
		log_plock(ls, "receive_plock from %d r %llx owner %d", from,
			  (unsigned long long)info.number, r->owner);

		if (from == our_nodeid)
			save_pending_plock(ls, r, &info);

	} else if (r->owner != our_nodeid) {
		/* owned elsewhere: hold our own ops for the owner */
		log_plock(ls, "receive_plock from %d r %llx owner %d", from,
			  (unsigned long long)info.number, r->owner);

		if (from == our_nodeid)
			save_pending_plock(ls, r, &info);

	} else if (r->owner == our_nodeid) {
		/* we own r: only process our own ops */
		log_plock(ls, "receive_plock from %d r %llx owner %d", from,
			  (unsigned long long)info.number, r->owner);

		if (from == our_nodeid)
			__receive_plock(ls, &info, from, r);
	}
}
995  	
996  	void receive_plock(struct lockspace *ls, struct dlm_header *hd, int len)
997  	{
998  		if (ls->save_plocks) {
999  			save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK);
1000 			return;
1001 		}
1002 	
1003 		_receive_plock(ls, hd, len);
1004 	}
1005 	
1006 	static int send_struct_info(struct lockspace *ls, struct dlm_plock_info *in,
1007 				    int msg_type)
1008 	{
1009 		struct dlm_header *hd;
1010 		int rv = 0, len;
1011 		char *buf;
1012 	
1013 		len = sizeof(struct dlm_header) + sizeof(struct dlm_plock_info);
1014 		buf = malloc(len);
1015 		if (!buf) {
1016 			rv = -ENOMEM;
1017 			goto out;
1018 		}
1019 		memset(buf, 0, len);
1020 	
1021 		info_bswap_out(in);
1022 	
1023 		hd = (struct dlm_header *)buf;
1024 		hd->type = msg_type;
1025 	
1026 		memcpy(buf + sizeof(struct dlm_header), in, sizeof(*in));
1027 	
1028 		dlm_send_message(ls, buf, len);
1029 	
1030 		free(buf);
1031 	 out:
1032 		if (rv)
1033 			log_elock(ls, "send_struct_info error %d", rv);
1034 		return rv;
1035 	}
1036 	
/* Broadcast a plock op to the lockspace; r is unused but kept for
   symmetry with the other send_* helpers. */
static void send_plock(struct lockspace *ls, struct resource *r,
		       struct dlm_plock_info *in)
{
	send_struct_info(ls, in, DLM_MSG_PLOCK);
}
1042 	
/* Broadcast an ownership proposal for r: owner 0 relinquishes, a
   nodeid claims.  Suppressed while an own message is already in
   flight (ops queued on r->pending imply one was sent). */
static void send_own(struct lockspace *ls, struct resource *r, int owner)
{
	struct dlm_plock_info info;

	/* if we've already sent an own message for this resource,
	   (pending list is not empty), then we shouldn't send another */

	if (!list_empty(&r->pending)) {
		log_plock(ls, "send_own %llx already pending",
			  (unsigned long long)r->number);
		return;
	}

	/* remember which kind of own message is outstanding */
	if (!owner)
		r->flags |= R_SEND_UNOWN;
	else
		r->flags |= R_SEND_OWN;

	memset(&info, 0, sizeof(info));
	info.number = r->number;
	info.nodeid = owner;	/* proposed owner rides in the nodeid field */

	send_struct_info(ls, &info, DLM_MSG_PLOCK_OWN);
}
1067 	
/* Broadcast all of r's state (held locks, then waiters) so other nodes
   can mirror it; each entry is marked P_SYNCING until the message is
   received back.  Stops at the first send failure. */
static void send_syncs(struct lockspace *ls, struct resource *r)
{
	struct dlm_plock_info info;
	struct posix_lock *po;
	struct lock_waiter *w;
	int rv;

	list_for_each_entry(po, &r->locks, list) {
		memset(&info, 0, sizeof(info));
		info.number    = r->number;
		info.start     = po->start;
		info.end       = po->end;
		info.nodeid    = po->nodeid;
		info.owner     = po->owner;
		info.pid       = po->pid;
		info.ex        = po->ex;

		rv = send_struct_info(ls, &info, DLM_MSG_PLOCK_SYNC_LOCK);
		if (rv)
			goto out;

		po->flags |= P_SYNCING;
	}

	list_for_each_entry(w, &r->waiters, list) {
		memcpy(&info, &w->info, sizeof(info));

		rv = send_struct_info(ls, &info, DLM_MSG_PLOCK_SYNC_WAITER);
		if (rv)
			goto out;

		/* P_SYNCING is reused for waiter flags (same bit) */
		w->flags |= P_SYNCING;
	}
 out:
	return;
}
1104 	
1105 	static void send_drop(struct lockspace *ls, struct resource *r)
1106 	{
1107 		struct dlm_plock_info info;
1108 	
1109 		memset(&info, 0, sizeof(info));
1110 		info.number = r->number;
1111 		r->flags |= R_SEND_DROP;
1112 	
1113 		send_struct_info(ls, &info, DLM_MSG_PLOCK_DROP);
1114 	}
1115 	
1116 	/* plock op can't be handled until we know the owner value of the resource,
1117 	   so the op is saved on the pending list until the r owner is established */
1118 	
1119 	static void save_pending_plock(struct lockspace *ls, struct resource *r,
1120 				       struct dlm_plock_info *in)
1121 	{
1122 		struct lock_waiter *w;
1123 	
1124 		w = malloc(sizeof(struct lock_waiter));
1125 		if (!w) {
1126 			log_elock(ls, "save_pending_plock no mem");
1127 			return;
1128 		}
1129 		memcpy(&w->info, in, sizeof(struct dlm_plock_info));
1130 		w->flags = 0;
1131 		list_add_tail(&w->list, &r->pending);
1132 	}
1133 	
1134 	/* plock ops are on pending list waiting for ownership to be established.
1135 	   owner has now become us, so add these plocks to r */
1136 	
1137 	static void add_pending_plocks(struct lockspace *ls, struct resource *r)
1138 	{
1139 		struct lock_waiter *w, *safe;
1140 	
1141 		list_for_each_entry_safe(w, safe, &r->pending, list) {
1142 			__receive_plock(ls, &w->info, our_nodeid, r);
1143 			list_del(&w->list);
1144 			free(w);
1145 		}
1146 	}
1147 	
1148 	/* plock ops are on pending list waiting for ownership to be established.
1149 	   owner has now become 0, so send these plocks to everyone */
1150 	
1151 	static void send_pending_plocks(struct lockspace *ls, struct resource *r)
1152 	{
1153 		struct lock_waiter *w, *safe;
1154 	
1155 		list_for_each_entry_safe(w, safe, &r->pending, list) {
1156 			send_plock(ls, r, &w->info);
1157 			list_del(&w->list);
1158 			free(w);
1159 		}
1160 	}
1161 	
/* Process a PLOCK_OWN message (possibly our own message echoed back)
   and update the ownership state of the named resource.  Ownership
   moves between three states: -1 (unknown/new), a nodeid (owned by
   that node), and 0 (unowned, state replicated on all nodes).  Any
   combination not explained by the comments below is a protocol error
   and is logged via should_not_happen. */

static void _receive_own(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct dlm_plock_info info;
	struct resource *r;
	int should_not_happen = 0;
	int from = hd->nodeid;
	int rv;

	ls->last_plock_time = monotime();

	/* message body follows the header; convert from wire byte order */
	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
	info_bswap_in(&info);

	log_plock(ls, "receive_own %llx from %u owner %u",
		  (unsigned long long)info.number, hd->nodeid, info.nodeid);

	/* create the resource if we don't have it yet (create arg = 1) */
	rv = find_resource(ls, info.number, 1, &r);
	if (rv)
		return;

	if (from == our_nodeid) {
		/*
		 * received our own own message
		 */

		if (info.nodeid == 0) {
			/* we are setting owner to 0 */

			if (r->owner == our_nodeid) {
				/* we set owner to 0 when we relinquish
				   ownership */
				should_not_happen = 1;
			} else if (r->owner == 0) {
				/* this happens when we relinquish ownership */
				r->flags |= R_GOT_UNOWN;
			} else {
				should_not_happen = 1;
			}

		} else if (info.nodeid == our_nodeid) {
			/* we are setting owner to ourself */

			if (r->owner == -1) {
				/* we have gained ownership */
				r->owner = our_nodeid;
				add_pending_plocks(ls, r);
			} else if (r->owner == our_nodeid) {
				should_not_happen = 1;
			} else if (r->owner == 0) {
				send_pending_plocks(ls, r);
			} else {
				/* resource is owned by other node;
				   they should set owner to 0 shortly */
			}

		} else {
			/* we should only ever set owner to 0 or ourself */
			should_not_happen = 1;
		}
	} else {
		/*
		 * received own message from another node
		 */

		if (info.nodeid == 0) {
			/* other node is setting owner to 0 */

			if (r->owner == -1) {
				/* we should have a record of the owner before
				   it relinquishes */
				should_not_happen = 1;
			} else if (r->owner == our_nodeid) {
				/* only the owner should relinquish */
				should_not_happen = 1;
			} else if (r->owner == 0) {
				should_not_happen = 1;
			} else {
				/* normal relinquish by the current owner;
				   its synced plock state is now replicated */
				r->owner = 0;
				r->flags |= R_GOT_UNOWN;
				send_pending_plocks(ls, r);
			}

		} else if (info.nodeid == from) {
			/* other node is setting owner to itself */

			if (r->owner == -1) {
				/* normal path for a node becoming owner */
				r->owner = from;
			} else if (r->owner == our_nodeid) {
				/* we relinquish our ownership: sync our local
				   plocks to everyone, then set owner to 0 */
				send_syncs(ls, r);
				send_own(ls, r, 0);
				/* we need to set owner to 0 here because
				   local ops may arrive before we receive
				   our send_own message and can't be added
				   locally */
				r->owner = 0;
			} else if (r->owner == 0) {
				/* can happen because we set owner to 0 before
				   we receive our send_own sent just above */
			} else {
				/* do nothing, current owner should be
				   relinquishing its ownership */
			}

		} else if (info.nodeid == our_nodeid) {
			/* no one else should try to set the owner to us */
			should_not_happen = 1;
		} else {
			/* a node should only ever set owner to 0 or itself */
			should_not_happen = 1;
		}
	}

	if (should_not_happen) {
		log_elock(ls, "receive_own error from %u %llx "
			  "info nodeid %d r owner %d",
			  from, (unsigned long long)r->number,
			  info.nodeid, r->owner);
	}
}
1284 	
1285 	void receive_own(struct lockspace *ls, struct dlm_header *hd, int len)
1286 	{
1287 		if (ls->save_plocks) {
1288 			save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK_OWN);
1289 			return;
1290 		}
1291 	
1292 		_receive_own(ls, hd, len);
1293 	}
1294 	
1295 	static void clear_syncing_flag(struct lockspace *ls, struct resource *r,
1296 				       struct dlm_plock_info *in)
1297 	{
1298 		struct posix_lock *po;
1299 		struct lock_waiter *w;
1300 	
1301 		list_for_each_entry(po, &r->locks, list) {
1302 			if ((po->flags & P_SYNCING) &&
1303 			    in->start  == po->start &&
1304 			    in->end    == po->end &&
1305 			    in->nodeid == po->nodeid &&
1306 			    in->owner  == po->owner &&
1307 			    in->pid    == po->pid &&
1308 			    in->ex     == po->ex) {
1309 				po->flags &= ~P_SYNCING;
1310 				return;
1311 			}
1312 		}
1313 	
1314 		list_for_each_entry(w, &r->waiters, list) {
1315 			if ((w->flags & P_SYNCING) &&
1316 			    in->start  == w->info.start &&
1317 			    in->end    == w->info.end &&
1318 			    in->nodeid == w->info.nodeid &&
1319 			    in->owner  == w->info.owner &&
1320 			    in->pid    == w->info.pid &&
1321 			    in->ex     == w->info.ex) {
1322 				w->flags &= ~P_SYNCING;
1323 				return;
1324 			}
1325 		}
1326 	
1327 		log_elock(ls, "clear_syncing error %llx no match %s %llx-%llx %d/%u/%llx",
1328 			  (unsigned long long)r->number,
1329 			  in->ex ? "WR" : "RD", 
1330 			  (unsigned long long)in->start,
1331 			  (unsigned long long)in->end,
1332 			  in->nodeid, in->pid,
1333 			  (unsigned long long)in->owner);
1334 	}
1335 	
1336 	static void _receive_sync(struct lockspace *ls, struct dlm_header *hd, int len)
1337 	{
1338 		struct dlm_plock_info info;
1339 		struct resource *r;
1340 		int from = hd->nodeid;
1341 		int rv;
1342 	
1343 		ls->last_plock_time = monotime();
1344 	
1345 		memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
1346 		info_bswap_in(&info);
1347 	
1348 		log_plock(ls, "receive sync %llx from %u %s %llx-%llx %d/%u/%llx",
1349 			  (unsigned long long)info.number, from, info.ex ? "WR" : "RD",
1350 			  (unsigned long long)info.start, (unsigned long long)info.end,
1351 			  info.nodeid, info.pid, (unsigned long long)info.owner);
1352 	
1353 		rv = find_resource(ls, info.number, 0, &r);
1354 		if (rv) {
1355 			log_elock(ls, "receive_sync error no r %llx from %d",
1356 				  info.number, from);
1357 			return;
1358 		}
1359 	
1360 		if (from == our_nodeid) {
1361 			/* this plock now in sync on all nodes */
1362 			clear_syncing_flag(ls, r, &info);
1363 			return;
1364 		}
1365 	
1366 		if (hd->type == DLM_MSG_PLOCK_SYNC_LOCK)
1367 			add_lock(r, info.nodeid, info.owner, info.pid, info.ex, 
1368 				 info.start, info.end);
1369 		else if (hd->type == DLM_MSG_PLOCK_SYNC_WAITER)
1370 			add_waiter(ls, r, &info);
1371 	}
1372 	
1373 	void receive_sync(struct lockspace *ls, struct dlm_header *hd, int len)
1374 	{
1375 		if (ls->save_plocks) {
1376 			save_message(ls, hd, len, hd->nodeid, hd->type);
1377 			return;
1378 		}
1379 	
1380 		_receive_sync(ls, hd, len);
1381 	}
1382 	
/* Process a PLOCK_DROP message: free the named resource if, and only
   if, every node would make the same decision — the resource must be
   unowned (owner 0), have no pending ops, and hold no locks or
   waiters.  Several benign races (concurrent drops, a drop crossing a
   new own or plock message) are detected and logged, not treated as
   errors. */

static void _receive_drop(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct dlm_plock_info info;
	struct resource *r;
	int from = hd->nodeid;
	int rv;

	ls->last_plock_time = monotime();

	/* message body follows the header; convert from wire byte order */
	memcpy(&info, (char *)hd + sizeof(struct dlm_header), sizeof(info));
	info_bswap_in(&info);

	log_plock(ls, "receive_drop %llx from %u",
		  (unsigned long long)info.number, from);

	rv = find_resource(ls, info.number, 0, &r);
	if (rv) {
		/* we'll find no r if two nodes sent drop at once */
		log_plock(ls, "receive_drop from %d no r %llx", from,
			  (unsigned long long)info.number);
		return;
	}

	if (r->owner != 0) {
		/* - A sent drop, B sent drop, receive drop A, C sent own,
		     receive drop B (this warning on C, owner -1)
	   	   - A sent drop, B sent drop, receive drop A, A sent own,
		     receive own A, receive drop B (this warning on all,
		     owner A) */
		log_plock(ls, "receive_drop from %d r %llx owner %d", from,
			  (unsigned long long)r->number, r->owner);
		return;
	}

	if (!list_empty(&r->pending)) {
		/* shouldn't happen */
		log_elock(ls, "receive_drop error from %d r %llx pending op",
			  from, (unsigned long long)r->number);
		return;
	}

	/* the decision to drop or not must be based on things that are
	   guaranteed to be the same on all nodes */

	if (list_empty(&r->locks) && list_empty(&r->waiters)) {
		rb_del_plock_resource(ls, r);
		list_del(&r->list);
		free(r);
	} else {
		/* A sent drop, B sent a plock, receive plock, receive drop */
		log_plock(ls, "receive_drop from %d r %llx in use", from,
			  (unsigned long long)r->number);
	}
}
1437 	
1438 	void receive_drop(struct lockspace *ls, struct dlm_header *hd, int len)
1439 	{
1440 		if (ls->save_plocks) {
1441 			save_message(ls, hd, len, hd->nodeid, DLM_MSG_PLOCK_DROP);
1442 			return;
1443 		}
1444 	
1445 		_receive_drop(ls, hd, len);
1446 	}
1447 	
1448 	/* We only drop resources from the unowned state to simplify things.
1449 	   If we want to drop a resource we own, we unown/relinquish it first. */
1450 	
1451 	/* FIXME: in the transition from owner = us, to owner = 0, to drop;
1452 	   we want the second period to be shorter than the first */
1453 	
/* Periodically relinquish ownership of, and propose dropping, old
   unused resources.  Returns 1 when the caller should keep polling
   (more aging work may remain), 0 when ownership mode is off or there
   are no resources at all.  Work is throttled: at most
   drop_resources_count resources per drop_resources_time interval,
   and only resources idle longer than drop_resources_age. */

static int drop_resources(struct lockspace *ls)
{
	struct resource *r;
	struct timeval now;
	int count = 0;

	if (!opt(plock_ownership_ind))
		return 0;

	if (list_empty(&ls->plock_resources))
		return 0;

	gettimeofday(&now, NULL);

	/* too soon since the last pass; ask caller to poll again later */
	if (time_diff_ms(&ls->drop_resources_last, &now) <
			 opt(drop_resources_time_ind))
		return 1;

	ls->drop_resources_last = now;

	/* try to drop the oldest, unused resources */

	list_for_each_entry_reverse(r, &ls->plock_resources, list) {
		if (count >= opt(drop_resources_count_ind))
			break;
		/* owned by another node: not ours to relinquish or drop */
		if (r->owner && r->owner != our_nodeid)
			continue;
		if (time_diff_ms(&r->last_access, &now) <
		    opt(drop_resources_age_ind))
			continue;

		if (list_empty(&r->locks) && list_empty(&r->waiters)) {
			if (r->owner == our_nodeid) {
				/* step 1: relinquish ownership (owner -> 0) */
				send_own(ls, r, 0);
				r->owner = 0;
			} else if (r->owner == 0 && got_unown(r)) {
				/* step 2: unowned and confirmed, propose drop */
				send_drop(ls, r);
			}

			count++;
		}
	}

	return 1;
}
1499 	
1500 	void drop_resources_all(void)
1501 	{
1502 		struct lockspace *ls;
1503 		int rv = 0;
1504 	
1505 		poll_drop_plock = 0;
1506 	
1507 		list_for_each_entry(ls, &lockspaces, list) {
1508 			rv = drop_resources(ls);
1509 			if (rv)
1510 				poll_drop_plock = 1;
1511 		}
1512 	}
1513 	
/* Rate-limit reading plock ops from the kernel device.  Returns 0 when
   the caller may read, 2 when reading should be deferred (the delay is
   counted in plock_rate_delays and reported by process_plocks). */

int limit_plocks(void)
{
	struct timeval now;

	/* limiting disabled, or nothing has been read yet */
	if (!opt(plock_rate_limit_ind) || !plock_read_count)
		return 0;

	gettimeofday(&now, NULL);

	/* Every time a plock op is read from the kernel, we increment
	   plock_read_count.  After every plock_rate_limit (N) reads,
	   we check the time it's taken to do those N; if the time is less than
	   a second, then we delay reading any more until a second is up.
	   This way we read a max of N ops from the kernel every second. */

	if (!(plock_read_count % opt(plock_rate_limit_ind))) {
		if (time_diff_ms(&plock_rate_last, &now) < 1000) {
			plock_rate_delays++;
			return 2;
		}
		plock_rate_last = now;
		/* bump the counter past the multiple so this branch won't
		   fire again until another N ops have been read */
		plock_read_count++;
	}
	return 0;
}
1539 	
1540 	void process_plocks(int ci)
1541 	{
1542 		struct lockspace *ls;
1543 		struct resource *r;
1544 		struct dlm_plock_info info;
1545 		struct timeval now;
1546 		uint64_t usec;
1547 		int create, rv;
1548 	
1549 		if (limit_plocks()) {
1550 			poll_ignore_plock = 1;
1551 			client_ignore(plock_ci, plock_fd);
1552 			return;
1553 		}
1554 	
1555 		gettimeofday(&now, NULL);
1556 	
1557 		rv = do_read(plock_device_fd, &info, sizeof(info));
1558 		if (rv < 0) {
1559 			log_debug("process_plocks: read error %d fd %d\n",
1560 				  errno, plock_device_fd);
1561 			return;
1562 		}
1563 	
1564 		/* kernel doesn't set the nodeid field */
1565 		info.nodeid = our_nodeid;
1566 	
1567 		if (!opt(enable_plock_ind)) {
1568 			rv = -ENOSYS;
1569 			goto fail;
1570 		}
1571 	
1572 		ls = find_ls_id(info.fsid);
1573 		if (!ls) {
1574 			log_plock(ls, "process_plocks: no ls id %x", info.fsid);
1575 			rv = -EEXIST;
1576 			goto fail;
1577 		}
1578 	
1579 		if (ls->disable_plock) {
1580 			rv = -ENOSYS;
1581 			goto fail;
1582 		}
1583 	
1584 		log_plock(ls, "read plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
1585 			  (unsigned long long)info.number,
1586 			  op_str(info.optype),
1587 			  ex_str(info.optype, info.ex),
1588 			  (unsigned long long)info.start, (unsigned long long)info.end,
1589 			  info.nodeid, info.pid, (unsigned long long)info.owner,
1590 			  info.wait);
1591 	
1592 		/* report plock rate and any delays since the last report */
1593 		plock_read_count++;
1594 		if (!(plock_read_count % 1000)) {
1595 			usec = dt_usec(&plock_read_time, &now) ;
1596 			log_plock(ls, "plock_read_count %u time %.3f s delays %u",
1597 				  plock_read_count, usec * 1.e-6, plock_rate_delays);
1598 			plock_read_time = now;
1599 			plock_rate_delays = 0;
1600 		}
1601 	
1602 		if (!opt(plock_ownership_ind)) {
1603 			send_plock(ls, NULL, &info);
1604 			return;
1605 		}
1606 	
1607 		create = (info.optype == DLM_PLOCK_OP_UNLOCK) ? 0 : 1;
1608 	
1609 		rv = find_resource(ls, info.number, create, &r);
1610 		if (rv)
1611 			goto fail;
1612 	
1613 		if (r->owner == 0) {
1614 			/* plock state replicated on all nodes */
1615 			send_plock(ls, r, &info);
1616 	
1617 		} else if (r->owner == our_nodeid) {
1618 			/* we are the owner of r, so our plocks are local */
1619 			__receive_plock(ls, &info, our_nodeid, r);
1620 	
1621 		} else {
1622 			/* r owner is -1: r is new, try to become the owner;
1623 			   r owner > 0: tell other owner to give up ownership;
1624 			   both done with a message trying to set owner to ourself */
1625 			send_own(ls, r, our_nodeid);
1626 			save_pending_plock(ls, r, &info);
1627 		}
1628 	
1629 		if (opt(plock_ownership_ind) && !list_empty(&ls->plock_resources))
1630 			poll_drop_plock = 1;
1631 		return;
1632 	
1633 	 fail:
1634 		if (!(info.flags & DLM_PLOCK_FL_CLOSE))
1635 			write_result(&info, rv);
1636 	}
1637 	
1638 	void process_saved_plocks(struct lockspace *ls)
1639 	{
1640 		struct save_msg *sm, *sm2;
1641 		struct dlm_header *hd;
1642 		int count = 0;
1643 	
1644 		log_plock(ls, "process_saved_plocks begin");
1645 	
1646 		if (list_empty(&ls->saved_messages))
1647 			goto out;
1648 	
1649 		list_for_each_entry_safe(sm, sm2, &ls->saved_messages, list) {
1650 			hd = (struct dlm_header *)sm->buf;
1651 	
1652 			switch (sm->type) {
1653 			case DLM_MSG_PLOCK:
1654 				_receive_plock(ls, hd, sm->len);
1655 				break;
1656 			case DLM_MSG_PLOCK_OWN:
1657 				_receive_own(ls, hd, sm->len);
1658 				break;
1659 			case DLM_MSG_PLOCK_DROP:
1660 				_receive_drop(ls, hd, sm->len);
1661 				break;
1662 			case DLM_MSG_PLOCK_SYNC_LOCK:
1663 			case DLM_MSG_PLOCK_SYNC_WAITER:
1664 				_receive_sync(ls, hd, sm->len);
1665 				break;
1666 			default:
1667 				continue;
1668 			}
1669 	
1670 			list_del(&sm->list);
1671 			free(sm);
1672 			count++;
1673 		}
1674 	 out:
1675 		log_plock(ls, "process_saved_plocks %d done", count);
1676 	}
1677 	
1678 	/* locks still marked SYNCING should not go into the ckpt; the new node
1679 	   will get those locks by receiving PLOCK_SYNC messages */
1680 	
1681 	#define MAX_SEND_SIZE 1024 /* 1024 holds 24 plock_data */
1682 	
1683 	static char send_buf[MAX_SEND_SIZE];
1684 	
/* Pack one resource's plock state into send_buf: a resource_data
   header followed by plock_data entries for each lock and waiter.
   Returns 0 when everything fit (*last cleared), or 1 when the buffer
   filled first — *last then points at the entry to resume from, and
   the caller sends this buffer and calls again with full=1 so the next
   buffer is flagged RD_CONTINUE.  Locks still marked P_SYNCING are
   skipped; the joining node receives those via sync messages instead.
   *count_out returns the number of entries packed into this buffer. */

static int pack_send_buf(struct lockspace *ls, struct resource *r, int owner,
			 int full, int *count_out, void **last)
{
	struct resource_data *rd;
	struct plock_data *pp;
	struct posix_lock *po;
	struct lock_waiter *w;
	int count = 0;
	int find = 0;	/* nonzero: skip entries until *last is found */
	int len;

	/* N.B. owner not always equal to r->owner */
	rd = (struct resource_data *)(send_buf + sizeof(struct dlm_header));
	rd->number = cpu_to_le64(r->number);
	rd->owner = cpu_to_le32(owner);

	if (full) {
		/* continuation buffer: resume scanning at *last */
		/* NOTE(review): rd->flags is not wrapped in cpu_to_le32 here
		   although receive_plocks_data reads it with le32_to_cpu —
		   appears inconsistent on big-endian; confirm */
		rd->flags = RD_CONTINUE;
		find = 1;
	}

	/* plocks not replicated for owned resources */
	if (opt(plock_ownership_ind) && (owner == our_nodeid))
		goto done;

	len = sizeof(struct dlm_header) + sizeof(struct resource_data);

	pp = (struct plock_data *)(send_buf + sizeof(struct dlm_header) + sizeof(struct resource_data));

	list_for_each_entry(po, &r->locks, list) {
		if (find && *last != po)
			continue;
		find = 0;

		if (po->flags & P_SYNCING)
			continue;

		if (len + sizeof(struct plock_data) > sizeof(send_buf)) {
			*last = po;
			goto full;
		}
		len += sizeof(struct plock_data);

		pp->start	= cpu_to_le64(po->start);
		pp->end		= cpu_to_le64(po->end);
		pp->owner	= cpu_to_le64(po->owner);
		pp->pid		= cpu_to_le32(po->pid);
		pp->nodeid	= cpu_to_le32(po->nodeid);
		pp->ex		= po->ex;
		pp->waiter	= 0;
		pp++;
		count++;
	}

	list_for_each_entry(w, &r->waiters, list) {
		if (find && *last != w)
			continue;
		find = 0;

		if (w->flags & P_SYNCING)
			continue;

		if (len + sizeof(struct plock_data) > sizeof(send_buf)) {
			*last = w;
			goto full;
		}
		len += sizeof(struct plock_data);

		pp->start	= cpu_to_le64(w->info.start);
		pp->end		= cpu_to_le64(w->info.end);
		pp->owner	= cpu_to_le64(w->info.owner);
		pp->pid		= cpu_to_le32(w->info.pid);
		pp->nodeid	= cpu_to_le32(w->info.nodeid);
		pp->ex		= w->info.ex;
		pp->waiter	= 1;
		pp++;
		count++;
	}
 done:
	rd->lock_count = cpu_to_le32(count);
	*count_out = count;
	*last = NULL;
	return 0;

 full:
	rd->lock_count = cpu_to_le32(count);
	*count_out = count;
	return 1;
}
1774 	
1775 	/* Copy all plock state into a checkpoint so new node can retrieve it.  The
1776 	   node creating the ckpt for the mounter needs to be the same node that's
1777 	   sending the mounter its journals message (i.e. the low nodeid).  The new
1778 	   mounter knows the ckpt is ready to read only after it gets its journals
1779 	   message.
1780 	 
1781 	   If the mounter is becoming the new low nodeid in the group, the node doing
1782 	   the store closes the ckpt and the new node unlinks the ckpt after reading
1783 	   it.  The ckpt should then disappear and the new node can create a new ckpt
1784 	   for the next mounter. */
1785 	
1786 	static int send_plocks_data(struct lockspace *ls, uint32_t seq, char *buf, int len)
1787 	{
1788 		struct dlm_header *hd;
1789 	
1790 		hd = (struct dlm_header *)buf;
1791 		hd->type = DLM_MSG_PLOCKS_DATA;
1792 		hd->msgdata = seq;
1793 	
1794 		dlm_send_message(ls, buf, len);
1795 	
1796 		return 0;
1797 	}
1798 	
/* Send all plock state to the joining node as a series of PLOCKS_DATA
   messages (this daemon's replacement for the old checkpoint).  Each
   resource is packed by pack_send_buf and may span several messages
   when its lock list exceeds MAX_SEND_SIZE.  *plocks_data returns the
   number of messages sent so the receiver can be told how many to
   expect. */

void send_all_plocks_data(struct lockspace *ls, uint32_t seq, uint32_t *plocks_data)
{
	struct resource *r;
	void *last;
	int owner, count, len, full;
	uint32_t send_count = 0;

	if (!opt(enable_plock_ind) || ls->disable_plock)
		return;

	log_dlock(ls, "send_all_plocks_data %d:%u", our_nodeid, seq);

	/* - If r owner is -1, ckpt nothing.
	   - If r owner is us, ckpt owner of us and no plocks.
	   - If r owner is other, ckpt that owner and any plocks we have on r
	     (they've just been synced but owner=0 msg not recved yet).
	   - If r owner is 0 and !got_unown, then we've just unowned r;
	     ckpt owner of us and any plocks that don't have SYNCING set
	     (plocks with SYNCING will be handled by our sync messages).
	   - If r owner is 0 and got_unown, then ckpt owner 0 and all plocks;
	     (there should be no SYNCING plocks) */

	list_for_each_entry(r, &ls->plock_resources, list) {
		if (!opt(plock_ownership_ind))
			owner = 0;
		else if (r->owner == -1)
			continue;
		else if (r->owner == our_nodeid)
			owner = our_nodeid;
		else if (r->owner)
			owner = r->owner;
		else if (!r->owner && !got_unown(r))
			owner = our_nodeid;
		else if (!r->owner)
			owner = 0;
		else {
			/* unreachable given the cases above; kept as a guard */
			log_elock(ls, "send_all_plocks_data error owner %d r %llx",
				  r->owner, (unsigned long long)r->number);
			continue;
		}

		memset(&send_buf, 0, sizeof(send_buf));
		count = 0;
		full = 0;
		last = NULL;

		/* loop until pack_send_buf reports the whole resource sent */
		do {
			full = pack_send_buf(ls, r, owner, full, &count, &last);

			len = sizeof(struct dlm_header) +
			      sizeof(struct resource_data) +
			      sizeof(struct plock_data) * count;

			log_plock(ls, "send_plocks_data %d:%u n %llu o %d locks %d len %d",
				  our_nodeid, seq, (unsigned long long)r->number, r->owner,
				  count, len);

			send_plocks_data(ls, seq, send_buf, len);

			send_count++;

		} while (full);
	}

	*plocks_data = send_count;

	log_dlock(ls, "send_all_plocks_data %d:%u %u done",
		  our_nodeid, seq, send_count);
}
1868 	
1869 	static void free_r_lists(struct resource *r)
1870 	{
1871 		struct posix_lock *po, *po2;
1872 		struct lock_waiter *w, *w2;
1873 	
1874 		list_for_each_entry_safe(po, po2, &r->locks, list) {
1875 			list_del(&po->list);
1876 			free(po);
1877 		}
1878 	
1879 		list_for_each_entry_safe(w, w2, &r->waiters, list) {
1880 			list_del(&w->list);
1881 			free(w);
1882 		}
1883 	}
1884 	
/* Unpack one PLOCKS_DATA message from the node sending us all plock
   state when we join.  A message either begins a new resource or, when
   flagged RD_CONTINUE, appends more entries to a resource created by a
   previous message.  Only runs while we still need plock state
   (need_plocks) and are queuing live messages (save_plocks). */

void receive_plocks_data(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct resource_data *rd;
	struct plock_data *pp;
	struct posix_lock *po;
	struct lock_waiter *w;
	struct resource *r;
	uint64_t num;
	uint32_t count;
	uint32_t flags;
	int owner;
	int i;

	if (!opt(enable_plock_ind) || ls->disable_plock)
		return;

	if (!ls->need_plocks)
		return;

	if (!ls->save_plocks)
		return;

	ls->recv_plocks_data_count++;

	/* must at least contain the fixed header + resource_data */
	if (len < sizeof(struct dlm_header) + sizeof(struct resource_data)) {
		log_elock(ls, "recv_plocks_data %d:%u bad len %d",
			  hd->nodeid, hd->msgdata, len);
		return;
	}

	/* resource_data follows the header; fields are little-endian */
	rd = (struct resource_data *)((char *)hd + sizeof(struct dlm_header));
	num = le64_to_cpu(rd->number);
	owner = le32_to_cpu(rd->owner);
	count = le32_to_cpu(rd->lock_count);
	flags = le32_to_cpu(rd->flags);

	if (flags & RD_CONTINUE) {
		/* continuation of a resource created by an earlier message */
		r = search_resource(ls, num);
		if (!r) {
			log_elock(ls, "recv_plocks_data %d:%u n %llu not found",
				  hd->nodeid, hd->msgdata, (unsigned long long)num);
			return;
		}
		log_plock(ls, "recv_plocks_data %d:%u n %llu continue",
			  hd->nodeid, hd->msgdata, (unsigned long long)num);
		goto unpack;
	}

	r = malloc(sizeof(struct resource));
	if (!r) {
		log_elock(ls, "recv_plocks_data %d:%u n %llu no mem",
			  hd->nodeid, hd->msgdata, (unsigned long long)num);
		return;
	}
	memset(r, 0, sizeof(struct resource));
	INIT_LIST_HEAD(&r->locks);
	INIT_LIST_HEAD(&r->waiters);
	INIT_LIST_HEAD(&r->pending);

	if (!opt(plock_ownership_ind)) {
		/* without ownership mode every resource must be owner 0 */
		if (owner) {
			log_elock(ls, "recv_plocks_data %d:%u n %llu bad owner %d",
				  hd->nodeid, hd->msgdata, (unsigned long long)num,
				  owner);
			goto fail_free;
		}
	} else {
		if (!owner)
			r->flags |= R_GOT_UNOWN;

		/* no locks should be included for owned resources */

		if (owner && count) {
			log_elock(ls, "recv_plocks_data %d:%u n %llu o %d bad count %" PRIu32,
				  hd->nodeid, hd->msgdata,
				  (unsigned long long)num, owner, count);
			goto fail_free;
		}
	}

	r->number = num;
	r->owner = owner;

 unpack:
	/* the message must be large enough for count plock_data entries */
	if (len < sizeof(struct dlm_header) +
		  sizeof(struct resource_data) +
		  sizeof(struct plock_data) * count) {
		log_elock(ls, "recv_plocks_data %d:%u count %u bad len %d",
			  hd->nodeid, hd->msgdata, count, len);
		goto fail_free;
	}

	pp = (struct plock_data *)((char *)rd + sizeof(struct resource_data));

	for (i = 0; i < count; i++) {
		if (!pp->waiter) {
			/* held lock */
			po = malloc(sizeof(struct posix_lock));
			if (!po)
				goto fail_free;
			po->start	= le64_to_cpu(pp->start);
			po->end		= le64_to_cpu(pp->end);
			po->owner	= le64_to_cpu(pp->owner);
			po->pid		= le32_to_cpu(pp->pid);
			po->nodeid	= le32_to_cpu(pp->nodeid);
			po->ex		= pp->ex;
			po->flags	= 0;
			list_add_tail(&po->list, &r->locks);
		} else {
			/* blocked waiter */
			w = malloc(sizeof(struct lock_waiter));
			if (!w)
				goto fail_free;
			w->info.start	= le64_to_cpu(pp->start);
			w->info.end	= le64_to_cpu(pp->end);
			w->info.owner	= le64_to_cpu(pp->owner);
			w->info.pid	= le32_to_cpu(pp->pid);
			w->info.nodeid	= le32_to_cpu(pp->nodeid);
			w->info.ex	= pp->ex;
			w->flags	= 0;
			list_add_tail(&w->list, &r->waiters);
		}
		pp++;
	}

	log_plock(ls, "recv_plocks_data %d:%u n %llu o %d locks %d len %d",
		  hd->nodeid, hd->msgdata, (unsigned long long)r->number,
		  r->owner, count, len);

	/* a new resource is linked in; a continuation already was */
	if (!(flags & RD_CONTINUE)) {
		list_add_tail(&r->list, &ls->plock_resources);
		rb_insert_plock_resource(ls, r);
	}
	return;

 fail_free:
	/* only free a resource we created here; a continuation resource
	   stays linked in the lockspace */
	if (!(flags & RD_CONTINUE)) {
		free_r_lists(r);
		free(r);
	}
	return;
}
2025 	
2026 	void clear_plocks_data(struct lockspace *ls)
2027 	{
2028 		struct resource *r, *r2;
2029 		uint32_t count = 0;
2030 	
2031 		if (!opt(enable_plock_ind) || ls->disable_plock)
2032 			return;
2033 	
2034 		list_for_each_entry_safe(r, r2, &ls->plock_resources, list) {
2035 			free_r_lists(r);
2036 			rb_del_plock_resource(ls, r);
2037 			list_del(&r->list);
2038 			free(r);
2039 			count++;
2040 		}
2041 	
2042 		log_dlock(ls, "clear_plocks_data done %u recv_plocks_data_count %u",
2043 			  count, ls->recv_plocks_data_count);
2044 	
2045 		ls->recv_plocks_data_count = 0;
2046 	}
2047 	
2048 	/* Called when a node has failed, or we're unmounting.  For a node failure, we
2049 	   need to call this when the cpg confchg arrives so that we're guaranteed all
2050 	   nodes do this in the same sequence wrt other messages. */
2051 	
/* Remove all plock state belonging to a failed node (or to everyone,
   when unmount is set): drop its locks and waiters, take over any
   resources it owned (owner -> 0), retry waiters that may now be
   grantable, and free resources left completely empty. */

void purge_plocks(struct lockspace *ls, int nodeid, int unmount)
{
	struct posix_lock *po, *po2;
	struct lock_waiter *w, *w2;
	struct resource *r, *r2;
	int purged = 0;

	if (!opt(enable_plock_ind) || ls->disable_plock)
		return;

	list_for_each_entry_safe(r, r2, &ls->plock_resources, list) {
		/* drop held locks from the dead node (or all, on unmount) */
		list_for_each_entry_safe(po, po2, &r->locks, list) {
			if (po->nodeid == nodeid || unmount) {
				list_del(&po->list);
				free(po);
				purged++;
			}
		}

		/* drop its blocked waiters the same way */
		list_for_each_entry_safe(w, w2, &r->waiters, list) {
			if (w->info.nodeid == nodeid || unmount) {
				list_del(&w->list);
				free(w);
				purged++;
			}
		}

		/* TODO: haven't thought carefully about how this transition
		   to owner 0 might interact with other owner messages in
		   progress. */

		if (r->owner == nodeid) {
			r->owner = 0;
			r->flags |= R_GOT_UNOWN;
			r->flags |= R_PURGE_UNOWN;
			send_pending_plocks(ls, r);
		}

		/* removed locks may unblock remaining waiters */
		do_waiters(ls, r);

		if (!opt(plock_ownership_ind) &&
		    list_empty(&r->locks) && list_empty(&r->waiters)) {
			rb_del_plock_resource(ls, r);
			list_del(&r->list);
			free(r);
		}
	}

	if (purged)
		ls->last_plock_time = monotime();

	log_dlock(ls, "purged %d plocks for %d", purged, nodeid);
}
2105 	
/* Format the lockspace's plock state into buf (at most DLMC_DUMP_SIZE
   bytes) for the dump/debug interface: one line per unused resource,
   held lock, waiter, and pending op.  Returns 0 on success or -ENOSPC
   when the buffer fills; *len_out is the number of bytes written in
   either case. */

int copy_plock_state(struct lockspace *ls, char *buf, int *len_out)
{
	struct posix_lock *po;
	struct lock_waiter *w;
	struct resource *r;
	struct timeval now;
	int rv = 0;
	int len = DLMC_DUMP_SIZE, pos = 0, ret;

	gettimeofday(&now, NULL);

	list_for_each_entry(r, &ls->plock_resources, list) {

		/* idle resource: report its owner and how long it's unused */
		if (list_empty(&r->locks) &&
		    list_empty(&r->waiters) &&
		    list_empty(&r->pending)) {
			ret = snprintf(buf + pos, len - pos,
			      "%llu rown %d unused_ms %llu\n",
			      (unsigned long long)r->number, r->owner,
			      (unsigned long long)time_diff_ms(&r->last_access,
					      		       &now));
			if (ret >= len - pos) {
				rv = -ENOSPC;
				goto out;
			}
			pos += ret;
			continue;
		}

		/* held locks */
		list_for_each_entry(po, &r->locks, list) {
			ret = snprintf(buf + pos, len - pos,
			      "%llu %s %llu-%llu nodeid %d pid %u owner %llx rown %d\n",
			      (unsigned long long)r->number,
			      po->ex ? "WR" : "RD",
			      (unsigned long long)po->start,
			      (unsigned long long)po->end,
			      po->nodeid, po->pid,
			      (unsigned long long)po->owner, r->owner);

			if (ret >= len - pos) {
				rv = -ENOSPC;
				goto out;
			}
			pos += ret;
		}

		/* blocked waiters */
		list_for_each_entry(w, &r->waiters, list) {
			ret = snprintf(buf + pos, len - pos,
			      "%llu %s %llu-%llu nodeid %d pid %u owner %llx rown %d WAITING\n",
			      (unsigned long long)r->number,
			      w->info.ex ? "WR" : "RD",
			      (unsigned long long)w->info.start,
			      (unsigned long long)w->info.end,
			      w->info.nodeid, w->info.pid,
			      (unsigned long long)w->info.owner, r->owner);

			if (ret >= len - pos) {
				rv = -ENOSPC;
				goto out;
			}
			pos += ret;
		}

		/* ops waiting for resource ownership to be resolved */
		list_for_each_entry(w, &r->pending, list) {
			ret = snprintf(buf + pos, len - pos,
			      "%llu %s %llu-%llu nodeid %d pid %u owner %llx rown %d PENDING\n",
			      (unsigned long long)r->number,
			      w->info.ex ? "WR" : "RD",
			      (unsigned long long)w->info.start,
			      (unsigned long long)w->info.end,
			      w->info.nodeid, w->info.pid,
			      (unsigned long long)w->info.owner, r->owner);

			if (ret >= len - pos) {
				rv = -ENOSPC;
				goto out;
			}
			pos += ret;
		}
	}
 out:
	*len_out = pos;
	return rv;
}
2190 	
2191