Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/teigland/linux-dlm:
  dlm: add recovery callbacks
  dlm: add node slots and generation
  dlm: move recovery barrier calls
  dlm: convert rsb list to rb_tree
diff --git a/fs/dlm/config.c b/fs/dlm/config.c
index 6cf72fc..e7e327d 100644
--- a/fs/dlm/config.c
+++ b/fs/dlm/config.c
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -17,6 +17,7 @@
 #include <linux/slab.h>
 #include <linux/in.h>
 #include <linux/in6.h>
+#include <linux/dlmconstants.h>
 #include <net/ipv6.h>
 #include <net/sock.h>
 
@@ -36,6 +37,7 @@
 static struct config_group *space_list;
 static struct config_group *comm_list;
 static struct dlm_comm *local_comm;
+static uint32_t dlm_comm_count;
 
 struct dlm_clusters;
 struct dlm_cluster;
@@ -103,6 +105,8 @@
 	unsigned int cl_timewarn_cs;
 	unsigned int cl_waitwarn_us;
 	unsigned int cl_new_rsb_count;
+	unsigned int cl_recover_callbacks;
+	char cl_cluster_name[DLM_LOCKSPACE_LEN];
 };
 
 enum {
@@ -118,6 +122,8 @@
 	CLUSTER_ATTR_TIMEWARN_CS,
 	CLUSTER_ATTR_WAITWARN_US,
 	CLUSTER_ATTR_NEW_RSB_COUNT,
+	CLUSTER_ATTR_RECOVER_CALLBACKS,
+	CLUSTER_ATTR_CLUSTER_NAME,
 };
 
 struct cluster_attribute {
@@ -126,6 +132,27 @@
 	ssize_t (*store)(struct dlm_cluster *, const char *, size_t);
 };
 
+static ssize_t cluster_cluster_name_read(struct dlm_cluster *cl, char *buf)
+{
+	return sprintf(buf, "%s\n", cl->cl_cluster_name);
+}
+
+static ssize_t cluster_cluster_name_write(struct dlm_cluster *cl,
+					  const char *buf, size_t len)
+{
+	strncpy(dlm_config.ci_cluster_name, buf, DLM_LOCKSPACE_LEN);
+	strncpy(cl->cl_cluster_name, buf, DLM_LOCKSPACE_LEN);
+	return len;
+}
+
+static struct cluster_attribute cluster_attr_cluster_name = {
+	.attr   = { .ca_owner = THIS_MODULE,
+                    .ca_name = "cluster_name",
+                    .ca_mode = S_IRUGO | S_IWUSR },
+	.show   = cluster_cluster_name_read,
+	.store  = cluster_cluster_name_write,
+};
+
 static ssize_t cluster_set(struct dlm_cluster *cl, unsigned int *cl_field,
 			   int *info_field, int check_zero,
 			   const char *buf, size_t len)
@@ -171,6 +198,7 @@
 CLUSTER_ATTR(timewarn_cs, 1);
 CLUSTER_ATTR(waitwarn_us, 0);
 CLUSTER_ATTR(new_rsb_count, 0);
+CLUSTER_ATTR(recover_callbacks, 0);
 
 static struct configfs_attribute *cluster_attrs[] = {
 	[CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
@@ -185,6 +213,8 @@
 	[CLUSTER_ATTR_TIMEWARN_CS] = &cluster_attr_timewarn_cs.attr,
 	[CLUSTER_ATTR_WAITWARN_US] = &cluster_attr_waitwarn_us.attr,
 	[CLUSTER_ATTR_NEW_RSB_COUNT] = &cluster_attr_new_rsb_count.attr,
+	[CLUSTER_ATTR_RECOVER_CALLBACKS] = &cluster_attr_recover_callbacks.attr,
+	[CLUSTER_ATTR_CLUSTER_NAME] = &cluster_attr_cluster_name.attr,
 	NULL,
 };
 
@@ -293,6 +323,7 @@
 
 struct dlm_comm {
 	struct config_item item;
+	int seq;
 	int nodeid;
 	int local;
 	int addr_count;
@@ -309,6 +340,7 @@
 	int nodeid;
 	int weight;
 	int new;
+	int comm_seq; /* copy of cm->seq when nd->nodeid is set */
 };
 
 static struct configfs_group_operations clusters_ops = {
@@ -455,6 +487,9 @@
 	cl->cl_timewarn_cs = dlm_config.ci_timewarn_cs;
 	cl->cl_waitwarn_us = dlm_config.ci_waitwarn_us;
 	cl->cl_new_rsb_count = dlm_config.ci_new_rsb_count;
+	cl->cl_recover_callbacks = dlm_config.ci_recover_callbacks;
+	memcpy(cl->cl_cluster_name, dlm_config.ci_cluster_name,
+	       DLM_LOCKSPACE_LEN);
 
 	space_list = &sps->ss_group;
 	comm_list = &cms->cs_group;
@@ -558,6 +593,11 @@
 		return ERR_PTR(-ENOMEM);
 
 	config_item_init_type_name(&cm->item, name, &comm_type);
+
+	cm->seq = dlm_comm_count++;
+	if (!cm->seq)
+		cm->seq = dlm_comm_count++;
+
 	cm->nodeid = -1;
 	cm->local = 0;
 	cm->addr_count = 0;
@@ -801,7 +841,10 @@
 static ssize_t node_nodeid_write(struct dlm_node *nd, const char *buf,
 				 size_t len)
 {
+	uint32_t seq = 0;
 	nd->nodeid = simple_strtol(buf, NULL, 0);
+	dlm_comm_seq(nd->nodeid, &seq);
+	nd->comm_seq = seq;
 	return len;
 }
 
@@ -908,13 +951,13 @@
 }
 
 /* caller must free mem */
-int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out,
-		    int **new_out, int *new_count_out)
+int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
+		     int *count_out)
 {
 	struct dlm_space *sp;
 	struct dlm_node *nd;
-	int i = 0, rv = 0, ids_count = 0, new_count = 0;
-	int *ids, *new;
+	struct dlm_config_node *nodes, *node;
+	int rv, count;
 
 	sp = get_space(lsname);
 	if (!sp)
@@ -927,73 +970,42 @@
 		goto out;
 	}
 
-	ids_count = sp->members_count;
+	count = sp->members_count;
 
-	ids = kcalloc(ids_count, sizeof(int), GFP_NOFS);
-	if (!ids) {
+	nodes = kcalloc(count, sizeof(struct dlm_config_node), GFP_NOFS);
+	if (!nodes) {
 		rv = -ENOMEM;
 		goto out;
 	}
 
+	node = nodes;
 	list_for_each_entry(nd, &sp->members, list) {
-		ids[i++] = nd->nodeid;
-		if (nd->new)
-			new_count++;
+		node->nodeid = nd->nodeid;
+		node->weight = nd->weight;
+		node->new = nd->new;
+		node->comm_seq = nd->comm_seq;
+		node++;
+
+		nd->new = 0;
 	}
 
-	if (ids_count != i)
-		printk(KERN_ERR "dlm: bad nodeid count %d %d\n", ids_count, i);
-
-	if (!new_count)
-		goto out_ids;
-
-	new = kcalloc(new_count, sizeof(int), GFP_NOFS);
-	if (!new) {
-		kfree(ids);
-		rv = -ENOMEM;
-		goto out;
-	}
-
-	i = 0;
-	list_for_each_entry(nd, &sp->members, list) {
-		if (nd->new) {
-			new[i++] = nd->nodeid;
-			nd->new = 0;
-		}
-	}
-	*new_count_out = new_count;
-	*new_out = new;
-
- out_ids:
-	*ids_count_out = ids_count;
-	*ids_out = ids;
+	*count_out = count;
+	*nodes_out = nodes;
+	rv = 0;
  out:
 	mutex_unlock(&sp->members_lock);
 	put_space(sp);
 	return rv;
 }
 
-int dlm_node_weight(char *lsname, int nodeid)
+int dlm_comm_seq(int nodeid, uint32_t *seq)
 {
-	struct dlm_space *sp;
-	struct dlm_node *nd;
-	int w = -EEXIST;
-
-	sp = get_space(lsname);
-	if (!sp)
-		goto out;
-
-	mutex_lock(&sp->members_lock);
-	list_for_each_entry(nd, &sp->members, list) {
-		if (nd->nodeid != nodeid)
-			continue;
-		w = nd->weight;
-		break;
-	}
-	mutex_unlock(&sp->members_lock);
-	put_space(sp);
- out:
-	return w;
+	struct dlm_comm *cm = get_comm(nodeid, NULL);
+	if (!cm)
+		return -EEXIST;
+	*seq = cm->seq;
+	put_comm(cm);
+	return 0;
 }
 
 int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr)
@@ -1047,6 +1059,8 @@
 #define DEFAULT_TIMEWARN_CS      500 /* 5 sec = 500 centiseconds */
 #define DEFAULT_WAITWARN_US	   0
 #define DEFAULT_NEW_RSB_COUNT    128
