SUNRPC: Fix potential races in xprt_lock_write_next()

We have to ensure that the wake up from the waitqueue and the assignment
of xprt->snd_task are atomic. We can do this by assigning the snd_task
while under the waitqueue spinlock.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 3341d89..f982dfe 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -422,7 +422,7 @@
 /*
  * Wake up the next task on a priority queue.
  */
-static struct rpc_task * __rpc_wake_up_next_priority(struct rpc_wait_queue *queue)
+static struct rpc_task *__rpc_find_next_queued_priority(struct rpc_wait_queue *queue)
 {
 	struct list_head *q;
 	struct rpc_task *task;
@@ -467,30 +467,54 @@
 new_owner:
 	rpc_set_waitqueue_owner(queue, task->tk_owner);
 out:
-	rpc_wake_up_task_queue_locked(queue, task);
 	return task;
 }
 
+static struct rpc_task *__rpc_find_next_queued(struct rpc_wait_queue *queue)
+{
+	if (RPC_IS_PRIORITY(queue))
+		return __rpc_find_next_queued_priority(queue);
+	if (!list_empty(&queue->tasks[0]))
+		return list_first_entry(&queue->tasks[0], struct rpc_task, u.tk_wait.list);
+	return NULL;
+}
+
 /*
- * Wake up the next task on the wait queue.
+ * Wake up the first task on the wait queue.
  */
-struct rpc_task * rpc_wake_up_next(struct rpc_wait_queue *queue)
+struct rpc_task *rpc_wake_up_first(struct rpc_wait_queue *queue,
+		bool (*func)(struct rpc_task *, void *), void *data)
 {
 	struct rpc_task	*task = NULL;
 
-	dprintk("RPC:       wake_up_next(%p \"%s\")\n",
+	dprintk("RPC:       wake_up_first(%p \"%s\")\n",
 			queue, rpc_qname(queue));
 	spin_lock_bh(&queue->lock);
-	if (RPC_IS_PRIORITY(queue))
-		task = __rpc_wake_up_next_priority(queue);
-	else {
-		task_for_first(task, &queue->tasks[0])
+	task = __rpc_find_next_queued(queue);
+	if (task != NULL) {
+		if (func(task, data))
 			rpc_wake_up_task_queue_locked(queue, task);
+		else
+			task = NULL;
 	}
 	spin_unlock_bh(&queue->lock);
 
 	return task;
 }
+EXPORT_SYMBOL_GPL(rpc_wake_up_first);
+
+static bool rpc_wake_up_next_func(struct rpc_task *task, void *data)
+{
+	return true;
+}
+
+/*
+ * Wake up the next task on the wait queue.
+*/
+struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *queue)
+{
+	return rpc_wake_up_first(queue, rpc_wake_up_next_func, NULL);
+}
 EXPORT_SYMBOL_GPL(rpc_wake_up_next);
 
 /**
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index c64c0ef..839f6ef 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -292,54 +292,57 @@
 	return retval;
 }
 
-static void __xprt_lock_write_next(struct rpc_xprt *xprt)
+static bool __xprt_lock_write_func(struct rpc_task *task, void *data)
 {
-	struct rpc_task *task;
+	struct rpc_xprt *xprt = data;
 	struct rpc_rqst *req;
 
-	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
-		return;
-
-	task = rpc_wake_up_next(&xprt->sending);
-	if (task == NULL)
-		goto out_unlock;
-
 	req = task->tk_rqstp;
 	xprt->snd_task = task;
 	if (req) {
 		req->rq_bytes_sent = 0;
 		req->rq_ntrans++;
 	}
-	return;
+	return true;
+}
 
-out_unlock:
+static void __xprt_lock_write_next(struct rpc_xprt *xprt)
+{
+	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
+		return;
+
+	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_func, xprt))
+		return;
 	xprt_clear_locked(xprt);
 }
 
-static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
+static bool __xprt_lock_write_cong_func(struct rpc_task *task, void *data)
 {
-	struct rpc_task *task;
+	struct rpc_xprt *xprt = data;
 	struct rpc_rqst *req;
 
-	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
-		return;
-	if (RPCXPRT_CONGESTED(xprt))
-		goto out_unlock;
-	task = rpc_wake_up_next(&xprt->sending);
-	if (task == NULL)
-		goto out_unlock;
-
 	req = task->tk_rqstp;
 	if (req == NULL) {
 		xprt->snd_task = task;
-		return;
+		return true;
 	}
 	if (__xprt_get_cong(xprt, task)) {
 		xprt->snd_task = task;
 		req->rq_bytes_sent = 0;
 		req->rq_ntrans++;
-		return;
+		return true;
 	}
+	return false;
+}
+
+static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
+{
+	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
+		return;
+	if (RPCXPRT_CONGESTED(xprt))
+		goto out_unlock;
+	if (rpc_wake_up_first(&xprt->sending, __xprt_lock_write_cong_func, xprt))
+		return;
 out_unlock:
 	xprt_clear_locked(xprt);
 }