1    	/*
2    	 * Copyright 2004-2012 Red Hat, Inc.
3    	 *
4    	 * This copyrighted material is made available to anyone wishing to use,
5    	 * modify, copy, or redistribute it subject to the terms and conditions
6    	 * of the GNU General Public License v2 or (at your option) any later version.
7    	 */
8    	
9    	/*
10   	 * . application in the kernel or userspace asks to join
11   	 *   a lockspace
12   	 *
13   	 * . dlm_new_lockspace() in the kernel sets up basic
14   	 *   lockspace structures, then sends a uevent to
15   	 *   dlm_controld in userspace (KOBJ_ONLINE), asking
16   	 *   dlm_controld to join the corosync group ("cpg")
17   	 *   for the lockspace.  dlm_new_lockspace() blocks
18   	 *   waiting for a reply from dlm_controld (the reply is
19   	 *   a write to the "event_done" sysfs file).
20   	 *   When the successful reply comes back to dlm-kernel
21   	 *   from dlm_controld, dlm-kernel knows it is now a member
22   	 *   of the lockspace membership (represented in userspace
23   	 *   by the corosync cpg), and can do locking with the
24   	 *   other members.  Before sending event_done to the kernel,
25   	 *   dlm_controld tells dlm-kernel who the other lockspace
26   	 *   members are via configfs settings.
27   	 *
28   	 * . When dlm_controld gets a request from dlm-kernel to
29   	 *   join a lockspace, it runs dlm_join_lockspace() which
30   	 *   calls cpg_join() to join the corosync group representing
31   	 *   the members of the lockspace.  dlm_controld will get
32   	 *   callbacks from corosync when membership of this cpg 
33   	 *   changes (joins/leaves/failures).  After calling
34   	 *   cpg_join(), dlm_controld waits for the first
35   	 *   corosync membership callback indicating it is now
36   	 *   a member of the cpg.  The callback function for
37   	 *   cpg membership changes is confchg_cb().  Corosync
38   	 *   guarantees that all members of the cpg see the
39   	 *   same sequence of confchg callbacks (e.g. if a number
40   	 *   of nodes are joining/leaving/failing at once).
41   	 *   When the first confchg arrives after cpg_join(),
42   	 *   dlm_controld sets up the current members for dlm-kernel
43   	 *   via configfs, then writes to event_done in sysfs to start
44   	 *   dlm-kernel running.
45   	 *
46   	 * . When a cpg member joins/leaves/fails, dlm_controld
47   	 *   on all current members gets a confchg callback
48   	 *   showing the new members.  dlm_controld then stops
49   	 *   dlm-kernel locking activity for that lockspace by
50   	 *   writing 0 to the "control" sysfs file.
51   	 *   dlm_controld then sends/recvs cpg messages to all
52   	 *   other cpg members to act as barrier to ensure all
53   	 *   members have stopped locking activity in the kernel
54   	 *   (apply_changes()).  When all have done this,
55   	 *   dlm_controld on all the members then sets up the
56   	 *   new members in the kernel (via configfs) and tells
57   	 *   dlm-kernel to start the lockspace again (start_kernel()).
58   	 *
59   	 * . When dlm-kernel is started after being stopped, it does
60   	 *   lockspace recovery based on changes to the membership.
61   	 *   When recovery is done, normal locking activity resumes.
62   	 *
 * Replacing dlm_controld is a matter of doing the following
64   	 * steps by either manually setting up sysfs and configfs,
65   	 * or having a new daemon to do it:
66   	 *
67   	 * - decide who the lockspace members are
68   	 * - stop dlm-kernel before changing lockspace members (write to sysfs)
69   	 * - wait for previous step on all before making changes
70   	 * - tell dlm-kernel member nodeids/IPs in configfs (write to configfs)
71   	 * - start dlm-kernel (write to sysfs)
72   	 *
73   	 * echo 0/1 into /sys/kernel/dlm/foo/control and /sys/kernel/dlm/foo/event_done
74   	 * echo/mkdir/write values into /sys/kernel/config/dlm/cluster/comms/ and
75   	 * /sys/kernel/config/dlm/cluster/spaces/foo/
76   	 */
77   	
78   	#include "dlm_daemon.h"
79   	
/* Log to the group log at most once per cpg change (ls->change_seq).
   The static counter is per expansion site, so each use of log_limit
   rate-limits independently of the others. */
#define log_limit(ls, fmt, args...) ({        \
	static uint32_t __change_nr;          \
	if (ls->change_seq > __change_nr) {   \
		__change_nr = ls->change_seq; \
		log_group(ls, fmt, ##args);   \
	}                                     \
})
87   	
/* retries are once a second: log to the group log for the first
   minute, escalate to the error log at the one-minute mark, then
   repeat the error once an hour while still waiting.
   (log_erros is the macro's actual name — presumably declared in
   dlm_daemon.h with this historical spelling; confirm before renaming.) */
