rds: use RCU to protect the connection hash

The connection hash was almost entirely RCU ready, this
just makes the final couple of changes to use RCU instead
of spinlocks for everything.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
diff --git a/net/rds/connection.c b/net/rds/connection.c
index 87df15b..180b83a 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -62,6 +62,7 @@
 		var |= RDS_INFO_CONNECTION_FLAG_##suffix;	\
 } while (0)
 
+/* rcu read lock must be held or the connection spinlock */
 static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
 					      __be32 laddr, __be32 faddr,
 					      struct rds_transport *trans)
@@ -69,7 +70,7 @@
 	struct rds_connection *conn, *ret = NULL;
 	struct hlist_node *pos;
 
-	hlist_for_each_entry(conn, pos, head, c_hash_node) {
+	hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
 		if (conn->c_faddr == faddr && conn->c_laddr == laddr &&
 				conn->c_trans == trans) {
 			ret = conn;
@@ -119,7 +120,8 @@
 	unsigned long flags;
 	int ret;
 
-	spin_lock_irqsave(&rds_conn_lock, flags);
+
+	rcu_read_lock();
 	conn = rds_conn_lookup(head, laddr, faddr, trans);
 	if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
 	    !is_outgoing) {
@@ -130,7 +132,7 @@
 		parent = conn;
 		conn = parent->c_passive;
 	}
-	spin_unlock_irqrestore(&rds_conn_lock, flags);
+	rcu_read_unlock();
 	if (conn)
 		goto out;
 
@@ -227,7 +229,7 @@
 			kmem_cache_free(rds_conn_slab, conn);
 			conn = found;
 		} else {
-			hlist_add_head(&conn->c_hash_node, head);
+			hlist_add_head_rcu(&conn->c_hash_node, head);
 			rds_cong_add_conn(conn);
 			rds_conn_count++;
 		}
@@ -306,8 +308,13 @@
 	 * to the conn hash, so we never trigger a reconnect on this
 	 * conn - the reconnect is always triggered by the active peer. */
 	cancel_delayed_work_sync(&conn->c_conn_w);
-	if (!hlist_unhashed(&conn->c_hash_node))
+	rcu_read_lock();
+	if (!hlist_unhashed(&conn->c_hash_node)) {
+		rcu_read_unlock();
 		rds_queue_reconnect(conn);
+	} else {
+		rcu_read_unlock();
+	}
 }
 
 /*
@@ -323,14 +330,12 @@
 
 	/* Ensure conn will not be scheduled for reconnect */
 	spin_lock_irq(&rds_conn_lock);
-	hlist_del_init(&conn->c_hash_node);
+	hlist_del_init_rcu(&conn->c_hash_node);
 	spin_unlock_irq(&rds_conn_lock);
 
-	/* wait for the rds thread to shut it down */
-	atomic_set(&conn->c_state, RDS_CONN_ERROR);
-	cancel_delayed_work(&conn->c_conn_w);
-	queue_work(rds_wq, &conn->c_down_w);
-	flush_workqueue(rds_wq);
+	synchronize_rcu();
+
+	rds_conn_shutdown(conn);
 
 	/* tear down queued messages */
 	list_for_each_entry_safe(rm, rtmp,
@@ -369,17 +374,16 @@
 	struct list_head *list;
 	struct rds_connection *conn;
 	struct rds_message *rm;
-	unsigned long flags;
 	unsigned int total = 0;
 	size_t i;
 
 	len /= sizeof(struct rds_info_message);
 
-	spin_lock_irqsave(&rds_conn_lock, flags);
+	rcu_read_lock();
 
 	for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 	     i++, head++) {
-		hlist_for_each_entry(conn, pos, head, c_hash_node) {
+		hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
 			if (want_send)
 				list = &conn->c_send_queue;
 			else
@@ -399,8 +403,7 @@
 			spin_unlock(&conn->c_lock);
 		}
 	}
-
-	spin_unlock_irqrestore(&rds_conn_lock, flags);
+	rcu_read_unlock();
 
 	lens->nr = total;
 	lens->each = sizeof(struct rds_info_message);
@@ -430,19 +433,17 @@
 	uint64_t buffer[(item_len + 7) / 8];
 	struct hlist_head *head;
 	struct hlist_node *pos;
-	struct hlist_node *tmp;
 	struct rds_connection *conn;
-	unsigned long flags;
 	size_t i;
 
-	spin_lock_irqsave(&rds_conn_lock, flags);
+	rcu_read_lock();
 
 	lens->nr = 0;
 	lens->each = item_len;
 
 	for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 	     i++, head++) {
-		hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) {
+		hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
 
 			/* XXX no c_lock usage.. */
 			if (!visitor(conn, buffer))
@@ -458,8 +459,7 @@
 			lens->nr++;
 		}
 	}
-
-	spin_unlock_irqrestore(&rds_conn_lock, flags);
+	rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_for_each_conn_info);