+#define DEFAULT_RECOVER_CALLBACKS  0
+#define DEFAULT_CLUSTER_NAME      ""
 
 struct dlm_config_info dlm_config = {
 	.ci_tcp_port = DEFAULT_TCP_PORT,
@@ -1060,6 +1074,8 @@
 	.ci_protocol = DEFAULT_PROTOCOL,
 	.ci_timewarn_cs = DEFAULT_TIMEWARN_CS,
 	.ci_waitwarn_us = DEFAULT_WAITWARN_US,
-	.ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT
+	.ci_new_rsb_count = DEFAULT_NEW_RSB_COUNT,
+	.ci_recover_callbacks = DEFAULT_RECOVER_CALLBACKS,
+	.ci_cluster_name = DEFAULT_CLUSTER_NAME
 };
 
diff --git a/fs/dlm/config.h b/fs/dlm/config.h
index 3099d0d..9f5e366 100644
--- a/fs/dlm/config.h
+++ b/fs/dlm/config.h
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -14,6 +14,13 @@
 #ifndef __CONFIG_DOT_H__
 #define __CONFIG_DOT_H__
 
+struct dlm_config_node {
+	int nodeid;
+	int weight;
+	int new;
+	uint32_t comm_seq;
+};
+
 #define DLM_MAX_ADDR_COUNT 3
 
 struct dlm_config_info {
@@ -29,15 +36,17 @@
 	int ci_timewarn_cs;
 	int ci_waitwarn_us;
 	int ci_new_rsb_count;
+	int ci_recover_callbacks;
+	char ci_cluster_name[DLM_LOCKSPACE_LEN];
 };
 
 extern struct dlm_config_info dlm_config;
 
 int dlm_config_init(void);
 void dlm_config_exit(void);
-int dlm_node_weight(char *lsname, int nodeid);
-int dlm_nodeid_list(char *lsname, int **ids_out, int *ids_count_out,
-		    int **new_out, int *new_count_out);
+int dlm_config_nodes(char *lsname, struct dlm_config_node **nodes_out,
+		     int *count_out);
+int dlm_comm_seq(int nodeid, uint32_t *seq);
 int dlm_nodeid_to_addr(int nodeid, struct sockaddr_storage *addr);
 int dlm_addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid);
 int dlm_our_nodeid(void);
diff --git a/fs/dlm/debug_fs.c b/fs/dlm/debug_fs.c
index 5977923..3dca2b3 100644
--- a/fs/dlm/debug_fs.c
+++ b/fs/dlm/debug_fs.c
@@ -393,6 +393,7 @@
 
 static void *table_seq_start(struct seq_file *seq, loff_t *pos)
 {
+	struct rb_node *node;
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri;
 	struct dlm_rsb *r;
@@ -418,9 +419,10 @@
 		ri->format = 3;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-		list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list,
-				    res_hashchain) {
+	if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+		for (node = rb_first(&ls->ls_rsbtbl[bucket].keep); node;
+		     node = rb_next(node)) {
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			if (!entry--) {
 				dlm_hold_rsb(r);
 				ri->rsb = r;
@@ -449,9 +451,9 @@
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			node = rb_first(&ls->ls_rsbtbl[bucket].keep);
+			r = rb_entry(node, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
@@ -467,7 +469,7 @@
 {
 	struct dlm_ls *ls = seq->private;
 	struct rsbtbl_iter *ri = iter_ptr;
-	struct list_head *next;
+	struct rb_node *next;
 	struct dlm_rsb *r, *rp;
 	loff_t n = *pos;
 	unsigned bucket;
@@ -480,10 +482,10 @@
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
 	rp = ri->rsb;
-	next = rp->res_hashchain.next;
+	next = rb_next(&rp->res_hashnode);
 
-	if (next != &ls->ls_rsbtbl[bucket].list) {
-		r = list_entry(next, struct dlm_rsb, res_hashchain);
+	if (next) {
+		r = rb_entry(next, struct dlm_rsb, res_hashnode);
 		dlm_hold_rsb(r);
 		ri->rsb = r;
 		spin_unlock(&ls->ls_rsbtbl[bucket].lock);
@@ -511,9 +513,9 @@
 		}
 
 		spin_lock(&ls->ls_rsbtbl[bucket].lock);
-		if (!list_empty(&ls->ls_rsbtbl[bucket].list)) {
-			r = list_first_entry(&ls->ls_rsbtbl[bucket].list,
-					     struct dlm_rsb, res_hashchain);
+		if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[bucket].keep)) {
+			next = rb_first(&ls->ls_rsbtbl[bucket].keep);
+			r = rb_entry(next, struct dlm_rsb, res_hashnode);
 			dlm_hold_rsb(r);
 			ri->rsb = r;
 			ri->bucket = bucket;
diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c
index 7b84c1d..8364157 100644
--- a/fs/dlm/dir.c
+++ b/fs/dlm/dir.c
@@ -290,7 +290,6 @@
 
  out_status:
 	error = 0;
-	dlm_set_recover_status(ls, DLM_RS_DIR);
 	log_debug(ls, "dlm_recover_directory %d entries", count);
  out_free:
 	kfree(last_name);
diff --git a/fs/dlm/dlm_internal.h b/fs/dlm/dlm_internal.h
index fe2860c..3a564d1 100644
--- a/fs/dlm/dlm_internal.h
+++ b/fs/dlm/dlm_internal.h
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2010 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -103,8 +103,8 @@
 };
 
 struct dlm_rsbtable {
-	struct list_head	list;
-	struct list_head	toss;
+	struct rb_root		keep;
+	struct rb_root		toss;
 	spinlock_t		lock;
 };
 
@@ -117,6 +117,10 @@
 	struct list_head	list;
 	int			nodeid;
 	int			weight;
+	int			slot;
+	int			slot_prev;
+	int			comm_seq;
+	uint32_t		generation;
 };
 
 /*
@@ -125,10 +129,8 @@
 
 struct dlm_recover {
 	struct list_head	list;
-	int			*nodeids;   /* nodeids of all members */
-	int			node_count;
-	int			*new;       /* nodeids of new members */
-	int			new_count;
+	struct dlm_config_node	*nodes;
+	int			nodes_count;
 	uint64_t		seq;
 };
 
@@ -285,7 +287,10 @@
 	unsigned long		res_toss_time;
 	uint32_t		res_first_lkid;
 	struct list_head	res_lookup;	/* lkbs waiting on first */
-	struct list_head	res_hashchain;	/* rsbtbl */
+	union {
+		struct list_head	res_hashchain;
+		struct rb_node		res_hashnode;	/* rsbtbl */
+	};
 	struct list_head	res_grantqueue;
 	struct list_head	res_convertqueue;
 	struct list_head	res_waitqueue;
@@ -334,7 +339,9 @@
 /* dlm_header is first element of all structs sent between nodes */
 
 #define DLM_HEADER_MAJOR	0x00030000
-#define DLM_HEADER_MINOR	0x00000000
+#define DLM_HEADER_MINOR	0x00000001
+
+#define DLM_HEADER_SLOTS	0x00000001
 
 #define DLM_MSG			1
 #define DLM_RCOM		2
@@ -422,10 +429,34 @@
 	struct dlm_rcom		rcom;
 };
 
+#define DLM_RSF_NEED_SLOTS	0x00000001
+
+/* RCOM_STATUS data */
+struct rcom_status {
+	__le32			rs_flags;
+	__le32			rs_unused1;
+	__le64			rs_unused2;
+};
+
+/* RCOM_STATUS_REPLY data */
 struct rcom_config {
 	__le32			rf_lvblen;
 	__le32			rf_lsflags;
-	__le64			rf_unused;
+
+	/* DLM_HEADER_SLOTS adds: */
+	__le32			rf_flags;
+	__le16			rf_our_slot;
+	__le16			rf_num_slots;
+	__le32			rf_generation;
+	__le32			rf_unused1;
+	__le64			rf_unused2;
+};
+
+struct rcom_slot {
+	__le32			ro_nodeid;
+	__le16			ro_slot;
+	__le16			ro_unused1;
+	__le64			ro_unused2;
 };
 
 struct rcom_lock {
@@ -452,6 +483,7 @@
 	struct list_head	ls_list;	/* list of lockspaces */
 	dlm_lockspace_t		*ls_local_handle;
 	uint32_t		ls_global_id;	/* global unique lockspace ID */
+	uint32_t		ls_generation;
 	uint32_t		ls_exflags;
 	int			ls_lvblen;
 	int			ls_count;	/* refcount of processes in
@@ -490,6 +522,11 @@
 	int			ls_total_weight;
 	int			*ls_node_array;
 
+	int			ls_slot;
+	int			ls_num_slots;
+	int			ls_slots_size;
+	struct dlm_slot		*ls_slots;
+
 	struct dlm_rsb		ls_stub_rsb;	/* for returning errors */
 	struct dlm_lkb		ls_stub_lkb;	/* for returning errors */
 	struct dlm_message	ls_stub_ms;	/* for faking a reply */
@@ -537,6 +574,9 @@
 	struct list_head	ls_root_list;	/* root resources */
 	struct rw_semaphore	ls_root_sem;	/* protect root_list */
 
+	const struct dlm_lockspace_ops *ls_ops;
+	void			*ls_ops_arg;
+
 	int			ls_namelen;
 	char			ls_name[1];
 };
diff --git a/fs/dlm/lock.c b/fs/dlm/lock.c
index 83b5e32..d471830 100644
--- a/fs/dlm/lock.c
+++ b/fs/dlm/lock.c
@@ -56,6 +56,7 @@
    L: receive_xxxx_reply()     <-  R: send_xxxx_reply()
 */
 #include <linux/types.h>
+#include <linux/rbtree.h>
 #include <linux/slab.h>
 #include "dlm_internal.h"
 #include <linux/dlm_device.h>
@@ -380,6 +381,8 @@
 
 	r = list_first_entry(&ls->ls_new_rsb, struct dlm_rsb, res_hashchain);
 	list_del(&r->res_hashchain);
+	/* Convert the empty list_head to a NULL rb_node for tree usage: */
+	memset(&r->res_hashnode, 0, sizeof(struct rb_node));
 	ls->ls_new_rsb_count--;
 	spin_unlock(&ls->ls_new_rsb_spin);
 
@@ -388,7 +391,6 @@
 	memcpy(r->res_name, name, len);
 	mutex_init(&r->res_mutex);
 
-	INIT_LIST_HEAD(&r->res_hashchain);
 	INIT_LIST_HEAD(&r->res_lookup);
 	INIT_LIST_HEAD(&r->res_grantqueue);
 	INIT_LIST_HEAD(&r->res_convertqueue);
@@ -400,14 +402,31 @@
 	return 0;
 }
 
-static int search_rsb_list(struct list_head *head, char *name, int len,
+static int rsb_cmp(struct dlm_rsb *r, const char *name, int nlen)
+{
+	char maxname[DLM_RESNAME_MAXLEN];
+
+	memset(maxname, 0, DLM_RESNAME_MAXLEN);
+	memcpy(maxname, name, nlen);
+	return memcmp(r->res_name, maxname, DLM_RESNAME_MAXLEN);
+}
+
+static int search_rsb_tree(struct rb_root *tree, char *name, int len,
 			   unsigned int flags, struct dlm_rsb **r_ret)
 {
+	struct rb_node *node = tree->rb_node;
 	struct dlm_rsb *r;
 	int error = 0;
+	int rc;
 
-	list_for_each_entry(r, head, res_hashchain) {
-		if (len == r->res_length && !memcmp(name, r->res_name, len))
+	while (node) {
+		r = rb_entry(node, struct dlm_rsb, res_hashnode);
+		rc = rsb_cmp(r, name, len);
+		if (rc < 0)
+			node = node->rb_left;
+		else if (rc > 0)
+			node = node->rb_right;
+		else
 			goto found;
 	}
 	*r_ret = NULL;
@@ -420,22 +439,54 @@
 	return error;
 }
 
+static int rsb_insert(struct dlm_rsb *rsb, struct rb_root *tree)
+{
+	struct rb_node **newn = &tree->rb_node;
+	struct rb_node *parent = NULL;
+	int rc;
+
+	while (*newn) {
+		struct dlm_rsb *cur = rb_entry(*newn, struct dlm_rsb,
+					       res_hashnode);
+
+		parent = *newn;
+		rc = rsb_cmp(cur, rsb->res_name, rsb->res_length);
+		if (rc < 0)
+			newn = &parent->rb_left;
+		else if (rc > 0)
+			newn = &parent->rb_right;
+		else {
+			log_print("rsb_insert match");
+			dlm_dump_rsb(rsb);
+			dlm_dump_rsb(cur);
+			return -EEXIST;
+		}
+	}
+
+	rb_link_node(&rsb->res_hashnode, parent, newn);
+	rb_insert_color(&rsb->res_hashnode, tree);
+	return 0;
+}
+
 static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
 		       unsigned int flags, struct dlm_rsb **r_ret)
 {
 	struct dlm_rsb *r;
 	int error;
 
-	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, flags, &r);
 	if (!error) {
 		kref_get(&r->res_ref);
 		goto out;
 	}
-	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
+	error = search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
 	if (error)
 		goto out;
 
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+	error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+	if (error)
+		return error;
 
 	if (dlm_no_directory(ls))
 		goto out;
@@ -527,8 +578,7 @@
 			nodeid = 0;
 		r->res_nodeid = nodeid;
 	}
-	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
-	error = 0;
+	error = rsb_insert(r, &ls->ls_rsbtbl[bucket].keep);
  out_unlock:
 	spin_unlock(&ls->ls_rsbtbl[bucket].lock);
  out:
@@ -556,7 +606,8 @@
 
 	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
 	kref_init(&r->res_ref);
-	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
+	rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
+	rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
 	r->res_toss_time = jiffies;
 	if (r->res_lvbptr) {
 		dlm_free_lvb(r->res_lvbptr);
@@ -1082,19 +1133,19 @@
 				     r->res_name, r->res_length);
 }
 
