parallel lookups machinery, part 3

We will need to be able to check if there is an in-lookup
dentry with matching parent/name.  Right now it's impossible,
but as soon as start locking directories shared such beasts
will appear.

Add a secondary hash for locating those.  Hash chains go through
the same space where d_alias will be once it's not in-lookup anymore.
Search is done under the same bitlock we use for modifications -
with the primary hash we can rely on d_rehash() into the wrong
chain being the worst that could happen, but here the pointers are
buggered once it's removed from the chain.  On the other hand,
the chains are not going to be long and normally we'll end up
adding to the chain anyway.  That allows us to avoid bothering with
->d_lock when doing the comparisons - everything is stable until
removed from chain.

New helper: d_alloc_parallel().  Right now it allocates, verifies
that no hashed and in-lookup matches exist and adds to in-lookup
hash.

Returns ERR_PTR() for error, hashed match (in the unlikely case it's
been found) or new dentry.  In-lookup matches trigger BUG() for
now; that will change in the next commit when we introduce waiting
for ongoing lookup to finish.  Note that in-lookup matches won't be
possible until we actually go for shared locking.

lookup_slow() switched to use of d_alloc_parallel().

Again, these commits are separated only for making it easier to
review.  All this machinery will start doing something useful only
when we go for shared locking; it's just that the combination is
too large for my taste.

Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
diff --git a/fs/dcache.c b/fs/dcache.c
index 10988f7..ea2de7c 100644
--- a/fs/dcache.c
+++ b/fs/dcache.c
@@ -111,6 +111,17 @@
 	return dentry_hashtable + hash_32(hash, d_hash_shift);
 }
 
+#define IN_LOOKUP_SHIFT 10
+static struct hlist_bl_head in_lookup_hashtable[1 << IN_LOOKUP_SHIFT];
+
+static inline struct hlist_bl_head *in_lookup_hash(const struct dentry *parent,
+					unsigned int hash)
+{
+	hash += (unsigned long) parent / L1_CACHE_BYTES;
+	return in_lookup_hashtable + hash_32(hash, IN_LOOKUP_SHIFT);
+}
+
+
 /* Statistics gathering. */
 struct dentry_stat_t dentry_stat = {
 	.age_limit = 45,
@@ -2380,9 +2391,102 @@
 	smp_store_release(&dir->i_dir_seq, n + 2);
 }
 
+struct dentry *d_alloc_parallel(struct dentry *parent,
+				const struct qstr *name)
+{
+	unsigned int len = name->len;
+	unsigned int hash = name->hash;
+	const unsigned char *str = name->name;
+	struct hlist_bl_head *b = in_lookup_hash(parent, hash);
+	struct hlist_bl_node *node;
+	struct dentry *new = d_alloc(parent, name);
+	struct dentry *dentry;
+	unsigned seq, r_seq, d_seq;
+
+	if (unlikely(!new))
+		return ERR_PTR(-ENOMEM);
+
+retry:
+	rcu_read_lock();
+	seq = smp_load_acquire(&parent->d_inode->i_dir_seq) & ~1;
+	r_seq = read_seqbegin(&rename_lock);
+	dentry = __d_lookup_rcu(parent, name, &d_seq);
+	if (unlikely(dentry)) {
+		if (!lockref_get_not_dead(&dentry->d_lockref)) {
+			rcu_read_unlock();
+			goto retry;
+		}
+		if (read_seqcount_retry(&dentry->d_seq, d_seq)) {
+			rcu_read_unlock();
+			dput(dentry);
+			goto retry;
+		}
+		rcu_read_unlock();
+		dput(new);
+		return dentry;
+	}
+	if (unlikely(read_seqretry(&rename_lock, r_seq))) {
+		rcu_read_unlock();
+		goto retry;
+	}
+	hlist_bl_lock(b);
+	if (unlikely(parent->d_inode->i_dir_seq != seq)) {
+		hlist_bl_unlock(b);
+		rcu_read_unlock();
+		goto retry;
+	}
+	rcu_read_unlock();
+	/*
+	 * No changes for the parent since the beginning of d_lookup().
+	 * Since all removals from the chain happen with hlist_bl_lock(),
+	 * any potential in-lookup matches are going to stay here until
+	 * we unlock the chain.  All fields are stable in everything
+	 * we encounter.
+	 */
+	hlist_bl_for_each_entry(dentry, node, b, d_u.d_in_lookup_hash) {
+		if (dentry->d_name.hash != hash)
+			continue;
+		if (dentry->d_parent != parent)
+			continue;
+		if (d_unhashed(dentry))
+			continue;
+		if (parent->d_flags & DCACHE_OP_COMPARE) {
+			int tlen = dentry->d_name.len;
+			const char *tname = dentry->d_name.name;
+			if (parent->d_op->d_compare(parent, dentry, tlen, tname, name))
+				continue;
+		} else {
+			if (dentry->d_name.len != len)
+				continue;
+			if (dentry_cmp(dentry, str, len))
+				continue;
+		}
+		dget(dentry);
+		hlist_bl_unlock(b);
+		/* impossible until we actually enable parallel lookups */
+		BUG();
+		/* and this will be "wait for it to stop being in-lookup" */
+		/* this one will be handled in the next commit */
+		dput(new);
+		return dentry;
+	}
+	/* we can't take ->d_lock here; it's OK, though. */
+	new->d_flags |= DCACHE_PAR_LOOKUP;
+	hlist_bl_add_head_rcu(&new->d_u.d_in_lookup_hash, b);
+	hlist_bl_unlock(b);
+	return new;
+}
+EXPORT_SYMBOL(d_alloc_parallel);
+
 void __d_lookup_done(struct dentry *dentry)
 {
+	struct hlist_bl_head *b = in_lookup_hash(dentry->d_parent,
+						 dentry->d_name.hash);
+	hlist_bl_lock(b);
 	dentry->d_flags &= ~DCACHE_PAR_LOOKUP;
+	__hlist_bl_del(&dentry->d_u.d_in_lookup_hash);
+	hlist_bl_unlock(b);
+	INIT_HLIST_NODE(&dentry->d_u.d_alias);
 	/* more stuff will land here */
 }
 EXPORT_SYMBOL(__d_lookup_done);
