RDS: IB: split mr pool to improve 8K messages performance

8K message sizes are a pretty important use case for current RDS
workloads, so we make provision to have 8K MRs available from the pool.
Based on the number of SGs in the RDS message, we pick a pool to use.

Also, to make sure that we don't underutilise MRs when, say, 8K messages
are dominating, which could lead to the 8K pool being exhausted, we fall
back to the 1M pool until the 8K pool recovers for use.

This helps to at least push ~55 kB/s bidirectional data which
is a nice improvement.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index bb62024..a234074 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -65,6 +65,7 @@
  * Our own little FMR pool
  */
 struct rds_ib_mr_pool {
+	unsigned int            pool_type;
 	struct mutex		flush_lock;		/* serialize fmr invalidate */
 	struct delayed_work	flush_worker;		/* flush worker */
 
@@ -234,7 +235,8 @@
 		rds_conn_destroy(ic->conn);
 }
 
-struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev)
+struct rds_ib_mr_pool *rds_ib_create_mr_pool(struct rds_ib_device *rds_ibdev,
+					     int pool_type)
 {
 	struct rds_ib_mr_pool *pool;
 
@@ -242,6 +244,7 @@
 	if (!pool)
 		return ERR_PTR(-ENOMEM);
 
+	pool->pool_type = pool_type;
 	init_llist_head(&pool->free_list);
 	init_llist_head(&pool->drop_list);
 	init_llist_head(&pool->clean_list);
@@ -249,28 +252,30 @@
 	init_waitqueue_head(&pool->flush_wait);
 	INIT_DELAYED_WORK(&pool->flush_worker, rds_ib_mr_pool_flush_worker);
 
-	pool->fmr_attr.max_pages = fmr_message_size;
+	if (pool_type == RDS_IB_MR_1M_POOL) {
+		/* +1 allows for unaligned MRs */
+		pool->fmr_attr.max_pages = RDS_FMR_1M_MSG_SIZE + 1;
+		pool->max_items = RDS_FMR_1M_POOL_SIZE;
+	} else {
+		/* pool_type == RDS_IB_MR_8K_POOL */
+		pool->fmr_attr.max_pages = RDS_FMR_8K_MSG_SIZE + 1;
+		pool->max_items = RDS_FMR_8K_POOL_SIZE;
+	}
+
+	pool->max_free_pinned = pool->max_items * pool->fmr_attr.max_pages / 4;
 	pool->fmr_attr.max_maps = rds_ibdev->fmr_max_remaps;
 	pool->fmr_attr.page_shift = PAGE_SHIFT;
-	pool->max_free_pinned = rds_ibdev->max_fmrs * fmr_message_size / 4;
-
-	/* We never allow more than max_items MRs to be allocated.
-	 * When we exceed more than max_items_soft, we start freeing
-	 * items more aggressively.
-	 * Make sure that max_items > max_items_soft > max_items / 2
-	 */
 	pool->max_items_soft = rds_ibdev->max_fmrs * 3 / 4;
-	pool->max_items = rds_ibdev->max_fmrs;
 
 	return pool;
 }
 
 void rds_ib_get_mr_info(struct rds_ib_device *rds_ibdev, struct rds_info_rdma_connection *iinfo)
 {
-	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
+	struct rds_ib_mr_pool *pool_1m = rds_ibdev->mr_1m_pool;
 
-	iinfo->rdma_mr_max = pool->max_items;
-	iinfo->rdma_mr_size = pool->fmr_attr.max_pages;
+	iinfo->rdma_mr_max = pool_1m->max_items;
+	iinfo->rdma_mr_size = pool_1m->fmr_attr.max_pages;
 }
 
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *pool)
@@ -312,15 +317,29 @@
 	}
 }
 
-static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
+static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev,
+					  int npages)
 {
-	struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
+	struct rds_ib_mr_pool *pool;
 	struct rds_ib_mr *ibmr = NULL;
 	int err = 0, iter = 0;
 
+	if (npages <= RDS_FMR_8K_MSG_SIZE)
+		pool = rds_ibdev->mr_8k_pool;
+	else
+		pool = rds_ibdev->mr_1m_pool;
+
 	if (atomic_read(&pool->dirty_count) >= pool->max_items / 10)
 		queue_delayed_work(rds_ib_fmr_wq, &pool->flush_worker, 10);
 
+	/* Switch pools if one of the pools is reaching its upper limit */
+	if (atomic_read(&pool->dirty_count) >=  pool->max_items * 9 / 10) {
+		if (pool->pool_type == RDS_IB_MR_8K_POOL)
+			pool = rds_ibdev->mr_1m_pool;
+		else
+			pool = rds_ibdev->mr_8k_pool;
+	}
+
 	while (1) {
 		ibmr = rds_ib_reuse_fmr(pool);
 		if (ibmr)
@@ -341,12 +360,18 @@
 		atomic_dec(&pool->item_count);
 
 		if (++iter > 2) {
-			rds_ib_stats_inc(s_ib_rdma_mr_pool_depleted);
+			if (pool->pool_type == RDS_IB_MR_8K_POOL)
+				rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_depleted);
+			else
+				rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_depleted);
 			return ERR_PTR(-EAGAIN);
 		}
 
 		/* We do have some empty MRs. Flush them out. */