-/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
-   found since they are in order of newest to oldest? */
+/* FIXME: make this more efficient */
 
 static int shrink_bucket(struct dlm_ls *ls, int b)
 {
+	struct rb_node *n;
 	struct dlm_rsb *r;
 	int count = 0, found;
 
 	for (;;) {
 		found = 0;
 		spin_lock(&ls->ls_rsbtbl[b].lock);
-		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
-					    res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = rb_next(n)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			if (!time_after_eq(jiffies, r->res_toss_time +
 					   dlm_config.ci_toss_secs * HZ))
 				continue;
@@ -1108,7 +1159,7 @@
 		}
 
 		if (kref_put(&r->res_ref, kill_rsb)) {
-			list_del(&r->res_hashchain);
+			rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
 			spin_unlock(&ls->ls_rsbtbl[b].lock);
 
 			if (is_master(r))
@@ -4441,10 +4492,12 @@
 
 static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
 {
+	struct rb_node *n;
 	struct dlm_rsb *r, *r_ret = NULL;
 
 	spin_lock(&ls->ls_rsbtbl[bucket].lock);
-	list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
+	for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
+		r = rb_entry(n, struct dlm_rsb, res_hashnode);
 		if (!rsb_flag(r, RSB_LOCKS_PURGED))
 			continue;
 		hold_rsb(r);
diff --git a/fs/dlm/lockspace.c b/fs/dlm/lockspace.c
index a1d8f1a..a1ea25f 100644
--- a/fs/dlm/lockspace.c
+++ b/fs/dlm/lockspace.c
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -386,12 +386,15 @@
 	dlm_lowcomms_stop();
 }
 
-static int new_lockspace(const char *name, int namelen, void **lockspace,
-			 uint32_t flags, int lvblen)
+static int new_lockspace(const char *name, const char *cluster,
+			 uint32_t flags, int lvblen,
+			 const struct dlm_lockspace_ops *ops, void *ops_arg,
+			 int *ops_result, dlm_lockspace_t **lockspace)
 {
 	struct dlm_ls *ls;
 	int i, size, error;
 	int do_unreg = 0;
+	int namelen = strlen(name);
 
 	if (namelen > DLM_LOCKSPACE_LEN)
 		return -EINVAL;
@@ -403,8 +406,24 @@
 		return -EINVAL;
 
 	if (!dlm_user_daemon_available()) {
-		module_put(THIS_MODULE);
-		return -EUNATCH;
+		log_print("dlm user daemon not available");
+		error = -EUNATCH;
+		goto out;
+	}
+
+	if (ops && ops_result) {
+	       	if (!dlm_config.ci_recover_callbacks)
+			*ops_result = -EOPNOTSUPP;
+		else
+			*ops_result = 0;
+	}
+
+	if (dlm_config.ci_recover_callbacks && cluster &&
+	    strncmp(cluster, dlm_config.ci_cluster_name, DLM_LOCKSPACE_LEN)) {
+		log_print("dlm cluster name %s mismatch %s",
+			  dlm_config.ci_cluster_name, cluster);
+		error = -EBADR;
+		goto out;
 	}
 
 	error = 0;
@@ -442,6 +461,11 @@
 	ls->ls_flags = 0;
 	ls->ls_scan_time = jiffies;
 
+	if (ops && dlm_config.ci_recover_callbacks) {
+		ls->ls_ops = ops;
+		ls->ls_ops_arg = ops_arg;
+	}
+
 	if (flags & DLM_LSFL_TIMEWARN)
 		set_bit(LSFL_TIMEWARN, &ls->ls_flags);
 
@@ -457,8 +481,8 @@
 	if (!ls->ls_rsbtbl)
 		goto out_lsfree;
 	for (i = 0; i < size; i++) {
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
-		INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
+		ls->ls_rsbtbl[i].keep.rb_node = NULL;
+		ls->ls_rsbtbl[i].toss.rb_node = NULL;
 		spin_lock_init(&ls->ls_rsbtbl[i].lock);
 	}
 
@@ -525,6 +549,11 @@
 	if (!ls->ls_recover_buf)
 		goto out_dirfree;
 
+	ls->ls_slot = 0;
+	ls->ls_num_slots = 0;
+	ls->ls_slots_size = 0;
+	ls->ls_slots = NULL;
+
 	INIT_LIST_HEAD(&ls->ls_recover_list);
 	spin_lock_init(&ls->ls_recover_list_lock);
 	ls->ls_recover_list_count = 0;
@@ -614,8 +643,10 @@
 	return error;
 }
 
-int dlm_new_lockspace(const char *name, int namelen, void **lockspace,
-		      uint32_t flags, int lvblen)
+int dlm_new_lockspace(const char *name, const char *cluster,
+		      uint32_t flags, int lvblen,
+		      const struct dlm_lockspace_ops *ops, void *ops_arg,
+		      int *ops_result, dlm_lockspace_t **lockspace)
 {
 	int error = 0;
 
@@ -625,7 +656,8 @@
 	if (error)
 		goto out;
 
-	error = new_lockspace(name, namelen, lockspace, flags, lvblen);
+	error = new_lockspace(name, cluster, flags, lvblen, ops, ops_arg,
+			      ops_result, lockspace);
 	if (!error)
 		ls_count++;
 	if (error > 0)
@@ -685,7 +717,7 @@
 static int release_lockspace(struct dlm_ls *ls, int force)
 {
 	struct dlm_rsb *rsb;
-	struct list_head *head;
+	struct rb_node *n;
 	int i, busy, rv;
 
 	busy = lockspace_busy(ls, force);
@@ -746,20 +778,15 @@
 	 */
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-		head = &ls->ls_rsbtbl[i].list;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].keep);
 			dlm_free_rsb(rsb);
 		}
 
-		head = &ls->ls_rsbtbl[i].toss;
-		while (!list_empty(head)) {
-			rsb = list_entry(head->next, struct dlm_rsb,
-					 res_hashchain);
-			list_del(&rsb->res_hashchain);
+		while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			rb_erase(n, &ls->ls_rsbtbl[i].toss);
 			dlm_free_rsb(rsb);
 		}
 	}
