Merge branch 'for-2.6.32' of git://linux-nfs.org/~bfields/linux

* 'for-2.6.32' of git://linux-nfs.org/~bfields/linux: (68 commits)
  nfsd4: nfsv4 clients should cross mountpoints
  nfsd: revise 4.1 status documentation
  sunrpc/cache: avoid variable over-loading in cache_defer_req
  sunrpc/cache: use list_del_init for the list_head entries in cache_deferred_req
  nfsd: return success for non-NFS4 nfs4_state_start
  nfsd41: Refactor create_client()
  nfsd41: modify nfsd4.1 backchannel to use new xprt class
  nfsd41: Backchannel: Implement cb_recall over NFSv4.1
  nfsd41: Backchannel: cb_sequence callback
  nfsd41: Backchannel: Setup sequence information
  nfsd41: Backchannel: Server backchannel RPC wait queue
  nfsd41: Backchannel: Add sequence arguments to callback RPC arguments
  nfsd41: Backchannel: callback infrastructure
  nfsd4: use common rpc_cred for all callbacks
  nfsd4: allow nfs4 state startup to fail
  SUNRPC: Defer the auth_gss upcall when the RPC call is asynchronous
  nfsd4: fix null dereference creating nfsv4 callback client
  nfsd4: fix whitespace in NFSPROC4_CLNT_CB_NULL definition
  nfsd41: sunrpc: add new xprt class for nfsv4.1 backchannel
  sunrpc/cache: simplify cache_fresh_locked and cache_fresh_unlocked.
  ...
diff --git a/Documentation/filesystems/nfs41-server.txt b/Documentation/filesystems/nfs41-server.txt
index 05d81cb..5920fe2 100644
--- a/Documentation/filesystems/nfs41-server.txt
+++ b/Documentation/filesystems/nfs41-server.txt
@@ -11,6 +11,11 @@
 control file, the nfsd service must be taken down.  Use your user-mode
 nfs-utils to set this up; see rpc.nfsd(8)
 
+(Warning: older servers will interpret "+4.1" and "-4.1" as "+4" and
+"-4", respectively.  Therefore, code meant to work on both new and old
+kernels must turn 4.1 on or off *before* turning support for version 4
+on or off; rpc.nfsd does this correctly.)
+
 The NFSv4 minorversion 1 (NFSv4.1) implementation in nfsd is based
 on the latest NFSv4.1 Internet Draft:
 http://tools.ietf.org/html/draft-ietf-nfsv4-minorversion1-29
@@ -25,6 +30,49 @@
 See http://wiki.linux-nfs.org/wiki/index.php/PNFS_prototype_design
 for more information.
 
+The current implementation is intended for developers only: while it
+does support ordinary file operations on clients we have tested against
+(including the linux client), it is incomplete in ways which may limit
+features unexpectedly, cause known bugs in rare cases, or cause
+interoperability problems with future clients.  Known issues:
+
+	- gss support is questionable: currently mounts with kerberos
+	  from a linux client are possible, but we aren't really
+	  conformant with the spec (for example, we don't use kerberos
+	  on the backchannel correctly).
+	- no trunking support: no clients currently take advantage of
+	  trunking, but this is a mandatory feature, and its use is
+	  recommended to clients in a number of places.  (E.g. to ensure
+	  timely renewal in case an existing connection's retry timeouts
+	  have gotten too long; see section 8.3 of the draft.)
+	  Therefore, lack of this feature may cause future clients to
+	  fail.
+	- Incomplete backchannel support: incomplete backchannel gss
+	  support and no support for BACKCHANNEL_CTL mean that
+	  callbacks (hence delegations and layouts) may not be
+	  available and clients confused by the incomplete
+	  implementation may fail.
+	- Server reboot recovery is unsupported; if the server reboots,
+	  clients may fail.
+	- We do not support SSV, which provides security for shared
+	  client-server state (thus preventing unauthorized tampering
+	  with locks and opens, for example).  It is mandatory for
+	  servers to support this, though no clients use it yet.
+	- Mandatory operations which we do not support, such as
+	  DESTROY_CLIENTID, FREE_STATEID, SECINFO_NO_NAME, and
+	  TEST_STATEID, are not currently used by clients, but will be
+	  (and the spec recommends their uses in common cases), and
+	  clients should not be expected to know how to recover from the
+	  case where they are not supported.  This will eventually cause
+	  interoperability failures.
+
+In addition, some limitations are inherited from the current NFSv4
+implementation:
+
+	- Incomplete delegation enforcement: if a file is renamed or
+	  unlinked, a client holding a delegation may continue to
+	  indefinitely allow opens of the file under the old name.
+
 The table below, taken from the NFSv4.1 document, lists
 the operations that are mandatory to implement (REQ), optional
 (OPT), and NFSv4.0 operations that are required not to implement (MNI)
@@ -142,6 +190,12 @@
 
 Implementation notes:
 
+DELEGPURGE:
+* mandatory only for servers that support CLAIM_DELEGATE_PREV and/or
+  CLAIM_DELEG_PREV_FH (which allows clients to keep delegations that
+  persist across client reboots).  Thus we need not implement this for
+  now.
+
 EXCHANGE_ID:
 * only SP4_NONE state protection supported
 * implementation ids are ignored
diff --git a/fs/lockd/clntlock.c b/fs/lockd/clntlock.c
index 1f3b0fc..fc9032d 100644
--- a/fs/lockd/clntlock.c
+++ b/fs/lockd/clntlock.c
@@ -166,7 +166,7 @@
 		 */
 		if (fl_blocked->fl_u.nfs_fl.owner->pid != lock->svid)
 			continue;
-		if (!nlm_cmp_addr(nlm_addr(block->b_host), addr))
+		if (!rpc_cmp_addr(nlm_addr(block->b_host), addr))
 			continue;
 		if (nfs_compare_fh(NFS_FH(fl_blocked->fl_file->f_path.dentry->d_inode) ,fh) != 0)
 			continue;
diff --git a/fs/lockd/host.c b/fs/lockd/host.c
index 7cb076a..4600c20 100644
--- a/fs/lockd/host.c
+++ b/fs/lockd/host.c
@@ -111,7 +111,7 @@
 	 */
 	chain = &nlm_hosts[nlm_hash_address(ni->sap)];
 	hlist_for_each_entry(host, pos, chain, h_hash) {
-		if (!nlm_cmp_addr(nlm_addr(host), ni->sap))
+		if (!rpc_cmp_addr(nlm_addr(host), ni->sap))
 			continue;
 
 		/* See if we have an NSM handle for this client */
@@ -125,7 +125,7 @@
 		if (host->h_server != ni->server)
 			continue;
 		if (ni->server &&
-		    !nlm_cmp_addr(nlm_srcaddr(host), ni->src_sap))
+		    !rpc_cmp_addr(nlm_srcaddr(host), ni->src_sap))
 			continue;
 
 		/* Move to head of hash chain. */
diff --git a/fs/lockd/mon.c b/fs/lockd/mon.c
index 30c9331..f956651 100644
--- a/fs/lockd/mon.c
+++ b/fs/lockd/mon.c
@@ -209,7 +209,7 @@
 	struct nsm_handle *nsm;
 
 	list_for_each_entry(nsm, &nsm_handles, sm_link)
-		if (nlm_cmp_addr(nsm_addr(nsm), sap))
+		if (rpc_cmp_addr(nsm_addr(nsm), sap))
 			return nsm;
 	return NULL;
 }
diff --git a/fs/lockd/svcsubs.c b/fs/lockd/svcsubs.c
index 9e4d6aab..ad478da 100644
--- a/fs/lockd/svcsubs.c
+++ b/fs/lockd/svcsubs.c
@@ -417,7 +417,7 @@
 static int
 nlmsvc_match_ip(void *datap, struct nlm_host *host)
 {
-	return nlm_cmp_addr(nlm_srcaddr(host), datap);
+	return rpc_cmp_addr(nlm_srcaddr(host), datap);
 }
 
 /**
diff --git a/fs/nfsd/export.c b/fs/nfsd/export.c
index d946264..984a5eb 100644
--- a/fs/nfsd/export.c
+++ b/fs/nfsd/export.c
@@ -1341,6 +1341,8 @@
 	if (rv)
 		goto out;
 	rv = check_nfsd_access(exp, rqstp);
+	if (rv)
+		fh_put(fhp);
 out:
 	exp_put(exp);
 	return rv;
diff --git a/fs/nfsd/nfs3xdr.c b/fs/nfsd/nfs3xdr.c
index 01d4ec1..edf926e 100644
--- a/fs/nfsd/nfs3xdr.c
+++ b/fs/nfsd/nfs3xdr.c
@@ -814,17 +814,6 @@
 	return p;
 }
 
-static __be32 *
-encode_entryplus_baggage(struct nfsd3_readdirres *cd, __be32 *p,
-		struct svc_fh *fhp)
-{
-	p = encode_post_op_attr(cd->rqstp, p, fhp);
-	*p++ = xdr_one;			/* yes, a file handle follows */
-	p = encode_fh(p, fhp);
-	fh_put(fhp);
-	return p;
-}
-
 static int
 compose_entry_fh(struct nfsd3_readdirres *cd, struct svc_fh *fhp,
 		const char *name, int namlen)
@@ -836,29 +825,54 @@
 	dparent = cd->fh.fh_dentry;
 	exp  = cd->fh.fh_export;
 
-	fh_init(fhp, NFS3_FHSIZE);
 	if (isdotent(name, namlen)) {
 		if (namlen == 2) {
 			dchild = dget_parent(dparent);
 			if (dchild == dparent) {
 				/* filesystem root - cannot return filehandle for ".." */
 				dput(dchild);
-				return 1;
+				return -ENOENT;
 			}
 		} else
 			dchild = dget(dparent);
 	} else
 		dchild = lookup_one_len(name, dparent, namlen);
 	if (IS_ERR(dchild))
-		return 1;
-	if (d_mountpoint(dchild) ||
-	    fh_compose(fhp, exp, dchild, &cd->fh) != 0 ||
-	    !dchild->d_inode)
-		rv = 1;
+		return -ENOENT;
+	rv = -ENOENT;
+	if (d_mountpoint(dchild))
+		goto out;
+	rv = fh_compose(fhp, exp, dchild, &cd->fh);
+	if (rv)
+		goto out;
+	if (!dchild->d_inode)
+		goto out;
+	rv = 0;
+out:
 	dput(dchild);
 	return rv;
 }
 
+__be32 *encode_entryplus_baggage(struct nfsd3_readdirres *cd, __be32 *p, const char *name, int namlen)
+{
+	struct svc_fh	fh;
+	int err;
+
+	fh_init(&fh, NFS3_FHSIZE);
+	err = compose_entry_fh(cd, &fh, name, namlen);
+	if (err) {
+		*p++ = 0;
+		*p++ = 0;
+		goto out;
+	}
+	p = encode_post_op_attr(cd->rqstp, p, &fh);
+	*p++ = xdr_one;			/* yes, a file handle follows */
+	p = encode_fh(p, &fh);
+out:
+	fh_put(&fh);
+	return p;
+}
+
 /*
  * Encode a directory entry. This one works for both normal readdir
  * and readdirplus.
@@ -929,16 +943,8 @@
 
 		p = encode_entry_baggage(cd, p, name, namlen, ino);
 
-		/* throw in readdirplus baggage */
-		if (plus) {
-			struct svc_fh	fh;
-
-			if (compose_entry_fh(cd, &fh, name, namlen) > 0) {
-				*p++ = 0;
-				*p++ = 0;
-			} else
-				p = encode_entryplus_baggage(cd, p, &fh);
-		}
+		if (plus)
+			p = encode_entryplus_baggage(cd, p, name, namlen);
 		num_entry_words = p - cd->buffer;
 	} else if (cd->rqstp->rq_respages[pn+1] != NULL) {
 		/* temporarily encode entry into next page, then move back to
@@ -951,17 +957,8 @@
 
 		p1 = encode_entry_baggage(cd, p1, name, namlen, ino);
 
-		/* throw in readdirplus baggage */
-		if (plus) {
-			struct svc_fh	fh;
-
-			if (compose_entry_fh(cd, &fh, name, namlen) > 0) {
-				/* zero out the filehandle */
-				*p1++ = 0;
-				*p1++ = 0;
-			} else
-				p1 = encode_entryplus_baggage(cd, p1, &fh);
-		}
+		if (plus)
+			p1 = encode_entryplus_baggage(cd, p1, name, namlen);
 
 		/* determine entry word length and lengths to go in pages */
 		num_entry_words = p1 - tmp;
diff --git a/fs/nfsd/nfs4acl.c b/fs/nfsd/nfs4acl.c
index 54b8b41..725d02f 100644
--- a/fs/nfsd/nfs4acl.c
+++ b/fs/nfsd/nfs4acl.c
@@ -321,7 +321,7 @@
 	deny = ~pas.group & pas.other;
 	if (deny) {
 		ace->type = NFS4_ACE_ACCESS_DENIED_ACE_TYPE;
-		ace->flag = eflag | NFS4_ACE_IDENTIFIER_GROUP;
+		ace->flag = eflag;
 		ace->access_mask = deny_mask_from_posix(deny, flags);
 		ace->whotype = NFS4_ACL_WHO_GROUP;
 		ace++;
@@ -335,7 +335,7 @@
 		if (deny) {
 			ace->type = NFS4_ACE_ACCESS_DENIED_ACE_TYPE;
 			ace->flag = eflag | NFS4_ACE_IDENTIFIER_GROUP;
-			ace->access_mask = mask_from_posix(deny, flags);
+			ace->access_mask = deny_mask_from_posix(deny, flags);
 			ace->whotype = NFS4_ACL_WHO_NAMED;
 			ace->who = pa->e_id;
 			ace++;
diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c
index 3fd23f7..24e8d78 100644
--- a/fs/nfsd/nfs4callback.c
+++ b/fs/nfsd/nfs4callback.c
@@ -43,25 +43,30 @@
 #include <linux/sunrpc/xdr.h>
 #include <linux/sunrpc/svc.h>
 #include <linux/sunrpc/clnt.h>
+#include <linux/sunrpc/svcsock.h>
 #include <linux/nfsd/nfsd.h>
 #include <linux/nfsd/state.h>
 #include <linux/sunrpc/sched.h>
 #include <linux/nfs4.h>
+#include <linux/sunrpc/xprtsock.h>
 
 #define NFSDDBG_FACILITY                NFSDDBG_PROC
 
 #define NFSPROC4_CB_NULL 0
 #define NFSPROC4_CB_COMPOUND 1
+#define NFS4_STATEID_SIZE 16
 
 /* Index of predefined Linux callback client operations */
 
 enum {
-        NFSPROC4_CLNT_CB_NULL = 0,
+	NFSPROC4_CLNT_CB_NULL = 0,
 	NFSPROC4_CLNT_CB_RECALL,
+	NFSPROC4_CLNT_CB_SEQUENCE,
 };
 
 enum nfs_cb_opnum4 {
 	OP_CB_RECALL            = 4,
+	OP_CB_SEQUENCE          = 11,
 };
 
 #define NFS4_MAXTAGLEN		20
@@ -70,17 +75,29 @@
 #define NFS4_dec_cb_null_sz		0
 #define cb_compound_enc_hdr_sz		4
 #define cb_compound_dec_hdr_sz		(3 + (NFS4_MAXTAGLEN >> 2))
+#define sessionid_sz			(NFS4_MAX_SESSIONID_LEN >> 2)
+#define cb_sequence_enc_sz		(sessionid_sz + 4 +             \
+					1 /* no referring calls list yet */)
+#define cb_sequence_dec_sz		(op_dec_sz + sessionid_sz + 4)
+
 #define op_enc_sz			1
 #define op_dec_sz			2
 #define enc_nfs4_fh_sz			(1 + (NFS4_FHSIZE >> 2))
 #define enc_stateid_sz			(NFS4_STATEID_SIZE >> 2)
 #define NFS4_enc_cb_recall_sz		(cb_compound_enc_hdr_sz +       \
+					cb_sequence_enc_sz +            \
 					1 + enc_stateid_sz +            \
 					enc_nfs4_fh_sz)
 
 #define NFS4_dec_cb_recall_sz		(cb_compound_dec_hdr_sz  +      \
+					cb_sequence_dec_sz +            \
 					op_dec_sz)
 
+struct nfs4_rpc_args {
+	void				*args_op;
+	struct nfsd4_cb_sequence	args_seq;
+};
+
 /*
 * Generic encode routines from fs/nfs/nfs4xdr.c
 */
@@ -137,11 +154,13 @@
 } while (0)
 
 struct nfs4_cb_compound_hdr {
-	int		status;
-	u32		ident;
+	/* args */
+	u32		ident;	/* minorversion 0 only */
 	u32		nops;
 	__be32		*nops_p;
 	u32		minorversion;
+	/* res */
+	int		status;
 	u32		taglen;
 	char		*tag;
 };
@@ -238,6 +257,27 @@
 	hdr->nops++;
 }
 
+static void
+encode_cb_sequence(struct xdr_stream *xdr, struct nfsd4_cb_sequence *args,
+		   struct nfs4_cb_compound_hdr *hdr)
+{
+	__be32 *p;
+
+	if (hdr->minorversion == 0)
+		return;
+
+	RESERVE_SPACE(1 + NFS4_MAX_SESSIONID_LEN + 20);
+
+	WRITE32(OP_CB_SEQUENCE);
+	WRITEMEM(args->cbs_clp->cl_sessionid.data, NFS4_MAX_SESSIONID_LEN);
+	WRITE32(args->cbs_clp->cl_cb_seq_nr);
+	WRITE32(0);		/* slotid, always 0 */
+	WRITE32(0);		/* highest slotid always 0 */
+	WRITE32(0);		/* cachethis always 0 */
+	WRITE32(0); /* FIXME: support referring_call_lists */
+	hdr->nops++;
+}
+
 static int
 nfs4_xdr_enc_cb_null(struct rpc_rqst *req, __be32 *p)
 {
@@ -249,15 +289,19 @@
 }
 
 static int
-nfs4_xdr_enc_cb_recall(struct rpc_rqst *req, __be32 *p, struct nfs4_delegation *args)
+nfs4_xdr_enc_cb_recall(struct rpc_rqst *req, __be32 *p,
+		struct nfs4_rpc_args *rpc_args)
 {
 	struct xdr_stream xdr;
+	struct nfs4_delegation *args = rpc_args->args_op;
 	struct nfs4_cb_compound_hdr hdr = {
 		.ident = args->dl_ident,
+		.minorversion = rpc_args->args_seq.cbs_minorversion,
 	};
 
 	xdr_init_encode(&xdr, &req->rq_snd_buf, p);
 	encode_cb_compound_hdr(&xdr, &hdr);
+	encode_cb_sequence(&xdr, &rpc_args->args_seq, &hdr);
 	encode_cb_recall(&xdr, args, &hdr);
 	encode_cb_nops(&hdr);
 	return 0;
@@ -299,6 +343,57 @@
 	return 0;
 }
 
+/*
+ * Our current back channel implementation supports a single backchannel
+ * with a single slot.
+ */
+static int
+decode_cb_sequence(struct xdr_stream *xdr, struct nfsd4_cb_sequence *res,
+		   struct rpc_rqst *rqstp)
+{
+	struct nfs4_sessionid id;
+	int status;
+	u32 dummy;
+	__be32 *p;
+
+	if (res->cbs_minorversion == 0)
+		return 0;
+
+	status = decode_cb_op_hdr(xdr, OP_CB_SEQUENCE);
+	if (status)
+		return status;
+
+	/*
+	 * If the server returns different values for sessionID, slotID or
+	 * sequence number, the server is looney tunes.
+	 */
+	status = -ESERVERFAULT;
+
+	READ_BUF(NFS4_MAX_SESSIONID_LEN + 16);
+	memcpy(id.data, p, NFS4_MAX_SESSIONID_LEN);
+	p += XDR_QUADLEN(NFS4_MAX_SESSIONID_LEN);
+	if (memcmp(id.data, res->cbs_clp->cl_sessionid.data,
+		   NFS4_MAX_SESSIONID_LEN)) {
+		dprintk("%s Invalid session id\n", __func__);
+		goto out;
+	}
+	READ32(dummy);
+	if (dummy != res->cbs_clp->cl_cb_seq_nr) {
+		dprintk("%s Invalid sequence number\n", __func__);
+		goto out;
+	}
+	READ32(dummy); 	/* slotid must be 0 */
+	if (dummy != 0) {
+		dprintk("%s Invalid slotid\n", __func__);
+		goto out;
+	}
+	/* FIXME: process highest slotid and target highest slotid */
+	status = 0;
+out:
+	return status;
+}
+
+
 static int
 nfs4_xdr_dec_cb_null(struct rpc_rqst *req, __be32 *p)
 {
@@ -306,7 +401,8 @@
 }
 
 static int
