ocfs2_dlm: Fixes race between migrate and dirty

dlmthread was removing lockreses from the dirty list
and resetting the dirty flag before shuffling the lists.
This patch retains the dirty state flag until the lists
are shuffled.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Sunil Mushran <Sunil.Mushran@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
diff --git a/fs/ocfs2/dlm/dlmcommon.h b/fs/ocfs2/dlm/dlmcommon.h
index 04048bb..e95ecb2 100644
--- a/fs/ocfs2/dlm/dlmcommon.h
+++ b/fs/ocfs2/dlm/dlmcommon.h
@@ -223,6 +223,7 @@
 #define DLM_LOCK_RES_IN_PROGRESS          0x00000010
 #define DLM_LOCK_RES_MIGRATING            0x00000020
 #define DLM_LOCK_RES_DROPPING_REF         0x00000040
+#define DLM_LOCK_RES_BLOCK_DIRTY          0x00001000
 
 /* max milliseconds to wait to sync up a network failure with a node death */
 #define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
diff --git a/fs/ocfs2/dlm/dlmmaster.c b/fs/ocfs2/dlm/dlmmaster.c
index 251c480..a65a877 100644
--- a/fs/ocfs2/dlm/dlmmaster.c
+++ b/fs/ocfs2/dlm/dlmmaster.c
@@ -2707,8 +2707,15 @@
 	__dlm_lockres_reserve_ast(res);
 	spin_unlock(&res->spinlock);
 
-	/* now flush all the pending asts.. hang out for a bit */
+	/* now flush all the pending asts */
 	dlm_kick_thread(dlm, res);
+	/* before waiting on DIRTY, block processes which may
+	 * try to dirty the lockres before MIGRATING is set */
+	spin_lock(&res->spinlock);
+	BUG_ON(res->state & DLM_LOCK_RES_BLOCK_DIRTY);
+	res->state |= DLM_LOCK_RES_BLOCK_DIRTY;
+	spin_unlock(&res->spinlock);
+	/* now wait on any pending asts and the DIRTY state */
 	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
 	dlm_lockres_release_ast(dlm, res);
 
@@ -2734,6 +2741,13 @@
 		mlog(0, "trying again...\n");
 		goto again;
 	}
+	/* now that we are sure the MIGRATING state is there, drop
+	 * the unneeded state which blocked threads trying to DIRTY */
+	spin_lock(&res->spinlock);
+	BUG_ON(!(res->state & DLM_LOCK_RES_BLOCK_DIRTY));
+	BUG_ON(!(res->state & DLM_LOCK_RES_MIGRATING));
+	res->state &= ~DLM_LOCK_RES_BLOCK_DIRTY;
+	spin_unlock(&res->spinlock);
 
 	/* did the target go down or die? */
 	spin_lock(&dlm->spinlock);
diff --git a/fs/ocfs2/dlm/dlmthread.c b/fs/ocfs2/dlm/dlmthread.c
index baa9997..3b94e4d 100644
--- a/fs/ocfs2/dlm/dlmthread.c
+++ b/fs/ocfs2/dlm/dlmthread.c
@@ -95,7 +95,7 @@
 int __dlm_lockres_unused(struct dlm_lock_resource *res)
 {
 	if (!__dlm_lockres_has_locks(res) &&
-	    list_empty(&res->dirty)) {
+	    (list_empty(&res->dirty) && !(res->state & DLM_LOCK_RES_DIRTY))) {
 		/* try not to scan the bitmap unless the first two
 		 * conditions are already true */
 		int bit = find_next_bit(res->refmap, O2NM_MAX_NODES, 0);
@@ -455,12 +455,17 @@
 	assert_spin_locked(&res->spinlock);
 
 	/* don't shuffle secondary queues */
-	if ((res->owner == dlm->node_num) &&
-	    !(res->state & DLM_LOCK_RES_DIRTY)) {
-		/* ref for dirty_list */
-		dlm_lockres_get(res);
-		list_add_tail(&res->dirty, &dlm->dirty_list);
-		res->state |= DLM_LOCK_RES_DIRTY;
+	if ((res->owner == dlm->node_num)) {
+		if (res->state & (DLM_LOCK_RES_MIGRATING |
+				  DLM_LOCK_RES_BLOCK_DIRTY))
+		    return;
+
+		if (list_empty(&res->dirty)) {
+			/* ref for dirty_list */
+			dlm_lockres_get(res);
+			list_add_tail(&res->dirty, &dlm->dirty_list);
+			res->state |= DLM_LOCK_RES_DIRTY;
+		}
 	}
 }
 
@@ -639,7 +644,7 @@
 			dlm_lockres_get(res);
 
 			spin_lock(&res->spinlock);
-			res->state &= ~DLM_LOCK_RES_DIRTY;
+			/* We clear the DLM_LOCK_RES_DIRTY state once we shuffle lists below */
 			list_del_init(&res->dirty);
 			spin_unlock(&res->spinlock);
 			spin_unlock(&dlm->spinlock);
@@ -663,10 +668,11 @@
 			/* it is now ok to move lockreses in these states
 			 * to the dirty list, assuming that they will only be
 			 * dirty for a short while. */
+			BUG_ON(res->state & DLM_LOCK_RES_MIGRATING);
 			if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
-					  DLM_LOCK_RES_MIGRATING |
 					  DLM_LOCK_RES_RECOVERING)) {
 				/* move it to the tail and keep going */
+				res->state &= ~DLM_LOCK_RES_DIRTY;
 				spin_unlock(&res->spinlock);
 				mlog(0, "delaying list shuffling for in-"
 				     "progress lockres %.*s, state=%d\n",
@@ -687,6 +693,7 @@
 
 			/* called while holding lockres lock */
 			dlm_shuffle_lists(dlm, res);
+			res->state &= ~DLM_LOCK_RES_DIRTY;
 			spin_unlock(&res->spinlock);
 
 			dlm_lockres_calc_usage(dlm, res);
@@ -697,11 +704,8 @@
 			/* if the lock was in-progress, stick
 			 * it on the back of the list */
 			if (delay) {
-				/* ref for dirty_list */
-				dlm_lockres_get(res);
 				spin_lock(&res->spinlock);
-				list_add_tail(&res->dirty, &dlm->dirty_list);
-				res->state |= DLM_LOCK_RES_DIRTY;
+				__dlm_dirty_lockres(dlm, res);
 				spin_unlock(&res->spinlock);
 			}
 			dlm_lockres_put(res);