diff --git a/fs/dlm/member.c b/fs/dlm/member.c
index b12532e..862640a 100644
--- a/fs/dlm/member.c
+++ b/fs/dlm/member.c
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005-2009 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -19,6 +19,280 @@
 #include "config.h"
 #include "lowcomms.h"
 
+int dlm_slots_version(struct dlm_header *h)
+{
+	if ((h->h_version & 0x0000FFFF) < DLM_HEADER_SLOTS)
+		return 0;
+	return 1;
+}
+
+void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
+		   struct dlm_member *memb)
+{
+	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
+
+	if (!dlm_slots_version(&rc->rc_header))
+		return;
+
+	memb->slot = le16_to_cpu(rf->rf_our_slot);
+	memb->generation = le32_to_cpu(rf->rf_generation);
+}
+
+void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc)
+{
+	struct dlm_slot *slot;
+	struct rcom_slot *ro;
+	int i;
+
+	ro = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
+
+	/* ls_slots array is sparse, but not rcom_slots */
+
+	for (i = 0; i < ls->ls_slots_size; i++) {
+		slot = &ls->ls_slots[i];
+		if (!slot->nodeid)
+			continue;
+		ro->ro_nodeid = cpu_to_le32(slot->nodeid);
+		ro->ro_slot = cpu_to_le16(slot->slot);
+		ro++;
+	}
+}
+
+#define SLOT_DEBUG_LINE 128
+
+static void log_debug_slots(struct dlm_ls *ls, uint32_t gen, int num_slots,
+			    struct rcom_slot *ro0, struct dlm_slot *array,
+			    int array_size)
+{
+	char line[SLOT_DEBUG_LINE];
+	int len = SLOT_DEBUG_LINE - 1;
+	int pos = 0;
+	int ret, i;
+
+	if (!dlm_config.ci_log_debug)
+		return;
+
+	memset(line, 0, sizeof(line));
+
+	if (array) {
+		for (i = 0; i < array_size; i++) {
+			if (!array[i].nodeid)
+				continue;
+
+			ret = snprintf(line + pos, len - pos, " %d:%d",
+				       array[i].slot, array[i].nodeid);
+			if (ret >= len - pos)
+				break;
+			pos += ret;
+		}
+	} else if (ro0) {
+		for (i = 0; i < num_slots; i++) {
+			ret = snprintf(line + pos, len - pos, " %d:%d",
+				       ro0[i].ro_slot, ro0[i].ro_nodeid);
+			if (ret >= len - pos)
+				break;
+			pos += ret;
+		}
+	}
+
+	log_debug(ls, "generation %u slots %d%s", gen, num_slots, line);
+}
+
+int dlm_slots_copy_in(struct dlm_ls *ls)
+{
+	struct dlm_member *memb;
+	struct dlm_rcom *rc = ls->ls_recover_buf;
+	struct rcom_config *rf = (struct rcom_config *)rc->rc_buf;
+	struct rcom_slot *ro0, *ro;
+	int our_nodeid = dlm_our_nodeid();
+	int i, num_slots;
+	uint32_t gen;
+
+	if (!dlm_slots_version(&rc->rc_header))
+		return -1;
+
+	gen = le32_to_cpu(rf->rf_generation);
+	if (gen <= ls->ls_generation) {
+		log_error(ls, "dlm_slots_copy_in gen %u old %u",
+			  gen, ls->ls_generation);
+	}
+	ls->ls_generation = gen;
+
+	num_slots = le16_to_cpu(rf->rf_num_slots);
+	if (!num_slots)
+		return -1;
+
+	ro0 = (struct rcom_slot *)(rc->rc_buf + sizeof(struct rcom_config));
+
+	for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
+		ro->ro_nodeid = le32_to_cpu(ro->ro_nodeid);
+		ro->ro_slot = le16_to_cpu(ro->ro_slot);
+	}
+
+	log_debug_slots(ls, gen, num_slots, ro0, NULL, 0);
+
+	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		for (i = 0, ro = ro0; i < num_slots; i++, ro++) {
+			if (ro->ro_nodeid != memb->nodeid)
+				continue;
+			memb->slot = ro->ro_slot;
+			memb->slot_prev = memb->slot;
+			break;
+		}
+
+		if (memb->nodeid == our_nodeid) {
+			if (ls->ls_slot && ls->ls_slot != memb->slot) {
+				log_error(ls, "dlm_slots_copy_in our slot "
+					  "changed %d %d", ls->ls_slot,
+					  memb->slot);
+				return -1;
+			}
+
+			if (!ls->ls_slot)
+				ls->ls_slot = memb->slot;
+		}
+
+		if (!memb->slot) {
+			log_error(ls, "dlm_slots_copy_in nodeid %d no slot",
+				   memb->nodeid);
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+/* for any nodes that do not support slots, we will not have set memb->slot
+   in wait_status_all(), so memb->slot will remain -1, and we will not
+   assign slots or set ls_num_slots here */
+
+int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
+		     struct dlm_slot **slots_out, uint32_t *gen_out)
+{
+	struct dlm_member *memb;
+	struct dlm_slot *array;
+	int our_nodeid = dlm_our_nodeid();
+	int array_size, max_slots, i;
+	int need = 0;
+	int max = 0;
+	int num = 0;
+	uint32_t gen = 0;
+
+	/* our own memb struct will have slot -1 gen 0 */
+
+	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		if (memb->nodeid == our_nodeid) {
+			memb->slot = ls->ls_slot;
+			memb->generation = ls->ls_generation;
+			break;
+		}
+	}
+
+	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		if (memb->generation > gen)
+			gen = memb->generation;
+
+		/* node doesn't support slots */
+
+		if (memb->slot == -1)
+			return -1;
+
+		/* node needs a slot assigned */
+
+		if (!memb->slot)
+			need++;
+
+		/* node has a slot assigned */
+
+		num++;
+
+		if (!max || max < memb->slot)
+			max = memb->slot;
+
+		/* sanity check, once slot is assigned it shouldn't change */
+
+		if (memb->slot_prev && memb->slot && memb->slot_prev != memb->slot) {
+			log_error(ls, "nodeid %d slot changed %d %d",
+				  memb->nodeid, memb->slot_prev, memb->slot);
+			return -1;
+		}
+		memb->slot_prev = memb->slot;
+	}
+
+	array_size = max + need;
+
+	array = kzalloc(array_size * sizeof(struct dlm_slot), GFP_NOFS);
+	if (!array)
+		return -ENOMEM;
+
+	num = 0;
+
+	/* fill in slots (offsets) that are used */
+
+	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		if (!memb->slot)
+			continue;
+
+		if (memb->slot > array_size) {
+			log_error(ls, "invalid slot number %d", memb->slot);
+			kfree(array);
+			return -1;
+		}
+
+		array[memb->slot - 1].nodeid = memb->nodeid;
+		array[memb->slot - 1].slot = memb->slot;
+		num++;
+	}
+
+	/* assign new slots from unused offsets */
+
+	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		if (memb->slot)
+			continue;
+
+		for (i = 0; i < array_size; i++) {
+			if (array[i].nodeid)
+				continue;
+
+			memb->slot = i + 1;
+			memb->slot_prev = memb->slot;
+			array[i].nodeid = memb->nodeid;
+			array[i].slot = memb->slot;
+			num++;
+
+			if (!ls->ls_slot && memb->nodeid == our_nodeid)
+				ls->ls_slot = memb->slot;
+			break;
+		}
+
+		if (!memb->slot) {
+			log_error(ls, "no free slot found");
+			kfree(array);
+			return -1;
+		}
+	}
+
+	gen++;
+
+	log_debug_slots(ls, gen, num, NULL, array, array_size);
+
+	max_slots = (dlm_config.ci_buffer_size - sizeof(struct dlm_rcom) -
+		     sizeof(struct rcom_config)) / sizeof(struct rcom_slot);
+
+	if (num > max_slots) {
+		log_error(ls, "num_slots %d exceeds max_slots %d",
+			  num, max_slots);
+		kfree(array);
+		return -1;
+	}
+
+	*gen_out = gen;
+	*slots_out = array;
+	*slots_size = array_size;
+	*num_slots = num;
+	return 0;
+}
+
 static void add_ordered_member(struct dlm_ls *ls, struct dlm_member *new)
 {
 	struct dlm_member *memb = NULL;
@@ -43,59 +317,51 @@
 	}
 }
 