-nfs4_xdr_dec_cb_recall(struct rpc_rqst *rqstp, __be32 *p)
+nfs4_xdr_dec_cb_recall(struct rpc_rqst *rqstp, __be32 *p,
+		struct nfsd4_cb_sequence *seq)
 {
 	struct xdr_stream xdr;
 	struct nfs4_cb_compound_hdr hdr;
@@ -316,6 +412,11 @@
 	status = decode_cb_compound_hdr(&xdr, &hdr);
 	if (status)
 		goto out;
+	if (seq) {
+		status = decode_cb_sequence(&xdr, seq, rqstp);
+		if (status)
+			goto out;
+	}
 	status = decode_cb_op_hdr(&xdr, OP_CB_RECALL);
 out:
 	return status;
@@ -377,16 +478,15 @@
 
 int setup_callback_client(struct nfs4_client *clp)
 {
-	struct sockaddr_in	addr;
 	struct nfs4_cb_conn *cb = &clp->cl_cb_conn;
 	struct rpc_timeout	timeparms = {
 		.to_initval	= max_cb_time(),
 		.to_retries	= 0,
 	};
 	struct rpc_create_args args = {
-		.protocol	= IPPROTO_TCP,
-		.address	= (struct sockaddr *)&addr,
-		.addrsize	= sizeof(addr),
+		.protocol	= XPRT_TRANSPORT_TCP,
+		.address	= (struct sockaddr *) &cb->cb_addr,
+		.addrsize	= cb->cb_addrlen,
 		.timeout	= &timeparms,
 		.program	= &cb_program,
 		.prognumber	= cb->cb_prog,
@@ -399,13 +499,10 @@
 
 	if (!clp->cl_principal && (clp->cl_flavor >= RPC_AUTH_GSS_KRB5))
 		return -EINVAL;
-
-	/* Initialize address */
-	memset(&addr, 0, sizeof(addr));
-	addr.sin_family = AF_INET;
-	addr.sin_port = htons(cb->cb_port);
-	addr.sin_addr.s_addr = htonl(cb->cb_addr);
-
+	if (cb->cb_minorversion) {
+		args.bc_xprt = clp->cl_cb_xprt;
+		args.protocol = XPRT_TRANSPORT_BC_TCP;
+	}
 	/* Create RPC client */
 	client = rpc_create(&args);
 	if (IS_ERR(client)) {
@@ -439,42 +536,29 @@
 	.rpc_call_done = nfsd4_cb_probe_done,
 };
 
-static struct rpc_cred *lookup_cb_cred(struct nfs4_cb_conn *cb)
-{
-	struct auth_cred acred = {
-		.machine_cred = 1
-	};
+static struct rpc_cred *callback_cred;
 
-	/*
-	 * Note in the gss case this doesn't actually have to wait for a
-	 * gss upcall (or any calls to the client); this just creates a
-	 * non-uptodate cred which the rpc state machine will fill in with
-	 * a refresh_upcall later.
-	 */
-	return rpcauth_lookup_credcache(cb->cb_client->cl_auth, &acred,
-							RPCAUTH_LOOKUP_NEW);
+int set_callback_cred(void)
+{
+	callback_cred = rpc_lookup_machine_cred();
+	if (!callback_cred)
+		return -ENOMEM;
+	return 0;
 }
 
+
 void do_probe_callback(struct nfs4_client *clp)
 {
 	struct nfs4_cb_conn *cb = &clp->cl_cb_conn;
 	struct rpc_message msg = {
 		.rpc_proc       = &nfs4_cb_procedures[NFSPROC4_CLNT_CB_NULL],
 		.rpc_argp       = clp,
+		.rpc_cred	= callback_cred
 	};
-	struct rpc_cred *cred;
 	int status;
 
-	cred = lookup_cb_cred(cb);
-	if (IS_ERR(cred)) {
-		status = PTR_ERR(cred);
-		goto out;
-	}
-	cb->cb_cred = cred;
-	msg.rpc_cred = cb->cb_cred;
 	status = rpc_call_async(cb->cb_client, &msg, RPC_TASK_SOFT,
 				&nfsd4_cb_probe_ops, (void *)clp);
-out:
 	if (status) {
 		warn_no_callback_path(clp, status);
 		put_nfs4_client(clp);
@@ -503,11 +587,95 @@
 	do_probe_callback(clp);
 }
 
+/*
+ * There's currently a single callback channel slot.
+ * If the slot is available, then mark it busy.  Otherwise, set the
+ * thread for sleeping on the callback RPC wait queue.
+ */
+static int nfsd41_cb_setup_sequence(struct nfs4_client *clp,
+		struct rpc_task *task)
+{
+	struct nfs4_rpc_args *args = task->tk_msg.rpc_argp;
+	u32 *ptr = (u32 *)clp->cl_sessionid.data;
+	int status = 0;
+
+	dprintk("%s: %u:%u:%u:%u\n", __func__,
+		ptr[0], ptr[1], ptr[2], ptr[3]);
+
+	if (test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
+		rpc_sleep_on(&clp->cl_cb_waitq, task, NULL);
+		dprintk("%s slot is busy\n", __func__);
+		status = -EAGAIN;
+		goto out;
+	}
+
+	/*
+	 * We'll need the clp during XDR encoding and decoding,
+	 * and the sequence during decoding to verify the reply
+	 */
+	args->args_seq.cbs_clp = clp;
+	task->tk_msg.rpc_resp = &args->args_seq;
+
+out:
+	dprintk("%s status=%d\n", __func__, status);
+	return status;
+}
+
+/*
+ * TODO: cb_sequence should support referring call lists, cachethis, multiple
+ * slots, and mark callback channel down on communication errors.
+ */
+static void nfsd4_cb_prepare(struct rpc_task *task, void *calldata)
+{
+	struct nfs4_delegation *dp = calldata;
+	struct nfs4_client *clp = dp->dl_client;
+	struct nfs4_rpc_args *args = task->tk_msg.rpc_argp;
+	u32 minorversion = clp->cl_cb_conn.cb_minorversion;
+	int status = 0;
+
+	args->args_seq.cbs_minorversion = minorversion;
+	if (minorversion) {
+		status = nfsd41_cb_setup_sequence(clp, task);
+		if (status) {
+			if (status != -EAGAIN) {
+				/* terminate rpc task */
+				task->tk_status = status;
+				task->tk_action = NULL;
+			}
+			return;
+		}
+	}
+	rpc_call_start(task);
+}
+
+static void nfsd4_cb_done(struct rpc_task *task, void *calldata)
+{
+	struct nfs4_delegation *dp = calldata;
+	struct nfs4_client *clp = dp->dl_client;
+
+	dprintk("%s: minorversion=%d\n", __func__,
+		clp->cl_cb_conn.cb_minorversion);
+
+	if (clp->cl_cb_conn.cb_minorversion) {
+		/* No need for lock, access serialized in nfsd4_cb_prepare */
+		++clp->cl_cb_seq_nr;
+		clear_bit(0, &clp->cl_cb_slot_busy);
+		rpc_wake_up_next(&clp->cl_cb_waitq);
+		dprintk("%s: freed slot, new seqid=%d\n", __func__,
+			clp->cl_cb_seq_nr);
+
+		/* We're done looking into the sequence information */
+		task->tk_msg.rpc_resp = NULL;
+	}
+}
+
 static void nfsd4_cb_recall_done(struct rpc_task *task, void *calldata)
 {
 	struct nfs4_delegation *dp = calldata;
 	struct nfs4_client *clp = dp->dl_client;
 
+	nfsd4_cb_done(task, calldata);
+
 	switch (task->tk_status) {
 	case -EIO:
 		/* Network partition? */
@@ -520,16 +688,19 @@
 		break;
 	default:
 		/* success, or error we can't handle */
-		return;
+		goto done;
 	}
 	if (dp->dl_retries--) {
 		rpc_delay(task, 2*HZ);
 		task->tk_status = 0;
 		rpc_restart_call(task);
+		return;
 	} else {
 		atomic_set(&clp->cl_cb_conn.cb_set, 0);
 		warn_no_callback_path(clp, task->tk_status);
 	}
+done:
+	kfree(task->tk_msg.rpc_argp);
 }
 
 static void nfsd4_cb_recall_release(void *calldata)
@@ -542,6 +713,7 @@
 }
 
 static const struct rpc_call_ops nfsd4_cb_recall_ops = {
+	.rpc_call_prepare = nfsd4_cb_prepare,
 	.rpc_call_done = nfsd4_cb_recall_done,
 	.rpc_release = nfsd4_cb_recall_release,
 };
@@ -554,17 +726,24 @@
 {
 	struct nfs4_client *clp = dp->dl_client;
 	struct rpc_clnt *clnt = clp->cl_cb_conn.cb_client;
+	struct nfs4_rpc_args *args;
 	struct rpc_message msg = {
 		.rpc_proc = &nfs4_cb_procedures[NFSPROC4_CLNT_CB_RECALL],
-		.rpc_argp = dp,
-		.rpc_cred = clp->cl_cb_conn.cb_cred
+		.rpc_cred = callback_cred
 	};
-	int status;
+	int status = -ENOMEM;
 
+	args = kzalloc(sizeof(*args), GFP_KERNEL);
+	if (!args)
+		goto out;
+	args->args_op = dp;
+	msg.rpc_argp = args;
 	dp->dl_retries = 1;
 	status = rpc_call_async(clnt, &msg, RPC_TASK_SOFT,
 				&nfsd4_cb_recall_ops, dp);
+out:
 	if (status) {
+		kfree(args);
 		put_nfs4_client(clp);
 		nfs4_put_delegation(dp);
 	}
diff --git a/fs/nfsd/nfs4proc.c b/fs/nfsd/nfs4proc.c
index 7c88017..bebc0c2 100644
--- a/fs/nfsd/nfs4proc.c
+++ b/fs/nfsd/nfs4proc.c
@@ -68,7 +68,6 @@
 		   u32 *bmval, u32 *writable)
 {
 	struct dentry *dentry = cstate->current_fh.fh_dentry;
-	struct svc_export *exp = cstate->current_fh.fh_export;
 
 	/*
 	 * Check about attributes are supported by the NFSv4 server or not.
@@ -80,17 +79,13 @@
 		return nfserr_attrnotsupp;
 
 	/*
-	 * Check FATTR4_WORD0_ACL & FATTR4_WORD0_FS_LOCATIONS can be supported
+	 * Check FATTR4_WORD0_ACL can be supported
 	 * in current environment or not.
 	 */
 	if (bmval[0] & FATTR4_WORD0_ACL) {
 		if (!IS_POSIXACL(dentry->d_inode))
 			return nfserr_attrnotsupp;
 	}
-	if (bmval[0] & FATTR4_WORD0_FS_LOCATIONS) {
-		if (exp->ex_fslocs.locations == NULL)
-			return nfserr_attrnotsupp;
-	}
 
 	/*
 	 * According to spec, read-only attributes return ERR_INVAL.
@@ -123,6 +118,35 @@
 	return status;
 }
 
+static int
+is_create_with_attrs(struct nfsd4_open *open)
+{
+	return open->op_create == NFS4_OPEN_CREATE
+		&& (open->op_createmode == NFS4_CREATE_UNCHECKED
+		    || open->op_createmode == NFS4_CREATE_GUARDED
+		    || open->op_createmode == NFS4_CREATE_EXCLUSIVE4_1);
+}
+
+/*
+ * if error occurs when setting the acl, just clear the acl bit
+ * in the returned attr bitmap.
+ */
+static void
+do_set_nfs4_acl(struct svc_rqst *rqstp, struct svc_fh *fhp,
+		struct nfs4_acl *acl, u32 *bmval)
+{
+	__be32 status;
+
+	status = nfsd4_set_nfs4_acl(rqstp, fhp, acl);
+	if (status)
+		/*
+		 * We should probably fail the whole open at this point,
+		 * but we've already created the file, so it's too late;
+		 * So this seems the least of evils:
+		 */
+		bmval[0] &= ~FATTR4_WORD0_ACL;
+}
+
 static inline void
 fh_dup2(struct svc_fh *dst, struct svc_fh *src)
 {
@@ -206,6 +230,9 @@
 	if (status)
 		goto out;
 
+	if (is_create_with_attrs(open) && open->op_acl != NULL)
+		do_set_nfs4_acl(rqstp, &resfh, open->op_acl, open->op_bmval);
+
 	set_change_info(&open->op_cinfo, current_fh);
 	fh_dup2(current_fh, &resfh);
 
@@ -536,12 +563,17 @@
 		status = nfserr_badtype;
 	}
 
-	if (!status) {
-		fh_unlock(&cstate->current_fh);
-		set_change_info(&create->cr_cinfo, &cstate->current_fh);
-		fh_dup2(&cstate->current_fh, &resfh);
-	}
+	if (status)
+		goto out;
 
+	if (create->cr_acl != NULL)
+		do_set_nfs4_acl(rqstp, &resfh, create->cr_acl,
+				create->cr_bmval);
+
+	fh_unlock(&cstate->current_fh);
+	set_change_info(&create->cr_cinfo, &cstate->current_fh);
+	fh_dup2(&cstate->current_fh, &resfh);
+out:
 	fh_put(&resfh);
 	return status;
 }
@@ -947,34 +979,6 @@
 static const char *nfsd4_op_name(unsigned opnum);
 
 /*
- * This is a replay of a compound for which no cache entry pages
- * were used. Encode the sequence operation, and if cachethis is FALSE
- * encode the uncache rep error on the next operation.
- */
-static __be32
-nfsd4_enc_uncached_replay(struct nfsd4_compoundargs *args,
-			 struct nfsd4_compoundres *resp)
-{
-	struct nfsd4_op *op;
-
-	dprintk("--> %s resp->opcnt %d ce_cachethis %u \n", __func__,
-		resp->opcnt, resp->cstate.slot->sl_cache_entry.ce_cachethis);
-
-	/* Encode the replayed sequence operation */
-	BUG_ON(resp->opcnt != 1);
-	op = &args->ops[resp->opcnt - 1];
-	nfsd4_encode_operation(resp, op);
-
-	/*return nfserr_retry_uncached_rep in next operation. */
-	if (resp->cstate.slot->sl_cache_entry.ce_cachethis == 0) {
-		op = &args->ops[resp->opcnt++];
-		op->status = nfserr_retry_uncached_rep;
-		nfsd4_encode_operation(resp, op);
-	}
-	return op->status;
-}
-
-/*
  * Enforce NFSv4.1 COMPOUND ordering rules.
  *
  * TODO:
@@ -1083,13 +1087,10 @@
 			BUG_ON(op->status == nfs_ok);
 
 encode_op:
-		/* Only from SEQUENCE or CREATE_SESSION */
+		/* Only from SEQUENCE */
 		if (resp->cstate.status == nfserr_replay_cache) {
 			dprintk("%s NFS4.1 replay from cache\n", __func__);
-			if (nfsd4_not_cached(resp))
-				status = nfsd4_enc_uncached_replay(args, resp);
-			else
-				status = op->status;
+			status = op->status;
 			goto out;
 		}
 		if (op->status == nfserr_replay_me) {
diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
index 766d3d5..2153f9bd 100644
--- a/fs/nfsd/nfs4state.c
+++ b/fs/nfsd/nfs4state.c
@@ -55,6 +55,7 @@
 #include <linux/lockd/bind.h>
 #include <linux/module.h>
 #include <linux/sunrpc/svcauth_gss.h>
+#include <linux/sunrpc/clnt.h>
 
 #define NFSDDBG_FACILITY                NFSDDBG_PROC
 
@@ -413,36 +414,65 @@
 }
 
 /*
- * Give the client the number of slots it requests bound by
- * NFSD_MAX_SLOTS_PER_SESSION and by sv_drc_max_pages.
- *
- * If we run out of pages (sv_drc_pages_used == sv_drc_max_pages) we
- * should (up to a point) re-negotiate active sessions and reduce their
- * slot usage to make rooom for new connections. For now we just fail the
- * create session.
+ * The protocol defines ca_maxresponsesize_cached to include the size of
+ * the rpc header, but all we need to cache is the data starting after
+ * the end of the initial SEQUENCE operation--the rest we regenerate
+ * each time.  Therefore we can advertise a ca_maxresponsesize_cached
+ * value that is the number of bytes in our cache plus a few additional
+ * bytes.  In order to stay on the safe side, and not promise more than
+ * we can cache, those additional bytes must be the minimum possible: 24
+ * bytes of rpc header (xid through accept state, with AUTH_NULL
+ * verifier), 12 for the compound header (with zero-length tag), and 44
+ * for the SEQUENCE op response:
  */
-static int set_forechannel_maxreqs(struct nfsd4_channel_attrs *fchan)
+#define NFSD_MIN_HDR_SEQ_SZ  (24 + 12 + 44)
+
+/*
+ * Give the client the number of ca_maxresponsesize_cached slots it
+ * requests, of size bounded by NFSD_SLOT_CACHE_SIZE,
+ * NFSD_MAX_MEM_PER_SESSION, and nfsd_drc_max_mem. Do not allow more
+ * than NFSD_MAX_SLOTS_PER_SESSION.
+ *
+ * If we run out of reserved DRC memory we should (up to a point)
+ * re-negotiate active sessions and reduce their slot usage to make
+ * room for new connections. For now we just fail the create session.
+ */
+static int set_forechannel_drc_size(struct nfsd4_channel_attrs *fchan)
 {
-	int status = 0, np = fchan->maxreqs * NFSD_PAGES_PER_SLOT;
+	int mem, size = fchan->maxresp_cached;
 
 	if (fchan->maxreqs < 1)
 		return nfserr_inval;
-	else if (fchan->maxreqs > NFSD_MAX_SLOTS_PER_SESSION)
-		fchan->maxreqs = NFSD_MAX_SLOTS_PER_SESSION;
 
-	spin_lock(&nfsd_serv->sv_lock);
-	if (np + nfsd_serv->sv_drc_pages_used > nfsd_serv->sv_drc_max_pages)
-		np = nfsd_serv->sv_drc_max_pages - nfsd_serv->sv_drc_pages_used;
-	nfsd_serv->sv_drc_pages_used += np;
-	spin_unlock(&nfsd_serv->sv_lock);
+	if (size < NFSD_MIN_HDR_SEQ_SZ)
+		size = NFSD_MIN_HDR_SEQ_SZ;
+	size -= NFSD_MIN_HDR_SEQ_SZ;
+	if (size > NFSD_SLOT_CACHE_SIZE)
+		size = NFSD_SLOT_CACHE_SIZE;
 
-	if (np <= 0) {
-		status = nfserr_resource;
-		fchan->maxreqs = 0;
-	} else
-		fchan->maxreqs = np / NFSD_PAGES_PER_SLOT;
+	/* bound the maxreqs by NFSD_MAX_MEM_PER_SESSION */
+	mem = fchan->maxreqs * size;
+	if (mem > NFSD_MAX_MEM_PER_SESSION) {
+		fchan->maxreqs = NFSD_MAX_MEM_PER_SESSION / size;
+		if (fchan->maxreqs > NFSD_MAX_SLOTS_PER_SESSION)
+			fchan->maxreqs = NFSD_MAX_SLOTS_PER_SESSION;
+		mem = fchan->maxreqs * size;
+	}
 
-	return status;
+	spin_lock(&nfsd_drc_lock);
+	/* bound the total session drc memory usage */
+	if (mem + nfsd_drc_mem_used > nfsd_drc_max_mem) {
+		fchan->maxreqs = (nfsd_drc_max_mem - nfsd_drc_mem_used) / size;
+		mem = fchan->maxreqs * size;
+	}
+	nfsd_drc_mem_used += mem;
+	spin_unlock(&nfsd_drc_lock);
+
+	if (fchan->maxreqs == 0)
+		return nfserr_serverfault;
+
+	fchan->maxresp_cached = size + NFSD_MIN_HDR_SEQ_SZ;
+	return 0;
 }
 
 /*
@@ -466,36 +496,41 @@
 		fchan->maxresp_sz = maxcount;
 	session_fchan->maxresp_sz = fchan->maxresp_sz;
 
-	/* Set the max response cached size our default which is
-	 * a multiple of PAGE_SIZE and small */
-	session_fchan->maxresp_cached = NFSD_PAGES_PER_SLOT * PAGE_SIZE;
-	fchan->maxresp_cached = session_fchan->maxresp_cached;
-
 	/* Use the client's maxops if possible */
 	if (fchan->maxops > NFSD_MAX_OPS_PER_COMPOUND)
 		fchan->maxops = NFSD_MAX_OPS_PER_COMPOUND;
 	session_fchan->maxops = fchan->maxops;
 
-	/* try to use the client requested number of slots */
-	if (fchan->maxreqs > NFSD_MAX_SLOTS_PER_SESSION)
-		fchan->maxreqs = NFSD_MAX_SLOTS_PER_SESSION;
-
 	/* FIXME: Error means no more DRC pages so the server should
 	 * recover pages from existing sessions. For now fail session
 	 * creation.
 	 */
-	status = set_forechannel_maxreqs(fchan);
+	status = set_forechannel_drc_size(fchan);
 
+	session_fchan->maxresp_cached = fchan->maxresp_cached;
 	session_fchan->maxreqs = fchan->maxreqs;
+
+	dprintk("%s status %d\n", __func__, status);
 	return status;
 }
 
+static void
+free_session_slots(struct nfsd4_session *ses)
+{
+	int i;
+
+	for (i = 0; i < ses->se_fchannel.maxreqs; i++)
+		kfree(ses->se_slots[i]);
+}
+
 static int
 alloc_init_session(struct svc_rqst *rqstp, struct nfs4_client *clp,
 		   struct nfsd4_create_session *cses)
 {
 	struct nfsd4_session *new, tmp;
-	int idx, status = nfserr_resource, slotsize;
+	struct nfsd4_slot *sp;
+	int idx, slotsize, cachesize, i;
+	int status;
 
 	memset(&tmp, 0, sizeof(tmp));
 
@@ -506,14 +541,27 @@
 	if (status)
 		goto out;
 
-	/* allocate struct nfsd4_session and slot table in one piece */
-	slotsize = tmp.se_fchannel.maxreqs * sizeof(struct nfsd4_slot);
+	BUILD_BUG_ON(NFSD_MAX_SLOTS_PER_SESSION * sizeof(struct nfsd4_slot)
+		     + sizeof(struct nfsd4_session) > PAGE_SIZE);
+
+	status = nfserr_serverfault;
+	/* allocate struct nfsd4_session and slot table pointers in one piece */
+	slotsize = tmp.se_fchannel.maxreqs * sizeof(struct nfsd4_slot *);
 	new = kzalloc(sizeof(*new) + slotsize, GFP_KERNEL);
 	if (!new)
 		goto out;
 
 	memcpy(new, &tmp, sizeof(*new));
 
+	/* allocate each struct nfsd4_slot and data cache in one piece */
+	cachesize = new->se_fchannel.maxresp_cached - NFSD_MIN_HDR_SEQ_SZ;
+	for (i = 0; i < new->se_fchannel.maxreqs; i++) {
+		sp = kzalloc(sizeof(*sp) + cachesize, GFP_KERNEL);
+		if (!sp)
+			goto out_free;
+		new->se_slots[i] = sp;
+	}
+
 	new->se_client = clp;
 	gen_sessionid(new);
 	idx = hash_sessionid(&new->se_sessionid);
@@ -530,6 +578,10 @@
 	status = nfs_ok;
 out:
 	return status;
+out_free:
+	free_session_slots(new);
+	kfree(new);
+	goto out;
 }
 
 /* caller must hold sessionid_lock */
@@ -572,19 +624,22 @@
 	nfsd4_put_session(ses);
 }
 
-static void nfsd4_release_respages(struct page **respages, short resused);
-
 void
 free_session(struct kref *kref)
 {
 	struct nfsd4_session *ses;
-	int i;
 
 	ses = container_of(kref, struct nfsd4_session, se_ref);
-	for (i = 0; i < ses->se_fchannel.maxreqs; i++) {
-		struct nfsd4_cache_entry *e = &ses->se_slots[i].sl_cache_entry;
-		nfsd4_release_respages(e->ce_respages, e->ce_resused);
-	}
+	/*
+	 * Give back what was reserved for this session.  Each slot caches
+	 * maxresp_cached - NFSD_MIN_HDR_SEQ_SZ bytes (the size used by
+	 * alloc_init_session()), not necessarily NFSD_SLOT_CACHE_SIZE.
+	 */
+	spin_lock(&nfsd_drc_lock);
+	nfsd_drc_mem_used -= ses->se_fchannel.maxreqs *
+		(ses->se_fchannel.maxresp_cached - NFSD_MIN_HDR_SEQ_SZ);
+	spin_unlock(&nfsd_drc_lock);
+	free_session_slots(ses);
 	kfree(ses);
 }
 
@@ -647,18 +696,14 @@
 		clp->cl_cb_conn.cb_client = NULL;
 		rpc_shutdown_client(clnt);
 	}
-	if (clp->cl_cb_conn.cb_cred) {
-		put_rpccred(clp->cl_cb_conn.cb_cred);
-		clp->cl_cb_conn.cb_cred = NULL;
-	}
 }
 
 static inline void
 free_client(struct nfs4_client *clp)
 {
 	shutdown_callback_client(clp);
-	nfsd4_release_respages(clp->cl_slot.sl_cache_entry.ce_respages,
-			     clp->cl_slot.sl_cache_entry.ce_resused);
+	if (clp->cl_cb_xprt)
+		svc_xprt_put(clp->cl_cb_xprt);
 	if (clp->cl_cred.cr_group_info)
 		put_group_info(clp->cl_cred.cr_group_info);
 	kfree(clp->cl_principal);
@@ -714,25 +759,6 @@
 	put_nfs4_client(clp);
 }
 
