Merge tag 'rxrpc-next-20170829' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc: Miscellany

Here are a number of patches that make some changes/fixes and add a couple
of extensions to AF_RXRPC for kernel services to use.  The changes and
fixes are:

 (1) Use time64_t rather than u32 outside of protocol or
     UAPI-representative structures.

 (2) Use the correct time stamp when loading a key from an XDR-encoded
     Kerberos 5 key.

 (3) Fix IPv6 support.

 (4) Fix some places where the error code is being incorrectly made
     positive before returning.

 (5) Remove some white space.

And the extensions:

 (6) Add an end-of-Tx phase notification, thereby allowing kAFS to
     transition the state on its own call record at the correct point,
     rather than having to do it in advance and risk non-completion of the
     call in the wrong state.

 (7) Allow a kernel client call to be retried if it fails on a network
     error, thereby making it possible for kAFS to iterate over a number of
     IP addresses without having to reload the Tx queue and re-encrypt data
     each time.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/Documentation/networking/rxrpc.txt b/Documentation/networking/rxrpc.txt
index 8c70ba5..8106201 100644
--- a/Documentation/networking/rxrpc.txt
+++ b/Documentation/networking/rxrpc.txt
@@ -818,10 +818,15 @@
 
  (*) Send data through a call.
 
+	typedef void (*rxrpc_notify_end_tx_t)(struct sock *sk,
+					      unsigned long user_call_ID,
+					      struct sk_buff *skb);
+
 	int rxrpc_kernel_send_data(struct socket *sock,
 				   struct rxrpc_call *call,
 				   struct msghdr *msg,
-				   size_t len);
+				   size_t len,
+				   rxrpc_notify_end_tx_t notify_end_rx);
 
      This is used to supply either the request part of a client call or the
      reply part of a server call.  msg.msg_iovlen and msg.msg_iov specify the
@@ -832,6 +837,11 @@
      The msg must not specify a destination address, control data or any flags
      other than MSG_MORE.  len is the total amount of data to transmit.
 
+     notify_end_rx can be NULL or it can be used to specify a function to be
+     called when the call changes state to end the Tx phase.  This function is
+     called with the call-state spinlock held to prevent any reply or final ACK
+     from being delivered first.
+
  (*) Receive data from a call.
 
 	int rxrpc_kernel_recv_data(struct socket *sock,
@@ -965,6 +975,51 @@
      size should be set when the call is begun.  tx_total_len may not be less
      than zero.
 
+ (*) Check to see the completion state of a call so that the caller can assess
+     whether it needs to be retried.
+
+	enum rxrpc_call_completion {
+		RXRPC_CALL_SUCCEEDED,
+		RXRPC_CALL_REMOTELY_ABORTED,
+		RXRPC_CALL_LOCALLY_ABORTED,
+		RXRPC_CALL_LOCAL_ERROR,
+		RXRPC_CALL_NETWORK_ERROR,
+	};
+
+	int rxrpc_kernel_check_call(struct socket *sock, struct rxrpc_call *call,
+				    enum rxrpc_call_completion *_compl,
+				    u32 *_abort_code);
+
+     On return, -EINPROGRESS will be returned if the call is still ongoing; if
+     it is finished, *_compl will be set to indicate the manner of completion,
+     *_abort_code will be set to any abort code that occurred.  0 will be
+     returned on a successful completion, -ECONNABORTED will be returned if the
+     client failed due to a remote abort and anything else will return an
+     appropriate error code.
+
+     The caller should look at this information to decide if it's worth
+     retrying the call.
+
+ (*) Retry a client call.
+
+	int rxrpc_kernel_retry_call(struct socket *sock,
+				    struct rxrpc_call *call,
+				    struct sockaddr_rxrpc *srx,
+				    struct key *key);
+
+     This attempts to partially reinitialise a call and submit it again whilst
+     reusing the original call's Tx queue to avoid the need to repackage and
+     re-encrypt the data to be sent.  call indicates the call to retry, srx the
+     new address to send it to and key the encryption key to use for signing or
+     encrypting the packets.
+
+     For this to work, the first Tx data packet must still be in the transmit
+     queue, and currently this is only permitted for local and network errors
+     and the call must not have been aborted.  Any partially constructed Tx
+     packet is left as is and can continue being filled afterwards.
+
+     It returns 0 if the call was requeued and an error otherwise.
+
 
 =======================
 CONFIGURABLE PARAMETERS
diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c
index 1074304..0bf191f 100644
--- a/fs/afs/rxrpc.c
+++ b/fs/afs/rxrpc.c
@@ -292,6 +292,19 @@ static void afs_load_bvec(struct afs_call *call, struct msghdr *msg,
 }
 
 /*
+ * Advance the AFS call state when the RxRPC call ends the transmit phase.
+ */
+static void afs_notify_end_request_tx(struct sock *sock,
+				      struct rxrpc_call *rxcall,
+				      unsigned long call_user_ID)
+{
+	struct afs_call *call = (struct afs_call *)call_user_ID;
+
+	if (call->state == AFS_CALL_REQUESTING)
+		call->state = AFS_CALL_AWAIT_REPLY;
+}
+
+/*
  * attach the data from a bunch of pages on an inode to a call
  */
 static int afs_send_pages(struct afs_call *call, struct msghdr *msg)
@@ -310,14 +323,8 @@ static int afs_send_pages(struct afs_call *call, struct msghdr *msg)
 		bytes = msg->msg_iter.count;
 		nr = msg->msg_iter.nr_segs;
 
-		/* Have to change the state *before* sending the last
-		 * packet as RxRPC might give us the reply before it
-		 * returns from sending the request.
-		 */
-		if (first + nr - 1 >= last)
-			call->state = AFS_CALL_AWAIT_REPLY;
-		ret = rxrpc_kernel_send_data(afs_socket, call->rxcall,
-					     msg, bytes);
+		ret = rxrpc_kernel_send_data(afs_socket, call->rxcall, msg,
+					     bytes, afs_notify_end_request_tx);
 		for (loop = 0; loop < nr; loop++)
 			put_page(bv[loop].bv_page);
 		if (ret < 0)
@@ -409,7 +416,8 @@ int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
 	if (!call->send_pages)
 		call->state = AFS_CALL_AWAIT_REPLY;
 	ret = rxrpc_kernel_send_data(afs_socket, rxcall,
-				     &msg, call->request_size);
+				     &msg, call->request_size,
+				     afs_notify_end_request_tx);
 	if (ret < 0)
 		goto error_do_abort;
 