-		rds_ib_stats_inc(s_ib_rdma_mr_pool_wait);
+		if (pool->pool_type == RDS_IB_MR_8K_POOL)
+			rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_wait);
+		else
+			rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_wait);
 		rds_ib_flush_mr_pool(pool, 0, &ibmr);
 		if (ibmr)
 			return ibmr;
@@ -371,7 +396,12 @@
 		goto out_no_cigar;
 	}
 
-	rds_ib_stats_inc(s_ib_rdma_mr_alloc);
+	ibmr->pool = pool;
+	if (pool->pool_type == RDS_IB_MR_8K_POOL)
+		rds_ib_stats_inc(s_ib_rdma_mr_8k_alloc);
+	else
+		rds_ib_stats_inc(s_ib_rdma_mr_1m_alloc);
+
 	return ibmr;
 
 out_no_cigar:
@@ -427,7 +457,7 @@
 	}
 
 	page_cnt += len >> PAGE_SHIFT;
-	if (page_cnt > fmr_message_size)
+	if (page_cnt > ibmr->pool->fmr_attr.max_pages)
 		return -EINVAL;
 
 	dma_pages = kmalloc_node(sizeof(u64) * page_cnt, GFP_ATOMIC,
@@ -459,7 +489,10 @@
 	ibmr->sg_dma_len = sg_dma_len;
 	ibmr->remap_count++;
 
-	rds_ib_stats_inc(s_ib_rdma_mr_used);
+	if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
+		rds_ib_stats_inc(s_ib_rdma_mr_8k_used);
+	else
+		rds_ib_stats_inc(s_ib_rdma_mr_1m_used);
 	ret = 0;
 
 out:
@@ -591,7 +624,7 @@
  * to free as many MRs as needed to get back to this limit.
  */
 static int rds_ib_flush_mr_pool(struct rds_ib_mr_pool *pool,
-			        int free_all, struct rds_ib_mr **ibmr_ret)
+				int free_all, struct rds_ib_mr **ibmr_ret)
 {
 	struct rds_ib_mr *ibmr, *next;
 	struct llist_node *clean_nodes;
@@ -602,11 +635,14 @@
 	unsigned int nfreed = 0, dirty_to_clean = 0, free_goal;
 	int ret = 0;
 
-	rds_ib_stats_inc(s_ib_rdma_mr_pool_flush);
+	if (pool->pool_type == RDS_IB_MR_8K_POOL)
+		rds_ib_stats_inc(s_ib_rdma_mr_8k_pool_flush);
+	else
+		rds_ib_stats_inc(s_ib_rdma_mr_1m_pool_flush);
 
 	if (ibmr_ret) {
 		DEFINE_WAIT(wait);
-		while(!mutex_trylock(&pool->flush_lock)) {
+		while (!mutex_trylock(&pool->flush_lock)) {
 			ibmr = rds_ib_reuse_fmr(pool);
 			if (ibmr) {
 				*ibmr_ret = ibmr;
@@ -663,8 +699,12 @@
 	list_for_each_entry_safe(ibmr, next, &unmap_list, unmap_list) {
 		unpinned += ibmr->sg_len;
 		__rds_ib_teardown_mr(ibmr);
-		if (nfreed < free_goal || ibmr->remap_count >= pool->fmr_attr.max_maps) {
-			rds_ib_stats_inc(s_ib_rdma_mr_free);
+		if (nfreed < free_goal ||
+		    ibmr->remap_count >= pool->fmr_attr.max_maps) {
+			if (ibmr->pool->pool_type == RDS_IB_MR_8K_POOL)
+				rds_ib_stats_inc(s_ib_rdma_mr_8k_free);
+			else
+				rds_ib_stats_inc(s_ib_rdma_mr_1m_free);
 			list_del(&ibmr->unmap_list);
 			ib_dealloc_fmr(ibmr->fmr);
 			kfree(ibmr);
@@ -756,10 +796,11 @@
 
 	down_read(&rds_ib_devices_lock);
 	list_for_each_entry(rds_ibdev, &rds_ib_devices, list) {
-		struct rds_ib_mr_pool *pool = rds_ibdev->mr_pool;
+		if (rds_ibdev->mr_8k_pool)
+			rds_ib_flush_mr_pool(rds_ibdev->mr_8k_pool, 0, NULL);
 
-		if (pool)
-			rds_ib_flush_mr_pool(pool, 0, NULL);
+		if (rds_ibdev->mr_1m_pool)
+			rds_ib_flush_mr_pool(rds_ibdev->mr_1m_pool, 0, NULL);
 	}
 	up_read(&rds_ib_devices_lock);
 }
@@ -777,12 +818,12 @@
 		goto out;
 	}
 
-	if (!rds_ibdev->mr_pool) {
+	if (!rds_ibdev->mr_8k_pool || !rds_ibdev->mr_1m_pool) {
 		ret = -ENODEV;
 		goto out;
 	}
 
-	ibmr = rds_ib_alloc_fmr(rds_ibdev);
+	ibmr = rds_ib_alloc_fmr(rds_ibdev, nents);
 	if (IS_ERR(ibmr)) {
 		rds_ib_dev_put(rds_ibdev);
 		return ibmr;