-static struct nfs4_client *create_client(struct xdr_netobj name, char *recdir)
-{
-	struct nfs4_client *clp;
-
-	clp = alloc_client(name);
-	if (clp == NULL)
-		return NULL;
-	memcpy(clp->cl_recdir, recdir, HEXDIR_LEN);
-	atomic_set(&clp->cl_count, 1);
-	atomic_set(&clp->cl_cb_conn.cb_set, 0);
-	INIT_LIST_HEAD(&clp->cl_idhash);
-	INIT_LIST_HEAD(&clp->cl_strhash);
-	INIT_LIST_HEAD(&clp->cl_openowners);
-	INIT_LIST_HEAD(&clp->cl_delegations);
-	INIT_LIST_HEAD(&clp->cl_sessions);
-	INIT_LIST_HEAD(&clp->cl_lru);
-	return clp;
-}
-
 static void copy_verf(struct nfs4_client *target, nfs4_verifier *source)
 {
 	memcpy(target->cl_verifier.data, source->data,
@@ -795,6 +821,46 @@
 	*p++ = i++;
 }
 
+static struct nfs4_client *create_client(struct xdr_netobj name, char *recdir,
+		struct svc_rqst *rqstp, nfs4_verifier *verf)
+{
+	struct nfs4_client *clp;
+	struct sockaddr *sa = svc_addr(rqstp);
+	char *princ;
+
+	clp = alloc_client(name);
+	if (clp == NULL)
+		return NULL;
+
+	princ = svc_gss_principal(rqstp);
+	if (princ) {
+		clp->cl_principal = kstrdup(princ, GFP_KERNEL);
+		if (clp->cl_principal == NULL) {
+			free_client(clp);
+			return NULL;
+		}
+	}
+
+	memcpy(clp->cl_recdir, recdir, HEXDIR_LEN);
+	atomic_set(&clp->cl_count, 1);
+	atomic_set(&clp->cl_cb_conn.cb_set, 0);
+	INIT_LIST_HEAD(&clp->cl_idhash);
+	INIT_LIST_HEAD(&clp->cl_strhash);
+	INIT_LIST_HEAD(&clp->cl_openowners);
+	INIT_LIST_HEAD(&clp->cl_delegations);
+	INIT_LIST_HEAD(&clp->cl_sessions);
+	INIT_LIST_HEAD(&clp->cl_lru);
+	clear_bit(0, &clp->cl_cb_slot_busy);
+	rpc_init_wait_queue(&clp->cl_cb_waitq, "Backchannel slot table");
+	copy_verf(clp, verf);
+	rpc_copy_addr((struct sockaddr *) &clp->cl_addr, sa);
+	clp->cl_flavor = rqstp->rq_flavor;
+	copy_cred(&clp->cl_cred, &rqstp->rq_cred);
+	gen_confirm(clp);
+
+	return clp;
+}
+
 static int check_name(struct xdr_netobj name)
 {
 	if (name.len == 0) 
@@ -902,93 +968,40 @@
 	return NULL;
 }
 
-/* a helper function for parse_callback */
-static int
-parse_octet(unsigned int *lenp, char **addrp)
-{
-	unsigned int len = *lenp;
-	char *p = *addrp;
-	int n = -1;
-	char c;
-
-	for (;;) {
-		if (!len)
-			break;
-		len--;
-		c = *p++;
-		if (c == '.')
-			break;
-		if ((c < '0') || (c > '9')) {
-			n = -1;
-			break;
-		}
-		if (n < 0)
-			n = 0;
-		n = (n * 10) + (c - '0');
-		if (n > 255) {
-			n = -1;
-			break;
-		}
-	}
-	*lenp = len;
-	*addrp = p;
-	return n;
-}
-
-/* parse and set the setclientid ipv4 callback address */
-static int
-parse_ipv4(unsigned int addr_len, char *addr_val, unsigned int *cbaddrp, unsigned short *cbportp)
-{
-	int temp = 0;
-	u32 cbaddr = 0;
-	u16 cbport = 0;
-	u32 addrlen = addr_len;
-	char *addr = addr_val;
-	int i, shift;
-
-	/* ipaddress */
-	shift = 24;
-	for(i = 4; i > 0  ; i--) {
-		if ((temp = parse_octet(&addrlen, &addr)) < 0) {
-			return 0;
-		}
-		cbaddr |= (temp << shift);
-		if (shift > 0)
-		shift -= 8;
-	}
-	*cbaddrp = cbaddr;
-
-	/* port */
-	shift = 8;
-	for(i = 2; i > 0  ; i--) {
-		if ((temp = parse_octet(&addrlen, &addr)) < 0) {
-			return 0;
-		}
-		cbport |= (temp << shift);
-		if (shift > 0)
-			shift -= 8;
-	}
-	*cbportp = cbport;
-	return 1;
-}
-
 static void
-gen_callback(struct nfs4_client *clp, struct nfsd4_setclientid *se)
+gen_callback(struct nfs4_client *clp, struct nfsd4_setclientid *se, u32 scopeid)
 {
 	struct nfs4_cb_conn *cb = &clp->cl_cb_conn;
+	unsigned short expected_family;
 
-	/* Currently, we only support tcp for the callback channel */
-	if ((se->se_callback_netid_len != 3) || memcmp((char *)se->se_callback_netid_val, "tcp", 3))
+	/* Currently, we only support tcp and tcp6 for the callback channel */
+	if (se->se_callback_netid_len == 3 &&
+	    !memcmp(se->se_callback_netid_val, "tcp", 3))
+		expected_family = AF_INET;
+	else if (se->se_callback_netid_len == 4 &&
+		 !memcmp(se->se_callback_netid_val, "tcp6", 4))
+		expected_family = AF_INET6;
+	else
 		goto out_err;
 
-	if ( !(parse_ipv4(se->se_callback_addr_len, se->se_callback_addr_val,
-	                 &cb->cb_addr, &cb->cb_port)))
+	cb->cb_addrlen = rpc_uaddr2sockaddr(se->se_callback_addr_val,
+					    se->se_callback_addr_len,
+					    (struct sockaddr *) &cb->cb_addr,
+					    sizeof(cb->cb_addr));
+
+	if (!cb->cb_addrlen || cb->cb_addr.ss_family != expected_family)
 		goto out_err;
+
+	if (cb->cb_addr.ss_family == AF_INET6)
+		((struct sockaddr_in6 *) &cb->cb_addr)->sin6_scope_id = scopeid;
+
 	cb->cb_minorversion = 0;
 	cb->cb_prog = se->se_callback_prog;
 	cb->cb_ident = se->se_callback_ident;
 	return;
 out_err:
+	cb->cb_addr.ss_family = AF_UNSPEC;
+	cb->cb_addrlen = 0;
 	dprintk(KERN_INFO "NFSD: this client (clientid %08x/%08x) "
 		"will not receive delegations\n",
 		clp->cl_clientid.cl_boot, clp->cl_clientid.cl_id);
@@ -996,175 +1009,87 @@
 	return;
 }
 
-void
-nfsd4_set_statp(struct svc_rqst *rqstp, __be32 *statp)
-{
-	struct nfsd4_compoundres *resp = rqstp->rq_resp;
-
-	resp->cstate.statp = statp;
-}
-
 /*
- * Dereference the result pages.
- */
-static void
-nfsd4_release_respages(struct page **respages, short resused)
-{
-	int i;
-
-	dprintk("--> %s\n", __func__);
-	for (i = 0; i < resused; i++) {
-		if (!respages[i])
-			continue;
-		put_page(respages[i]);
-		respages[i] = NULL;
-	}
-}
-
-static void
-nfsd4_copy_pages(struct page **topages, struct page **frompages, short count)
-{
-	int i;
-
-	for (i = 0; i < count; i++) {
-		topages[i] = frompages[i];
-		if (!topages[i])
-			continue;
-		get_page(topages[i]);
-	}
-}
-
-/*
- * Cache the reply pages up to NFSD_PAGES_PER_SLOT + 1, clearing the previous
- * pages. We add a page to NFSD_PAGES_PER_SLOT for the case where the total
- * length of the XDR response is less than se_fmaxresp_cached
- * (NFSD_PAGES_PER_SLOT * PAGE_SIZE) but the xdr_buf pages is used for a
- * of the reply (e.g. readdir).
- *
- * Store the base and length of the rq_req.head[0] page
- * of the NFSv4.1 data, just past the rpc header.
+ * Cache a reply. nfsd4_check_drc_limit() has bounded the cache size.
  */
 void
 nfsd4_store_cache_entry(struct nfsd4_compoundres *resp)
 {
-	struct nfsd4_cache_entry *entry = &resp->cstate.slot->sl_cache_entry;
-	struct svc_rqst *rqstp = resp->rqstp;
-	struct nfsd4_compoundargs *args = rqstp->rq_argp;
-	struct nfsd4_op *op = &args->ops[resp->opcnt];
-	struct kvec *resv = &rqstp->rq_res.head[0];
+	struct nfsd4_slot *slot = resp->cstate.slot;
+	unsigned int base;
 
-	dprintk("--> %s entry %p\n", __func__, entry);
+	dprintk("--> %s slot %p\n", __func__, slot);
 
-	/* Don't cache a failed OP_SEQUENCE. */
-	if (resp->opcnt == 1 && op->opnum == OP_SEQUENCE && resp->cstate.status)
-		return;
-
-	nfsd4_release_respages(entry->ce_respages, entry->ce_resused);
-	entry->ce_opcnt = resp->opcnt;
-	entry->ce_status = resp->cstate.status;
-
-	/*
-	 * Don't need a page to cache just the sequence operation - the slot
-	 * does this for us!
-	 */
+	slot->sl_opcnt = resp->opcnt;
+	slot->sl_status = resp->cstate.status;
 
 	if (nfsd4_not_cached(resp)) {
-		entry->ce_resused = 0;
-		entry->ce_rpchdrlen = 0;
-		dprintk("%s Just cache SEQUENCE. ce_cachethis %d\n", __func__,
-			resp->cstate.slot->sl_cache_entry.ce_cachethis);
+		slot->sl_datalen = 0;
 		return;
 	}
-	entry->ce_resused = rqstp->rq_resused;
-	if (entry->ce_resused > NFSD_PAGES_PER_SLOT + 1)
-		entry->ce_resused = NFSD_PAGES_PER_SLOT + 1;
-	nfsd4_copy_pages(entry->ce_respages, rqstp->rq_respages,
-			 entry->ce_resused);
-	entry->ce_datav.iov_base = resp->cstate.statp;
-	entry->ce_datav.iov_len = resv->iov_len - ((char *)resp->cstate.statp -
-				(char *)page_address(rqstp->rq_respages[0]));
-	/* Current request rpc header length*/
-	entry->ce_rpchdrlen = (char *)resp->cstate.statp -
-				(char *)page_address(rqstp->rq_respages[0]);
+	/* Save the bytes encoded between cstate.datap and resp->p */
+	slot->sl_datalen = (char *)resp->p - (char *)resp->cstate.datap;
+	base = (char *)resp->cstate.datap -
+					(char *)resp->xbuf->head[0].iov_base;
+	if (read_bytes_from_xdr_buf(resp->xbuf, base, slot->sl_data,
+				    slot->sl_datalen))
+		WARN(1, "%s: sessions DRC could not cache compound\n", __func__);
 }
 
 /*
- * We keep the rpc header, but take the nfs reply from the replycache.
+ * Encode the replay sequence operation from the slot values.
+ * If cachethis is FALSE encode the uncached rep error on the next
+ * operation which sets resp->p and increments resp->opcnt for
+ * nfs4svc_encode_compoundres.
+ *
  */
-static int
-nfsd41_copy_replay_data(struct nfsd4_compoundres *resp,
-			struct nfsd4_cache_entry *entry)
+static __be32
+nfsd4_enc_sequence_replay(struct nfsd4_compoundargs *args,
+			  struct nfsd4_compoundres *resp)
 {
-	struct svc_rqst *rqstp = resp->rqstp;
-	struct kvec *resv = &resp->rqstp->rq_res.head[0];
-	int len;
+	struct nfsd4_op *op;
+	struct nfsd4_slot *slot = resp->cstate.slot;
 
-	/* Current request rpc header length*/
-	len = (char *)resp->cstate.statp -
-			(char *)page_address(rqstp->rq_respages[0]);
-	if (entry->ce_datav.iov_len + len > PAGE_SIZE) {
-		dprintk("%s v41 cached reply too large (%Zd).\n", __func__,
-			entry->ce_datav.iov_len);
-		return 0;
+	dprintk("--> %s resp->opcnt %d cachethis %u \n", __func__,
+		resp->opcnt, resp->cstate.slot->sl_cachethis);
+
+	/* Encode the replayed sequence operation */
+	op = &args->ops[resp->opcnt - 1];
+	nfsd4_encode_operation(resp, op);
+
+	/* Return nfserr_retry_uncached_rep in next operation. */
+	if (args->opcnt > 1 && slot->sl_cachethis == 0) {
+		op = &args->ops[resp->opcnt++];
+		op->status = nfserr_retry_uncached_rep;
+		nfsd4_encode_operation(resp, op);
 	}
-	/* copy the cached reply nfsd data past the current rpc header */
-	memcpy((char *)resv->iov_base + len, entry->ce_datav.iov_base,
-		entry->ce_datav.iov_len);
-	resv->iov_len = len + entry->ce_datav.iov_len;
-	return 1;
+	return op->status;
 }
 
 /*
- * Keep the first page of the replay. Copy the NFSv4.1 data from the first
- * cached page.  Replace any futher replay pages from the cache.
+ * The sequence operation is not cached because we can use the slot and
+ * session values.
  */
 __be32
 nfsd4_replay_cache_entry(struct nfsd4_compoundres *resp,
 			 struct nfsd4_sequence *seq)
 {
-	struct nfsd4_cache_entry *entry = &resp->cstate.slot->sl_cache_entry;
+	struct nfsd4_slot *slot = resp->cstate.slot;
 	__be32 status;
 
-	dprintk("--> %s entry %p\n", __func__, entry);
+	dprintk("--> %s slot %p\n", __func__, slot);
 
-	/*
-	 * If this is just the sequence operation, we did not keep
-	 * a page in the cache entry because we can just use the
-	 * slot info stored in struct nfsd4_sequence that was checked
-	 * against the slot in nfsd4_sequence().
-	 *
-	 * This occurs when seq->cachethis is FALSE, or when the client
-	 * session inactivity timer fires and a solo sequence operation
-	 * is sent (lease renewal).
-	 */
-	if (seq && nfsd4_not_cached(resp)) {
-		seq->maxslots = resp->cstate.session->se_fchannel.maxreqs;
-		return nfs_ok;
-	}
+	/* Either returns 0 or nfserr_retry_uncached_rep */
+	status = nfsd4_enc_sequence_replay(resp->rqstp->rq_argp, resp);
+	if (status == nfserr_retry_uncached_rep)
+		return status;
 
-	if (!nfsd41_copy_replay_data(resp, entry)) {
-		/*
-		 * Not enough room to use the replay rpc header, send the
-		 * cached header. Release all the allocated result pages.
-		 */
-		svc_free_res_pages(resp->rqstp);
-		nfsd4_copy_pages(resp->rqstp->rq_respages, entry->ce_respages,
-			entry->ce_resused);
-	} else {
-		/* Release all but the first allocated result page */
+	/* The sequence operation has been encoded, cstate->datap set. */
+	memcpy(resp->cstate.datap, slot->sl_data, slot->sl_datalen);
 
-		resp->rqstp->rq_resused--;
-		svc_free_res_pages(resp->rqstp);
-
-		nfsd4_copy_pages(&resp->rqstp->rq_respages[1],
-				 &entry->ce_respages[1],
-				 entry->ce_resused - 1);
-	}
-
-	resp->rqstp->rq_resused = entry->ce_resused;
-	resp->opcnt = entry->ce_opcnt;
-	resp->cstate.iovlen = entry->ce_datav.iov_len + entry->ce_rpchdrlen;
-	status = entry->ce_status;
+	resp->opcnt = slot->sl_opcnt;
+	resp->p = resp->cstate.datap + XDR_QUADLEN(slot->sl_datalen);
+	status = slot->sl_status;
 
 	return status;
 }
@@ -1194,13 +1119,15 @@
 	int status;
 	unsigned int		strhashval;
 	char			dname[HEXDIR_LEN];
+	char			addr_str[INET6_ADDRSTRLEN];
 	nfs4_verifier		verf = exid->verifier;
-	u32			ip_addr = svc_addr_in(rqstp)->sin_addr.s_addr;
+	struct sockaddr		*sa = svc_addr(rqstp);
 
+	rpc_ntop(sa, addr_str, sizeof(addr_str));
 	dprintk("%s rqstp=%p exid=%p clname.len=%u clname.data=%p "
-		" ip_addr=%u flags %x, spa_how %d\n",
+		"ip_addr=%s flags %x, spa_how %d\n",
 		__func__, rqstp, exid, exid->clname.len, exid->clname.data,
-		ip_addr, exid->flags, exid->spa_how);
+		addr_str, exid->flags, exid->spa_how);
 
 	if (!check_name(exid->clname) || (exid->flags & ~EXCHGID4_FLAG_MASK_A))
 		return nfserr_inval;
@@ -1281,28 +1208,23 @@
 
 out_new:
 	/* Normal case */
-	new = create_client(exid->clname, dname);
+	new = create_client(exid->clname, dname, rqstp, &verf);
 	if (new == NULL) {
-		status = nfserr_resource;
+		status = nfserr_serverfault;
 		goto out;
 	}
 
-	copy_verf(new, &verf);
-	copy_cred(&new->cl_cred, &rqstp->rq_cred);
-	new->cl_addr = ip_addr;
 	gen_clid(new);
-	gen_confirm(new);
 	add_to_unconfirmed(new, strhashval);
 out_copy:
 	exid->clientid.cl_boot = new->cl_clientid.cl_boot;
 	exid->clientid.cl_id = new->cl_clientid.cl_id;
 
-	new->cl_slot.sl_seqid = 0;
 	exid->seqid = 1;
 	nfsd4_set_ex_flags(new, exid);
 
 	dprintk("nfsd4_exchange_id seqid %d flags %x\n",
-		new->cl_slot.sl_seqid, new->cl_exchange_flags);
+		new->cl_cs_slot.sl_seqid, new->cl_exchange_flags);
 	status = nfs_ok;
 
 out:
@@ -1313,40 +1235,60 @@
 }
 
 static int
-check_slot_seqid(u32 seqid, struct nfsd4_slot *slot)
+check_slot_seqid(u32 seqid, u32 slot_seqid, int slot_inuse)
 {
-	dprintk("%s enter. seqid %d slot->sl_seqid %d\n", __func__, seqid,
-		slot->sl_seqid);
+	dprintk("%s enter. seqid %d slot_seqid %d\n", __func__, seqid,
+		slot_seqid);
 
 	/* The slot is in use, and no response has been sent. */
-	if (slot->sl_inuse) {
-		if (seqid == slot->sl_seqid)
+	if (slot_inuse) {
+		if (seqid == slot_seqid)
 			return nfserr_jukebox;
 		else
 			return nfserr_seq_misordered;
 	}
 	/* Normal */
-	if (likely(seqid == slot->sl_seqid + 1))
+	if (likely(seqid == slot_seqid + 1))
 		return nfs_ok;
 	/* Replay */
-	if (seqid == slot->sl_seqid)
+	if (seqid == slot_seqid)
 		return nfserr_replay_cache;
 	/* Wraparound */
-	if (seqid == 1 && (slot->sl_seqid + 1) == 0)
+	if (seqid == 1 && (slot_seqid + 1) == 0)
 		return nfs_ok;
 	/* Misordered replay or misordered new request */
 	return nfserr_seq_misordered;
 }
 
+/*
+ * Cache the create session result into the create session single DRC
+ * slot cache by saving the xdr structure. sl_seqid has been set.
+ * Do this for solo or embedded create session operations.
+ */
+static void
+nfsd4_cache_create_session(struct nfsd4_create_session *cr_ses,
+			   struct nfsd4_clid_slot *slot, int nfserr)
+{
+	slot->sl_status = nfserr;
+	memcpy(&slot->sl_cr_ses, cr_ses, sizeof(*cr_ses));
+}
+
+static __be32
+nfsd4_replay_create_session(struct nfsd4_create_session *cr_ses,
+			    struct nfsd4_clid_slot *slot)
+{
+	memcpy(cr_ses, &slot->sl_cr_ses, sizeof(*cr_ses));
+	return slot->sl_status;
+}
+
 __be32
 nfsd4_create_session(struct svc_rqst *rqstp,
 		     struct nfsd4_compound_state *cstate,
 		     struct nfsd4_create_session *cr_ses)
 {
-	u32 ip_addr = svc_addr_in(rqstp)->sin_addr.s_addr;
-	struct nfsd4_compoundres *resp = rqstp->rq_resp;
+	struct sockaddr *sa = svc_addr(rqstp);
 	struct nfs4_client *conf, *unconf;
-	struct nfsd4_slot *slot = NULL;
+	struct nfsd4_clid_slot *cs_slot = NULL;
 	int status = 0;
 
 	nfs4_lock_state();
@@ -1354,40 +1296,38 @@
 	conf = find_confirmed_client(&cr_ses->clientid);
 
 	if (conf) {
-		slot = &conf->cl_slot;
-		status = check_slot_seqid(cr_ses->seqid, slot);
+		cs_slot = &conf->cl_cs_slot;
+		status = check_slot_seqid(cr_ses->seqid, cs_slot->sl_seqid, 0);
 		if (status == nfserr_replay_cache) {
 			dprintk("Got a create_session replay! seqid= %d\n",
-				slot->sl_seqid);
-			cstate->slot = slot;
-			cstate->status = status;
+				cs_slot->sl_seqid);
 			/* Return the cached reply status */
-			status = nfsd4_replay_cache_entry(resp, NULL);
+			status = nfsd4_replay_create_session(cr_ses, cs_slot);
 			goto out;
-		} else if (cr_ses->seqid != conf->cl_slot.sl_seqid + 1) {
+		} else if (cr_ses->seqid != cs_slot->sl_seqid + 1) {
 			status = nfserr_seq_misordered;
 			dprintk("Sequence misordered!\n");
 			dprintk("Expected seqid= %d but got seqid= %d\n",
-				slot->sl_seqid, cr_ses->seqid);
+				cs_slot->sl_seqid, cr_ses->seqid);
 			goto out;
 		}
-		conf->cl_slot.sl_seqid++;
+		cs_slot->sl_seqid++;
 	} else if (unconf) {
 		if (!same_creds(&unconf->cl_cred, &rqstp->rq_cred) ||
-		    (ip_addr != unconf->cl_addr)) {
+		    !rpc_cmp_addr(sa, (struct sockaddr *) &unconf->cl_addr)) {
 			status = nfserr_clid_inuse;
 			goto out;
 		}
 
-		slot = &unconf->cl_slot;
-		status = check_slot_seqid(cr_ses->seqid, slot);
+		cs_slot = &unconf->cl_cs_slot;
+		status = check_slot_seqid(cr_ses->seqid, cs_slot->sl_seqid, 0);
 		if (status) {
 			/* an unconfirmed replay returns misordered */
 			status = nfserr_seq_misordered;
-			goto out;
+			goto out_cache;
 		}
 
-		slot->sl_seqid++; /* from 0 to 1 */
+		cs_slot->sl_seqid++; /* from 0 to 1 */
 		move_to_confirmed(unconf);
 
 		/*
@@ -1396,6 +1336,19 @@
 		cr_ses->flags &= ~SESSION4_PERSIST;
 		cr_ses->flags &= ~SESSION4_RDMA;
 
+		if (cr_ses->flags & SESSION4_BACK_CHAN) {
+			unconf->cl_cb_xprt = rqstp->rq_xprt;
+			svc_xprt_get(unconf->cl_cb_xprt);
+			rpc_copy_addr(
+				(struct sockaddr *)&unconf->cl_cb_conn.cb_addr,
+				sa);
+			unconf->cl_cb_conn.cb_addrlen = svc_addr_len(sa);
+			unconf->cl_cb_conn.cb_minorversion =
+				cstate->minorversion;
+			unconf->cl_cb_conn.cb_prog = cr_ses->callback_prog;
+			unconf->cl_cb_seq_nr = 1;
+			nfsd4_probe_callback(unconf);
+		}
 		conf = unconf;
 	} else {
 		status = nfserr_stale_clientid;
@@ -1408,12 +1361,11 @@
 
 	memcpy(cr_ses->sessionid.data, conf->cl_sessionid.data,
 	       NFS4_MAX_SESSIONID_LEN);
-	cr_ses->seqid = slot->sl_seqid;
+	cr_ses->seqid = cs_slot->sl_seqid;
 
-	slot->sl_inuse = true;
-	cstate->slot = slot;
-	/* Ensure a page is used for the cache */
-	slot->sl_cache_entry.ce_cachethis = 1;
+out_cache:
+	/* cache solo and embedded create sessions under the state lock */
+	nfsd4_cache_create_session(cr_ses, cs_slot, status);
 out:
 	nfs4_unlock_state();
 	dprintk("%s returns %d\n", __func__, ntohl(status));
@@ -1478,18 +1430,23 @@
 	if (seq->slotid >= session->se_fchannel.maxreqs)
 		goto out;
 
-	slot = &session->se_slots[seq->slotid];
+	slot = session->se_slots[seq->slotid];
 	dprintk("%s: slotid %d\n", __func__, seq->slotid);
 
-	status = check_slot_seqid(seq->seqid, slot);
+	/* We do not negotiate the number of slots yet, so set
+	 * maxslots to the session maxreqs; it is used to encode both
+	 * sr_highest_slotid and sr_target_highest_slotid */
+	seq->maxslots = session->se_fchannel.maxreqs;
+
+	status = check_slot_seqid(seq->seqid, slot->sl_seqid, slot->sl_inuse);
 	if (status == nfserr_replay_cache) {
 		cstate->slot = slot;
 		cstate->session = session;
 		/* Return the cached reply status and set cstate->status
-		 * for nfsd4_svc_encode_compoundres processing */
+		 * for nfsd4_proc_compound processing */
 		status = nfsd4_replay_cache_entry(resp, seq);
 		cstate->status = nfserr_replay_cache;
-		goto replay_cache;
+		goto out;
 	}
 	if (status)
 		goto out;
@@ -1497,23 +1454,23 @@
 	/* Success! bump slot seqid */
 	slot->sl_inuse = true;
 	slot->sl_seqid = seq->seqid;
-	slot->sl_cache_entry.ce_cachethis = seq->cachethis;
-	/* Always set the cache entry cachethis for solo sequence */
-	if (nfsd4_is_solo_sequence(resp))
-		slot->sl_cache_entry.ce_cachethis = 1;
+	slot->sl_cachethis = seq->cachethis;
 
 	cstate->slot = slot;
 	cstate->session = session;
 
-replay_cache:
-	/* Renew the clientid on success and on replay.
-	 * Hold a session reference until done processing the compound:
+	/* Hold a session reference until done processing the compound:
 	 * nfsd4_put_session called only if the cstate slot is set.
 	 */
-	renew_client(session->se_client);
 	nfsd4_get_session(session);
 out:
 	spin_unlock(&sessionid_lock);
+	/* Renew the clientid on success and on replay */
+	if (cstate->session) {
+		nfs4_lock_state();
+		renew_client(session->se_client);
+		nfs4_unlock_state();
+	}
 	dprintk("%s: return %d\n", __func__, ntohl(status));
 	return status;
 }
@@ -1522,7 +1479,7 @@
 nfsd4_setclientid(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 		  struct nfsd4_setclientid *setclid)
 {
-	struct sockaddr_in	*sin = svc_addr_in(rqstp);
+	struct sockaddr		*sa = svc_addr(rqstp);
 	struct xdr_netobj 	clname = { 
 		.len = setclid->se_namelen,
 		.data = setclid->se_name,
@@ -1531,7 +1488,6 @@
 	unsigned int 		strhashval;
 	struct nfs4_client	*conf, *unconf, *new;
 	__be32 			status;
-	char			*princ;
 	char                    dname[HEXDIR_LEN];
 	
 	if (!check_name(clname))
@@ -1554,8 +1510,11 @@
 		/* RFC 3530 14.2.33 CASE 0: */
 		status = nfserr_clid_inuse;
 		if (!same_creds(&conf->cl_cred, &rqstp->rq_cred)) {
-			dprintk("NFSD: setclientid: string in use by client"
-				" at %pI4\n", &conf->cl_addr);
+			char addr_str[INET6_ADDRSTRLEN];
+			rpc_ntop((struct sockaddr *) &conf->cl_addr, addr_str,
+				 sizeof(addr_str));
+			dprintk("NFSD: setclientid: string in use by client "
+				"at %s\n", addr_str);
 			goto out;
 		}
 	}
@@ -1573,7 +1532,7 @@
 		 */
 		if (unconf)
 			expire_client(unconf);
-		new = create_client(clname, dname);
+		new = create_client(clname, dname, rqstp, &clverifier);
 		if (new == NULL)
 			goto out;
 		gen_clid(new);
@@ -1590,7 +1549,7 @@
 			 */
 			expire_client(unconf);
 		}
-		new = create_client(clname, dname);
+		new = create_client(clname, dname, rqstp, &clverifier);
 		if (new == NULL)
 			goto out;
 		copy_clid(new, conf);
@@ -1600,7 +1559,7 @@
 		 * probable client reboot; state will be removed if
 		 * confirmed.
 		 */
-		new = create_client(clname, dname);
+		new = create_client(clname, dname, rqstp, &clverifier);
 		if (new == NULL)
 			goto out;
 		gen_clid(new);
@@ -1611,25 +1570,12 @@
 		 * confirmed.
 		 */
 		expire_client(unconf);
-		new = create_client(clname, dname);
+		new = create_client(clname, dname, rqstp, &clverifier);
 		if (new == NULL)
 			goto out;
 		gen_clid(new);
 	}
-	copy_verf(new, &clverifier);
-	new->cl_addr = sin->sin_addr.s_addr;
-	new->cl_flavor = rqstp->rq_flavor;
-	princ = svc_gss_principal(rqstp);
-	if (princ) {
-		new->cl_principal = kstrdup(princ, GFP_KERNEL);
-		if (new->cl_principal == NULL) {
-			free_client(new);
-			goto out;
-		}
-	}
-	copy_cred(&new->cl_cred, &rqstp->rq_cred);
-	gen_confirm(new);
-	gen_callback(new, setclid);
+	gen_callback(new, setclid, rpc_get_scope_id(sa));
 	add_to_unconfirmed(new, strhashval);
 	setclid->se_clientid.cl_boot = new->cl_clientid.cl_boot;
 	setclid->se_clientid.cl_id = new->cl_clientid.cl_id;
@@ -1651,7 +1597,7 @@
 			 struct nfsd4_compound_state *cstate,
 			 struct nfsd4_setclientid_confirm *setclientid_confirm)
 {
-	struct sockaddr_in *sin = svc_addr_in(rqstp);
+	struct sockaddr *sa = svc_addr(rqstp);
 	struct nfs4_client *conf, *unconf;
 	nfs4_verifier confirm = setclientid_confirm->sc_confirm; 
 	clientid_t * clid = &setclientid_confirm->sc_clientid;
@@ -1670,9 +1616,9 @@
 	unconf = find_unconfirmed_client(clid);
 
 	status = nfserr_clid_inuse;
-	if (conf && conf->cl_addr != sin->sin_addr.s_addr)
+	if (conf && !rpc_cmp_addr((struct sockaddr *) &conf->cl_addr, sa))
 		goto out;
-	if (unconf && unconf->cl_addr != sin->sin_addr.s_addr)
+	if (unconf && !rpc_cmp_addr((struct sockaddr *) &unconf->cl_addr, sa))
 		goto out;
 
 	/*
@@ -4072,7 +4018,7 @@
 
 /* initialization to perform when the nfsd service is started: */
 
-static void
+static int
 __nfs4_state_start(void)
 {
 	unsigned long grace_time;
@@ -4084,19 +4030,26 @@
 	printk(KERN_INFO "NFSD: starting %ld-second grace period\n",
 	       grace_time/HZ);
 	laundry_wq = create_singlethread_workqueue("nfsd4");
+	if (laundry_wq == NULL)
+		return -ENOMEM;
 	queue_delayed_work(laundry_wq, &laundromat_work, grace_time);
 	set_max_delegations();
+	return set_callback_cred();
 }
 
-void
+int
 nfs4_state_start(void)
 {
+	int ret;
+
 	if (nfs4_init)
-		return;
+		return 0;
 	nfsd4_load_reboot_recovery_data();
-	__nfs4_state_start();
+	ret = __nfs4_state_start();
+	if (ret)
+		return ret;
 	nfs4_init = 1;
-	return;
+	return 0;
 }
 
 time_t
diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index 2dcc7fe..0fbd50c 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -1599,7 +1599,8 @@
 static char *nfsd4_path(struct svc_rqst *rqstp, struct svc_export *exp, __be32 *stat)
 {
 	struct svc_fh tmp_fh;
-	char *path, *rootpath;
+	char *path = NULL, *rootpath;
+	size_t rootlen;
 
 	fh_init(&tmp_fh, NFS4_FHSIZE);
 	*stat = exp_pseudoroot(rqstp, &tmp_fh);
@@ -1609,14 +1610,18 @@
 
 	path = exp->ex_pathname;
 
-	if (strncmp(path, rootpath, strlen(rootpath))) {
+	rootlen = strlen(rootpath);
+	if (strncmp(path, rootpath, rootlen)) {
 		dprintk("nfsd: fs_locations failed;"
 			"%s is not contained in %s\n", path, rootpath);
 		*stat = nfserr_notsupp;
-		return NULL;
+		path = NULL;
+		goto out;
 	}
-
-	return path + strlen(rootpath);
+	path += rootlen;
+out:
+	fh_put(&tmp_fh);
+	return path;
 }
 
 /*
@@ -1793,11 +1798,6 @@
 				goto out_nfserr;
 		}
 	}
-	if (bmval0 & FATTR4_WORD0_FS_LOCATIONS) {
-		if (exp->ex_fslocs.locations == NULL) {
-			bmval0 &= ~FATTR4_WORD0_FS_LOCATIONS;
-		}
-	}
 	if ((buflen -= 16) < 0)
 		goto out_resource;
 
@@ -1825,8 +1825,6 @@
 			goto out_resource;
 		if (!aclsupport)
 			word0 &= ~FATTR4_WORD0_ACL;
-		if (!exp->ex_fslocs.locations)
-			word0 &= ~FATTR4_WORD0_FS_LOCATIONS;
 		if (!word2) {
 			WRITE32(2);
 			WRITE32(word0);
@@ -3064,6 +3062,7 @@
 	WRITE32(0);
 
 	ADJUST_ARGS();
+	resp->cstate.datap = p; /* DRC cache data pointer */
 	return 0;
 }
 
@@ -3166,7 +3165,7 @@
 		return status;
 
 	session = resp->cstate.session;
-	if (session == NULL || slot->sl_cache_entry.ce_cachethis == 0)
+	if (session == NULL || slot->sl_cachethis == 0)
 		return status;
 
 	if (resp->opcnt >= args->opcnt)
@@ -3291,6 +3290,7 @@
 	/*
 	 * All that remains is to write the tag and operation count...
 	 */
+	struct nfsd4_compound_state *cs = &resp->cstate;
 	struct kvec *iov;
 	p = resp->tagp;
 	*p++ = htonl(resp->taglen);
@@ -3304,17 +3304,11 @@
 		iov = &rqstp->rq_res.head[0];
 	iov->iov_len = ((char*)resp->p) - (char*)iov->iov_base;
 	BUG_ON(iov->iov_len > PAGE_SIZE);
-	if (nfsd4_has_session(&resp->cstate)) {
-		if (resp->cstate.status == nfserr_replay_cache &&
-				!nfsd4_not_cached(resp)) {
-			iov->iov_len = resp->cstate.iovlen;
-		} else {
-			nfsd4_store_cache_entry(resp);
-			dprintk("%s: SET SLOT STATE TO AVAILABLE\n", __func__);
-			resp->cstate.slot->sl_inuse = 0;
-		}
-		if (resp->cstate.session)
-			nfsd4_put_session(resp->cstate.session);
+	if (nfsd4_has_session(cs) && cs->status != nfserr_replay_cache) {
+		nfsd4_store_cache_entry(resp);
+		dprintk("%s: SET SLOT STATE TO AVAILABLE\n", __func__);
+		resp->cstate.slot->sl_inuse = false;
+		nfsd4_put_session(resp->cstate.session);
 	}
 	return 1;
 }
diff --git a/fs/nfsd/nfsctl.c b/fs/nfsd/nfsctl.c
index 7e906c5..00388d2 100644
--- a/fs/nfsd/nfsctl.c
+++ b/fs/nfsd/nfsctl.c
@@ -174,12 +174,13 @@
 };
 
 extern int nfsd_pool_stats_open(struct inode *inode, struct file *file);
+extern int nfsd_pool_stats_release(struct inode *inode, struct file *file);
 
 static struct file_operations pool_stats_operations = {
 	.open		= nfsd_pool_stats_open,
 	.read		= seq_read,
 	.llseek		= seq_lseek,
-	.release	= seq_release,
+	.release	= nfsd_pool_stats_release,
 	.owner		= THIS_MODULE,
 };
 
@@ -776,10 +777,7 @@
 		size -= len;
 		mesg += len;
 	}
-
-	mutex_unlock(&nfsd_mutex);
-	return (mesg-buf);
-
+	rv = mesg - buf;
 out_free:
 	kfree(nthreads);
 	mutex_unlock(&nfsd_mutex);
diff --git a/fs/nfsd/nfsfh.c b/fs/nfsd/nfsfh.c
index 8847f3f..01965b2 100644
--- a/fs/nfsd/nfsfh.c
+++ b/fs/nfsd/nfsfh.c
@@ -397,44 +397,51 @@
 		fh->ofh_dirino = 0;
 }
 
-__be32
-fh_compose(struct svc_fh *fhp, struct svc_export *exp, struct dentry *dentry,
-	   struct svc_fh *ref_fh)
+static bool is_root_export(struct svc_export *exp)
 {
-	/* ref_fh is a reference file handle.
-	 * if it is non-null and for the same filesystem, then we should compose
-	 * a filehandle which is of the same version, where possible.
-	 * Currently, that means that if ref_fh->fh_handle.fh_version == 0xca
-	 * Then create a 32byte filehandle using nfs_fhbase_old
-	 *
-	 */
+	return exp->ex_path.dentry == exp->ex_path.dentry->d_sb->s_root;
+}
 