@@ -741,6 +749,20 @@ static int afs_deliver_cm_op_id(struct afs_call *call)
 }
 
 /*
+ * Advance the AFS call state when an RxRPC service call ends the transmit
+ * phase.
+ */
+static void afs_notify_end_reply_tx(struct sock *sock,
+				    struct rxrpc_call *rxcall,
+				    unsigned long call_user_ID)
+{
+	struct afs_call *call = (struct afs_call *)call_user_ID;
+
+	if (call->state == AFS_CALL_REPLYING)
+		call->state = AFS_CALL_AWAIT_ACK;
+}
+
+/*
  * send an empty reply
  */
 void afs_send_empty_reply(struct afs_call *call)
@@ -759,7 +781,8 @@ void afs_send_empty_reply(struct afs_call *call)
 	msg.msg_flags		= 0;
 
 	call->state = AFS_CALL_AWAIT_ACK;
-	switch (rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, 0)) {
+	switch (rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, 0,
+				       afs_notify_end_reply_tx)) {
 	case 0:
 		_leave(" [replied]");
 		return;
@@ -797,7 +820,8 @@ void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
 	msg.msg_flags		= 0;
 
 	call->state = AFS_CALL_AWAIT_ACK;
-	n = rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, len);
+	n = rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, len,
+				   afs_notify_end_reply_tx);
 	if (n >= 0) {
 		/* Success */
 		_leave(" [replied]");
diff --git a/include/keys/rxrpc-type.h b/include/keys/rxrpc-type.h
index 5de0673..8cf829d 100644
--- a/include/keys/rxrpc-type.h
+++ b/include/keys/rxrpc-type.h
@@ -127,4 +127,27 @@ struct rxrpc_key_data_v1 {
 #define AFSTOKEN_K5_ADDRESSES_MAX	16	/* max K5 addresses */
 #define AFSTOKEN_K5_AUTHDATA_MAX	16	/* max K5 pieces of auth data */
 
+/*
+ * Truncate a time64_t to the range from 1970 to 2106 as in the network
+ * protocol.
+ */
+static inline u32 rxrpc_time64_to_u32(time64_t time)
+{
+	if (time < 0)
+		return 0;
+
+	if (time > UINT_MAX)
+		return UINT_MAX;
+
+	return (u32)time;
+}
+
+/*
+ * Extend u32 back to time64_t using the same 1970-2106 range.
+ */
+static inline time64_t rxrpc_u32_to_time64(u32 time)
+{
+	return (time64_t)time;
+}
+
 #endif /* _KEYS_RXRPC_TYPE_H */
diff --git a/include/net/af_rxrpc.h b/include/net/af_rxrpc.h
index c172709..3ac7915 100644
--- a/include/net/af_rxrpc.h
+++ b/include/net/af_rxrpc.h
@@ -19,8 +19,22 @@ struct sock;
 struct socket;
 struct rxrpc_call;
 
+/*
+ * Call completion condition (state == RXRPC_CALL_COMPLETE).
+ */
+enum rxrpc_call_completion {
+	RXRPC_CALL_SUCCEEDED,		/* - Normal termination */
+	RXRPC_CALL_REMOTELY_ABORTED,	/* - call aborted by peer */
+	RXRPC_CALL_LOCALLY_ABORTED,	/* - call aborted locally on error or close */
+	RXRPC_CALL_LOCAL_ERROR,		/* - call failed due to local error */
+	RXRPC_CALL_NETWORK_ERROR,	/* - call terminated by network error */
+	NR__RXRPC_CALL_COMPLETIONS
+};
+
 typedef void (*rxrpc_notify_rx_t)(struct sock *, struct rxrpc_call *,
 				  unsigned long);
+typedef void (*rxrpc_notify_end_tx_t)(struct sock *, struct rxrpc_call *,
+				      unsigned long);
 typedef void (*rxrpc_notify_new_call_t)(struct sock *, struct rxrpc_call *,
 					unsigned long);
 typedef void (*rxrpc_discard_new_call_t)(struct rxrpc_call *, unsigned long);
@@ -37,7 +51,8 @@ struct rxrpc_call *rxrpc_kernel_begin_call(struct socket *,
 					   gfp_t,
 					   rxrpc_notify_rx_t);
 int rxrpc_kernel_send_data(struct socket *, struct rxrpc_call *,
-			   struct msghdr *, size_t);
+			   struct msghdr *, size_t,
+			   rxrpc_notify_end_tx_t);
 int rxrpc_kernel_recv_data(struct socket *, struct rxrpc_call *,
 			   void *, size_t, size_t *, bool, u32 *);
 bool rxrpc_kernel_abort_call(struct socket *, struct rxrpc_call *,
@@ -48,5 +63,9 @@ void rxrpc_kernel_get_peer(struct socket *, struct rxrpc_call *,
 int rxrpc_kernel_charge_accept(struct socket *, rxrpc_notify_rx_t,
 			       rxrpc_user_attach_call_t, unsigned long, gfp_t);
 void rxrpc_kernel_set_tx_length(struct socket *, struct rxrpc_call *, s64);
+int rxrpc_kernel_retry_call(struct socket *, struct rxrpc_call *,
+			    struct sockaddr_rxrpc *, struct key *);
+int rxrpc_kernel_check_call(struct socket *, struct rxrpc_call *,
+			    enum rxrpc_call_completion *, u32 *);
 
 #endif /* _NET_RXRPC_H */
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index a2ad448..fb17552 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -337,6 +337,75 @@ void rxrpc_kernel_end_call(struct socket *sock, struct rxrpc_call *call)
 EXPORT_SYMBOL(rxrpc_kernel_end_call);
 
 /**
+ * rxrpc_kernel_check_call - Check a call's state
+ * @sock: The socket the call is on
+ * @call: The call to check
+ * @_compl: Where to store the completion state
+ * @_abort_code: Where to store any abort code
+ *
+ * Allow a kernel service to query the state of a call and find out the manner
+ * of its termination if it has completed.  Returns -EINPROGRESS if the call is
+ * still going, 0 if the call finished successfully, -ECONNABORTED if the call
+ * was aborted and an appropriate error if the call failed in some other way.
+ */
+int rxrpc_kernel_check_call(struct socket *sock, struct rxrpc_call *call,
+			    enum rxrpc_call_completion *_compl, u32 *_abort_code)
+{
+	if (call->state != RXRPC_CALL_COMPLETE)
+		return -EINPROGRESS;
+	smp_rmb();
+	*_compl = call->completion;
+	*_abort_code = call->abort_code;
+	return call->error;
+}
+EXPORT_SYMBOL(rxrpc_kernel_check_call);
+
+/**
+ * rxrpc_kernel_retry_call - Allow a kernel service to retry a call
+ * @sock: The socket the call is on
+ * @call: The call to retry
+ * @srx: The address of the peer to contact
+ * @key: The security context to use (defaults to socket setting)
+ *
+ * Allow a kernel service to try resending a client call that failed due to a
+ * network error to a new address.  The Tx queue is maintained intact, thereby
+ * relieving the need to re-encrypt any request data that has already been
+ * buffered.
+ */
+int rxrpc_kernel_retry_call(struct socket *sock, struct rxrpc_call *call,
+			    struct sockaddr_rxrpc *srx, struct key *key)
+{
+	struct rxrpc_conn_parameters cp;
+	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
+	int ret;
+
+	_enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
+
+	if (!key)
+		key = rx->key;
+	if (key && !key->payload.data[0])
+		key = NULL; /* a no-security key */
+
+	memset(&cp, 0, sizeof(cp));
+	cp.local		= rx->local;
+	cp.key			= key;
+	cp.security_level	= 0;
+	cp.exclusive		= false;
+	cp.service_id		= srx->srx_service;
+
+	mutex_lock(&call->user_mutex);
+
+	ret = rxrpc_prepare_call_for_retry(rx, call);
+	if (ret == 0)
+		ret = rxrpc_retry_client_call(rx, call, &cp, srx, GFP_KERNEL);
+
+	mutex_unlock(&call->user_mutex);
+	_leave(" = %d", ret);
+	return ret;
+}
+EXPORT_SYMBOL(rxrpc_kernel_retry_call);
+
+/**
  * rxrpc_kernel_new_call_notification - Get notifications of new calls
  * @sock: The socket to intercept received messages on
  * @notify_new_call: Function to be called when new calls appear
@@ -591,13 +660,13 @@ static int rxrpc_getsockopt(struct socket *sock, int level, int optname,
 			    char __user *optval, int __user *_optlen)
 {
 	int optlen;
-	
+
 	if (level != SOL_RXRPC)
 		return -EOPNOTSUPP;
 
 	if (get_user(optlen, _optlen))
 		return -EFAULT;
-	
+
 	switch (optname) {
 	case RXRPC_SUPPORTED_CMSG:
 		if (optlen < sizeof(int))
@@ -606,7 +675,7 @@ static int rxrpc_getsockopt(struct socket *sock, int level, int optname,
 		    put_user(sizeof(int), _optlen))
 			return -EFAULT;
 		return 0;
-		
+
 	default:
 		return -EOPNOTSUPP;
 	}
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 8c0db9b..ea5600b 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -445,6 +445,7 @@ enum rxrpc_call_flag {
 	RXRPC_CALL_EXPOSED,		/* The call was exposed to the world */
 	RXRPC_CALL_RX_LAST,		/* Received the last packet (at rxtx_top) */
 	RXRPC_CALL_TX_LAST,		/* Last packet in Tx buffer (at rxtx_top) */
+	RXRPC_CALL_TX_LASTQ,		/* Last packet has been queued */
 	RXRPC_CALL_SEND_PING,		/* A ping will need to be sent */
 	RXRPC_CALL_PINGING,		/* Ping in process */
 	RXRPC_CALL_RETRANS_TIMEOUT,	/* Retransmission due to timeout occurred */
@@ -482,18 +483,6 @@ enum rxrpc_call_state {
 };
 
 /*
- * Call completion condition (state == RXRPC_CALL_COMPLETE).
- */
-enum rxrpc_call_completion {
-	RXRPC_CALL_SUCCEEDED,		/* - Normal termination */
-	RXRPC_CALL_REMOTELY_ABORTED,	/* - call aborted by peer */
-	RXRPC_CALL_LOCALLY_ABORTED,	/* - call aborted locally on error or close */
-	RXRPC_CALL_LOCAL_ERROR,		/* - call failed due to local error */
-	RXRPC_CALL_NETWORK_ERROR,	/* - call terminated by network error */
-	NR__RXRPC_CALL_COMPLETIONS
-};
-
-/*
  * Call Tx congestion management modes.
  */
 enum rxrpc_congest_mode {
@@ -687,9 +676,15 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
 					 struct rxrpc_conn_parameters *,
 					 struct sockaddr_rxrpc *,
 					 unsigned long, s64, gfp_t);
+int rxrpc_retry_client_call(struct rxrpc_sock *,
+			    struct rxrpc_call *,
+			    struct rxrpc_conn_parameters *,
+			    struct sockaddr_rxrpc *,
+			    gfp_t);
 void rxrpc_incoming_call(struct rxrpc_sock *, struct rxrpc_call *,
 			 struct sk_buff *);
 void rxrpc_release_call(struct rxrpc_sock *, struct rxrpc_call *);
+int rxrpc_prepare_call_for_retry(struct rxrpc_sock *, struct rxrpc_call *);
 void rxrpc_release_calls_on_socket(struct rxrpc_sock *);
 bool __rxrpc_queue_call(struct rxrpc_call *);
 bool rxrpc_queue_call(struct rxrpc_call *);
@@ -830,7 +825,6 @@ void rxrpc_process_connection(struct work_struct *);
  */
 extern unsigned int rxrpc_connection_expiry;
 
-int rxrpc_extract_addr_from_skb(struct sockaddr_rxrpc *, struct sk_buff *);
 struct rxrpc_connection *rxrpc_alloc_connection(gfp_t);
 struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *,
 						   struct sk_buff *);
@@ -894,7 +888,7 @@ extern struct key_type key_type_rxrpc_s;
 
 int rxrpc_request_key(struct rxrpc_sock *, char __user *, int);
 int rxrpc_server_keyring(struct rxrpc_sock *, char __user *, int);
-int rxrpc_get_server_data_key(struct rxrpc_connection *, const void *, time_t,
+int rxrpc_get_server_data_key(struct rxrpc_connection *, const void *, time64_t,
 			      u32);
 
 /*
@@ -1060,7 +1054,8 @@ static inline void rxrpc_sysctl_exit(void) {}
 /*
  * utils.c
  */
-int rxrpc_extract_addr_from_skb(struct sockaddr_rxrpc *, struct sk_buff *);
+int rxrpc_extract_addr_from_skb(struct rxrpc_local *, struct sockaddr_rxrpc *,
+				struct sk_buff *);
 
 static inline bool before(u32 seq1, u32 seq2)
 {
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index ec3383f..cbd1701 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -277,7 +277,7 @@ static struct rxrpc_call *rxrpc_alloc_incoming_call(struct rxrpc_sock *rx,
 		 * anticipation - and to save on stack space.
 		 */
 		xpeer = b->peer_backlog[peer_tail];
-		if (rxrpc_extract_addr_from_skb(&xpeer->srx, skb) < 0)
+		if (rxrpc_extract_addr_from_skb(local, &xpeer->srx, skb) < 0)
 			return NULL;
 
 		peer = rxrpc_lookup_incoming_peer(local, xpeer);
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index d7809a0..fcdd655 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -269,11 +269,6 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 	trace_rxrpc_call(call, rxrpc_call_connected, atomic_read(&call->usage),
 			 here, NULL);
 
-	spin_lock_bh(&call->conn->params.peer->lock);
-	hlist_add_head(&call->error_link,
-		       &call->conn->params.peer->error_targets);
-	spin_unlock_bh(&call->conn->params.peer->lock);
-
 	rxrpc_start_call_timer(call);
 
 	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
@@ -304,6 +299,48 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx,
 }
 
 /*
+ * Retry a call to a new address.  It is expected that the Tx queue of the call
+ * will contain data previously packaged for an old call.
+ */
+int rxrpc_retry_client_call(struct rxrpc_sock *rx,
+			    struct rxrpc_call *call,
+			    struct rxrpc_conn_parameters *cp,
+			    struct sockaddr_rxrpc *srx,
+			    gfp_t gfp)
+{
+	const void *here = __builtin_return_address(0);
+	int ret;
+
+	/* Set up or get a connection record and set the protocol parameters,
+	 * including channel number and call ID.
+	 */
+	ret = rxrpc_connect_call(call, cp, srx, gfp);
+	if (ret < 0)
+		goto error;
+
+	trace_rxrpc_call(call, rxrpc_call_connected, atomic_read(&call->usage),
+			 here, NULL);
+
+	rxrpc_start_call_timer(call);
+
+	_net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
+
+	if (!test_and_set_bit(RXRPC_CALL_EV_RESEND, &call->events))
+		rxrpc_queue_call(call);
+
+	_leave(" = 0");
+	return 0;
+
+error:
+	rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
+				  RX_CALL_DEAD, ret);
+	trace_rxrpc_call(call, rxrpc_call_error, atomic_read(&call->usage),
+			 here, ERR_PTR(ret));
+	_leave(" = %d", ret);
+	return ret;
+}
+
+/*
  * Set up an incoming call.  call->conn points to the connection.
  * This is called in BH context and isn't allowed to fail.
  */
@@ -471,6 +508,61 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
 }
 
 /*
+ * Prepare a kernel service call for retry.
+ */
+int rxrpc_prepare_call_for_retry(struct rxrpc_sock *rx, struct rxrpc_call *call)
+{
+	const void *here = __builtin_return_address(0);
+	int i;
+	u8 last = 0;
+
+	_enter("{%d,%d}", call->debug_id, atomic_read(&call->usage));
+
+	trace_rxrpc_call(call, rxrpc_call_release, atomic_read(&call->usage),
+			 here, (const void *)call->flags);
+
+	ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
+	ASSERTCMP(call->completion, !=, RXRPC_CALL_REMOTELY_ABORTED);
+	ASSERTCMP(call->completion, !=, RXRPC_CALL_LOCALLY_ABORTED);
+	ASSERT(list_empty(&call->recvmsg_link));
+
+	del_timer_sync(&call->timer);
+
+	_debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, call->conn);
+
+	if (call->conn)
+		rxrpc_disconnect_call(call);
+
+	if (rxrpc_is_service_call(call) ||
+	    !call->tx_phase ||
+	    call->tx_hard_ack != 0 ||
+	    call->rx_hard_ack != 0 ||
+	    call->rx_top != 0)
+		return -EINVAL;
+
+	call->state = RXRPC_CALL_UNINITIALISED;
+	call->completion = RXRPC_CALL_SUCCEEDED;
+	call->call_id = 0;
+	call->cid = 0;
+	call->cong_cwnd = 0;
+	call->cong_extra = 0;
+	call->cong_ssthresh = 0;
+	call->cong_mode = 0;
+	call->cong_dup_acks = 0;
+	call->cong_cumul_acks = 0;
+	call->acks_lowest_nak = 0;
+
+	for (i = 0; i < RXRPC_RXTX_BUFF_SIZE; i++) {
+		last |= call->rxtx_annotations[i];
+		call->rxtx_annotations[i] &= RXRPC_TX_ANNO_LAST;
+		call->rxtx_annotations[i] |= RXRPC_TX_ANNO_RETRANS;
+	}
+
+	_leave(" = 0");
+	return 0;
+}
+
+/*
  * release all the calls associated with a socket
  */
 void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
diff --git a/net/rxrpc/conn_client.c b/net/rxrpc/conn_client.c
index eb21576..5f9624b 100644
--- a/net/rxrpc/conn_client.c
+++ b/net/rxrpc/conn_client.c
@@ -555,7 +555,10 @@ static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
 	trace_rxrpc_client(conn, channel, rxrpc_client_chan_activate);
 
 	write_lock_bh(&call->state_lock);
-	call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
+	if (!test_bit(RXRPC_CALL_TX_LASTQ, &call->flags))
+		call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
+	else
+		call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
 	write_unlock_bh(&call->state_lock);
 
 	rxrpc_see_call(call);
@@ -688,15 +691,23 @@ int rxrpc_connect_call(struct rxrpc_call *call,
 
 	ret = rxrpc_get_client_conn(call, cp, srx, gfp);
 	if (ret < 0)
-		return ret;
+		goto out;
 
 	rxrpc_animate_client_conn(rxnet, call->conn);
 	rxrpc_activate_channels(call->conn);
 
 	ret = rxrpc_wait_for_channel(call, gfp);
-	if (ret < 0)
+	if (ret < 0) {
 		rxrpc_disconnect_client_call(call);
+		goto out;
+	}
 
+	spin_lock_bh(&call->conn->params.peer->lock);
+	hlist_add_head(&call->error_link,
+		       &call->conn->params.peer->error_targets);
+	spin_unlock_bh(&call->conn->params.peer->lock);
+
+out:
 	_leave(" = %d", ret);
 	return ret;
 }
diff --git a/net/rxrpc/conn_object.c b/net/rxrpc/conn_object.c
index 929b50d..fe57579 100644
--- a/net/rxrpc/conn_object.c
+++ b/net/rxrpc/conn_object.c
@@ -72,7 +72,7 @@ struct rxrpc_connection *rxrpc_find_connection_rcu(struct rxrpc_local *local,
 
 	_enter(",%x", sp->hdr.cid & RXRPC_CIDMASK);
 
-	if (rxrpc_extract_addr_from_skb(&srx, skb) < 0)
+	if (rxrpc_extract_addr_from_skb(local, &srx, skb) < 0)
 		goto not_found;
 
 	k.epoch	= sp->hdr.epoch;
diff --git a/net/rxrpc/key.c b/net/rxrpc/key.c
index 5436922..e7f6b88 100644
--- a/net/rxrpc/key.c
+++ b/net/rxrpc/key.c
@@ -92,6 +92,7 @@ static int rxrpc_preparse_xdr_rxkad(struct key_preparsed_payload *prep,
 				    const __be32 *xdr, unsigned int toklen)
 {
 	struct rxrpc_key_token *token, **pptoken;
+	time64_t expiry;
 	size_t plen;
 	u32 tktlen;
 
@@ -158,8 +159,9 @@ static int rxrpc_preparse_xdr_rxkad(struct key_preparsed_payload *prep,
 	     pptoken = &(*pptoken)->next)
 		continue;
 	*pptoken = token;
-	if (token->kad->expiry < prep->expiry)
-		prep->expiry = token->kad->expiry;
+	expiry = rxrpc_u32_to_time64(token->kad->expiry);
+	if (expiry < prep->expiry)
+		prep->expiry = expiry;
 
 	_leave(" = 0");
 	return 0;
@@ -433,6 +435,7 @@ static int rxrpc_preparse_xdr_rxk5(struct key_preparsed_payload *prep,
 	struct rxrpc_key_token *token, **pptoken;
 	struct rxk5_key *rxk5;
 	const __be32 *end_xdr = xdr + (toklen >> 2);
+	time64_t expiry;
 	int ret;
 
 	_enter(",{%x,%x,%x,%x},%u",
@@ -533,8 +536,9 @@ static int rxrpc_preparse_xdr_rxk5(struct key_preparsed_payload *prep,
 	     pptoken = &(*pptoken)->next)
 		continue;
 	*pptoken = token;
-	if (token->kad->expiry < prep->expiry)
-		prep->expiry = token->kad->expiry;
+	expiry = rxrpc_u32_to_time64(token->k5->endtime);
+	if (expiry < prep->expiry)
+		prep->expiry = expiry;
 
 	_leave(" = 0");
 	return 0;
@@ -691,6 +695,7 @@ static int rxrpc_preparse(struct key_preparsed_payload *prep)
 {
 	const struct rxrpc_key_data_v1 *v1;
 	struct rxrpc_key_token *token, **pp;
+	time64_t expiry;
 	size_t plen;
 	u32 kver;
 	int ret;
@@ -777,8 +782,9 @@ static int rxrpc_preparse(struct key_preparsed_payload *prep)
 	while (*pp)
 		pp = &(*pp)->next;
 	*pp = token;
-	if (token->kad->expiry < prep->expiry)
-		prep->expiry = token->kad->expiry;
+	expiry = rxrpc_u32_to_time64(token->kad->expiry);
+	if (expiry < prep->expiry)
+		prep->expiry = expiry;
 	token = NULL;
 	ret = 0;
 
@@ -955,7 +961,7 @@ int rxrpc_server_keyring(struct rxrpc_sock *rx, char __user *optval,
  */
 int rxrpc_get_server_data_key(struct rxrpc_connection *conn,
 			      const void *session_key,
-			      time_t expiry,
+			      time64_t expiry,
 			      u32 kvno)
 {
 	const struct cred *cred = current_cred();
@@ -982,7 +988,7 @@ int rxrpc_get_server_data_key(struct rxrpc_connection *conn,
 	data.kver = 1;
 	data.v1.security_index = RXRPC_SECURITY_RXKAD;
 	data.v1.ticket_length = 0;
-	data.v1.expiry = expiry;
+	data.v1.expiry = rxrpc_time64_to_u32(expiry);
 	data.v1.kvno = 0;
 
 	memcpy(&data.v1.session_key, session_key, sizeof(data.v1.session_key));
diff --git a/net/rxrpc/local_event.c b/net/rxrpc/local_event.c
index 540d395..93b5d91 100644
--- a/net/rxrpc/local_event.c
+++ b/net/rxrpc/local_event.c
@@ -39,7 +39,7 @@ static void rxrpc_send_version_request(struct rxrpc_local *local,
 
 	_enter("");
 
-	if (rxrpc_extract_addr_from_skb(&srx, skb) < 0)
+	if (rxrpc_extract_addr_from_skb(local, &srx, skb) < 0)
 		return;
 
 	msg.msg_name	= &srx.transport;
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 5bd2d0f..71e6f71 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -444,7 +444,7 @@ void rxrpc_reject_packets(struct rxrpc_local *local)
 		rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
 		sp = rxrpc_skb(skb);
 
-		if (rxrpc_extract_addr_from_skb(&srx, skb) == 0) {
+		if (rxrpc_extract_addr_from_skb(local, &srx, skb) == 0) {
 			msg.msg_namelen = srx.transport_len;
 
 			code = htonl(skb->priority);
diff --git a/net/rxrpc/peer_event.c b/net/rxrpc/peer_event.c
index 1ed9c0c..7f74950 100644
--- a/net/rxrpc/peer_event.c
+++ b/net/rxrpc/peer_event.c
@@ -37,6 +37,7 @@ static struct rxrpc_peer *rxrpc_lookup_peer_icmp_rcu(struct rxrpc_local *local,
 
 	memset(&srx, 0, sizeof(srx));
 	srx.transport_type = local->srx.transport_type;
+	srx.transport_len = local->srx.transport_len;
 	srx.transport.family = local->srx.transport.family;
 
 	/* Can we see an ICMP4 packet on an ICMP6 listening socket?  and vice
@@ -45,7 +46,6 @@ static struct rxrpc_peer *rxrpc_lookup_peer_icmp_rcu(struct rxrpc_local *local,
 	switch (srx.transport.family) {
 	case AF_INET:
 		srx.transport.sin.sin_port = serr->port;
-		srx.transport_len = sizeof(struct sockaddr_in);
 		switch (serr->ee.ee_origin) {
 		case SO_EE_ORIGIN_ICMP:
 			_net("Rx ICMP");
@@ -69,7 +69,6 @@ static struct rxrpc_peer *rxrpc_lookup_peer_icmp_rcu(struct rxrpc_local *local,
 #ifdef CONFIG_AF_RXRPC_IPV6
 	case AF_INET6:
 		srx.transport.sin6.sin6_port = serr->port;
-		srx.transport_len = sizeof(struct sockaddr_in6);
 		switch (serr->ee.ee_origin) {
 		case SO_EE_ORIGIN_ICMP6:
 			_net("Rx ICMP6");
@@ -79,6 +78,9 @@ static struct rxrpc_peer *rxrpc_lookup_peer_icmp_rcu(struct rxrpc_local *local,
 			break;
 		case SO_EE_ORIGIN_ICMP:
 			_net("Rx ICMP on v6 sock");
+			srx.transport.sin6.sin6_addr.s6_addr32[0] = 0;
+			srx.transport.sin6.sin6_addr.s6_addr32[1] = 0;
+			srx.transport.sin6.sin6_addr.s6_addr32[2] = htonl(0xffff);
 			memcpy(srx.transport.sin6.sin6_addr.s6_addr + 12,
 			       skb_network_header(skb) + serr->addr_offset,
 			       sizeof(struct in_addr));
diff --git a/net/rxrpc/rxkad.c b/net/rxrpc/rxkad.c
index 46d1a1f..c38b3a1 100644
--- a/net/rxrpc/rxkad.c
+++ b/net/rxrpc/rxkad.c
@@ -634,8 +634,8 @@ static int rxkad_issue_challenge(struct rxrpc_connection *conn)
 	challenge.min_level	= htonl(0);
 	challenge.__padding	= 0;
 
-	msg.msg_name	= &conn->params.peer->srx.transport.sin;
-	msg.msg_namelen	= sizeof(conn->params.peer->srx.transport.sin);
+	msg.msg_name	= &conn->params.peer->srx.transport;
+	msg.msg_namelen	= conn->params.peer->srx.transport_len;
 	msg.msg_control	= NULL;
 	msg.msg_controllen = 0;
 	msg.msg_flags	= 0;
@@ -689,8 +689,8 @@ static int rxkad_send_response(struct rxrpc_connection *conn,
 
 	_enter("");
 
-	msg.msg_name	= &conn->params.peer->srx.transport.sin;
-	msg.msg_namelen	= sizeof(conn->params.peer->srx.transport.sin);
+	msg.msg_name	= &conn->params.peer->srx.transport;
+	msg.msg_namelen	= conn->params.peer->srx.transport_len;
 	msg.msg_control	= NULL;
 	msg.msg_controllen = 0;
 	msg.msg_flags	= 0;
@@ -854,7 +854,7 @@ static int rxkad_decrypt_ticket(struct rxrpc_connection *conn,
 				struct sk_buff *skb,
 				void *ticket, size_t ticket_len,
 				struct rxrpc_crypt *_session_key,
-				time_t *_expiry,
+				time64_t *_expiry,
 				u32 *_abort_code)
 {
 	struct skcipher_request *req;
@@ -864,7 +864,7 @@ static int rxkad_decrypt_ticket(struct rxrpc_connection *conn,
 	struct in_addr addr;
 	unsigned int life;
 	const char *eproto;
-	time_t issue, now;
+	time64_t issue, now;
 	bool little_endian;
 	int ret;
 	u32 abort_code;
@@ -960,15 +960,15 @@ static int rxkad_decrypt_ticket(struct rxrpc_connection *conn,
 	if (little_endian) {
 		__le32 stamp;
 		memcpy(&stamp, p, 4);
-		issue = le32_to_cpu(stamp);
+		issue = rxrpc_u32_to_time64(le32_to_cpu(stamp));
 	} else {
 		__be32 stamp;
 		memcpy(&stamp, p, 4);
-		issue = be32_to_cpu(stamp);
+		issue = rxrpc_u32_to_time64(be32_to_cpu(stamp));
 	}
 	p += 4;
-	now = get_seconds();
-	_debug("KIV ISSUE: %lx [%lx]", issue, now);
+	now = ktime_get_real_seconds();
+	_debug("KIV ISSUE: %llx [%llx]", issue, now);
 
 	/* check the ticket is in date */
 	if (issue > now) {
@@ -1053,7 +1053,7 @@ static int rxkad_verify_response(struct rxrpc_connection *conn,
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 	struct rxrpc_crypt session_key;
 	const char *eproto;
-	time_t expiry;
+	time64_t expiry;
 	void *ticket;
 	u32 abort_code, version, kvno, ticket_len, level;
 	__be32 csum;
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index b0d2cda..9ea6f97 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -61,7 +61,7 @@ static int rxrpc_wait_for_tx_window(struct rxrpc_sock *rx,
 			  call->cong_cwnd + call->cong_extra))
 			break;
 		if (call->state >= RXRPC_CALL_COMPLETE) {
-			ret = -call->error;
+			ret = call->error;
 			break;
 		}
 		if (signal_pending(current)) {
@@ -101,11 +101,23 @@ static inline void rxrpc_instant_resend(struct rxrpc_call *call, int ix)
 }
 
 /*
+ * Notify the owner of the call that the transmit phase is ended and the last
+ * packet has been queued.
+ */
+static void rxrpc_notify_end_tx(struct rxrpc_sock *rx, struct rxrpc_call *call,
+				rxrpc_notify_end_tx_t notify_end_tx)
+{
+	if (notify_end_tx)
+		notify_end_tx(&rx->sk, call, call->user_call_ID);
+}
+
+/*
  * Queue a DATA packet for transmission, set the resend timeout and send the
  * packet immediately
  */
-static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
-			       bool last)
+static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
+			       struct sk_buff *skb, bool last,
+			       rxrpc_notify_end_tx_t notify_end_tx)
 {
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 	rxrpc_seq_t seq = sp->hdr.seq;
@@ -116,8 +128,10 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
 
 	ASSERTCMP(seq, ==, call->tx_top + 1);
 
-	if (last)
+	if (last) {
 		annotation |= RXRPC_TX_ANNO_LAST;
+		set_bit(RXRPC_CALL_TX_LASTQ, &call->flags);
+	}
 
 	/* We have to set the timestamp before queueing as the retransmit
 	 * algorithm can see the packet as soon as we queue it.
@@ -141,6 +155,7 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
 		switch (call->state) {
 		case RXRPC_CALL_CLIENT_SEND_REQUEST:
 			call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
+			rxrpc_notify_end_tx(rx, call, notify_end_tx);
 			break;
 		case RXRPC_CALL_SERVER_ACK_REQUEST:
 			call->state = RXRPC_CALL_SERVER_SEND_REPLY;
@@ -153,6 +168,7 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
 				break;
 		case RXRPC_CALL_SERVER_SEND_REPLY:
 			call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
+			rxrpc_notify_end_tx(rx, call, notify_end_tx);
 			break;
 		default:
 			break;
@@ -189,7 +205,8 @@ static void rxrpc_queue_packet(struct rxrpc_call *call, struct sk_buff *skb,
  */
 static int rxrpc_send_data(struct rxrpc_sock *rx,
 			   struct rxrpc_call *call,
-			   struct msghdr *msg, size_t len)
+			   struct msghdr *msg, size_t len,
+			   rxrpc_notify_end_tx_t notify_end_tx)
 {
 	struct rxrpc_skb_priv *sp;
 	struct sk_buff *skb;
@@ -311,11 +328,6 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 				call->tx_total_len -= copy;
 		}
 
-		/* check for the far side aborting the call or a network error
-		 * occurring */
-		if (call->state == RXRPC_CALL_COMPLETE)
-			goto call_terminated;
-
 		/* add the packet to the send queue if it's now full */
 		if (sp->remain <= 0 ||
 		    (msg_data_left(msg) == 0 && !more)) {
@@ -350,9 +362,21 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 			if (ret < 0)
 				goto out;
 
-			rxrpc_queue_packet(call, skb, !msg_data_left(msg) && !more);
+			rxrpc_queue_packet(rx, call, skb,
+					   !msg_data_left(msg) && !more,
+					   notify_end_tx);
 			skb = NULL;
 		}
+
+		/* Check for the far side aborting the call or a network error
+		 * occurring.  If this happens, save any packet that was under
+		 * construction so that in the case of a network error, the
+		 * call can be retried or redirected.
+		 */
+		if (call->state == RXRPC_CALL_COMPLETE) {
+			ret = call->error;
+			goto out;
+		}
 	} while (msg_data_left(msg) > 0);
 
 success:
@@ -362,11 +386,6 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 	_leave(" = %d", ret);
 	return ret;
 
-call_terminated:
-	rxrpc_free_skb(skb, rxrpc_skb_tx_freed);
-	_leave(" = %d", -call->error);
-	return -call->error;
-
 maybe_error:
 	if (copied)
 		goto success;
@@ -611,7 +630,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
 		/* Reply phase not begun or not complete for service call. */
 		ret = -EPROTO;
 	} else {
-		ret = rxrpc_send_data(rx, call, msg, len);
+		ret = rxrpc_send_data(rx, call, msg, len, NULL);
 	}
 
 	mutex_unlock(&call->user_mutex);
@@ -631,6 +650,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
  * @call: The call to send data through
  * @msg: The data to send
  * @len: The amount of data to send
+ * @notify_end_tx: Notification that the last packet is queued.
  *
  * Allow a kernel service to send data on a call.  The call must be in an state
  * appropriate to sending data.  No control data should be supplied in @msg,
@@ -638,7 +658,8 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
  * more data to come, otherwise this data will end the transmission phase.
  */
 int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
-			   struct msghdr *msg, size_t len)
+			   struct msghdr *msg, size_t len,
+			   rxrpc_notify_end_tx_t notify_end_tx)
 {
 	int ret;
 
@@ -656,11 +677,12 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
 	case RXRPC_CALL_SERVER_ACK_REQUEST:
 	case RXRPC_CALL_SERVER_SEND_REPLY:
-		ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len);
+		ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
+				      notify_end_tx);
 		break;
 	case RXRPC_CALL_COMPLETE:
 		read_lock_bh(&call->state_lock);
-		ret = -call->error;
+		ret = call->error;
 		read_unlock_bh(&call->state_lock);
 		break;
 	default:
diff --git a/net/rxrpc/utils.c b/net/rxrpc/utils.c
index ff7af71..e801171 100644
--- a/net/rxrpc/utils.c
+++ b/net/rxrpc/utils.c
@@ -17,17 +17,28 @@
 /*
  * Fill out a peer address from a socket buffer containing a packet.
  */
-int rxrpc_extract_addr_from_skb(struct sockaddr_rxrpc *srx, struct sk_buff *skb)
+int rxrpc_extract_addr_from_skb(struct rxrpc_local *local,
+				struct sockaddr_rxrpc *srx,
+				struct sk_buff *skb)
 {
 	memset(srx, 0, sizeof(*srx));
 
 	switch (ntohs(skb->protocol)) {
 	case ETH_P_IP:
-		srx->transport_type = SOCK_DGRAM;
-		srx->transport_len = sizeof(srx->transport.sin);
-		srx->transport.sin.sin_family = AF_INET;
-		srx->transport.sin.sin_port = udp_hdr(skb)->source;
-		srx->transport.sin.sin_addr.s_addr = ip_hdr(skb)->saddr;
+		if (local->srx.transport.family == AF_INET6) {
+			srx->transport_type = SOCK_DGRAM;
+			srx->transport_len = sizeof(srx->transport.sin6);
+			srx->transport.sin6.sin6_family = AF_INET6;
+			srx->transport.sin6.sin6_port = udp_hdr(skb)->source;
+			srx->transport.sin6.sin6_addr.s6_addr32[2] = htonl(0xffff);
+			srx->transport.sin6.sin6_addr.s6_addr32[3] = ip_hdr(skb)->saddr;
+		} else {
+			srx->transport_type = SOCK_DGRAM;
+			srx->transport_len = sizeof(srx->transport.sin);
+			srx->transport.sin.sin_family = AF_INET;
+			srx->transport.sin.sin_port = udp_hdr(skb)->source;
+			srx->transport.sin.sin_addr.s_addr = ip_hdr(skb)->saddr;
+		}
 		return 0;
 
 #ifdef CONFIG_AF_RXRPC_IPV6