ceph: don't pre-allocate space for cap release messages

Previously we pre-allocated cap release messages for each cap. This
wastes a lot of memory when there is a large number of caps. This patch
makes the code stop pre-allocating the cap release messages. Instead,
we add the corresponding ceph_cap struct to a list when releasing a
cap. Later, when the cap releases need to be flushed, we allocate the
cap release messages dynamically.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 2bb9264..76eb144 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -458,7 +458,6 @@
 	s->s_cap_reconnect = 0;
 	s->s_cap_iterator = NULL;
 	INIT_LIST_HEAD(&s->s_cap_releases);
-	INIT_LIST_HEAD(&s->s_cap_releases_done);
 	INIT_LIST_HEAD(&s->s_cap_flushing);
 	INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
 
@@ -998,27 +997,25 @@
  * session caps
  */
 
-/*
- * Free preallocated cap messages assigned to this session
- */
-static void cleanup_cap_releases(struct ceph_mds_session *session)
+/* caller holds s_cap_lock, we drop it */
+static void cleanup_cap_releases(struct ceph_mds_client *mdsc,
+				 struct ceph_mds_session *session)
+	__releases(session->s_cap_lock)
 {
-	struct ceph_msg *msg;
-
-	spin_lock(&session->s_cap_lock);
-	while (!list_empty(&session->s_cap_releases)) {
-		msg = list_first_entry(&session->s_cap_releases,
-				       struct ceph_msg, list_head);
-		list_del_init(&msg->list_head);
-		ceph_msg_put(msg);
-	}
-	while (!list_empty(&session->s_cap_releases_done)) {
-		msg = list_first_entry(&session->s_cap_releases_done,
-				       struct ceph_msg, list_head);
-		list_del_init(&msg->list_head);
-		ceph_msg_put(msg);
-	}
+	LIST_HEAD(tmp_list);
+	list_splice_init(&session->s_cap_releases, &tmp_list);
+	session->s_num_cap_releases = 0;
 	spin_unlock(&session->s_cap_lock);
+
+	dout("cleanup_cap_releases mds%d\n", session->s_mds);
+	while (!list_empty(&tmp_list)) {
+		struct ceph_cap *cap;
+		/* unlink each queued cap and hand it to ceph_put_cap() */
+		cap = list_first_entry(&tmp_list,
+					struct ceph_cap, session_caps);
+		list_del(&cap->session_caps);
+		ceph_put_cap(mdsc, cap);
+	}
 }
 
 static void cleanup_session_requests(struct ceph_mds_client *mdsc,
@@ -1095,10 +1092,16 @@
 			dout("iterate_session_caps  finishing cap %p removal\n",
 			     cap);
 			BUG_ON(cap->session != session);
+			cap->session = NULL;
 			list_del_init(&cap->session_caps);
 			session->s_nr_caps--;
-			cap->session = NULL;
-			old_cap = cap;  /* put_cap it w/o locks held */
+			if (cap->queue_release) {
+				list_add_tail(&cap->session_caps,
+					      &session->s_cap_releases);
+				session->s_num_cap_releases++;
+			} else {
+				old_cap = cap;  /* put_cap it w/o locks held */
+			}
 		}
 		if (ret < 0)
 			goto out;
@@ -1191,11 +1194,12 @@
 			spin_lock(&session->s_cap_lock);
 		}
 	}
-	spin_unlock(&session->s_cap_lock);
+
+	/* drop the queued cap releases; this also unlocks s_cap_lock */
+	cleanup_cap_releases(session->s_mdsc, session);
 
 	BUG_ON(session->s_nr_caps > 0);
 	BUG_ON(!list_empty(&session->s_cap_flushing));