-static int dlm_add_member(struct dlm_ls *ls, int nodeid)
+static int dlm_add_member(struct dlm_ls *ls, struct dlm_config_node *node)
 {
 	struct dlm_member *memb;
-	int w, error;
+	int error;
 
 	memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
 	if (!memb)
 		return -ENOMEM;
 
-	w = dlm_node_weight(ls->ls_name, nodeid);
-	if (w < 0) {
-		kfree(memb);
-		return w;
-	}
-
-	error = dlm_lowcomms_connect_node(nodeid);
+	error = dlm_lowcomms_connect_node(node->nodeid);
 	if (error < 0) {
 		kfree(memb);
 		return error;
 	}
 
-	memb->nodeid = nodeid;
-	memb->weight = w;
+	memb->nodeid = node->nodeid;
+	memb->weight = node->weight;
+	memb->comm_seq = node->comm_seq;
 	add_ordered_member(ls, memb);
 	ls->ls_num_nodes++;
 	return 0;
 }
 
-static void dlm_remove_member(struct dlm_ls *ls, struct dlm_member *memb)
+static struct dlm_member *find_memb(struct list_head *head, int nodeid)
 {
-	list_move(&memb->list, &ls->ls_nodes_gone);
-	ls->ls_num_nodes--;
+	struct dlm_member *memb;
+
+	list_for_each_entry(memb, head, list) {
+		if (memb->nodeid == nodeid)
+			return memb;
+	}
+	return NULL;
 }
 
 int dlm_is_member(struct dlm_ls *ls, int nodeid)
 {
-	struct dlm_member *memb;
-
-	list_for_each_entry(memb, &ls->ls_nodes, list) {
-		if (memb->nodeid == nodeid)
-			return 1;
-	}
+	if (find_memb(&ls->ls_nodes, nodeid))
+		return 1;
 	return 0;
 }
 
 int dlm_is_removed(struct dlm_ls *ls, int nodeid)
 {
-	struct dlm_member *memb;
-
-	list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
-		if (memb->nodeid == nodeid)
-			return 1;
-	}
+	if (find_memb(&ls->ls_nodes_gone, nodeid))
+		return 1;
 	return 0;
 }
 
@@ -176,7 +442,7 @@
 		error = dlm_recovery_stopped(ls);
 		if (error)
 			break;
-		error = dlm_rcom_status(ls, memb->nodeid);
+		error = dlm_rcom_status(ls, memb->nodeid, 0);
 		if (error)
 			break;
 	}
@@ -186,10 +452,88 @@
 	return error;
 }
 
+static void dlm_lsop_recover_prep(struct dlm_ls *ls)
+{
+	if (!ls->ls_ops || !ls->ls_ops->recover_prep)
+		return;
+	ls->ls_ops->recover_prep(ls->ls_ops_arg);
+}
+
+static void dlm_lsop_recover_slot(struct dlm_ls *ls, struct dlm_member *memb)
+{
+	struct dlm_slot slot;
+	uint32_t seq;
+	int error;
+
+	if (!ls->ls_ops || !ls->ls_ops->recover_slot)
+		return;
+
+	/* if there is no comms connection with this node
+	   or the present comms connection is newer
+	   than the one when this member was added, then
+	   we consider the node to have failed (versus
+	   being removed due to dlm_release_lockspace) */
+
+	error = dlm_comm_seq(memb->nodeid, &seq);
+
+	if (!error && seq == memb->comm_seq)
+		return;
+
+	slot.nodeid = memb->nodeid;
+	slot.slot = memb->slot;
+
+	ls->ls_ops->recover_slot(ls->ls_ops_arg, &slot);
+}
+
+void dlm_lsop_recover_done(struct dlm_ls *ls)
+{
+	struct dlm_member *memb;
+	struct dlm_slot *slots;
+	int i, num;
+
+	if (!ls->ls_ops || !ls->ls_ops->recover_done)
+		return;
+
+	num = ls->ls_num_nodes;
+
+	slots = kzalloc(num * sizeof(struct dlm_slot), GFP_KERNEL);
+	if (!slots)
+		return;
+
+	i = 0;
+	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		if (i == num) {
+			log_error(ls, "dlm_lsop_recover_done bad num %d", num);
+			goto out;
+		}
+		slots[i].nodeid = memb->nodeid;
+		slots[i].slot = memb->slot;
+		i++;
+	}
+
+	ls->ls_ops->recover_done(ls->ls_ops_arg, slots, num,
+				 ls->ls_slot, ls->ls_generation);
+ out:
+	kfree(slots);
+}
+
+static struct dlm_config_node *find_config_node(struct dlm_recover *rv,
+						int nodeid)
+{
+	int i;
+
+	for (i = 0; i < rv->nodes_count; i++) {
+		if (rv->nodes[i].nodeid == nodeid)
+			return &rv->nodes[i];
+	}
+	return NULL;
+}
+
 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv, int *neg_out)
 {
 	struct dlm_member *memb, *safe;
-	int i, error, found, pos = 0, neg = 0, low = -1;
+	struct dlm_config_node *node;
+	int i, error, neg = 0, low = -1;
 
 	/* previously removed members that we've not finished removing need to
 	   count as a negative change so the "neg" recovery steps will happen */
@@ -202,46 +546,32 @@
 	/* move departed members from ls_nodes to ls_nodes_gone */
 
 	list_for_each_entry_safe(memb, safe, &ls->ls_nodes, list) {
-		found = 0;
-		for (i = 0; i < rv->node_count; i++) {
-			if (memb->nodeid == rv->nodeids[i]) {
-				found = 1;
-				break;
-			}
-		}
-
-		if (!found) {
-			neg++;
-			dlm_remove_member(ls, memb);
-			log_debug(ls, "remove member %d", memb->nodeid);
-		}
-	}
-
-	/* Add an entry to ls_nodes_gone for members that were removed and
-	   then added again, so that previous state for these nodes will be
-	   cleared during recovery. */
-
-	for (i = 0; i < rv->new_count; i++) {
-		if (!dlm_is_member(ls, rv->new[i]))
+		node = find_config_node(rv, memb->nodeid);
+		if (node && !node->new)
 			continue;
-		log_debug(ls, "new nodeid %d is a re-added member", rv->new[i]);
 
-		memb = kzalloc(sizeof(struct dlm_member), GFP_NOFS);
-		if (!memb)
-			return -ENOMEM;
-		memb->nodeid = rv->new[i];
-		list_add_tail(&memb->list, &ls->ls_nodes_gone);
+		if (!node) {
+			log_debug(ls, "remove member %d", memb->nodeid);
+		} else {
+			/* removed and re-added */
+			log_debug(ls, "remove member %d comm_seq %u %u",
+				  memb->nodeid, memb->comm_seq, node->comm_seq);
+		}
+
 		neg++;
+		list_move(&memb->list, &ls->ls_nodes_gone);
+		ls->ls_num_nodes--;
+		dlm_lsop_recover_slot(ls, memb);
 	}
 
 	/* add new members to ls_nodes */
 
-	for (i = 0; i < rv->node_count; i++) {
-		if (dlm_is_member(ls, rv->nodeids[i]))
+	for (i = 0; i < rv->nodes_count; i++) {
+		node = &rv->nodes[i];
+		if (dlm_is_member(ls, node->nodeid))
 			continue;
-		dlm_add_member(ls, rv->nodeids[i]);
-		pos++;
-		log_debug(ls, "add member %d", rv->nodeids[i]);
+		dlm_add_member(ls, node);
+		log_debug(ls, "add member %d", node->nodeid);
 	}
 
 	list_for_each_entry(memb, &ls->ls_nodes, list) {
@@ -251,7 +581,6 @@
 	ls->ls_low_nodeid = low;
 
 	make_member_array(ls);
-	dlm_set_recover_status(ls, DLM_RS_NODES);
 	*neg_out = neg;
 
 	error = ping_members(ls);
@@ -261,12 +590,8 @@
 		ls->ls_members_result = error;
 		complete(&ls->ls_members_done);
 	}
-	if (error)
-		goto out;
 
-	error = dlm_recover_members_wait(ls);
- out:
-	log_debug(ls, "total members %d error %d", ls->ls_num_nodes, error);
+	log_debug(ls, "dlm_recover_members %d nodes", ls->ls_num_nodes);
 	return error;
 }
 
@@ -327,26 +652,35 @@
 	 */
 
 	dlm_recoverd_suspend(ls);
+
+	spin_lock(&ls->ls_recover_lock);
+	kfree(ls->ls_slots);
+	ls->ls_slots = NULL;
+	ls->ls_num_slots = 0;
+	ls->ls_slots_size = 0;
 	ls->ls_recover_status = 0;
+	spin_unlock(&ls->ls_recover_lock);
+
 	dlm_recoverd_resume(ls);
 
 	if (!ls->ls_recover_begin)
 		ls->ls_recover_begin = jiffies;
+
+	dlm_lsop_recover_prep(ls);
 	return 0;
 }
 
 int dlm_ls_start(struct dlm_ls *ls)
 {
 	struct dlm_recover *rv = NULL, *rv_old;
-	int *ids = NULL, *new = NULL;
-	int error, ids_count = 0, new_count = 0;
+	struct dlm_config_node *nodes;
+	int error, count;
 
 	rv = kzalloc(sizeof(struct dlm_recover), GFP_NOFS);
 	if (!rv)
 		return -ENOMEM;
 
-	error = dlm_nodeid_list(ls->ls_name, &ids, &ids_count,
-				&new, &new_count);
+	error = dlm_config_nodes(ls->ls_name, &nodes, &count);
 	if (error < 0)
 		goto fail;
 
@@ -361,10 +695,8 @@
 		goto fail;
 	}
 
