1    	/*
2    	 * Copyright 2004-2012 Red Hat, Inc.
3    	 *
4    	 * This copyrighted material is made available to anyone wishing to use,
5    	 * modify, copy, or redistribute it subject to the terms and conditions
6    	 * of the GNU General Public License v2 or (at your option) any later version.
7    	 */
8    	
9    	/*
10   	 * . application in the kernel or userspace asks to join
11   	 *   a lockspace
12   	 *
13   	 * . dlm_new_lockspace() in the kernel sets up basic
14   	 *   lockspace structures, then sends a uevent to
15   	 *   dlm_controld in userspace (KOBJ_ONLINE), asking
16   	 *   dlm_controld to join the corosync group ("cpg")
17   	 *   for the lockspace.  dlm_new_lockspace() blocks
18   	 *   waiting for a reply from dlm_controld (the reply is
19   	 *   a write to the "event_done" sysfs file).
20   	 *   When the successful reply comes back to dlm-kernel
21   	 *   from dlm_controld, dlm-kernel knows it is now a member
22   	 *   of the lockspace membership (represented in userspace
23   	 *   by the corosync cpg), and can do locking with the
24   	 *   other members.  Before sending event_done to the kernel,
25   	 *   dlm_controld tells dlm-kernel who the other lockspace
26   	 *   members are via configfs settings.
27   	 *
28   	 * . When dlm_controld gets a request from dlm-kernel to
29   	 *   join a lockspace, it runs dlm_join_lockspace() which
30   	 *   calls cpg_join() to join the corosync group representing
31   	 *   the members of the lockspace.  dlm_controld will get
32   	 *   callbacks from corosync when membership of this cpg 
33   	 *   changes (joins/leaves/failures).  After calling
34   	 *   cpg_join(), dlm_controld waits for the first
35   	 *   corosync membership callback indicating it is now
36   	 *   a member of the cpg.  The callback function for
37   	 *   cpg membership changes is confchg_cb().  Corosync
38   	 *   guarantees that all members of the cpg see the
39   	 *   same sequence of confchg callbacks (e.g. if a number
40   	 *   of nodes are joining/leaving/failing at once).
41   	 *   When the first confchg arrives after cpg_join(),
42   	 *   dlm_controld sets up the current members for dlm-kernel
43   	 *   via configfs, then writes to event_done in sysfs to start
44   	 *   dlm-kernel running.
45   	 *
46   	 * . When a cpg member joins/leaves/fails, dlm_controld
47   	 *   on all current members gets a confchg callback
48   	 *   showing the new members.  dlm_controld then stops
49   	 *   dlm-kernel locking activity for that lockspace by
50   	 *   writing 0 to the "control" sysfs file.
51   	 *   dlm_controld then sends/recvs cpg messages to all
52   	 *   other cpg members to act as barrier to ensure all
53   	 *   members have stopped locking activity in the kernel
54   	 *   (apply_changes()).  When all have done this,
55   	 *   dlm_controld on all the members then sets up the
56   	 *   new members in the kernel (via configfs) and tells
57   	 *   dlm-kernel to start the lockspace again (start_kernel()).
58   	 *
59   	 * . When dlm-kernel is started after being stopped, it does
60   	 *   lockspace recovery based on changes to the membership.
61   	 *   When recovery is done, normal locking activity resumes.
62   	 *
 * Replacing dlm_controld is a matter of doing the following
64   	 * steps by either manually setting up sysfs and configfs,
65   	 * or having a new daemon to do it:
66   	 *
67   	 * - decide who the lockspace members are
68   	 * - stop dlm-kernel before changing lockspace members (write to sysfs)
69   	 * - wait for previous step on all before making changes
70   	 * - tell dlm-kernel member nodeids/IPs in configfs (write to configfs)
71   	 * - start dlm-kernel (write to sysfs)
72   	 *
73   	 * echo 0/1 into /sys/kernel/dlm/foo/control and /sys/kernel/dlm/foo/event_done
74   	 * echo/mkdir/write values into /sys/kernel/config/dlm/cluster/comms/ and
75   	 * /sys/kernel/config/dlm/cluster/spaces/foo/
76   	 */
77   	
78   	#include "dlm_daemon.h"
79   	
/* Log through log_group() at most once per lockspace change: each call
   site keeps its own static __change_nr, so a given call site fires only
   the first time it sees a new ls->change_seq (used to rate-limit
   messages emitted from polling loops).
   NOTE(review): __change_nr is a reserved identifier (leading double
   underscore); harmless in practice but nonstandard. */
#define log_limit(ls, fmt, args...) ({        \
	static uint32_t __change_nr;          \
	if (ls->change_seq > __change_nr) {   \
		__change_nr = ls->change_seq; \
		log_group(ls, fmt, ##args);   \
	}                                     \
})
87   	
/* retries are once a second */
/* Log the first 60 retries at group level, escalate once to the error
   log at retry 60, then repeat the error hourly (every 3600 retries).
   NOTE(review): log_erros is spelled that way at its definition site
   (presumably dlm_daemon.h, the lockspace error-level macro) — confirm
   before "fixing" the spelling here. */
