ceph: mount non-default filesystem by name

To mount a non-default filesystem, the user currently needs to provide
the MDS namespace ID, which is inconvenient.

This patch lets the user mount a filesystem by name. If the user wants
to mount a non-default filesystem, the client first subscribes to
fsmap.user, then subscribes to mdsmap.<ID> once it has the filesystem's ID.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 78a3495..e555745 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2166,6 +2166,11 @@
 	mds = __choose_mds(mdsc, req);
 	if (mds < 0 ||
 	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
+		if (mdsc->mdsmap_err) {
+			err = mdsc->mdsmap_err;
+			dout("do_request mdsmap err %d\n", err);
+			goto finish;
+		}
 		dout("do_request no mds or not active, waiting for map\n");
 		list_add(&req->r_wait, &mdsc->waiting_for_map);
 		goto out;
@@ -3683,11 +3688,86 @@
 	dout("mdsc_destroy %p done\n", mdsc);
 }
 
+/*
+ * Handle an fsmap.user message: look up the filesystem whose name matches
+ * the mds_namespace mount option and subscribe to the mdsmap for its ID.
+ * On a decode error (-EINVAL) or when no name matches (-ENOENT), store the
+ * error in mdsc->mdsmap_err and wake the requests waiting for a map.
+ */
+void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+	struct ceph_fs_client *fsc = mdsc->fsc;
+	const char *mds_namespace = fsc->mount_options->mds_namespace;
+	void *p = msg->front.iov_base;
+	void *end = p + msg->front.iov_len;
+	u32 epoch, num_fs;
+	u32 mount_fscid = (u32)-1;
+	int err = -EINVAL;
+
+	ceph_decode_need(&p, end, sizeof(u32), bad);
+	epoch = ceph_decode_32(&p);
+
+	dout("handle_fsmap epoch %u\n", epoch);
+
+	ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+	p += 2 + sizeof(u32); /* skip struct_v, struct_cv and map_len */
+
+	ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
+	p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
+
+	num_fs = ceph_decode_32(&p);
+	while (num_fs-- > 0) {
+		void *info_p, *info_end;
+		u32 info_len;
+		u32 fscid, namelen;
+
+		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+		p += 2; /* skip info_v and info_cv */
+		info_len = ceph_decode_32(&p);
+		ceph_decode_need(&p, end, info_len, bad);
+		/* confine decoding to this entry and advance p past it */
+		info_p = p;
+		info_end = p + info_len;
+		p = info_end;
+
+		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
+		fscid = ceph_decode_32(&info_p);
+		namelen = ceph_decode_32(&info_p);
+		ceph_decode_need(&info_p, info_end, namelen, bad);
+
+		/* the name is length-prefixed: match length, then bytes */
+		if (mds_namespace &&
+		    strlen(mds_namespace) == namelen &&
+		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
+			mount_fscid = fscid;
+			break;
+		}
+	}
+
+	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
+	if (mount_fscid == (u32)-1) {
+		err = -ENOENT;
+		goto err_out;
+	}
+	fsc->client->monc.fs_cluster_id = mount_fscid;
+	ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
+			   0, true);
+	ceph_monc_renew_subs(&fsc->client->monc);
+	return;
+bad:
+	pr_err("error decoding fsmap\n");
+err_out:
+	mutex_lock(&mdsc->mutex);
+	/* report the real cause instead of always claiming -ENOENT */
+	mdsc->mdsmap_err = err;
+	__wake_requests(mdsc, &mdsc->waiting_for_map);
+	mutex_unlock(&mdsc->mutex);
+}
 
 /*
  * handle mds map update.
  */
-void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
 	u32 epoch;
 	u32 maplen;
@@ -3794,7 +3874,10 @@
 
 	switch (type) {
 	case CEPH_MSG_MDS_MAP:
-		ceph_mdsc_handle_map(mdsc, msg);
+		ceph_mdsc_handle_mdsmap(mdsc, msg);
+		break;
+	case CEPH_MSG_FS_MAP_USER:
+		ceph_mdsc_handle_fsmap(mdsc, msg);
 		break;
 	case CEPH_MSG_CLIENT_SESSION:
 		handle_session(s, msg);