-	rv->nodeids = ids;
-	rv->node_count = ids_count;
-	rv->new = new;
-	rv->new_count = new_count;
+	rv->nodes = nodes;
+	rv->nodes_count = count;
 	rv->seq = ++ls->ls_recover_seq;
 	rv_old = ls->ls_recover_args;
 	ls->ls_recover_args = rv;
@@ -372,9 +704,8 @@
 
 	if (rv_old) {
 		log_error(ls, "unused recovery %llx %d",
-			  (unsigned long long)rv_old->seq, rv_old->node_count);
-		kfree(rv_old->nodeids);
-		kfree(rv_old->new);
+			  (unsigned long long)rv_old->seq, rv_old->nodes_count);
+		kfree(rv_old->nodes);
 		kfree(rv_old);
 	}
 
@@ -383,8 +714,7 @@
 
  fail:
 	kfree(rv);
-	kfree(ids);
-	kfree(new);
+	kfree(nodes);
 	return error;
 }
 
diff --git a/fs/dlm/member.h b/fs/dlm/member.h
index 7a26fca..3deb706 100644
--- a/fs/dlm/member.h
+++ b/fs/dlm/member.h
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005-2008 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2011 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -20,6 +20,14 @@
 int dlm_recover_members(struct dlm_ls *ls, struct dlm_recover *rv,int *neg_out);
 int dlm_is_removed(struct dlm_ls *ls, int nodeid);
 int dlm_is_member(struct dlm_ls *ls, int nodeid);
+int dlm_slots_version(struct dlm_header *h);
+void dlm_slot_save(struct dlm_ls *ls, struct dlm_rcom *rc,
+		   struct dlm_member *memb);
+void dlm_slots_copy_out(struct dlm_ls *ls, struct dlm_rcom *rc);
+int dlm_slots_copy_in(struct dlm_ls *ls);
+int dlm_slots_assign(struct dlm_ls *ls, int *num_slots, int *slots_size,
+		     struct dlm_slot **slots_out, uint32_t *gen_out);
+void dlm_lsop_recover_done(struct dlm_ls *ls);
 
 #endif                          /* __MEMBER_DOT_H__ */
 
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index f10a50f..ac5c616 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -23,6 +23,7 @@
 #include "memory.h"
 #include "lock.h"
 #include "util.h"
+#include "member.h"
 
 
 static int rcom_response(struct dlm_ls *ls)
@@ -72,20 +73,30 @@
 	dlm_lowcomms_commit_buffer(mh);
 }
 
+static void set_rcom_status(struct dlm_ls *ls, struct rcom_status *rs,
+			    uint32_t flags)
+{
+	rs->rs_flags = cpu_to_le32(flags);
+}
+
 /* When replying to a status request, a node also sends back its
    configuration values.  The requesting node then checks that the remote
    node is configured the same way as itself. */
 
-static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
+static void set_rcom_config(struct dlm_ls *ls, struct rcom_config *rf,
+			    uint32_t num_slots)
 {
 	rf->rf_lvblen = cpu_to_le32(ls->ls_lvblen);
 	rf->rf_lsflags = cpu_to_le32(ls->ls_exflags);
+
+	rf->rf_our_slot = cpu_to_le16(ls->ls_slot);
+	rf->rf_num_slots = cpu_to_le16(num_slots);
+	rf->rf_generation =  cpu_to_le32(ls->ls_generation);
 }
 
-static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
+static int check_rcom_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 {
 	struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
-	size_t conf_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_config);
 
 	if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
 		log_error(ls, "version mismatch: %x nodeid %d: %x",
@@ -94,12 +105,6 @@
 		return -EPROTO;
 	}
 
-	if (rc->rc_header.h_length < conf_size) {
-		log_error(ls, "config too short: %d nodeid %d",
-			  rc->rc_header.h_length, nodeid);
-		return -EPROTO;
-	}
-
 	if (le32_to_cpu(rf->rf_lvblen) != ls->ls_lvblen ||
 	    le32_to_cpu(rf->rf_lsflags) != ls->ls_exflags) {
 		log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
@@ -127,7 +132,18 @@
 	spin_unlock(&ls->ls_rcom_spin);
 }
 
-int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
+/*
+ * low nodeid gathers one slot value at a time from each node.
+ * it sets need_slots=0, and saves rf_our_slot returned from each
+ * rcom_config.
+ *
+ * other nodes gather all slot values at once from the low nodeid.
+ * they set need_slots=1, and ignore the rf_our_slot returned from each
+ * rcom_config.  they use the rf_num_slots returned from the low
+ * node's rcom_config.
+ */
+
+int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags)
 {
 	struct dlm_rcom *rc;
 	struct dlm_mhandle *mh;
@@ -141,10 +157,13 @@
 		goto out;
 	}
 
-	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS, 0, &rc, &mh);
+	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS,
+			    sizeof(struct rcom_status), &rc, &mh);
 	if (error)
 		goto out;
 
+	set_rcom_status(ls, (struct rcom_status *)rc->rc_buf, status_flags);
+
 	allow_sync_reply(ls, &rc->rc_id);
 	memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 
@@ -161,8 +180,11 @@
 		/* we pretend the remote lockspace exists with 0 status */
 		log_debug(ls, "remote node %d not ready", nodeid);
 		rc->rc_result = 0;
-	} else
-		error = check_config(ls, rc, nodeid);
+		error = 0;
+	} else {
+		error = check_rcom_config(ls, rc, nodeid);
+	}
+
 	/* the caller looks at rc_result for the remote recovery status */
  out:
 	return error;
@@ -172,17 +194,60 @@
 {
 	struct dlm_rcom *rc;
 	struct dlm_mhandle *mh;
-	int error, nodeid = rc_in->rc_header.h_nodeid;
+	struct rcom_status *rs;
+	uint32_t status;
+	int nodeid = rc_in->rc_header.h_nodeid;
+	int len = sizeof(struct rcom_config);
+	int num_slots = 0;
+	int error;
 
+	if (!dlm_slots_version(&rc_in->rc_header)) {
+		status = dlm_recover_status(ls);
+		goto do_create;
+	}
+
+	rs = (struct rcom_status *)rc_in->rc_buf;
+
+	if (!(rs->rs_flags & DLM_RSF_NEED_SLOTS)) {
+		status = dlm_recover_status(ls);
+		goto do_create;
+	}
+
+	spin_lock(&ls->ls_recover_lock);
+	status = ls->ls_recover_status;
+	num_slots = ls->ls_num_slots;
+	spin_unlock(&ls->ls_recover_lock);
+	len += num_slots * sizeof(struct rcom_slot);
+
+ do_create:
 	error = create_rcom(ls, nodeid, DLM_RCOM_STATUS_REPLY,
-			    sizeof(struct rcom_config), &rc, &mh);
+			    len, &rc, &mh);
 	if (error)
 		return;
+
 	rc->rc_id = rc_in->rc_id;
 	rc->rc_seq_reply = rc_in->rc_seq;
-	rc->rc_result = dlm_recover_status(ls);
-	make_config(ls, (struct rcom_config *) rc->rc_buf);
+	rc->rc_result = status;
 