-	cleanup_cap_releases(session);
 }
 
 /*
@@ -1418,76 +1422,10 @@
 		session->s_trim_caps = 0;
 	}
 
-	ceph_add_cap_releases(mdsc, session);
 	ceph_send_cap_releases(mdsc, session);
 	return 0;
 }
 
-/*
- * Allocate cap_release messages.  If there is a partially full message
- * in the queue, try to allocate enough to cover it's remainder, so that
- * we can send it immediately.
- *
- * Called under s_mutex.
- */
-int ceph_add_cap_releases(struct ceph_mds_client *mdsc,
-			  struct ceph_mds_session *session)
-{
-	struct ceph_msg *msg, *partial = NULL;
-	struct ceph_mds_cap_release *head;
-	int err = -ENOMEM;
-	int extra = mdsc->fsc->mount_options->cap_release_safety;
-	int num;
-
-	dout("add_cap_releases %p mds%d extra %d\n", session, session->s_mds,
-	     extra);
-
-	spin_lock(&session->s_cap_lock);
-
-	if (!list_empty(&session->s_cap_releases)) {
-		msg = list_first_entry(&session->s_cap_releases,
-				       struct ceph_msg,
-				 list_head);
-		head = msg->front.iov_base;
-		num = le32_to_cpu(head->num);
-		if (num) {
-			dout(" partial %p with (%d/%d)\n", msg, num,
-			     (int)CEPH_CAPS_PER_RELEASE);
-			extra += CEPH_CAPS_PER_RELEASE - num;
-			partial = msg;
-		}
-	}
-	while (session->s_num_cap_releases < session->s_nr_caps + extra) {
-		spin_unlock(&session->s_cap_lock);
-		msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE, PAGE_CACHE_SIZE,
-				   GFP_NOFS, false);
-		if (!msg)
-			goto out_unlocked;
-		dout("add_cap_releases %p msg %p now %d\n", session, msg,
-		     (int)msg->front.iov_len);
-		head = msg->front.iov_base;
-		head->num = cpu_to_le32(0);
-		msg->front.iov_len = sizeof(*head);
-		spin_lock(&session->s_cap_lock);
-		list_add(&msg->list_head, &session->s_cap_releases);
-		session->s_num_cap_releases += CEPH_CAPS_PER_RELEASE;
-	}
-
-	if (partial) {
-		head = partial->front.iov_base;
-		num = le32_to_cpu(head->num);
-		dout(" queueing partial %p with %d/%d\n", partial, num,
-		     (int)CEPH_CAPS_PER_RELEASE);
-		list_move_tail(&partial->list_head,
-			       &session->s_cap_releases_done);
-		session->s_num_cap_releases -= CEPH_CAPS_PER_RELEASE - num;
-	}
-	err = 0;
-	spin_unlock(&session->s_cap_lock);
-out_unlocked:
-	return err;
-}
-
 static int check_cap_flush(struct ceph_inode_info *ci,
 			   u64 want_flush_seq, u64 want_snap_seq)
 {
@@ -1590,62 +1528,76 @@
 void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
 			    struct ceph_mds_session *session)
 {
-	struct ceph_msg *msg;
+	struct ceph_msg *msg = NULL;
+	struct ceph_mds_cap_release *head;
+	struct ceph_mds_cap_item *item;
+	struct ceph_cap *cap;
+	LIST_HEAD(tmp_list);
+	int num_cap_releases;
 
-	dout("send_cap_releases mds%d\n", session->s_mds);
 	spin_lock(&session->s_cap_lock);
-	while (!list_empty(&session->s_cap_releases_done)) {
-		msg = list_first_entry(&session->s_cap_releases_done,
-				 struct ceph_msg, list_head);
-		list_del_init(&msg->list_head);
-		spin_unlock(&session->s_cap_lock);
+again:
+	list_splice_init(&session->s_cap_releases, &tmp_list);
+	num_cap_releases = session->s_num_cap_releases;
+	session->s_num_cap_releases = 0;
+	spin_unlock(&session->s_cap_lock);
+
+	while (!list_empty(&tmp_list)) {
+		if (!msg) {
+			msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
+					PAGE_CACHE_SIZE, GFP_NOFS, false);
+			if (!msg)
+				goto out_err;
+			head = msg->front.iov_base;
+			head->num = cpu_to_le32(0);
+			msg->front.iov_len = sizeof(*head);
+		}
+		cap = list_first_entry(&tmp_list, struct ceph_cap,
+					session_caps);
+		list_del(&cap->session_caps);
+		num_cap_releases--;
+
+		head = msg->front.iov_base;
+		le32_add_cpu(&head->num, 1);
+		item = msg->front.iov_base + msg->front.iov_len;
+		item->ino = cpu_to_le64(cap->cap_ino);
+		item->cap_id = cpu_to_le64(cap->cap_id);
+		item->migrate_seq = cpu_to_le32(cap->mseq);
+		item->seq = cpu_to_le32(cap->issue_seq);
+		msg->front.iov_len += sizeof(*item);
+
+		ceph_put_cap(mdsc, cap);
+
+		if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
+			msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+			dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
+			ceph_con_send(&session->s_con, msg);
+			msg = NULL;
+		}
+	}
+
+	BUG_ON(num_cap_releases != 0);
+
+	spin_lock(&session->s_cap_lock);
+	if (!list_empty(&session->s_cap_releases))
+		goto again;
+	spin_unlock(&session->s_cap_lock);
+
+	if (msg) {
 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 		dout("send_cap_releases mds%d %p\n", session->s_mds, msg);
 		ceph_con_send(&session->s_con, msg);
-		spin_lock(&session->s_cap_lock);
 	}
