NFS: Replace atomic_t variables in nfs_direct_req with a single spin lock

The three atomic_t fields in nfs_direct_req (complete, count, and error)
cause a lot of bus locking.  Because they are all updated in the same places
in the code, protect them with a single spin lock instead.

Now that the atomic_t variables are gone, we can remove the request size
limitation since the code no longer depends on the limited width of atomic_t
on some platforms.

Test plan:
Compile with CONFIG_NFS and CONFIG_NFS_DIRECTIO enabled.  Run millions of
fsx operations, iozone, and OraSim.

Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index bcbc213..3de7c4b 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -58,7 +58,6 @@
 #include "iostat.h"
 
 #define NFSDBG_FACILITY		NFSDBG_VFS
-#define MAX_DIRECTIO_SIZE	(4096UL << PAGE_SHIFT)
 
 static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty);
 static kmem_cache_t *nfs_direct_cachep;
@@ -68,6 +67,8 @@
  */
 struct nfs_direct_req {
 	struct kref		kref;		/* release manager */
+
+	/* I/O parameters */
 	struct list_head	list;		/* nfs_read/write_data structs */
 	struct file *		filp;		/* file descriptor */
 	struct kiocb *		iocb;		/* controlling i/o request */
@@ -75,12 +76,14 @@
 	struct inode *		inode;		/* target file of i/o */
 	struct page **		pages;		/* pages in our buffer */
 	unsigned int		npages;		/* count of pages */
-	atomic_t		complete,	/* i/os we're waiting for */
-				count,		/* bytes actually processed */
+
+	/* completion state */
+	spinlock_t		lock;		/* protect completion state */
+	int			outstanding;	/* i/os we're waiting for */
+	ssize_t			count,		/* bytes actually processed */
 				error;		/* any reported error */
 };
 
-
 /**
  * nfs_direct_IO - NFS address space operation for direct I/O
  * @rw: direction (read or write)
@@ -110,12 +113,6 @@
 	unsigned long page_count;
 	size_t array_size;
 
-	/* set an arbitrary limit to prevent type overflow */
-	if (size > MAX_DIRECTIO_SIZE) {
-		*pages = NULL;
-		return -EFBIG;
-	}
-
 	page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
 	page_count -= user_addr >> PAGE_SHIFT;
 
@@ -164,8 +161,10 @@
 	init_waitqueue_head(&dreq->wait);
 	INIT_LIST_HEAD(&dreq->list);
 	dreq->iocb = NULL;
-	atomic_set(&dreq->count, 0);
-	atomic_set(&dreq->error, 0);
+	spin_lock_init(&dreq->lock);
+	dreq->outstanding = 0;
+	dreq->count = 0;
+	dreq->error = 0;
 
 	return dreq;
 }
@@ -181,19 +180,18 @@
  */
 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
 {
-	int result = -EIOCBQUEUED;
+	ssize_t result = -EIOCBQUEUED;
 
 	/* Async requests don't wait here */
 	if (dreq->iocb)
 		goto out;
 
-	result = wait_event_interruptible(dreq->wait,
-					(atomic_read(&dreq->complete) == 0));
+	result = wait_event_interruptible(dreq->wait, (dreq->outstanding == 0));
 
 	if (!result)
-		result = atomic_read(&dreq->error);
+		result = dreq->error;
 	if (!result)
-		result = atomic_read(&dreq->count);
+		result = dreq->count;
 
 out:
 	kref_put(&dreq->kref, nfs_direct_req_release);
@@ -214,9 +212,9 @@
 	nfs_free_user_pages(dreq->pages, dreq->npages, 1);
 
 	if (dreq->iocb) {
-		long res = atomic_read(&dreq->error);
+		long res = (long) dreq->error;
 		if (!res)
-			res = atomic_read(&dreq->count);
+			res = (long) dreq->count;
 		aio_complete(dreq->iocb, res, 0);
 	} else
 		wake_up(&dreq->wait);
@@ -233,7 +231,6 @@
 {
 	struct list_head *list;
 	struct nfs_direct_req *dreq;
-	unsigned int reads = 0;
 	unsigned int rpages = (rsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
 
 	dreq = nfs_direct_req_alloc();
@@ -259,13 +256,12 @@
 		list_add(&data->pages, list);
 
 		data->req = (struct nfs_page *) dreq;
-		reads++;
+		dreq->outstanding++;
 		if (nbytes <= rsize)
 			break;
 		nbytes -= rsize;
 	}
 	kref_get(&dreq->kref);
-	atomic_set(&dreq->complete, reads);
 	return dreq;
 }
 
@@ -276,13 +272,21 @@
 
 	if (nfs_readpage_result(task, data) != 0)
 		return;
-	if (likely(task->tk_status >= 0))
-		atomic_add(data->res.count, &dreq->count);
-	else
-		atomic_set(&dreq->error, task->tk_status);
 
-	if (unlikely(atomic_dec_and_test(&dreq->complete)))
-		nfs_direct_complete(dreq);
+	spin_lock(&dreq->lock);
+
+	if (likely(task->tk_status >= 0))
+		dreq->count += data->res.count;
+	else
+		dreq->error = task->tk_status;
+
+	if (--dreq->outstanding) {
+		spin_unlock(&dreq->lock);
+		return;
+	}
+
+	spin_unlock(&dreq->lock);
+	nfs_direct_complete(dreq);
 }
 
 static const struct rpc_call_ops nfs_read_direct_ops = {
@@ -388,7 +392,6 @@
 {
 	struct list_head *list;
 	struct nfs_direct_req *dreq;
-	unsigned int writes = 0;
 	unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
 
 	dreq = nfs_direct_req_alloc();
@@ -414,16 +417,19 @@
 		list_add(&data->pages, list);
 
 		data->req = (struct nfs_page *) dreq;
-		writes++;
+		dreq->outstanding++;
 		if (nbytes <= wsize)
 			break;
 		nbytes -= wsize;
 	}
 	kref_get(&dreq->kref);
-	atomic_set(&dreq->complete, writes);
 	return dreq;
 }
 
+/*
+ * NB: Only a single error code is kept in dreq->error.  Each failed
+ *     completion overwrites it, so the most recent error is reported.
+ */
 static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
 {
 	struct nfs_write_data *data = calldata;
@@ -436,15 +442,22 @@
 	if (unlikely(data->res.verf->committed != NFS_FILE_SYNC))
 		status = -EIO;
 
-	if (likely(status >= 0))
-		atomic_add(data->res.count, &dreq->count);
-	else
-		atomic_set(&dreq->error, status);
+	spin_lock(&dreq->lock);
 
-	if (unlikely(atomic_dec_and_test(&dreq->complete))) {
-		nfs_end_data_update(data->inode);
-		nfs_direct_complete(dreq);
+	if (likely(status >= 0))
+		dreq->count += data->res.count;
+	else
+		dreq->error = status;
+
+	if (--dreq->outstanding) {
+		spin_unlock(&dreq->lock);
+		return;
 	}
+
+	spin_unlock(&dreq->lock);
+
+	nfs_end_data_update(data->inode);
+	nfs_direct_complete(dreq);
 }
 
 static const struct rpc_call_ops nfs_write_direct_ops = {