+static struct super_block *exp_sb(struct svc_export *exp)
+{
+	return exp->ex_path.dentry->d_inode->i_sb;
+}
+
+static bool fsid_type_ok_for_exp(u8 fsid_type, struct svc_export *exp)
+{
+	switch (fsid_type) {
+	case FSID_DEV:
+		if (!old_valid_dev(exp_sb(exp)->s_dev))
+			return 0;
+		/* FALL THROUGH */
+	case FSID_MAJOR_MINOR:
+	case FSID_ENCODE_DEV:
+		return exp_sb(exp)->s_type->fs_flags & FS_REQUIRES_DEV;
+	case FSID_NUM:
+		return exp->ex_flags & NFSEXP_FSID;
+	case FSID_UUID8:
+	case FSID_UUID16:
+		if (!is_root_export(exp))
+			return 0;
+		/* fall through */
+	case FSID_UUID4_INUM:
+	case FSID_UUID16_INUM:
+		return exp->ex_uuid != NULL;
+	}
+	return 1;
+}
+
+
+static void set_version_and_fsid_type(struct svc_fh *fhp, struct svc_export *exp, struct svc_fh *ref_fh)
+{
 	u8 version;
-	u8 fsid_type = 0;
-	struct inode * inode = dentry->d_inode;
-	struct dentry *parent = dentry->d_parent;
-	__u32 *datap;
-	dev_t ex_dev = exp->ex_path.dentry->d_inode->i_sb->s_dev;
-	int root_export = (exp->ex_path.dentry == exp->ex_path.dentry->d_sb->s_root);
-
-	dprintk("nfsd: fh_compose(exp %02x:%02x/%ld %s/%s, ino=%ld)\n",
-		MAJOR(ex_dev), MINOR(ex_dev),
-		(long) exp->ex_path.dentry->d_inode->i_ino,
-		parent->d_name.name, dentry->d_name.name,
-		(inode ? inode->i_ino : 0));
-
-	/* Choose filehandle version and fsid type based on
-	 * the reference filehandle (if it is in the same export)
-	 * or the export options.
-	 */
- retry:
+	u8 fsid_type;
+retry:
 	version = 1;
 	if (ref_fh && ref_fh->fh_export == exp) {
 		version = ref_fh->fh_handle.fh_version;
 		fsid_type = ref_fh->fh_handle.fh_fsid_type;
 
-		if (ref_fh == fhp)
-			fh_put(ref_fh);
 		ref_fh = NULL;
 
 		switch (version) {
@@ -447,58 +454,66 @@
 			goto retry;
 		}
 
-		/* Need to check that this type works for this
-		 * export point.  As the fsid -> filesystem mapping
-		 * was guided by user-space, there is no guarantee
-		 * that the filesystem actually supports that fsid
-		 * type. If it doesn't we loop around again without
-		 * ref_fh set.
+		/*
+		 * As the fsid -> filesystem mapping was guided by
+		 * user-space, there is no guarantee that the filesystem
+		 * actually supports that fsid type. If it doesn't we
+		 * loop around again without ref_fh set.
 		 */
-		switch(fsid_type) {
-		case FSID_DEV:
-			if (!old_valid_dev(ex_dev))
-				goto retry;
-			/* FALL THROUGH */
-		case FSID_MAJOR_MINOR:
-		case FSID_ENCODE_DEV:
-			if (!(exp->ex_path.dentry->d_inode->i_sb->s_type->fs_flags
-			      & FS_REQUIRES_DEV))
-				goto retry;
-			break;
-		case FSID_NUM:
-			if (! (exp->ex_flags & NFSEXP_FSID))
-				goto retry;
-			break;
-		case FSID_UUID8:
-		case FSID_UUID16:
-			if (!root_export)
-				goto retry;
-			/* fall through */
-		case FSID_UUID4_INUM:
-		case FSID_UUID16_INUM:
-			if (exp->ex_uuid == NULL)
-				goto retry;
-			break;
-		}
+		if (!fsid_type_ok_for_exp(fsid_type, exp))
+			goto retry;
 	} else if (exp->ex_flags & NFSEXP_FSID) {
 		fsid_type = FSID_NUM;
 	} else if (exp->ex_uuid) {
 		if (fhp->fh_maxsize >= 64) {
-			if (root_export)
+			if (is_root_export(exp))
 				fsid_type = FSID_UUID16;
 			else
 				fsid_type = FSID_UUID16_INUM;
 		} else {
-			if (root_export)
+			if (is_root_export(exp))
 				fsid_type = FSID_UUID8;
 			else
 				fsid_type = FSID_UUID4_INUM;
 		}
-	} else if (!old_valid_dev(ex_dev))
+	} else if (!old_valid_dev(exp_sb(exp)->s_dev))
 		/* for newer device numbers, we must use a newer fsid format */
 		fsid_type = FSID_ENCODE_DEV;
 	else
 		fsid_type = FSID_DEV;
