dlm: add node slots and generation

Slot numbers are assigned to nodes when they join the lockspace.
The slot number chosen is the minimum unused value starting at 1.
Once a node is assigned a slot, that slot number will not change
while the node remains a lockspace member.  If the node leaves
and rejoins it can be assigned a new slot number.

A new generation number is also added to a lockspace.  It is
set and incremented during each recovery along with the slot
collection/assignment.

The slot numbers will be passed to gfs2 which will use them as
journal id's.

Signed-off-by: David Teigland <teigland@redhat.com>
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index f10a50f..ac5c616 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -23,6 +23,7 @@
 #include "memory.h"
 #include "lock.h"
 #include "util.h"
+#include "member.h"
 
 
 static int rcom_response(struct dlm_ls *ls)
@@ -72,20 +73,30 @@
 	dlm_lowcomms_commit_buffer(mh);
 }
 
+static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
+			    uint32_t flags)
+{
+	rs->rs_flags = cpu_to_le32(flags);
+}
+
 /* When replying to a status request, a node also sends back its
    configuration values.  The requesting node then checks that the remote
    node is configured the same way as itself. */
 
-static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
+static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
+			    uint32_t num_slots)
 {
 	rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
 	rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
+
+	rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
+	rf->rf_num_slots = cpu_to_le16(num_slots);
+	rf->rf_generation =  cpu_to_le32(ls->ls_generation);
 }
 
-static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
+static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 {
 	struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
-	size_t conf_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 
 	if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
 		log_error(ls, "version mismatch: %x nodeid %d: %x",
@@ -94,12 +105,6 @@
 		return -EPROTO;
 	}
 
-	if (rc->rc_header.h_length < conf_size) {
-		log_error(ls, "config too short: %d nodeid %d",
-			  rc->rc_header.h_length, nodeid);
-		return -EPROTO;
-	}
-
 	if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
 	    le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
 		log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
@@ -127,7 +132,18 @@
 	spin_unlock(&ls->ls_rcom_spin);
 }
 
-int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
+/*
+ * low nodeid gathers one slot value at a time from each node.
+ * it sets need_slots=0, and saves rf_our_slot returned from each
+ * rcom_config.
+ *
+ * other nodes gather all slot values at once from the low nodeid.
+ * they set need_slots=1, and ignore the rf_our_slot returned from each
+ * rcom_config.  they use the rf_num_slots returned from the low
+ * node's rcom_config.
+ */
+
+int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
 {
 	struct dlm_rcom *rc;
 	struct dlm_mhandle *mh;
@@ -141,10 +157,13 @@
 		goto out;
 	}
 
-	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh);
+	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
+			    sizeof(struct rcom_status), &rc, &mh);
 	if (error)
 		goto out;
 
+	set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
+
 	allow_sync_reply(ls, &rc->rc_id);
 	memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 
@@ -161,8 +180,11 @@
 		/* we pretend the remote lockspace exists with 0 status */
 		log_debug(ls, "remote node %d not ready", nodeid);
 		rc->rc_result = 0;
-	} else
-		error = check_config(ls, rc, nodeid);
+		error = 0;
+	} else {
+		error = check_rcom_config(ls, rc, nodeid);
+	}
+
 	/* the caller looks at rc_result for the remote recovery status */
  out:
 	return error;
@@ -172,17 +194,60 @@
 {
 	struct dlm_rcom *rc;
 	struct dlm_mhandle *mh;
-	int error, nodeid = rc_in->rc_header.h_nodeid;
+	struct rcom_status *rs;
+	uint32_t status;
+	int nodeid = rc_in->rc_header.h_nodeid;
+	int len = sizeof(struct rcom_config);
+	int num_slots = 0;
+	int error;
 
+	if (!dlm_slots_version(&rc_in->rc_header)) {
+		status = dlm_recover_status(ls);
+		goto do_create;
+	}
+
+	rs = (struct rcom_status *)rc_in->rc_buf;
+
+	if (!(rs->rs_flags & DLM_RSF_NEED_SLOTS)) {
+		status = dlm_recover_status(ls);
+		goto do_create;
+	}
+
+	spin_lock(&ls->ls_recover_lock);
+	status = ls->ls_recover_status;
+	num_slots = ls->ls_num_slots;
+	spin_unlock(&ls->ls_recover_lock);
+	len += num_slots * sizeof(struct rcom_slot);
+
+ do_create:
 	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
-			    sizeof(struct rcom_config), &rc, &mh);
+			    len, &rc, &mh);
 	if (error)
 		return;
+
 	rc->rc_id = rc_in->rc_id;
 	rc->rc_seq_reply = rc_in->rc_seq;
-	rc->rc_result = dlm_recover_status(ls);
-	make_config(ls, (struct rcom_config *) rc->rc_buf);
+	rc->rc_result = status;
 
+	set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
+
+	if (!num_slots)
+		goto do_send;
+
+	spin_lock(&ls->ls_recover_lock);
+	if (ls->ls_num_slots != num_slots) {
+		spin_unlock(&ls->ls_recover_lock);
+		log_debug(ls, "receive_rcom_status num_slots %d to %d",
+			  num_slots, ls->ls_num_slots);
+		rc->rc_result = 0;
+		set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
+		goto do_send;
+	}
+
+	dlm_slots_copy_out(ls, rc);
+	spin_unlock(&ls->ls_recover_lock);
+
+ do_send:
 	send_rcom(ls, mh, rc);
 }