RPC: allow call_encode() to delay transmission of an RPC call.

 Currently, call_encode will cause the entire RPC call to abort if it returns
 an error. This is unnecessarily rigid, and gets in the way of attempts
 to allow the NFSv4 layer to order RPC calls that carry sequence ids.

 Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index a5f7029..5342740 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -678,13 +678,11 @@
 static void
 call_encode(struct rpc_task *task)
 {
-	struct rpc_clnt	*clnt = task->tk_client;
 	struct rpc_rqst	*req = task->tk_rqstp;
 	struct xdr_buf *sndbuf = &req->rq_snd_buf;
 	struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
 	unsigned int	bufsiz;
 	kxdrproc_t	encode;
-	int		status;
 	u32		*p;
 
 	dprintk("RPC: %4d call_encode (status %d)\n", 
@@ -712,12 +710,9 @@
 		rpc_exit(task, -EIO);
 		return;
 	}
-	if (encode && (status = rpcauth_wrap_req(task, encode, req, p,
-						 task->tk_msg.rpc_argp)) < 0) {
-		printk(KERN_WARNING "%s: can't encode arguments: %d\n",
-				clnt->cl_protname, -status);
-		rpc_exit(task, status);
-	}
+	if (encode != NULL)
+		task->tk_status = rpcauth_wrap_req(task, encode, req, p,
+				task->tk_msg.rpc_argp);
 }
 
 /*
@@ -865,10 +860,12 @@
 	if (task->tk_status != 0)
 		return;
 	/* Encode here so that rpcsec_gss can use correct sequence number. */
-	if (!task->tk_rqstp->rq_bytes_sent)
+	if (task->tk_rqstp->rq_bytes_sent == 0) {
 		call_encode(task);
-	if (task->tk_status < 0)
-		return;
+		/* Did the encode result in an error condition? */
+		if (task->tk_status != 0)
+			goto out_nosend;
+	}
 	xprt_transmit(task);
 	if (task->tk_status < 0)
 		return;
@@ -876,6 +873,10 @@
 		task->tk_action = NULL;
 		rpc_wake_up_task(task);
 	}
+	return;
+out_nosend:
+	/* release socket write lock before attempting to handle error */
+	xprt_abort_transmit(task);
 }
 
 /*
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 215be0d..1ba55dc 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -709,6 +709,14 @@
 	return err;
 }
 
+void
+xprt_abort_transmit(struct rpc_task *task)
+{
+	struct rpc_xprt	*xprt = task->tk_xprt;
+
+	xprt_release_write(xprt, task);
+}
+
 /**
  * xprt_transmit - send an RPC request on a transport
  * @task: controlling RPC task