+	set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, num_slots);
+
+	if (!num_slots)
+		goto do_send;
+
+	spin_lock(&ls->ls_recover_lock);
+	if (ls->ls_num_slots != num_slots) {
+		spin_unlock(&ls->ls_recover_lock);
+		log_debug(ls, "receive_rcom_status num_slots %d to %d",
+			  num_slots, ls->ls_num_slots);
+		rc->rc_result = 0;
+		set_rcom_config(ls, (struct rcom_config *)rc->rc_buf, 0);
+		goto do_send;
+	}
+
+	dlm_slots_copy_out(ls, rc);
+	spin_unlock(&ls->ls_recover_lock);
+
+ do_send:
 	send_rcom(ls, mh, rc);
 }
 
diff --git a/fs/dlm/rcom.h b/fs/dlm/rcom.h
index b09abd2..206723a 100644
--- a/fs/dlm/rcom.h
+++ b/fs/dlm/rcom.h
@@ -14,7 +14,7 @@
 #ifndef __RCOM_DOT_H__
 #define __RCOM_DOT_H__
 
-int dlm_rcom_status(struct dlm_ls *ls, int nodeid);
+int dlm_rcom_status(struct dlm_ls *ls, int nodeid, uint32_t status_flags);
 int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name,int last_len);
 int dlm_send_rcom_lookup(struct dlm_rsb *r, int dir_nodeid);
 int dlm_send_rcom_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
diff --git a/fs/dlm/recover.c b/fs/dlm/recover.c
index 1463823..34d5adf1f 100644
--- a/fs/dlm/recover.c
+++ b/fs/dlm/recover.c
@@ -85,14 +85,20 @@
 	return status;
 }
 
+static void _set_recover_status(struct dlm_ls *ls, uint32_t status)
+{
+	ls->ls_recover_status |= status;
+}
+
 void dlm_set_recover_status(struct dlm_ls *ls, uint32_t status)
 {
 	spin_lock(&ls->ls_recover_lock);
-	ls->ls_recover_status |= status;
+	_set_recover_status(ls, status);
 	spin_unlock(&ls->ls_recover_lock);
 }
 
-static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status)
+static int wait_status_all(struct dlm_ls *ls, uint32_t wait_status,
+			   int save_slots)
 {
 	struct dlm_rcom *rc = ls->ls_recover_buf;
 	struct dlm_member *memb;
@@ -106,10 +112,13 @@
 				goto out;
 			}
 
-			error = dlm_rcom_status(ls, memb->nodeid);
+			error = dlm_rcom_status(ls, memb->nodeid, 0);
 			if (error)
 				goto out;
 
+			if (save_slots)
+				dlm_slot_save(ls, rc, memb);
+
 			if (rc->rc_result & wait_status)
 				break;
 			if (delay < 1000)
@@ -121,7 +130,8 @@
 	return error;
 }
 
-static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status)
+static int wait_status_low(struct dlm_ls *ls, uint32_t wait_status,
+			   uint32_t status_flags)
 {
 	struct dlm_rcom *rc = ls->ls_recover_buf;
 	int error = 0, delay = 0, nodeid = ls->ls_low_nodeid;
@@ -132,7 +142,7 @@
 			goto out;
 		}
 
-		error = dlm_rcom_status(ls, nodeid);
+		error = dlm_rcom_status(ls, nodeid, status_flags);
 		if (error)
 			break;
 
@@ -152,18 +162,56 @@
 	int error;
 
 	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
-		error = wait_status_all(ls, status);
+		error = wait_status_all(ls, status, 0);
 		if (!error)
 			dlm_set_recover_status(ls, status_all);
 	} else
-		error = wait_status_low(ls, status_all);
+		error = wait_status_low(ls, status_all, 0);
 
 	return error;
 }
 
 int dlm_recover_members_wait(struct dlm_ls *ls)
 {
-	return wait_status(ls, DLM_RS_NODES);
+	struct dlm_member *memb;
+	struct dlm_slot *slots;
+	int num_slots, slots_size;
+	int error, rv;
+	uint32_t gen;
+
+	list_for_each_entry(memb, &ls->ls_nodes, list) {
+		memb->slot = -1;
+		memb->generation = 0;
+	}
+
+	if (ls->ls_low_nodeid == dlm_our_nodeid()) {
+		error = wait_status_all(ls, DLM_RS_NODES, 1);
+		if (error)
+			goto out;
+
+		/* slots array is sparse, slots_size may be > num_slots */
+
+		rv = dlm_slots_assign(ls, &num_slots, &slots_size, &slots, &gen);
+		if (!rv) {
+			spin_lock(&ls->ls_recover_lock);
+			_set_recover_status(ls, DLM_RS_NODES_ALL);
+			ls->ls_num_slots = num_slots;
+			ls->ls_slots_size = slots_size;
+			ls->ls_slots = slots;
+			ls->ls_generation = gen;
+			spin_unlock(&ls->ls_recover_lock);
+		} else {
+			dlm_set_recover_status(ls, DLM_RS_NODES_ALL);
+		}
+	} else {
+		error = wait_status_low(ls, DLM_RS_NODES_ALL, DLM_RSF_NEED_SLOTS);
+		if (error)
+			goto out;
+
+		dlm_slots_copy_in(ls);
+	}
+ out:
+	return error;
 }
 
 int dlm_recover_directory_wait(struct dlm_ls *ls)
@@ -542,8 +590,6 @@
  out:
 	if (error)
 		recover_list_clear(ls);
-	else
-		dlm_set_recover_status(ls, DLM_RS_LOCKS);
 	return error;
 }
 
@@ -715,6 +761,7 @@
 
 int dlm_create_root_list(struct dlm_ls *ls)
 {
+	struct rb_node *n;
 	struct dlm_rsb *r;
 	int i, error = 0;
 
@@ -727,7 +774,8 @@
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].list, res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
 		}
@@ -741,7 +789,8 @@
 			continue;
 		}
 
-		list_for_each_entry(r, &ls->ls_rsbtbl[i].toss, res_hashchain) {
+		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = rb_next(n)) {
+			r = rb_entry(n, struct dlm_rsb, res_hashnode);
 			list_add(&r->res_root_list, &ls->ls_root_list);
 			dlm_hold_rsb(r);
 		}