diff --git a/fs/namei.c b/fs/namei.c
index 26e5f84..aa04320 100644
--- a/fs/namei.c
+++ b/fs/namei.c
@@ -1603,46 +1603,40 @@
 				  struct dentry *dir,
 				  unsigned int flags)
 {
-	struct dentry *dentry, *old;
+	struct dentry *dentry = ERR_PTR(-ENOENT), *old;
 	struct inode *inode = dir->d_inode;
 
 	inode_lock(inode);
 	/* Don't go there if it's already dead */
-	if (unlikely(IS_DEADDIR(inode))) {
-		inode_unlock(inode);
-		return ERR_PTR(-ENOENT);
-	}
-	dentry = d_lookup(dir, name);
-	if (unlikely(dentry)) {
+	if (unlikely(IS_DEADDIR(inode)))
+		goto out;
+again:
+	dentry = d_alloc_parallel(dir, name);
+	if (IS_ERR(dentry))
+		goto out;
+	if (unlikely(!d_in_lookup(dentry))) {
 		if ((dentry->d_flags & DCACHE_OP_REVALIDATE) &&
 		    !(flags & LOOKUP_NO_REVAL)) {
 			int error = d_revalidate(dentry, flags);
 			if (unlikely(error <= 0)) {
-				if (!error)
+				if (!error) {
 					d_invalidate(dentry);
+					dput(dentry);
+					goto again;
+				}
 				dput(dentry);
 				dentry = ERR_PTR(error);
 			}
 		}
-		if (dentry) {
-			inode_unlock(inode);
-			return dentry;
+	} else {
+		old = inode->i_op->lookup(inode, dentry, flags);
+		d_lookup_done(dentry);
+		if (unlikely(old)) {
+			dput(dentry);
+			dentry = old;
 		}
 	}
-	dentry = d_alloc(dir, name);
-	if (unlikely(!dentry)) {
-		inode_unlock(inode);
-		return ERR_PTR(-ENOMEM);
-	}
-	spin_lock(&dentry->d_lock);
-	dentry->d_flags |= DCACHE_PAR_LOOKUP;
-	spin_unlock(&dentry->d_lock);
-	old = inode->i_op->lookup(inode, dentry, flags);
-	d_lookup_done(dentry);
-	if (unlikely(old)) {
-		dput(dentry);
-		dentry = old;
-	}
+out:
 	inode_unlock(inode);
 	return dentry;
 }