+	fhp->fh_handle.fh_version = version;
+	if (version)
+		fhp->fh_handle.fh_fsid_type = fsid_type;
+}
+
+__be32
+fh_compose(struct svc_fh *fhp, struct svc_export *exp, struct dentry *dentry,
+	   struct svc_fh *ref_fh)
+{
+	/* ref_fh is a reference file handle.
+	 * if it is non-null and for the same filesystem, then we should compose
+	 * a filehandle which is of the same version, where possible.
+	 * Currently, that means that if ref_fh->fh_handle.fh_version == 0xca
+	 * Then create a 32byte filehandle using nfs_fhbase_old
+	 *
+	 */
+
+	struct inode * inode = dentry->d_inode;
+	struct dentry *parent = dentry->d_parent;
+	__u32 *datap;
+	dev_t ex_dev = exp_sb(exp)->s_dev;
+
+	dprintk("nfsd: fh_compose(exp %02x:%02x/%ld %s/%s, ino=%ld)\n",
+		MAJOR(ex_dev), MINOR(ex_dev),
+		(long) exp->ex_path.dentry->d_inode->i_ino,
+		parent->d_name.name, dentry->d_name.name,
+		(inode ? inode->i_ino : 0));
+
+	/* Choose filehandle version and fsid type based on
+	 * the reference filehandle (if it is in the same export)
+	 * or the export options.
+	 */
+	 set_version_and_fsid_type(fhp, exp, ref_fh);
 
 	if (ref_fh == fhp)
 		fh_put(ref_fh);
@@ -516,7 +531,7 @@
 	fhp->fh_export = exp;
 	cache_get(&exp->h);
 
-	if (version == 0xca) {
+	if (fhp->fh_handle.fh_version == 0xca) {
 		/* old style filehandle please */
 		memset(&fhp->fh_handle.fh_base, 0, NFS_FHSIZE);
 		fhp->fh_handle.fh_size = NFS_FHSIZE;
@@ -530,22 +545,22 @@
 			_fh_update_old(dentry, exp, &fhp->fh_handle);
 	} else {
 		int len;
-		fhp->fh_handle.fh_version = 1;
 		fhp->fh_handle.fh_auth_type = 0;
 		datap = fhp->fh_handle.fh_auth+0;
-		fhp->fh_handle.fh_fsid_type = fsid_type;
-		mk_fsid(fsid_type, datap, ex_dev,
+		mk_fsid(fhp->fh_handle.fh_fsid_type, datap, ex_dev,
 			exp->ex_path.dentry->d_inode->i_ino,
 			exp->ex_fsid, exp->ex_uuid);
 
-		len = key_len(fsid_type);
+		len = key_len(fhp->fh_handle.fh_fsid_type);
 		datap += len/4;
 		fhp->fh_handle.fh_size = 4 + len;
 
 		if (inode)
 			_fh_update(fhp, exp, dentry);
-		if (fhp->fh_handle.fh_fileid_type == 255)
+		if (fhp->fh_handle.fh_fileid_type == 255) {
+			fh_put(fhp);
 			return nfserr_opnotsupp;
+		}
 	}
 
 	return 0;
@@ -639,8 +654,7 @@
 	case FSID_DEV:
 	case FSID_ENCODE_DEV:
 	case FSID_MAJOR_MINOR:
-		if (fhp->fh_export->ex_path.dentry->d_inode->i_sb->s_type->fs_flags
-		    & FS_REQUIRES_DEV)
+		if (exp_sb(fhp->fh_export)->s_type->fs_flags & FS_REQUIRES_DEV)
 			return FSIDSOURCE_DEV;
 		break;
 	case FSID_NUM:
diff --git a/fs/nfsd/nfssvc.c b/fs/nfsd/nfssvc.c
index 24d58ad..67ea83e 100644
--- a/fs/nfsd/nfssvc.c
+++ b/fs/nfsd/nfssvc.c
@@ -34,6 +34,7 @@
 #include <linux/nfsd/syscall.h>
 #include <linux/lockd/bind.h>
 #include <linux/nfsacl.h>
+#include <linux/seq_file.h>
 
 #define NFSDDBG_FACILITY	NFSDDBG_SVC
 
@@ -66,6 +67,16 @@
 DEFINE_MUTEX(nfsd_mutex);
 struct svc_serv 		*nfsd_serv;
 
+/*
+ * nfsd_drc_lock protects nfsd_drc_max_mem and nfsd_drc_mem_used.
+ * nfsd_drc_max_mem limits the total amount of memory available for
+ * version 4.1 DRC caches.
+ * nfsd_drc_mem_used tracks the current version 4.1 DRC memory usage.
+ */
+spinlock_t	nfsd_drc_lock;
+unsigned int	nfsd_drc_max_mem;
+unsigned int	nfsd_drc_mem_used;
+
 #if defined(CONFIG_NFSD_V2_ACL) || defined(CONFIG_NFSD_V3_ACL)
 static struct svc_stat	nfsd_acl_svcstats;
 static struct svc_version *	nfsd_acl_version[] = {
@@ -235,13 +246,12 @@
  */
 static void set_max_drc(void)
 {
-	/* The percent of nr_free_buffer_pages used by the V4.1 server DRC */
-	#define NFSD_DRC_SIZE_SHIFT	7
-	nfsd_serv->sv_drc_max_pages = nr_free_buffer_pages()
-						>> NFSD_DRC_SIZE_SHIFT;
-	nfsd_serv->sv_drc_pages_used = 0;
-	dprintk("%s svc_drc_max_pages %u\n", __func__,
-		nfsd_serv->sv_drc_max_pages);
+	#define NFSD_DRC_SIZE_SHIFT	10
+	nfsd_drc_max_mem = (nr_free_buffer_pages()
+					>> NFSD_DRC_SIZE_SHIFT) * PAGE_SIZE;
+	nfsd_drc_mem_used = 0;
+	spin_lock_init(&nfsd_drc_lock);
+	dprintk("%s nfsd_drc_max_mem %u \n", __func__, nfsd_drc_max_mem);
 }
 
 int nfsd_create_serv(void)
@@ -401,7 +411,9 @@
 	error =	nfsd_racache_init(2*nrservs);
 	if (error<0)
 		goto out;
-	nfs4_state_start();
+	error = nfs4_state_start();
+	if (error)
+		goto out;
 
 	nfsd_reset_versions();
 
@@ -569,10 +581,6 @@
 		+ rqstp->rq_res.head[0].iov_len;
 	rqstp->rq_res.head[0].iov_len += sizeof(__be32);
 
-	/* NFSv4.1 DRC requires statp */
-	if (rqstp->rq_vers == 4)
-		nfsd4_set_statp(rqstp, statp);
-
 	/* Now call the procedure handler, and encode NFS status. */
 	nfserr = proc->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
 	nfserr = map_new_errors(rqstp->rq_vers, nfserr);
@@ -607,7 +615,25 @@
 
 int nfsd_pool_stats_open(struct inode *inode, struct file *file)
 {
-	if (nfsd_serv == NULL)
+	int ret;
+	mutex_lock(&nfsd_mutex);
+	if (nfsd_serv == NULL) {
+		mutex_unlock(&nfsd_mutex);
 		return -ENODEV;
-	return svc_pool_stats_open(nfsd_serv, file);
+	}
+	/* bump up the pseudo refcount while traversing */
+	svc_get(nfsd_serv);
+	ret = svc_pool_stats_open(nfsd_serv, file);
+	mutex_unlock(&nfsd_mutex);
+	return ret;
+}
+
+int nfsd_pool_stats_release(struct inode *inode, struct file *file)
+{
+	int ret = seq_release(inode, file);
+	mutex_lock(&nfsd_mutex);
+	/* this function really, really should have been called svc_put() */
+	svc_destroy(nfsd_serv);
+	mutex_unlock(&nfsd_mutex);
+	return ret;
 }
diff --git a/fs/nfsd/vfs.c b/fs/nfsd/vfs.c
index 8fa09bf..a293f02 100644
--- a/fs/nfsd/vfs.c
+++ b/fs/nfsd/vfs.c
@@ -89,6 +89,12 @@
 #define RAPARM_HASH_MASK	(RAPARM_HASH_SIZE-1)
 static struct raparm_hbucket	raparm_hash[RAPARM_HASH_SIZE];
 
+static inline int
+nfsd_v4client(struct svc_rqst *rq)
+{
+    return rq->rq_prog == NFS_PROGRAM && rq->rq_vers == 4;
+}
+
 /* 
  * Called from nfsd_lookup and encode_dirent. Check if we have crossed 
  * a mount point.
@@ -115,7 +121,8 @@
 		path_put(&path);
 		goto out;
 	}
-	if ((exp->ex_flags & NFSEXP_CROSSMOUNT) || EX_NOHIDE(exp2)) {
+	if (nfsd_v4client(rqstp) ||
+		(exp->ex_flags & NFSEXP_CROSSMOUNT) || EX_NOHIDE(exp2)) {
 		/* successfully crossed mount point */
 		/*
 		 * This is subtle: path.dentry is *not* on path.mnt
diff --git a/include/linux/lockd/lockd.h b/include/linux/lockd/lockd.h
index ccf2e0d..a34dea4 100644
--- a/include/linux/lockd/lockd.h
+++ b/include/linux/lockd/lockd.h
@@ -338,49 +338,6 @@
 	}
 }
 
-static inline int __nlm_cmp_addr4(const struct sockaddr *sap1,
-				  const struct sockaddr *sap2)
-{
-	const struct sockaddr_in *sin1 = (const struct sockaddr_in *)sap1;
-	const struct sockaddr_in *sin2 = (const struct sockaddr_in *)sap2;
-	return sin1->sin_addr.s_addr == sin2->sin_addr.s_addr;
-}
-
-#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
-static inline int __nlm_cmp_addr6(const struct sockaddr *sap1,
-				  const struct sockaddr *sap2)
-{
-	const struct sockaddr_in6 *sin1 = (const struct sockaddr_in6 *)sap1;
-	const struct sockaddr_in6 *sin2 = (const struct sockaddr_in6 *)sap2;
-	return ipv6_addr_equal(&sin1->sin6_addr, &sin2->sin6_addr);
-}
-#else	/* !(CONFIG_IPV6 || CONFIG_IPV6_MODULE) */
-static inline int __nlm_cmp_addr6(const struct sockaddr *sap1,
-				  const struct sockaddr *sap2)
-{
-	return 0;
-}
-#endif	/* !(CONFIG_IPV6 || CONFIG_IPV6_MODULE) */
-
-/*
- * Compare two host addresses
- *
- * Return TRUE if the addresses are the same; otherwise FALSE.
- */
-static inline int nlm_cmp_addr(const struct sockaddr *sap1,
-			       const struct sockaddr *sap2)
-{
-	if (sap1->sa_family == sap2->sa_family) {
-		switch (sap1->sa_family) {
-		case AF_INET:
-			return __nlm_cmp_addr4(sap1, sap2);
-		case AF_INET6:
-			return __nlm_cmp_addr6(sap1, sap2);
-		}
-	}
-	return 0;
-}
-
 /*
  * Compare two NLM locks.
  * When the second lock is of type F_UNLCK, this acts like a wildcard.
diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h
index 33b2836..c4c0602 100644
--- a/include/linux/nfs4.h
+++ b/include/linux/nfs4.h
@@ -234,7 +234,7 @@
 Needs to be updated if more operations are defined in future.*/
 
 #define FIRST_NFS4_OP	OP_ACCESS
-#define LAST_NFS4_OP 	OP_RELEASE_LOCKOWNER
+#define LAST_NFS4_OP 	OP_RECLAIM_COMPLETE
 
 enum nfsstat4 {
 	NFS4_OK = 0,
diff --git a/include/linux/nfsd/nfsd.h b/include/linux/nfsd/nfsd.h
index 2b49d67..03bbe9039 100644
--- a/include/linux/nfsd/nfsd.h
+++ b/include/linux/nfsd/nfsd.h
@@ -56,6 +56,9 @@
 extern u32			nfsd_supported_minorversion;
 extern struct mutex		nfsd_mutex;
 extern struct svc_serv		*nfsd_serv;
+extern spinlock_t		nfsd_drc_lock;
+extern unsigned int		nfsd_drc_max_mem;
+extern unsigned int		nfsd_drc_mem_used;
 
 extern struct seq_operations nfs_exports_op;
 
@@ -163,7 +166,7 @@
 extern unsigned int max_delegations;
 int nfs4_state_init(void);
 void nfsd4_free_slabs(void);
-void nfs4_state_start(void);
+int nfs4_state_start(void);
 void nfs4_state_shutdown(void);
 time_t nfs4_lease_time(void);
 void nfs4_reset_lease(time_t leasetime);
@@ -171,7 +174,7 @@
 #else
 static inline int nfs4_state_init(void) { return 0; }
 static inline void nfsd4_free_slabs(void) { }
-static inline void nfs4_state_start(void) { }
+static inline int nfs4_state_start(void) { return 0; }
 static inline void nfs4_state_shutdown(void) { }
 static inline time_t nfs4_lease_time(void) { return 0; }
 static inline void nfs4_reset_lease(time_t leasetime) { }
diff --git a/include/linux/nfsd/state.h b/include/linux/nfsd/state.h
index 57ab2ed..b38d113 100644
--- a/include/linux/nfsd/state.h
+++ b/include/linux/nfsd/state.h
@@ -60,6 +60,12 @@
 #define si_stateownerid   si_opaque.so_stateownerid
 #define si_fileid         si_opaque.so_fileid
 
+struct nfsd4_cb_sequence {
+	/* args/res */
+	u32			cbs_minorversion;
+	struct nfs4_client	*cbs_clp;
+};
+
 struct nfs4_delegation {
 	struct list_head	dl_perfile;
 	struct list_head	dl_perclnt;
@@ -81,38 +87,35 @@
 /* client delegation callback info */
 struct nfs4_cb_conn {
 	/* SETCLIENTID info */
-	u32                     cb_addr;
-	unsigned short          cb_port;
+	struct sockaddr_storage	cb_addr;
+	size_t			cb_addrlen;
 	u32                     cb_prog;
 	u32			cb_minorversion;
 	u32                     cb_ident;	/* minorversion 0 only */
 	/* RPC client info */
 	atomic_t		cb_set;     /* successful CB_NULL call */
 	struct rpc_clnt *       cb_client;
-	struct rpc_cred	*	cb_cred;
 };
 
-/* Maximum number of slots per session. 128 is useful for long haul TCP */
-#define NFSD_MAX_SLOTS_PER_SESSION	128
-/* Maximum number of pages per slot cache entry */
-#define NFSD_PAGES_PER_SLOT	1
+/* Maximum number of slots per session. 160 is useful for long haul TCP */
+#define NFSD_MAX_SLOTS_PER_SESSION     160
 /* Maximum number of operations per session compound */
 #define NFSD_MAX_OPS_PER_COMPOUND	16
-
-struct nfsd4_cache_entry {
-	__be32		ce_status;
-	struct kvec	ce_datav; /* encoded NFSv4.1 data in rq_res.head[0] */
-	struct page	*ce_respages[NFSD_PAGES_PER_SLOT + 1];
-	int		ce_cachethis;
-	short		ce_resused;
-	int		ce_opcnt;
-	int		ce_rpchdrlen;
-};
+/* Maximum per-slot cache size for a session */
+#define NFSD_SLOT_CACHE_SIZE		1024
+/* Maximum number of NFSD_SLOT_CACHE_SIZE slots per session */
+#define NFSD_CACHE_SIZE_SLOTS_PER_SESSION	32
+#define NFSD_MAX_MEM_PER_SESSION  \
+		(NFSD_CACHE_SIZE_SLOTS_PER_SESSION * NFSD_SLOT_CACHE_SIZE)
 
 struct nfsd4_slot {
-	bool				sl_inuse;
-	u32				sl_seqid;
-	struct nfsd4_cache_entry	sl_cache_entry;
+	bool	sl_inuse;
+	bool	sl_cachethis;
+	u16	sl_opcnt;
+	u32	sl_seqid;
+	__be32	sl_status;
+	u32	sl_datalen;
+	char	sl_data[];
 };
 
 struct nfsd4_channel_attrs {
@@ -126,6 +129,25 @@
 	u32		rdma_attrs;
 };
 
+struct nfsd4_create_session {
+	clientid_t			clientid;
+	struct nfs4_sessionid		sessionid;
+	u32				seqid;
+	u32				flags;
+	struct nfsd4_channel_attrs	fore_channel;
+	struct nfsd4_channel_attrs	back_channel;
+	u32				callback_prog;
+	u32				uid;
+	u32				gid;
+};
+
+/* The single slot clientid cache structure */
+struct nfsd4_clid_slot {
+	u32				sl_seqid;
+	__be32				sl_status;
+	struct nfsd4_create_session	sl_cr_ses;
+};
+
 struct nfsd4_session {
 	struct kref		se_ref;
 	struct list_head	se_hash;	/* hash by sessionid */
@@ -135,7 +157,7 @@
 	struct nfs4_sessionid	se_sessionid;
 	struct nfsd4_channel_attrs se_fchannel;
 	struct nfsd4_channel_attrs se_bchannel;
-	struct nfsd4_slot	se_slots[];	/* forward channel slots */
+	struct nfsd4_slot	*se_slots[];	/* forward channel slots */
 };
 
 static inline void
@@ -180,7 +202,7 @@
 	char                    cl_recdir[HEXDIR_LEN]; /* recovery dir */
 	nfs4_verifier		cl_verifier; 	/* generated by client */
 	time_t                  cl_time;        /* time of last lease renewal */
-	__be32			cl_addr; 	/* client ipaddress */
+	struct sockaddr_storage	cl_addr; 	/* client ipaddress */
 	u32			cl_flavor;	/* setclientid pseudoflavor */
 	char			*cl_principal;	/* setclientid principal name */
 	struct svc_cred		cl_cred; 	/* setclientid principal */
@@ -192,9 +214,17 @@
 
 	/* for nfs41 */
 	struct list_head	cl_sessions;
-	struct nfsd4_slot	cl_slot;	/* create_session slot */
+	struct nfsd4_clid_slot	cl_cs_slot;	/* create_session slot */
 	u32			cl_exchange_flags;
 	struct nfs4_sessionid	cl_sessionid;
+
+	/* for nfs41 callbacks */
+	/* We currently support a single back channel with a single slot */
+	unsigned long		cl_cb_slot_busy;
+	u32			cl_cb_seq_nr;
+	struct svc_xprt		*cl_cb_xprt;	/* 4.1 callback transport */
+	struct rpc_wait_queue	cl_cb_waitq;	/* backchannel callers may */
+						/* wait here for slots */
 };
 
 /* struct nfs4_client_reset
@@ -345,6 +375,7 @@
 extern __be32 nfs4_check_open_reclaim(clientid_t *clid);
 extern void put_nfs4_client(struct nfs4_client *clp);
 extern void nfs4_free_stateowner(struct kref *kref);
+extern int set_callback_cred(void);
 extern void nfsd4_probe_callback(struct nfs4_client *clp);
 extern void nfsd4_cb_recall(struct nfs4_delegation *dp);
 extern void nfs4_put_delegation(struct nfs4_delegation *dp);
diff --git a/include/linux/nfsd/xdr4.h b/include/linux/nfsd/xdr4.h
index 2bacf75..73164c2 100644
--- a/include/linux/nfsd/xdr4.h
+++ b/include/linux/nfsd/xdr4.h
@@ -51,7 +51,7 @@
 	/* For sessions DRC */
 	struct nfsd4_session	*session;
 	struct nfsd4_slot	*slot;
-	__be32			*statp;
+	__be32			*datap;
 	size_t			iovlen;
 	u32			minorversion;
 	u32			status;
@@ -366,18 +366,6 @@
 	int		spa_how;
 };
 
-struct nfsd4_create_session {
-	clientid_t		clientid;
-	struct nfs4_sessionid	sessionid;
-	u32			seqid;
-	u32			flags;
-	struct nfsd4_channel_attrs fore_channel;
-	struct nfsd4_channel_attrs back_channel;
-	u32			callback_prog;
-	u32			uid;
-	u32			gid;
-};
-
 struct nfsd4_sequence {
 	struct nfs4_sessionid	sessionid;		/* request/response */
 	u32			seqid;			/* request/response */
@@ -479,13 +467,12 @@
 static inline bool nfsd4_is_solo_sequence(struct nfsd4_compoundres *resp)
 {
 	struct nfsd4_compoundargs *args = resp->rqstp->rq_argp;
-	return args->opcnt == 1;
+	return resp->opcnt == 1 && args->ops[0].opnum == OP_SEQUENCE;
 }
 
 static inline bool nfsd4_not_cached(struct nfsd4_compoundres *resp)
 {
-	return !resp->cstate.slot->sl_cache_entry.ce_cachethis ||
-			nfsd4_is_solo_sequence(resp);
+	return !resp->cstate.slot->sl_cachethis || nfsd4_is_solo_sequence(resp);
 }
 
 #define NFS4_SVC_XDRSIZE		sizeof(struct nfsd4_compoundargs)
diff --git a/include/linux/sunrpc/auth.h b/include/linux/sunrpc/auth.h
index 3f63218..996df4d 100644
--- a/include/linux/sunrpc/auth.h
+++ b/include/linux/sunrpc/auth.h
@@ -111,7 +111,7 @@
 	void			(*crdestroy)(struct rpc_cred *);
 
 	int			(*crmatch)(struct auth_cred *, struct rpc_cred *, int);
-	void			(*crbind)(struct rpc_task *, struct rpc_cred *);
+	void			(*crbind)(struct rpc_task *, struct rpc_cred *, int);
 	__be32 *		(*crmarshal)(struct rpc_task *, __be32 *);
 	int			(*crrefresh)(struct rpc_task *);
 	__be32 *		(*crvalidate)(struct rpc_task *, __be32 *);
@@ -140,7 +140,7 @@
 void			rpcauth_init_cred(struct rpc_cred *, const struct auth_cred *, struct rpc_auth *, const struct rpc_credops *);
 struct rpc_cred *	rpcauth_lookupcred(struct rpc_auth *, int);
 void			rpcauth_bindcred(struct rpc_task *, struct rpc_cred *, int);
-void			rpcauth_generic_bind_cred(struct rpc_task *, struct rpc_cred *);
+void			rpcauth_generic_bind_cred(struct rpc_task *, struct rpc_cred *, int);
 void			put_rpccred(struct rpc_cred *);
 void			rpcauth_unbindcred(struct rpc_task *);
 __be32 *		rpcauth_marshcred(struct rpc_task *, __be32 *);
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index ab3f6e9..8ed9642 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -22,6 +22,7 @@
 #include <linux/sunrpc/timer.h>
 #include <asm/signal.h>
 #include <linux/path.h>
+#include <net/ipv6.h>
 
 struct rpc_inode;
 
@@ -113,6 +114,7 @@
 	rpc_authflavor_t	authflavor;
 	unsigned long		flags;
 	char			*client_name;
+	struct svc_xprt		*bc_xprt;	/* NFSv4.1 backchannel */
 };
 
 /* Values for "flags" field */
@@ -188,5 +190,117 @@
 #define IPV6_SCOPE_DELIMITER		'%'
 #define IPV6_SCOPE_ID_LEN		sizeof("%nnnnnnnnnn")
 
+static inline bool __rpc_cmp_addr4(const struct sockaddr *sap1,
+				   const struct sockaddr *sap2)
+{
+	const struct sockaddr_in *sin1 = (const struct sockaddr_in *)sap1;
+	const struct sockaddr_in *sin2 = (const struct sockaddr_in *)sap2;
+
+	return sin1->sin_addr.s_addr == sin2->sin_addr.s_addr;
+}
+
+static inline bool __rpc_copy_addr4(struct sockaddr *dst,
+				    const struct sockaddr *src)
+{
+	const struct sockaddr_in *ssin = (struct sockaddr_in *) src;
+	struct sockaddr_in *dsin = (struct sockaddr_in *) dst;
+
+	dsin->sin_family = ssin->sin_family;
+	dsin->sin_addr.s_addr = ssin->sin_addr.s_addr;
+	return true;
+}
+
+#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
+static inline bool __rpc_cmp_addr6(const struct sockaddr *sap1,
+				   const struct sockaddr *sap2)
+{
+	const struct sockaddr_in6 *sin1 = (const struct sockaddr_in6 *)sap1;
+	const struct sockaddr_in6 *sin2 = (const struct sockaddr_in6 *)sap2;
+	return ipv6_addr_equal(&sin1->sin6_addr, &sin2->sin6_addr);
+}
+
+static inline bool __rpc_copy_addr6(struct sockaddr *dst,
+				    const struct sockaddr *src)
+{
+	const struct sockaddr_in6 *ssin6 = (const struct sockaddr_in6 *) src;
+	struct sockaddr_in6 *dsin6 = (struct sockaddr_in6 *) dst;
+
+	dsin6->sin6_family = ssin6->sin6_family;
+	ipv6_addr_copy(&dsin6->sin6_addr, &ssin6->sin6_addr);
+	return true;
+}
+#else	/* !(CONFIG_IPV6 || CONFIG_IPV6_MODULE) */
+static inline bool __rpc_cmp_addr6(const struct sockaddr *sap1,
+				   const struct sockaddr *sap2)
+{
+	return false;
+}
+
+static inline bool __rpc_copy_addr6(struct sockaddr *dst,
+				    const struct sockaddr *src)
+{
+	return false;
+}
+#endif	/* !(CONFIG_IPV6 || CONFIG_IPV6_MODULE) */
+
+/**
+ * rpc_cmp_addr - compare the address portion of two sockaddrs.
+ * @sap1: first sockaddr
+ * @sap2: second sockaddr
+ *
+ * Just compares the family and address portion. Ignores port, scope, etc.
+ * Returns true if the addrs are equal, false if they aren't.
+ */
+static inline bool rpc_cmp_addr(const struct sockaddr *sap1,
+				const struct sockaddr *sap2)
+{
+	if (sap1->sa_family == sap2->sa_family) {
+		switch (sap1->sa_family) {
+		case AF_INET:
+			return __rpc_cmp_addr4(sap1, sap2);
+		case AF_INET6:
+			return __rpc_cmp_addr6(sap1, sap2);
+		}
+	}
+	return false;
+}
+
+/**
+ * rpc_copy_addr - copy the address portion of one sockaddr to another
+ * @dst: destination sockaddr
+ * @src: source sockaddr
+ *
+ * Just copies the address portion and family. Ignores port, scope, etc.
+ * Caller is responsible for making certain that dst is large enough to hold
+ * the address in src. Returns true if address family is supported. Returns
+ * false otherwise.
+ */
+static inline bool rpc_copy_addr(struct sockaddr *dst,
+				 const struct sockaddr *src)
+{
+	switch (src->sa_family) {
+	case AF_INET:
+		return __rpc_copy_addr4(dst, src);
+	case AF_INET6:
+		return __rpc_copy_addr6(dst, src);
+	}
+	return false;
+}
+
+/**
+ * rpc_get_scope_id - return scopeid for a given sockaddr
+ * @sa: sockaddr to get scopeid from
+ *
+ * Returns the value of the sin6_scope_id for AF_INET6 addrs, or 0 if
+ * not an AF_INET6 address.
+ */
+static inline u32 rpc_get_scope_id(const struct sockaddr *sa)
+{
+	if (sa->sa_family != AF_INET6)
+		return 0;
+
+	return ((struct sockaddr_in6 *) sa)->sin6_scope_id;
+}
+
 #endif /* __KERNEL__ */
 #endif /* _LINUX_SUNRPC_CLNT_H */
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index ea80096..52e8cb0 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -94,8 +94,6 @@
 	struct module *		sv_module;	/* optional module to count when
 						 * adding threads */
 	svc_thread_fn		sv_function;	/* main function for threads */
-	unsigned int		sv_drc_max_pages; /* Total pages for DRC */
-	unsigned int		sv_drc_pages_used;/* DRC pages used */
 #if defined(CONFIG_NFS_V4_1)
 	struct list_head	sv_cb_list;	/* queue for callback requests
 						 * that arrive over the same
diff --git a/include/linux/sunrpc/svc_xprt.h b/include/linux/sunrpc/svc_xprt.h
index 2223ae0..5f4e18b 100644
--- a/include/linux/sunrpc/svc_xprt.h
+++ b/include/linux/sunrpc/svc_xprt.h
@@ -65,6 +65,7 @@
 	size_t			xpt_locallen;	/* length of address */
 	struct sockaddr_storage	xpt_remote;	/* remote peer's address */
 	size_t			xpt_remotelen;	/* length of address */
+	struct rpc_wait_queue	xpt_bc_pending;	/* backchannel wait queue */
 };
 
 int	svc_reg_xprt_class(struct svc_xprt_class *);
diff --git a/include/linux/sunrpc/svcsock.h b/include/linux/sunrpc/svcsock.h
index 04dba23..1b353a7 100644
--- a/include/linux/sunrpc/svcsock.h
+++ b/include/linux/sunrpc/svcsock.h
@@ -28,6 +28,7 @@
 	/* private TCP part */
 	u32			sk_reclen;	/* length of record */
 	u32			sk_tcplen;	/* current read length */
+	struct rpc_xprt		*sk_bc_xprt;	/* NFSv4.1 backchannel xprt */
 };
 
 /*
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index c090df4..6f9457a 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -124,6 +124,23 @@
 	void		(*print_stats)(struct rpc_xprt *xprt, struct seq_file *seq);
 };
 
+/*
+ * RPC transport identifiers
+ *
+ * To preserve compatibility with the historical use of raw IP protocol
+ * id's for transport selection, UDP and TCP identifiers are specified
+ * with the previous values. No such restriction exists for new transports,
+ * except that they may not collide with these values (17 and 6,
+ * respectively).
+ */
+#define XPRT_TRANSPORT_BC       (1 << 31)
+enum xprt_transports {
+	XPRT_TRANSPORT_UDP	= IPPROTO_UDP,
+	XPRT_TRANSPORT_TCP	= IPPROTO_TCP,
+	XPRT_TRANSPORT_BC_TCP	= IPPROTO_TCP | XPRT_TRANSPORT_BC,
+	XPRT_TRANSPORT_RDMA	= 256
+};
+
 struct rpc_xprt {
 	struct kref		kref;		/* Reference count */
 	struct rpc_xprt_ops *	ops;		/* transport methods */
@@ -179,6 +196,7 @@
 	spinlock_t		reserve_lock;	/* lock slot table */
 	u32			xid;		/* Next XID value to use */
 	struct rpc_task *	snd_task;	/* Task blocked in send */
+	struct svc_xprt		*bc_xprt;	/* NFSv4.1 backchannel */
 #if defined(CONFIG_NFS_V4_1)
 	struct svc_serv		*bc_serv;       /* The RPC service which will */
 						/* process the callback */
@@ -231,6 +249,7 @@
 	struct sockaddr *	srcaddr;	/* optional local address */
 	struct sockaddr *	dstaddr;	/* remote peer address */
 	size_t			addrlen;
+	struct svc_xprt		*bc_xprt;	/* NFSv4.1 backchannel */
 };
 
 struct xprt_class {
diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
index 54a379c..c2f04e1 100644
--- a/include/linux/sunrpc/xprtrdma.h
+++ b/include/linux/sunrpc/xprtrdma.h
@@ -41,11 +41,6 @@
 #define _LINUX_SUNRPC_XPRTRDMA_H
 
 /*
- * RPC transport identifier for RDMA
- */
-#define XPRT_TRANSPORT_RDMA	256
-
-/*
  * rpcbind (v3+) RDMA netid.
  */
 #define RPCBIND_NETID_RDMA	"rdma"
diff --git a/include/linux/sunrpc/xprtsock.h b/include/linux/sunrpc/xprtsock.h
index c2a46c4..3f14a02 100644
--- a/include/linux/sunrpc/xprtsock.h
+++ b/include/linux/sunrpc/xprtsock.h
@@ -13,17 +13,6 @@
 void		cleanup_socket_xprt(void);
 
 /*
- * RPC transport identifiers for UDP, TCP
- *
- * To preserve compatibility with the historical use of raw IP protocol
- * id's for transport selection, these are specified with the previous
- * values. No such restriction exists for new transports, except that
- * they may not collide with these values (17 and 6, respectively).
- */
-#define XPRT_TRANSPORT_UDP	IPPROTO_UDP
-#define XPRT_TRANSPORT_TCP	IPPROTO_TCP
-
-/*
  * RPC slot table sizes for UDP, TCP transports
  */
 extern unsigned int xprt_udp_slot_table_entries;
diff --git a/net/sunrpc/auth.c b/net/sunrpc/auth.c
index 0c431c2..54a4e04 100644
--- a/net/sunrpc/auth.c
+++ b/net/sunrpc/auth.c
@@ -385,7 +385,7 @@
 EXPORT_SYMBOL_GPL(rpcauth_init_cred);
 
 void
-rpcauth_generic_bind_cred(struct rpc_task *task, struct rpc_cred *cred)
+rpcauth_generic_bind_cred(struct rpc_task *task, struct rpc_cred *cred, int lookupflags)
 {
 	task->tk_msg.rpc_cred = get_rpccred(cred);
 	dprintk("RPC: %5u holding %s cred %p\n", task->tk_pid,
@@ -394,7 +394,7 @@
 EXPORT_SYMBOL_GPL(rpcauth_generic_bind_cred);
 
 static void
-rpcauth_bind_root_cred(struct rpc_task *task)
+rpcauth_bind_root_cred(struct rpc_task *task, int lookupflags)
 {
 	struct rpc_auth *auth = task->tk_client->cl_auth;
 	struct auth_cred acred = {
@@ -405,7 +405,7 @@
 
 	dprintk("RPC: %5u looking up %s cred\n",
 		task->tk_pid, task->tk_client->cl_auth->au_ops->au_name);
-	ret = auth->au_ops->lookup_cred(auth, &acred, 0);
+	ret = auth->au_ops->lookup_cred(auth, &acred, lookupflags);
 	if (!IS_ERR(ret))
 		task->tk_msg.rpc_cred = ret;
 	else
@@ -413,14 +413,14 @@
 }
 
 static void
-rpcauth_bind_new_cred(struct rpc_task *task)
+rpcauth_bind_new_cred(struct rpc_task *task, int lookupflags)
 {
 	struct rpc_auth *auth = task->tk_client->cl_auth;
 	struct rpc_cred *ret;
 
 	dprintk("RPC: %5u looking up %s cred\n",
 		task->tk_pid, auth->au_ops->au_name);
-	ret = rpcauth_lookupcred(auth, 0);
+	ret = rpcauth_lookupcred(auth, lookupflags);
 	if (!IS_ERR(ret))
 		task->tk_msg.rpc_cred = ret;
 	else
@@ -430,12 +430,16 @@
 void
 rpcauth_bindcred(struct rpc_task *task, struct rpc_cred *cred, int flags)
 {
+	int lookupflags = 0;
+
+	if (flags & RPC_TASK_ASYNC)
+		lookupflags |= RPCAUTH_LOOKUP_NEW;
 	if (cred != NULL)
-		cred->cr_ops->crbind(task, cred);
+		cred->cr_ops->crbind(task, cred, lookupflags);
 	else if (flags & RPC_TASK_ROOTCREDS)
-		rpcauth_bind_root_cred(task);
+		rpcauth_bind_root_cred(task, lookupflags);
 	else
-		rpcauth_bind_new_cred(task);
+		rpcauth_bind_new_cred(task, lookupflags);
 }
 
 void
diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c
index 4028502..bf88bf8 100644
--- a/net/sunrpc/auth_generic.c
+++ b/net/sunrpc/auth_generic.c
@@ -55,13 +55,13 @@
 EXPORT_SYMBOL_GPL(rpc_lookup_machine_cred);
 
 static void
-generic_bind_cred(struct rpc_task *task, struct rpc_cred *cred)
+generic_bind_cred(struct rpc_task *task, struct rpc_cred *cred, int lookupflags)
 {
 	struct rpc_auth *auth = task->tk_client->cl_auth;
 	struct auth_cred *acred = &container_of(cred, struct generic_cred, gc_base)->acred;
 	struct rpc_cred *ret;
 
-	ret = auth->au_ops->lookup_cred(auth, acred, 0);
+	ret = auth->au_ops->lookup_cred(auth, acred, lookupflags);
 	if (!IS_ERR(ret))
 		task->tk_msg.rpc_cred = ret;
 	else
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index 2e6a148..f6c51e5 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -1374,8 +1374,10 @@
 		if (stat)
 			goto out_err;
 		break;
-	default:
-		goto out_err;
+	/*
+	 * For any other gc_svc value, svcauth_gss_accept() already set
+	 * the auth_error appropriately; just fall through:
+	 */
 	}
 
 out:
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index 45cdaff..d6eee29 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -103,23 +103,21 @@
 EXPORT_SYMBOL_GPL(sunrpc_cache_lookup);
 
 
-static void queue_loose(struct cache_detail *detail, struct cache_head *ch);
+static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch);
 
-static int cache_fresh_locked(struct cache_head *head, time_t expiry)
+static void cache_fresh_locked(struct cache_head *head, time_t expiry)
 {
 	head->expiry_time = expiry;
 	head->last_refresh = get_seconds();
-	return !test_and_set_bit(CACHE_VALID, &head->flags);
+	set_bit(CACHE_VALID, &head->flags);
 }
 
 static void cache_fresh_unlocked(struct cache_head *head,
-			struct cache_detail *detail, int new)
+				 struct cache_detail *detail)
 {
-	if (new)
-		cache_revisit_request(head);
 	if (test_and_clear_bit(CACHE_PENDING, &head->flags)) {
 		cache_revisit_request(head);
-		queue_loose(detail, head);
+		cache_dequeue(detail, head);
 	}
 }
 
@@ -132,7 +130,6 @@
 	 */
 	struct cache_head **head;
 	struct cache_head *tmp;
-	int is_new;
 
 	if (!test_bit(CACHE_VALID, &old->flags)) {
 		write_lock(&detail->hash_lock);
@@ -141,9 +138,9 @@
 				set_bit(CACHE_NEGATIVE, &old->flags);
 			else
 				detail->update(old, new);
-			is_new = cache_fresh_locked(old, new->expiry_time);
+			cache_fresh_locked(old, new->expiry_time);
 			write_unlock(&detail->hash_lock);
-			cache_fresh_unlocked(old, detail, is_new);
+			cache_fresh_unlocked(old, detail);
 			return old;
 		}
 		write_unlock(&detail->hash_lock);
@@ -167,11 +164,11 @@
 	*head = tmp;
 	detail->entries++;
 	cache_get(tmp);
-	is_new = cache_fresh_locked(tmp, new->expiry_time);
+	cache_fresh_locked(tmp, new->expiry_time);
 	cache_fresh_locked(old, 0);
 	write_unlock(&detail->hash_lock);
-	cache_fresh_unlocked(tmp, detail, is_new);
-	cache_fresh_unlocked(old, detail, 0);
+	cache_fresh_unlocked(tmp, detail);
+	cache_fresh_unlocked(old, detail);
 	cache_put(old, detail);
 	return tmp;
 }
@@ -184,6 +181,22 @@
 	return cd->cache_upcall(cd, h);
 }
 
+static inline int cache_is_valid(struct cache_detail *detail, struct cache_head *h)
+{
+	if (!test_bit(CACHE_VALID, &h->flags) ||
+	    h->expiry_time < get_seconds())
+		return -EAGAIN;
+	else if (detail->flush_time > h->last_refresh)
+		return -EAGAIN;
+	else {
+		/* entry is valid */
+		if (test_bit(CACHE_NEGATIVE, &h->flags))
+			return -ENOENT;
+		else
+			return 0;
+	}
+}
+
 /*
  * This is the generic cache management routine for all
  * the authentication caches.
@@ -192,8 +205,10 @@
  *
  *
  * Returns 0 if the cache_head can be used, or cache_puts it and returns
- * -EAGAIN if upcall is pending,
- * -ETIMEDOUT if upcall failed and should be retried,
+ * -EAGAIN if upcall is pending and request has been queued
+ * -ETIMEDOUT if upcall failed or request could not be queued or
+ *           upcall completed but item is still invalid (implying that
+ *           the cache item has been replaced with a newer one).
  * -ENOENT if cache entry was negative
  */
 int cache_check(struct cache_detail *detail,
@@ -203,17 +218,7 @@
 	long refresh_age, age;
 
 	/* First decide return status as best we can */
-	if (!test_bit(CACHE_VALID, &h->flags) ||
-	    h->expiry_time < get_seconds())
-		rv = -EAGAIN;
-	else if (detail->flush_time > h->last_refresh)
-		rv = -EAGAIN;
-	else {
-		/* entry is valid */
-		if (test_bit(CACHE_NEGATIVE, &h->flags))
-			rv = -ENOENT;
-		else rv = 0;
-	}
+	rv = cache_is_valid(detail, h);
 
 	/* now see if we want to start an upcall */
 	refresh_age = (h->expiry_time - h->last_refresh);
@@ -229,10 +234,11 @@
 			switch (cache_make_upcall(detail, h)) {
 			case -EINVAL:
 				clear_bit(CACHE_PENDING, &h->flags);
+				cache_revisit_request(h);
 				if (rv == -EAGAIN) {
 					set_bit(CACHE_NEGATIVE, &h->flags);
-					cache_fresh_unlocked(h, detail,
-					     cache_fresh_locked(h, get_seconds()+CACHE_NEW_EXPIRY));
+					cache_fresh_locked(h, get_seconds()+CACHE_NEW_EXPIRY);
+					cache_fresh_unlocked(h, detail);
 					rv = -ENOENT;
 				}
 				break;
@@ -245,10 +251,14 @@
 		}
 	}
 
-	if (rv == -EAGAIN)
-		if (cache_defer_req(rqstp, h) != 0)
-			rv = -ETIMEDOUT;
-
+	if (rv == -EAGAIN) {
+		if (cache_defer_req(rqstp, h) < 0) {
+			/* Request is not deferred */
+			rv = cache_is_valid(detail, h);
+			if (rv == -EAGAIN)
+				rv = -ETIMEDOUT;
+		}
+	}
 	if (rv)
 		cache_put(h, detail);
 	return rv;
@@ -396,7 +406,7 @@
 				)
 				continue;
 			if (test_and_clear_bit(CACHE_PENDING, &ch->flags))
-				queue_loose(current_detail, ch);
+				cache_dequeue(current_detail, ch);
 
 			if (atomic_read(&ch->ref.refcount) == 1)
 				break;
@@ -412,8 +422,10 @@
 		if (!ch)
 			current_index ++;
 		spin_unlock(&cache_list_lock);
-		if (ch)
+		if (ch) {
+			cache_revisit_request(ch);
 			cache_put(ch, d);
+		}
 	} else
 		spin_unlock(&cache_list_lock);
 
@@ -488,7 +500,7 @@
 
 static int cache_defer_req(struct cache_req *req, struct cache_head *item)
 {
-	struct cache_deferred_req *dreq;
+	struct cache_deferred_req *dreq, *discard;
 	int hash = DFR_HASH(item);
 
 	if (cache_defer_cnt >= DFR_MAX) {
@@ -496,11 +508,11 @@
 		 * or continue and drop the oldest below
 		 */
 		if (net_random()&1)
-			return -ETIMEDOUT;
+			return -ENOMEM;
 	}
 	dreq = req->defer(req);
 	if (dreq == NULL)
-		return -ETIMEDOUT;
+		return -ENOMEM;
 
 	dreq->item = item;
 
@@ -513,23 +525,24 @@
 	list_add(&dreq->hash, &cache_defer_hash[hash]);
 
 	/* it is in, now maybe clean up */
-	dreq = NULL;
+	discard = NULL;
 	if (++cache_defer_cnt > DFR_MAX) {
-		dreq = list_entry(cache_defer_list.prev,
-				  struct cache_deferred_req, recent);
-		list_del(&dreq->recent);
-		list_del(&dreq->hash);
+		discard = list_entry(cache_defer_list.prev,
+				     struct cache_deferred_req, recent);
+		list_del_init(&discard->recent);
+		list_del_init(&discard->hash);
 		cache_defer_cnt--;
 	}
 	spin_unlock(&cache_defer_lock);
 
-	if (dreq) {
+	if (discard)
 		/* there was one too many */
-		dreq->revisit(dreq, 1);
-	}
+		discard->revisit(discard, 1);
+
 	if (!test_bit(CACHE_PENDING, &item->flags)) {
 		/* must have just been validated... */
 		cache_revisit_request(item);
+		return -EAGAIN;
 	}
 	return 0;
 }
@@ -551,7 +564,7 @@
 			dreq = list_entry(lp, struct cache_deferred_req, hash);
 			lp = lp->next;
 			if (dreq->item == item) {
-				list_del(&dreq->hash);
+				list_del_init(&dreq->hash);
 				list_move(&dreq->recent, &pending);
 				cache_defer_cnt--;
 			}
@@ -577,7 +590,7 @@
 
 	list_for_each_entry_safe(dreq, tmp, &cache_defer_list, recent) {
 		if (dreq->owner == owner) {
-			list_del(&dreq->hash);
+			list_del_init(&dreq->hash);
 			list_move(&dreq->recent, &pending);
 			cache_defer_cnt--;
 		}
@@ -887,7 +900,7 @@
 
 
 
-static void queue_loose(struct cache_detail *detail, struct cache_head *ch)
+static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
 {
 	struct cache_queue *cq;
 	spin_lock(&queue_lock);
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index fac0ca9..a417d5a 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -288,6 +288,7 @@
 		.srcaddr = args->saddress,
 		.dstaddr = args->address,
 		.addrlen = args->addrsize,
+		.bc_xprt = args->bc_xprt,
 	};
 	char servername[48];
 
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index 8f459ab..cef74ba 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -21,6 +21,8 @@
 
 #include <linux/sunrpc/clnt.h>
 
+#include "sunrpc.h"
+
 #ifdef RPC_DEBUG
 #define RPCDBG_FACILITY		RPCDBG_SCHED
 #define RPC_TASK_MAGIC_ID	0xf00baa
@@ -711,11 +713,6 @@
 	__rpc_execute(container_of(work, struct rpc_task, u.tk_work));
 }
 
-struct rpc_buffer {
-	size_t	len;
-	char	data[];
-};
-
 /**
  * rpc_malloc - allocate an RPC buffer
  * @task: RPC task that will use this buffer
diff --git a/net/sunrpc/sunrpc.h b/net/sunrpc/sunrpc.h
index 5d9dd74..90c292e 100644
--- a/net/sunrpc/sunrpc.h
+++ b/net/sunrpc/sunrpc.h
@@ -27,11 +27,25 @@
 #ifndef _NET_SUNRPC_SUNRPC_H
 #define _NET_SUNRPC_SUNRPC_H
 
+#include <linux/net.h>
+
+/*
+ * Header for dynamically allocated rpc buffers.
+ */
+struct rpc_buffer {
+	size_t	len;
+	char	data[];
+};
+
 static inline int rpc_reply_expected(struct rpc_task *task)
 {
 	return (task->tk_msg.rpc_proc != NULL) &&
 		(task->tk_msg.rpc_proc->p_decode != NULL);
 }
 
+int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
+		    struct page *headpage, unsigned long headoffset,
+		    struct page *tailpage, unsigned long tailoffset);
+
 #endif /* _NET_SUNRPC_SUNRPC_H */
 
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index 27d4433..df124f7 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -160,6 +160,7 @@
 	mutex_init(&xprt->xpt_mutex);
 	spin_lock_init(&xprt->xpt_lock);
 	set_bit(XPT_BUSY, &xprt->xpt_flags);