@@ -771,16 +820,18 @@
 
 void dlm_clear_toss_list(struct dlm_ls *ls)
 {
-	struct dlm_rsb *r, *safe;
+	struct rb_node *n, *next;
+	struct dlm_rsb *rsb;
 	int i;
 
 	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
 		spin_lock(&ls->ls_rsbtbl[i].lock);
-		list_for_each_entry_safe(r, safe, &ls->ls_rsbtbl[i].toss,
-					 res_hashchain) {
-			if (dlm_no_directory(ls) || !is_master(r)) {
-				list_del(&r->res_hashchain);
-				dlm_free_rsb(r);
+		for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
+			next = rb_next(n);;
+			rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
+			if (dlm_no_directory(ls) || !is_master(rsb)) {
+				rb_erase(n, &ls->ls_rsbtbl[i].toss);
+				dlm_free_rsb(rsb);
 			}
 		}
 		spin_unlock(&ls->ls_rsbtbl[i].lock);
diff --git a/fs/dlm/recoverd.c b/fs/dlm/recoverd.c
index 774da3c..3780caf 100644
--- a/fs/dlm/recoverd.c
+++ b/fs/dlm/recoverd.c
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -54,7 +54,7 @@
 	unsigned long start;
 	int error, neg = 0;
 
-	log_debug(ls, "recover %llx", (unsigned long long)rv->seq);
+	log_debug(ls, "dlm_recover %llx", (unsigned long long)rv->seq);
 
 	mutex_lock(&ls->ls_recoverd_active);
 
@@ -76,14 +76,22 @@
 
 	/*
 	 * Add or remove nodes from the lockspace's ls_nodes list.
-	 * Also waits for all nodes to complete dlm_recover_members.
 	 */
 
 	error = dlm_recover_members(ls, rv, &neg);
 	if (error) {
-		log_debug(ls, "recover_members failed %d", error);
+		log_debug(ls, "dlm_recover_members error %d", error);
 		goto fail;
 	}
+
+	dlm_set_recover_status(ls, DLM_RS_NODES);
+
+	error = dlm_recover_members_wait(ls);
+	if (error) {
+		log_debug(ls, "dlm_recover_members_wait error %d", error);
+		goto fail;
+	}
+
 	start = jiffies;
 
 	/*
@@ -93,17 +101,15 @@
 
 	error = dlm_recover_directory(ls);
 	if (error) {
-		log_debug(ls, "recover_directory failed %d", error);
+		log_debug(ls, "dlm_recover_directory error %d", error);
 		goto fail;
 	}
 
-	/*
-	 * Wait for all nodes to complete directory rebuild.
-	 */
+	dlm_set_recover_status(ls, DLM_RS_DIR);
 
 	error = dlm_recover_directory_wait(ls);
 	if (error) {
-		log_debug(ls, "recover_directory_wait failed %d", error);
+		log_debug(ls, "dlm_recover_directory_wait error %d", error);
 		goto fail;
 	}
 
@@ -133,7 +139,7 @@
 
 		error = dlm_recover_masters(ls);
 		if (error) {
-			log_debug(ls, "recover_masters failed %d", error);
+			log_debug(ls, "dlm_recover_masters error %d", error);
 			goto fail;
 		}
 
@@ -143,13 +149,15 @@
 
 		error = dlm_recover_locks(ls);
 		if (error) {
-			log_debug(ls, "recover_locks failed %d", error);
+			log_debug(ls, "dlm_recover_locks error %d", error);
 			goto fail;
 		}
 
+		dlm_set_recover_status(ls, DLM_RS_LOCKS);
+
 		error = dlm_recover_locks_wait(ls);
 		if (error) {
-			log_debug(ls, "recover_locks_wait failed %d", error);
+			log_debug(ls, "dlm_recover_locks_wait error %d", error);
 			goto fail;
 		}
 
@@ -170,7 +178,7 @@
 
 		error = dlm_recover_locks_wait(ls);
 		if (error) {
-			log_debug(ls, "recover_locks_wait failed %d", error);
+			log_debug(ls, "dlm_recover_locks_wait error %d", error);
 			goto fail;
 		}
 	}
@@ -186,9 +194,10 @@
 	dlm_purge_requestqueue(ls);
 
 	dlm_set_recover_status(ls, DLM_RS_DONE);
+
 	error = dlm_recover_done_wait(ls);
 	if (error) {
-		log_debug(ls, "recover_done_wait failed %d", error);
+		log_debug(ls, "dlm_recover_done_wait error %d", error);
 		goto fail;
 	}
 
@@ -200,34 +209,35 @@
 
 	error = enable_locking(ls, rv->seq);
 	if (error) {
-		log_debug(ls, "enable_locking failed %d", error);
+		log_debug(ls, "enable_locking error %d", error);
 		goto fail;
 	}
 
 	error = dlm_process_requestqueue(ls);
 	if (error) {
-		log_debug(ls, "process_requestqueue failed %d", error);
+		log_debug(ls, "dlm_process_requestqueue error %d", error);
 		goto fail;
 	}
 
 	error = dlm_recover_waiters_post(ls);
 	if (error) {
-		log_debug(ls, "recover_waiters_post failed %d", error);
+		log_debug(ls, "dlm_recover_waiters_post error %d", error);
 		goto fail;
 	}
 
 	dlm_grant_after_purge(ls);
 
-	log_debug(ls, "recover %llx done: %u ms",
-		  (unsigned long long)rv->seq,
+	log_debug(ls, "dlm_recover %llx generation %u done: %u ms",
+		  (unsigned long long)rv->seq, ls->ls_generation,
 		  jiffies_to_msecs(jiffies - start));
 	mutex_unlock(&ls->ls_recoverd_active);
 
+	dlm_lsop_recover_done(ls);
 	return 0;
 
  fail:
 	dlm_release_root_list(ls);
-	log_debug(ls, "recover %llx error %d",
+	log_debug(ls, "dlm_recover %llx error %d",
 		  (unsigned long long)rv->seq, error);
 	mutex_unlock(&ls->ls_recoverd_active);
 	return error;
@@ -250,8 +260,7 @@
 
 	if (rv) {
 		ls_recover(ls, rv);
-		kfree(rv->nodeids);
-		kfree(rv->new);
+		kfree(rv->nodes);
 		kfree(rv);
 	}
 }
diff --git a/fs/dlm/user.c b/fs/dlm/user.c
index d8ea607..eb4ed9b 100644
--- a/fs/dlm/user.c
+++ b/fs/dlm/user.c
@@ -392,8 +392,9 @@
 	if (!capable(CAP_SYS_ADMIN))
 		return -EPERM;
 
-	error = dlm_new_lockspace(params->name, strlen(params->name),
-				  &lockspace, params->flags, DLM_USER_LVB_LEN);
+	error = dlm_new_lockspace(params->name, NULL, params->flags,
+				  DLM_USER_LVB_LEN, NULL, NULL, NULL,
+				  &lockspace);
 	if (error)
 		return error;
 
diff --git a/fs/gfs2/lock_dlm.c b/fs/gfs2/lock_dlm.c
index 98c80d8..ce85b62 100644
--- a/fs/gfs2/lock_dlm.c
+++ b/fs/gfs2/lock_dlm.c
@@ -195,10 +195,10 @@
 		return -EINVAL;
 	}
 
-	error = dlm_new_lockspace(fsname, strlen(fsname), &ls->ls_dlm,
+	error = dlm_new_lockspace(fsname, NULL, 
 				  DLM_LSFL_FS | DLM_LSFL_NEWEXCL |
 				  (ls->ls_nodir ? DLM_LSFL_NODIR : 0),
-				  GDLM_LVB_SIZE);
+				  GDLM_LVB_SIZE, NULL, NULL, NULL, &ls->ls_dlm);
 	if (error)
 		printk(KERN_ERR "dlm_new_lockspace error %d", error);
 
diff --git a/fs/ocfs2/stack_user.c b/fs/ocfs2/stack_user.c
index a5ebe42..286edf1 100644
--- a/fs/ocfs2/stack_user.c
+++ b/fs/ocfs2/stack_user.c
@@ -827,8 +827,8 @@
 		goto out;
 	}
 
-	rc = dlm_new_lockspace(conn->cc_name, strlen(conn->cc_name),
-			       &fsdlm, DLM_LSFL_FS, DLM_LVB_LEN);
+	rc = dlm_new_lockspace(conn->cc_name, NULL, DLM_LSFL_FS, DLM_LVB_LEN,
+			       NULL, NULL, NULL, &fsdlm);
 	if (rc) {
 		ocfs2_live_connection_drop(control);
 		goto out;
diff --git a/include/linux/dlm.h b/include/linux/dlm.h
index d4e02f5..6c7f6e9 100644
--- a/include/linux/dlm.h
+++ b/include/linux/dlm.h
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2008 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2011 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -74,15 +74,76 @@
 
 #ifdef __KERNEL__
 
+struct dlm_slot {
+	int nodeid; /* 1 to MAX_INT */
+	int slot;   /* 1 to MAX_INT */
+};
+
+/*
+ * recover_prep: called before the dlm begins lock recovery.
+ *   Notfies lockspace user that locks from failed members will be granted.
+ * recover_slot: called after recover_prep and before recover_done.
+ *   Identifies a failed lockspace member.
+ * recover_done: called after the dlm completes lock recovery.
+ *   Identifies lockspace members and lockspace generation number.
+ */
+
+struct dlm_lockspace_ops {
+	void (*recover_prep) (void *ops_arg);
+	void (*recover_slot) (void *ops_arg, struct dlm_slot *slot);
+	void (*recover_done) (void *ops_arg, struct dlm_slot *slots,
+			      int num_slots, int our_slot, uint32_t generation);
+};
+
 /*
  * dlm_new_lockspace
  *
- * Starts a lockspace with the given name.  If the named lockspace exists in
- * the cluster, the calling node joins it.
+ * Create/join a lockspace.
+ *
+ * name: lockspace name, null terminated, up to DLM_LOCKSPACE_LEN (not
+ *   including terminating null).
+ *
+ * cluster: cluster name, null terminated, up to DLM_LOCKSPACE_LEN (not
+ *   including terminating null).  Optional.  When cluster is null, it
+ *   is not used.  When set, dlm_new_lockspace() returns -EBADR if cluster
+ *   is not equal to the dlm cluster name.
+ *
+ * flags:
+ * DLM_LSFL_NODIR
+ *   The dlm should not use a resource directory, but statically assign
+ *   resource mastery to nodes based on the name hash that is otherwise
+ *   used to select the directory node.  Must be the same on all nodes.
+ * DLM_LSFL_TIMEWARN
+ *   The dlm should emit netlink messages if locks have been waiting
+ *   for a configurable amount of time.  (Unused.)
+ * DLM_LSFL_FS
+ *   The lockspace user is in the kernel (i.e. filesystem).  Enables
+ *   direct bast/cast callbacks.
+ * DLM_LSFL_NEWEXCL
+ *   dlm_new_lockspace() should return -EEXIST if the lockspace exists.
+ *
+ * lvblen: length of lvb in bytes.  Must be multiple of 8.
+ *   dlm_new_lockspace() returns an error if this does not match
+ *   what other nodes are using.
+ *
+ * ops: callbacks that indicate lockspace recovery points so the
+ *   caller can coordinate its recovery and know lockspace members.
+ *   This is only used by the initial dlm_new_lockspace() call.
+ *   Optional.
+ *
+ * ops_arg: arg for ops callbacks.
+ *
+ * ops_result: tells caller if the ops callbacks (if provided) will
+ *   be used or not.  0: will be used, -EXXX will not be used.
+ *   -EOPNOTSUPP: the dlm does not have recovery_callbacks enabled.
+ *
+ * lockspace: handle for dlm functions
  */
 
-int dlm_new_lockspace(const char *name, int namelen,
-		      dlm_lockspace_t **lockspace, uint32_t flags, int lvblen);
+int dlm_new_lockspace(const char *name, const char *cluster,
+		      uint32_t flags, int lvblen,
+		      const struct dlm_lockspace_ops *ops, void *ops_arg,
+		      int *ops_result, dlm_lockspace_t **lockspace);
 
 /*
  * dlm_release_lockspace