SUNRPC: Eliminate side effects from rpc_malloc

Currently rpc_malloc sets req->rq_buffer internally.  Make this a more
generic interface:  return a pointer to the new buffer (or NULL) and
make the caller set req->rq_buffer and req->rq_bufsize.  This looks much
more like kmalloc and eliminates the side effects.

To fix a potential deadlock, this patch also replaces GFP_NOFS with
GFP_NOWAIT in rpc_malloc.  This prevents async RPCs from sleeping outside
the RPC's task scheduler while allocating their buffer.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 3069ecc..2047fb2 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -264,7 +264,7 @@
 void		rpc_wake_up_status(struct rpc_wait_queue *, int);
 void		rpc_delay(struct rpc_task *, unsigned long);
 void *		rpc_malloc(struct rpc_task *, size_t);
-void		rpc_free(struct rpc_task *);
+void		rpc_free(void *);
 int		rpciod_up(void);
 void		rpciod_down(void);
 int		__rpc_wait_for_completion_task(struct rpc_task *task, int (*)(void *));
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 7aa2950..745afc1 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -114,7 +114,7 @@
 	void		(*set_port)(struct rpc_xprt *xprt, unsigned short port);
 	void		(*connect)(struct rpc_task *task);
 	void *		(*buf_alloc)(struct rpc_task *task, size_t size);
-	void		(*buf_free)(struct rpc_task *task);
+	void		(*buf_free)(void *buffer);
 	int		(*send_request)(struct rpc_task *task);
 	void		(*set_retrans_timeout)(struct rpc_task *task);
 	void		(*timer)(struct rpc_task *task);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 12487aa..e7dc09e 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -774,7 +774,8 @@
 	req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
 	req->rq_rcvsize <<= 2;
 
-	xprt->ops->buf_alloc(task, req->rq_callsize + req->rq_rcvsize);
+	req->rq_buffer = xprt->ops->buf_alloc(task,
+					req->rq_callsize + req->rq_rcvsize);
 	if (req->rq_buffer != NULL)
 		return;
 
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 6d87320..4a53e94 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -741,50 +741,53 @@
  * @task: RPC task that will use this buffer
  * @size: requested byte size
  *
- * We try to ensure that some NFS reads and writes can always proceed
- * by using a mempool when allocating 'small' buffers.
+ * To prevent rpciod from hanging, this allocator never sleeps,
+ * returning NULL if the request cannot be serviced immediately.
+ * The caller can arrange to sleep in a way that is safe for rpciod.
+ *
+ * Most requests are 'small' (under 2KiB) and can be serviced from a
+ * mempool, ensuring that NFS reads and writes can always proceed,
+ * and that there is good locality of reference for these buffers.
+ *
  * In order to avoid memory starvation triggering more writebacks of
- * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
+ * NFS requests, we avoid using GFP_KERNEL.
  */
-void * rpc_malloc(struct rpc_task *task, size_t size)
+void *rpc_malloc(struct rpc_task *task, size_t size)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
-	gfp_t	gfp;
+	size_t *buf;
+	gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;
 
-	if (task->tk_flags & RPC_TASK_SWAPPER)
-		gfp = GFP_ATOMIC;
+	size += sizeof(size_t);
+	if (size <= RPC_BUFFER_MAXSIZE)
+		buf = mempool_alloc(rpc_buffer_mempool, gfp);
 	else
-		gfp = GFP_NOFS;
-
-	if (size > RPC_BUFFER_MAXSIZE) {
-		req->rq_buffer = kmalloc(size, gfp);
-		if (req->rq_buffer)
-			req->rq_bufsize = size;
-	} else {
-		req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
-		if (req->rq_buffer)
-			req->rq_bufsize = RPC_BUFFER_MAXSIZE;
-	}
-	return req->rq_buffer;
+		buf = kmalloc(size, gfp);
+	*buf = size;
+	dprintk("RPC: %5u allocated buffer of size %u at %p\n",
+			task->tk_pid, size, buf);
+	return (void *) ++buf;
 }
 
 /**
  * rpc_free - free buffer allocated via rpc_malloc
- * @task: RPC task with a buffer to be freed
+ * @buffer: buffer to free
  *
  */
-void rpc_free(struct rpc_task *task)
+void rpc_free(void *buffer)
 {
-	struct rpc_rqst *req = task->tk_rqstp;
+	size_t size, *buf = (size_t *) buffer;
 
-	if (req->rq_buffer) {
-		if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
-			mempool_free(req->rq_buffer, rpc_buffer_mempool);
-		else
-			kfree(req->rq_buffer);
-		req->rq_buffer = NULL;
-		req->rq_bufsize = 0;
-	}
+	if (!buffer)
+		return;
+	size = *buf;
+	buf--;
+
+	dprintk("RPC:       freeing buffer of size %u at %p\n",
+			size, buf);
+	if (size <= RPC_BUFFER_MAXSIZE)
+		mempool_free(buf, rpc_buffer_mempool);
+	else
+		kfree(buf);
 }
 
 /*
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 432ee92..81fe830 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -854,7 +854,7 @@
 		mod_timer(&xprt->timer,
 				xprt->last_used + xprt->idle_timeout);
 	spin_unlock_bh(&xprt->transport_lock);
-	xprt->ops->buf_free(task);
+	xprt->ops->buf_free(req->rq_buffer);
 	task->tk_rqstp = NULL;
 	if (req->rq_release_snd_buf)
 		req->rq_release_snd_buf(req);