+	return;
+out_err:
+	pr_err("send_cap_releases mds%d, failed to allocate message\n",
+		session->s_mds);
+	spin_lock(&session->s_cap_lock);
+	list_splice(&tmp_list, &session->s_cap_releases);
+	session->s_num_cap_releases += num_cap_releases;
 	spin_unlock(&session->s_cap_lock);
 }
 
-static void discard_cap_releases(struct ceph_mds_client *mdsc,
-				 struct ceph_mds_session *session)
-{
-	struct ceph_msg *msg;
-	struct ceph_mds_cap_release *head;
-	unsigned num;
-
-	dout("discard_cap_releases mds%d\n", session->s_mds);
-
-	if (!list_empty(&session->s_cap_releases)) {
-		/* zero out the in-progress message */
-		msg = list_first_entry(&session->s_cap_releases,
-					struct ceph_msg, list_head);
-		head = msg->front.iov_base;
-		num = le32_to_cpu(head->num);
-		dout("discard_cap_releases mds%d %p %u\n",
-		     session->s_mds, msg, num);
-		head->num = cpu_to_le32(0);
-		msg->front.iov_len = sizeof(*head);
-		session->s_num_cap_releases += num;
-	}
-
-	/* requeue completed messages */
-	while (!list_empty(&session->s_cap_releases_done)) {
-		msg = list_first_entry(&session->s_cap_releases_done,
-				 struct ceph_msg, list_head);
-		list_del_init(&msg->list_head);
-
-		head = msg->front.iov_base;
-		num = le32_to_cpu(head->num);
-		dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg,
-		     num);
-		session->s_num_cap_releases += num;
-		head->num = cpu_to_le32(0);
-		msg->front.iov_len = sizeof(*head);
-		list_add(&msg->list_head, &session->s_cap_releases);
-	}
-}
-
 /*
  * requests
  */
@@ -2529,7 +2481,6 @@
 	}
 	mutex_unlock(&mdsc->mutex);
 
-	ceph_add_cap_releases(mdsc, req->r_session);
 	mutex_unlock(&session->s_mutex);
 
 	/* kick calling process */
@@ -2921,8 +2872,7 @@
 	 */
 	session->s_cap_reconnect = 1;
 	/* drop old cap expires; we're about to reestablish that state */
-	discard_cap_releases(mdsc, session);
-	spin_unlock(&session->s_cap_lock);
+	cleanup_cap_releases(mdsc, session);
 
 	/* trim unused caps to reduce MDS's cache rejoin time */
 	if (mdsc->fsc->sb->s_root)
@@ -3385,7 +3335,6 @@
 			send_renew_caps(mdsc, s);
 		else
 			ceph_con_keepalive(&s->s_con);
-		ceph_add_cap_releases(mdsc, s);
 		if (s->s_state == CEPH_MDS_SESSION_OPEN ||
 		    s->s_state == CEPH_MDS_SESSION_HUNG)
 			ceph_send_cap_releases(mdsc, s);