+	rpc_init_wait_queue(&xprt->xpt_bc_pending, "xpt_bc_pending");
 }
 EXPORT_SYMBOL_GPL(svc_xprt_init);
 
@@ -710,10 +711,7 @@
 	spin_unlock_bh(&pool->sp_lock);
 
 	len = 0;
-	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
-		dprintk("svc_recv: found XPT_CLOSE\n");
-		svc_delete_xprt(xprt);
-	} else if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
+	if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
 		struct svc_xprt *newxpt;
 		newxpt = xprt->xpt_ops->xpo_accept(xprt);
 		if (newxpt) {
@@ -739,7 +737,7 @@
 			svc_xprt_received(newxpt);
 		}
 		svc_xprt_received(xprt);
-	} else {
+	} else if (!test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
 		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
 			rqstp, pool->sp_id, xprt,
 			atomic_read(&xprt->xpt_ref.refcount));
@@ -752,6 +750,11 @@
 		dprintk("svc: got len=%d\n", len);
 	}
 
+	if (test_bit(XPT_CLOSE, &xprt->xpt_flags)) {
+		dprintk("svc_recv: found XPT_CLOSE\n");
+		svc_delete_xprt(xprt);
+	}
+
 	/* No data, incomplete (TCP) read, or accept() */
 	if (len == 0 || len == -EAGAIN) {
 		rqstp->rq_res.len = 0;
@@ -808,6 +811,7 @@
 	else
 		len = xprt->xpt_ops->xpo_sendto(rqstp);
 	mutex_unlock(&xprt->xpt_mutex);
+	rpc_wake_up(&xprt->xpt_bc_pending);
 	svc_xprt_release(rqstp);
 
 	if (len == -ECONNREFUSED || len == -ENOTCONN || len == -EAGAIN)
@@ -1166,11 +1170,6 @@
 
 	dprintk("svc_pool_stats_start, *pidx=%u\n", pidx);
 
-	lock_kernel();
-	/* bump up the pseudo refcount while traversing */
-	svc_get(serv);
-	unlock_kernel();
-
 	if (!pidx)
 		return SEQ_START_TOKEN;
 	return (pidx > serv->sv_nrpools ? NULL : &serv->sv_pools[pidx-1]);
@@ -1198,12 +1197,6 @@
 
 static void svc_pool_stats_stop(struct seq_file *m, void *p)
 {
-	struct svc_serv *serv = m->private;
-
-	lock_kernel();
-	/* this function really, really should have been called svc_put() */
-	svc_destroy(serv);
-	unlock_kernel();
 }
 
 static int svc_pool_stats_show(struct seq_file *m, void *p)
diff --git a/net/sunrpc/svcauth_unix.c b/net/sunrpc/svcauth_unix.c
index 6caffa3..117f68a 100644
--- a/net/sunrpc/svcauth_unix.c
+++ b/net/sunrpc/svcauth_unix.c
@@ -668,6 +668,7 @@
 	case 0:
 		*gip = ug->gi;
 		get_group_info(*gip);
+		cache_put(&ug->h, &unix_gid_cache);
 		return 0;
 	default:
 		return -EAGAIN;
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index 23128ee..ccc5e83 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -49,6 +49,7 @@
 #include <linux/sunrpc/msg_prot.h>
 #include <linux/sunrpc/svcsock.h>
 #include <linux/sunrpc/stats.h>
+#include <linux/sunrpc/xprt.h>
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
@@ -153,49 +154,27 @@
 }
 
 /*
- * Generic sendto routine
+ * send routine intended to be shared by the fore- and back-channel
  */
-static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
+int svc_send_common(struct socket *sock, struct xdr_buf *xdr,
+		    struct page *headpage, unsigned long headoffset,
+		    struct page *tailpage, unsigned long tailoffset)
 {
-	struct svc_sock	*svsk =
-		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
-	struct socket	*sock = svsk->sk_sock;
-	int		slen;
-	union {
-		struct cmsghdr	hdr;
-		long		all[SVC_PKTINFO_SPACE / sizeof(long)];
-	} buffer;
-	struct cmsghdr *cmh = &buffer.hdr;
-	int		len = 0;
 	int		result;
 	int		size;
 	struct page	**ppage = xdr->pages;
 	size_t		base = xdr->page_base;
 	unsigned int	pglen = xdr->page_len;
 	unsigned int	flags = MSG_MORE;
-	RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]);
+	int		slen;
+	int		len = 0;
 
 	slen = xdr->len;
 
-	if (rqstp->rq_prot == IPPROTO_UDP) {
-		struct msghdr msg = {
-			.msg_name	= &rqstp->rq_addr,
-			.msg_namelen	= rqstp->rq_addrlen,
-			.msg_control	= cmh,
-			.msg_controllen	= sizeof(buffer),
-			.msg_flags	= MSG_MORE,
-		};
-
-		svc_set_cmsg_data(rqstp, cmh);
-
-		if (sock_sendmsg(sock, &msg, 0) < 0)
-			goto out;
-	}
-
 	/* send head */
 	if (slen == xdr->head[0].iov_len)
 		flags = 0;
-	len = kernel_sendpage(sock, rqstp->rq_respages[0], 0,
+	len = kernel_sendpage(sock, headpage, headoffset,
 				  xdr->head[0].iov_len, flags);
 	if (len != xdr->head[0].iov_len)
 		goto out;
@@ -219,16 +198,58 @@
 		base = 0;
 		ppage++;
 	}
+
 	/* send tail */
 	if (xdr->tail[0].iov_len) {
-		result = kernel_sendpage(sock, rqstp->rq_respages[0],
-					     ((unsigned long)xdr->tail[0].iov_base)
-						& (PAGE_SIZE-1),
-					     xdr->tail[0].iov_len, 0);
-
+		result = kernel_sendpage(sock, tailpage, tailoffset,
+				   xdr->tail[0].iov_len, 0);
 		if (result > 0)
 			len += result;
 	}
+
+out:
+	return len;
+}
+
+
+/*
+ * Generic sendto routine
+ */
+static int svc_sendto(struct svc_rqst *rqstp, struct xdr_buf *xdr)
+{
+	struct svc_sock	*svsk =
+		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+	struct socket	*sock = svsk->sk_sock;
+	union {
+		struct cmsghdr	hdr;
+		long		all[SVC_PKTINFO_SPACE / sizeof(long)];
+	} buffer;
+	struct cmsghdr *cmh = &buffer.hdr;
+	int		len = 0;
+	unsigned long tailoff;
+	unsigned long headoff;
+	RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]);
+
+	if (rqstp->rq_prot == IPPROTO_UDP) {
+		struct msghdr msg = {
+			.msg_name	= &rqstp->rq_addr,
+			.msg_namelen	= rqstp->rq_addrlen,
+			.msg_control	= cmh,
+			.msg_controllen	= sizeof(buffer),
+			.msg_flags	= MSG_MORE,
+		};
+
+		svc_set_cmsg_data(rqstp, cmh);
+
+		if (sock_sendmsg(sock, &msg, 0) < 0)
+			goto out;
+	}
+
+	tailoff = ((unsigned long)xdr->tail[0].iov_base) & (PAGE_SIZE-1);
+	headoff = 0;
+	len = svc_send_common(sock, xdr, rqstp->rq_respages[0], headoff,
+			       rqstp->rq_respages[0], tailoff);
+
 out:
 	dprintk("svc: socket %p sendto([%p %Zu... ], %d) = %d (addr %s)\n",
 		svsk, xdr->head[0].iov_base, xdr->head[0].iov_len,
@@ -432,29 +453,49 @@
 }
 
 /*
+ * See net/ipv4/ip_sockglue.c : ip_cmsg_recv_pktinfo
+ */
+static int svc_udp_get_dest_address4(struct svc_rqst *rqstp,
+				     struct cmsghdr *cmh)
+{
+	struct in_pktinfo *pki = CMSG_DATA(cmh);
+	if (cmh->cmsg_type != IP_PKTINFO)
+		return 0;
+	rqstp->rq_daddr.addr.s_addr = pki->ipi_spec_dst.s_addr;
+	return 1;
+}
+
+/*
+ * See net/ipv6/datagram.c : datagram_recv_ctl
+ */
+static int svc_udp_get_dest_address6(struct svc_rqst *rqstp,
+				     struct cmsghdr *cmh)
+{
+	struct in6_pktinfo *pki = CMSG_DATA(cmh);
+	if (cmh->cmsg_type != IPV6_PKTINFO)
+		return 0;
+	ipv6_addr_copy(&rqstp->rq_daddr.addr6, &pki->ipi6_addr);
+	return 1;
+}
+
+/*
  * Copy the UDP datagram's destination address to the rqstp structure.
  * The 'destination' address in this case is the address to which the
  * peer sent the datagram, i.e. our local address. For multihomed
  * hosts, this can change from msg to msg. Note that only the IP
  * address changes, the port number should remain the same.
  */
-static void svc_udp_get_dest_address(struct svc_rqst *rqstp,
-				     struct cmsghdr *cmh)
+static int svc_udp_get_dest_address(struct svc_rqst *rqstp,
+				    struct cmsghdr *cmh)
 {
-	struct svc_sock *svsk =
-		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
-	switch (svsk->sk_sk->sk_family) {
-	case AF_INET: {
-		struct in_pktinfo *pki = CMSG_DATA(cmh);
-		rqstp->rq_daddr.addr.s_addr = pki->ipi_spec_dst.s_addr;
-		break;
-		}
-	case AF_INET6: {
-		struct in6_pktinfo *pki = CMSG_DATA(cmh);
-		ipv6_addr_copy(&rqstp->rq_daddr.addr6, &pki->ipi6_addr);
-		break;
-		}
+	switch (cmh->cmsg_level) {
+	case SOL_IP:
+		return svc_udp_get_dest_address4(rqstp, cmh);
+	case SOL_IPV6:
+		return svc_udp_get_dest_address6(rqstp, cmh);
 	}
+
+	return 0;
 }
 
 /*
@@ -531,16 +572,15 @@
 
 	rqstp->rq_prot = IPPROTO_UDP;
 
-	if (cmh->cmsg_level != IPPROTO_IP ||
-	    cmh->cmsg_type != IP_PKTINFO) {
+	if (!svc_udp_get_dest_address(rqstp, cmh)) {
 		if (net_ratelimit())
-			printk("rpcsvc: received unknown control message:"
-			       "%d/%d\n",
-			       cmh->cmsg_level, cmh->cmsg_type);
+			printk(KERN_WARNING
+				"svc: received unknown control message %d/%d; "
+				"dropping RPC reply datagram\n",
+					cmh->cmsg_level, cmh->cmsg_type);
 		skb_free_datagram(svsk->sk_sk, skb);
 		return 0;
 	}
-	svc_udp_get_dest_address(rqstp, cmh);
 
 	if (skb_is_nonlinear(skb)) {
 		/* we have to copy */
@@ -651,8 +691,7 @@
 
 static void svc_udp_init(struct svc_sock *svsk, struct svc_serv *serv)
 {
-	int one = 1;
-	mm_segment_t oldfs;
+	int err, level, optname, one = 1;
 
 	svc_xprt_init(&svc_udp_class, &svsk->sk_xprt, serv);
 	clear_bit(XPT_CACHE_AUTH, &svsk->sk_xprt.xpt_flags);
@@ -671,12 +710,22 @@
 	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 	set_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags);
 
-	oldfs = get_fs();
-	set_fs(KERNEL_DS);
 	/* make sure we get destination address info */
-	svsk->sk_sock->ops->setsockopt(svsk->sk_sock, IPPROTO_IP, IP_PKTINFO,
-				       (char __user *)&one, sizeof(one));
-	set_fs(oldfs);
+	switch (svsk->sk_sk->sk_family) {
+	case AF_INET:
+		level = SOL_IP;
+		optname = IP_PKTINFO;
+		break;
+	case AF_INET6:
+		level = SOL_IPV6;
+		optname = IPV6_RECVPKTINFO;
+		break;
+	default:
+		BUG();
+	}
+	err = kernel_setsockopt(svsk->sk_sock, level, optname,
+					(char *)&one, sizeof(one));
+	dprintk("svc: kernel_setsockopt returned %d\n", err);
 }
 
 /*
@@ -826,21 +875,15 @@
 }
 
 /*
- * Receive data from a TCP socket.
+ * Receive data.
+ * If we haven't gotten the record length yet, get the next four bytes.
+ * Otherwise try to gobble up as much as possible up to the complete
+ * record length.
  */
