dlm: use rsbtbl as resource directory
Remove the dir hash table (dirtbl), and use
the rsb hash table (rsbtbl) as the resource
directory. The dirtbl has always been an unnecessary
duplication of information already held in the rsbtbl.
This improves efficiency by using a single rsbtbl
lookup in many cases where both rsbtbl and dirtbl
lookups were needed previously.
This eliminates the need to handle cases of rsbtbl
and dirtbl being out of sync.
In many cases there will be memory savings because
the dir hash table no longer exists.
Signed-off-by: David Teigland <teigland@redhat.com>
diff --git a/fs/dlm/rcom.c b/fs/dlm/rcom.c
index 64d3e2b..c8c298d 100644
--- a/fs/dlm/rcom.c
+++ b/fs/dlm/rcom.c
@@ -23,8 +23,6 @@
#include "memory.h"
#include "lock.h"
#include "util.h"
-#include "member.h"
-
static int rcom_response(struct dlm_ls *ls)
{
@@ -275,19 +273,9 @@
struct dlm_rcom *rc;
struct dlm_mhandle *mh;
int error = 0;
- int max_size = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
ls->ls_recover_nodeid = nodeid;
- if (nodeid == dlm_our_nodeid()) {
- ls->ls_recover_buf->rc_header.h_length =
- dlm_config.ci_buffer_size;
- dlm_copy_master_names(ls, last_name, last_len,
- ls->ls_recover_buf->rc_buf,
- max_size, nodeid);
- goto out;
- }
-
error = create_rcom(ls, nodeid, DLM_RCOM_NAMES, last_len, &rc, &mh);
if (error)
goto out;
@@ -344,6 +332,25 @@
return error;
}
+int dlm_send_rcom_lookup_dump(struct dlm_rsb *r, int to_nodeid)
+{
+ struct dlm_rcom *rc;
+ struct dlm_mhandle *mh;
+ struct dlm_ls *ls = r->res_ls;
+ int error;
+ /* Build a DLM_RCOM_LOOKUP message for to_nodeid that carries r's resource name. */
+ error = create_rcom(ls, to_nodeid, DLM_RCOM_LOOKUP, r->res_length,
+ &rc, &mh);
+ if (error)
+ goto out; /* create_rcom() failed: nothing is sent, its error is returned */
+ memcpy(rc->rc_buf, r->res_name, r->res_length);
+ rc->rc_id = 0xFFFFFFFF; /* marker: receive_rcom_lookup() logs and dumps the rsb name for this id, then returns without a lookup */
+
+ send_rcom(ls, mh, rc);
+ out:
+ return error; /* 0 once the message was handed to send_rcom(), else create_rcom()'s error */
+}
+
static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
{
struct dlm_rcom *rc;
@@ -355,7 +362,14 @@
if (error)
return;
- error = dlm_dir_lookup(ls, nodeid, rc_in->rc_buf, len, &ret_nodeid);
+ if (rc_in->rc_id == 0xFFFFFFFF) {
+ log_error(ls, "receive_rcom_lookup dump from %d", nodeid);
+ dlm_dump_rsb_name(ls, rc_in->rc_buf, len);
+ return;
+ }
+
+ error = dlm_master_lookup(ls, nodeid, rc_in->rc_buf, len,
+ DLM_LU_RECOVER_MASTER, &ret_nodeid, NULL);
if (error)
ret_nodeid = error;
rc->rc_result = ret_nodeid;
@@ -486,17 +500,76 @@
return 0;
}
+/*
+ * Ignore messages that belong to a later recovery stage until the
+ * recover_status bit for the preceding stage has been set:
+ *
+ * recover_status = 0
+ *
+ * dlm_recover_members()
+ * - send nothing
+ * - recv nothing
+ * - ignore NAMES, NAMES_REPLY
+ * - ignore LOOKUP, LOOKUP_REPLY
+ * - ignore LOCK, LOCK_REPLY
+ *
+ * recover_status |= NODES
+ *
+ * dlm_recover_members_wait()
+ *
+ * dlm_recover_directory()
+ * - send NAMES
+ * - recv NAMES_REPLY
+ * - ignore LOOKUP, LOOKUP_REPLY
+ * - ignore LOCK, LOCK_REPLY
+ *
+ * recover_status |= DIR
+ *
+ * dlm_recover_directory_wait()
+ *
+ * dlm_recover_masters()
+ * - send LOOKUP
+ * - recv LOOKUP_REPLY
+ *
+ * dlm_recover_locks()
+ * - send LOCK
+ * - recv LOCK_REPLY
+ *
+ * recover_status |= LOCKS
+ *
+ * dlm_recover_locks_wait()
+ *
+ * recover_status |= DONE
+ */
+
/* Called by dlm_recv; corresponds to dlm_receive_message() but special
recovery-only comms are sent through here. */
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{
int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
- int stop, reply = 0, lock = 0;
+ int stop, reply = 0, names = 0, lookup = 0, lock = 0;
uint32_t status;
uint64_t seq;
switch (rc->rc_type) {
+ case DLM_RCOM_STATUS_REPLY:
+ reply = 1;
+ break;
+ case DLM_RCOM_NAMES:
+ names = 1;
+ break;
+ case DLM_RCOM_NAMES_REPLY:
+ names = 1;
+ reply = 1;
+ break;
+ case DLM_RCOM_LOOKUP:
+ lookup = 1;
+ break;
+ case DLM_RCOM_LOOKUP_REPLY:
+ lookup = 1;
+ reply = 1;
+ break;
case DLM_RCOM_LOCK:
lock = 1;
break;
@@ -504,10 +577,6 @@
lock = 1;
reply = 1;
break;
- case DLM_RCOM_STATUS_REPLY:
- case DLM_RCOM_NAMES_REPLY:
- case DLM_RCOM_LOOKUP_REPLY:
- reply = 1;
};
spin_lock(&ls->ls_recover_lock);
@@ -516,19 +585,17 @@
seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock);
- if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) ||
- (reply && (rc->rc_seq_reply != seq)) ||
- (lock && !(status & DLM_RS_DIR))) {
- log_limit(ls, "dlm_receive_rcom ignore msg %d "
- "from %d %llu %llu recover seq %llu sts %x gen %u",
- rc->rc_type,
- nodeid,
- (unsigned long long)rc->rc_seq,
- (unsigned long long)rc->rc_seq_reply,
- (unsigned long long)seq,
- status, ls->ls_generation);
- goto out;
- }
+ if (stop && (rc->rc_type != DLM_RCOM_STATUS))
+ goto ignore;
+
+ if (reply && (rc->rc_seq_reply != seq))
+ goto ignore;
+
+ if (!(status & DLM_RS_NODES) && (names || lookup || lock))
+ goto ignore;
+
+ if (!(status & DLM_RS_DIR) && (lookup || lock))
+ goto ignore;
switch (rc->rc_type) {
case DLM_RCOM_STATUS:
@@ -570,10 +637,20 @@
default:
log_error(ls, "receive_rcom bad type %d", rc->rc_type);
}
-out:
+ return;
+
+ignore:
+ log_limit(ls, "dlm_receive_rcom ignore msg %d "
+ "from %d %llu %llu recover seq %llu sts %x gen %u",
+ rc->rc_type,
+ nodeid,
+ (unsigned long long)rc->rc_seq,
+ (unsigned long long)rc->rc_seq_reply,
+ (unsigned long long)seq,
+ status, ls->ls_generation);
return;
Eshort:
- log_error(ls, "recovery message %x from %d is too short",
- rc->rc_type, nodeid);
+ log_error(ls, "recovery message %d from %d is too short",
+ rc->rc_type, nodeid);
}