RDS: Implement silent atomics

Signed-off-by: Andy Grover <andy.grover@oracle.com>
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index d839b40..e6745d8 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -225,15 +225,12 @@
 			/* In the error case, wc.opcode sometimes contains garbage */
 			switch (send->s_wr.opcode) {
 			case IB_WR_SEND:
-				if (send->s_rm)
-					rds_ib_send_unmap_rm(ic, send, wc.status);
-				break;
 			case IB_WR_RDMA_WRITE:
 			case IB_WR_RDMA_READ:
 			case IB_WR_ATOMIC_FETCH_AND_ADD:
 			case IB_WR_ATOMIC_CMP_AND_SWP:
-				/* Nothing to be done - the SG list will be unmapped
-				 * when the SEND completes. */
+				if (send->s_rm)
+					rds_ib_send_unmap_rm(ic, send, wc.status);
 				break;
 			default:
 				if (printk_ratelimit())
@@ -425,6 +422,21 @@
 		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
 }
 
+static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
+					      struct rds_ib_send_work *send,
+					      bool notify)
+{
+	/*
+	 * We want to delay signaling completions just enough to get
+	 * the batching benefits but not so much that we create dead time
+	 * on the wire.
+	 */
+	if (ic->i_unsignaled_wrs-- == 0 || notify) {
+		ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
+		send->s_wr.send_flags |= IB_SEND_SIGNALED;
+	}
+}
+
 /*
  * This can be called multiple times for a given message.  The first time
  * we see a message we map its scatterlist into the IB device so that
@@ -517,7 +529,6 @@
 			rm->data.m_count = 0;
 		}
 
-		ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
 		rds_message_addref(rm);
 		ic->i_rm = rm;
 
@@ -608,15 +619,7 @@
 			}
 		}
 
-		/*
-		 * We want to delay signaling completions just enough to get
-		 * the batching benefits but not so much that we create dead time
-		 * on the wire.
-		 */
-		if (ic->i_unsignaled_wrs-- == 0) {
-			ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
-			send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
-		}
+		rds_ib_set_wr_signal_state(ic, send, 0);
 
 		/*
 		 * Always signal the last one if we're stopping due to flow control.
@@ -656,7 +659,7 @@
 	/* if we finished the message then send completion owns it */
 	if (scat == &rm->data.m_sg[rm->data.m_count]) {
 		prev->s_rm = ic->i_rm;
-		prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
+		prev->s_wr.send_flags |= IB_SEND_SOLICITED;
 		ic->i_rm = NULL;
 	}
 
@@ -698,9 +701,10 @@
  * A simplified version of the rdma case, we always map 1 SG, and
  * only 8 bytes, for the return value from the atomic operation.
  */
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rm_atomic_op *op = &rm->atomic;
 	struct rds_ib_send_work *send = NULL;
 	struct ib_send_wr *failed_wr;
 	struct rds_ib_device *rds_ibdev;
@@ -731,12 +735,20 @@
 		send->s_wr.wr.atomic.compare_add = op->op_swap_add;
 		send->s_wr.wr.atomic.swap = 0;
 	}
-	send->s_wr.send_flags = IB_SEND_SIGNALED;
+	rds_ib_set_wr_signal_state(ic, send, op->op_notify);
 	send->s_wr.num_sge = 1;
 	send->s_wr.next = NULL;
 	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
 	send->s_wr.wr.atomic.rkey = op->op_rkey;
 
+	/*
+	 * If there is no data or rdma ops in the message, then
+	 * we must fill in s_rm ourselves, so we properly clean up
+	 * on completion.
+	 */
+	if (!rm->rdma.m_rdma_op.r_active && !rm->data.op_active)
+		send->s_rm = rm;
+
 	/* map 8 byte retval buffer to the device */
 	ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
 	rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
@@ -836,14 +848,8 @@
 	for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) {
 		send->s_wr.send_flags = 0;
 		send->s_queued = jiffies;
-		/*
-		 * We want to delay signaling completions just enough to get
-		 * the batching benefits but not so much that we create dead time on the wire.
-		 */
-		if (ic->i_unsignaled_wrs-- == 0) {
-			ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
-			send->s_wr.send_flags = IB_SEND_SIGNALED;
-		}
+
+		rds_ib_set_wr_signal_state(ic, send, op->r_notify);
 
 		send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
 		send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -884,10 +890,6 @@
 			send = ic->i_sends;
 	}
 
-	/* if we finished the message then send completion owns it */
-	if (scat == &op->r_sg[op->r_count])
-		prev->s_wr.send_flags = IB_SEND_SIGNALED;
-
 	if (i < work_alloc) {
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
 		work_alloc = i;