[PATCH] ocfs2: fix orphan recovery deadlock
Orphan dir recovery can deadlock with another process in
ocfs2_delete_inode() in some corner cases. Fix this by tracking recovery
state more closely and allowing it to handle inode wipes which might
deadlock.
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
diff --git a/fs/ocfs2/heartbeat.c b/fs/ocfs2/heartbeat.c
index 0bbd22f..cbfd45a 100644
--- a/fs/ocfs2/heartbeat.c
+++ b/fs/ocfs2/heartbeat.c
@@ -67,6 +67,7 @@
ocfs2_node_map_init(&osb->mounted_map);
ocfs2_node_map_init(&osb->recovery_map);
ocfs2_node_map_init(&osb->umount_map);
+ ocfs2_node_map_init(&osb->osb_recovering_orphan_dirs);
}
static void ocfs2_do_node_down(int node_num,
diff --git a/fs/ocfs2/inode.c b/fs/ocfs2/inode.c
index 8122489..315472a 100644
--- a/fs/ocfs2/inode.c
+++ b/fs/ocfs2/inode.c
@@ -41,6 +41,7 @@
#include "dlmglue.h"
#include "extent_map.h"
#include "file.h"
+#include "heartbeat.h"
#include "inode.h"
#include "journal.h"
#include "namei.h"
@@ -544,6 +545,42 @@
return status;
}
+/*
+ * Serialize with orphan dir recovery. If the process doing
+ * recovery on this orphan dir does an iget() with the dir
+ * i_mutex held, we'll deadlock here. Instead we detect this
+ * and exit early - recovery will wipe this inode for us.
+ */
+static int ocfs2_check_orphan_recovery_state(struct ocfs2_super *osb,
+ int slot)
+{
+ int ret = 0;
+
+ spin_lock(&osb->osb_lock);
+ if (ocfs2_node_map_test_bit(osb, &osb->osb_recovering_orphan_dirs, slot)) {
+ mlog(0, "Recovery is happening on orphan dir %d, will skip "
+ "this inode\n", slot);
+ ret = -EDEADLK;
+ goto out;
+ }
+ /* This signals to the orphan recovery process that it should
+ * wait for us to handle the wipe. */
+ osb->osb_orphan_wipes[slot]++;
+out:
+ spin_unlock(&osb->osb_lock);
+ return ret;
+}
+
+static void ocfs2_signal_wipe_completion(struct ocfs2_super *osb,
+ int slot)
+{
+ spin_lock(&osb->osb_lock);
+ osb->osb_orphan_wipes[slot]--;
+ spin_unlock(&osb->osb_lock);
+
+ wake_up(&osb->osb_wipe_event);
+}
+
static int ocfs2_wipe_inode(struct inode *inode,
struct buffer_head *di_bh)
{
@@ -555,6 +592,11 @@
/* We've already voted on this so it should be readonly - no
* spinlock needed. */
orphaned_slot = OCFS2_I(inode)->ip_orphaned_slot;
+
+ status = ocfs2_check_orphan_recovery_state(osb, orphaned_slot);
+ if (status)
+ return status;
+
orphan_dir_inode = ocfs2_get_system_file_inode(osb,
ORPHAN_DIR_SYSTEM_INODE,
orphaned_slot);
@@ -597,6 +639,7 @@
brelse(orphan_dir_bh);
bail:
iput(orphan_dir_inode);
+ ocfs2_signal_wipe_completion(osb, orphaned_slot);
return status;
}
@@ -822,7 +865,8 @@
status = ocfs2_wipe_inode(inode, di_bh);
if (status < 0) {
- mlog_errno(status);
+ if (status != -EDEADLK)
+ mlog_errno(status);
goto bail_unlock_inode;
}
diff --git a/fs/ocfs2/journal.c b/fs/ocfs2/journal.c
index d329c9d..4be801f 100644
--- a/fs/ocfs2/journal.c
+++ b/fs/ocfs2/journal.c
@@ -1408,21 +1408,17 @@
return status;
}
-static int ocfs2_recover_orphans(struct ocfs2_super *osb,
- int slot)
+static int ocfs2_queue_orphans(struct ocfs2_super *osb,
+ int slot,
+ struct inode **head)
{
- int status = 0;
- int have_disk_lock = 0;
- struct inode *inode = NULL;
- struct inode *iter;
+ int status;
struct inode *orphan_dir_inode = NULL;
+ struct inode *iter;
unsigned long offset, blk, local;
struct buffer_head *bh = NULL;
struct ocfs2_dir_entry *de;
struct super_block *sb = osb->sb;
- struct ocfs2_inode_info *oi;
-
- mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
orphan_dir_inode = ocfs2_get_system_file_inode(osb,
ORPHAN_DIR_SYSTEM_INODE,
@@ -1430,17 +1426,15 @@
if (!orphan_dir_inode) {
status = -ENOENT;
mlog_errno(status);
- goto out;
- }
+ return status;
+ }
mutex_lock(&orphan_dir_inode->i_mutex);
status = ocfs2_meta_lock(orphan_dir_inode, NULL, NULL, 0);
if (status < 0) {
- mutex_unlock(&orphan_dir_inode->i_mutex);
mlog_errno(status);
goto out;
}
- have_disk_lock = 1;
offset = 0;
iter = NULL;
@@ -1451,11 +1445,10 @@
if (!bh)
status = -EINVAL;
if (status < 0) {
- mutex_unlock(&orphan_dir_inode->i_mutex);
if (bh)
brelse(bh);
mlog_errno(status);
- goto out;
+ goto out_unlock;
}
local = 0;
@@ -1465,11 +1458,10 @@
if (!ocfs2_check_dir_entry(orphan_dir_inode,
de, bh, local)) {
- mutex_unlock(&orphan_dir_inode->i_mutex);
status = -EINVAL;
mlog_errno(status);
brelse(bh);
- goto out;
+ goto out_unlock;
}
local += le16_to_cpu(de->rec_len);
@@ -1504,18 +1496,95 @@
mlog(0, "queue orphan %"MLFu64"\n",
OCFS2_I(iter)->ip_blkno);
- OCFS2_I(iter)->ip_next_orphan = inode;
- inode = iter;
+ /* No locking is required for the next_orphan
+ * queue as there is only ever a single
+ * process doing orphan recovery. */
+ OCFS2_I(iter)->ip_next_orphan = *head;
+ *head = iter;
}
brelse(bh);
}
- mutex_unlock(&orphan_dir_inode->i_mutex);
+out_unlock:
ocfs2_meta_unlock(orphan_dir_inode, 0);
- have_disk_lock = 0;
-
+out:
+ mutex_unlock(&orphan_dir_inode->i_mutex);
iput(orphan_dir_inode);
- orphan_dir_inode = NULL;
+ return status;
+}
+
+static int ocfs2_orphan_recovery_can_continue(struct ocfs2_super *osb,
+ int slot)
+{
+ int ret;
+
+ spin_lock(&osb->osb_lock);
+ ret = !osb->osb_orphan_wipes[slot];
+ spin_unlock(&osb->osb_lock);
+ return ret;
+}
+
+static void ocfs2_mark_recovering_orphan_dir(struct ocfs2_super *osb,
+ int slot)
+{
+ spin_lock(&osb->osb_lock);
+ /* Mark ourselves such that new processes in delete_inode()
+ * know to quit early. */
+ ocfs2_node_map_set_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
+ while (osb->osb_orphan_wipes[slot]) {
+ /* If any processes are already in the middle of an
+ * orphan wipe on this dir, then we need to wait for
+ * them. */
+ spin_unlock(&osb->osb_lock);
+ wait_event_interruptible(osb->osb_wipe_event,
+ ocfs2_orphan_recovery_can_continue(osb, slot));
+ spin_lock(&osb->osb_lock);
+ }
+ spin_unlock(&osb->osb_lock);
+}
+
+static void ocfs2_clear_recovering_orphan_dir(struct ocfs2_super *osb,
+ int slot)
+{
+ ocfs2_node_map_clear_bit(osb, &osb->osb_recovering_orphan_dirs, slot);
+}
+
+/*
+ * Orphan recovery. Each mounted node has it's own orphan dir which we
+ * must run during recovery. Our strategy here is to build a list of
+ * the inodes in the orphan dir and iget/iput them. The VFS does
+ * (most) of the rest of the work.
+ *
+ * Orphan recovery can happen at any time, not just mount so we have a
+ * couple of extra considerations.
+ *
+ * - We grab as many inodes as we can under the orphan dir lock -
+ * doing iget() outside the orphan dir risks getting a reference on
+ * an invalid inode.
+ * - We must be sure not to deadlock with other processes on the
+ * system wanting to run delete_inode(). This can happen when they go
+ * to lock the orphan dir and the orphan recovery process attempts to
+ * iget() inside the orphan dir lock. This can be avoided by
+ * advertising our state to ocfs2_delete_inode().
+ */
+static int ocfs2_recover_orphans(struct ocfs2_super *osb,
+ int slot)
+{
+ int ret = 0;
+ struct inode *inode = NULL;
+ struct inode *iter;
+ struct ocfs2_inode_info *oi;
+
+ mlog(0, "Recover inodes from orphan dir in slot %d\n", slot);
+
+ ocfs2_mark_recovering_orphan_dir(osb, slot);
+ ret = ocfs2_queue_orphans(osb, slot, &inode);
+ ocfs2_clear_recovering_orphan_dir(osb, slot);
+
+ /* Error here should be noted, but we want to continue with as
+ * many queued inodes as we've got. */
+ if (ret)
+ mlog_errno(ret);
while (inode) {
oi = OCFS2_I(inode);
@@ -1541,14 +1610,7 @@
inode = iter;
}
-out:
- if (have_disk_lock)
- ocfs2_meta_unlock(orphan_dir_inode, 0);
-
- if (orphan_dir_inode)
- iput(orphan_dir_inode);
-
- return status;
+ return ret;
}
static int ocfs2_wait_on_mount(struct ocfs2_super *osb)
diff --git a/fs/ocfs2/ocfs2.h b/fs/ocfs2/ocfs2.h
index 19360e3..e89de9b 100644
--- a/fs/ocfs2/ocfs2.h
+++ b/fs/ocfs2/ocfs2.h
@@ -287,6 +287,10 @@
struct inode *osb_tl_inode;
struct buffer_head *osb_tl_bh;
struct work_struct osb_truncate_log_wq;
+
+ struct ocfs2_node_map osb_recovering_orphan_dirs;
+ unsigned int *osb_orphan_wipes;
+ wait_queue_head_t osb_wipe_event;
};
#define OCFS2_SB(sb) ((struct ocfs2_super *)(sb)->s_fs_info)
diff --git a/fs/ocfs2/super.c b/fs/ocfs2/super.c
index 046824b..8dd3aaf 100644
--- a/fs/ocfs2/super.c
+++ b/fs/ocfs2/super.c
@@ -1325,6 +1325,16 @@
}
mlog(ML_NOTICE, "max_slots for this device: %u\n", osb->max_slots);
+ init_waitqueue_head(&osb->osb_wipe_event);
+ osb->osb_orphan_wipes = kcalloc(osb->max_slots,
+ sizeof(*osb->osb_orphan_wipes),
+ GFP_KERNEL);
+ if (!osb->osb_orphan_wipes) {
+ status = -ENOMEM;
+ mlog_errno(status);
+ goto bail;
+ }
+
osb->s_feature_compat =
le32_to_cpu(OCFS2_RAW_SB(di)->s_feature_compat);
osb->s_feature_ro_compat =
@@ -1638,6 +1648,7 @@
if (osb->slot_info)
ocfs2_free_slot_info(osb->slot_info);
+ kfree(osb->osb_orphan_wipes);
/* FIXME
* This belongs in journal shutdown, but because we have to
* allocate osb->journal at the start of ocfs2_initalize_osb(),