ocfs2_dlm: fix cluster-wide refcounting of lock resources

This was previously broken, and migration of some locks had to be temporarily
disabled. We use a new (and backward-incompatible) set of network messages
to account for all references to a lock resource held across the cluster.
Once these are all freed, the master node may free the lock resource
memory when its local references are dropped.
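
To illustrate the scheme (a rough sketch only, not code from this
patch; the helper name below is hypothetical, while res->refmap, the
per-resource spinlock and O2NM_MAX_NODES follow the ocfs2 sources):
every node holding a reference to a lock resource has its bit set in
the master's res->refmap, and clears it by sending a deref message
when it purges its local copy. The master may consider freeing the
resource only once no remote bits remain:

	/* hypothetical helper, for illustration: true once every
	 * remote reference has been dropped via a deref message,
	 * leaving only the master's local references */
	static int dlm_no_remote_refs(struct dlm_lock_resource *res)
	{
		assert_spin_locked(&res->spinlock);
		return bitmap_empty(res->refmap, O2NM_MAX_NODES);
	}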

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>

diff --git a/fs/ocfs2/dlm/dlmrecovery.c b/fs/ocfs2/dlm/dlmrecovery.c
index 367a11e..d011a2a 100644
--- a/fs/ocfs2/dlm/dlmrecovery.c
+++ b/fs/ocfs2/dlm/dlmrecovery.c
@@ -1129,6 +1129,11 @@
 	if (total_locks == mres_total_locks)
 		mres->flags |= DLM_MRES_ALL_DONE;
 
+	mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
+	     dlm->name, res->lockname.len, res->lockname.name,
+	     orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
+	     send_to);
+
 	/* send it */
 	ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
 				 sz, send_to, &status);
@@ -1213,6 +1218,34 @@
 	return 0;
 }
 
+static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
+			       struct dlm_migratable_lockres *mres)
+{
+	struct dlm_lock dummy;
+	memset(&dummy, 0, sizeof(dummy));
+	dummy.ml.cookie = 0;
+	dummy.ml.type = LKM_IVMODE;
+	dummy.ml.convert_type = LKM_IVMODE;
+	dummy.ml.highest_blocked = LKM_IVMODE;
+	dummy.lksb = NULL;
+	dummy.ml.node = dlm->node_num;
+	dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
+}
+
+static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
+				    struct dlm_migratable_lock *ml,
+				    u8 *nodenum)
+{
+	if (unlikely(ml->cookie == 0 &&
+	    ml->type == LKM_IVMODE &&
+	    ml->convert_type == LKM_IVMODE &&
+	    ml->highest_blocked == LKM_IVMODE &&
+	    ml->list == DLM_BLOCKED_LIST)) {
+		*nodenum = ml->node;
+		return 1;
+	}
+	return 0;
+}
 
 int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
 			 struct dlm_migratable_lockres *mres,
@@ -1260,6 +1293,14 @@
 				goto error;
 		}
 	}
+	if (total_locks == 0) {
+		/* send a dummy lock to indicate a mastery reference only */
+		mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
+		     dlm->name, res->lockname.len, res->lockname.name,
+		     send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
+		     "migration");
+		dlm_add_dummy_lock(dlm, mres);
+	}
 	/* flush any remaining locks */
 	ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
 	if (ret < 0)
@@ -1386,13 +1427,16 @@
 		/* add an extra ref for just-allocated lockres 
 		 * otherwise the lockres will be purged immediately */
 		dlm_lockres_get(res);
-
 	}
 
 	/* at this point we have allocated everything we need,
 	 * and we have a hashed lockres with an extra ref and
 	 * the proper res->state flags. */
 	ret = 0;
+	spin_lock(&res->spinlock);
+	/* drop this either when master requery finds a different master
+	 * or when a lock is added by the recovery worker */
+	dlm_lockres_grab_inflight_ref(dlm, res);
 	if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
 		/* migration cannot have an unknown master */
 		BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
@@ -1400,10 +1444,11 @@
 			  "unknown owner.. will need to requery: "
 			  "%.*s\n", mres->lockname_len, mres->lockname);
 	} else {
-		spin_lock(&res->spinlock);
+		/* take a reference now to pin the lockres, drop it
+		 * when locks are added in the worker */
 		dlm_change_lockres_owner(dlm, res, dlm->node_num);
-		spin_unlock(&res->spinlock);
 	}
+	spin_unlock(&res->spinlock);
 
 	/* queue up work for dlm_mig_lockres_worker */
 	dlm_grab(dlm);  /* get an extra ref for the work item */
@@ -1459,6 +1504,9 @@
 				   "this node will take it.\n",
 				   res->lockname.len, res->lockname.name);
 		} else {
+			spin_lock(&res->spinlock);
+			dlm_lockres_drop_inflight_ref(dlm, res);
+			spin_unlock(&res->spinlock);
 			mlog(0, "master needs to respond to sender "
 				  "that node %u still owns %.*s\n",
 				  real_master, res->lockname.len,
@@ -1666,10 +1714,25 @@
 	int i, bad;
 	struct list_head *iter;
 	struct dlm_lock *lock = NULL;
+	u8 from = O2NM_MAX_NODES;
+	unsigned int added = 0;
 
 	mlog(0, "running %d locks for this lockres\n", mres->num_locks);
 	for (i=0; i<mres->num_locks; i++) {
 		ml = &(mres->ml[i]);
+
+		if (dlm_is_dummy_lock(dlm, ml, &from)) {
+			/* placeholder, just need to set the refmap bit */
+			BUG_ON(mres->num_locks != 1);
+			mlog(0, "%s:%.*s: dummy lock for %u\n",
+			     dlm->name, mres->lockname_len, mres->lockname,
+			     from);
+			spin_lock(&res->spinlock);
+			dlm_lockres_set_refmap_bit(from, res);
+			spin_unlock(&res->spinlock);
+			added++;
+			break;
+		}
 		BUG_ON(ml->highest_blocked != LKM_IVMODE);
 		newlock = NULL;
 		lksb = NULL;
@@ -1711,6 +1774,7 @@
 			/* do not alter lock refcount.  switching lists. */
 			list_move_tail(&lock->list, queue);
 			spin_unlock(&res->spinlock);
+			added++;
 
 			mlog(0, "just reordered a local lock!\n");
 			continue;
@@ -1817,12 +1881,24 @@
 		if (!bad) {
 			dlm_lock_get(newlock);
 			list_add_tail(&newlock->list, queue);
+			mlog(0, "%s:%.*s: added lock for node %u, "
+			     "setting refmap bit\n", dlm->name,
+			     res->lockname.len, res->lockname.name, ml->node);
+			dlm_lockres_set_refmap_bit(ml->node, res);
+			added++;
 		}
 		spin_unlock(&res->spinlock);
 	}
 	mlog(0, "done running all the locks\n");
 
 leave:
+	/* balance the ref taken when the work was queued */
+	if (added > 0) {
+		spin_lock(&res->spinlock);
+		dlm_lockres_drop_inflight_ref(dlm, res);
+		spin_unlock(&res->spinlock);
+	}
+
 	if (ret < 0) {
 		mlog_errno(ret);
 		if (newlock)
@@ -1935,9 +2011,11 @@
 		if (res->owner == dead_node) {
 			list_del_init(&res->recovering);
 			spin_lock(&res->spinlock);
+			/* new_master has our reference from
+			 * the lock state sent during recovery */
 			dlm_change_lockres_owner(dlm, res, new_master);
 			res->state &= ~DLM_LOCK_RES_RECOVERING;
-			if (!__dlm_lockres_unused(res))
+			if (__dlm_lockres_has_locks(res))
 				__dlm_dirty_lockres(dlm, res);
 			spin_unlock(&res->spinlock);
 			wake_up(&res->wq);
@@ -1977,9 +2055,11 @@
 					dlm_lockres_put(res);
 				}
 				spin_lock(&res->spinlock);
+				/* new_master has our reference from
+				 * the lock state sent during recovery */
 				dlm_change_lockres_owner(dlm, res, new_master);
 				res->state &= ~DLM_LOCK_RES_RECOVERING;
-				if (!__dlm_lockres_unused(res))
+				if (__dlm_lockres_has_locks(res))
 					__dlm_dirty_lockres(dlm, res);
 				spin_unlock(&res->spinlock);
 				wake_up(&res->wq);
@@ -2048,6 +2128,7 @@
 {
 	struct list_head *iter, *tmpiter;
 	struct dlm_lock *lock;
+	unsigned int freed = 0;
 
 	/* this node is the lockres master:
 	 * 1) remove any stale locks for the dead node
@@ -2062,6 +2143,7 @@
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
 			dlm_lock_put(lock);
+			freed++;
 		}
 	}
 	list_for_each_safe(iter, tmpiter, &res->converting) {
@@ -2069,6 +2151,7 @@
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
 			dlm_lock_put(lock);
+			freed++;
 		}
 	}
 	list_for_each_safe(iter, tmpiter, &res->blocked) {
@@ -2076,9 +2159,23 @@
 		if (lock->ml.node == dead_node) {
 			list_del_init(&lock->list);
 			dlm_lock_put(lock);
+			freed++;
 		}
 	}
 
+	if (freed) {
+		mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
+		     "dropping ref from lockres\n", dlm->name,
+		     res->lockname.len, res->lockname.name, freed, dead_node);
+		BUG_ON(!test_bit(dead_node, res->refmap));
+		dlm_lockres_clear_refmap_bit(dead_node, res);
+	} else if (test_bit(dead_node, res->refmap)) {
+		mlog(0, "%s:%.*s: dead node %u had a ref, but had "
+		     "no locks and had not purged before dying\n", dlm->name,
+		     res->lockname.len, res->lockname.name, dead_node);
+		dlm_lockres_clear_refmap_bit(dead_node, res);
+	}
+
 	/* do not kick thread yet */
 	__dlm_dirty_lockres(dlm, res);
 }
@@ -2141,9 +2238,21 @@
 			spin_lock(&res->spinlock);
 			/* zero the lvb if necessary */
 			dlm_revalidate_lvb(dlm, res, dead_node);
-			if (res->owner == dead_node)
+			if (res->owner == dead_node) {
+				if (res->state & DLM_LOCK_RES_DROPPING_REF)
+					mlog(0, "%s:%.*s: owned by "
+					     "dead node %u, this node was "
+					     "dropping its ref when it died. "
+					     "continue, dropping the flag.\n",
+					     dlm->name, res->lockname.len,
+					     res->lockname.name, dead_node);
+
+				/* the wake_up for this will happen when the
+				 * RECOVERING flag is dropped later */
+				res->state &= ~DLM_LOCK_RES_DROPPING_REF;
+
 				dlm_move_lockres_to_recovery_list(dlm, res);
-			else if (res->owner == dlm->node_num) {
+			} else if (res->owner == dlm->node_num) {
 				dlm_free_dead_locks(dlm, res, dead_node);
 				__dlm_lockres_calc_usage(dlm, res);
 			}