#define log_retry(ls, fmt, args...) ({ \
	if (ls->wait_retry < 60) \
		log_group(ls, fmt, ##args); \
	else if (ls->wait_retry == 60) \
		log_erros(ls, fmt, ##args); \
        else if (!(ls->wait_retry % 3600)) \
                log_erros(ls, fmt, ##args); \
})
97   	
98   	/* per lockspace cpg: ls->node_history */
99   	
/* Per-node history entry on ls->node_history; created the first time a
   nodeid is seen in this lockspace's cpg and kept for the life of the
   lockspace (see the "Tracking fencing state" comment below for how
   these fields interact across spurious partitions/merges). */
struct node {
	struct list_head list;
	int nodeid;

	/* times and change seqs at which this node was last added to,
	   removed from, or failed out of the lockspace; *_seq values
	   are retained for queries */
	uint64_t lockspace_add_time;
	uint64_t lockspace_rem_time;
	uint64_t lockspace_fail_time;
	uint32_t lockspace_add_seq;
	uint32_t lockspace_rem_seq;
	uint32_t lockspace_fail_seq;
	int lockspace_member;		/* 1 while the node is a cpg member */
	int lockspace_fail_reason;	/* for queries */

	/* seq of the newest change this node's start message matched;
	   match_change() rejects matches against older changes */
	uint32_t last_match_seq;

	/* monotime of the last good start message from this node;
	   cleared on leave and after fencing completes */
	uint64_t start_time;

	/* set when the node fails while an fs is registered; cleared
	   once the local fs_controld acks the failure (fs_notified) */
	int check_fs;
	int fs_notified;

	/* set on failure when start_time was non-zero; cleared when the
	   node's fence time is newer than fail_monotime */
	int need_fencing;
	uint32_t fence_queries;	/* for debug */
	uint64_t fail_walltime;
	uint64_t fail_monotime;
};
125  	
126  	/* per lockspace confchg: ls->changes */
127  	
/* change states: waiting for local conditions (ringid/quorum/fencing/fs)
   vs waiting for start messages from the other members */
#define CGST_WAIT_CONDITIONS 1
#define CGST_WAIT_MESSAGES   2

/* One cpg confchg (membership change) queued on ls->changes; the
   current change is the one returned by list_first_entry(). */
struct change {
	struct list_head list;
	struct list_head members;
	struct list_head removed; /* nodes removed by this change */
	int member_count;
	int joined_count;
	int remove_count;
	int failed_count;
	int state;	/* CGST_* */
	int we_joined;	/* presumably 1 when this confchg added the local node — set elsewhere, confirm */
	uint32_t seq; /* used as a reference for debugging, and for queries */
	uint32_t combined_seq; /* for queries */
	uint64_t create_time;
};
145  	
146  	/* per lockspace change member: cg->members */
147  	
/* Membership entry on a change's members (or removed) list. */
struct member {
	struct list_head list;
	int nodeid;
	int start;   /* 1 if we received a start message for this change */
	int added;   /* 1 if added by this change */
	int failed;  /* 1 if failed in this change */
	int disallowed;
	uint32_t start_flags;	/* DLM_MFLG_* carried by the member's start message */
};
157  	
/* Fixed header of a start message body, little-endian on the wire
   (converted by ls_info_in()); followed by id_info_count entries of
   id_info_size bytes each.  The explicit sizes let receivers walk the
   array with the sender's stride (see ids_in/get_id_struct). */
struct ls_info {
	uint32_t ls_info_size;
	uint32_t id_info_size;
	uint32_t id_info_count;

	uint32_t started_count;

	/* counts describing the sender's change; compared against our
	   own change in match_change() */
	int member_count;
	int joined_count;
	int remove_count;
	int failed_count;
};
170  	
/* One per-member entry following the ls_info header in a start message;
   little-endian on the wire (see id_info_in). */
struct id_info {
	int nodeid;
};
174  	
175  	static void ls_info_in(struct ls_info *li)
176  	{
177  		li->ls_info_size  = le32_to_cpu(li->ls_info_size);
178  		li->id_info_size  = le32_to_cpu(li->id_info_size);
179  		li->id_info_count = le32_to_cpu(li->id_info_count);
180  		li->started_count = le32_to_cpu(li->started_count);
181  		li->member_count  = le32_to_cpu(li->member_count);
182  		li->joined_count  = le32_to_cpu(li->joined_count);
183  		li->remove_count  = le32_to_cpu(li->remove_count);
184  		li->failed_count  = le32_to_cpu(li->failed_count);
185  	}
186  	
/* Convert a wire-format (little-endian) id_info to host byte order. */
static void id_info_in(struct id_info *id)
{
	id->nodeid = le32_to_cpu(id->nodeid);
}
191  	
192  	static void ids_in(struct ls_info *li, struct id_info *ids)
193  	{
194  		struct id_info *id;
195  		int i;
196  	
197  		id = ids;
198  		for (i = 0; i < li->id_info_count; i++) {
199  			id_info_in(id);
200  			id = (struct id_info *)((char *)id + li->id_info_size);
201  		}
202  	}
203  	
204  	static struct member *find_memb(struct change *cg, int nodeid)
205  	{
206  		struct member *memb;
207  	
208  		list_for_each_entry(memb, &cg->members, list) {
209  			if (memb->nodeid == nodeid)
210  				return memb;
211  		}
212  		return NULL;
213  	}
214  	
215  	static struct lockspace *find_ls_handle(cpg_handle_t h)
216  	{
217  		struct lockspace *ls;
218  	
219  		list_for_each_entry(ls, &lockspaces, list) {
220  			if (ls->cpg_handle == h)
221  				return ls;
222  		}
223  		return NULL;
224  	}
225  	
226  	static struct lockspace *find_ls_ci(int ci)
227  	{
228  		struct lockspace *ls;
229  	
230  		list_for_each_entry(ls, &lockspaces, list) {
231  			if (ls->cpg_client == ci)
232  				return ls;
233  		}
234  		return NULL;
235  	}
236  	
237  	static void free_cg(struct change *cg)
238  	{
239  		struct member *memb, *safe;
240  	
(1) Event dereference: Dereferencing pointer "cg".
241  		list_for_each_entry_safe(memb, safe, &cg->members, list) {
242  			list_del(&memb->list);
243  			free(memb);
244  		}
245  		list_for_each_entry_safe(memb, safe, &cg->removed, list) {
246  			list_del(&memb->list);
247  			free(memb);
248  		}
249  		free(cg);
250  	}
251  	
252  	static void free_ls(struct lockspace *ls)
253  	{
254  		struct change *cg, *cg_safe;
255  		struct node *node, *node_safe;
256  	
257  		list_for_each_entry_safe(cg, cg_safe, &ls->changes, list) {
258  			list_del(&cg->list);
259  			free_cg(cg);
260  		}
261  	
262  		if (ls->started_change)
263  			free_cg(ls->started_change);
264  	
265  		list_for_each_entry_safe(node, node_safe, &ls->node_history, list) {
266  			list_del(&node->list);
267  			free(node);
268  		}
269  	
270  		free(ls);
271  	}
272  	
273  	
274  	/* Problem scenario:
275  	   nodes A,B,C are in fence domain
276  	   node C has gfs foo mounted
277  	   node C fails
278  	   nodes A,B begin fencing C (slow, not completed)
279  	   node B mounts gfs foo
280  	
281  	   We may end up having gfs foo mounted and being used on B before
282  	   C has been fenced.  C could wake up corrupt fs.
283  	
284  	   So, we need to prevent any new gfs mounts while there are any
285  	   outstanding, incomplete fencing operations.
286  	
287  	   We also need to check that the specific failed nodes we know about have
288  	   been fenced (since fenced may not even have been notified that the node
289  	   has failed yet).
290  	
291  	   So, check that:
292  	   1. has fenced fenced the node since we saw it fail?
293  	   2. fenced has no outstanding fencing ops
294  	
295  	   For 1:
296  	   - node X fails
297  	   - we see node X fail and X has non-zero start_time,
298  	     set need_fencing and record the fail time
299  	   - wait for X to be removed from all dlm cpg's  (probably not necessary)
300  	   - check that the fencing time is later than the recorded time above
301  	
302  	   Tracking fencing state when there are spurious partitions/merges...
303  	
304  	   from a spurious leave/join of node X, a lockspace will see:
305  	   - node X is a lockspace member
306  	   - node X fails, may be waiting for all cpgs to see failure or for fencing to
307  	     complete
308  	   - node X joins the lockspace - we want to process the change as usual, but
309  	     don't want to disrupt the code waiting for the fencing, and we want to
310  	     continue running properly once the remerged node is properly reset
311  	
312  	   ls->node_history
313  	   when we see a node not in this list, add entry for it with zero start_time
314  	   record the time we get a good start message from the node, start_time
315  	   clear start_time if the node leaves
316  	   if node fails with non-zero start_time, set need_fencing
317  	   when a node is fenced, clear start_time and clear need_fencing
318  	   if a node remerges after this, no good start message, no new start_time set
319  	   if a node fails with zero start_time, it doesn't need fencing
320  	   if a node remerges before it's been fenced, no good start message, no new
321  	   start_time set 
322  	*/
323  	
324  	static struct node *get_node_history(struct lockspace *ls, int nodeid)
325  	{
326  		struct node *node;
327  	
328  		list_for_each_entry(node, &ls->node_history, list) {
329  			if (node->nodeid == nodeid)
330  				return node;
331  		}
332  		return NULL;
333  	}
334  	
335  	static struct node *get_node_history_create(struct lockspace *ls, int nodeid)
336  	{
337  		struct node *node;
338  	
339  		node = get_node_history(ls, nodeid);
340  		if (node)
341  			return node;
342  	
343  		node = malloc(sizeof(struct node));
344  		if (!node)
345  			return NULL;
346  		memset(node, 0, sizeof(struct node));
347  	
348  		node->nodeid = nodeid;
349  		list_add_tail(&node->list, &ls->node_history);
350  		return node;
351  	}
352  	
353  	static void node_history_lockspace_add(struct lockspace *ls, int nodeid,
354  					       struct change *cg, uint64_t now)
355  	{
356  		struct node *node;
357  	
358  		node = get_node_history_create(ls, nodeid);
359  		if (!node) {
360  			log_error("node_history_lockspace_add no nodeid %d", nodeid);
361  			return;
362  		}
363  	
364  		node->lockspace_add_time = now;
365  		node->lockspace_add_seq = cg->seq;
366  		node->lockspace_member = 1;
367  	}
368  	
369  	static void node_history_lockspace_left(struct lockspace *ls, int nodeid,
370  						struct change *cg, uint64_t now)
371  	{
372  		struct node *node;
373  	
374  		node = get_node_history(ls, nodeid);
375  		if (!node) {
376  			log_error("node_history_lockspace_left no nodeid %d", nodeid);
377  			return;
378  		}
379  	
380  		node->start_time = 0;
381  	
382  		node->lockspace_rem_time = now;
383  		node->lockspace_rem_seq = cg->seq;	/* for queries */
384  		node->lockspace_member = 0;
385  	}
386  	
387  	static void node_history_lockspace_fail(struct lockspace *ls, int nodeid,
388  						struct change *cg, int reason,
389  						uint64_t now)
390  	{
391  		struct node *node;
392  	
393  		node = get_node_history(ls, nodeid);
394  		if (!node) {
395  			log_error("node_history_lockspace_fail no nodeid %d", nodeid);
396  			return;
397  		}
398  	
399  		if (opt(enable_fencing_ind) && node->start_time) {
400  			node->need_fencing = 1;
401  			node->fence_queries = 0;
402  		}
403  	
404  		if (ls->fs_registered) {
405  			log_group(ls, "check_fs nodeid %d set", nodeid);
406  			node->check_fs = 1;
407  		}
408  	
409  		node->lockspace_rem_time = now;
410  		node->lockspace_rem_seq = cg->seq;	/* for queries */
411  		node->lockspace_member = 0;
412  		node->lockspace_fail_time = now;
413  		node->lockspace_fail_seq = node->lockspace_rem_seq;
414  		node->lockspace_fail_reason = reason;	/* for queries */
415  	
416  		node->fail_monotime = now;
417  		node->fail_walltime = time(NULL);
418  	}
419  	
420  	static void node_history_start(struct lockspace *ls, int nodeid)
421  	{
422  		struct node *node;
423  		
424  		node = get_node_history(ls, nodeid);
425  		if (!node) {
426  			log_error("node_history_start no nodeid %d", nodeid);
427  			return;
428  		}
429  	
430  		node->start_time = monotime();
431  	}
432  	
433  	/* wait for cluster ringid and cpg ringid to be the same so we know our
434  	   information from each service is based on the same node state */
435  	
436  	static int check_ringid_done(struct lockspace *ls)
437  	{
438  		/* If we've received a confchg due to a nodedown, but not
439  		   the corresponding ringid callback, then we should wait
440  		   for the ringid callback.  Once we have both conf and ring
441  		   callbacks, we can compare cpg/quorum ringids.
442  		   
443  		   Otherwise, there's a possible problem if we receive a
444  		   confchg before both ringid callback and quorum callback.
445  		   Then we'd get through this function by comparing the old,
446  		   matching ringids.
447  	
448  		   (We seem to usually get the quorum callback before any cpg
449  		   callbacks, in which case we wouldn't need cpg_ringid_wait,
450  		   but that's probably not guaranteed.) */
451  	
452  		if (ls->cpg_ringid_wait) {
453  			log_group(ls, "check_ringid wait cluster %llu cpg %u:%llu",
454  				  (unsigned long long)cluster_ringid_seq,
455  				  ls->cpg_ringid.nodeid,
456  				  (unsigned long long)ls->cpg_ringid.seq);
457  			return 0;
458  		}
459  	
460  		if (cluster_ringid_seq != ls->cpg_ringid.seq) {
461  			log_group(ls, "check_ringid cluster %llu cpg %u:%llu",
462  				  (unsigned long long)cluster_ringid_seq,
463  				  ls->cpg_ringid.nodeid,
464  				  (unsigned long long)ls->cpg_ringid.seq);
465  			return 0;
466  		}
467  	
468  		log_limit(ls, "check_ringid done cluster %llu cpg %u:%llu",
469  			  (unsigned long long)cluster_ringid_seq,
470  			  ls->cpg_ringid.nodeid,
471  			  (unsigned long long)ls->cpg_ringid.seq);
472  	
473  		return 1;
474  	}
475  	
476  	static int check_fencing_done(struct lockspace *ls)
477  	{
478  		struct node *node;
479  		uint64_t fence_monotime;
480  		int wait_count = 0;
481  		int rv, in_progress;
482  	
483  		if (!opt(enable_fencing_ind)) {
484  			log_group(ls, "check_fencing disabled");
485  			return 1;
486  		}
487  	
488  		list_for_each_entry(node, &ls->node_history, list) {
489  			if (!node->need_fencing)
490  				continue;
491  	
492  			rv = fence_node_time(node->nodeid, &fence_monotime);
493  			if (rv < 0) {
494  				log_error("fenced_node_time error %d", rv);
495  				continue;
496  			}
497  	
498  			if (fence_monotime >= node->fail_monotime) {
499  				log_group(ls, "check_fencing %d done start %llu fail %llu fence %llu",
500  					  node->nodeid,
501  					  (unsigned long long)node->start_time,
502  					  (unsigned long long)node->fail_monotime,
503  					  (unsigned long long)fence_monotime);
504  	
505  				node->need_fencing = 0;
506  				node->start_time = 0;
507  				continue;
508  			} else {
509  				if (!node->fence_queries) {
510  					log_group(ls, "check_fencing %d wait start %llu fail %llu",
511  						  node->nodeid,
512  						 (unsigned long long)node->start_time,
513  						 (unsigned long long)node->fail_monotime);
514  					node->fence_queries++;
515  				}
516  				wait_count++;
517  				continue;
518  			}
519  		}
520  	
521  		if (wait_count) {
522  			log_limit(ls, "check_fencing wait_count %d", wait_count);
523  			return 0;
524  		}
525  	
526  		/* now check if there are any outstanding fencing ops (for nodes
527  		   we may not have seen in any lockspace), and return 0 if there
528  		   are any */
529  	
530  		rv = fence_in_progress(&in_progress);
531  		if (rv < 0) {
532  			log_error("fenced_domain_info error %d", rv);
533  			return 0;
534  		}
535  	
536  		if (in_progress) {
537  			log_limit(ls, "check_fencing in progress %d", in_progress);
538  			return 0;
539  		}
540  	
541  		log_group(ls, "check_fencing done");
542  		return 1;
543  	}
544  	
545  	/* wait for local fs_controld to ack each failed node */
546  	
547  	static int check_fs_done(struct lockspace *ls)
548  	{
549  		struct node *node;
550  		int wait_count = 0;
551  	
552  		/* no corresponding fs for this lockspace */
553  		if (!ls->fs_registered)
554  			return 1;
555  	
556  		list_for_each_entry(node, &ls->node_history, list) {
557  			if (!node->check_fs)
558  				continue;
559  	
560  			if (node->fs_notified) {
561  				log_group(ls, "check_fs nodeid %d clear", node->nodeid);
562  				node->check_fs = 0;
563  				node->fs_notified = 0;
564  			} else {
565  				log_group(ls, "check_fs nodeid %d needs fs notify",
566  					  node->nodeid);
567  				wait_count++;
568  			}
569  		}
570  	
571  		if (wait_count)
572  			return 0;
573  	
574  		log_group(ls, "check_fs done");
575  		return 1;
576  	}
577  	
/* Scratch arrays filled by format_member_ids()/format_renew_ids() and
   passed straight to set_configfs_members(); only used from
   start_kernel(), so no concurrent access is expected. */
static int member_ids[MAX_NODES];
static int member_count;
static int renew_ids[MAX_NODES];
static int renew_count;
582  	
583  	static void format_member_ids(struct lockspace *ls)
584  	{
585  		struct change *cg = list_first_entry(&ls->changes, struct change, list);
586  		struct member *memb;
587  	
588  		memset(member_ids, 0, sizeof(member_ids));
589  		member_count = 0;
590  	
591  		list_for_each_entry(memb, &cg->members, list)
592  			member_ids[member_count++] = memb->nodeid;
593  	}
594  	
595  	/* list of nodeids that have left and rejoined since last start_kernel;
596  	   is any member of startcg in the left list of any other cg's?
597  	   (if it is, then it presumably must be flagged added in another) */
598  	
599  	static void format_renew_ids(struct lockspace *ls)
600  	{
601  		struct change *cg, *startcg;
602  		struct member *memb, *leftmemb;
603  	
604  		startcg = list_first_entry(&ls->changes, struct change, list);
605  	
606  		memset(renew_ids, 0, sizeof(renew_ids));
607  		renew_count = 0;
608  	
609  		list_for_each_entry(memb, &startcg->members, list) {
610  			list_for_each_entry(cg, &ls->changes, list) {
611  				if (cg == startcg)
612  					continue;
613  				list_for_each_entry(leftmemb, &cg->removed, list) {
614  					if (memb->nodeid == leftmemb->nodeid) {
615  						renew_ids[renew_count++] = memb->nodeid;
616  					}
617  				}
618  			}
619  		}
620  	
621  	}
622  	
/* Resume dlm-kernel operation for the current (first) change: push the
   member list into configfs, then write control=1 which kicks off
   kernel recovery.  The ordering of the sysfs/configfs writes below is
   significant. */
static void start_kernel(struct lockspace *ls)
{
	struct change *cg = list_first_entry(&ls->changes, struct change, list);

	/* start must pair with a prior stop (control=0) */
	if (!ls->kernel_stopped) {
		log_error("start_kernel cg %u not stopped", cg->seq);
		return;
	}

	log_group(ls, "start_kernel cg %u member_count %d",
		  cg->seq, cg->member_count);

	/* needs to happen before setting control which starts recovery */
	if (ls->joining)
		set_sysfs_id(ls->name, ls->global_id);

	if (ls->nodir)
		set_sysfs_nodir(ls->name, 1);

	/* members (and rejoined members) must be in configfs before the
	   control write starts recovery */
	format_member_ids(ls);
	format_renew_ids(ls);
	set_configfs_members(ls, ls->name, member_count, member_ids,
			     renew_count, renew_ids);
	set_sysfs_control(ls->name, 1);
	ls->kernel_stopped = 0;

	/* unblock the kernel's pending dlm_new_lockspace(); see the
	   uevent/event_done description in the file header comment */
	if (ls->joining) {
		set_sysfs_event_done(ls->name, 0);
		ls->joining = 0;
	}
}
654  	
655  	void cpg_stop_kernel(struct lockspace *ls)
656  	{
657  		if (!ls->kernel_stopped) {
658  			log_group(ls, "%s", __func__);
659  			set_sysfs_control(ls->name, 0);
660  			ls->kernel_stopped = 1;
661  		}
662  	}
663  	
/* Stop kernel locking on behalf of change seq (logged for debugging);
   the actual stop is delegated to the idempotent cpg_stop_kernel(). */
static void stop_kernel(struct lockspace *ls, uint32_t seq)
{
	log_group(ls, "%s seq %u", __func__, seq);
	cpg_stop_kernel(ls);
}
669  	
670  	/* the first condition is that the local lockspace is stopped which we
671  	   don't need to check for because stop_kernel(), which is synchronous,
672  	   was done when the change was created */
673  	
674  	/* the fencing/quorum/fs conditions need to account for all the changes
675  	   that have occured since the last change applied to dlm-kernel, not
676  	   just the latest change */
677  	
678  	/* we know that the cluster_quorate value here is consistent with the cpg events
679  	   because the ringid's are in sync per the check_ringid_done */
680  	
681  	static int wait_conditions_done(struct lockspace *ls)
682  	{
683  		if (!check_ringid_done(ls)) {
684  			if (ls->wait_debug != DLMC_LS_WAIT_RINGID) {
685  				ls->wait_debug = DLMC_LS_WAIT_RINGID;
686  				ls->wait_retry = 0;
687  			}
688  			ls->wait_retry++;
689  			/* the check function logs a message */
690  	
691  			poll_lockspaces++;
692  			return 0;
693  		}
694  	
695  		if (opt(enable_quorum_lockspace_ind) && !cluster_quorate) {
696  			if (ls->wait_debug != DLMC_LS_WAIT_QUORUM) {
697  				ls->wait_debug = DLMC_LS_WAIT_QUORUM;
698  				ls->wait_retry = 0;
699  			}
700  			ls->wait_retry++;
701  			log_retry(ls, "wait for quorum");
702  	
703  			poll_lockspaces++;
704  			return 0;
705  		}
706  	
707  		if (!check_fencing_done(ls)) {
708  			if (ls->wait_debug != DLMC_LS_WAIT_FENCING) {
709  				ls->wait_debug = DLMC_LS_WAIT_FENCING;
710  				ls->wait_retry = 0;
711  			}
712  			ls->wait_retry++;
713  			log_retry(ls, "wait for fencing");
714  	
715  			poll_lockspaces++;
716  			return 0;
717  		}
718  	
719  		if (!check_fs_done(ls)) {
720  			if (ls->wait_debug != DLMC_LS_WAIT_FSDONE) {
721  				ls->wait_debug = DLMC_LS_WAIT_FSDONE;
722  				ls->wait_retry = 0;
723  			}
724  			ls->wait_retry++;
725  			log_retry(ls, "wait for fsdone");
726  	
727  			poll_fs++;
728  			return 0;
729  		}
730  	
731  		ls->wait_debug = 0;
732  		ls->wait_retry = 0;
733  	
734  		return 1;
735  	}
736  	
737  	static int wait_messages_done(struct lockspace *ls)
738  	{
739  		struct change *cg = list_first_entry(&ls->changes, struct change, list);
740  		struct member *memb;
741  		int need = 0, total = 0;
742  	
743  		list_for_each_entry(memb, &cg->members, list) {
744  			if (!memb->start)
745  				need++;
746  			total++;
747  		}
748  	
749  		if (need) {
750  			log_group(ls, "wait_messages cg %u need %d of %d",
751  				  cg->seq, need, total);
752  			ls->wait_debug = need;
753  			return 0;
754  		}
755  	
756  		log_group(ls, "wait_messages cg %u got all %d", cg->seq, total);
757  	
758  		ls->wait_debug = 0;
759  	
760  		return 1;
761  	}
762  	
763  	static void cleanup_changes(struct lockspace *ls)
764  	{
765  		struct change *cg = list_first_entry(&ls->changes, struct change, list);
766  		struct change *safe;
767  	
768  		list_del(&cg->list);
769  		if (ls->started_change)
770  			free_cg(ls->started_change);
771  		ls->started_change = cg;
772  	
773  		ls->started_count++;
774  		if (!ls->started_count)
775  			ls->started_count++;
776  	
777  		cg->combined_seq = cg->seq; /* for queries */
778  	
779  		list_for_each_entry_safe(cg, safe, &ls->changes, list) {
780  			ls->started_change->combined_seq = cg->seq; /* for queries */
781  			list_del(&cg->list);
782  			free_cg(cg);
783  		}
784  	}
785  	
786  	/* There's a stream of confchg and messages. At one of these
787  	   messages, the low node needs to store plocks and new nodes
788  	   need to begin saving plock messages.  A second message is
789  	   needed to say that the plocks are ready to be read.
790  	
791  	   When the last start message is recvd for a change, the low node
792  	   stores plocks and the new nodes begin saving messages.  When the
793  	   store is done, low node sends plocks_stored message.  When
794  	   new nodes recv this, they read the plocks and their saved messages.
795  	   plocks_stored message should identify a specific change, like start
796  	   messages do; if it doesn't match ls->started_change, then it's ignored.
797  	
798  	   If a confchg adding a new node arrives after plocks are stored but
799  	   before plocks_stored msg recvd, then the message is ignored.  The low
800  	   node will send another plocks_stored message for the latest change
801  	   (although it may be able to reuse the ckpt if no plock state has changed).
802  	*/
803  	
804  	static void set_plock_data_node(struct lockspace *ls)
805  	{
806  		struct change *cg = list_first_entry(&ls->changes, struct change, list);
807  		struct member *memb;
808  		int low = 0;
809  	
810  		list_for_each_entry(memb, &cg->members, list) {
811  			if (!(memb->start_flags & DLM_MFLG_HAVEPLOCK))
812  				continue;
813  	
814  			if (!low || memb->nodeid < low)
815  				low = memb->nodeid;
816  		}
817  	
818  		log_dlock(ls, "set_plock_data_node from %d to %d",
819  			  ls->plock_data_node, low);
820  	
821  		ls->plock_data_node = low;
822  	}
823  	
824  	static struct id_info *get_id_struct(struct id_info *ids, int count, int size,
825  					     int nodeid)
826  	{
827  		struct id_info *id = ids;
828  		int i;
829  	
830  		for (i = 0; i < count; i++) {
831  			if (id->nodeid == nodeid)
832  				return id;
833  			id = (struct id_info *)((char *)id + size);
834  		}
835  		return NULL;
836  	}
837  	
838  	/* do the change details in the message match the details of the given change */
839  	
/* Does a received start/nack message (header hd, body li/ids) describe
   the given change cg?  Returns 1 on a match.  The message's own seq
   (hd->msgdata) is the sender's change seq, logged for correlation;
   matching is done on membership counts and nodeids, filtered by the
   sanity checks below. */
static int match_change(struct lockspace *ls, struct change *cg,
			struct dlm_header *hd, struct ls_info *li,
			struct id_info *ids)
{
	struct id_info *id;
	struct member *memb;
	struct node *node;
	uint64_t t;
	uint32_t seq = hd->msgdata;
	int i, members_mismatch;

	/* We can ignore messages if we're not in the list of members.
	   The one known time this will happen is after we've joined
	   the cpg, we can get messages for changes prior to the change
	   in which we're added. */

	id = get_id_struct(ids, li->id_info_count, li->id_info_size,our_nodeid);

	if (!id) {
		log_group(ls, "match_change %d:%u skip %u we are not in members",
			  hd->nodeid, seq, cg->seq);
		return 0;
	}

	/* the sender must be a member of the change it describes */
	memb = find_memb(cg, hd->nodeid);
	if (!memb) {
		log_group(ls, "match_change %d:%u skip %u sender not member",
			  hd->nodeid, seq, cg->seq);
		return 0;
	}

	if (memb->start_flags & DLM_MFLG_NACK) {
		log_group(ls, "match_change %d:%u skip %u is nacked",
			  hd->nodeid, seq, cg->seq);
		return 0;
	}

	/* only one start message per sender can count for a change */
	if (memb->start && hd->type == DLM_MSG_START) {
		log_group(ls, "match_change %d:%u skip %u already start",
			  hd->nodeid, seq, cg->seq);
		return 0;
	}

	/* a node's start can't match a change if the node joined the cluster
	   more recently than the change was created */

	node = get_node_history(ls, hd->nodeid);
	if (!node) {
		log_group(ls, "match_change %d:%u skip cg %u no node history",
			  hd->nodeid, seq, cg->seq);
		return 0;
	}

	t = cluster_add_time(node->nodeid);
	if (t > cg->create_time) {
		log_group(ls, "match_change %d:%u skip cg %u created %llu "
			  "cluster add %llu", hd->nodeid, seq, cg->seq,
			  (unsigned long long)cg->create_time,
			  (unsigned long long)t);

		/* nacks can apply to older cg's */
		if (!(hd->flags & DLM_MFLG_NACK)) {
			return 0;
		} else {
			log_group(ls, "match_change %d:%u unskip cg %u for nack",
				  hd->nodeid, seq, cg->seq);
		}
	}

	/* a sender never matches a change older than one it has already
	   matched (last_match_seq is updated at the bottom on success) */
	if (node->last_match_seq > cg->seq) {
		log_group(ls, "match_change %d:%u skip cg %u last matched cg %u",
			  hd->nodeid, seq, cg->seq, node->last_match_seq);
		return 0;
	}

	/* verify this is the right change by matching the counts
	   and the nodeids of the current members */

	if (li->member_count != cg->member_count ||
	    li->joined_count != cg->joined_count ||
	    li->remove_count != cg->remove_count ||
	    li->failed_count != cg->failed_count) {
		log_group(ls, "match_change %d:%u skip %u expect counts "
			  "%d %d %d %d", hd->nodeid, seq, cg->seq,
			  cg->member_count, cg->joined_count,
			  cg->remove_count, cg->failed_count);
		return 0;
	}

	members_mismatch = 0;
	id = ids;

	/* every nodeid in the message must be a member of cg */
	for (i = 0; i < li->id_info_count; i++) {
		memb = find_memb(cg, id->nodeid);
		if (!memb) {
			log_group(ls, "match_change %d:%u skip %u no memb %d",
			  	  hd->nodeid, seq, cg->seq, id->nodeid);
			members_mismatch = 1;
			break;
		}
		id = (struct id_info *)((char *)id + li->id_info_size);
	}

	if (members_mismatch)
		return 0;

	/* Not completely sure if this is a valid assertion or not, i.e. not
	   sure if we really never want to nack our first and only cg.  I have
	   seen one case in which a node incorrectly accepted nacks for cg seq
	   1 and ls change_seq 1.  (It was the secondary effect of another bug.)

	   Or, it's possible that this should apply a little more broadly as:
	   don't nack our most recent cg, i.e. cg->seq == ls->change_seq (1 or
	   otherwise).  I'm hoping to find a test case that will exercise this
	   to clarify the situation here, and then update this comment. */

	if (cg->seq == 1 && ls->change_seq == 1 && (hd->flags & DLM_MFLG_NACK)) {
		log_group(ls, "match_change %d:%u skip cg %u for nack",
			  hd->nodeid, seq, cg->seq);
		return 0;
	}

	node->last_match_seq = cg->seq;

	log_group(ls, "match_change %d:%u matches cg %u", hd->nodeid, seq,
		  cg->seq);
	return 1;
}
968  	
969  	/* Unfortunately, there's no really simple way to match a message with the
970  	   specific change that it was sent for.  We hope that by passing all the
971  	   details of the change in the message, we will be able to uniquely match the
972  	   it to the correct change. */
973  	
974  	/* A start message will usually be for the first (current) change on our list.
975  	   In some cases it will be for a non-current change, and we can ignore it:
976  	
977  	   1. A,B,C get confchg1 adding C
978  	   2. C sends start for confchg1
979  	   3. A,B,C get confchg2 adding D
980  	   4. A,B,C,D recv start from C for confchg1 - ignored
981  	   5. C,D send start for confchg2
982  	   6. A,B send start for confchg2
983  	   7. A,B,C,D recv all start messages for confchg2, and start kernel
984  	 
985  	   In step 4, how do the nodes know whether the start message from C is
986  	   for confchg1 or confchg2?  Hopefully by comparing the counts and members. */
987  	
988  	static struct change *find_change(struct lockspace *ls, struct dlm_header *hd,
989  					  struct ls_info *li, struct id_info *ids)
990  	{
991  		struct change *cg;
992  	
993  		list_for_each_entry_reverse(cg, &ls->changes, list) {
994  			if (!match_change(ls, cg, hd, li, ids))
995  				continue;
996  			return cg;
997  		}
998  	
999  		log_group(ls, "find_change %d:%u no match", hd->nodeid, hd->msgdata);
1000 		return NULL;
1001 	}
1002 	
1003 	static int is_added(struct lockspace *ls, int nodeid)
1004 	{
1005 		struct change *cg;
1006 		struct member *memb;
1007 	
1008 		list_for_each_entry(cg, &ls->changes, list) {
1009 			memb = find_memb(cg, nodeid);
1010 			if (memb && memb->added)
1011 				return 1;
1012 		}
1013 		return 0;
1014 	}
1015 	
1016 	static void receive_start(struct lockspace *ls, struct dlm_header *hd, int len)
1017 	{
1018 		struct change *cg;
1019 		struct member *memb;
1020 		struct ls_info *li;
1021 		struct id_info *ids;
1022 		uint32_t seq = hd->msgdata;
1023 		int added;
1024 	
1025 		log_group(ls, "receive_start %d:%u len %d", hd->nodeid, seq, len);
1026 	
1027 		li = (struct ls_info *)((char *)hd + sizeof(struct dlm_header));
1028 		ids = (struct id_info *)((char *)li + sizeof(struct ls_info));
1029 	
1030 		ls_info_in(li);
1031 		ids_in(li, ids);
1032 	
1033 		cg = find_change(ls, hd, li, ids);
1034 		if (!cg)
1035 			return;
1036 	
1037 		memb = find_memb(cg, hd->nodeid);
1038 		if (!memb) {
1039 			/* this should never happen since match_change checks it */
1040 			log_error("receive_start no member %d", hd->nodeid);
1041 			return;
1042 		}
1043 	
1044 		memb->start_flags = hd->flags;
1045 	
1046 		added = is_added(ls, hd->nodeid);
1047 	
1048 		if (added && li->started_count && ls->started_count) {
1049 			log_error("receive_start %d:%u add node with started_count %u",
1050 				  hd->nodeid, seq, li->started_count);
1051 	
1052 			/* see comment in fence/fenced/cpg.c */
1053 			memb->disallowed = 1;
1054 			return;
1055 		}
1056 	
1057 		if (memb->start_flags & DLM_MFLG_NACK) {
1058 			log_group(ls, "receive_start %d:%u is NACK", hd->nodeid, seq);
1059 			return;
1060 		}
1061 	
1062 		node_history_start(ls, hd->nodeid);
1063 		memb->start = 1;
1064 	}
1065 	
1066 	static void receive_plocks_done(struct lockspace *ls, struct dlm_header *hd,
1067 					int len)
1068 	{
1069 		struct ls_info *li;
1070 		struct id_info *ids;
1071 	
1072 		log_dlock(ls, "receive_plocks_done %d:%u flags %x plocks_data %u need %d save %d",
1073 			  hd->nodeid, hd->msgdata, hd->flags, hd->msgdata2,
1074 			  ls->need_plocks, ls->save_plocks);
1075 	
1076 		if (!ls->need_plocks)
1077 			return;
1078 	
1079 		if (ls->need_plocks && !ls->save_plocks)
1080 			return;
1081 	
1082 		if (!ls->started_change) {
1083 			/* don't think this should happen */
1084 			log_elock(ls, "receive_plocks_done %d:%u no started_change",
1085 				  hd->nodeid, hd->msgdata);
1086 			return;
1087 		}
1088 	
1089 		li = (struct ls_info *)((char *)hd + sizeof(struct dlm_header));
1090 		ids = (struct id_info *)((char *)li + sizeof(struct ls_info));
1091 		ls_info_in(li);
1092 		ids_in(li, ids);
1093 	
1094 		if (!match_change(ls, ls->started_change, hd, li, ids)) {
1095 			/* don't think this should happen */
1096 			log_elock(ls, "receive_plocks_done %d:%u no match_change",
1097 				  hd->nodeid, hd->msgdata);
1098 	
1099 			/* remove/free anything we've saved from
1100 			   receive_plocks_data messages that weren't for us */
1101 			clear_plocks_data(ls);
1102 			return;
1103 		}
1104 	
1105 		if (ls->recv_plocks_data_count != hd->msgdata2) {
1106 			log_elock(ls, "receive_plocks_done plocks_data %u recv %u",
1107 				  hd->msgdata2, ls->recv_plocks_data_count);
1108 		}
1109 	
1110 		process_saved_plocks(ls);
1111 		ls->need_plocks = 0;
1112 		ls->save_plocks = 0;
1113 	
1114 		log_dlock(ls, "receive_plocks_done %d:%u plocks_data_count %u",
1115 			  hd->nodeid, hd->msgdata, ls->recv_plocks_data_count);
1116 	}
1117 	
1118 	static void send_info(struct lockspace *ls, struct change *cg, int type,
1119 			      uint32_t flags, uint32_t msgdata2)
1120 	{
1121 		struct dlm_header *hd;
1122 		struct ls_info *li;
1123 		struct id_info *id;
1124 		struct member *memb;
1125 		char *buf;
1126 		int len, id_count;
1127 	
1128 		id_count = cg->member_count;
1129 	
1130 		len = sizeof(struct dlm_header) + sizeof(struct ls_info) +
1131 		      id_count * sizeof(struct id_info);
1132 	
1133 		buf = malloc(len);
1134 		if (!buf) {
1135 			log_error("send_info len %d no mem", len);
1136 			return;
1137 		}
1138 		memset(buf, 0, len);
1139 	
1140 		hd = (struct dlm_header *)buf;
1141 		li = (struct ls_info *)(buf + sizeof(*hd));
1142 		id = (struct id_info *)(buf + sizeof(*hd) + sizeof(*li));
1143 	
1144 		/* fill in header (dlm_send_message handles part of header) */
1145 	
1146 		hd->type = type;
1147 		hd->msgdata = cg->seq;
1148 		hd->flags = flags;
1149 		hd->msgdata2 = msgdata2;
1150 	
1151 		if (ls->joining)
1152 			hd->flags |= DLM_MFLG_JOINING;
1153 		if (!ls->need_plocks)
1154 			hd->flags |= DLM_MFLG_HAVEPLOCK;
1155 	
1156 		/* fill in ls_info */
1157 	
1158 		li->ls_info_size  = cpu_to_le32(sizeof(struct ls_info));
1159 		li->id_info_size  = cpu_to_le32(sizeof(struct id_info));
1160 		li->id_info_count = cpu_to_le32(id_count);
1161 		li->started_count = cpu_to_le32(ls->started_count);
1162 		li->member_count  = cpu_to_le32(cg->member_count);
1163 		li->joined_count  = cpu_to_le32(cg->joined_count);
1164 		li->remove_count  = cpu_to_le32(cg->remove_count);
1165 		li->failed_count  = cpu_to_le32(cg->failed_count);
1166 	
1167 		/* fill in id_info entries */
1168 	
1169 		list_for_each_entry(memb, &cg->members, list) {
1170 			id->nodeid = cpu_to_le32(memb->nodeid);
1171 			id++;
1172 		}
1173 	
1174 		dlm_send_message(ls, buf, len);
1175 	
1176 		free(buf);
1177 	}
1178 	
1179 	/* fenced used the DUPLICATE_CG flag instead of sending nacks like we
1180 	   do here.  I think the nacks didn't work for fenced for some reason,
1181 	   but I don't remember why (possibly because the node blocked doing
1182 	   the fencing hadn't created the cg to nack yet). */
1183 	
1184 	static void send_start(struct lockspace *ls, struct change *cg)
1185 	{
1186 		log_group(ls, "send_start %d:%u counts %u %d %d %d %d",
1187 			  our_nodeid, cg->seq, ls->started_count,
1188 			  cg->member_count, cg->joined_count, cg->remove_count,
1189 			  cg->failed_count);
1190 	
1191 		send_info(ls, cg, DLM_MSG_START, 0, 0);
1192 	}
1193 	
1194 	static void send_plocks_done(struct lockspace *ls, struct change *cg, uint32_t plocks_data)
1195 	{
1196 		log_dlock(ls, "send_plocks_done %d:%u counts %u %d %d %d %d plocks_data %u",
1197 			  our_nodeid, cg->seq, ls->started_count,
1198 			  cg->member_count, cg->joined_count, cg->remove_count,
1199 			  cg->failed_count, plocks_data);
1200 	
1201 		send_info(ls, cg, DLM_MSG_PLOCKS_DONE, 0, plocks_data);
1202 	}
1203 	
1204 	static int same_members(struct change *cg1, struct change *cg2)
1205 	{
1206 		struct member *memb;
1207 	
1208 		list_for_each_entry(memb, &cg1->members, list) {
1209 			if (!find_memb(cg2, memb->nodeid))
1210 				return 0;
1211 		}
1212 		return 1;
1213 	}
1214 	
1215 	static void send_nacks(struct lockspace *ls, struct change *startcg)
1216 	{
1217 		struct change *cg;
1218 	
1219 		list_for_each_entry(cg, &ls->changes, list) {
1220 			if (cg->seq < startcg->seq &&
1221 			    cg->member_count == startcg->member_count &&
1222 			    cg->joined_count == startcg->joined_count &&
1223 			    cg->remove_count == startcg->remove_count &&
1224 			    cg->failed_count == startcg->failed_count &&
1225 			    same_members(cg, startcg)) {
1226 				log_group(ls, "send nack old cg %u new cg %u",
1227 					   cg->seq, startcg->seq);
1228 				send_info(ls, cg, DLM_MSG_START, DLM_MFLG_NACK, 0);
1229 			}
1230 		}
1231 	}
1232 	
1233 	static int nodes_added(struct lockspace *ls)
1234 	{
1235 		struct change *cg;
1236 	
1237 		list_for_each_entry(cg, &ls->changes, list) {
1238 			if (cg->joined_count)
1239 				return 1;
1240 		}
1241 		return 0;
1242 	}
1243 	
1244 	static void prepare_plocks(struct lockspace *ls)
1245 	{
1246 		struct change *cg = list_first_entry(&ls->changes, struct change, list);
1247 		uint32_t plocks_data = 0;
1248 		struct member *memb;
1249 	
1250 		if (!opt(enable_plock_ind) || ls->disable_plock)
1251 			return;
1252 	
1253 		log_dlock(ls, "prepare_plocks");
1254 	
1255 		/* if we're the only node in the lockspace, then we are the data_node
1256 		   and we don't need plocks */
1257 	
1258 		if (cg->member_count == 1) {
1259 			list_for_each_entry(memb, &cg->members, list) {
1260 				if (memb->nodeid != our_nodeid) {
1261 					log_elock(ls, "prepare_plocks other member %d",
1262 						  memb->nodeid);
1263 				}
1264 			}
1265 			ls->plock_data_node = our_nodeid;
1266 			ls->need_plocks = 0;
1267 			return;
1268 		}
1269 	
1270 		/* the low node that indicated it had plock state in its last
1271 		   start message is the data_node */
1272 	
1273 		set_plock_data_node(ls);
1274 	
1275 		/* there is no node with plock state, so there's no syncing to do */
1276 	
1277 		if (!ls->plock_data_node) {
1278 			ls->need_plocks = 0;
1279 			ls->save_plocks = 0;
1280 			return;
1281 		}
1282 	
1283 		/* We save all plock messages received after our own confchg and
1284 		   apply them after we receive the plocks_done message from the
1285 		   data_node. */
1286 	
1287 		if (ls->need_plocks) {
1288 			log_dlock(ls, "save_plocks start");
1289 			ls->save_plocks = 1;
1290 			return;
1291 		}
1292 	
1293 		if (ls->plock_data_node != our_nodeid)
1294 			return;
1295 	
1296 		if (nodes_added(ls))
1297 			send_all_plocks_data(ls, cg->seq, &plocks_data);
1298 	
1299 		send_plocks_done(ls, cg, plocks_data);
1300 	}
1301 	
1302 	static void apply_changes(struct lockspace *ls)
1303 	{
1304 		struct change *cg;
1305 	
1306 		if (list_empty(&ls->changes))
1307 			return;
1308 		cg = list_first_entry(&ls->changes, struct change, list);
1309 	
1310 		switch (cg->state) {
1311 	
1312 		case CGST_WAIT_CONDITIONS:
1313 			if (wait_conditions_done(ls)) {
1314 				send_nacks(ls, cg);
1315 				send_start(ls, cg);
1316 				cg->state = CGST_WAIT_MESSAGES;
1317 			}
1318 			break;
1319 	
1320 		case CGST_WAIT_MESSAGES:
1321 			if (wait_messages_done(ls)) {
1322 				set_protocol_stateful();
1323 				start_kernel(ls);
1324 				prepare_plocks(ls);
1325 				cleanup_changes(ls);
1326 			}
1327 			break;
1328 	
1329 		default:
1330 			log_error("apply_changes invalid state %d", cg->state);
1331 		}
1332 	}
1333 	
1334 	void process_lockspace_changes(void)
1335 	{
1336 		struct lockspace *ls, *safe;
1337 	
1338 		poll_lockspaces = 0;
1339 		poll_fs = 0;
1340 	
1341 		list_for_each_entry_safe(ls, safe, &lockspaces, list) {
1342 			if (!list_empty(&ls->changes))
1343 				apply_changes(ls);
1344 		}
1345 	}
1346 	
1347 	static int add_change(struct lockspace *ls,
1348 			      const struct cpg_address *member_list,
1349 			      size_t member_list_entries,
1350 			      const struct cpg_address *left_list,
1351 			      size_t left_list_entries,
1352 			      const struct cpg_address *joined_list,
1353 			      size_t joined_list_entries,
1354 			      struct change **cg_out)
1355 	{
1356 		struct change *cg;
1357 		struct member *memb;
1358 		int i, error;
1359 		uint64_t now = monotime();
1360 	
1361 		cg = malloc(sizeof(struct change));
(1) Event cond_true: Condition "!cg", taking true branch.
(2) Event var_compare_op: Comparing "cg" to null implies that "cg" might be null.
Also see events: [var_deref_model]
1362 		if (!cg)
(3) Event goto: Jumping to label "fail_nomem".
1363 			goto fail_nomem;
1364 		memset(cg, 0, sizeof(struct change));
1365 		INIT_LIST_HEAD(&cg->members);
1366 		INIT_LIST_HEAD(&cg->removed);
1367 		cg->state = CGST_WAIT_CONDITIONS;
1368 		cg->create_time = now;
1369 		cg->seq = ++ls->change_seq;
1370 		if (!cg->seq)
1371 			cg->seq = ++ls->change_seq;
1372 	
1373 		cg->member_count = member_list_entries;
1374 		cg->joined_count = joined_list_entries;
1375 		cg->remove_count = left_list_entries;
1376 	
1377 		for (i = 0; i < member_list_entries; i++) {
1378 			memb = malloc(sizeof(struct member));
1379 			if (!memb)
1380 				goto fail_nomem;
1381 			memset(memb, 0, sizeof(struct member));
1382 			memb->nodeid = member_list[i].nodeid;
1383 			list_add_tail(&memb->list, &cg->members);
1384 		}
1385 	
1386 		for (i = 0; i < left_list_entries; i++) {
1387 			memb = malloc(sizeof(struct member));
1388 			if (!memb)
1389 				goto fail_nomem;
1390 			memset(memb, 0, sizeof(struct member));
1391 			memb->nodeid = left_list[i].nodeid;
1392 			if (left_list[i].reason == CPG_REASON_NODEDOWN ||
1393 			    left_list[i].reason == CPG_REASON_PROCDOWN) {
1394 				memb->failed = 1;
1395 				cg->failed_count++;
1396 			}
1397 			list_add_tail(&memb->list, &cg->removed);
1398 	
1399 			if (left_list[i].reason == CPG_REASON_NODEDOWN)
1400 				ls->cpg_ringid_wait = 1;
1401 	
1402 			if (memb->failed) {
1403 				node_history_lockspace_fail(ls, memb->nodeid, cg,
1404 							    left_list[i].reason, now);
1405 			} else {
1406 				node_history_lockspace_left(ls, memb->nodeid, cg, now);
1407 			}
1408 	
1409 			log_group(ls, "add_change cg %u remove nodeid %d reason %s",
1410 				  cg->seq, memb->nodeid, reason_str(left_list[i].reason));
1411 	
1412 			if (left_list[i].reason == CPG_REASON_PROCDOWN)
1413 				kick_node_from_cluster(memb->nodeid);
1414 		}
1415 	
1416 		for (i = 0; i < joined_list_entries; i++) {
1417 			memb = find_memb(cg, joined_list[i].nodeid);
1418 			if (!memb) {
1419 				log_error("no member %d", joined_list[i].nodeid);
1420 				error = -ENOENT;
1421 				goto fail;
1422 			}
1423 			memb->added = 1;
1424 	
1425 			if (memb->nodeid == our_nodeid) {
1426 				cg->we_joined = 1;
1427 			} else {
1428 				node_history_lockspace_add(ls, memb->nodeid, cg, now);
1429 			}
1430 	
1431 			log_group(ls, "add_change cg %u joined nodeid %d", cg->seq,
1432 				  memb->nodeid);
1433 		}
1434 	
1435 		if (cg->we_joined) {
1436 			log_group(ls, "add_change cg %u we joined", cg->seq);
1437 			list_for_each_entry(memb, &cg->members, list) {
1438 				node_history_lockspace_add(ls, memb->nodeid, cg, now);
1439 			}
1440 		}
1441 	
1442 		log_group(ls, "add_change cg %u counts member %d joined %d remove %d "
1443 			  "failed %d", cg->seq, cg->member_count, cg->joined_count,
1444 			  cg->remove_count, cg->failed_count);
1445 	
1446 		list_add(&cg->list, &ls->changes);
1447 		*cg_out = cg;
1448 		return 0;
1449 	
(4) Event label: Reached label "fail_nomem".
1450 	 fail_nomem:
1451 		log_error("no memory");
1452 		error = -ENOMEM;
1453 	 fail:
(5) Event var_deref_model: Passing null pointer "cg" to "free_cg", which dereferences it. [details]
Also see events: [var_compare_op]
1454 		free_cg(cg);
1455 		return error;
1456 	}
1457 	
1458 	static int we_left(const struct cpg_address *left_list,
1459 			   size_t left_list_entries)
1460 	{
1461 		int i;
1462 	
1463 		for (i = 0; i < left_list_entries; i++) {
1464 			if (left_list[i].nodeid == our_nodeid)
1465 				return 1;
1466 		}
1467 		return 0;
1468 	}
1469 	
1470 	static void confchg_cb(cpg_handle_t handle,
1471 			       const struct cpg_name *group_name,
1472 			       const struct cpg_address *member_list,
1473 			       size_t member_list_entries,
1474 			       const struct cpg_address *left_list,
1475 			       size_t left_list_entries,
1476 			       const struct cpg_address *joined_list,
1477 			       size_t joined_list_entries)
1478 	{
1479 		struct lockspace *ls;
1480 		struct change *cg;
1481 		struct member *memb;
1482 		int rv;
1483 	
1484 		log_config(group_name, member_list, member_list_entries,
1485 			   left_list, left_list_entries,
1486 			   joined_list, joined_list_entries);
1487 	
1488 		ls = find_ls_handle(handle);
1489 		if (!ls) {
1490 			log_error("confchg_cb no lockspace for cpg %s",
1491 				  group_name->value);
1492 			return;
1493 		}
1494 	
1495 		if (ls->leaving && we_left(left_list, left_list_entries)) {
1496 			/* we called cpg_leave(), and this should be the final
1497 			   cpg callback we receive */
1498 			log_group(ls, "confchg for our leave");
1499 			stop_kernel(ls, 0);
1500 			set_configfs_members(ls, ls->name, 0, NULL, 0, NULL);
1501 			set_sysfs_event_done(ls->name, 0);
1502 			cpg_finalize(ls->cpg_handle);
1503 			client_dead(ls->cpg_client);
1504 			purge_plocks(ls, our_nodeid, 1);
1505 			list_del(&ls->list);
1506 			free_ls(ls);
1507 			return;
1508 		}
1509 	
1510 		rv = add_change(ls, member_list, member_list_entries,
1511 				left_list, left_list_entries,
1512 				joined_list, joined_list_entries, &cg);
1513 		if (rv)
1514 			return;
1515 	
1516 		stop_kernel(ls, cg->seq);
1517 	
1518 		list_for_each_entry(memb, &cg->removed, list)
1519 			purge_plocks(ls, memb->nodeid, 0);
1520 	
1521 		apply_changes(ls);
1522 	
1523 	#if 0
1524 		deadlk_confchg(ls, member_list, member_list_entries,
1525 			       left_list, left_list_entries,
1526 			       joined_list, joined_list_entries);
1527 	#endif
1528 	}
1529 	
1530 	/* after our join confchg, we want to ignore plock messages (see need_plocks
1531 	   checks below) until the point in time where the ckpt_node saves plock
1532 	   state (final start message received); at this time we want to shift from
1533 	   ignoring plock messages to saving plock messages to apply on top of the
1534 	   plock state that we read. */
1535 	
1536 	static void deliver_cb(cpg_handle_t handle,
1537 			       const struct cpg_name *group_name,
1538 			       uint32_t nodeid, uint32_t pid,
1539 			       void *data, size_t len)
1540 	{
1541 		struct lockspace *ls;
1542 		struct dlm_header *hd;
1543 		int ignore_plock;
1544 		int rv;
1545 	
1546 		int enable_plock = opt(enable_plock_ind);
1547 		int plock_ownership = opt(plock_ownership_ind);
1548 	
1549 		ls = find_ls_handle(handle);
1550 		if (!ls) {
1551 			log_error("deliver_cb no ls for cpg %s", group_name->value);
1552 			return;
1553 		}
1554 	
1555 		if (len < sizeof(struct dlm_header)) {
1556 			log_error("deliver_cb short message %zd", len);
1557 			return;
1558 		}
1559 	
1560 		hd = (struct dlm_header *)data;
1561 		dlm_header_in(hd);
1562 	
1563 		rv = dlm_header_validate(hd, nodeid);
1564 		if (rv < 0)
1565 			return;
1566 	
1567 		ignore_plock = 0;
1568 	
1569 		switch (hd->type) {
1570 		case DLM_MSG_START:
1571 			receive_start(ls, hd, len);
1572 			break;
1573 	
1574 		case DLM_MSG_PLOCK:
1575 			if (ls->disable_plock)
1576 				break;
1577 			if (ls->need_plocks && !ls->save_plocks) {
1578 				ignore_plock = 1;
1579 				break;
1580 			}
1581 			if (enable_plock)
1582 				receive_plock(ls, hd, len);
1583 			else
1584 				log_error("msg %d nodeid %d enable_plock %d",
1585 					  hd->type, nodeid, enable_plock);
1586 			break;
1587 	
1588 		case DLM_MSG_PLOCK_OWN:
1589 			if (ls->disable_plock)
1590 				break;
1591 			if (ls->need_plocks && !ls->save_plocks) {
1592 				ignore_plock = 1;
1593 				break;
1594 			}
1595 			if (enable_plock && plock_ownership)
1596 				receive_own(ls, hd, len);
1597 			else
1598 				log_error("msg %d nodeid %d enable_plock %d owner %d",
1599 					  hd->type, nodeid, enable_plock, plock_ownership);
1600 			break;
1601 	
1602 		case DLM_MSG_PLOCK_DROP:
1603 			if (ls->disable_plock)
1604 				break;
1605 			if (ls->need_plocks && !ls->save_plocks) {
1606 				ignore_plock = 1;
1607 				break;
1608 			}
1609 			if (enable_plock && plock_ownership)
1610 				receive_drop(ls, hd, len);
1611 			else
1612 				log_error("msg %d nodeid %d enable_plock %d owner %d",
1613 					  hd->type, nodeid, enable_plock, plock_ownership);
1614 			break;
1615 	
1616 		case DLM_MSG_PLOCK_SYNC_LOCK:
1617 		case DLM_MSG_PLOCK_SYNC_WAITER:
1618 			if (ls->disable_plock)
1619 				break;
1620 			if (ls->need_plocks && !ls->save_plocks) {
1621 				ignore_plock = 1;
1622 				break;
1623 			}
1624 			if (enable_plock && plock_ownership)
1625 				receive_sync(ls, hd, len);
1626 			else
1627 				log_error("msg %d nodeid %d enable_plock %d owner %d",
1628 					  hd->type, nodeid, enable_plock, plock_ownership);
1629 			break;
1630 	
1631 		case DLM_MSG_PLOCKS_DATA:
1632 			if (ls->disable_plock)
1633 				break;
1634 			if (enable_plock)
1635 				receive_plocks_data(ls, hd, len);
1636 			else
1637 				log_error("msg %d nodeid %d enable_plock %d",
1638 					  hd->type, nodeid, enable_plock);
1639 			break;
1640 	
1641 		case DLM_MSG_PLOCKS_DONE:
1642 			if (ls->disable_plock)
1643 				break;
1644 			if (enable_plock)
1645 				receive_plocks_done(ls, hd, len);
1646 			else
1647 				log_error("msg %d nodeid %d enable_plock %d",
1648 					  hd->type, nodeid, enable_plock);
1649 			break;
1650 	
1651 	#if 0
1652 		case DLM_MSG_DEADLK_CYCLE_START:
1653 			if (opt(enable_deadlk))
1654 				receive_cycle_start(ls, hd, len);
1655 			else
1656 				log_error("msg %d nodeid %d enable_deadlk %d",
1657 					  hd->type, nodeid, opt(enable_deadlk));
1658 			break;
1659 	
1660 		case DLM_MSG_DEADLK_CYCLE_END:
1661 			if (opt(enable_deadlk))
1662 				receive_cycle_end(ls, hd, len);
1663 			else
1664 				log_error("msg %d nodeid %d enable_deadlk %d",
1665 					  hd->type, nodeid, opt(enable_deadlk));
1666 			break;
1667 	
1668 		case DLM_MSG_DEADLK_CHECKPOINT_READY:
1669 			if (opt(enable_deadlk))
1670 				receive_checkpoint_ready(ls, hd, len);
1671 			else
1672 				log_error("msg %d nodeid %d enable_deadlk %d",
1673 					  hd->type, nodeid, opt(enable_deadlk));
1674 			break;
1675 	
1676 		case DLM_MSG_DEADLK_CANCEL_LOCK:
1677 			if (opt(enable_deadlk))
1678 				receive_cancel_lock(ls, hd, len);
1679 			else
1680 				log_error("msg %d nodeid %d enable_deadlk %d",
1681 					  hd->type, nodeid, opt(enable_deadlk));
1682 			break;
1683 	#endif
1684 	
1685 		default:
1686 			log_error("unknown msg type %d", hd->type);
1687 		}
1688 	
1689 		if (ignore_plock)
1690 			log_plock(ls, "msg %s nodeid %d need_plock ignore",
1691 				  msg_name(hd->type), nodeid);
1692 	
1693 		apply_changes(ls);
1694 	}
1695 	
1696 	/* save ringid to compare with cman's.
1697 	   also save member_list to double check with cman's member list?
1698 	   they should match */
1699 	
1700 	static void totem_cb(cpg_handle_t handle,
1701 			     struct cpg_ring_id ring_id,
1702 			     uint32_t member_list_entries,
1703 			     const uint32_t *member_list)
1704 	{
1705 		struct lockspace *ls;
1706 		char name[128];
1707 	
1708 		ls = find_ls_handle(handle);
1709 		if (!ls) {
1710 			log_error("totem_cb no lockspace for handle");
1711 			return;
1712 		}
1713 	
1714 		memset(&name, 0, sizeof(name));
1715 		sprintf(name, "dlm:ls:%s", ls->name);
1716 	
1717 		log_ringid(name, &ring_id, member_list, member_list_entries);
1718 	
1719 		ls->cpg_ringid.nodeid = ring_id.nodeid;
1720 		ls->cpg_ringid.seq = ring_id.seq;
1721 		ls->cpg_ringid_wait = 0;
1722 	
1723 		apply_changes(ls);
1724 	}
1725 	
1726 	static cpg_model_v1_data_t cpg_callbacks = {
1727 		.cpg_deliver_fn = deliver_cb,
1728 		.cpg_confchg_fn = confchg_cb,
1729 		.cpg_totem_confchg_fn = totem_cb,
1730 		.flags = CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF,
1731 	};
1732 	
1733 	static void process_cpg_lockspace(int ci)
1734 	{
1735 		struct lockspace *ls;
1736 		cs_error_t error;
1737 	
1738 		ls = find_ls_ci(ci);
1739 		if (!ls) {
1740 			log_error("process_lockspace_cpg no lockspace for ci %d", ci);
1741 			return;
1742 		}
1743 	
1744 		error = cpg_dispatch(ls->cpg_handle, CS_DISPATCH_ALL);
1745 		if (error != CS_OK && error != CS_ERR_BAD_HANDLE) {
1746 			log_error("cpg_dispatch error %d", error);
1747 			return;
1748 		}
1749 	}
1750 	
1751 	/* received an "online" uevent from dlm-kernel */
1752 	
1753 	int dlm_join_lockspace(struct lockspace *ls)
1754 	{
1755 		cs_error_t error;
1756 		cpg_handle_t h;
1757 		struct cpg_name name;
1758 		int i = 0, fd, ci, rv;
1759 	
1760 		error = cpg_model_initialize(&h, CPG_MODEL_V1,
1761 					     (cpg_model_data_t *)&cpg_callbacks, NULL);
1762 		if (error != CS_OK) {
1763 			log_error("cpg_model_initialize error %d", error);
1764 			rv = -1;
1765 			goto fail_free;
1766 		}
1767 	
1768 		cpg_fd_get(h, &fd);
1769 	
1770 		ci = client_add(fd, process_cpg_lockspace, NULL);
1771 	
1772 		list_add(&ls->list, &lockspaces);
1773 	
1774 		ls->cpg_handle = h;
1775 		ls->cpg_client = ci;
1776 		ls->cpg_fd = fd;
1777 		ls->kernel_stopped = 1;
1778 		ls->need_plocks = 1;
1779 		ls->joining = 1;
1780 	
1781 		memset(&name, 0, sizeof(name));
1782 		sprintf(name.value, "dlm:ls:%s", ls->name);
1783 		name.length = strlen(name.value) + 1;
1784 	
1785 		/* TODO: allow global_id to be set in cluster.conf? */
1786 		ls->global_id = cpgname_to_crc(name.value, name.length);
1787 	
1788 		log_group(ls, "cpg_join %s ...", name.value);
1789 	 retry:
1790 		error = cpg_join(h, &name);
1791 		if (error == CS_ERR_TRY_AGAIN) {
1792 			sleep(1);
1793 			if (!(++i % 10))
1794 				log_error("cpg_join error retrying");
1795 			goto retry;
1796 		}
1797 		if (error != CS_OK) {
1798 			log_error("cpg_join error %d", error);
1799 			cpg_finalize(h);
1800 			rv = -1;
1801 			goto fail;
1802 		}
1803 	
1804 		return 0;
1805 	
1806 	 fail:
1807 		list_del(&ls->list);
1808 		client_dead(ci);
1809 		cpg_finalize(h);
1810 	 fail_free:
1811 		set_sysfs_event_done(ls->name, rv);
1812 		free_ls(ls);
1813 		return rv;
1814 	}
1815 	
1816 	/* received an "offline" uevent from dlm-kernel */
1817 	
1818 	int dlm_leave_lockspace(struct lockspace *ls)
1819 	{
1820 		cs_error_t error;
1821 		struct cpg_name name;
1822 		int i = 0;
1823 	
1824 		ls->leaving = 1;
1825 	
1826 		memset(&name, 0, sizeof(name));
1827 		sprintf(name.value, "dlm:ls:%s", ls->name);
1828 		name.length = strlen(name.value) + 1;
1829 	
1830 	 retry:
1831 		error = cpg_leave(ls->cpg_handle, &name);
1832 		if (error == CS_ERR_TRY_AGAIN) {
1833 			sleep(1);
1834 			if (!(++i % 10))
1835 				log_error("cpg_leave error retrying");
1836 			goto retry;
1837 		}
1838 		if (error != CS_OK)
1839 			log_error("cpg_leave error %d", error);
1840 	
1841 		return 0;
1842 	}
1843 	
1844 	int set_fs_notified(struct lockspace *ls, int nodeid)
1845 	{
1846 		struct node *node;
1847 	
1848 		/* this shouldn't happen */
1849 		node = get_node_history(ls, nodeid);
1850 		if (!node) {
1851 			log_error("set_fs_notified no nodeid %d", nodeid);
1852 			return -ESRCH;
1853 		}
1854 	
1855 		if (!find_memb(ls->started_change, nodeid)) {
1856 			log_group(ls, "set_fs_notified %d not in ls", nodeid);
1857 			return 0;
1858 		}
1859 	
1860 		/* this can happen, we haven't seen a nodedown for this node yet,
1861 		   but we should soon */
1862 		if (!node->check_fs) {
1863 			log_group(ls, "set_fs_notified %d zero check_fs", nodeid);
1864 			return -EAGAIN;
1865 		}
1866 	
1867 		log_group(ls, "set_fs_notified nodeid %d", nodeid);
1868 		node->fs_notified = 1;
1869 		return 0;
1870 	}
1871 	
1872 	int set_lockspace_info(struct lockspace *ls, struct dlmc_lockspace *lockspace)
1873 	{
1874 		struct change *cg, *last = NULL;
1875 	
1876 		strncpy(lockspace->name, ls->name, DLM_LOCKSPACE_LEN + 1);
1877 		lockspace->name[DLM_LOCKSPACE_LEN] = '\0';
1878 		lockspace->global_id = ls->global_id;
1879 	
1880 		if (ls->joining)
1881 			lockspace->flags |= DLMC_LF_JOINING;
1882 		if (ls->leaving)
1883 			lockspace->flags |= DLMC_LF_LEAVING;
1884 		if (ls->kernel_stopped)
1885 			lockspace->flags |= DLMC_LF_KERNEL_STOPPED;
1886 		if (ls->fs_registered)
1887 			lockspace->flags |= DLMC_LF_FS_REGISTERED;
1888 		if (ls->need_plocks)
1889 			lockspace->flags |= DLMC_LF_NEED_PLOCKS;
1890 		if (ls->save_plocks)
1891 			lockspace->flags |= DLMC_LF_SAVE_PLOCKS;
1892 	
1893 		if (!ls->started_change)
1894 			goto next;
1895 	
1896 		cg = ls->started_change;
1897 	
1898 		lockspace->cg_prev.member_count = cg->member_count;
1899 		lockspace->cg_prev.joined_count = cg->joined_count;
1900 		lockspace->cg_prev.remove_count = cg->remove_count;
1901 		lockspace->cg_prev.failed_count = cg->failed_count;
1902 		lockspace->cg_prev.combined_seq = cg->combined_seq;
1903 		lockspace->cg_prev.seq = cg->seq;
1904 	
1905 	 next:
1906 		if (list_empty(&ls->changes))
1907 			goto out;
1908 	
1909 		list_for_each_entry(cg, &ls->changes, list)
1910 			last = cg;
1911 	
1912 		cg = list_first_entry(&ls->changes, struct change, list);
1913 	
1914 		lockspace->cg_next.member_count = cg->member_count;
1915 		lockspace->cg_next.joined_count = cg->joined_count;
1916 		lockspace->cg_next.remove_count = cg->remove_count;
1917 		lockspace->cg_next.failed_count = cg->failed_count;
1918 		lockspace->cg_next.combined_seq = last->seq;
1919 		lockspace->cg_next.seq = cg->seq;
1920 		lockspace->cg_next.wait_condition = ls->wait_debug;
1921 		if (cg->state == CGST_WAIT_MESSAGES)
1922 			lockspace->cg_next.wait_messages = 1;
1923 	 out:
1924 		return 0;
1925 	}
1926 	
1927 	static int _set_node_info(struct lockspace *ls, struct change *cg, int nodeid,
1928 				  struct dlmc_node *node)
1929 	{
1930 		struct member *m = NULL;
1931 		struct node *n;
1932 	
1933 		node->nodeid = nodeid;
1934 	
1935 		if (cg)
1936 			m = find_memb(cg, nodeid);
1937 		if (!m)
1938 			goto history;
1939 	
1940 		node->flags |= DLMC_NF_MEMBER;
1941 	
1942 		if (m->start)
1943 			node->flags |= DLMC_NF_START;
1944 		if (m->disallowed)
1945 			node->flags |= DLMC_NF_DISALLOWED;
1946 	
1947 	 history:
1948 		n = get_node_history(ls, nodeid);
1949 		if (!n)
1950 			goto out;
1951 	
1952 		if (n->need_fencing)
1953 			node->flags |= DLMC_NF_NEED_FENCING;
1954 		if (n->check_fs)
1955 			node->flags |= DLMC_NF_CHECK_FS;
1956 	
1957 		node->added_seq = n->lockspace_add_seq;
1958 		node->removed_seq = n->lockspace_rem_seq;
1959 	
1960 		node->fail_reason = n->lockspace_fail_reason;
1961 		node->fail_walltime = n->fail_walltime;
1962 		node->fail_monotime = n->fail_monotime;
1963 	 out:
1964 		return 0;
1965 	}
1966 	
1967 	int set_node_info(struct lockspace *ls, int nodeid, struct dlmc_node *node)
1968 	{
1969 		struct change *cg;
1970 	
1971 		if (!list_empty(&ls->changes)) {
1972 			cg = list_first_entry(&ls->changes, struct change, list);
1973 			return _set_node_info(ls, cg, nodeid, node);
1974 		}
1975 	
1976 		return _set_node_info(ls, ls->started_change, nodeid, node);
1977 	}
1978 	
1979 	int set_lockspaces(int *count, struct dlmc_lockspace **lss_out)
1980 	{
1981 		struct lockspace *ls;
1982 		struct dlmc_lockspace *lss, *lsp;
1983 		int ls_count = 0;
1984 	
1985 		list_for_each_entry(ls, &lockspaces, list)
1986 			ls_count++;
1987 	
1988 		lss = malloc(ls_count * sizeof(struct dlmc_lockspace));
1989 		if (!lss)
1990 			return -ENOMEM;
1991 		memset(lss, 0, ls_count * sizeof(struct dlmc_lockspace));
1992 	
1993 		lsp = lss;
1994 		list_for_each_entry(ls, &lockspaces, list) {
1995 			set_lockspace_info(ls, lsp++);
1996 		}
1997 	
1998 		*count = ls_count;
1999 		*lss_out = lss;
2000 		return 0;
2001 	}
2002 	
2003 	int set_lockspace_nodes(struct lockspace *ls, int option, int *node_count,
2004 	                        struct dlmc_node **nodes_out)
2005 	{
2006 		struct change *cg;
2007 		struct node *n;
2008 		struct dlmc_node *nodes = NULL, *nodep;
2009 		struct member *memb;
2010 		int count = 0;
2011 	
2012 		if (option == DLMC_NODES_ALL) {
2013 			if (!list_empty(&ls->changes))
2014 				cg = list_first_entry(&ls->changes, struct change,list);
2015 			else
2016 				cg = ls->started_change;
2017 	
2018 			list_for_each_entry(n, &ls->node_history, list)
2019 				count++;
2020 	
2021 		} else if (option == DLMC_NODES_MEMBERS) {
2022 			if (!ls->started_change)
2023 				goto out;
2024 			cg = ls->started_change;
2025 			count = cg->member_count;
2026 	
2027 		} else if (option == DLMC_NODES_NEXT) {
2028 			if (list_empty(&ls->changes))
2029 				goto out;
2030 			cg = list_first_entry(&ls->changes, struct change, list);
2031 			count = cg->member_count;
2032 		} else
2033 			goto out;
2034 	
2035 		nodes = malloc(count * sizeof(struct dlmc_node));
2036 		if (!nodes)
2037 			return -ENOMEM;
2038 		memset(nodes, 0, count * sizeof(struct dlmc_node));
2039 		nodep = nodes;
2040 	
2041 		if (option == DLMC_NODES_ALL) {
2042 			list_for_each_entry(n, &ls->node_history, list)
2043 				_set_node_info(ls, cg, n->nodeid, nodep++);
2044 		} else {
2045 			list_for_each_entry(memb, &cg->members, list)
2046 				_set_node_info(ls, cg, memb->nodeid, nodep++);
2047 		}
2048 	 out:
2049 		*node_count = count;
2050 		*nodes_out = nodes;
2051 		return 0;
2052 	}
2053 	
2054