#define log_retry(ls, fmt, args...) ({ \
	if (ls->wait_retry < 60) \
		log_group(ls, fmt, ##args); \
	else if (ls->wait_retry == 60) \
		log_erros(ls, fmt, ##args); \
        else if (!(ls->wait_retry % 3600)) \
                log_erros(ls, fmt, ##args); \
})
97   	
/* per lockspace cpg: ls->node_history */

/* One entry per node ever seen in this lockspace's cpg; entries are
   kept after the node leaves so that fencing/fs state and query data
   survive spurious leave/join cycles (see the long comment below). */

struct node {
	struct list_head list;
	int nodeid;

	/* when and at which change seq the node was added to, removed
	   from, or failed out of the lockspace (times/seqs kept for
	   queries) */
	uint64_t lockspace_add_time;
	uint64_t lockspace_rem_time;
	uint64_t lockspace_fail_time;
	uint32_t lockspace_add_seq;
	uint32_t lockspace_rem_seq;
	uint32_t lockspace_fail_seq;
	int lockspace_member;		/* 1 while a current cpg member */
	int lockspace_fail_reason;	/* confchg reason, for queries */

	uint32_t last_match_seq;	/* newest cg seq this node's message matched */

	uint64_t start_time;		/* set on a good start message; 0 = none */

	int check_fs;			/* 1: wait for fs_controld ack of failure */
	int fs_notified;

	int need_fencing;		/* failed with nonzero start_time */
	uint32_t fence_queries;	/* for debug */
	uint64_t fail_walltime;
	uint64_t fail_monotime;
};
125  	
/* per lockspace confchg: ls->changes */

/* change states: waiting for local conditions (ringid/quorum/fencing/fs,
   see wait_conditions_done) vs waiting for start messages from the other
   members (see wait_messages_done) */
#define CGST_WAIT_CONDITIONS 1
#define CGST_WAIT_MESSAGES   2

/* One struct change per cpg confchg callback; the first entry on
   ls->changes is the current (most recent) change. */

struct change {
	struct list_head list;
	struct list_head members;
	struct list_head removed; /* nodes removed by this change */
	int member_count;
	int joined_count;
	int remove_count;
	int failed_count;
	int state;	/* CGST_WAIT_CONDITIONS or CGST_WAIT_MESSAGES */
	int we_joined;	/* presumably set when this confchg added us — confirm at creation site */
	uint32_t seq; /* used as a reference for debugging, and for queries */
	uint32_t combined_seq; /* for queries */
	uint64_t create_time;
};
145  	
/* per lockspace change member: cg->members */

struct member {
	struct list_head list;
	int nodeid;
	int start;   /* 1 if we received a start message for this change */
	int added;   /* 1 if added by this change */
	int failed;  /* 1 if failed in this change */
	int disallowed;
	uint32_t start_flags; /* DLM_MFLG_* flags carried by the start message */
};
157  	
/* Fixed header of a message describing a change; followed on the wire
   by id_info_count entries of id_info_size bytes each (see ids_in).
   All fields are little-endian on the wire (converted by ls_info_in). */

struct ls_info {
	uint32_t ls_info_size;
	uint32_t id_info_size;
	uint32_t id_info_count;

	uint32_t started_count;

	/* compared against the counts in a struct change by match_change */
	int member_count;
	int joined_count;
	int remove_count;
	int failed_count;
};
170  	
/* Per-member entry following ls_info in a message.  The wire stride is
   li->id_info_size, which may exceed sizeof(struct id_info); always
   advance by the stride, not by sizeof (see ids_in, get_id_struct). */

struct id_info {
	int nodeid;
};
174  	
175  	static void ls_info_in(struct ls_info *li)
176  	{
177  		li->ls_info_size  = le32_to_cpu(li->ls_info_size);
178  		li->id_info_size  = le32_to_cpu(li->id_info_size);
179  		li->id_info_count = le32_to_cpu(li->id_info_count);
180  		li->started_count = le32_to_cpu(li->started_count);
181  		li->member_count  = le32_to_cpu(li->member_count);
182  		li->joined_count  = le32_to_cpu(li->joined_count);
183  		li->remove_count  = le32_to_cpu(li->remove_count);
184  		li->failed_count  = le32_to_cpu(li->failed_count);
185  	}
186  	
/* Convert a single id_info entry from wire (little-endian) to host order. */
static void id_info_in(struct id_info *id)
{
	id->nodeid = le32_to_cpu(id->nodeid);
}
191  	
192  	static void ids_in(struct ls_info *li, struct id_info *ids)
193  	{
194  		struct id_info *id;
195  		int i;
196  	
197  		id = ids;
198  		for (i = 0; i < li->id_info_count; i++) {
199  			id_info_in(id);
200  			id = (struct id_info *)((char *)id + li->id_info_size);
201  		}
202  	}
203  	
204  	static struct member *find_memb(struct change *cg, int nodeid)
205  	{
206  		struct member *memb;
207  	
208  		list_for_each_entry(memb, &cg->members, list) {
209  			if (memb->nodeid == nodeid)
210  				return memb;
211  		}
212  		return NULL;
213  	}
214  	
215  	static struct lockspace *find_ls_handle(cpg_handle_t h)
216  	{
217  		struct lockspace *ls;
218  	
219  		list_for_each_entry(ls, &lockspaces, list) {
220  			if (ls->cpg_handle == h)
221  				return ls;
222  		}
223  		return NULL;
224  	}
225  	
226  	static struct lockspace *find_ls_ci(int ci)
227  	{
228  		struct lockspace *ls;
229  	
230  		list_for_each_entry(ls, &lockspaces, list) {
231  			if (ls->cpg_client == ci)
232  				return ls;
233  		}
234  		return NULL;
235  	}
236  	
237  	static void free_cg(struct change *cg)
238  	{
239  		struct member *memb, *safe;
240  	
(1) Event dereference: Dereferencing pointer "cg".
241  		list_for_each_entry_safe(memb, safe, &cg->members, list) {
242  			list_del(&memb->list);
243  			free(memb);
244  		}
245  		list_for_each_entry_safe(memb, safe, &cg->removed, list) {
246  			list_del(&memb->list);
247  			free(memb);
248  		}
249  		free(cg);
250  	}
251  	
252  	static void free_ls(struct lockspace *ls)
253  	{
254  		struct change *cg, *cg_safe;
255  		struct node *node, *node_safe;
256  	
257  		list_for_each_entry_safe(cg, cg_safe, &ls->changes, list) {
258  			list_del(&cg->list);
259  			free_cg(cg);
260  		}
261  	
262  		if (ls->started_change)
263  			free_cg(ls->started_change);
264  	
265  		list_for_each_entry_safe(node, node_safe, &ls->node_history, list) {
266  			list_del(&node->list);
267  			free(node);
268  		}
269  	
270  		free(ls);
271  	}
272  	
273  	
274  	/* Problem scenario:
275  	   nodes A,B,C are in fence domain
276  	   node C has gfs foo mounted
277  	   node C fails
278  	   nodes A,B begin fencing C (slow, not completed)
279  	   node B mounts gfs foo
280  	
281  	   We may end up having gfs foo mounted and being used on B before
282  	   C has been fenced.  C could wake up corrupt fs.
283  	
284  	   So, we need to prevent any new gfs mounts while there are any
285  	   outstanding, incomplete fencing operations.
286  	
287  	   We also need to check that the specific failed nodes we know about have
288  	   been fenced (since fenced may not even have been notified that the node
289  	   has failed yet).
290  	
291  	   So, check that:
292  	   1. has fenced fenced the node since we saw it fail?
293  	   2. fenced has no outstanding fencing ops
294  	
295  	   For 1:
296  	   - node X fails
297  	   - we see node X fail and X has non-zero start_time,
298  	     set need_fencing and record the fail time
299  	   - wait for X to be removed from all dlm cpg's  (probably not necessary)
300  	   - check that the fencing time is later than the recorded time above
301  	
302  	   Tracking fencing state when there are spurious partitions/merges...
303  	
304  	   from a spurious leave/join of node X, a lockspace will see:
305  	   - node X is a lockspace member
306  	   - node X fails, may be waiting for all cpgs to see failure or for fencing to
307  	     complete
308  	   - node X joins the lockspace - we want to process the change as usual, but
309  	     don't want to disrupt the code waiting for the fencing, and we want to
310  	     continue running properly once the remerged node is properly reset
311  	
312  	   ls->node_history
313  	   when we see a node not in this list, add entry for it with zero start_time
314  	   record the time we get a good start message from the node, start_time
315  	   clear start_time if the node leaves
316  	   if node fails with non-zero start_time, set need_fencing
317  	   when a node is fenced, clear start_time and clear need_fencing
318  	   if a node remerges after this, no good start message, no new start_time set
319  	   if a node fails with zero start_time, it doesn't need fencing
320  	   if a node remerges before it's been fenced, no good start message, no new
321  	   start_time set 
322  	*/
323  	
324  	static struct node *get_node_history(struct lockspace *ls, int nodeid)
325  	{
326  		struct node *node;
327  	
328  		list_for_each_entry(node, &ls->node_history, list) {
329  			if (node->nodeid == nodeid)
330  				return node;
331  		}
332  		return NULL;
333  	}
334  	
335  	static struct node *get_node_history_create(struct lockspace *ls, int nodeid)
336  	{
337  		struct node *node;
338  	
339  		node = get_node_history(ls, nodeid);
340  		if (node)
341  			return node;
342  	
343  		node = malloc(sizeof(struct node));
344  		if (!node)
345  			return NULL;
346  		memset(node, 0, sizeof(struct node));
347  	
348  		node->nodeid = nodeid;
349  		list_add_tail(&node->list, &ls->node_history);
350  		return node;
351  	}
352  	
353  	static void node_history_lockspace_add(struct lockspace *ls, int nodeid,
354  					       struct change *cg, uint64_t now)
355  	{
356  		struct node *node;
357  	
358  		node = get_node_history_create(ls, nodeid);
359  		if (!node) {
360  			log_error("node_history_lockspace_add no nodeid %d", nodeid);
361  			return;
362  		}
363  	
364  		node->lockspace_add_time = now;
365  		node->lockspace_add_seq = cg->seq;
366  		node->lockspace_member = 1;
367  	}
368  	
369  	static void node_history_lockspace_left(struct lockspace *ls, int nodeid,
370  						struct change *cg, uint64_t now)
371  	{
372  		struct node *node;
373  	
374  		node = get_node_history(ls, nodeid);
375  		if (!node) {
376  			log_error("node_history_lockspace_left no nodeid %d", nodeid);
377  			return;
378  		}
379  	
380  		node->start_time = 0;
381  	
382  		node->lockspace_rem_time = now;
383  		node->lockspace_rem_seq = cg->seq;	/* for queries */
384  		node->lockspace_member = 0;
385  	}
386  	
387  	static void node_history_lockspace_fail(struct lockspace *ls, int nodeid,
388  						struct change *cg, int reason,
389  						uint64_t now)
390  	{
391  		struct node *node;
392  	
393  		node = get_node_history(ls, nodeid);
394  		if (!node) {
395  			log_error("node_history_lockspace_fail no nodeid %d", nodeid);
396  			return;
397  		}
398  	
399  		if (opt(enable_fencing_ind) && node->start_time) {
400  			node->need_fencing = 1;
401  			node->fence_queries = 0;
402  		}
403  	
404  		if (ls->fs_registered) {
405  			log_group(ls, "check_fs nodeid %d set", nodeid);
406  			node->check_fs = 1;
407  		}
408  	
409  		node->lockspace_rem_time = now;
410  		node->lockspace_rem_seq = cg->seq;	/* for queries */
411  		node->lockspace_member = 0;
412  		node->lockspace_fail_time = now;
413  		node->lockspace_fail_seq = node->lockspace_rem_seq;
414  		node->lockspace_fail_reason = reason;	/* for queries */
415  	
416  		node->fail_monotime = now;
417  		node->fail_walltime = time(NULL);
418  	}
419  	
420  	static void node_history_start(struct lockspace *ls, int nodeid)
421  	{
422  		struct node *node;
423  		
424  		node = get_node_history(ls, nodeid);
425  		if (!node) {
426  			log_error("node_history_start no nodeid %d", nodeid);
427  			return;
428  		}
429  	
430  		node->start_time = monotime();
431  	}
432  	
433  	/* wait for cluster ringid and cpg ringid to be the same so we know our
434  	   information from each service is based on the same node state */
435  	
436  	static int check_ringid_done(struct lockspace *ls)
437  	{
438  		/* If we've received a confchg due to a nodedown, but not
439  		   the corresponding ringid callback, then we should wait
440  		   for the ringid callback.  Once we have both conf and ring
441  		   callbacks, we can compare cpg/quorum ringids.
442  		   
443  		   Otherwise, there's a possible problem if we receive a
444  		   confchg before both ringid callback and quorum callback.
445  		   Then we'd get through this function by comparing the old,
446  		   matching ringids.
447  	
448  		   (We seem to usually get the quorum callback before any cpg
449  		   callbacks, in which case we wouldn't need cpg_ringid_wait,
450  		   but that's probably not guaranteed.) */
451  	
452  		if (ls->cpg_ringid_wait) {
453  			log_group(ls, "check_ringid wait cluster %llu cpg %u:%llu",
454  				  (unsigned long long)cluster_ringid_seq,
455  				  ls->cpg_ringid.nodeid,
456  				  (unsigned long long)ls->cpg_ringid.seq);
457  			return 0;
458  		}
459  	
460  		if (cluster_ringid_seq != ls->cpg_ringid.seq) {
461  			log_group(ls, "check_ringid cluster %llu cpg %u:%llu",
462  				  (unsigned long long)cluster_ringid_seq,
463  				  ls->cpg_ringid.nodeid,
464  				  (unsigned long long)ls->cpg_ringid.seq);
465  			return 0;
466  		}
467  	
468  		log_limit(ls, "check_ringid done cluster %llu cpg %u:%llu",
469  			  (unsigned long long)cluster_ringid_seq,
470  			  ls->cpg_ringid.nodeid,
471  			  (unsigned long long)ls->cpg_ringid.seq);
472  	
473  		return 1;
474  	}
475  	
476  	static int check_fencing_done(struct lockspace *ls)
477  	{
478  		struct node *node;
479  		uint64_t fence_monotime;
480  		int wait_count = 0;
481  		int rv, in_progress;
482  	
483  		if (!opt(enable_fencing_ind)) {
484  			log_group(ls, "check_fencing disabled");
485  			return 1;
486  		}
487  	
488  		list_for_each_entry(node, &ls->node_history, list) {
489  			if (!node->need_fencing)
490  				continue;
491  	
492  			rv = fence_node_time(node->nodeid, &fence_monotime);
493  			if (rv < 0) {
494  				log_error("fenced_node_time error %d", rv);
495  				continue;
496  			}
497  	
498  			if (fence_monotime >= node->fail_monotime) {
499  				log_group(ls, "check_fencing %d done start %llu fail %llu fence %llu",
500  					  node->nodeid,
501  					  (unsigned long long)node->start_time,
502  					  (unsigned long long)node->fail_monotime,
503  					  (unsigned long long)fence_monotime);
504  	
505  				node->need_fencing = 0;
506  				node->start_time = 0;
507  				continue;
508  			} else {
509  				if (!node->fence_queries) {
510  					log_group(ls, "check_fencing %d wait start %llu fail %llu",
511  						  node->nodeid,
512  						 (unsigned long long)node->start_time,
513  						 (unsigned long long)node->fail_monotime);
514  					node->fence_queries++;
515  				}
516  				wait_count++;
517  				continue;
518  			}
519  		}
520  	
521  		if (wait_count) {
522  			log_limit(ls, "check_fencing wait_count %d", wait_count);
523  			return 0;
524  		}
525  	
526  		/* now check if there are any outstanding fencing ops (for nodes
527  		   we may not have seen in any lockspace), and return 0 if there
528  		   are any */
529  	
530  		rv = fence_in_progress(&in_progress);
531  		if (rv < 0) {
532  			log_error("fenced_domain_info error %d", rv);
533  			return 0;
534  		}
535  	
536  		if (in_progress) {
537  			log_limit(ls, "check_fencing in progress %d", in_progress);
538  			return 0;
539  		}
540  	
541  		log_group(ls, "check_fencing done");
542  		return 1;
543  	}
544  	
545  	/* wait for local fs_controld to ack each failed node */
546  	
547  	static int check_fs_done(struct lockspace *ls)
548  	{
549  		struct node *node;
550  		int wait_count = 0;
551  	
552  		/* no corresponding fs for this lockspace */
553  		if (!ls->fs_registered)
554  			return 1;
555  	
556  		list_for_each_entry(node, &ls->node_history, list) {
557  			if (!node->check_fs)
558  				continue;
559  	
560  			if (node->fs_notified) {
561  				log_group(ls, "check_fs nodeid %d clear", node->nodeid);
562  				node->check_fs = 0;
563  				node->fs_notified = 0;
564  			} else {
565  				log_group(ls, "check_fs nodeid %d needs fs notify",
566  					  node->nodeid);
567  				wait_count++;
568  			}
569  		}
570  	
571  		if (wait_count)
572  			return 0;
573  	
574  		log_group(ls, "check_fs done");
575  		return 1;
576  	}
577  	
578  	static int member_ids[MAX_NODES];
579  	static int member_count;
580  	static int renew_ids[MAX_NODES];
581  	static int renew_count;
582  	
583  	static void format_member_ids(struct lockspace *ls)
584  	{
585  		struct change *cg = list_first_entry(&ls->changes, struct change, list);
586  		struct member *memb;
587  	
588  		memset(member_ids, 0, sizeof(member_ids));
589  		member_count = 0;
590  	
591  		list_for_each_entry(memb, &cg->members, list)
592  			member_ids[member_count++] = memb->nodeid;
593  	}
594  	
595  	/* list of nodeids that have left and rejoined since last start_kernel;
596  	   is any member of startcg in the left list of any other cg's?
597  	   (if it is, then it presumably must be flagged added in another) */
598  	
599  	static void format_renew_ids(struct lockspace *ls)
600  	{
601  		struct change *cg, *startcg;
602  		struct member *memb, *leftmemb;
603  	
604  		startcg = list_first_entry(&ls->changes, struct change, list);
605  	
606  		memset(renew_ids, 0, sizeof(renew_ids));
607  		renew_count = 0;
608  	
609  		list_for_each_entry(memb, &startcg->members, list) {
610  			list_for_each_entry(cg, &ls->changes, list) {
611  				if (cg == startcg)
612  					continue;
613  				list_for_each_entry(leftmemb, &cg->removed, list) {
614  					if (memb->nodeid == leftmemb->nodeid) {
615  						renew_ids[renew_count++] = memb->nodeid;
616  					}
617  				}
618  			}
619  		}
620  	
621  	}
622  	
623  	static void start_kernel(struct lockspace *ls)
624  	{
625  		struct change *cg = list_first_entry(&ls->changes, struct change, list);
626  	
627  		if (!ls->kernel_stopped) {
628  			log_error("start_kernel cg %u not stopped", cg->seq);
629  			return;
630  		}
631  	
632  		log_group(ls, "start_kernel cg %u member_count %d",
633  			  cg->seq, cg->member_count);
634  	
635  		/* needs to happen before setting control which starts recovery */
636  		if (ls->joining)
637  			set_sysfs_id(ls->name, ls->global_id);
638  	
639  		if (ls->nodir)
640  			set_sysfs_nodir(ls->name, 1);
641  	
642  		format_member_ids(ls);
643  		format_renew_ids(ls);
644  		set_configfs_members(ls, ls->name, member_count, member_ids,
645  				     renew_count, renew_ids);
646  		set_sysfs_control(ls->name, 1);
647  		ls->kernel_stopped = 0;
648  	
649  		if (ls->joining) {
650  			set_sysfs_event_done(ls->name, 0);
651  			ls->joining = 0;
652  		}
653  	}
654  	
655  	void cpg_stop_kernel(struct lockspace *ls)
656  	{
657  		if (!ls->kernel_stopped) {
658  			log_group(ls, "%s", __func__);
659  			set_sysfs_control(ls->name, 0);
660  			ls->kernel_stopped = 1;
661  		}
662  	}
663  	
664  	static void stop_kernel(struct lockspace *ls, uint32_t seq)
665  	{
666  		log_group(ls, "%s seq %u", __func__, seq);
667  		cpg_stop_kernel(ls);
668  	}
669  	
670  	/* the first condition is that the local lockspace is stopped which we
671  	   don't need to check for because stop_kernel(), which is synchronous,
672  	   was done when the change was created */
673  	
674  	/* the fencing/quorum/fs conditions need to account for all the changes
675  	   that have occured since the last change applied to dlm-kernel, not
676  	   just the latest change */
677  	
678  	/* we know that the cluster_quorate value here is consistent with the cpg events
679  	   because the ringid's are in sync per the check_ringid_done */
680  	
681  	static int wait_conditions_done(struct lockspace *ls)
682  	{
683  		if (!check_ringid_done(ls)) {
684  			if (ls->wait_debug != DLMC_LS_WAIT_RINGID) {
685  				ls->wait_debug = DLMC_LS_WAIT_RINGID;
686  				ls->wait_retry = 0;
687  			}
688  			ls->wait_retry++;
689  			/* the check function logs a message */
690  	
691  			poll_lockspaces++;
692  			return 0;
693  		}
694  	
695  		if (opt(enable_quorum_lockspace_ind) && !cluster_quorate) {
696  			if (ls->wait_debug != DLMC_LS_WAIT_QUORUM) {
697  				ls->wait_debug = DLMC_LS_WAIT_QUORUM;
698  				ls->wait_retry = 0;
699  			}
700  			ls->wait_retry++;
701  			log_retry(ls, "wait for quorum");
702  	
703  			poll_lockspaces++;
704  			return 0;
705  		}
706  	
707  		if (!check_fencing_done(ls)) {
708  			if (ls->wait_debug != DLMC_LS_WAIT_FENCING) {
709  				ls->wait_debug = DLMC_LS_WAIT_FENCING;
710  				ls->wait_retry = 0;
711  			}
712  			ls->wait_retry++;
713  			log_retry(ls, "wait for fencing");
714  	
715  			poll_lockspaces++;
716  			return 0;
717  		}
718  	
719  		if (!check_fs_done(ls)) {
720  			if (ls->wait_debug != DLMC_LS_WAIT_FSDONE) {
721  				ls->wait_debug = DLMC_LS_WAIT_FSDONE;
722  				ls->wait_retry = 0;
723  			}
724  			ls->wait_retry++;
725  			log_retry(ls, "wait for fsdone");
726  	
727  			poll_fs++;
728  			return 0;
729  		}
730  	
731  		ls->wait_debug = 0;
732  		ls->wait_retry = 0;
733  	
734  		return 1;
735  	}
736  	
737  	static int wait_messages_done(struct lockspace *ls)
738  	{
739  		struct change *cg = list_first_entry(&ls->changes, struct change, list);
740  		struct member *memb;
741  		int need = 0, total = 0;
742  	
743  		list_for_each_entry(memb, &cg->members, list) {
744  			if (!memb->start)
745  				need++;
746  			total++;
747  		}
748  	
749  		if (need) {
750  			log_group(ls, "wait_messages cg %u need %d of %d",
751  				  cg->seq, need, total);
752  			ls->wait_debug = need;
753  			return 0;
754  		}
755  	
756  		log_group(ls, "wait_messages cg %u got all %d", cg->seq, total);
757  	
758  		ls->wait_debug = 0;
759  	
760  		return 1;
761  	}
762  	
763  	static void cleanup_changes(struct lockspace *ls)
764  	{
765  		struct change *cg = list_first_entry(&ls->changes, struct change, list);
766  		struct change *safe;
767  	
768  		list_del(&cg->list);
769  		if (ls->started_change)
770  			free_cg(ls->started_change);
771  		ls->started_change = cg;
772  	
773  		ls->started_count++;
774  		if (!ls->started_count)
775  			ls->started_count++;
776  	
777  		cg->combined_seq = cg->seq; /* for queries */
778  	
779  		list_for_each_entry_safe(cg, safe, &ls->changes, list) {
780  			ls->started_change->combined_seq = cg->seq; /* for queries */
781  			list_del(&cg->list);
782  			free_cg(cg);
783  		}
784  	}
785  	
786  	/* There's a stream of confchg and messages. At one of these
787  	   messages, the low node needs to store plocks and new nodes
788  	   need to begin saving plock messages.  A second message is
789  	   needed to say that the plocks are ready to be read.
790  	
791  	   When the last start message is recvd for a change, the low node
792  	   stores plocks and the new nodes begin saving messages.  When the
793  	   store is done, low node sends plocks_stored message.  When
794  	   new nodes recv this, they read the plocks and their saved messages.
795  	   plocks_stored message should identify a specific change, like start
796  	   messages do; if it doesn't match ls->started_change, then it's ignored.
797  	
798  	   If a confchg adding a new node arrives after plocks are stored but
799  	   before plocks_stored msg recvd, then the message is ignored.  The low
800  	   node will send another plocks_stored message for the latest change
801  	   (although it may be able to reuse the ckpt if no plock state has changed).
802  	*/
803  	
804  	static void set_plock_data_node(struct lockspace *ls)
805  	{
806  		struct change *cg = list_first_entry(&ls->changes, struct change, list);
807  		struct member *memb;
808  		int low = 0;
809  	
810  		list_for_each_entry(memb, &cg->members, list) {
811  			if (!(memb->start_flags & DLM_MFLG_HAVEPLOCK))
812  				continue;
813  	
814  			if (!low || memb->nodeid < low)
815  				low = memb->nodeid;
816  		}
817  	
818  		log_dlock(ls, "set_plock_data_node from %d to %d",
819  			  ls->plock_data_node, low);
820  	
821  		ls->plock_data_node = low;
822  	}
823  	
824  	static struct id_info *get_id_struct(struct id_info *ids, int count, int size,
825  					     int nodeid)
826  	{
827  		struct id_info *id = ids;
828  		int i;
829  	
830  		for (i = 0; i < count; i++) {
831  			if (id->nodeid == nodeid)
832  				return id;
833  			id = (struct id_info *)((char *)id + size);
834  		}
835  		return NULL;
836  	}
837  	
838  	/* do the change details in the message match the details of the given change */
839  	
840  	static int match_change(struct lockspace *ls, struct change *cg,
841  				struct dlm_header *hd, struct ls_info *li,
842  				struct id_info *ids)
843  	{
844  		struct id_info *id;
845  		struct member *memb;
846  		struct node *node;
847  		uint64_t t;
848  		uint32_t seq = hd->msgdata;
849  		int i, members_mismatch;
850  	
851  		/* We can ignore messages if we're not in the list of members.
852  		   The one known time this will happen is after we've joined
853  		   the cpg, we can get messages for changes prior to the change
854  		   in which we're added. */
855  	
856  		id = get_id_struct(ids, li->id_info_count, li->id_info_size,our_nodeid);
857  	
858  		if (!id) {
859  			log_group(ls, "match_change %d:%u skip %u we are not in members",
860  				  hd->nodeid, seq, cg->seq);
861  			return 0;
862  		}
863  	
864  		memb = find_memb(cg, hd->nodeid);
865  		if (!memb) {
866  			log_group(ls, "match_change %d:%u skip %u sender not member",
867  				  hd->nodeid, seq, cg->seq);
868  			return 0;
869  		}
870  	
871  		if (memb->start_flags & DLM_MFLG_NACK) {
872  			log_group(ls, "match_change %d:%u skip %u is nacked",
873  				  hd->nodeid, seq, cg->seq);
874  			return 0;
875  		}
876  	
877  		if (memb->start && hd->type == DLM_MSG_START) {
878  			log_group(ls, "match_change %d:%u skip %u already start",
879  				  hd->nodeid, seq, cg->seq);
880  			return 0;
881  		}
882  	
883  		/* a node's start can't match a change if the node joined the cluster
884  		   more recently than the change was created */
885  	
886  		node = get_node_history(ls, hd->nodeid);
887  		if (!node) {
888  			log_group(ls, "match_change %d:%u skip cg %u no node history",
889  				  hd->nodeid, seq, cg->seq);
890  			return 0;
891  		}
892  	
893  		t = cluster_add_time(node->nodeid);
894  		if (t > cg->create_time) {
895  			log_group(ls, "match_change %d:%u skip cg %u created %llu "
896  				  "cluster add %llu", hd->nodeid, seq, cg->seq,
897  				  (unsigned long long)cg->create_time,
898  				  (unsigned long long)t);
899  	
900  			/* nacks can apply to older cg's */
901  			if (!(hd->flags & DLM_MFLG_NACK)) {
902  				return 0;
903  			} else {
904  				log_group(ls, "match_change %d:%u unskip cg %u for nack",
905  					  hd->nodeid, seq, cg->seq);
906  			}
907  		}
908  	
909  		if (node->last_match_seq > cg->seq) {
910  			log_group(ls, "match_change %d:%u skip cg %u last matched cg %u",
911  				  hd->nodeid, seq, cg->seq, node->last_match_seq);
912  			return 0;
913  		}
914  	
915  		/* verify this is the right change by matching the counts
916  		   and the nodeids of the current members */
917  	
918  		if (li->member_count != cg->member_count ||
919  		    li->joined_count != cg->joined_count ||
920  		    li->remove_count != cg->remove_count ||
921  		    li->failed_count != cg->failed_count) {
922  			log_group(ls, "match_change %d:%u skip %u expect counts "
923  				  "%d %d %d %d", hd->nodeid, seq, cg->seq,
924  				  cg->member_count, cg->joined_count,
925  				  cg->remove_count, cg->failed_count);
926  			return 0;
927  		}
928  	
929  		members_mismatch = 0;
930  		id = ids;
931  	
932  		for (i = 0; i < li->id_info_count; i++) {
933  			memb = find_memb(cg, id->nodeid);
934  			if (!memb) {
935  				log_group(ls, "match_change %d:%u skip %u no memb %d",
936  				  	  hd->nodeid, seq, cg->seq, id->nodeid);
937  				members_mismatch = 1;
938  				break;
939  			}
940  			id = (struct id_info *)((char *)id + li->id_info_size);
941  		}
942  	
943  		if (members_mismatch)
944  			return 0;
945  	
946  		/* Not completely sure if this is a valid assertion or not, i.e. not
947  		   sure if we really never want to nack our first and only cg.  I have
948  		   seen one case in which a node incorrectly accepted nacks for cg seq
949  		   1 and ls change_seq 1.  (It was the secondary effect of another bug.)
950  	
951  		   Or, it's possible that this should apply a little more broadly as:
952  		   don't nack our most recent cg, i.e. cg->seq == ls->change_seq (1 or
953  		   otherwise).  I'm hoping to find a test case that will exercise this
954  		   to clarify the situation here, and then update this comment. */
955  	
956  		if (cg->seq == 1 && ls->change_seq == 1 && (hd->flags & DLM_MFLG_NACK)) {
957  			log_group(ls, "match_change %d:%u skip cg %u for nack",
958  				  hd->nodeid, seq, cg->seq);
959  			return 0;
960  		}
961  	
962  		node->last_match_seq = cg->seq;
963  	
964  		log_group(ls, "match_change %d:%u matches cg %u", hd->nodeid, seq,
965  			  cg->seq);
966  		return 1;
967  	}
968  	
/* Unfortunately, there's no really simple way to match a message with the
   specific change that it was sent for.  We hope that by passing all the
   details of the change in the message, we will be able to uniquely match
   it to the correct change. */
973  	
974  	/* A start message will usually be for the first (current) change on our list.
975  	   In some cases it will be for a non-current change, and we can ignore it:
976  	
977  	   1. A,B,C get confchg1 adding C
978  	   2. C sends start for confchg1
979  	   3. A,B,C get confchg2 adding D
980  	   4. A,B,C,D recv start from C for confchg1 - ignored
981  	   5. C,D send start for confchg2
982  	   6. A,B send start for confchg2
983  	   7. A,B,C,D recv all start messages for confchg2, and start kernel
984  	 
985  	   In step 4, how do the nodes know whether the start message from C is
986  	   for confchg1 or confchg2?  Hopefully by comparing the counts and members. */
987  	
988  	static struct change *find_change(struct lockspace *ls, struct dlm_header *hd,
989  					  struct ls_info *li, struct id_info *ids)
990  	{
991  		struct change *cg;
992  	
993  		list_for_each_entry_reverse(cg, &ls->changes, list) {
994  			if (!match_change(ls, cg, hd, li, ids))
995  				continue;
996  			return cg;
997  		}
998  	
999  		log_group(ls, "find_change %d:%u no match", hd->nodeid, hd->msgdata);
1000 		return NULL;
1001 	}
1002 	
1003 	static int is_added(struct lockspace *ls, int nodeid)
1004 	{
1005 		struct change *cg;
1006 		struct member *memb;
1007 	
1008 		list_for_each_entry(cg, &ls->changes, list) {
1009 			memb = find_memb(cg, nodeid);
1010 			if (memb && memb->added)
1011 				return 1;
1012 		}
1013 		return 0;
1014 	}
1015 	
/* Handle a DLM_MSG_START message.  The message body carries an ls_info
   followed by one id_info per member; these are matched against our
   pending changes.  On a match, the sender's start flags are recorded
   and the member is marked started, unless the message is a NACK or the
   sender looks like a previously-started node rejoining (disallowed). */

static void receive_start(struct lockspace *ls, struct dlm_header *hd, int len)
{
	struct change *cg;
	struct member *memb;
	struct ls_info *li;
	struct id_info *ids;
	uint32_t seq = hd->msgdata;	/* sender's cg seq for this start */
	int added;

	log_group(ls, "receive_start %d:%u len %d", hd->nodeid, seq, len);

	/* message layout: dlm_header | ls_info | id_info[id_info_count] */
	li = (struct ls_info *)((char *)hd + sizeof(struct dlm_header));
	ids = (struct id_info *)((char *)li + sizeof(struct ls_info));

	/* convert fields from little-endian wire format in place */
	ls_info_in(li);
	ids_in(li, ids);

	cg = find_change(ls, hd, li, ids);
	if (!cg)
		return;

	memb = find_memb(cg, hd->nodeid);
	if (!memb) {
		/* this should never happen since match_change checks it */
		log_error("receive_start no member %d", hd->nodeid);
		return;
	}

	memb->start_flags = hd->flags;

	added = is_added(ls, hd->nodeid);

	/* a node joining the lockspace should have started_count 0; a
	   non-zero count means it was previously a member without us
	   seeing it leave */
	if (added && li->started_count && ls->started_count) {
		log_error("receive_start %d:%u add node with started_count %u",
			  hd->nodeid, seq, li->started_count);

		/* see comment in fence/fenced/cpg.c */
		memb->disallowed = 1;
		return;
	}

	if (memb->start_flags & DLM_MFLG_NACK) {
		log_group(ls, "receive_start %d:%u is NACK", hd->nodeid, seq);
		return;
	}

	node_history_start(ls, hd->nodeid);
	memb->start = 1;
}
1065 	
1066 	static void receive_release_recover(struct lockspace *ls,
1067 					    struct dlm_header *hd, int len)
1068 	{
1069 		uint32_t release_recover = hd->msgdata;
1070 	
1071 		log_dlock(ls, "%s %d: len %d release_recover %u", __func__,
1072 			  hd->nodeid, len, release_recover);
1073 	
1074 		if (hd->nodeid == our_nodeid)
1075 			return;
1076 	
1077 		/* try to set members release recover setting before removing to
1078 		 * tell other nodes the release recover option from the initiator.
1079 		 */
1080 		set_configfs_member_release_recover(ls, hd->nodeid, release_recover);
1081 	}
1082 	
/* Handle a DLM_MSG_PLOCKS_DONE message from the plock data_node: all
   plock state has been transferred, so apply the plock messages we
   saved while syncing, then clear need_plocks/save_plocks. */

static void receive_plocks_done(struct lockspace *ls, struct dlm_header *hd,
				int len)
{
	struct ls_info *li;
	struct id_info *ids;

	log_dlock(ls, "receive_plocks_done %d:%u flags %x plocks_data %u need %d save %d",
		  hd->nodeid, hd->msgdata, hd->flags, hd->msgdata2,
		  ls->need_plocks, ls->save_plocks);

	/* we already have plock state; nothing to sync */
	if (!ls->need_plocks)
		return;

	/* still ignoring plock traffic (need_plocks is already known true
	   here); we haven't started saving messages yet */
	if (ls->need_plocks && !ls->save_plocks)
		return;

	if (!ls->started_change) {
		/* don't think this should happen */
		log_elock(ls, "receive_plocks_done %d:%u no started_change",
			  hd->nodeid, hd->msgdata);
		return;
	}

	/* message layout: dlm_header | ls_info | id_info[] (little-endian) */
	li = (struct ls_info *)((char *)hd + sizeof(struct dlm_header));
	ids = (struct id_info *)((char *)li + sizeof(struct ls_info));
	ls_info_in(li);
	ids_in(li, ids);

	if (!match_change(ls, ls->started_change, hd, li, ids)) {
		/* don't think this should happen */
		log_elock(ls, "receive_plocks_done %d:%u no match_change",
			  hd->nodeid, hd->msgdata);

		/* remove/free anything we've saved from
		   receive_plocks_data messages that weren't for us */
		clear_plocks_data(ls);
		return;
	}

	/* msgdata2 carries the sender's count of plocks_data messages;
	   a mismatch is logged but not treated as fatal */
	if (ls->recv_plocks_data_count != hd->msgdata2) {
		log_elock(ls, "receive_plocks_done plocks_data %u recv %u",
			  hd->msgdata2, ls->recv_plocks_data_count);
	}

	process_saved_plocks(ls);
	ls->need_plocks = 0;
	ls->save_plocks = 0;

	log_dlock(ls, "receive_plocks_done %d:%u plocks_data_count %u",
		  hd->nodeid, hd->msgdata, ls->recv_plocks_data_count);
}
1134 	
1135 	static void send_info(struct lockspace *ls, struct change *cg, int type,
1136 			      uint32_t flags, uint32_t msgdata2)
1137 	{
1138 		struct dlm_header *hd;
1139 		struct ls_info *li;
1140 		struct id_info *id;
1141 		struct member *memb;
1142 		char *buf;
1143 		int len, id_count;
1144 	
1145 		id_count = cg->member_count;
1146 	
1147 		len = sizeof(struct dlm_header) + sizeof(struct ls_info) +
1148 		      id_count * sizeof(struct id_info);
1149 	
1150 		buf = malloc(len);
1151 		if (!buf) {
1152 			log_error("send_info len %d no mem", len);
1153 			return;
1154 		}
1155 		memset(buf, 0, len);
1156 	
1157 		hd = (struct dlm_header *)buf;
1158 		li = (struct ls_info *)(buf + sizeof(*hd));
1159 		id = (struct id_info *)(buf + sizeof(*hd) + sizeof(*li));
1160 	
1161 		/* fill in header (dlm_send_message handles part of header) */
1162 	
1163 		hd->type = type;
1164 		hd->msgdata = cg->seq;
1165 		hd->flags = flags;
1166 		hd->msgdata2 = msgdata2;
1167 	
1168 		if (ls->joining)
1169 			hd->flags |= DLM_MFLG_JOINING;
1170 		if (!ls->need_plocks)
1171 			hd->flags |= DLM_MFLG_HAVEPLOCK;
1172 	
1173 		/* fill in ls_info */
1174 	
1175 		li->ls_info_size  = cpu_to_le32(sizeof(struct ls_info));
1176 		li->id_info_size  = cpu_to_le32(sizeof(struct id_info));
1177 		li->id_info_count = cpu_to_le32(id_count);
1178 		li->started_count = cpu_to_le32(ls->started_count);
1179 		li->member_count  = cpu_to_le32(cg->member_count);
1180 		li->joined_count  = cpu_to_le32(cg->joined_count);
1181 		li->remove_count  = cpu_to_le32(cg->remove_count);
1182 		li->failed_count  = cpu_to_le32(cg->failed_count);
1183 	
1184 		/* fill in id_info entries */
1185 	
1186 		list_for_each_entry(memb, &cg->members, list) {
1187 			id->nodeid = cpu_to_le32(memb->nodeid);
1188 			id++;
1189 		}
1190 	
1191 		dlm_send_message(ls, buf, len);
1192 	
1193 		free(buf);
1194 	}
1195 	
1196 	static void send_release_recover_msg(struct lockspace *ls, int type,
1197 					     unsigned long release_recover)
1198 	{
1199 		struct dlm_header *hd;
1200 		char *buf;
1201 		int len;
1202 	
1203 		len = sizeof(struct dlm_header);
1204 	
1205 		buf = malloc(len);
1206 		if (!buf) {
1207 			log_error("send_info len %d no mem", len);
1208 			return;
1209 		}
1210 		memset(buf, 0, len);
1211 	
1212 		hd = (struct dlm_header *)buf;
1213 	
1214 		/* fill in header (dlm_send_message handles part of header) */
1215 	
1216 		hd->type = type;
1217 		hd->msgdata = release_recover;
1218 	
1219 		dlm_send_message(ls, buf, len);
1220 	
1221 		free(buf);
1222 	}
1223 	
/* Broadcast our release_recover option to the lockspace before leaving
   (see dlm_leave_lockspace); receive_release_recover applies it on the
   other members. */

static void send_release_recover(struct lockspace *ls,
				 unsigned long release_recover)
{
	log_group(ls, "%s %d: counts %u release_recover: %ld", __func__,
		  our_nodeid, ls->started_count, release_recover);

	send_release_recover_msg(ls, DLM_MSG_RELEASE_RECOVER, release_recover);
}
1232 	
1233 	/* fenced used the DUPLICATE_CG flag instead of sending nacks like we
1234 	   do here.  I think the nacks didn't work for fenced for some reason,
1235 	   but I don't remember why (possibly because the node blocked doing
1236 	   the fencing hadn't created the cg to nack yet). */
1237 	
/* Broadcast our start message for change cg: header msgdata carries
   cg->seq and the body carries our view of the membership counts and
   member ids (via send_info). */

static void send_start(struct lockspace *ls, struct change *cg)
{
	log_group(ls, "send_start %d:%u counts %u %d %d %d %d",
		  our_nodeid, cg->seq, ls->started_count,
		  cg->member_count, cg->joined_count, cg->remove_count,
		  cg->failed_count);

	send_info(ls, cg, DLM_MSG_START, 0, 0);
}
1247 	
/* Broadcast DLM_MSG_PLOCKS_DONE after plock state transfer; msgdata2
   carries the number of plocks_data messages sent so receivers can
   cross-check their received count. */

static void send_plocks_done(struct lockspace *ls, struct change *cg, uint32_t plocks_data)
{
	log_dlock(ls, "send_plocks_done %d:%u counts %u %d %d %d %d plocks_data %u",
		  our_nodeid, cg->seq, ls->started_count,
		  cg->member_count, cg->joined_count, cg->remove_count,
		  cg->failed_count, plocks_data);

	send_info(ls, cg, DLM_MSG_PLOCKS_DONE, 0, plocks_data);
}
1257 	
1258 	static int same_members(struct change *cg1, struct change *cg2)
1259 	{
1260 		struct member *memb;
1261 	
1262 		list_for_each_entry(memb, &cg1->members, list) {
1263 			if (!find_memb(cg2, memb->nodeid))
1264 				return 0;
1265 		}
1266 		return 1;
1267 	}
1268 	
1269 	static void send_nacks(struct lockspace *ls, struct change *startcg)
1270 	{
1271 		struct change *cg;
1272 	
1273 		list_for_each_entry(cg, &ls->changes, list) {
1274 			if (cg->seq < startcg->seq &&
1275 			    cg->member_count == startcg->member_count &&
1276 			    cg->joined_count == startcg->joined_count &&
1277 			    cg->remove_count == startcg->remove_count &&
1278 			    cg->failed_count == startcg->failed_count &&
1279 			    same_members(cg, startcg)) {
1280 				log_group(ls, "send nack old cg %u new cg %u",
1281 					   cg->seq, startcg->seq);
1282 				send_info(ls, cg, DLM_MSG_START, DLM_MFLG_NACK, 0);
1283 			}
1284 		}
1285 	}
1286 	
1287 	static int nodes_added(struct lockspace *ls)
1288 	{
1289 		struct change *cg;
1290 	
1291 		list_for_each_entry(cg, &ls->changes, list) {
1292 			if (cg->joined_count)
1293 				return 1;
1294 		}
1295 		return 0;
1296 	}
1297 	
/* Decide how posix lock (plock) state is synced for the newest change:
   pick the data_node, start saving incoming plock messages if we need
   state, and if we ARE the data_node, push our state to new members. */

static void prepare_plocks(struct lockspace *ls)
{
	struct change *cg = list_first_entry(&ls->changes, struct change, list);
	uint32_t plocks_data = 0;
	struct member *memb;

	/* plocks disabled globally or for this lockspace: nothing to do */
	if (!opt(enable_plock_ind) || ls->disable_plock)
		return;

	log_dlock(ls, "prepare_plocks");

	/* if we're the only node in the lockspace, then we are the data_node
	   and we don't need plocks */

	if (cg->member_count == 1) {
		/* sanity check: a one-member cg should contain only us */
		list_for_each_entry(memb, &cg->members, list) {
			if (memb->nodeid != our_nodeid) {
				log_elock(ls, "prepare_plocks other member %d",
					  memb->nodeid);
			}
		}
		ls->plock_data_node = our_nodeid;
		ls->need_plocks = 0;
		return;
	}

	/* the low node that indicated it had plock state in its last
	   start message is the data_node */

	set_plock_data_node(ls);

	/* there is no node with plock state, so there's no syncing to do */

	if (!ls->plock_data_node) {
		ls->need_plocks = 0;
		ls->save_plocks = 0;
		return;
	}

	/* We save all plock messages received after our own confchg and
	   apply them after we receive the plocks_done message from the
	   data_node. */

	if (ls->need_plocks) {
		log_dlock(ls, "save_plocks start");
		ls->save_plocks = 1;
		return;
	}

	/* only the data_node sends state; everyone else just waits */
	if (ls->plock_data_node != our_nodeid)
		return;

	/* send state only when there's a new node to receive it; always
	   send plocks_done so receivers stop saving */
	if (nodes_added(ls))
		send_all_plocks_data(ls, cg->seq, &plocks_data);

	send_plocks_done(ls, cg, plocks_data);
}
1355 	
1356 	static void apply_changes(struct lockspace *ls)
1357 	{
1358 		struct change *cg;
1359 	
1360 		if (list_empty(&ls->changes))
1361 			return;
1362 		cg = list_first_entry(&ls->changes, struct change, list);
1363 	
1364 		switch (cg->state) {
1365 	
1366 		case CGST_WAIT_CONDITIONS:
1367 			if (wait_conditions_done(ls)) {
1368 				send_nacks(ls, cg);
1369 				send_start(ls, cg);
1370 				cg->state = CGST_WAIT_MESSAGES;
1371 			}
1372 			break;
1373 	
1374 		case CGST_WAIT_MESSAGES:
1375 			if (wait_messages_done(ls)) {
1376 				set_protocol_stateful();
1377 				start_kernel(ls);
1378 				prepare_plocks(ls);
1379 				cleanup_changes(ls);
1380 			}
1381 			break;
1382 	
1383 		default:
1384 			log_error("apply_changes invalid state %d", cg->state);
1385 		}
1386 	}
1387 	
1388 	void process_lockspace_changes(void)
1389 	{
1390 		struct lockspace *ls, *safe;
1391 	
1392 		poll_lockspaces = 0;
1393 		poll_fs = 0;
1394 	
1395 		list_for_each_entry_safe(ls, safe, &lockspaces, list) {
1396 			if (!list_empty(&ls->changes))
1397 				apply_changes(ls);
1398 		}
1399 	}
1400 	
1401 	static int add_change(struct lockspace *ls,
1402 			      const struct cpg_address *member_list,
1403 			      size_t member_list_entries,
1404 			      const struct cpg_address *left_list,
1405 			      size_t left_list_entries,
1406 			      const struct cpg_address *joined_list,
1407 			      size_t joined_list_entries,
1408 			      struct change **cg_out)
1409 	{
1410 		struct change *cg;
1411 		struct member *memb;
1412 		int i, error;
1413 		uint64_t now = monotime();
1414 	
1415 		cg = malloc(sizeof(struct change));
(1) Event cond_true: Condition "!cg", taking true branch.
(2) Event var_compare_op: Comparing "cg" to null implies that "cg" might be null.
Also see events: [var_deref_model]
1416 		if (!cg)
(3) Event goto: Jumping to label "fail_nomem".
1417 			goto fail_nomem;
1418 		memset(cg, 0, sizeof(struct change));
1419 		INIT_LIST_HEAD(&cg->members);
1420 		INIT_LIST_HEAD(&cg->removed);
1421 		cg->state = CGST_WAIT_CONDITIONS;
1422 		cg->create_time = now;
1423 		cg->seq = ++ls->change_seq;
1424 		if (!cg->seq)
1425 			cg->seq = ++ls->change_seq;
1426 	
1427 		cg->member_count = member_list_entries;
1428 		cg->joined_count = joined_list_entries;
1429 		cg->remove_count = left_list_entries;
1430 	
1431 		for (i = 0; i < member_list_entries; i++) {
1432 			memb = malloc(sizeof(struct member));
1433 			if (!memb)
1434 				goto fail_nomem;
1435 			memset(memb, 0, sizeof(struct member));
1436 			memb->nodeid = member_list[i].nodeid;
1437 			list_add_tail(&memb->list, &cg->members);
1438 		}
1439 	
1440 		for (i = 0; i < left_list_entries; i++) {
1441 			memb = malloc(sizeof(struct member));
1442 			if (!memb)
1443 				goto fail_nomem;
1444 			memset(memb, 0, sizeof(struct member));
1445 			memb->nodeid = left_list[i].nodeid;
1446 			if (left_list[i].reason == CPG_REASON_NODEDOWN ||
1447 			    left_list[i].reason == CPG_REASON_PROCDOWN) {
1448 				memb->failed = 1;
1449 				cg->failed_count++;
1450 			}
1451 			list_add_tail(&memb->list, &cg->removed);
1452 	
1453 			if (left_list[i].reason == CPG_REASON_NODEDOWN)
1454 				ls->cpg_ringid_wait = 1;
1455 	
1456 			if (memb->failed) {
1457 				node_history_lockspace_fail(ls, memb->nodeid, cg,
1458 							    left_list[i].reason, now);
1459 			} else {
1460 				node_history_lockspace_left(ls, memb->nodeid, cg, now);
1461 			}
1462 	
1463 			log_group(ls, "add_change cg %u remove nodeid %d reason %s",
1464 				  cg->seq, memb->nodeid, reason_str(left_list[i].reason));
1465 	
1466 			if (left_list[i].reason == CPG_REASON_PROCDOWN)
1467 				kick_node_from_cluster(memb->nodeid);
1468 		}
1469 	
1470 		for (i = 0; i < joined_list_entries; i++) {
1471 			memb = find_memb(cg, joined_list[i].nodeid);
1472 			if (!memb) {
1473 				log_error("no member %d", joined_list[i].nodeid);
1474 				error = -ENOENT;
1475 				goto fail;
1476 			}
1477 			memb->added = 1;
1478 	
1479 			if (memb->nodeid == our_nodeid) {
1480 				cg->we_joined = 1;
1481 			} else {
1482 				node_history_lockspace_add(ls, memb->nodeid, cg, now);
1483 			}
1484 	
1485 			log_group(ls, "add_change cg %u joined nodeid %d", cg->seq,
1486 				  memb->nodeid);
1487 		}
1488 	
1489 		if (cg->we_joined) {
1490 			log_group(ls, "add_change cg %u we joined", cg->seq);
1491 			list_for_each_entry(memb, &cg->members, list) {
1492 				node_history_lockspace_add(ls, memb->nodeid, cg, now);
1493 			}
1494 		}
1495 	
1496 		log_group(ls, "add_change cg %u counts member %d joined %d remove %d "
1497 			  "failed %d", cg->seq, cg->member_count, cg->joined_count,
1498 			  cg->remove_count, cg->failed_count);
1499 	
1500 		list_add(&cg->list, &ls->changes);
1501 		*cg_out = cg;
1502 		return 0;
1503 	
(4) Event label: Reached label "fail_nomem".
1504 	 fail_nomem:
1505 		log_error("no memory");
1506 		error = -ENOMEM;
1507 	 fail:
(5) Event var_deref_model: Passing null pointer "cg" to "free_cg", which dereferences it. [details]
Also see events: [var_compare_op]
1508 		free_cg(cg);
1509 		return error;
1510 	}
1511 	
1512 	static int we_left(const struct cpg_address *left_list,
1513 			   size_t left_list_entries)
1514 	{
1515 		int i;
1516 	
1517 		for (i = 0; i < left_list_entries; i++) {
1518 			if (left_list[i].nodeid == our_nodeid)
1519 				return 1;
1520 		}
1521 		return 0;
1522 	}
1523 	
/* cpg membership callback for a lockspace cpg.  Either completes our
   own leave (tearing down the lockspace entirely) or records the
   membership change with add_change() and pushes the state machine.
   NOTE: on the leave path ls is freed, so we must return immediately. */

static void confchg_cb(cpg_handle_t handle,
		       const struct cpg_name *group_name,
		       const struct cpg_address *member_list,
		       size_t member_list_entries,
		       const struct cpg_address *left_list,
		       size_t left_list_entries,
		       const struct cpg_address *joined_list,
		       size_t joined_list_entries)
{
	struct lockspace *ls;
	struct change *cg;
	struct member *memb;
	int rv;

	log_config(group_name, member_list, member_list_entries,
		   left_list, left_list_entries,
		   joined_list, joined_list_entries);

	ls = find_ls_handle(handle);
	if (!ls) {
		log_error("confchg_cb no lockspace for cpg %s",
			  group_name->value);
		return;
	}

	if (ls->leaving && we_left(left_list, left_list_entries)) {
		/* we called cpg_leave(), and this should be the final
		   cpg callback we receive */
		log_group(ls, "confchg for our leave");
		stop_kernel(ls, 0);
		set_configfs_members(ls, ls->name, 0, NULL, 0, NULL);
		set_sysfs_event_done(ls->name, 0);
		cpg_finalize(ls->cpg_handle);
		client_dead(ls->cpg_client);
		purge_plocks(ls, our_nodeid, 1);
		list_del(&ls->list);
		free_ls(ls);
		return;
	}

	rv = add_change(ls, member_list, member_list_entries,
			left_list, left_list_entries,
			joined_list, joined_list_entries, &cg);
	if (rv)
		return;

	/* suspend dlm-kernel activity until the new change is started */
	stop_kernel(ls, cg->seq);

	/* drop plock state held by departed nodes */
	list_for_each_entry(memb, &cg->removed, list)
		purge_plocks(ls, memb->nodeid, 0);

	apply_changes(ls);

#if 0
	deadlk_confchg(ls, member_list, member_list_entries,
		       left_list, left_list_entries,
		       joined_list, joined_list_entries);
#endif
}
1583 	
1584 	/* after our join confchg, we want to ignore plock messages (see need_plocks
1585 	   checks below) until the point in time where the ckpt_node saves plock
1586 	   state (final start message received); at this time we want to shift from
1587 	   ignoring plock messages to saving plock messages to apply on top of the
1588 	   plock state that we read. */
1589 	
/* cpg message delivery callback for a lockspace cpg.  Validates the
   header, dispatches on message type, and then runs apply_changes()
   since a received message may satisfy a wait condition.  Plock
   messages are ignored until we have plock state or are saving them
   to replay (see the comment above). */

static void deliver_cb(cpg_handle_t handle,
		       const struct cpg_name *group_name,
		       uint32_t nodeid, uint32_t pid,
		       void *data, size_t len)
{
	struct lockspace *ls;
	struct dlm_header *hd;
	int ignore_plock;
	int rv;

	int enable_plock = opt(enable_plock_ind);
	int plock_ownership = opt(plock_ownership_ind);

	ls = find_ls_handle(handle);
	if (!ls) {
		log_error("deliver_cb no ls for cpg %s", group_name->value);
		return;
	}

	/* reject messages too short to contain a dlm_header */
	if (len < sizeof(struct dlm_header)) {
		log_error("deliver_cb short message %zd", len);
		return;
	}

	hd = (struct dlm_header *)data;
	dlm_header_in(hd);

	rv = dlm_header_validate(hd, nodeid);
	if (rv < 0)
		return;

	ignore_plock = 0;

	switch (hd->type) {
	case DLM_MSG_START:
		receive_start(ls, hd, len);
		break;

	/* plock messages are skipped while we lack plock state and are
	   not yet saving messages (see comment above deliver_cb) */
	case DLM_MSG_PLOCK:
		if (ls->disable_plock)
			break;
		if (ls->need_plocks && !ls->save_plocks) {
			ignore_plock = 1;
			break;
		}
		if (enable_plock)
			receive_plock(ls, hd, len);
		else
			log_error("msg %d nodeid %d enable_plock %d",
				  hd->type, nodeid, enable_plock);
		break;

	case DLM_MSG_PLOCK_OWN:
		if (ls->disable_plock)
			break;
		if (ls->need_plocks && !ls->save_plocks) {
			ignore_plock = 1;
			break;
		}
		if (enable_plock && plock_ownership)
			receive_own(ls, hd, len);
		else
			log_error("msg %d nodeid %d enable_plock %d owner %d",
				  hd->type, nodeid, enable_plock, plock_ownership);
		break;

	case DLM_MSG_PLOCK_DROP:
		if (ls->disable_plock)
			break;
		if (ls->need_plocks && !ls->save_plocks) {
			ignore_plock = 1;
			break;
		}
		if (enable_plock && plock_ownership)
			receive_drop(ls, hd, len);
		else
			log_error("msg %d nodeid %d enable_plock %d owner %d",
				  hd->type, nodeid, enable_plock, plock_ownership);
		break;

	case DLM_MSG_PLOCK_SYNC_LOCK:
	case DLM_MSG_PLOCK_SYNC_WAITER:
		if (ls->disable_plock)
			break;
		if (ls->need_plocks && !ls->save_plocks) {
			ignore_plock = 1;
			break;
		}
		if (enable_plock && plock_ownership)
			receive_sync(ls, hd, len);
		else
			log_error("msg %d nodeid %d enable_plock %d owner %d",
				  hd->type, nodeid, enable_plock, plock_ownership);
		break;

	/* plocks_data/done are part of the sync protocol itself, so they
	   are not subject to the need_plocks/save_plocks skip above */
	case DLM_MSG_PLOCKS_DATA:
		if (ls->disable_plock)
			break;
		if (enable_plock)
			receive_plocks_data(ls, hd, len);
		else
			log_error("msg %d nodeid %d enable_plock %d",
				  hd->type, nodeid, enable_plock);
		break;

	case DLM_MSG_PLOCKS_DONE:
		if (ls->disable_plock)
			break;
		if (enable_plock)
			receive_plocks_done(ls, hd, len);
		else
			log_error("msg %d nodeid %d enable_plock %d",
				  hd->type, nodeid, enable_plock);
		break;

	case DLM_MSG_RELEASE_RECOVER:
		receive_release_recover(ls, hd, len);
		break;

#if 0
	case DLM_MSG_DEADLK_CYCLE_START:
		if (opt(enable_deadlk))
			receive_cycle_start(ls, hd, len);
		else
			log_error("msg %d nodeid %d enable_deadlk %d",
				  hd->type, nodeid, opt(enable_deadlk));
		break;

	case DLM_MSG_DEADLK_CYCLE_END:
		if (opt(enable_deadlk))
			receive_cycle_end(ls, hd, len);
		else
			log_error("msg %d nodeid %d enable_deadlk %d",
				  hd->type, nodeid, opt(enable_deadlk));
		break;

	case DLM_MSG_DEADLK_CHECKPOINT_READY:
		if (opt(enable_deadlk))
			receive_checkpoint_ready(ls, hd, len);
		else
			log_error("msg %d nodeid %d enable_deadlk %d",
				  hd->type, nodeid, opt(enable_deadlk));
		break;

	case DLM_MSG_DEADLK_CANCEL_LOCK:
		if (opt(enable_deadlk))
			receive_cancel_lock(ls, hd, len);
		else
			log_error("msg %d nodeid %d enable_deadlk %d",
				  hd->type, nodeid, opt(enable_deadlk));
		break;
#endif

	default:
		log_error("unknown msg type %d", hd->type);
	}

	if (ignore_plock)
		log_plock(ls, "msg %s nodeid %d need_plock ignore",
			  msg_name(hd->type), nodeid);

	apply_changes(ls);
}
1753 	
1754 	/* save ringid to compare with cman's.
1755 	   also save member_list to double check with cman's member list?
1756 	   they should match */
1757 	
1758 	static void totem_cb(cpg_handle_t handle,
1759 			     struct cpg_ring_id ring_id,
1760 			     uint32_t member_list_entries,
1761 			     const uint32_t *member_list)
1762 	{
1763 		struct lockspace *ls;
1764 		char name[128];
1765 	
1766 		ls = find_ls_handle(handle);
1767 		if (!ls) {
1768 			log_error("totem_cb no lockspace for handle");
1769 			return;
1770 		}
1771 	
1772 		memset(&name, 0, sizeof(name));
1773 		sprintf(name, "dlm:ls:%s", ls->name);
1774 	
1775 		log_ringid(name, &ring_id, member_list, member_list_entries);
1776 	
1777 		ls->cpg_ringid.nodeid = ring_id.nodeid;
1778 		ls->cpg_ringid.seq = ring_id.seq;
1779 		ls->cpg_ringid_wait = 0;
1780 	
1781 		apply_changes(ls);
1782 	}
1783 	
/* cpg model-v1 callbacks used for every lockspace cpg; the flag asks
   corosync to deliver an initial totem confchg so the ringid is known
   right after joining. */
static cpg_model_v1_data_t cpg_callbacks = {
	.cpg_deliver_fn = deliver_cb,
	.cpg_confchg_fn = confchg_cb,
	.cpg_totem_confchg_fn = totem_cb,
	.flags = CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF,
};
1790 	
1791 	static void process_cpg_lockspace(int ci)
1792 	{
1793 		struct lockspace *ls;
1794 		cs_error_t error;
1795 	
1796 		ls = find_ls_ci(ci);
1797 		if (!ls) {
1798 			log_error("process_lockspace_cpg no lockspace for ci %d", ci);
1799 			return;
1800 		}
1801 	
1802 		error = cpg_dispatch(ls->cpg_handle, CS_DISPATCH_ALL);
1803 		if (error != CS_OK && error != CS_ERR_BAD_HANDLE) {
1804 			log_error("cpg_dispatch error %d", error);
1805 			return;
1806 		}
1807 	}
1808 	
1809 	/* received an "online" uevent from dlm-kernel */
1810 	
1811 	int dlm_join_lockspace(struct lockspace *ls)
1812 	{
1813 		cs_error_t error;
1814 		cpg_handle_t h;
1815 		struct cpg_name name;
1816 		int i = 0, fd, ci, rv;
1817 	
1818 		error = cpg_model_initialize(&h, CPG_MODEL_V1,
1819 					     (cpg_model_data_t *)&cpg_callbacks, NULL);
1820 		if (error != CS_OK) {
1821 			log_error("cpg_model_initialize error %d", error);
1822 			rv = -1;
1823 			goto fail_free;
1824 		}
1825 	
1826 		cpg_fd_get(h, &fd);
1827 	
1828 		ci = client_add(fd, process_cpg_lockspace, NULL);
1829 	
1830 		list_add(&ls->list, &lockspaces);
1831 	
1832 		ls->cpg_handle = h;
1833 		ls->cpg_client = ci;
1834 		ls->cpg_fd = fd;
1835 		ls->kernel_stopped = 1;
1836 		ls->need_plocks = 1;
1837 		ls->joining = 1;
1838 	
1839 		memset(&name, 0, sizeof(name));
1840 		sprintf(name.value, "dlm:ls:%s", ls->name);
1841 		name.length = strlen(name.value) + 1;
1842 	
1843 		/* TODO: allow global_id to be set in cluster.conf? */
1844 		ls->global_id = cpgname_to_crc(name.value, name.length);
1845 	
1846 		log_group(ls, "cpg_join %s ...", name.value);
1847 	 retry:
1848 		error = cpg_join(h, &name);
1849 		if (error == CS_ERR_TRY_AGAIN) {
1850 			sleep(1);
1851 			if (!(++i % 10))
1852 				log_error("cpg_join error retrying");
1853 			goto retry;
1854 		}
1855 		if (error != CS_OK) {
1856 			log_error("cpg_join error %d", error);
1857 			cpg_finalize(h);
1858 			rv = -1;
1859 			goto fail;
1860 		}
1861 	
1862 		return 0;
1863 	
1864 	 fail:
1865 		list_del(&ls->list);
1866 		client_dead(ci);
1867 		cpg_finalize(h);
1868 	 fail_free:
1869 		set_sysfs_event_done(ls->name, rv);
1870 		free_ls(ls);
1871 		return rv;
1872 	}
1873 	
1874 	/* received an "offline" uevent from dlm-kernel */
1875 	
1876 	int dlm_leave_lockspace(struct lockspace *ls, const char *release_recover_str)
1877 	{
1878 		cs_error_t error;
1879 		struct cpg_name name;
1880 		unsigned long release_recover;
1881 		int i = 0;
1882 	
1883 		if (release_recover_str) {
1884 			release_recover = strtoul(release_recover_str, NULL, 0);
1885 			if (release_recover == ULONG_MAX) {
1886 				log_error("failed to parse release recover: %s",
1887 					  release_recover_str);
1888 				return errno;
1889 			}
1890 	
1891 			send_release_recover(ls, release_recover);
1892 		}
1893 	
1894 		ls->leaving = 1;
1895 	
1896 		memset(&name, 0, sizeof(name));
1897 		sprintf(name.value, "dlm:ls:%s", ls->name);
1898 		name.length = strlen(name.value) + 1;
1899 	
1900 	 retry:
1901 		error = cpg_leave(ls->cpg_handle, &name);
1902 		if (error == CS_ERR_TRY_AGAIN) {
1903 			sleep(1);
1904 			if (!(++i % 10))
1905 				log_error("cpg_leave error retrying");
1906 			goto retry;
1907 		}
1908 		if (error != CS_OK)
1909 			log_error("cpg_leave error %d", error);
1910 	
1911 		return 0;
1912 	}
1913 	
1914 	int set_fs_notified(struct lockspace *ls, int nodeid)
1915 	{
1916 		struct node *node;
1917 	
1918 		/* this shouldn't happen */
1919 		node = get_node_history(ls, nodeid);
1920 		if (!node) {
1921 			log_error("set_fs_notified no nodeid %d", nodeid);
1922 			return -ESRCH;
1923 		}
1924 	
1925 		if (!find_memb(ls->started_change, nodeid)) {
1926 			log_group(ls, "set_fs_notified %d not in ls", nodeid);
1927 			return 0;
1928 		}
1929 	
1930 		/* this can happen, we haven't seen a nodedown for this node yet,
1931 		   but we should soon */
1932 		if (!node->check_fs) {
1933 			log_group(ls, "set_fs_notified %d zero check_fs", nodeid);
1934 			return -EAGAIN;
1935 		}
1936 	
1937 		log_group(ls, "set_fs_notified nodeid %d", nodeid);
1938 		node->fs_notified = 1;
1939 		return 0;
1940 	}
1941 	
/* Fill in a dlmc_lockspace snapshot for the query interface: flags,
   the last started change (cg_prev), and the newest pending change
   (cg_next).  Always returns 0. */

int set_lockspace_info(struct lockspace *ls, struct dlmc_lockspace *lockspace)
{
	struct change *cg, *last = NULL;

	/* explicit terminator since strncpy does not guarantee one */
	strncpy(lockspace->name, ls->name, DLM_LOCKSPACE_LEN + 1);
	lockspace->name[DLM_LOCKSPACE_LEN] = '\0';
	lockspace->global_id = ls->global_id;

	if (ls->joining)
		lockspace->flags |= DLMC_LF_JOINING;
	if (ls->leaving)
		lockspace->flags |= DLMC_LF_LEAVING;
	if (ls->kernel_stopped)
		lockspace->flags |= DLMC_LF_KERNEL_STOPPED;
	if (ls->fs_registered)
		lockspace->flags |= DLMC_LF_FS_REGISTERED;
	if (ls->need_plocks)
		lockspace->flags |= DLMC_LF_NEED_PLOCKS;
	if (ls->save_plocks)
		lockspace->flags |= DLMC_LF_SAVE_PLOCKS;

	/* cg_prev: the last change that was fully started, if any */
	if (!ls->started_change)
		goto next;

	cg = ls->started_change;

	lockspace->cg_prev.member_count = cg->member_count;
	lockspace->cg_prev.joined_count = cg->joined_count;
	lockspace->cg_prev.remove_count = cg->remove_count;
	lockspace->cg_prev.failed_count = cg->failed_count;
	lockspace->cg_prev.combined_seq = cg->combined_seq;
	lockspace->cg_prev.seq = cg->seq;

 next:
	/* cg_next: newest pending change (head of list); combined_seq is
	   the seq of the oldest pending change (tail of list) */
	if (list_empty(&ls->changes))
		goto out;

	list_for_each_entry(cg, &ls->changes, list)
		last = cg;

	cg = list_first_entry(&ls->changes, struct change, list);

	lockspace->cg_next.member_count = cg->member_count;
	lockspace->cg_next.joined_count = cg->joined_count;
	lockspace->cg_next.remove_count = cg->remove_count;
	lockspace->cg_next.failed_count = cg->failed_count;
	lockspace->cg_next.combined_seq = last->seq;
	lockspace->cg_next.seq = cg->seq;
	lockspace->cg_next.wait_condition = ls->wait_debug;
	if (cg->state == CGST_WAIT_MESSAGES)
		lockspace->cg_next.wait_messages = 1;
 out:
	return 0;
}
1996 	
1997 	static int _set_node_info(struct lockspace *ls, struct change *cg, int nodeid,
1998 				  struct dlmc_node *node)
1999 	{
2000 		struct member *m = NULL;
2001 		struct node *n;
2002 	
2003 		node->nodeid = nodeid;
2004 	
2005 		if (cg)
2006 			m = find_memb(cg, nodeid);
2007 		if (!m)
2008 			goto history;
2009 	
2010 		node->flags |= DLMC_NF_MEMBER;
2011 	
2012 		if (m->start)
2013 			node->flags |= DLMC_NF_START;
2014 		if (m->disallowed)
2015 			node->flags |= DLMC_NF_DISALLOWED;
2016 	
2017 	 history:
2018 		n = get_node_history(ls, nodeid);
2019 		if (!n)
2020 			goto out;
2021 	
2022 		if (n->need_fencing)
2023 			node->flags |= DLMC_NF_NEED_FENCING;
2024 		if (n->check_fs)
2025 			node->flags |= DLMC_NF_CHECK_FS;
2026 	
2027 		node->added_seq = n->lockspace_add_seq;
2028 		node->removed_seq = n->lockspace_rem_seq;
2029 	
2030 		node->fail_reason = n->lockspace_fail_reason;
2031 		node->fail_walltime = n->fail_walltime;
2032 		node->fail_monotime = n->fail_monotime;
2033 	 out:
2034 		return 0;
2035 	}
2036 	
2037 	int set_node_info(struct lockspace *ls, int nodeid, struct dlmc_node *node)
2038 	{
2039 		struct change *cg;
2040 	
2041 		if (!list_empty(&ls->changes)) {
2042 			cg = list_first_entry(&ls->changes, struct change, list);
2043 			return _set_node_info(ls, cg, nodeid, node);
2044 		}
2045 	
2046 		return _set_node_info(ls, ls->started_change, nodeid, node);
2047 	}
2048 	
2049 	int set_lockspaces(int *count, struct dlmc_lockspace **lss_out)
2050 	{
2051 		struct lockspace *ls;
2052 		struct dlmc_lockspace *lss, *lsp;
2053 		int ls_count = 0;
2054 	
2055 		list_for_each_entry(ls, &lockspaces, list)
2056 			ls_count++;
2057 	
2058 		lss = malloc(ls_count * sizeof(struct dlmc_lockspace));
2059 		if (!lss)
2060 			return -ENOMEM;
2061 		memset(lss, 0, ls_count * sizeof(struct dlmc_lockspace));
2062 	
2063 		lsp = lss;
2064 		list_for_each_entry(ls, &lockspaces, list) {
2065 			set_lockspace_info(ls, lsp++);
2066 		}
2067 	
2068 		*count = ls_count;
2069 		*lss_out = lss;
2070 		return 0;
2071 	}
2072 	
2073 	int set_lockspace_nodes(struct lockspace *ls, int option, int *node_count,
2074 	                        struct dlmc_node **nodes_out)
2075 	{
2076 		struct change *cg;
2077 		struct node *n;
2078 		struct dlmc_node *nodes = NULL, *nodep;
2079 		struct member *memb;
2080 		int count = 0;
2081 	
2082 		if (option == DLMC_NODES_ALL) {
2083 			if (!list_empty(&ls->changes))
2084 				cg = list_first_entry(&ls->changes, struct change,list);
2085 			else
2086 				cg = ls->started_change;
2087 	
2088 			list_for_each_entry(n, &ls->node_history, list)
2089 				count++;
2090 	
2091 		} else if (option == DLMC_NODES_MEMBERS) {
2092 			if (!ls->started_change)
2093 				goto out;
2094 			cg = ls->started_change;
2095 			count = cg->member_count;
2096 	
2097 		} else if (option == DLMC_NODES_NEXT) {
2098 			if (list_empty(&ls->changes))
2099 				goto out;
2100 			cg = list_first_entry(&ls->changes, struct change, list);
2101 			count = cg->member_count;
2102 		} else
2103 			goto out;
2104 	
2105 		nodes = malloc(count * sizeof(struct dlmc_node));
2106 		if (!nodes)
2107 			return -ENOMEM;
2108 		memset(nodes, 0, count * sizeof(struct dlmc_node));
2109 		nodep = nodes;
2110 	
2111 		if (option == DLMC_NODES_ALL) {
2112 			list_for_each_entry(n, &ls->node_history, list)
2113 				_set_node_info(ls, cg, n->nodeid, nodep++);
2114 		} else {
2115 			list_for_each_entry(memb, &cg->members, list)
2116 				_set_node_info(ls, cg, memb->nodeid, nodep++);
2117 		}
2118 	 out:
2119 		*node_count = count;
2120 		*nodes_out = nodes;
2121 		return 0;
2122 	}
2123 	
2124