-static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
+static int svc_tcp_recv_record(struct svc_sock *svsk, struct svc_rqst *rqstp)
 {
-	struct svc_sock	*svsk =
-		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
 	struct svc_serv	*serv = svsk->sk_xprt.xpt_server;
-	int		len;
-	struct kvec *vec;
-	int pnum, vlen;
-
-	dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
-		svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags),
-		test_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags),
-		test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags));
+	int len;
 
 	if (test_and_clear_bit(XPT_CHNGBUF, &svsk->sk_xprt.xpt_flags))
 		/* sndbuf needs to have room for one request
@@ -861,10 +904,6 @@
 
 	clear_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 
-	/* Receive data. If we haven't got the record length yet, get
-	 * the next four bytes. Otherwise try to gobble up as much as
-	 * possible up to the complete record length.
-	 */
 	if (svsk->sk_tcplen < sizeof(rpc_fraghdr)) {
 		int		want = sizeof(rpc_fraghdr) - svsk->sk_tcplen;
 		struct kvec	iov;
@@ -879,7 +918,7 @@
 			dprintk("svc: short recvfrom while reading record "
 				"length (%d of %d)\n", len, want);
 			svc_xprt_received(&svsk->sk_xprt);
-			return -EAGAIN; /* record header not complete */
+			goto err_again; /* record header not complete */
 		}
 
 		svsk->sk_reclen = ntohl(svsk->sk_reclen);
@@ -894,6 +933,7 @@
 					"per record not supported\n");
 			goto err_delete;
 		}
+
 		svsk->sk_reclen &= RPC_FRAGMENT_SIZE_MASK;
 		dprintk("svc: TCP record, %d bytes\n", svsk->sk_reclen);
 		if (svsk->sk_reclen > serv->sv_max_mesg) {
@@ -914,17 +954,121 @@
 		dprintk("svc: incomplete TCP record (%d of %d)\n",
 			len, svsk->sk_reclen);
 		svc_xprt_received(&svsk->sk_xprt);
-		return -EAGAIN;	/* record not complete */
+		goto err_again;	/* record not complete */
 	}
 	len = svsk->sk_reclen;
 	set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
 
+	return len;
+ error:
+	if (len == -EAGAIN) {
+		dprintk("RPC: TCP recv_record got EAGAIN\n");
+		svc_xprt_received(&svsk->sk_xprt);
+	}
+	return len;
+ err_delete:
+	set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
+ err_again:
+	return -EAGAIN;
+}
+
+static int svc_process_calldir(struct svc_sock *svsk, struct svc_rqst *rqstp,
+			       struct rpc_rqst **reqpp, struct kvec *vec)
+{
+	struct rpc_rqst *req = NULL;
+	u32 *p;
+	u32 xid;
+	u32 calldir;
+	int len;
+
+	len = svc_recvfrom(rqstp, vec, 1, 8);
+	if (len < 0)
+		goto error;
+
+	p = (u32 *)rqstp->rq_arg.head[0].iov_base;
+	xid = *p++;
+	calldir = *p;
+
+	if (calldir == 0) {
+		/* REQUEST is the most common case */
+		vec[0] = rqstp->rq_arg.head[0];
+	} else {
+		/* REPLY */
+		if (svsk->sk_bc_xprt)
+			req = xprt_lookup_rqst(svsk->sk_bc_xprt, xid);
+
+		if (!req) {
+			printk(KERN_NOTICE
+				"%s: Got unrecognized reply: "
+				"calldir 0x%x sk_bc_xprt %p xid %08x\n",
+				__func__, ntohl(calldir),
+				svsk->sk_bc_xprt, xid);
+			vec[0] = rqstp->rq_arg.head[0];
+			goto out;
+		}
+
+		memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
+		       sizeof(struct xdr_buf));
+		/* copy the xid and call direction */
+		memcpy(req->rq_private_buf.head[0].iov_base,
+		       rqstp->rq_arg.head[0].iov_base, 8);
+		vec[0] = req->rq_private_buf.head[0];
+	}
+ out:
+	vec[0].iov_base += 8;
+	vec[0].iov_len -= 8;
+	len = svsk->sk_reclen - 8;
+ error:
+	*reqpp = req;
+	return len;
+}
+
+/*
+ * Receive data from a TCP socket.
+ */
+static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
+{
+	struct svc_sock	*svsk =
+		container_of(rqstp->rq_xprt, struct svc_sock, sk_xprt);
+	struct svc_serv	*serv = svsk->sk_xprt.xpt_server;
+	int		len;
+	struct kvec *vec;
+	int pnum, vlen;
+	struct rpc_rqst *req = NULL;
+
+	dprintk("svc: tcp_recv %p data %d conn %d close %d\n",
+		svsk, test_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags),
+		test_bit(XPT_CONN, &svsk->sk_xprt.xpt_flags),
+		test_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags));
+
+	len = svc_tcp_recv_record(svsk, rqstp);
+	if (len < 0)
+		goto error;
+
 	vec = rqstp->rq_vec;
 	vec[0] = rqstp->rq_arg.head[0];
 	vlen = PAGE_SIZE;
+
+	/*
+	 * We have enough data for the whole tcp record. Let's try and read the
+	 * first 8 bytes to get the xid and the call direction. We can use this
+	 * to figure out if this is a call or a reply to a callback. If
+	 * sk_reclen is < 8 (xid and calldir), then this is a malformed packet.
+	 * In that case, don't bother with the calldir and just read the data.
+	 * It will be rejected in svc_process.
+	 */
+	if (len >= 8) {
+		len = svc_process_calldir(svsk, rqstp, &req, vec);
+		if (len < 0)
+			goto err_again;
+		vlen -= 8;
+	}
+
 	pnum = 1;
 	while (vlen < len) {
-		vec[pnum].iov_base = page_address(rqstp->rq_pages[pnum]);
+		vec[pnum].iov_base = (req) ?
+			page_address(req->rq_private_buf.pages[pnum - 1]) :
+			page_address(rqstp->rq_pages[pnum]);
 		vec[pnum].iov_len = PAGE_SIZE;
 		pnum++;
 		vlen += PAGE_SIZE;
@@ -934,8 +1078,18 @@
 	/* Now receive data */
 	len = svc_recvfrom(rqstp, vec, pnum, len);
 	if (len < 0)
-		goto error;
+		goto err_again;
 
+	/*
+	 * Account for the 8 bytes we read earlier
+	 */
+	len += 8;
+
+	if (req) {
+		xprt_complete_rqst(req->rq_task, len);
+		len = 0;
+		goto out;
+	}
 	dprintk("svc: TCP complete record (%d bytes)\n", len);
 	rqstp->rq_arg.len = len;
 	rqstp->rq_arg.page_base = 0;
@@ -949,6 +1103,7 @@
 	rqstp->rq_xprt_ctxt   = NULL;
 	rqstp->rq_prot	      = IPPROTO_TCP;
 
+out:
 	/* Reset TCP read info */
 	svsk->sk_reclen = 0;
 	svsk->sk_tcplen = 0;
@@ -960,21 +1115,19 @@
 
 	return len;
 
- err_delete:
-	set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
-	return -EAGAIN;
-
- error:
+err_again:
 	if (len == -EAGAIN) {
 		dprintk("RPC: TCP recvfrom got EAGAIN\n");
 		svc_xprt_received(&svsk->sk_xprt);
-	} else {
+		return len;
+	}
+error:
+	if (len != -EAGAIN) {
 		printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
 		       svsk->sk_xprt.xpt_server->sv_name, -len);
-		goto err_delete;
+		set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
 	}
-
-	return len;
+	return -EAGAIN;
 }
 
 /*
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index f412a85..fd46d42 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -832,6 +832,11 @@
 	spin_unlock_bh(&xprt->transport_lock);
 }
 
+static inline int xprt_has_timer(struct rpc_xprt *xprt)
+{
+	return xprt->idle_timeout != 0;
+}
+
 /**
  * xprt_prepare_transmit - reserve the transport before sending a request
  * @task: RPC task about to send a request
@@ -1013,7 +1018,7 @@
 	if (!list_empty(&req->rq_list))
 		list_del(&req->rq_list);
 	xprt->last_used = jiffies;
-	if (list_empty(&xprt->recv))
+	if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
 		mod_timer(&xprt->timer,
 				xprt->last_used + xprt->idle_timeout);
 	spin_unlock_bh(&xprt->transport_lock);
@@ -1082,8 +1087,11 @@
 #endif /* CONFIG_NFS_V4_1 */
 
 	INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
-	setup_timer(&xprt->timer, xprt_init_autodisconnect,
-			(unsigned long)xprt);
+	if (xprt_has_timer(xprt))
+		setup_timer(&xprt->timer, xprt_init_autodisconnect,
+			    (unsigned long)xprt);
+	else
+		init_timer(&xprt->timer);
 	xprt->last_used = jiffies;
 	xprt->cwnd = RPC_INITCWND;
 	xprt->bind_index = 0;
@@ -1102,7 +1110,6 @@
 
 	dprintk("RPC:       created transport %p with %u slots\n", xprt,
 			xprt->max_reqs);
-
 	return xprt;
 }
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 5151f9f..0cf5e8c 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -730,12 +730,12 @@
 		goto err;
 
 	mr = ib_alloc_fast_reg_mr(xprt->sc_pd, RPCSVC_MAXPAGES);
-	if (!mr)
+	if (IS_ERR(mr))
 		goto err_free_frmr;
 
 	pl = ib_alloc_fast_reg_page_list(xprt->sc_cm_id->device,
 					 RPCSVC_MAXPAGES);
-	if (!pl)
+	if (IS_ERR(pl))
 		goto err_free_mr;
 
 	frmr->mr = mr;
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 62438f3..bee4154 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -32,6 +32,7 @@
 #include <linux/tcp.h>
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/sched.h>
+#include <linux/sunrpc/svcsock.h>
 #include <linux/sunrpc/xprtsock.h>
 #include <linux/file.h>
 #ifdef CONFIG_NFS_V4_1
@@ -43,6 +44,7 @@
 #include <net/udp.h>
 #include <net/tcp.h>
 
+#include "sunrpc.h"
 /*
  * xprtsock tunables
  */
@@ -2098,6 +2100,134 @@
 			xprt->stat.bklog_u);
 }
 
+/*
+ * Allocate a single page as a scratch buffer for the rpc code. The reason
+ * we allocate a page instead of doing a kmalloc like rpc_malloc is because we
+ * want to use the server side send routines.
+ */
+void *bc_malloc(struct rpc_task *task, size_t size)
+{
+	struct page *page;
+	struct rpc_buffer *buf;
+
+	BUG_ON(size > PAGE_SIZE - sizeof(struct rpc_buffer));
+	page = alloc_page(GFP_KERNEL);
+
+	if (!page)
+		return NULL;
+
+	buf = page_address(page);
+	buf->len = PAGE_SIZE;
+
+	return buf->data;
+}
+
+/*
+ * Free the space allocated in the bc_malloc routine
+ */
+void bc_free(void *buffer)
+{
+	struct rpc_buffer *buf;
+
+	if (!buffer)
+		return;
+
+	buf = container_of(buffer, struct rpc_buffer, data);
+	free_page((unsigned long)buf);
+}
+
+/*
+ * Use the svc_sock to send the callback. Must be called with the svc_xprt's
+ * xpt_mutex held. Borrows heavily from svc_tcp_sendto and xs_tcp_send_request.
+ */
+static int bc_sendto(struct rpc_rqst *req)
+{
+	int len;
+	struct xdr_buf *xbufp = &req->rq_snd_buf;
+	struct rpc_xprt *xprt = req->rq_xprt;
+	struct sock_xprt *transport =
+				container_of(xprt, struct sock_xprt, xprt);
+	struct socket *sock = transport->sock;
+	struct page *tailpage = NULL;
+	unsigned long headoff;
+	unsigned long tailoff;
+
+	/* Set up the rpc header and record marker stuff */
+	xs_encode_tcp_record_marker(xbufp);
+
+	tailoff = (unsigned long)xbufp->tail[0].iov_base & ~PAGE_MASK;
+	headoff = (unsigned long)xbufp->head[0].iov_base & ~PAGE_MASK;
+	/* Only translate the tail address to a page when there is a tail */
+	if (xbufp->tail[0].iov_len)
+		tailpage = virt_to_page(xbufp->tail[0].iov_base);
+	len = svc_send_common(sock, xbufp,
+			      virt_to_page(xbufp->head[0].iov_base), headoff,
+			      tailpage, tailoff);
+	if (len != xbufp->len) {
+		printk(KERN_NOTICE "Error sending entire callback!\n");
+		len = -EAGAIN;
+	}
+	return len;
+}
+
+/*
+ * The send routine; borrows from svc_send.  Returns 0 or a negative errno.
+ */
+static int bc_send_request(struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
+	struct svc_xprt	*xprt;
+	struct svc_sock         *svsk;
+	int                     len;
+
+	dprintk("sending request with xid: %08x\n", ntohl(req->rq_xid));
+	/*
+	 * Get the server socket associated with this callback xprt
+	 */
+	xprt = req->rq_xprt->bc_xprt;
+	svsk = container_of(xprt, struct svc_sock, sk_xprt);
+
+	/*
+	 * Grab the mutex to serialize data as the connection is shared
+	 * with the fore channel
+	 */
+	if (!mutex_trylock(&xprt->xpt_mutex)) {
+		rpc_sleep_on(&xprt->xpt_bc_pending, task, NULL);
+		if (!mutex_trylock(&xprt->xpt_mutex))
+			return -EAGAIN;
+		rpc_wake_up_queued_task(&xprt->xpt_bc_pending, task);
+	}
+	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
+		len = -ENOTCONN;
+	else
+		len = bc_sendto(req);
+	mutex_unlock(&xprt->xpt_mutex);
+	/* len must be signed: a byte count maps to 0, errors pass through */
+	if (len > 0)
+		len = 0;
+
+	return len;
+}
+
+/*
+ * The close routine. Since this is client initiated, we do nothing
+ */
+
+static void bc_close(struct rpc_xprt *xprt)
+{
+	return;
+}
+
+/*
+ * The xprt destroy routine. Again, because this connection is client
+ * initiated, we do nothing
+ */
+
+static void bc_destroy(struct rpc_xprt *xprt)
+{
+	return;
+}
+
 static struct rpc_xprt_ops xs_udp_ops = {
 	.set_buffer_size	= xs_udp_set_buffer_size,
 	.reserve_xprt		= xprt_reserve_xprt_cong,
@@ -2134,6 +2264,22 @@
 	.print_stats		= xs_tcp_print_stats,
 };
 
+/*
+ * The rpc_xprt_ops for the server backchannel
+ */
+
+static struct rpc_xprt_ops bc_tcp_ops = {
+	.reserve_xprt		= xprt_reserve_xprt,
+	.release_xprt		= xprt_release_xprt,
+	.buf_alloc		= bc_malloc,
+	.buf_free		= bc_free,
+	.send_request		= bc_send_request,
+	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
+	.close			= bc_close,
+	.destroy		= bc_destroy,
+	.print_stats		= xs_tcp_print_stats,
+};
+
 static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args,
 				      unsigned int slot_table_size)
 {
@@ -2322,11 +2468,93 @@
 	return ERR_PTR(-EINVAL);
 }
 
+/**
+ * xs_setup_bc_tcp - Set up transport to use a TCP backchannel socket
+ * @args: rpc transport creation arguments; args->bc_xprt must be set
+ *
+ * Returns the new transport, or an ERR_PTR() value on failure.
+ */
+static struct rpc_xprt *xs_setup_bc_tcp(struct xprt_create *args)
+{
+	struct sockaddr *addr = args->dstaddr;
+	struct rpc_xprt *xprt;
+	struct sock_xprt *transport;
+	struct svc_sock *bc_sock;
+
+	if (!args->bc_xprt)
+		return ERR_PTR(-EINVAL);
+
+	xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries);
+	if (IS_ERR(xprt))
+		return xprt;
+	transport = container_of(xprt, struct sock_xprt, xprt);
+
+	xprt->prot = IPPROTO_TCP;
+	xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32);
+	xprt->max_payload = RPC_MAX_FRAGMENT_SIZE;
+	xprt->timeout = &xs_tcp_default_timeout;
+
+	/* backchannel */
+	xprt_set_bound(xprt);
+	xprt->bind_timeout = 0;
+	xprt->connect_timeout = 0;
+	xprt->reestablish_timeout = 0;
+	xprt->idle_timeout = 0;
+
+	/* The backchannel shares the forechannel's socket connection */
+	xprt->bc_xprt = args->bc_xprt;
+	bc_sock = container_of(args->bc_xprt, struct svc_sock, sk_xprt);
+	transport->sock = bc_sock->sk_sock;
+	transport->inet = bc_sock->sk_sk;
+
+	xprt->ops = &bc_tcp_ops;
+
+	switch (addr->sa_family) {
+	case AF_INET:
+		xs_format_peer_addresses(xprt, "tcp",
+					 RPCBIND_NETID_TCP);
+		break;
+	case AF_INET6:
+		xs_format_peer_addresses(xprt, "tcp",
+				   RPCBIND_NETID_TCP6);
+		break;
+	default:
+		kfree(xprt->slot);
+		kfree(xprt);
+		return ERR_PTR(-EAFNOSUPPORT);
+	}
+
+	if (xprt_bound(xprt))
+		dprintk("RPC:       set up xprt to %s (port %s) via %s\n",
+				xprt->address_strings[RPC_DISPLAY_ADDR],
+				xprt->address_strings[RPC_DISPLAY_PORT],
+				xprt->address_strings[RPC_DISPLAY_PROTO]);
+	else
+		dprintk("RPC:       set up xprt to %s (autobind) via %s\n",
+				xprt->address_strings[RPC_DISPLAY_ADDR],
+				xprt->address_strings[RPC_DISPLAY_PROTO]);
+
+	/*
+	 * Since we don't want connections for the backchannel, we set
+	 * the xprt status to connected
+	 */
+	xprt_set_connected(xprt);
+
+	if (try_module_get(THIS_MODULE)) {
+		/* publish to the svc_sock only once setup cannot fail */
+		bc_sock->sk_bc_xprt = xprt;
+		return xprt;
+	}
+	kfree(xprt->slot);
+	kfree(xprt);
+	return ERR_PTR(-EINVAL);
+}
+
 static struct xprt_class	xs_udp_transport = {
 	.list		= LIST_HEAD_INIT(xs_udp_transport.list),
 	.name		= "udp",
 	.owner		= THIS_MODULE,
-	.ident		= IPPROTO_UDP,
+	.ident		= XPRT_TRANSPORT_UDP,
 	.setup		= xs_setup_udp,
 };
 
@@ -2334,10 +2562,18 @@
 	.list		= LIST_HEAD_INIT(xs_tcp_transport.list),
 	.name		= "tcp",
 	.owner		= THIS_MODULE,
-	.ident		= IPPROTO_TCP,
+	.ident		= XPRT_TRANSPORT_TCP,
 	.setup		= xs_setup_tcp,
 };
 
+static struct xprt_class	xs_bc_tcp_transport = {
+	.list		= LIST_HEAD_INIT(xs_bc_tcp_transport.list),
+	.name		= "tcp NFSv4.1 backchannel",
+	.owner		= THIS_MODULE,
+	.ident		= XPRT_TRANSPORT_BC_TCP,
+	.setup		= xs_setup_bc_tcp,
+};
+
 /**
  * init_socket_xprt - set up xprtsock's sysctls, register with RPC client
  *
@@ -2351,6 +2587,7 @@
 
 	xprt_register_transport(&xs_udp_transport);
 	xprt_register_transport(&xs_tcp_transport);
+	xprt_register_transport(&xs_bc_tcp_transport);
 
 	return 0;
 }
@@ -2370,6 +2607,7 @@
 
 	xprt_unregister_transport(&xs_udp_transport);
 	xprt_unregister_transport(&xs_tcp_transport);
+	xprt_unregister_transport(&xs_bc_tcp_transport);
 }
 
 static int param_set_uint_minmax(const char *val, struct kernel_param *kp,