NFS: Replace atomic_t variables in nfs_direct_req with a single spin lock
Three atomic_t variables cause a lot of bus locking. Because they are all
used in the same places in the code, just use a single spin lock.
Now that the atomic_t variables are gone, we can remove the request size
limitation since the code no longer depends on the limited width of atomic_t
on some platforms.
Test plan:
Compile with CONFIG_NFS and CONFIG_NFS_DIRECTIO enabled. Millions of fsx
operations, iozone, OraSim.
Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index bcbc213..3de7c4b 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -58,7 +58,6 @@
#include "iostat.h"
#define NFSDBG_FACILITY NFSDBG_VFS
-#define MAX_DIRECTIO_SIZE (4096UL << PAGE_SHIFT)
static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty);
static kmem_cache_t *nfs_direct_cachep;
@@ -68,6 +67,8 @@
*/
struct nfs_direct_req {
struct kref kref; /* release manager */
+
+ /* I/O parameters */
struct list_head list; /* nfs_read/write_data structs */
struct file * filp; /* file descriptor */
struct kiocb * iocb; /* controlling i/o request */
@@ -75,12 +76,14 @@
struct inode * inode; /* target file of i/o */
struct page ** pages; /* pages in our buffer */
unsigned int npages; /* count of pages */
- atomic_t complete, /* i/os we're waiting for */
- count, /* bytes actually processed */
+
+ /* completion state */
+ spinlock_t lock; /* protect completion state */
+ int outstanding; /* i/os we're waiting for */
+ ssize_t count, /* bytes actually processed */
error; /* any reported error */
};
-
/**
* nfs_direct_IO - NFS address space operation for direct I/O
* @rw: direction (read or write)
@@ -110,12 +113,6 @@
unsigned long page_count;
size_t array_size;
- /* set an arbitrary limit to prevent type overflow */
- if (size > MAX_DIRECTIO_SIZE) {
- *pages = NULL;
- return -EFBIG;
- }
-
page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
page_count -= user_addr >> PAGE_SHIFT;
@@ -164,8 +161,10 @@
init_waitqueue_head(&dreq->wait);
INIT_LIST_HEAD(&dreq->list);
dreq->iocb = NULL;
- atomic_set(&dreq->count, 0);
- atomic_set(&dreq->error, 0);
+ spin_lock_init(&dreq->lock);
+ dreq->outstanding = 0;
+ dreq->count = 0;
+ dreq->error = 0;
return dreq;
}
@@ -181,19 +180,18 @@
*/
static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
{
- int result = -EIOCBQUEUED;
+ ssize_t result = -EIOCBQUEUED;
/* Async requests don't wait here */
if (dreq->iocb)
goto out;
- result = wait_event_interruptible(dreq->wait,
- (atomic_read(&dreq->complete) == 0));
+ result = wait_event_interruptible(dreq->wait, (dreq->outstanding == 0));
if (!result)
- result = atomic_read(&dreq->error);
+ result = dreq->error;
if (!result)
- result = atomic_read(&dreq->count);
+ result = dreq->count;
out:
kref_put(&dreq->kref, nfs_direct_req_release);
@@ -214,9 +212,9 @@
nfs_free_user_pages(dreq->pages, dreq->npages, 1);
if (dreq->iocb) {
- long res = atomic_read(&dreq->error);
+ long res = (long) dreq->error;
if (!res)
- res = atomic_read(&dreq->count);
+ res = (long) dreq->count;
aio_complete(dreq->iocb, res, 0);
} else
wake_up(&dreq->wait);
@@ -233,7 +231,6 @@
{
struct list_head *list;
struct nfs_direct_req *dreq;
- unsigned int reads = 0;
unsigned int rpages = (rsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
dreq = nfs_direct_req_alloc();
@@ -259,13 +256,12 @@
list_add(&data->pages, list);
data->req = (struct nfs_page *) dreq;
- reads++;
+ dreq->outstanding++;
if (nbytes <= rsize)
break;
nbytes -= rsize;
}
kref_get(&dreq->kref);
- atomic_set(&dreq->complete, reads);
return dreq;
}
@@ -276,13 +272,21 @@
if (nfs_readpage_result(task, data) != 0)
return;
- if (likely(task->tk_status >= 0))
- atomic_add(data->res.count, &dreq->count);
- else
- atomic_set(&dreq->error, task->tk_status);
- if (unlikely(atomic_dec_and_test(&dreq->complete)))
- nfs_direct_complete(dreq);
+ spin_lock(&dreq->lock);
+
+ if (likely(task->tk_status >= 0))
+ dreq->count += data->res.count;
+ else
+ dreq->error = task->tk_status;
+
+ if (--dreq->outstanding) {
+ spin_unlock(&dreq->lock);
+ return;
+ }
+
+ spin_unlock(&dreq->lock);
+ nfs_direct_complete(dreq);
}
static const struct rpc_call_ops nfs_read_direct_ops = {
@@ -388,7 +392,6 @@
{
struct list_head *list;
struct nfs_direct_req *dreq;
- unsigned int writes = 0;
unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
dreq = nfs_direct_req_alloc();
@@ -414,16 +417,19 @@
list_add(&data->pages, list);
data->req = (struct nfs_page *) dreq;
- writes++;
+ dreq->outstanding++;
if (nbytes <= wsize)
break;
nbytes -= wsize;
}
kref_get(&dreq->kref);
- atomic_set(&dreq->complete, writes);
return dreq;
}
+/*
+ * NB: Return the value of the first error return code. Subsequent
+ * errors after the first one are ignored.
+ */
static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
{
struct nfs_write_data *data = calldata;
@@ -436,15 +442,22 @@
if (unlikely(data->res.verf->committed != NFS_FILE_SYNC))
status = -EIO;
- if (likely(status >= 0))
- atomic_add(data->res.count, &dreq->count);
- else
- atomic_set(&dreq->error, status);
+ spin_lock(&dreq->lock);
- if (unlikely(atomic_dec_and_test(&dreq->complete))) {
- nfs_end_data_update(data->inode);
- nfs_direct_complete(dreq);
+ if (likely(status >= 0))
+ dreq->count += data->res.count;
+ else
+ dreq->error = status;
+
+ if (--dreq->outstanding) {
+ spin_unlock(&dreq->lock);
+ return;
}
+
+ spin_unlock(&dreq->lock);
+
+ nfs_end_data_update(data->inode);
+ nfs_direct_complete(dreq);
}
static const struct rpc_call_ops nfs_write_direct_ops = {