Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph changes from Sage Weil:
 "Lots of stuff this time around:

   - lots of cleanup and refactoring in the libceph messenger code, and
     many hard-to-hit races and bugs fixed as a result.
   - lots of cleanup and refactoring in the rbd code from Alex Elder,
     mostly in preparation for the layering functionality that will be
     coming in 3.7.
   - some misc rbd cleanups from Josh Durgin that are finally going
     upstream
   - support for CRUSH tunables (used by newer clusters to improve the
     data placement)
   - some cleanup in our use of d_parent that Al brought up a while back
   - a random collection of fixes across the tree

  There is another patch coming that fixes up our ->atomic_open()
  behavior, but I'm going to hammer on it a bit more before sending it."

Fix up conflicts in drivers/block/rbd.c and net/ceph/{messenger.c,
osd_client.c}, caused by commits that had already been merged earlier.

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (132 commits)
  rbd: create rbd_refresh_helper()
  rbd: return obj version in __rbd_refresh_header()
  rbd: fixes in rbd_header_from_disk()
  rbd: always pass ops array to rbd_req_sync_op()
  rbd: pass null version pointer in add_snap()
  rbd: make rbd_create_rw_ops() return a pointer
  rbd: have __rbd_add_snap_dev() return a pointer
  libceph: recheck con state after allocating incoming message
  libceph: change ceph_con_in_msg_alloc convention to be less weird
  libceph: avoid dropping con mutex before fault
  libceph: verify state after retaking con lock after dispatch
  libceph: revoke mon_client messages on session restart
  libceph: fix handling of immediate socket connect failure
  ceph: update MAINTAINERS file
  libceph: be less chatty about stray replies
  libceph: clear all flags on con_close
  libceph: clean up con flags
  libceph: replace connection state bits with states
  libceph: drop unnecessary CLOSED check in socket state change callback
  libceph: close socket directly from ceph_con_close()
  ...
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index bcd88eb..3c17b62 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -35,8 +35,14 @@
 
 pool
 
-	The pool where this rbd image resides. The pool-name pair is unique
-	per rados system.
+	The name of the storage pool where this rbd image resides.
+	An rbd image name is unique within its pool.
+
+pool_id
+
+	The unique identifier for the rbd image's pool.  This is
+	a permanent attribute of the pool.  A pool's id will never
+	change.
 
 size
 
diff --git a/MAINTAINERS b/MAINTAINERS
index fb036a0..5b44872 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1789,15 +1789,16 @@
 F:	arch/powerpc/platforms/cell/
 
 CEPH DISTRIBUTED FILE SYSTEM CLIENT
-M:	Sage Weil <sage@newdream.net>
+M:	Sage Weil <sage@inktank.com>
 L:	ceph-devel@vger.kernel.org
-W:	http://ceph.newdream.net/
+W:	http://ceph.com/
 T:	git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
 S:	Supported
 F:	Documentation/filesystems/ceph.txt
 F:	fs/ceph
 F:	net/ceph
 F:	include/linux/ceph
+F:	include/linux/crush
 
 CERTIFIED WIRELESS USB (WUSB) SUBSYSTEM:
 L:	linux-usb@vger.kernel.org
@@ -5639,10 +5640,12 @@
 F:	arch/hexagon/
 
 RADOS BLOCK DEVICE (RBD)
-F:	include/linux/qnxtypes.h
-M:	Yehuda Sadeh <yehuda@hq.newdream.net>
-M:	Sage Weil <sage@newdream.net>
+M:	Yehuda Sadeh <yehuda@inktank.com>
+M:	Sage Weil <sage@inktank.com>
+M:	Alex Elder <elder@inktank.com>
 M:	ceph-devel@vger.kernel.org
+W:	http://ceph.com/
+T:	git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
 S:	Supported
 F:	drivers/block/rbd.c
 F:	drivers/block/rbd_types.h
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 8f428a8..9917943 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -55,8 +55,6 @@
 
 #define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
 
-#define RBD_MAX_MD_NAME_LEN	(RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
-#define RBD_MAX_POOL_NAME_LEN	64
 #define RBD_MAX_SNAP_NAME_LEN	32
 #define RBD_MAX_OPT_LEN		1024
 
@@ -78,13 +76,12 @@
  */
 struct rbd_image_header {
 	u64 image_size;
-	char block_name[32];
+	char *object_prefix;
 	__u8 obj_order;
 	__u8 crypt_type;
 	__u8 comp_type;
 	struct ceph_snap_context *snapc;
 	size_t snap_names_len;
-	u64 snap_seq;
 	u32 total_snaps;
 
 	char *snap_names;
@@ -150,7 +147,7 @@
  * a single device
  */
 struct rbd_device {
-	int			id;		/* blkdev unique id */
+	int			dev_id;		/* blkdev unique id */
 
 	int			major;		/* blkdev assigned major */
 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
@@ -163,20 +160,24 @@
 	spinlock_t		lock;		/* queue lock */
 
 	struct rbd_image_header	header;
-	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
-	int			obj_len;
-	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
-	char			pool_name[RBD_MAX_POOL_NAME_LEN];
-	int			poolid;
+	char			*image_name;
+	size_t			image_name_len;
+	char			*header_name;
+	char			*pool_name;
+	int			pool_id;
 
 	struct ceph_osd_event   *watch_event;
 	struct ceph_osd_request *watch_request;
 
 	/* protects updating the header */
 	struct rw_semaphore     header_rwsem;
-	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
+	/* name of the snapshot this device reads from */
+	char                    *snap_name;
+	/* id of the snapshot this device reads from */
 	u64                     snap_id;	/* current snapshot id */
-	int read_only;
+	/* whether the snap_id this device reads from still exists */
+	bool                    snap_exists;
+	int                     read_only;
 
 	struct list_head	node;
 
@@ -201,8 +202,7 @@
 			    struct device_attribute *attr,
 			    const char *buf,
 			    size_t count);
-static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
-				  struct rbd_snap *snap);
+static void __rbd_remove_snap_dev(struct rbd_snap *snap);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 		       size_t count);
@@ -240,7 +240,7 @@
 	put_device(&rbd_dev->dev);
 }
 
-static int __rbd_refresh_header(struct rbd_device *rbd_dev);
+static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
@@ -273,9 +273,9 @@
 
 /*
  * Initialize an rbd client instance.
- * We own *opt.
+ * We own *ceph_opts.
  */
-static struct rbd_client *rbd_client_create(struct ceph_options *opt,
+static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
 					    struct rbd_options *rbd_opts)
 {
 	struct rbd_client *rbdc;
@@ -291,10 +291,10 @@
 
 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 
-	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
+	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
 	if (IS_ERR(rbdc->client))
 		goto out_mutex;
-	opt = NULL; /* Now rbdc->client is responsible for opt */
+	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 
 	ret = ceph_open_session(rbdc->client);
 	if (ret < 0)
@@ -317,23 +317,23 @@
 	mutex_unlock(&ctl_mutex);
 	kfree(rbdc);
 out_opt:
-	if (opt)
-		ceph_destroy_options(opt);
+	if (ceph_opts)
+		ceph_destroy_options(ceph_opts);
 	return ERR_PTR(ret);
 }
 
 /*
  * Find a ceph client with specific addr and configuration.
  */
-static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
+static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
 {
 	struct rbd_client *client_node;
 
-	if (opt->flags & CEPH_OPT_NOSHARE)
+	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
 		return NULL;
 
 	list_for_each_entry(client_node, &rbd_client_list, node)
-		if (ceph_compare_options(opt, client_node->client) == 0)
+		if (!ceph_compare_options(ceph_opts, client_node->client))
 			return client_node;
 	return NULL;
 }
@@ -349,7 +349,7 @@
 	/* string args above */
 };
 
-static match_table_t rbdopt_tokens = {
+static match_table_t rbd_opts_tokens = {
 	{Opt_notify_timeout, "notify_timeout=%d"},
 	/* int args above */
 	/* string args above */
@@ -358,11 +358,11 @@
 
 static int parse_rbd_opts_token(char *c, void *private)
 {
-	struct rbd_options *rbdopt = private;
+	struct rbd_options *rbd_opts = private;
 	substring_t argstr[MAX_OPT_ARGS];
 	int token, intval, ret;
 
-	token = match_token(c, rbdopt_tokens, argstr);
+	token = match_token(c, rbd_opts_tokens, argstr);
 	if (token < 0)
 		return -EINVAL;
 
@@ -383,7 +383,7 @@
 
 	switch (token) {
 	case Opt_notify_timeout:
-		rbdopt->notify_timeout = intval;
+		rbd_opts->notify_timeout = intval;
 		break;
 	default:
 		BUG_ON(token);
@@ -400,7 +400,7 @@
 					 char *options)
 {
 	struct rbd_client *rbdc;
-	struct ceph_options *opt;
+	struct ceph_options *ceph_opts;
 	struct rbd_options *rbd_opts;
 
 	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
@@ -409,29 +409,29 @@
 
 	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
 
-	opt = ceph_parse_options(options, mon_addr,
-				mon_addr + mon_addr_len,
-				parse_rbd_opts_token, rbd_opts);
-	if (IS_ERR(opt)) {
+	ceph_opts = ceph_parse_options(options, mon_addr,
+					mon_addr + mon_addr_len,
+					parse_rbd_opts_token, rbd_opts);
+	if (IS_ERR(ceph_opts)) {
 		kfree(rbd_opts);
-		return ERR_CAST(opt);
+		return ERR_CAST(ceph_opts);
 	}
 
 	spin_lock(&rbd_client_list_lock);
-	rbdc = __rbd_client_find(opt);
+	rbdc = __rbd_client_find(ceph_opts);
 	if (rbdc) {
 		/* using an existing client */
 		kref_get(&rbdc->kref);
 		spin_unlock(&rbd_client_list_lock);
 
-		ceph_destroy_options(opt);
+		ceph_destroy_options(ceph_opts);
 		kfree(rbd_opts);
 
 		return rbdc;
 	}
 	spin_unlock(&rbd_client_list_lock);
 
-	rbdc = rbd_client_create(opt, rbd_opts);
+	rbdc = rbd_client_create(ceph_opts, rbd_opts);
 
 	if (IS_ERR(rbdc))
 		kfree(rbd_opts);
@@ -480,46 +480,60 @@
 	kfree(coll);
 }
 
+static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
+{
+	return !memcmp(&ondisk->text,
+			RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
+}
+
 /*
  * Create a new header structure, translate header format from the on-disk
  * header.
  */
 static int rbd_header_from_disk(struct rbd_image_header *header,
 				 struct rbd_image_header_ondisk *ondisk,
-				 u32 allocated_snaps,
-				 gfp_t gfp_flags)
+				 u32 allocated_snaps)
 {
-	u32 i, snap_count;
+	u32 snap_count;
 
-	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
+	if (!rbd_dev_ondisk_valid(ondisk))
 		return -ENXIO;
 
 	snap_count = le32_to_cpu(ondisk->snap_count);
-	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
-			 / sizeof (*ondisk))
+	if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
+				 / sizeof (u64))
 		return -EINVAL;
 	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
 				snap_count * sizeof(u64),
-				gfp_flags);
+				GFP_KERNEL);
 	if (!header->snapc)
 		return -ENOMEM;
 
-	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
 	if (snap_count) {
+		header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
 		header->snap_names = kmalloc(header->snap_names_len,
-					     gfp_flags);
+					     GFP_KERNEL);
 		if (!header->snap_names)
 			goto err_snapc;
 		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
-					     gfp_flags);
+					     GFP_KERNEL);
 		if (!header->snap_sizes)
 			goto err_names;
 	} else {
+		WARN_ON(ondisk->snap_names_len);
+		header->snap_names_len = 0;
 		header->snap_names = NULL;
 		header->snap_sizes = NULL;
 	}
-	memcpy(header->block_name, ondisk->block_name,
+
+	header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
+					GFP_KERNEL);
+	if (!header->object_prefix)
+		goto err_sizes;
+
+	memcpy(header->object_prefix, ondisk->block_name,
 	       sizeof(ondisk->block_name));
+	header->object_prefix[sizeof (ondisk->block_name)] = '\0';
 
 	header->image_size = le64_to_cpu(ondisk->image_size);
 	header->obj_order = ondisk->options.order;
@@ -527,11 +541,13 @@
 	header->comp_type = ondisk->options.comp_type;
 
 	atomic_set(&header->snapc->nref, 1);
-	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
+	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
 	header->snapc->num_snaps = snap_count;
 	header->total_snaps = snap_count;
 
 	if (snap_count && allocated_snaps == snap_count) {
+		int i;
+
 		for (i = 0; i < snap_count; i++) {
 			header->snapc->snaps[i] =
 				le64_to_cpu(ondisk->snaps[i].id);
@@ -540,16 +556,22 @@
 		}
 
 		/* copy snapshot names */
-		memcpy(header->snap_names, &ondisk->snaps[i],
+		memcpy(header->snap_names, &ondisk->snaps[snap_count],
 			header->snap_names_len);
 	}
 
 	return 0;
 
+err_sizes:
+	kfree(header->snap_sizes);
+	header->snap_sizes = NULL;
 err_names:
 	kfree(header->snap_names);
+	header->snap_names = NULL;
 err_snapc:
 	kfree(header->snapc);
+	header->snapc = NULL;
+
 	return -ENOMEM;
 }
 
@@ -575,52 +597,50 @@
 	return -ENOENT;
 }
 
-static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
+static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
 {
-	struct rbd_image_header *header = &dev->header;
-	struct ceph_snap_context *snapc = header->snapc;
-	int ret = -ENOENT;
+	int ret;
 
-	BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
+	down_write(&rbd_dev->header_rwsem);
 
-	down_write(&dev->header_rwsem);
-
-	if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
+	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
 		    sizeof (RBD_SNAP_HEAD_NAME))) {
-		if (header->total_snaps)
-			snapc->seq = header->snap_seq;
-		else
-			snapc->seq = 0;
-		dev->snap_id = CEPH_NOSNAP;
-		dev->read_only = 0;
+		rbd_dev->snap_id = CEPH_NOSNAP;
+		rbd_dev->snap_exists = false;
+		rbd_dev->read_only = 0;
 		if (size)
-			*size = header->image_size;
+			*size = rbd_dev->header.image_size;
 	} else {
-		ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
+		u64 snap_id = 0;
+
+		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
+					&snap_id, size);
 		if (ret < 0)
 			goto done;
-		dev->snap_id = snapc->seq;
-		dev->read_only = 1;
+		rbd_dev->snap_id = snap_id;
+		rbd_dev->snap_exists = true;
+		rbd_dev->read_only = 1;
 	}
 
 	ret = 0;
 done:
-	up_write(&dev->header_rwsem);
+	up_write(&rbd_dev->header_rwsem);
 	return ret;
 }
 
 static void rbd_header_free(struct rbd_image_header *header)
 {
-	kfree(header->snapc);
-	kfree(header->snap_names);
+	kfree(header->object_prefix);
 	kfree(header->snap_sizes);
+	kfree(header->snap_names);
+	ceph_put_snap_context(header->snapc);
 }
 
 /*
  * get the actual striped segment name, offset and length
  */
 static u64 rbd_get_segment(struct rbd_image_header *header,
-			   const char *block_name,
+			   const char *object_prefix,
 			   u64 ofs, u64 len,
 			   char *seg_name, u64 *segofs)
 {
@@ -628,7 +648,7 @@
 
 	if (seg_name)
 		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
-			 "%s.%012llx", block_name, seg);
+			 "%s.%012llx", object_prefix, seg);
 
 	ofs = ofs & ((1 << header->obj_order) - 1);
 	len = min_t(u64, len, (1 << header->obj_order) - ofs);
@@ -726,9 +746,8 @@
 			 * split_bio will BUG_ON if this is not the case
 			 */
 			dout("bio_chain_clone split! total=%d remaining=%d"
-			     "bi_size=%d\n",
-			     (int)total, (int)len-total,
-			     (int)old_chain->bi_size);
+			     "bi_size=%u\n",
+			     total, len - total, old_chain->bi_size);
 
 			/* split the bio. We'll release it either in the next
 			   call, or it will have to be released outside */
@@ -777,22 +796,24 @@
 /*
  * helpers for osd request op vectors.
  */
-static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
-			    int num_ops,
-			    int opcode,
-			    u32 payload_len)
+static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
+					int opcode, u32 payload_len)
 {
-	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
-		       GFP_NOIO);
-	if (!*ops)
-		return -ENOMEM;
-	(*ops)[0].op = opcode;
+	struct ceph_osd_req_op *ops;
+
+	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
+	if (!ops)
+		return NULL;
+
+	ops[0].op = opcode;
+
 	/*
 	 * op extent offset and length will be set later on
 	 * in calc_raw_layout()
 	 */
-	(*ops)[0].payload_len = payload_len;
-	return 0;
+	ops[0].payload_len = payload_len;
+
+	return ops;
 }
 
 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
@@ -808,8 +829,8 @@
 	struct request_queue *q;
 	int min, max, i;
 
-	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
-	     coll, index, ret, len);
+	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
+	     coll, index, ret, (unsigned long long) len);
 
 	if (!rq)
 		return;
@@ -848,16 +869,15 @@
  * Send ceph osd request
  */
 static int rbd_do_request(struct request *rq,
-			  struct rbd_device *dev,
+			  struct rbd_device *rbd_dev,
 			  struct ceph_snap_context *snapc,
 			  u64 snapid,
-			  const char *obj, u64 ofs, u64 len,
+			  const char *object_name, u64 ofs, u64 len,
 			  struct bio *bio,
 			  struct page **pages,
 			  int num_pages,
 			  int flags,
 			  struct ceph_osd_req_op *ops,
-			  int num_reply,
 			  struct rbd_req_coll *coll,
 			  int coll_index,
 			  void (*rbd_cb)(struct ceph_osd_request *req,
@@ -887,15 +907,13 @@
 		req_data->coll_index = coll_index;
 	}
 
-	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
+	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
+		(unsigned long long) ofs, (unsigned long long) len);
 
-	down_read(&dev->header_rwsem);
-
-	osdc = &dev->rbd_client->client->osdc;
+	osdc = &rbd_dev->rbd_client->client->osdc;
 	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
 					false, GFP_NOIO, pages, bio);
 	if (!req) {
-		up_read(&dev->header_rwsem);
 		ret = -ENOMEM;
 		goto done_pages;
 	}
@@ -912,7 +930,7 @@
 	reqhead = req->r_request->front.iov_base;
 	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
 
-	strncpy(req->r_oid, obj, sizeof(req->r_oid));
+	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
 	req->r_oid_len = strlen(req->r_oid);
 
 	layout = &req->r_file_layout;
@@ -920,7 +938,7 @@
 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 	layout->fl_stripe_count = cpu_to_le32(1);
 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
+	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
 	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
 				req, ops);
 
@@ -929,7 +947,6 @@
 				snapc,
 				&mtime,
 				req->r_oid, req->r_oid_len);
-	up_read(&dev->header_rwsem);
 
 	if (linger_req) {
 		ceph_osdc_set_request_linger(osdc, req);
@@ -944,8 +961,9 @@
 		ret = ceph_osdc_wait_request(osdc, req);
 		if (ver)
 			*ver = le64_to_cpu(req->r_reassert_version.version);
-		dout("reassert_ver=%lld\n",
-		     le64_to_cpu(req->r_reassert_version.version));
+		dout("reassert_ver=%llu\n",
+			(unsigned long long)
+				le64_to_cpu(req->r_reassert_version.version));
 		ceph_osdc_put_request(req);
 	}
 	return ret;
@@ -979,7 +997,8 @@
 	bytes = le64_to_cpu(op->extent.length);
 	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
 
-	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
+	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
+		(unsigned long long) bytes, read_op, (int) rc);
 
 	if (rc == -ENOENT && read_op) {
 		zero_bio_chain(req_data->bio, 0);
@@ -1006,14 +1025,12 @@
 /*
  * Do a synchronous ceph osd operation
  */
-static int rbd_req_sync_op(struct rbd_device *dev,
+static int rbd_req_sync_op(struct rbd_device *rbd_dev,
 			   struct ceph_snap_context *snapc,
 			   u64 snapid,
-			   int opcode,
 			   int flags,
-			   struct ceph_osd_req_op *orig_ops,
-			   int num_reply,
-			   const char *obj,
+			   struct ceph_osd_req_op *ops,
+			   const char *object_name,
 			   u64 ofs, u64 len,
 			   char *buf,
 			   struct ceph_osd_request **linger_req,
@@ -1022,45 +1039,28 @@
 	int ret;
 	struct page **pages;
 	int num_pages;
-	struct ceph_osd_req_op *ops = orig_ops;
-	u32 payload_len;
+
+	BUG_ON(ops == NULL);
 
 	num_pages = calc_pages_for(ofs , len);
 	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
 	if (IS_ERR(pages))
 		return PTR_ERR(pages);
 
-	if (!orig_ops) {
-		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
-		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
-		if (ret < 0)
-			goto done;
-
-		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
-			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
-			if (ret < 0)
-				goto done_ops;
-		}
-	}
-
-	ret = rbd_do_request(NULL, dev, snapc, snapid,
-			  obj, ofs, len, NULL,
+	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
+			  object_name, ofs, len, NULL,
 			  pages, num_pages,
 			  flags,
 			  ops,
-			  2,
 			  NULL, 0,
 			  NULL,
 			  linger_req, ver);
 	if (ret < 0)
-		goto done_ops;
+		goto done;
 
 	if ((flags & CEPH_OSD_FLAG_READ) && buf)
 		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
 
-done_ops:
-	if (!orig_ops)
-		rbd_destroy_ops(ops);
 done:
 	ceph_release_page_vector(pages, num_pages);
 	return ret;
@@ -1070,10 +1070,10 @@
  * Do an asynchronous ceph osd operation
  */
 static int rbd_do_op(struct request *rq,
-		     struct rbd_device *rbd_dev ,
+		     struct rbd_device *rbd_dev,
 		     struct ceph_snap_context *snapc,
 		     u64 snapid,
-		     int opcode, int flags, int num_reply,
+		     int opcode, int flags,
 		     u64 ofs, u64 len,
 		     struct bio *bio,
 		     struct rbd_req_coll *coll,
@@ -1091,14 +1091,15 @@
 		return -ENOMEM;
 
 	seg_len = rbd_get_segment(&rbd_dev->header,
-				  rbd_dev->header.block_name,
+				  rbd_dev->header.object_prefix,
 				  ofs, len,
 				  seg_name, &seg_ofs);
 
 	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
 
-	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
-	if (ret < 0)
+	ret = -ENOMEM;
+	ops = rbd_create_rw_ops(1, opcode, payload_len);
+	if (!ops)
 		goto done;
 
 	/* we've taken care of segment sizes earlier when we
@@ -1112,7 +1113,6 @@
 			     NULL, 0,
 			     flags,
 			     ops,
-			     num_reply,
 			     coll, coll_index,
 			     rbd_req_cb, 0, NULL);
 
@@ -1136,7 +1136,6 @@
 	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
 			 CEPH_OSD_OP_WRITE,
 			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-			 2,
 			 ofs, len, bio, coll, coll_index);
 }
 
@@ -1155,55 +1154,58 @@
 			 snapid,
 			 CEPH_OSD_OP_READ,
 			 CEPH_OSD_FLAG_READ,
-			 2,
 			 ofs, len, bio, coll, coll_index);
 }
 
 /*
  * Request sync osd read
  */
-static int rbd_req_sync_read(struct rbd_device *dev,
-			  struct ceph_snap_context *snapc,
+static int rbd_req_sync_read(struct rbd_device *rbd_dev,
 			  u64 snapid,
-			  const char *obj,
+			  const char *object_name,
 			  u64 ofs, u64 len,
 			  char *buf,
 			  u64 *ver)
 {
-	return rbd_req_sync_op(dev, NULL,
+	struct ceph_osd_req_op *ops;
+	int ret;
+
+	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
+	if (!ops)
+		return -ENOMEM;
+
+	ret = rbd_req_sync_op(rbd_dev, NULL,
 			       snapid,
-			       CEPH_OSD_OP_READ,
 			       CEPH_OSD_FLAG_READ,
-			       NULL,
-			       1, obj, ofs, len, buf, NULL, ver);
+			       ops, object_name, ofs, len, buf, NULL, ver);
+	rbd_destroy_ops(ops);
+
+	return ret;
 }
 
 /*
  * Request sync osd watch
  */
-static int rbd_req_sync_notify_ack(struct rbd_device *dev,
+static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
 				   u64 ver,
-				   u64 notify_id,
-				   const char *obj)
+				   u64 notify_id)
 {
 	struct ceph_osd_req_op *ops;
-	struct page **pages = NULL;
 	int ret;
 
-	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
-	if (ret < 0)
-		return ret;
+	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
+	if (!ops)
+		return -ENOMEM;
 
-	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
+	ops[0].watch.ver = cpu_to_le64(ver);
 	ops[0].watch.cookie = notify_id;
 	ops[0].watch.flag = 0;
 
-	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
-			  obj, 0, 0, NULL,
-			  pages, 0,
+	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
+			  rbd_dev->header_name, 0, 0, NULL,
+			  NULL, 0,
 			  CEPH_OSD_FLAG_READ,
 			  ops,
-			  1,
 			  NULL, 0,
 			  rbd_simple_req_cb, 0, NULL);
 
@@ -1213,54 +1215,53 @@
 
 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 {
-	struct rbd_device *dev = (struct rbd_device *)data;
+	struct rbd_device *rbd_dev = (struct rbd_device *)data;
+	u64 hver;
 	int rc;
 
-	if (!dev)
+	if (!rbd_dev)
 		return;
 
-	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
-		notify_id, (int)opcode);
-	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-	rc = __rbd_refresh_header(dev);
-	mutex_unlock(&ctl_mutex);
+	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
+		rbd_dev->header_name, (unsigned long long) notify_id,
+		(unsigned int) opcode);
+	rc = rbd_refresh_header(rbd_dev, &hver);
 	if (rc)
 		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
-			   " update snaps: %d\n", dev->major, rc);
+			   " update snaps: %d\n", rbd_dev->major, rc);
 
-	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
+	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
 }
 
 /*
  * Request sync osd watch
  */
-static int rbd_req_sync_watch(struct rbd_device *dev,
-			      const char *obj,
-			      u64 ver)
+static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_req_op *ops;
-	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	int ret;
 
-	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
-	if (ret < 0)
-		return ret;
+	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
+	if (!ops)
+		return -ENOMEM;
 
 	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
-				     (void *)dev, &dev->watch_event);
+				     (void *)rbd_dev, &rbd_dev->watch_event);
 	if (ret < 0)
 		goto fail;
 
-	ops[0].watch.ver = cpu_to_le64(ver);
-	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
+	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
+	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
 	ops[0].watch.flag = 1;
 
-	ret = rbd_req_sync_op(dev, NULL,
+	ret = rbd_req_sync_op(rbd_dev, NULL,
 			      CEPH_NOSNAP,
-			      0,
 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
 			      ops,
-			      1, obj, 0, 0, NULL,
-			      &dev->watch_request, NULL);
+			      rbd_dev->header_name,
+			      0, 0, NULL,
+			      &rbd_dev->watch_request, NULL);
 
 	if (ret < 0)
 		goto fail_event;
@@ -1269,8 +1270,8 @@
 	return 0;
 
 fail_event:
-	ceph_osdc_cancel_event(dev->watch_event);
-	dev->watch_event = NULL;
+	ceph_osdc_cancel_event(rbd_dev->watch_event);
+	rbd_dev->watch_event = NULL;
 fail:
 	rbd_destroy_ops(ops);
 	return ret;
@@ -1279,64 +1280,65 @@
 /*
  * Request sync osd unwatch
  */
-static int rbd_req_sync_unwatch(struct rbd_device *dev,
-				const char *obj)
+static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_req_op *ops;
+	int ret;
 
-	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
-	if (ret < 0)
-		return ret;
+	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
+	if (!ops)
+		return -ENOMEM;
 
 	ops[0].watch.ver = 0;
-	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
+	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
 	ops[0].watch.flag = 0;
 
-	ret = rbd_req_sync_op(dev, NULL,
+	ret = rbd_req_sync_op(rbd_dev, NULL,
 			      CEPH_NOSNAP,
-			      0,
 			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
 			      ops,
-			      1, obj, 0, 0, NULL, NULL, NULL);
+			      rbd_dev->header_name,
+			      0, 0, NULL, NULL, NULL);
+
 
 	rbd_destroy_ops(ops);
-	ceph_osdc_cancel_event(dev->watch_event);
-	dev->watch_event = NULL;
+	ceph_osdc_cancel_event(rbd_dev->watch_event);
+	rbd_dev->watch_event = NULL;
 	return ret;
 }
 
 struct rbd_notify_info {
-	struct rbd_device *dev;
+	struct rbd_device *rbd_dev;
 };
 
 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
 {
-	struct rbd_device *dev = (struct rbd_device *)data;
-	if (!dev)
+	struct rbd_device *rbd_dev = (struct rbd_device *)data;
+	if (!rbd_dev)
 		return;
 
-	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
-		notify_id, (int)opcode);
+	dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
+			rbd_dev->header_name, (unsigned long long) notify_id,
+			(unsigned int) opcode);
 }
 
 /*
  * Request sync osd notify
  */
-static int rbd_req_sync_notify(struct rbd_device *dev,
-		          const char *obj)
+static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_req_op *ops;
-	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	struct ceph_osd_event *event;
 	struct rbd_notify_info info;
 	int payload_len = sizeof(u32) + sizeof(u32);
 	int ret;
 
-	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
-	if (ret < 0)
-		return ret;
+	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
+	if (!ops)
+		return -ENOMEM;
 
-	info.dev = dev;
+	info.rbd_dev = rbd_dev;
 
 	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
 				     (void *)&info, &event);
@@ -1349,12 +1351,12 @@
 	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
 	ops[0].watch.timeout = 12;
 
-	ret = rbd_req_sync_op(dev, NULL,
+	ret = rbd_req_sync_op(rbd_dev, NULL,
 			       CEPH_NOSNAP,
-			       0,
 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
 			       ops,
-			       1, obj, 0, 0, NULL, NULL, NULL);
+			       rbd_dev->header_name,
+			       0, 0, NULL, NULL, NULL);
 	if (ret < 0)
 		goto fail_event;
 
@@ -1373,36 +1375,37 @@
 /*
  * Request sync osd read
  */
-static int rbd_req_sync_exec(struct rbd_device *dev,
-			     const char *obj,
-			     const char *cls,
-			     const char *method,
+static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
+			     const char *object_name,
+			     const char *class_name,
+			     const char *method_name,
 			     const char *data,
 			     int len,
 			     u64 *ver)
 {
 	struct ceph_osd_req_op *ops;
-	int cls_len = strlen(cls);
-	int method_len = strlen(method);
-	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
-				    cls_len + method_len + len);
-	if (ret < 0)
-		return ret;
+	int class_name_len = strlen(class_name);
+	int method_name_len = strlen(method_name);
+	int ret;
 
-	ops[0].cls.class_name = cls;
-	ops[0].cls.class_len = (__u8)cls_len;
-	ops[0].cls.method_name = method;
-	ops[0].cls.method_len = (__u8)method_len;
+	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
+				    class_name_len + method_name_len + len);
+	if (!ops)
+		return -ENOMEM;
+
+	ops[0].cls.class_name = class_name;
+	ops[0].cls.class_len = (__u8) class_name_len;
+	ops[0].cls.method_name = method_name;
+	ops[0].cls.method_len = (__u8) method_name_len;
 	ops[0].cls.argc = 0;
 	ops[0].cls.indata = data;
 	ops[0].cls.indata_len = len;
 
-	ret = rbd_req_sync_op(dev, NULL,
+	ret = rbd_req_sync_op(rbd_dev, NULL,
 			       CEPH_NOSNAP,
-			       0,
 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
 			       ops,
-			       1, obj, 0, 0, NULL, NULL, ver);
+			       object_name, 0, 0, NULL, NULL, ver);
 
 	rbd_destroy_ops(ops);
 
@@ -1437,10 +1440,12 @@
 		struct bio *bio;
 		struct bio *rq_bio, *next_bio = NULL;
 		bool do_write;
-		int size, op_size = 0;
+		unsigned int size;
+		u64 op_size = 0;
 		u64 ofs;
 		int num_segs, cur_seg = 0;
 		struct rbd_req_coll *coll;
+		struct ceph_snap_context *snapc;
 
 		/* peek at request from block layer */
 		if (!rq)
@@ -1467,23 +1472,38 @@
 
 		spin_unlock_irq(q->queue_lock);
 
+		down_read(&rbd_dev->header_rwsem);
+
+		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
+			up_read(&rbd_dev->header_rwsem);
+			dout("request for non-existent snapshot");
+			spin_lock_irq(q->queue_lock);
+			__blk_end_request_all(rq, -ENXIO);
+			continue;
+		}
+
+		snapc = ceph_get_snap_context(rbd_dev->header.snapc);
+
+		up_read(&rbd_dev->header_rwsem);
+
 		dout("%s 0x%x bytes at 0x%llx\n",
 		     do_write ? "write" : "read",
-		     size, blk_rq_pos(rq) * SECTOR_SIZE);
+		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
 
 		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
 		coll = rbd_alloc_coll(num_segs);
 		if (!coll) {
 			spin_lock_irq(q->queue_lock);
 			__blk_end_request_all(rq, -ENOMEM);
+			ceph_put_snap_context(snapc);
 			continue;
 		}
 
 		do {
 			/* a bio clone to be passed down to OSD req */
-			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
+			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
 			op_size = rbd_get_segment(&rbd_dev->header,
-						  rbd_dev->header.block_name,
+						  rbd_dev->header.object_prefix,
 						  ofs, size,
 						  NULL, NULL);
 			kref_get(&coll->kref);
@@ -1499,7 +1519,7 @@
 			/* init OSD command: write or read */
 			if (do_write)
 				rbd_req_write(rq, rbd_dev,
-					      rbd_dev->header.snapc,
+					      snapc,
 					      ofs,
 					      op_size, bio,
 					      coll, cur_seg);
@@ -1522,6 +1542,8 @@
 		if (bp)
 			bio_pair_release(bp);
 		spin_lock_irq(q->queue_lock);
+
+		ceph_put_snap_context(snapc);
 	}
 }
 
@@ -1592,18 +1614,19 @@
 			return -ENOMEM;
 
 		rc = rbd_req_sync_read(rbd_dev,
-				       NULL, CEPH_NOSNAP,
-				       rbd_dev->obj_md_name,
+				       CEPH_NOSNAP,
+				       rbd_dev->header_name,
 				       0, len,
 				       (char *)dh, &ver);
 		if (rc < 0)
 			goto out_dh;
 
-		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
+		rc = rbd_header_from_disk(header, dh, snap_count);
 		if (rc < 0) {
 			if (rc == -ENXIO)
 				pr_warning("unrecognized header format"
-					   " for image %s", rbd_dev->obj);
+					   " for image %s\n",
+					   rbd_dev->image_name);
 			goto out_dh;
 		}
 
@@ -1628,7 +1651,7 @@
 /*
  * create a snapshot
  */
-static int rbd_header_add_snap(struct rbd_device *dev,
+static int rbd_header_add_snap(struct rbd_device *rbd_dev,
 			       const char *snap_name,
 			       gfp_t gfp_flags)
 {
@@ -1636,16 +1659,15 @@
 	u64 new_snapid;
 	int ret;
 	void *data, *p, *e;
-	u64 ver;
 	struct ceph_mon_client *monc;
 
 	/* we should create a snapshot only if we're pointing at the head */
-	if (dev->snap_id != CEPH_NOSNAP)
+	if (rbd_dev->snap_id != CEPH_NOSNAP)
 		return -EINVAL;
 
-	monc = &dev->rbd_client->client->monc;
-	ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
-	dout("created snapid=%lld\n", new_snapid);
+	monc = &rbd_dev->rbd_client->client->monc;
+	ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
+	dout("created snapid=%llu\n", (unsigned long long) new_snapid);
 	if (ret < 0)
 		return ret;
 
@@ -1659,19 +1681,13 @@
 	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
 	ceph_encode_64_safe(&p, e, new_snapid, bad);
 
-	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
-				data, p - data, &ver);
+	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+				"rbd", "snap_add",
+				data, p - data, NULL);
 
 	kfree(data);
 
-	if (ret < 0)
-		return ret;
-
-	down_write(&dev->header_rwsem);
-	dev->header.snapc->seq = new_snapid;
-	up_write(&dev->header_rwsem);
-
-	return 0;
+	return ret < 0 ? ret : 0;
 bad:
 	return -ERANGE;
 }
@@ -1679,52 +1695,52 @@
 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
 {
 	struct rbd_snap *snap;
+	struct rbd_snap *next;
 
-	while (!list_empty(&rbd_dev->snaps)) {
-		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
-		__rbd_remove_snap_dev(rbd_dev, snap);
-	}
+	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
+		__rbd_remove_snap_dev(snap);
 }
 
 /*
  * only read the first part of the ondisk header, without the snaps info
  */
-static int __rbd_refresh_header(struct rbd_device *rbd_dev)
+static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
 {
 	int ret;
 	struct rbd_image_header h;
-	u64 snap_seq;
-	int follow_seq = 0;
 
 	ret = rbd_read_header(rbd_dev, &h);
 	if (ret < 0)
 		return ret;
 
-	/* resized? */
-	set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
-
 	down_write(&rbd_dev->header_rwsem);
 
-	snap_seq = rbd_dev->header.snapc->seq;
-	if (rbd_dev->header.total_snaps &&
-	    rbd_dev->header.snapc->snaps[0] == snap_seq)
-		/* pointing at the head, will need to follow that
-		   if head moves */
-		follow_seq = 1;
+	/* resized? */
+	if (rbd_dev->snap_id == CEPH_NOSNAP) {
+		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
 
-	kfree(rbd_dev->header.snapc);
-	kfree(rbd_dev->header.snap_names);
+		dout("setting size to %llu sectors", (unsigned long long) size);
+		set_capacity(rbd_dev->disk, size);
+	}
+
+	/* rbd_dev->header.object_prefix shouldn't change */
 	kfree(rbd_dev->header.snap_sizes);
+	kfree(rbd_dev->header.snap_names);
+	/* osd requests may still refer to snapc */
+	ceph_put_snap_context(rbd_dev->header.snapc);
 
+	if (hver)
+		*hver = h.obj_version;
+	rbd_dev->header.obj_version = h.obj_version;
+	rbd_dev->header.image_size = h.image_size;
 	rbd_dev->header.total_snaps = h.total_snaps;
 	rbd_dev->header.snapc = h.snapc;
 	rbd_dev->header.snap_names = h.snap_names;
 	rbd_dev->header.snap_names_len = h.snap_names_len;
 	rbd_dev->header.snap_sizes = h.snap_sizes;
-	if (follow_seq)
-		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
-	else
-		rbd_dev->header.snapc->seq = snap_seq;
+	/* Free the extra copy of the object prefix */
+	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
+	kfree(h.object_prefix);
 
 	ret = __rbd_init_snaps_header(rbd_dev);
 
@@ -1733,6 +1749,17 @@
 	return ret;
 }
 
+static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+{
+	int ret;
+
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+	ret = __rbd_refresh_header(rbd_dev, hver);
+	mutex_unlock(&ctl_mutex);
+
+	return ret;
+}
+
 static int rbd_init_disk(struct rbd_device *rbd_dev)
 {
 	struct gendisk *disk;
@@ -1762,7 +1789,7 @@
 		goto out;
 
 	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
-		 rbd_dev->id);
+		 rbd_dev->dev_id);
 	disk->major = rbd_dev->major;
 	disk->first_minor = 0;
 	disk->fops = &rbd_bd_ops;
@@ -1819,8 +1846,13 @@
 			     struct device_attribute *attr, char *buf)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+	sector_t size;
 
-	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
+	down_read(&rbd_dev->header_rwsem);
+	size = get_capacity(rbd_dev->disk);
+	up_read(&rbd_dev->header_rwsem);
+
+	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
 }
 
 static ssize_t rbd_major_show(struct device *dev,
@@ -1848,12 +1880,20 @@
 	return sprintf(buf, "%s\n", rbd_dev->pool_name);
 }
 
+static ssize_t rbd_pool_id_show(struct device *dev,
+			     struct device_attribute *attr, char *buf)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+	return sprintf(buf, "%d\n", rbd_dev->pool_id);
+}
+
 static ssize_t rbd_name_show(struct device *dev,
 			     struct device_attribute *attr, char *buf)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "%s\n", rbd_dev->obj);
+	return sprintf(buf, "%s\n", rbd_dev->image_name);
 }
 
 static ssize_t rbd_snap_show(struct device *dev,
@@ -1871,23 +1911,18 @@
 				 size_t size)
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-	int rc;
-	int ret = size;
+	int ret;
 
-	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+	ret = rbd_refresh_header(rbd_dev, NULL);
 
-	rc = __rbd_refresh_header(rbd_dev);
-	if (rc < 0)
-		ret = rc;
-
-	mutex_unlock(&ctl_mutex);
-	return ret;
+	return ret < 0 ? ret : size;
 }
 
 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
+static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
@@ -1898,6 +1933,7 @@
 	&dev_attr_major.attr,
 	&dev_attr_client_id.attr,
 	&dev_attr_pool.attr,
+	&dev_attr_pool_id.attr,
 	&dev_attr_name.attr,
 	&dev_attr_current_snap.attr,
 	&dev_attr_refresh.attr,
@@ -1977,15 +2013,13 @@
 	.release	= rbd_snap_dev_release,
 };
 
-static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
-				  struct rbd_snap *snap)
+static void __rbd_remove_snap_dev(struct rbd_snap *snap)
 {
 	list_del(&snap->node);
 	device_unregister(&snap->dev);
 }
 
-static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
-				  struct rbd_snap *snap,
+static int rbd_register_snap_dev(struct rbd_snap *snap,
 				  struct device *parent)
 {
 	struct device *dev = &snap->dev;
@@ -2000,29 +2034,36 @@
 	return ret;
 }
 
-static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
-			      int i, const char *name,
-			      struct rbd_snap **snapp)
+static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
+					      int i, const char *name)
 {
+	struct rbd_snap *snap;
 	int ret;
-	struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
+
+	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
 	if (!snap)
-		return -ENOMEM;
+		return ERR_PTR(-ENOMEM);
+
+	ret = -ENOMEM;
 	snap->name = kstrdup(name, GFP_KERNEL);
+	if (!snap->name)
+		goto err;
+
 	snap->size = rbd_dev->header.snap_sizes[i];
 	snap->id = rbd_dev->header.snapc->snaps[i];
 	if (device_is_registered(&rbd_dev->dev)) {
-		ret = rbd_register_snap_dev(rbd_dev, snap,
-					     &rbd_dev->dev);
+		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
 		if (ret < 0)
 			goto err;
 	}
-	*snapp = snap;
-	return 0;
+
+	return snap;
+
 err:
 	kfree(snap->name);
 	kfree(snap);
-	return ret;
+
+	return ERR_PTR(ret);
 }
 
 /*
@@ -2055,7 +2096,6 @@
 	const char *name, *first_name;
 	int i = rbd_dev->header.total_snaps;
 	struct rbd_snap *snap, *old_snap = NULL;
-	int ret;
 	struct list_head *p, *n;
 
 	first_name = rbd_dev->header.snap_names;
@@ -2070,8 +2110,15 @@
 			cur_id = rbd_dev->header.snapc->snaps[i - 1];
 
 		if (!i || old_snap->id < cur_id) {
-			/* old_snap->id was skipped, thus was removed */
-			__rbd_remove_snap_dev(rbd_dev, old_snap);
+			/*
+			 * old_snap->id was skipped, thus was
+			 * removed.  If this rbd_dev is mapped to
+			 * the removed snapshot, record that it no
+			 * longer exists, to prevent further I/O.
+			 */
+			if (rbd_dev->snap_id == old_snap->id)
+				rbd_dev->snap_exists = false;
+			__rbd_remove_snap_dev(old_snap);
 			continue;
 		}
 		if (old_snap->id == cur_id) {
@@ -2091,9 +2138,9 @@
 			if (cur_id >= old_snap->id)
 				break;
 			/* a new snapshot */
-			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
-			if (ret < 0)
-				return ret;
+			snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
+			if (IS_ERR(snap))
+				return PTR_ERR(snap);
 
 			/* note that we add it backward so using n and not p */
 			list_add(&snap->node, n);
@@ -2107,9 +2154,9 @@
 			WARN_ON(1);
 			return -EINVAL;
 		}
-		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
-		if (ret < 0)
-			return ret;
+		snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
+		if (IS_ERR(snap))
+			return PTR_ERR(snap);
 		list_add(&snap->node, &rbd_dev->snaps);
 	}
 
@@ -2129,14 +2176,13 @@
 	dev->type = &rbd_device_type;
 	dev->parent = &rbd_root_dev;
 	dev->release = rbd_dev_release;
-	dev_set_name(dev, "%d", rbd_dev->id);
+	dev_set_name(dev, "%d", rbd_dev->dev_id);
 	ret = device_register(dev);
 	if (ret < 0)
 		goto out;
 
 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
-		ret = rbd_register_snap_dev(rbd_dev, snap,
-					     &rbd_dev->dev);
+		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
 		if (ret < 0)
 			break;
 	}
@@ -2155,12 +2201,9 @@
 	int ret, rc;
 
 	do {
-		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
-					 rbd_dev->header.obj_version);
+		ret = rbd_req_sync_watch(rbd_dev);
 		if (ret == -ERANGE) {
-			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-			rc = __rbd_refresh_header(rbd_dev);
-			mutex_unlock(&ctl_mutex);
+			rc = rbd_refresh_header(rbd_dev, NULL);
 			if (rc < 0)
 				return rc;
 		}
@@ -2177,7 +2220,7 @@
  */
 static void rbd_id_get(struct rbd_device *rbd_dev)
 {
-	rbd_dev->id = atomic64_inc_return(&rbd_id_max);
+	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
 
 	spin_lock(&rbd_dev_list_lock);
 	list_add_tail(&rbd_dev->node, &rbd_dev_list);
@@ -2191,7 +2234,7 @@
 static void rbd_id_put(struct rbd_device *rbd_dev)
 {
 	struct list_head *tmp;
-	int rbd_id = rbd_dev->id;
+	int rbd_id = rbd_dev->dev_id;
 	int max_id;
 
 	BUG_ON(rbd_id < 1);
@@ -2282,19 +2325,58 @@
 }
 
 /*
- * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
+ * Finds the next token in *buf, dynamically allocates a buffer big
+ * enough to hold a copy of it, and copies the token into the new
+ * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
+ * that a duplicate buffer is created even for a zero-length token.
+ *
+ * Returns a pointer to the newly-allocated duplicate, or a null
+ * pointer if memory for the duplicate was not available.  If
+ * the lenp argument is a non-null pointer, the length of the token
+ * (not including the '\0') is returned in *lenp.
+ *
+ * If successful, the *buf pointer will be updated to point beyond
+ * the end of the found token.
+ *
+ * Note: uses GFP_KERNEL for allocation.
+ */
+static inline char *dup_token(const char **buf, size_t *lenp)
+{
+	char *dup;
+	size_t len;
+
+	len = next_token(buf);
+	dup = kmalloc(len + 1, GFP_KERNEL);
+	if (!dup)
+		return NULL;
+
+	memcpy(dup, *buf, len);
+	*(dup + len) = '\0';
+	*buf += len;
+
+	if (lenp)
+		*lenp = len;
+
+	return dup;
+}
+
+/*
+ * This fills in the pool_name, image_name, image_name_len, snap_name,
  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
  * on the list of monitor addresses and other options provided via
  * /sys/bus/rbd/add.
+ *
+ * Note: rbd_dev is assumed to have been initially zero-filled.
  */
 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
 			      const char *buf,
 			      const char **mon_addrs,
 			      size_t *mon_addrs_size,
 			      char *options,
-			      size_t options_size)
+			     size_t options_size)
 {
-	size_t	len;
+	size_t len;
+	int ret;
 
 	/* The first four tokens are required */
 
@@ -2310,56 +2392,74 @@
 	if (!len || len >= options_size)
 		return -EINVAL;
 
-	len = copy_token(&buf, rbd_dev->pool_name, sizeof (rbd_dev->pool_name));
-	if (!len || len >= sizeof (rbd_dev->pool_name))
-		return -EINVAL;
+	ret = -ENOMEM;
+	rbd_dev->pool_name = dup_token(&buf, NULL);
+	if (!rbd_dev->pool_name)
+		goto out_err;
 
-	len = copy_token(&buf, rbd_dev->obj, sizeof (rbd_dev->obj));
-	if (!len || len >= sizeof (rbd_dev->obj))
-		return -EINVAL;
+	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
+	if (!rbd_dev->image_name)
+		goto out_err;
 
-	/* We have the object length in hand, save it. */
+	/* Create the name of the header object */
 
-	rbd_dev->obj_len = len;
-
-	BUILD_BUG_ON(RBD_MAX_MD_NAME_LEN
-				< RBD_MAX_OBJ_NAME_LEN + sizeof (RBD_SUFFIX));
-	sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
+	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
+						+ sizeof (RBD_SUFFIX),
+					GFP_KERNEL);
+	if (!rbd_dev->header_name)
+		goto out_err;
+	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
 
 	/*
-	 * The snapshot name is optional, but it's an error if it's
-	 * too long.  If no snapshot is supplied, fill in the default.
+	 * The snapshot name is optional.  If none is supplied,
+	 * we use the default value.
 	 */
-	len = copy_token(&buf, rbd_dev->snap_name, sizeof (rbd_dev->snap_name));
-	if (!len)
+	rbd_dev->snap_name = dup_token(&buf, &len);
+	if (!rbd_dev->snap_name)
+		goto out_err;
+	if (!len) {
+		/* Replace the empty name with the default */
+		kfree(rbd_dev->snap_name);
+		rbd_dev->snap_name
+			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
+		if (!rbd_dev->snap_name)
+			goto out_err;
+
 		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
 			sizeof (RBD_SNAP_HEAD_NAME));
-	else if (len >= sizeof (rbd_dev->snap_name))
-		return -EINVAL;
+	}
 
 	return 0;
+
+out_err:
+	kfree(rbd_dev->header_name);
+	kfree(rbd_dev->image_name);
+	kfree(rbd_dev->pool_name);
+	rbd_dev->pool_name = NULL;
+
+	return ret;
 }
 
 static ssize_t rbd_add(struct bus_type *bus,
 		       const char *buf,
 		       size_t count)
 {
-	struct rbd_device *rbd_dev;
+	char *options;
+	struct rbd_device *rbd_dev = NULL;
 	const char *mon_addrs = NULL;
 	size_t mon_addrs_size = 0;
-	char *options = NULL;
 	struct ceph_osd_client *osdc;
 	int rc = -ENOMEM;
 
 	if (!try_module_get(THIS_MODULE))
 		return -ENODEV;
 
-	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
-	if (!rbd_dev)
-		goto err_nomem;
 	options = kmalloc(count, GFP_KERNEL);
 	if (!options)
 		goto err_nomem;
+	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
+	if (!rbd_dev)
+		goto err_nomem;
 
 	/* static rbd_device initialization */
 	spin_lock_init(&rbd_dev->lock);
@@ -2367,15 +2467,13 @@
 	INIT_LIST_HEAD(&rbd_dev->snaps);
 	init_rwsem(&rbd_dev->header_rwsem);
 
-	init_rwsem(&rbd_dev->header_rwsem);
-
 	/* generate unique id: find highest unique id, add one */
 	rbd_id_get(rbd_dev);
 
 	/* Fill in the device name, now that we have its id. */
 	BUILD_BUG_ON(DEV_NAME_LEN
 			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
-	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
+	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
 
 	/* parse add command */
 	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
@@ -2395,7 +2493,7 @@
 	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
 	if (rc < 0)
 		goto err_out_client;
-	rbd_dev->poolid = rc;
+	rbd_dev->pool_id = rc;
 
 	/* register our block device */
 	rc = register_blkdev(0, rbd_dev->name);
@@ -2435,10 +2533,16 @@
 err_out_client:
 	rbd_put_client(rbd_dev);
 err_put_id:
+	if (rbd_dev->pool_name) {
+		kfree(rbd_dev->snap_name);
+		kfree(rbd_dev->header_name);
+		kfree(rbd_dev->image_name);
+		kfree(rbd_dev->pool_name);
+	}
 	rbd_id_put(rbd_dev);
 err_nomem:
-	kfree(options);
 	kfree(rbd_dev);
+	kfree(options);
 
 	dout("Error adding device %s\n", buf);
 	module_put(THIS_MODULE);
@@ -2446,7 +2550,7 @@
 	return (ssize_t) rc;
 }
 
-static struct rbd_device *__rbd_get_dev(unsigned long id)
+static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
 {
 	struct list_head *tmp;
 	struct rbd_device *rbd_dev;
@@ -2454,7 +2558,7 @@
 	spin_lock(&rbd_dev_list_lock);
 	list_for_each(tmp, &rbd_dev_list) {
 		rbd_dev = list_entry(tmp, struct rbd_device, node);
-		if (rbd_dev->id == id) {
+		if (rbd_dev->dev_id == dev_id) {
 			spin_unlock(&rbd_dev_list_lock);
 			return rbd_dev;
 		}
@@ -2474,7 +2578,7 @@
 						    rbd_dev->watch_request);
 	}
 	if (rbd_dev->watch_event)
-		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
+		rbd_req_sync_unwatch(rbd_dev);
 
 	rbd_put_client(rbd_dev);
 
@@ -2483,6 +2587,10 @@
 	unregister_blkdev(rbd_dev->major, rbd_dev->name);
 
 	/* done with the id, and with the rbd_dev */
+	kfree(rbd_dev->snap_name);
+	kfree(rbd_dev->header_name);
+	kfree(rbd_dev->pool_name);
+	kfree(rbd_dev->image_name);
 	rbd_id_put(rbd_dev);
 	kfree(rbd_dev);
 
@@ -2544,7 +2652,7 @@
 	if (ret < 0)
 		goto err_unlock;
 
-	ret = __rbd_refresh_header(rbd_dev);
+	ret = __rbd_refresh_header(rbd_dev, NULL);
 	if (ret < 0)
 		goto err_unlock;
 
@@ -2553,7 +2661,7 @@
 	mutex_unlock(&ctl_mutex);
 
 	/* make a best effort, don't error if failed */
-	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
+	rbd_req_sync_notify(rbd_dev);
 
 	ret = count;
 	kfree(name);
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index 9507086..0924e9e 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -31,7 +31,6 @@
 #define RBD_MIN_OBJ_ORDER       16
 #define RBD_MAX_OBJ_ORDER       30
 
-#define RBD_MAX_OBJ_NAME_LEN	96
 #define RBD_MAX_SEG_NAME_LEN	128
 
 #define RBD_COMP_NONE		0
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 00894ff..f391f1e 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -51,8 +51,7 @@
 		goto out_unlock;
 	}
 
-	if (dentry->d_parent == NULL ||   /* nfs fh_to_dentry */
-	    ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
+	if (ceph_snap(dentry->d_parent->d_inode) == CEPH_NOSNAP)
 		d_set_d_op(dentry, &ceph_dentry_ops);
 	else if (ceph_snap(dentry->d_parent->d_inode) == CEPH_SNAPDIR)
 		d_set_d_op(dentry, &ceph_snapdir_dentry_ops);
@@ -79,7 +78,7 @@
 		return NULL;
 
 	spin_lock(&dentry->d_lock);
-	if (dentry->d_parent) {
+	if (!IS_ROOT(dentry)) {
 		inode = dentry->d_parent->d_inode;
 		ihold(inode);
 	}
@@ -1154,7 +1153,7 @@
 	dout("ceph_d_prune %p\n", dentry);
 
 	/* do we have a valid parent? */
-	if (!dentry->d_parent || IS_ROOT(dentry))
+	if (IS_ROOT(dentry))
 		return;
 
 	/* if we are not hashed, we don't affect D_COMPLETE */
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 200bc87..a5a7354 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -10,6 +10,7 @@
 #include "super.h"
 #include "mds_client.h"
 
+#include <linux/ceph/ceph_features.h>
 #include <linux/ceph/messenger.h>
 #include <linux/ceph/decode.h>
 #include <linux/ceph/pagelist.h>
@@ -394,11 +395,7 @@
 	s->s_seq = 0;
 	mutex_init(&s->s_mutex);
 
-	ceph_con_init(mdsc->fsc->client->msgr, &s->s_con);
-	s->s_con.private = s;
-	s->s_con.ops = &mds_con_ops;
-	s->s_con.peer_name.type = CEPH_ENTITY_TYPE_MDS;
-	s->s_con.peer_name.num = cpu_to_le64(mds);
+	ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);
 
 	spin_lock_init(&s->s_gen_ttl_lock);
 	s->s_cap_gen = 0;
@@ -440,7 +437,8 @@
 	mdsc->sessions[mds] = s;
 	atomic_inc(&s->s_ref);  /* one ref to sessions[], one to caller */
 
-	ceph_con_open(&s->s_con, ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
+	ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
+		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
 
 	return s;
 
@@ -1472,11 +1470,6 @@
 		else
 			len += 1 + temp->d_name.len;
 		temp = temp->d_parent;
-		if (temp == NULL) {
-			rcu_read_unlock();
-			pr_err("build_path corrupt dentry %p\n", dentry);
-			return ERR_PTR(-EINVAL);
-		}
 	}
 	rcu_read_unlock();
 	if (len)
@@ -1513,12 +1506,6 @@
 		if (pos)
 			path[--pos] = '/';
 		temp = temp->d_parent;
-		if (temp == NULL) {
-			rcu_read_unlock();
-			pr_err("build_path corrupt dentry\n");
-			kfree(path);
-			return ERR_PTR(-EINVAL);
-		}
 	}
 	rcu_read_unlock();
 	if (pos != 0 || read_seqretry(&rename_lock, seq)) {
@@ -2531,7 +2518,9 @@
 	session->s_state = CEPH_MDS_SESSION_RECONNECTING;
 	session->s_seq = 0;
 
+	ceph_con_close(&session->s_con);
 	ceph_con_open(&session->s_con,
+		      CEPH_ENTITY_TYPE_MDS, mds,
 		      ceph_mdsmap_get_addr(mdsc->mdsmap, mds));
 
 	/* replay unsafe requests */
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index e5206fc..cbb2f54 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -296,8 +296,7 @@
 	struct ceph_snap_realm *parent = realm->parent;
 	struct ceph_snap_context *snapc;
 	int err = 0;
-	int i;
-	int num = realm->num_prior_parent_snaps + realm->num_snaps;
+	u32 num = realm->num_prior_parent_snaps + realm->num_snaps;
 
 	/*
 	 * build parent context, if it hasn't been built.
@@ -321,11 +320,11 @@
 	    realm->cached_context->seq == realm->seq &&
 	    (!parent ||
 	     realm->cached_context->seq >= parent->cached_context->seq)) {
-		dout("build_snap_context %llx %p: %p seq %lld (%d snaps)"
+		dout("build_snap_context %llx %p: %p seq %lld (%u snaps)"
 		     " (unchanged)\n",
 		     realm->ino, realm, realm->cached_context,
 		     realm->cached_context->seq,
-		     realm->cached_context->num_snaps);
+		     (unsigned int) realm->cached_context->num_snaps);
 		return 0;
 	}
 
@@ -342,6 +341,8 @@
 	num = 0;
 	snapc->seq = realm->seq;
 	if (parent) {
+		u32 i;
+
 		/* include any of parent's snaps occurring _after_ my
 		   parent became my parent */
 		for (i = 0; i < parent->cached_context->num_snaps; i++)
@@ -361,8 +362,9 @@
 
 	sort(snapc->snaps, num, sizeof(u64), cmpu64_rev, NULL);
 	snapc->num_snaps = num;
-	dout("build_snap_context %llx %p: %p seq %lld (%d snaps)\n",
-	     realm->ino, realm, snapc, snapc->seq, snapc->num_snaps);
+	dout("build_snap_context %llx %p: %p seq %lld (%u snaps)\n",
+	     realm->ino, realm, snapc, snapc->seq,
+	     (unsigned int) snapc->num_snaps);
 
 	if (realm->cached_context)
 		ceph_put_snap_context(realm->cached_context);
@@ -402,9 +404,9 @@
  * helper to allocate and decode an array of snapids.  free prior
  * instance, if any.
  */
-static int dup_array(u64 **dst, __le64 *src, int num)
+static int dup_array(u64 **dst, __le64 *src, u32 num)
 {
-	int i;
+	u32 i;
 
 	kfree(*dst);
 	if (num) {
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 7076109..b982239 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -18,6 +18,7 @@
 #include "super.h"
 #include "mds_client.h"
 
+#include <linux/ceph/ceph_features.h>
 #include <linux/ceph/decode.h>
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/auth.h>
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index f4d5522..ebc95cc 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -612,9 +612,9 @@
 	u64 parent_since;   /* snapid when our current parent became so */
 
 	u64 *prior_parent_snaps;      /* snaps inherited from any parents we */
-	int num_prior_parent_snaps;   /*  had prior to parent_since */
+	u32 num_prior_parent_snaps;   /*  had prior to parent_since */
 	u64 *snaps;                   /* snaps specific to this realm */
-	int num_snaps;
+	u32 num_snaps;
 
 	struct ceph_snap_realm *parent;
 	struct list_head children;       /* list of child realms */
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 785cb30..2c2ae5b 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -457,6 +457,7 @@
 			for (i = 0; i < numattr; i++)
 				kfree(xattrs[i]);
 			kfree(xattrs);
+			xattrs = NULL;
 			goto start;
 		}
 		err = -EIO;
diff --git a/include/linux/ceph/ceph_features.h b/include/linux/ceph/ceph_features.h
new file mode 100644
index 0000000..dad579b
--- /dev/null
+++ b/include/linux/ceph/ceph_features.h
@@ -0,0 +1,27 @@
+#ifndef __CEPH_FEATURES
+#define __CEPH_FEATURES
+
+/*
+ * feature bits
+ */
+#define CEPH_FEATURE_UID            (1<<0)
+#define CEPH_FEATURE_NOSRCADDR      (1<<1)
+#define CEPH_FEATURE_MONCLOCKCHECK  (1<<2)
+#define CEPH_FEATURE_FLOCK          (1<<3)
+#define CEPH_FEATURE_SUBSCRIBE2     (1<<4)
+#define CEPH_FEATURE_MONNAMES       (1<<5)
+#define CEPH_FEATURE_RECONNECT_SEQ  (1<<6)
+#define CEPH_FEATURE_DIRLAYOUTHASH  (1<<7)
+/* bits 8-17 defined by user-space; not supported yet here */
+#define CEPH_FEATURE_CRUSH_TUNABLES (1<<18)
+
+/*
+ * Features supported.
+ */
+#define CEPH_FEATURES_SUPPORTED_DEFAULT  \
+	(CEPH_FEATURE_NOSRCADDR |	 \
+	 CEPH_FEATURE_CRUSH_TUNABLES)
+
+#define CEPH_FEATURES_REQUIRED_DEFAULT   \
+	(CEPH_FEATURE_NOSRCADDR)
+#endif
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index e81ab30..d021610 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -35,20 +35,6 @@
 /* arbitrary limit on max # of monitors (cluster of 3 is typical) */
 #define CEPH_MAX_MON   31
 
-
-/*
- * feature bits
- */
-#define CEPH_FEATURE_UID            (1<<0)
-#define CEPH_FEATURE_NOSRCADDR      (1<<1)
-#define CEPH_FEATURE_MONCLOCKCHECK  (1<<2)
-#define CEPH_FEATURE_FLOCK          (1<<3)
-#define CEPH_FEATURE_SUBSCRIBE2     (1<<4)
-#define CEPH_FEATURE_MONNAMES       (1<<5)
-#define CEPH_FEATURE_RECONNECT_SEQ  (1<<6)
-#define CEPH_FEATURE_DIRLAYOUTHASH  (1<<7)
-
-
 /*
  * ceph_file_layout - describe data layout for a file/inode
  */
diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
index d8615de..4bbf2db 100644
--- a/include/linux/ceph/decode.h
+++ b/include/linux/ceph/decode.h
@@ -1,6 +1,7 @@
 #ifndef __CEPH_DECODE_H
 #define __CEPH_DECODE_H
 
+#include <linux/err.h>
 #include <linux/bug.h>
 #include <linux/time.h>
 #include <asm/unaligned.h>
@@ -85,6 +86,52 @@
 	} while (0)
 
 /*
+ * Allocate a buffer big enough to hold the wire-encoded string, and
+ * decode the string into it.  The resulting string will always be
+ * terminated with '\0'.  If successful, *p will be advanced
+ * past the decoded data.  Also, if lenp is not a null pointer, the
+ * length (not including the terminating '\0') will be recorded in
+ * *lenp.  Note that a zero-length string is a valid return value.
+ *
+ * Returns a pointer to the newly-allocated string buffer, or a
+ * pointer-coded errno if an error occurs.  Neither *p nor *lenp
+ * will have been updated if an error is returned.
+ *
+ * There are two possible failures:
+ *   - converting the string would require accessing memory at or
+ *     beyond the "end" pointer provided (-ERANGE)
+ *   - memory could not be allocated for the result (-ENOMEM)
+ */
+static inline char *ceph_extract_encoded_string(void **p, void *end,
+						size_t *lenp, gfp_t gfp)
+{
+	u32 len;
+	void *sp = *p;
+	char *buf;
+
+	ceph_decode_32_safe(&sp, end, len, bad);
+	if (!ceph_has_room(&sp, end, len))
+		goto bad;
+
+	buf = kmalloc(len + 1, gfp);
+	if (!buf)
+		return ERR_PTR(-ENOMEM);
+
+	if (len)
+		memcpy(buf, sp, len);
+	buf[len] = '\0';
+
+	*p = (char *) *p + sizeof (u32) + len;
+	if (lenp)
+		*lenp = (size_t) len;
+
+	return buf;
+
+bad:
+	return ERR_PTR(-ERANGE);
+}
+
+/*
  * struct ceph_timespec <-> struct timespec
  */
 static inline void ceph_decode_timespec(struct timespec *ts,
@@ -151,7 +198,7 @@
 					u64 ino, const char *path)
 {
 	u32 len = path ? strlen(path) : 0;
-	BUG_ON(*p + sizeof(ino) + sizeof(len) + len > end);
+	BUG_ON(*p + 1 + sizeof(ino) + sizeof(len) + len > end);
 	ceph_encode_8(p, 1);
 	ceph_encode_64(p, ino);
 	ceph_encode_32(p, len);
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index e71d683..4262478 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -23,12 +23,6 @@
 #include "ceph_fs.h"
 
 /*
- * Supported features
- */
-#define CEPH_FEATURE_SUPPORTED_DEFAULT CEPH_FEATURE_NOSRCADDR
-#define CEPH_FEATURE_REQUIRED_DEFAULT  CEPH_FEATURE_NOSRCADDR
-
-/*
  * mount options
  */
 #define CEPH_OPT_FSID             (1<<0)
@@ -132,7 +126,7 @@
 	u32 supported_features;
 	u32 required_features;
 
-	struct ceph_messenger *msgr;   /* messenger instance */
+	struct ceph_messenger msgr;   /* messenger instance */
 	struct ceph_mon_client monc;
 	struct ceph_osd_client osdc;
 
@@ -160,7 +154,7 @@
 struct ceph_snap_context {
 	atomic_t nref;
 	u64 seq;
-	int num_snaps;
+	u32 num_snaps;
 	u64 snaps[];
 };
 
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 44c87e7..189ae06 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -31,9 +31,6 @@
 	int (*verify_authorizer_reply) (struct ceph_connection *con, int len);
 	int (*invalidate_authorizer)(struct ceph_connection *con);
 
-	/* protocol version mismatch */
-	void (*bad_proto) (struct ceph_connection *con);
-
 	/* there was some error on the socket (disconnect, whatever) */
 	void (*fault) (struct ceph_connection *con);
 
@@ -53,6 +50,7 @@
 	struct ceph_entity_inst inst;    /* my name+address */
 	struct ceph_entity_addr my_enc_addr;
 
+	atomic_t stopping;
 	bool nocrc;
 
 	/*
@@ -80,7 +78,10 @@
 	unsigned nr_pages;              /* size of page array */
 	unsigned page_alignment;        /* io offset in first page */
 	struct ceph_pagelist *pagelist; /* instead of pages */
+
+	struct ceph_connection *con;
 	struct list_head list_head;
+
 	struct kref kref;
 	struct bio  *bio;		/* instead of pages/pagelist */
 	struct bio  *bio_iter;		/* bio iterator */
@@ -106,23 +107,6 @@
 #define MAX_DELAY_INTERVAL	(5 * 60 * HZ)
 
 /*
- * ceph_connection state bit flags
- */
-#define LOSSYTX         0  /* we can close channel or drop messages on errors */
-#define CONNECTING	1
-#define NEGOTIATING	2
-#define KEEPALIVE_PENDING      3
-#define WRITE_PENDING	4  /* we have data ready to send */
-#define STANDBY		8  /* no outgoing messages, socket closed.  we keep
-			    * the ceph_connection around to maintain shared
-			    * state with the peer. */
-#define CLOSED		10 /* we've closed the connection */
-#define SOCK_CLOSED	11 /* socket state changed to closed */
-#define OPENING         13 /* open connection w/ (possibly new) peer */
-#define DEAD            14 /* dead, about to kfree */
-#define BACKOFF         15
-
-/*
  * A single connection with another host.
  *
  * We maintain a queue of outgoing messages, and some session state to
@@ -131,18 +115,22 @@
  */
 struct ceph_connection {
 	void *private;
-	atomic_t nref;
 
 	const struct ceph_connection_operations *ops;
 
 	struct ceph_messenger *msgr;
+
+	atomic_t sock_state;
 	struct socket *sock;
-	unsigned long state;	/* connection state (see flags above) */
+	struct ceph_entity_addr peer_addr; /* peer address */
+	struct ceph_entity_addr peer_addr_for_me;
+
+	unsigned long flags;
+	unsigned long state;
 	const char *error_msg;  /* error message, if any */
 
-	struct ceph_entity_addr peer_addr; /* peer address */
 	struct ceph_entity_name peer_name; /* peer name */
-	struct ceph_entity_addr peer_addr_for_me;
+
 	unsigned peer_features;
 	u32 connect_seq;      /* identify the most recent connection
 				 attempt for this connection, client */
@@ -207,24 +195,26 @@
 extern void ceph_msgr_exit(void);
 extern void ceph_msgr_flush(void);
 
-extern struct ceph_messenger *ceph_messenger_create(
-	struct ceph_entity_addr *myaddr,
-	u32 features, u32 required);
-extern void ceph_messenger_destroy(struct ceph_messenger *);
+extern void ceph_messenger_init(struct ceph_messenger *msgr,
+			struct ceph_entity_addr *myaddr,
+			u32 supported_features,
+			u32 required_features,
+			bool nocrc);
 
-extern void ceph_con_init(struct ceph_messenger *msgr,
-			  struct ceph_connection *con);
+extern void ceph_con_init(struct ceph_connection *con, void *private,
+			const struct ceph_connection_operations *ops,
+			struct ceph_messenger *msgr);
 extern void ceph_con_open(struct ceph_connection *con,
+			  __u8 entity_type, __u64 entity_num,
 			  struct ceph_entity_addr *addr);
 extern bool ceph_con_opened(struct ceph_connection *con);
 extern void ceph_con_close(struct ceph_connection *con);
 extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
-extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
-extern void ceph_con_revoke_message(struct ceph_connection *con,
-				  struct ceph_msg *msg);
+
+extern void ceph_msg_revoke(struct ceph_msg *msg);
+extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
+
 extern void ceph_con_keepalive(struct ceph_connection *con);
-extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
-extern void ceph_con_put(struct ceph_connection *con);
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
 				     bool can_fail);
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 545f859..2113e38 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -70,7 +70,7 @@
 	bool hunting;
 	int cur_mon;                       /* last monitor i contacted */
 	unsigned long sub_sent, sub_renew_after;
-	struct ceph_connection *con;
+	struct ceph_connection con;
 	bool have_fsid;
 
 	/* pending generic requests */
diff --git a/include/linux/ceph/msgpool.h b/include/linux/ceph/msgpool.h
index a362605f..09fa96b 100644
--- a/include/linux/ceph/msgpool.h
+++ b/include/linux/ceph/msgpool.h
@@ -11,10 +11,11 @@
 struct ceph_msgpool {
 	const char *name;
 	mempool_t *pool;
+	int type;               /* preallocated message type */
 	int front_len;          /* preallocated payload size */
 };
 
-extern int ceph_msgpool_init(struct ceph_msgpool *pool,
+extern int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
 			     int front_len, int size, bool blocking,
 			     const char *name);
 extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);
diff --git a/include/linux/crush/crush.h b/include/linux/crush/crush.h
index 7c47508..25baa28 100644
--- a/include/linux/crush/crush.h
+++ b/include/linux/crush/crush.h
@@ -154,6 +154,14 @@
 	__s32 max_buckets;
 	__u32 max_rules;
 	__s32 max_devices;
+
+	/* choose local retries before re-descent */
+	__u32 choose_local_tries;
+	/* choose local attempts using a fallback permutation before
+	 * re-descent */
+	__u32 choose_local_fallback_tries;
+	/* choose attempts before giving up */
+	__u32 choose_total_tries;
 };
 
 
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index ba4323b..69e38db 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -17,6 +17,7 @@
 #include <linux/string.h>
 
 
+#include <linux/ceph/ceph_features.h>
 #include <linux/ceph/libceph.h>
 #include <linux/ceph/debugfs.h>
 #include <linux/ceph/decode.h>
@@ -460,27 +461,23 @@
 	client->auth_err = 0;
 
 	client->extra_mon_dispatch = NULL;
-	client->supported_features = CEPH_FEATURE_SUPPORTED_DEFAULT |
+	client->supported_features = CEPH_FEATURES_SUPPORTED_DEFAULT |
 		supported_features;
-	client->required_features = CEPH_FEATURE_REQUIRED_DEFAULT |
+	client->required_features = CEPH_FEATURES_REQUIRED_DEFAULT |
 		required_features;
 
 	/* msgr */
 	if (ceph_test_opt(client, MYIP))
 		myaddr = &client->options->my_addr;
-	client->msgr = ceph_messenger_create(myaddr,
-					     client->supported_features,
-					     client->required_features);
-	if (IS_ERR(client->msgr)) {
-		err = PTR_ERR(client->msgr);
-		goto fail;
-	}
-	client->msgr->nocrc = ceph_test_opt(client, NOCRC);
+	ceph_messenger_init(&client->msgr, myaddr,
+		client->supported_features,
+		client->required_features,
+		ceph_test_opt(client, NOCRC));
 
 	/* subsystems */
 	err = ceph_monc_init(&client->monc, client);
 	if (err < 0)
-		goto fail_msgr;
+		goto fail;
 	err = ceph_osdc_init(&client->osdc, client);
 	if (err < 0)
 		goto fail_monc;
@@ -489,8 +486,6 @@
 
 fail_monc:
 	ceph_monc_stop(&client->monc);
-fail_msgr:
-	ceph_messenger_destroy(client->msgr);
 fail:
 	kfree(client);
 	return ERR_PTR(err);
@@ -501,6 +496,8 @@
 {
 	dout("destroy_client %p\n", client);
 
+	atomic_set(&client->msgr.stopping, 1);
+
 	/* unmount */
 	ceph_osdc_stop(&client->osdc);
 
@@ -508,8 +505,6 @@
 
 	ceph_debugfs_client_cleanup(client);
 
-	ceph_messenger_destroy(client->msgr);
-
 	ceph_destroy_options(client->options);
 
 	kfree(client);
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index d7edc24..35fce75 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -306,7 +306,6 @@
 	int item = 0;
 	int itemtype;
 	int collide, reject;
-	const unsigned int orig_tries = 5; /* attempts before we fall back to search */
 
 	dprintk("CHOOSE%s bucket %d x %d outpos %d numrep %d\n", recurse_to_leaf ? "_LEAF" : "",
 		bucket->id, x, outpos, numrep);
@@ -351,8 +350,9 @@
 					reject = 1;
 					goto reject;
 				}
-				if (flocal >= (in->size>>1) &&
-				    flocal > orig_tries)
+				if (map->choose_local_fallback_tries > 0 &&
+				    flocal >= (in->size>>1) &&
+				    flocal > map->choose_local_fallback_tries)
 					item = bucket_perm_choose(in, x, r);
 				else
 					item = crush_bucket_choose(in, x, r);
@@ -422,13 +422,14 @@
 					ftotal++;
 					flocal++;
 
-					if (collide && flocal < 3)
+					if (collide && flocal <= map->choose_local_tries)
 						/* retry locally a few times */
 						retry_bucket = 1;
-					else if (flocal <= in->size + orig_tries)
+					else if (map->choose_local_fallback_tries > 0 &&
+						 flocal <= in->size + map->choose_local_fallback_tries)
 						/* exhaustive bucket search */
 						retry_bucket = 1;
-					else if (ftotal < 20)
+					else if (ftotal <= map->choose_total_tries)
 						/* then retry descent */
 						retry_descent = 1;
 					else
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 10255e8..b979675 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -29,6 +29,74 @@
  * the sender.
  */
 
+/*
+ * We track the state of the socket on a given connection using
+ * values defined below.  The transition to a new socket state is
+ * handled by a function which verifies we aren't coming from an
+ * unexpected state.
+ *
+ *      --------
+ *      | NEW* |  transient initial state
+ *      --------
+ *          | con_sock_state_init()
+ *          v
+ *      ----------
+ *      | CLOSED |  initialized, but no socket (and no
+ *      ----------  TCP connection)
+ *       ^      \
+ *       |       \ con_sock_state_connecting()
+ *       |        ----------------------
+ *       |                              \
+ *       + con_sock_state_closed()       \
+ *       |+---------------------------    \
+ *       | \                          \    \
+ *       |  -----------                \    \
+ *       |  | CLOSING |  socket event;  \    \
+ *       |  -----------  await close     \    \
+ *       |       ^                        \   |
+ *       |       |                         \  |
+ *       |       + con_sock_state_closing() \ |
+ *       |      / \                         | |
+ *       |     /   ---------------          | |
+ *       |    /                   \         v v
+ *       |   /                    --------------
+ *       |  /    -----------------| CONNECTING |  socket created, TCP
+ *       |  |   /                 --------------  connect initiated
+ *       |  |   | con_sock_state_connected()
+ *       |  |   v
+ *      -------------
+ *      | CONNECTED |  TCP connection established
+ *      -------------
+ *
+ * State values for ceph_connection->sock_state; NEW is assumed to be 0.
+ */
+
+#define CON_SOCK_STATE_NEW		0	/* -> CLOSED */
+#define CON_SOCK_STATE_CLOSED		1	/* -> CONNECTING */
+#define CON_SOCK_STATE_CONNECTING	2	/* -> CONNECTED, CLOSING or CLOSED */
+#define CON_SOCK_STATE_CONNECTED	3	/* -> CLOSING or -> CLOSED */
+#define CON_SOCK_STATE_CLOSING		4	/* -> CLOSED */
+
+/*
+ * connection states
+ */
+#define CON_STATE_CLOSED        1  /* -> PREOPEN */
+#define CON_STATE_PREOPEN       2  /* -> CONNECTING, CLOSED */
+#define CON_STATE_CONNECTING    3  /* -> NEGOTIATING, CLOSED */
+#define CON_STATE_NEGOTIATING   4  /* -> OPEN, CLOSED */
+#define CON_STATE_OPEN          5  /* -> STANDBY, CLOSED */
+#define CON_STATE_STANDBY       6  /* -> PREOPEN, CLOSED */
+
+/*
+ * ceph_connection flag bits
+ */
+#define CON_FLAG_LOSSYTX           0  /* we can close channel or drop
+				       * messages on errors */
+#define CON_FLAG_KEEPALIVE_PENDING 1  /* we need to send a keepalive */
+#define CON_FLAG_WRITE_PENDING	   2  /* we have data ready to send */
+#define CON_FLAG_SOCK_CLOSED	   3  /* socket state changed to closed */
+#define CON_FLAG_BACKOFF           4  /* need to retry queuing delayed work */
+
 /* static tag bytes (protocol control messages) */
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -147,72 +215,130 @@
 }
 EXPORT_SYMBOL(ceph_msgr_flush);
 
+/* Connection socket state transition functions */
+
+static void con_sock_state_init(struct ceph_connection *con)
+{
+	int old_state;
+
+	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
+	if (WARN_ON(old_state != CON_SOCK_STATE_NEW))
+		printk("%s: unexpected old state %d\n", __func__, old_state);
+	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+	     CON_SOCK_STATE_CLOSED);
+}
+
+static void con_sock_state_connecting(struct ceph_connection *con)
+{
+	int old_state;
+
+	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTING);
+	if (WARN_ON(old_state != CON_SOCK_STATE_CLOSED))
+		printk("%s: unexpected old state %d\n", __func__, old_state);
+	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+	     CON_SOCK_STATE_CONNECTING);
+}
+
+static void con_sock_state_connected(struct ceph_connection *con)
+{
+	int old_state;
+
+	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CONNECTED);
+	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING))
+		printk("%s: unexpected old state %d\n", __func__, old_state);
+	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+	     CON_SOCK_STATE_CONNECTED);
+}
+
+static void con_sock_state_closing(struct ceph_connection *con)
+{
+	int old_state;
+
+	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSING);
+	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTING &&
+			old_state != CON_SOCK_STATE_CONNECTED &&
+			old_state != CON_SOCK_STATE_CLOSING))
+		printk("%s: unexpected old state %d\n", __func__, old_state);
+	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+	     CON_SOCK_STATE_CLOSING);
+}
+
+static void con_sock_state_closed(struct ceph_connection *con)
+{
+	int old_state;
+
+	old_state = atomic_xchg(&con->sock_state, CON_SOCK_STATE_CLOSED);
+	if (WARN_ON(old_state != CON_SOCK_STATE_CONNECTED &&
+		    old_state != CON_SOCK_STATE_CLOSING &&
+		    old_state != CON_SOCK_STATE_CONNECTING &&
+		    old_state != CON_SOCK_STATE_CLOSED))
+		printk("%s: unexpected old state %d\n", __func__, old_state);
+	dout("%s con %p sock %d -> %d\n", __func__, con, old_state,
+	     CON_SOCK_STATE_CLOSED);
+}
 
 /*
  * socket callback functions
  */
 
 /* data available on socket, or listen socket received a connect */
-static void ceph_data_ready(struct sock *sk, int count_unused)
+static void ceph_sock_data_ready(struct sock *sk, int count_unused)
 {
 	struct ceph_connection *con = sk->sk_user_data;
+	if (atomic_read(&con->msgr->stopping)) {
+		return;
+	}
 
 	if (sk->sk_state != TCP_CLOSE_WAIT) {
-		dout("ceph_data_ready on %p state = %lu, queueing work\n",
+		dout("%s on %p state = %lu, queueing work\n", __func__,
 		     con, con->state);
 		queue_con(con);
 	}
 }
 
 /* socket has buffer space for writing */
-static void ceph_write_space(struct sock *sk)
+static void ceph_sock_write_space(struct sock *sk)
 {
 	struct ceph_connection *con = sk->sk_user_data;
 
 	/* only queue to workqueue if there is data we want to write,
 	 * and there is sufficient space in the socket buffer to accept
-	 * more data.  clear SOCK_NOSPACE so that ceph_write_space()
+	 * more data.  clear SOCK_NOSPACE so that ceph_sock_write_space()
 	 * doesn't get called again until try_write() fills the socket
 	 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
 	 * and net/core/stream.c:sk_stream_write_space().
 	 */
-	if (test_bit(WRITE_PENDING, &con->state)) {
+	if (test_bit(CON_FLAG_WRITE_PENDING, &con->flags)) {
 		if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
-			dout("ceph_write_space %p queueing write work\n", con);
+			dout("%s %p queueing write work\n", __func__, con);
 			clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 			queue_con(con);
 		}
 	} else {
-		dout("ceph_write_space %p nothing to write\n", con);
+		dout("%s %p nothing to write\n", __func__, con);
 	}
 }
 
 /* socket's state has changed */
-static void ceph_state_change(struct sock *sk)
+static void ceph_sock_state_change(struct sock *sk)
 {
 	struct ceph_connection *con = sk->sk_user_data;
 
-	dout("ceph_state_change %p state = %lu sk_state = %u\n",
+	dout("%s %p state = %lu sk_state = %u\n", __func__,
 	     con, con->state, sk->sk_state);
 
-	if (test_bit(CLOSED, &con->state))
-		return;
-
 	switch (sk->sk_state) {
 	case TCP_CLOSE:
-		dout("ceph_state_change TCP_CLOSE\n");
+		dout("%s TCP_CLOSE\n", __func__);
 	case TCP_CLOSE_WAIT:
-		dout("ceph_state_change TCP_CLOSE_WAIT\n");
-		if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) {
-			if (test_bit(CONNECTING, &con->state))
-				con->error_msg = "connection failed";
-			else
-				con->error_msg = "socket closed";
-			queue_con(con);
-		}
+		dout("%s TCP_CLOSE_WAIT\n", __func__);
+		con_sock_state_closing(con);
+		set_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
+		queue_con(con);
 		break;
 	case TCP_ESTABLISHED:
-		dout("ceph_state_change TCP_ESTABLISHED\n");
+		dout("%s TCP_ESTABLISHED\n", __func__);
+		con_sock_state_connected(con);
 		queue_con(con);
 		break;
 	default:	/* Everything else is uninteresting */
@@ -228,9 +354,9 @@
 {
 	struct sock *sk = sock->sk;
 	sk->sk_user_data = con;
-	sk->sk_data_ready = ceph_data_ready;
-	sk->sk_write_space = ceph_write_space;
-	sk->sk_state_change = ceph_state_change;
+	sk->sk_data_ready = ceph_sock_data_ready;
+	sk->sk_write_space = ceph_sock_write_space;
+	sk->sk_state_change = ceph_sock_state_change;
 }
 
 
@@ -262,6 +388,7 @@
 
 	dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
 
+	con_sock_state_connecting(con);
 	ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
 				 O_NONBLOCK);
 	if (ret == -EINPROGRESS) {
@@ -277,7 +404,6 @@
 		return ret;
 	}
 	con->sock = sock;
-
 	return 0;
 }
 
@@ -333,16 +459,24 @@
  */
 static int con_close_socket(struct ceph_connection *con)
 {
-	int rc;
+	int rc = 0;
 
 	dout("con_close_socket on %p sock %p\n", con, con->sock);
-	if (!con->sock)
-		return 0;
-	set_bit(SOCK_CLOSED, &con->state);
-	rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
-	sock_release(con->sock);
-	con->sock = NULL;
-	clear_bit(SOCK_CLOSED, &con->state);
+	if (con->sock) {
+		rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
+		sock_release(con->sock);
+		con->sock = NULL;
+	}
+
+	/*
+	 * Forcibly clear the SOCK_CLOSED flag.  It gets set
+	 * independent of the connection mutex, and we could have
+	 * received a socket close event before we had the chance to
+	 * shut the socket down.
+	 */
+	clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags);
+
+	con_sock_state_closed(con);
 	return rc;
 }
 
@@ -353,6 +487,10 @@
 static void ceph_msg_remove(struct ceph_msg *msg)
 {
 	list_del_init(&msg->list_head);
+	BUG_ON(msg->con == NULL);
+	msg->con->ops->put(msg->con);
+	msg->con = NULL;
+
 	ceph_msg_put(msg);
 }
 static void ceph_msg_remove_list(struct list_head *head)
@@ -372,8 +510,11 @@
 	ceph_msg_remove_list(&con->out_sent);
 
 	if (con->in_msg) {
+		BUG_ON(con->in_msg->con != con);
+		con->in_msg->con = NULL;
 		ceph_msg_put(con->in_msg);
 		con->in_msg = NULL;
+		con->ops->put(con);
 	}
 
 	con->connect_seq = 0;
@@ -391,32 +532,44 @@
  */
 void ceph_con_close(struct ceph_connection *con)
 {
+	mutex_lock(&con->mutex);
 	dout("con_close %p peer %s\n", con,
 	     ceph_pr_addr(&con->peer_addr.in_addr));
-	set_bit(CLOSED, &con->state);  /* in case there's queued work */
-	clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
-	clear_bit(LOSSYTX, &con->state);  /* so we retry next connect */
-	clear_bit(KEEPALIVE_PENDING, &con->state);
-	clear_bit(WRITE_PENDING, &con->state);
-	mutex_lock(&con->mutex);
+	con->state = CON_STATE_CLOSED;
+
+	clear_bit(CON_FLAG_LOSSYTX, &con->flags); /* so we retry next connect */
+	clear_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags);
+	clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+	/* CON_FLAG_SOCK_CLOSED is cleared by con_close_socket() below */
+	clear_bit(CON_FLAG_BACKOFF, &con->flags);
+
 	reset_connection(con);
 	con->peer_global_seq = 0;
 	cancel_delayed_work(&con->work);
+	con_close_socket(con);
 	mutex_unlock(&con->mutex);
-	queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_close);
 
 /*
  * Reopen a closed connection, with a new peer address.
  */
-void ceph_con_open(struct ceph_connection *con, struct ceph_entity_addr *addr)
+void ceph_con_open(struct ceph_connection *con,
+		   __u8 entity_type, __u64 entity_num,
+		   struct ceph_entity_addr *addr)
 {
+	mutex_lock(&con->mutex);
 	dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
-	set_bit(OPENING, &con->state);
-	clear_bit(CLOSED, &con->state);
+
+	BUG_ON(con->state != CON_STATE_CLOSED);
+	con->state = CON_STATE_PREOPEN;
+
+	con->peer_name.type = (__u8) entity_type;
+	con->peer_name.num = cpu_to_le64(entity_num);
+
 	memcpy(&con->peer_addr, addr, sizeof(*addr));
 	con->delay = 0;      /* reset backoff memory */
+	mutex_unlock(&con->mutex);
 	queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_open);
@@ -430,42 +583,26 @@
 }
 
 /*
- * generic get/put
- */
-struct ceph_connection *ceph_con_get(struct ceph_connection *con)
-{
-	int nref = __atomic_add_unless(&con->nref, 1, 0);
-
-	dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1);
-
-	return nref ? con : NULL;
-}
-
-void ceph_con_put(struct ceph_connection *con)
-{
-	int nref = atomic_dec_return(&con->nref);
-
-	BUG_ON(nref < 0);
-	if (nref == 0) {
-		BUG_ON(con->sock);
-		kfree(con);
-	}
-	dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref);
-}
-
-/*
  * initialize a new connection.
  */
-void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
+void ceph_con_init(struct ceph_connection *con, void *private,
+	const struct ceph_connection_operations *ops,
+	struct ceph_messenger *msgr)
 {
 	dout("con_init %p\n", con);
 	memset(con, 0, sizeof(*con));
-	atomic_set(&con->nref, 1);
+	con->private = private;
+	con->ops = ops;
 	con->msgr = msgr;
+
+	con_sock_state_init(con);
+
 	mutex_init(&con->mutex);
 	INIT_LIST_HEAD(&con->out_queue);
 	INIT_LIST_HEAD(&con->out_sent);
 	INIT_DELAYED_WORK(&con->work, con_work);
+
+	con->state = CON_STATE_CLOSED;
 }
 EXPORT_SYMBOL(ceph_con_init);
 
@@ -486,14 +623,14 @@
 	return ret;
 }
 
-static void ceph_con_out_kvec_reset(struct ceph_connection *con)
+static void con_out_kvec_reset(struct ceph_connection *con)
 {
 	con->out_kvec_left = 0;
 	con->out_kvec_bytes = 0;
 	con->out_kvec_cur = &con->out_kvec[0];
 }
 
-static void ceph_con_out_kvec_add(struct ceph_connection *con,
+static void con_out_kvec_add(struct ceph_connection *con,
 				size_t size, void *data)
 {
 	int index;
@@ -507,6 +644,53 @@
 	con->out_kvec_bytes += size;
 }
 
+#ifdef CONFIG_BLOCK
+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{
+	if (!bio) {
+		*iter = NULL;
+		*seg = 0;
+		return;
+	}
+	*iter = bio;
+	*seg = bio->bi_idx;
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{
+	if (*bio_iter == NULL)
+		return;
+
+	BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+
+	(*seg)++;
+	if (*seg == (*bio_iter)->bi_vcnt)
+		init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+#endif
+
+static void prepare_write_message_data(struct ceph_connection *con)
+{
+	struct ceph_msg *msg = con->out_msg;
+
+	BUG_ON(!msg);
+	BUG_ON(!msg->hdr.data_len);
+
+	/* initialize page iterator */
+	con->out_msg_pos.page = 0;
+	if (msg->pages)
+		con->out_msg_pos.page_pos = msg->page_alignment;
+	else
+		con->out_msg_pos.page_pos = 0;
+#ifdef CONFIG_BLOCK
+	if (msg->bio)
+		init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+#endif
+	con->out_msg_pos.data_pos = 0;
+	con->out_msg_pos.did_page_crc = false;
+	con->out_more = 1;  /* data + footer will follow */
+}
+
 /*
  * Prepare footer for currently outgoing message, and finish things
  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
@@ -516,6 +700,8 @@
 	struct ceph_msg *m = con->out_msg;
 	int v = con->out_kvec_left;
 
+	m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
+
 	dout("prepare_write_message_footer %p\n", con);
 	con->out_kvec_is_msg = true;
 	con->out_kvec[v].iov_base = &m->footer;
@@ -534,7 +720,7 @@
 	struct ceph_msg *m;
 	u32 crc;
 
-	ceph_con_out_kvec_reset(con);
+	con_out_kvec_reset(con);
 	con->out_kvec_is_msg = true;
 	con->out_msg_done = false;
 
@@ -542,14 +728,16 @@
 	 * TCP packet that's a good thing. */
 	if (con->in_seq > con->in_seq_acked) {
 		con->in_seq_acked = con->in_seq;
-		ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+		con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 		con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-		ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
+		con_out_kvec_add(con, sizeof (con->out_temp_ack),
 			&con->out_temp_ack);
 	}
 
+	BUG_ON(list_empty(&con->out_queue));
 	m = list_first_entry(&con->out_queue, struct ceph_msg, list_head);
 	con->out_msg = m;
+	BUG_ON(m->con != con);
 
 	/* put message on sent list */
 	ceph_msg_get(m);
@@ -576,18 +764,18 @@
 	BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
 
 	/* tag + hdr + front + middle */
-	ceph_con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
-	ceph_con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
-	ceph_con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
+	con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
+	con_out_kvec_add(con, sizeof (m->hdr), &m->hdr);
+	con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
 
 	if (m->middle)
-		ceph_con_out_kvec_add(con, m->middle->vec.iov_len,
+		con_out_kvec_add(con, m->middle->vec.iov_len,
 			m->middle->vec.iov_base);
 
 	/* fill in crc (except data pages), footer */
 	crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
 	con->out_msg->hdr.crc = cpu_to_le32(crc);
-	con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
+	con->out_msg->footer.flags = 0;
 
 	crc = crc32c(0, m->front.iov_base, m->front.iov_len);
 	con->out_msg->footer.front_crc = cpu_to_le32(crc);
@@ -597,28 +785,19 @@
 		con->out_msg->footer.middle_crc = cpu_to_le32(crc);
 	} else
 		con->out_msg->footer.middle_crc = 0;
-	con->out_msg->footer.data_crc = 0;
-	dout("prepare_write_message front_crc %u data_crc %u\n",
+	dout("%s front_crc %u middle_crc %u\n", __func__,
 	     le32_to_cpu(con->out_msg->footer.front_crc),
 	     le32_to_cpu(con->out_msg->footer.middle_crc));
 
 	/* is there a data payload? */
-	if (le32_to_cpu(m->hdr.data_len) > 0) {
-		/* initialize page iterator */
-		con->out_msg_pos.page = 0;
-		if (m->pages)
-			con->out_msg_pos.page_pos = m->page_alignment;
-		else
-			con->out_msg_pos.page_pos = 0;
-		con->out_msg_pos.data_pos = 0;
-		con->out_msg_pos.did_page_crc = false;
-		con->out_more = 1;  /* data + footer will follow */
-	} else {
+	con->out_msg->footer.data_crc = 0;
+	if (m->hdr.data_len)
+		prepare_write_message_data(con);
+	else
 		/* no, queue up footer too and be done */
 		prepare_write_message_footer(con);
-	}
 
-	set_bit(WRITE_PENDING, &con->state);
+	set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 /*
@@ -630,16 +809,16 @@
 	     con->in_seq_acked, con->in_seq);
 	con->in_seq_acked = con->in_seq;
 
-	ceph_con_out_kvec_reset(con);
+	con_out_kvec_reset(con);
 
-	ceph_con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
+	con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
 
 	con->out_temp_ack = cpu_to_le64(con->in_seq_acked);
-	ceph_con_out_kvec_add(con, sizeof (con->out_temp_ack),
+	con_out_kvec_add(con, sizeof (con->out_temp_ack),
 				&con->out_temp_ack);
 
 	con->out_more = 1;  /* more will follow.. eventually.. */
-	set_bit(WRITE_PENDING, &con->state);
+	set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 /*
@@ -648,9 +827,9 @@
 static void prepare_write_keepalive(struct ceph_connection *con)
 {
 	dout("prepare_write_keepalive %p\n", con);
-	ceph_con_out_kvec_reset(con);
-	ceph_con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
-	set_bit(WRITE_PENDING, &con->state);
+	con_out_kvec_reset(con);
+	con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
+	set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 /*
@@ -665,27 +844,21 @@
 	if (!con->ops->get_authorizer) {
 		con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
 		con->out_connect.authorizer_len = 0;
-
 		return NULL;
 	}
 
 	/* Can't hold the mutex while getting authorizer */
-
 	mutex_unlock(&con->mutex);
-
 	auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
-
 	mutex_lock(&con->mutex);
 
 	if (IS_ERR(auth))
 		return auth;
-	if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state))
+	if (con->state != CON_STATE_NEGOTIATING)
 		return ERR_PTR(-EAGAIN);
 
 	con->auth_reply_buf = auth->authorizer_reply_buf;
 	con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
-
-
 	return auth;
 }
 
@@ -694,12 +867,12 @@
  */
 static void prepare_write_banner(struct ceph_connection *con)
 {
-	ceph_con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
-	ceph_con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
+	con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
+	con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
 					&con->msgr->my_enc_addr);
 
 	con->out_more = 0;
-	set_bit(WRITE_PENDING, &con->state);
+	set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 }
 
 static int prepare_write_connect(struct ceph_connection *con)
@@ -742,14 +915,15 @@
 	con->out_connect.authorizer_len = auth ?
 		cpu_to_le32(auth->authorizer_buf_len) : 0;
 
-	ceph_con_out_kvec_add(con, sizeof (con->out_connect),
+	con_out_kvec_reset(con);
+	con_out_kvec_add(con, sizeof (con->out_connect),
 					&con->out_connect);
 	if (auth && auth->authorizer_buf_len)
-		ceph_con_out_kvec_add(con, auth->authorizer_buf_len,
+		con_out_kvec_add(con, auth->authorizer_buf_len,
 					auth->authorizer_buf);
 
 	con->out_more = 0;
-	set_bit(WRITE_PENDING, &con->state);
+	set_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 
 	return 0;
 }
@@ -797,30 +971,34 @@
 	return ret;  /* done! */
 }
 
+static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
+			size_t len, size_t sent, bool in_trail)
+{
+	struct ceph_msg *msg = con->out_msg;
+
+	BUG_ON(!msg);
+	BUG_ON(!sent);
+
+	con->out_msg_pos.data_pos += sent;
+	con->out_msg_pos.page_pos += sent;
+	if (sent < len)
+		return;
+
+	BUG_ON(sent != len);
+	con->out_msg_pos.page_pos = 0;
+	con->out_msg_pos.page++;
+	con->out_msg_pos.did_page_crc = false;
+	if (in_trail)
+		list_move_tail(&page->lru,
+			       &msg->trail->head);
+	else if (msg->pagelist)
+		list_move_tail(&page->lru,
+			       &msg->pagelist->head);
 #ifdef CONFIG_BLOCK
-static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
-{
-	if (!bio) {
-		*iter = NULL;
-		*seg = 0;
-		return;
-	}
-	*iter = bio;
-	*seg = bio->bi_idx;
-}
-
-static void iter_bio_next(struct bio **bio_iter, int *seg)
-{
-	if (*bio_iter == NULL)
-		return;
-
-	BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
-
-	(*seg)++;
-	if (*seg == (*bio_iter)->bi_vcnt)
-		init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
-}
+	else if (msg->bio)
+		iter_bio_next(&msg->bio_iter, &msg->bio_seg);
 #endif
+}
 
 /*
  * Write as much message data payload as we can.  If we finish, queue
@@ -837,41 +1015,36 @@
 	bool do_datacrc = !con->msgr->nocrc;
 	int ret;
 	int total_max_write;
-	int in_trail = 0;
-	size_t trail_len = (msg->trail ? msg->trail->length : 0);
+	bool in_trail = false;
+	const size_t trail_len = (msg->trail ? msg->trail->length : 0);
+	const size_t trail_off = data_len - trail_len;
 
 	dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
-	     con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
+	     con, msg, con->out_msg_pos.page, msg->nr_pages,
 	     con->out_msg_pos.page_pos);
 
-#ifdef CONFIG_BLOCK
-	if (msg->bio && !msg->bio_iter)
-		init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
-#endif
-
+	/*
+	 * Iterate through each page that contains data to be
+	 * written, and send as much as possible for each.
+	 *
+	 * If we are calculating the data crc (the default), we will
+	 * need to map the page.  If we have no pages, they have
+	 * been revoked, so use the zero page.
+	 */
 	while (data_len > con->out_msg_pos.data_pos) {
 		struct page *page = NULL;
 		int max_write = PAGE_SIZE;
 		int bio_offset = 0;
 
-		total_max_write = data_len - trail_len -
-			con->out_msg_pos.data_pos;
+		in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
+		if (!in_trail)
+			total_max_write = trail_off - con->out_msg_pos.data_pos;
 
-		/*
-		 * if we are calculating the data crc (the default), we need
-		 * to map the page.  if our pages[] has been revoked, use the
-		 * zero page.
-		 */
-
-		/* have we reached the trail part of the data? */
-		if (con->out_msg_pos.data_pos >= data_len - trail_len) {
-			in_trail = 1;
-
+		if (in_trail) {
 			total_max_write = data_len - con->out_msg_pos.data_pos;
 
 			page = list_first_entry(&msg->trail->head,
 						struct page, lru);
-			max_write = PAGE_SIZE;
 		} else if (msg->pages) {
 			page = msg->pages[con->out_msg_pos.page];
 		} else if (msg->pagelist) {
@@ -894,15 +1067,14 @@
 
 		if (do_datacrc && !con->out_msg_pos.did_page_crc) {
 			void *base;
-			u32 crc;
-			u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
+			u32 crc = le32_to_cpu(msg->footer.data_crc);
 			char *kaddr;
 
 			kaddr = kmap(page);
 			BUG_ON(kaddr == NULL);
 			base = kaddr + con->out_msg_pos.page_pos + bio_offset;
-			crc = crc32c(tmpcrc, base, len);
-			con->out_msg->footer.data_crc = cpu_to_le32(crc);
+			crc = crc32c(crc, base, len);
+			msg->footer.data_crc = cpu_to_le32(crc);
 			con->out_msg_pos.did_page_crc = true;
 		}
 		ret = ceph_tcp_sendpage(con->sock, page,
@@ -915,31 +1087,15 @@
 		if (ret <= 0)
 			goto out;
 
-		con->out_msg_pos.data_pos += ret;
-		con->out_msg_pos.page_pos += ret;
-		if (ret == len) {
-			con->out_msg_pos.page_pos = 0;
-			con->out_msg_pos.page++;
-			con->out_msg_pos.did_page_crc = false;
-			if (in_trail)
-				list_move_tail(&page->lru,
-					       &msg->trail->head);
-			else if (msg->pagelist)
-				list_move_tail(&page->lru,
-					       &msg->pagelist->head);
-#ifdef CONFIG_BLOCK
-			else if (msg->bio)
-				iter_bio_next(&msg->bio_iter, &msg->bio_seg);
-#endif
-		}
+		out_msg_pos_next(con, page, len, (size_t) ret, in_trail);
 	}
 
 	dout("write_partial_msg_pages %p msg %p done\n", con, msg);
 
 	/* prepare and queue up footer, too */
 	if (!do_datacrc)
-		con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
-	ceph_con_out_kvec_reset(con);
+		msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
+	con_out_kvec_reset(con);
 	prepare_write_message_footer(con);
 	ret = 1;
 out:
@@ -1351,20 +1507,14 @@
 		     ceph_pr_addr(&con->msgr->inst.addr.in_addr));
 	}
 
-	set_bit(NEGOTIATING, &con->state);
-	prepare_read_connect(con);
 	return 0;
 }
 
 static void fail_protocol(struct ceph_connection *con)
 {
 	reset_connection(con);
-	set_bit(CLOSED, &con->state);  /* in case there's queued work */
-
-	mutex_unlock(&con->mutex);
-	if (con->ops->bad_proto)
-		con->ops->bad_proto(con);
-	mutex_lock(&con->mutex);
+	BUG_ON(con->state != CON_STATE_NEGOTIATING);
+	con->state = CON_STATE_CLOSED;
 }
 
 static int process_connect(struct ceph_connection *con)
@@ -1407,7 +1557,6 @@
 			return -1;
 		}
 		con->auth_retry = 1;
-		ceph_con_out_kvec_reset(con);
 		ret = prepare_write_connect(con);
 		if (ret < 0)
 			return ret;
@@ -1428,7 +1577,6 @@
 		       ENTITY_NAME(con->peer_name),
 		       ceph_pr_addr(&con->peer_addr.in_addr));
 		reset_connection(con);
-		ceph_con_out_kvec_reset(con);
 		ret = prepare_write_connect(con);
 		if (ret < 0)
 			return ret;
@@ -1440,8 +1588,7 @@
 		if (con->ops->peer_reset)
 			con->ops->peer_reset(con);
 		mutex_lock(&con->mutex);
-		if (test_bit(CLOSED, &con->state) ||
-		    test_bit(OPENING, &con->state))
+		if (con->state != CON_STATE_NEGOTIATING)
 			return -EAGAIN;
 		break;
 
@@ -1454,7 +1601,6 @@
 		     le32_to_cpu(con->out_connect.connect_seq),
 		     le32_to_cpu(con->in_reply.connect_seq));
 		con->connect_seq = le32_to_cpu(con->in_reply.connect_seq);
-		ceph_con_out_kvec_reset(con);
 		ret = prepare_write_connect(con);
 		if (ret < 0)
 			return ret;
@@ -1471,7 +1617,6 @@
 		     le32_to_cpu(con->in_reply.global_seq));
 		get_global_seq(con->msgr,
 			       le32_to_cpu(con->in_reply.global_seq));
-		ceph_con_out_kvec_reset(con);
 		ret = prepare_write_connect(con);
 		if (ret < 0)
 			return ret;
@@ -1489,7 +1634,10 @@
 			fail_protocol(con);
 			return -1;
 		}
-		clear_bit(CONNECTING, &con->state);
+
+		BUG_ON(con->state != CON_STATE_NEGOTIATING);
+		con->state = CON_STATE_OPEN;
+
 		con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
 		con->connect_seq++;
 		con->peer_features = server_feat;
@@ -1501,7 +1649,9 @@
 			le32_to_cpu(con->in_reply.connect_seq));
 
 		if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
-			set_bit(LOSSYTX, &con->state);
+			set_bit(CON_FLAG_LOSSYTX, &con->flags);
+
+		con->delay = 0;      /* reset backoff memory */
 
 		prepare_read_tag(con);
 		break;
@@ -1587,10 +1737,7 @@
 	return 1;
 }
 
-static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
-				struct ceph_msg_header *hdr,
-				int *skip);
-
+static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip);
 
 static int read_partial_message_pages(struct ceph_connection *con,
 				      struct page **pages,
@@ -1633,9 +1780,6 @@
 	void *p;
 	int ret, left;
 
-	if (IS_ERR(bv))
-		return PTR_ERR(bv);
-
 	left = min((int)(data_len - con->in_msg_pos.data_pos),
 		   (int)(bv->bv_len - con->in_msg_pos.page_pos));
 
@@ -1672,7 +1816,6 @@
 	int ret;
 	unsigned int front_len, middle_len, data_len;
 	bool do_datacrc = !con->msgr->nocrc;
-	int skip;
 	u64 seq;
 	u32 crc;
 
@@ -1723,10 +1866,13 @@
 
 	/* allocate message? */
 	if (!con->in_msg) {
+		int skip = 0;
+
 		dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
 		     con->in_hdr.front_len, con->in_hdr.data_len);
-		skip = 0;
-		con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
+		ret = ceph_con_in_msg_alloc(con, &skip);
+		if (ret < 0)
+			return ret;
 		if (skip) {
 			/* skip this message */
 			dout("alloc_msg said skip message\n");
@@ -1737,11 +1883,9 @@
 			con->in_seq++;
 			return 0;
 		}
-		if (!con->in_msg) {
-			con->error_msg =
-				"error allocating memory for incoming message";
-			return -ENOMEM;
-		}
+
+		BUG_ON(!con->in_msg);
+		BUG_ON(con->in_msg->con != con);
 		m = con->in_msg;
 		m->front.iov_len = 0;    /* haven't read it yet */
 		if (m->middle)
@@ -1753,6 +1897,11 @@
 		else
 			con->in_msg_pos.page_pos = 0;
 		con->in_msg_pos.data_pos = 0;
+
+#ifdef CONFIG_BLOCK
+		if (m->bio)
+			init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
+#endif
 	}
 
 	/* front */
@@ -1769,10 +1918,6 @@
 		if (ret <= 0)
 			return ret;
 	}
-#ifdef CONFIG_BLOCK
-	if (m->bio && !m->bio_iter)
-		init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
-#endif
 
 	/* (page) data */
 	while (con->in_msg_pos.data_pos < data_len) {
@@ -1783,7 +1928,7 @@
 				return ret;
 #ifdef CONFIG_BLOCK
 		} else if (m->bio) {
-
+			BUG_ON(!m->bio_iter);
 			ret = read_partial_message_bio(con,
 						 &m->bio_iter, &m->bio_seg,
 						 data_len, do_datacrc);
@@ -1837,8 +1982,11 @@
 {
 	struct ceph_msg *msg;
 
+	BUG_ON(con->in_msg->con != con);
+	con->in_msg->con = NULL;
 	msg = con->in_msg;
 	con->in_msg = NULL;
+	con->ops->put(con);
 
 	/* if first message, set peer_name */
 	if (con->peer_name.type == 0)
@@ -1858,7 +2006,6 @@
 	con->ops->dispatch(con, msg);
 
 	mutex_lock(&con->mutex);
-	prepare_read_tag(con);
 }
 
 
@@ -1870,22 +2017,19 @@
 {
 	int ret = 1;
 
-	dout("try_write start %p state %lu nref %d\n", con, con->state,
-	     atomic_read(&con->nref));
+	dout("try_write start %p state %lu\n", con, con->state);
 
 more:
 	dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
 	/* open the socket first? */
-	if (con->sock == NULL) {
-		ceph_con_out_kvec_reset(con);
+	if (con->state == CON_STATE_PREOPEN) {
+		BUG_ON(con->sock);
+		con->state = CON_STATE_CONNECTING;
+
+		con_out_kvec_reset(con);
 		prepare_write_banner(con);
-		ret = prepare_write_connect(con);
-		if (ret < 0)
-			goto out;
 		prepare_read_banner(con);
-		set_bit(CONNECTING, &con->state);
-		clear_bit(NEGOTIATING, &con->state);
 
 		BUG_ON(con->in_msg);
 		con->in_tag = CEPH_MSGR_TAG_READY;
@@ -1932,7 +2076,7 @@
 	}
 
 do_next:
-	if (!test_bit(CONNECTING, &con->state)) {
+	if (con->state == CON_STATE_OPEN) {
 		/* is anything else pending? */
 		if (!list_empty(&con->out_queue)) {
 			prepare_write_message(con);
@@ -1942,14 +2086,15 @@
 			prepare_write_ack(con);
 			goto more;
 		}
-		if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) {
+		if (test_and_clear_bit(CON_FLAG_KEEPALIVE_PENDING,
+				       &con->flags)) {
 			prepare_write_keepalive(con);
 			goto more;
 		}
 	}
 
 	/* Nothing to do! */
-	clear_bit(WRITE_PENDING, &con->state);
+	clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
 	dout("try_write nothing else to write.\n");
 	ret = 0;
 out:
@@ -1966,38 +2111,42 @@
 {
 	int ret = -1;
 
-	if (!con->sock)
-		return 0;
-
-	if (test_bit(STANDBY, &con->state))
-		return 0;
-
-	dout("try_read start on %p\n", con);
-
 more:
+	dout("try_read start on %p state %lu\n", con, con->state);
+	if (con->state != CON_STATE_CONNECTING &&
+	    con->state != CON_STATE_NEGOTIATING &&
+	    con->state != CON_STATE_OPEN)
+		return 0;
+
+	BUG_ON(!con->sock);
+
 	dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
 	     con->in_base_pos);
 
-	/*
-	 * process_connect and process_message drop and re-take
-	 * con->mutex.  make sure we handle a racing close or reopen.
-	 */
-	if (test_bit(CLOSED, &con->state) ||
-	    test_bit(OPENING, &con->state)) {
-		ret = -EAGAIN;
+	if (con->state == CON_STATE_CONNECTING) {
+		dout("try_read connecting\n");
+		ret = read_partial_banner(con);
+		if (ret <= 0)
+			goto out;
+		ret = process_banner(con);
+		if (ret < 0)
+			goto out;
+
+		BUG_ON(con->state != CON_STATE_CONNECTING);
+		con->state = CON_STATE_NEGOTIATING;
+
+		/* Banner is good, exchange connection info */
+		ret = prepare_write_connect(con);
+		if (ret < 0)
+			goto out;
+		prepare_read_connect(con);
+
+		/* Send connection info before awaiting response */
 		goto out;
 	}
 
-	if (test_bit(CONNECTING, &con->state)) {
-		if (!test_bit(NEGOTIATING, &con->state)) {
-			dout("try_read connecting\n");
-			ret = read_partial_banner(con);
-			if (ret <= 0)
-				goto out;
-			ret = process_banner(con);
-			if (ret < 0)
-				goto out;
-		}
+	if (con->state == CON_STATE_NEGOTIATING) {
+		dout("try_read negotiating\n");
 		ret = read_partial_connect(con);
 		if (ret <= 0)
 			goto out;
@@ -2007,6 +2156,8 @@
 		goto more;
 	}
 
+	BUG_ON(con->state != CON_STATE_OPEN);
+
 	if (con->in_base_pos < 0) {
 		/*
 		 * skipping + discarding content.
@@ -2040,7 +2191,8 @@
 			prepare_read_ack(con);
 			break;
 		case CEPH_MSGR_TAG_CLOSE:
-			set_bit(CLOSED, &con->state);   /* fixme */
+			con_close_socket(con);
+			con->state = CON_STATE_CLOSED;
 			goto out;
 		default:
 			goto bad_tag;
@@ -2063,6 +2215,8 @@
 		if (con->in_tag == CEPH_MSGR_TAG_READY)
 			goto more;
 		process_message(con);
+		if (con->state == CON_STATE_OPEN)
+			prepare_read_tag(con);
 		goto more;
 	}
 	if (con->in_tag == CEPH_MSGR_TAG_ACK) {
@@ -2091,12 +2245,6 @@
  */
 static void queue_con(struct ceph_connection *con)
 {
-	if (test_bit(DEAD, &con->state)) {
-		dout("queue_con %p ignoring: DEAD\n",
-		     con);
-		return;
-	}
-
 	if (!con->ops->get(con)) {
 		dout("queue_con %p ref count 0\n", con);
 		return;
@@ -2121,7 +2269,26 @@
 
 	mutex_lock(&con->mutex);
 restart:
-	if (test_and_clear_bit(BACKOFF, &con->state)) {
+	if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) {
+		switch (con->state) {
+		case CON_STATE_CONNECTING:
+			con->error_msg = "connection failed";
+			break;
+		case CON_STATE_NEGOTIATING:
+			con->error_msg = "negotiation failed";
+			break;
+		case CON_STATE_OPEN:
+			con->error_msg = "socket closed";
+			break;
+		default:
+			dout("unrecognized con state %d\n", (int)con->state);
+			con->error_msg = "unrecognized con state";
+			BUG();
+		}
+		goto fault;
+	}
+
+	if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
 		dout("con_work %p backing off\n", con);
 		if (queue_delayed_work(ceph_msgr_wq, &con->work,
 				       round_jiffies_relative(con->delay))) {
@@ -2135,35 +2302,35 @@
 		}
 	}
 
-	if (test_bit(STANDBY, &con->state)) {
+	if (con->state == CON_STATE_STANDBY) {
 		dout("con_work %p STANDBY\n", con);
 		goto done;
 	}
-	if (test_bit(CLOSED, &con->state)) { /* e.g. if we are replaced */
-		dout("con_work CLOSED\n");
-		con_close_socket(con);
+	if (con->state == CON_STATE_CLOSED) {
+		dout("con_work %p CLOSED\n", con);
+		BUG_ON(con->sock);
 		goto done;
 	}
-	if (test_and_clear_bit(OPENING, &con->state)) {
-		/* reopen w/ new peer */
+	if (con->state == CON_STATE_PREOPEN) {
 		dout("con_work OPENING\n");
-		con_close_socket(con);
+		BUG_ON(con->sock);
 	}
 
-	if (test_and_clear_bit(SOCK_CLOSED, &con->state))
-		goto fault;
-
 	ret = try_read(con);
 	if (ret == -EAGAIN)
 		goto restart;
-	if (ret < 0)
+	if (ret < 0) {
+		con->error_msg = "socket error on read";
 		goto fault;
+	}
 
 	ret = try_write(con);
 	if (ret == -EAGAIN)
 		goto restart;
-	if (ret < 0)
+	if (ret < 0) {
+		con->error_msg = "socket error on write";
 		goto fault;
+	}
 
 done:
 	mutex_unlock(&con->mutex);
@@ -2172,7 +2339,6 @@
 	return;
 
 fault:
-	mutex_unlock(&con->mutex);
 	ceph_fault(con);     /* error/fault path */
 	goto done_unlocked;
 }
@@ -2183,26 +2349,31 @@
  * exponential backoff
  */
 static void ceph_fault(struct ceph_connection *con)
+	__releases(con->mutex)
 {
 	pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
 	       ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
 	dout("fault %p state %lu to peer %s\n",
 	     con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
 
-	if (test_bit(LOSSYTX, &con->state)) {
-		dout("fault on LOSSYTX channel\n");
-		goto out;
-	}
-
-	mutex_lock(&con->mutex);
-	if (test_bit(CLOSED, &con->state))
-		goto out_unlock;
+	BUG_ON(con->state != CON_STATE_CONNECTING &&
+	       con->state != CON_STATE_NEGOTIATING &&
+	       con->state != CON_STATE_OPEN);
 
 	con_close_socket(con);
 
+	if (test_bit(CON_FLAG_LOSSYTX, &con->flags)) {
+		dout("fault on LOSSYTX channel, marking CLOSED\n");
+		con->state = CON_STATE_CLOSED;
+		goto out_unlock;
+	}
+
 	if (con->in_msg) {
+		BUG_ON(con->in_msg->con != con);
+		con->in_msg->con = NULL;
 		ceph_msg_put(con->in_msg);
 		con->in_msg = NULL;
+		con->ops->put(con);
 	}
 
 	/* Requeue anything that hasn't been acked */
@@ -2211,12 +2382,13 @@
 	/* If there are no messages queued or keepalive pending, place
 	 * the connection in a STANDBY state */
 	if (list_empty(&con->out_queue) &&
-	    !test_bit(KEEPALIVE_PENDING, &con->state)) {
+	    !test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags)) {
 		dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
-		clear_bit(WRITE_PENDING, &con->state);
-		set_bit(STANDBY, &con->state);
+		clear_bit(CON_FLAG_WRITE_PENDING, &con->flags);
+		con->state = CON_STATE_STANDBY;
 	} else {
 		/* retry after a delay. */
+		con->state = CON_STATE_PREOPEN;
 		if (con->delay == 0)
 			con->delay = BASE_DELAY_INTERVAL;
 		else if (con->delay < MAX_DELAY_INTERVAL)
@@ -2237,13 +2409,12 @@
 			 * that when con_work restarts we schedule the
 			 * delay then.
 			 */
-			set_bit(BACKOFF, &con->state);
+			set_bit(CON_FLAG_BACKOFF, &con->flags);
 		}
 	}
 
 out_unlock:
 	mutex_unlock(&con->mutex);
-out:
 	/*
 	 * in case we faulted due to authentication, invalidate our
 	 * current tickets so that we can get new ones.
@@ -2260,18 +2431,14 @@
 
 
 /*
- * create a new messenger instance
+ * initialize a new messenger instance
  */
-struct ceph_messenger *ceph_messenger_create(struct ceph_entity_addr *myaddr,
-					     u32 supported_features,
-					     u32 required_features)
+void ceph_messenger_init(struct ceph_messenger *msgr,
+			struct ceph_entity_addr *myaddr,
+			u32 supported_features,
+			u32 required_features,
+			bool nocrc)
 {
-	struct ceph_messenger *msgr;
-
-	msgr = kzalloc(sizeof(*msgr), GFP_KERNEL);
-	if (msgr == NULL)
-		return ERR_PTR(-ENOMEM);
-
 	msgr->supported_features = supported_features;
 	msgr->required_features = required_features;
 
@@ -2284,30 +2451,23 @@
 	msgr->inst.addr.type = 0;
 	get_random_bytes(&msgr->inst.addr.nonce, sizeof(msgr->inst.addr.nonce));
 	encode_my_addr(msgr);
+	msgr->nocrc = nocrc;
 
-	dout("messenger_create %p\n", msgr);
-	return msgr;
-}
-EXPORT_SYMBOL(ceph_messenger_create);
+	atomic_set(&msgr->stopping, 0);
 
-void ceph_messenger_destroy(struct ceph_messenger *msgr)
-{
-	dout("destroy %p\n", msgr);
-	kfree(msgr);
-	dout("destroyed messenger %p\n", msgr);
+	dout("%s %p\n", __func__, msgr);
 }
-EXPORT_SYMBOL(ceph_messenger_destroy);
+EXPORT_SYMBOL(ceph_messenger_init);
 
 static void clear_standby(struct ceph_connection *con)
 {
 	/* come back from STANDBY? */
-	if (test_and_clear_bit(STANDBY, &con->state)) {
-		mutex_lock(&con->mutex);
+	if (con->state == CON_STATE_STANDBY) {
 		dout("clear_standby %p and ++connect_seq\n", con);
+		con->state = CON_STATE_PREOPEN;
 		con->connect_seq++;
-		WARN_ON(test_bit(WRITE_PENDING, &con->state));
-		WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state));
-		mutex_unlock(&con->mutex);
+		WARN_ON(test_bit(CON_FLAG_WRITE_PENDING, &con->flags));
+		WARN_ON(test_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags));
 	}
 }
 
@@ -2316,21 +2476,24 @@
  */
 void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
 {
-	if (test_bit(CLOSED, &con->state)) {
+	/* set src+dst */
+	msg->hdr.src = con->msgr->inst.name;
+	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
+	msg->needs_out_seq = true;
+
+	mutex_lock(&con->mutex);
+
+	if (con->state == CON_STATE_CLOSED) {
 		dout("con_send %p closed, dropping %p\n", con, msg);
 		ceph_msg_put(msg);
+		mutex_unlock(&con->mutex);
 		return;
 	}
 
-	/* set src+dst */
-	msg->hdr.src = con->msgr->inst.name;
+	BUG_ON(msg->con != NULL);
+	msg->con = con->ops->get(con);
+	BUG_ON(msg->con == NULL);
 
-	BUG_ON(msg->front.iov_len != le32_to_cpu(msg->hdr.front_len));
-
-	msg->needs_out_seq = true;
-
-	/* queue */
-	mutex_lock(&con->mutex);
 	BUG_ON(!list_empty(&msg->list_head));
 	list_add_tail(&msg->list_head, &con->out_queue);
 	dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -2339,12 +2502,13 @@
 	     le32_to_cpu(msg->hdr.front_len),
 	     le32_to_cpu(msg->hdr.middle_len),
 	     le32_to_cpu(msg->hdr.data_len));
+
+	clear_standby(con);
 	mutex_unlock(&con->mutex);
 
 	/* if there wasn't anything waiting to send before, queue
 	 * new work */
-	clear_standby(con);
-	if (test_and_set_bit(WRITE_PENDING, &con->state) == 0)
+	if (test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
 		queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_send);
@@ -2352,24 +2516,34 @@
 /*
  * Revoke a message that was previously queued for send
  */
-void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
+void ceph_msg_revoke(struct ceph_msg *msg)
 {
+	struct ceph_connection *con = msg->con;
+
+	if (!con)
+		return;		/* Message not in our possession */
+
 	mutex_lock(&con->mutex);
 	if (!list_empty(&msg->list_head)) {
-		dout("con_revoke %p msg %p - was on queue\n", con, msg);
+		dout("%s %p msg %p - was on queue\n", __func__, con, msg);
 		list_del_init(&msg->list_head);
-		ceph_msg_put(msg);
+		BUG_ON(msg->con == NULL);
+		msg->con->ops->put(msg->con);
+		msg->con = NULL;
 		msg->hdr.seq = 0;
+
+		ceph_msg_put(msg);
 	}
 	if (con->out_msg == msg) {
-		dout("con_revoke %p msg %p - was sending\n", con, msg);
+		dout("%s %p msg %p - was sending\n", __func__, con, msg);
 		con->out_msg = NULL;
 		if (con->out_kvec_is_msg) {
 			con->out_skip = con->out_kvec_bytes;
 			con->out_kvec_is_msg = false;
 		}
-		ceph_msg_put(msg);
 		msg->hdr.seq = 0;
+
+		ceph_msg_put(msg);
 	}
 	mutex_unlock(&con->mutex);
 }
@@ -2377,17 +2551,27 @@
 /*
  * Revoke a message that we may be reading data into
  */
-void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
+void ceph_msg_revoke_incoming(struct ceph_msg *msg)
 {
+	struct ceph_connection *con;
+
+	BUG_ON(msg == NULL);
+	if (!msg->con) {
+		dout("%s msg %p null con\n", __func__, msg);
+
+		return;		/* Message not in our possession */
+	}
+
+	con = msg->con;
 	mutex_lock(&con->mutex);
-	if (con->in_msg && con->in_msg == msg) {
+	if (con->in_msg == msg) {
 		unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
 		unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
 		unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
 
 		/* skip rest of message */
-		dout("con_revoke_pages %p msg %p revoked\n", con, msg);
-			con->in_base_pos = con->in_base_pos -
+		dout("%s %p msg %p revoked\n", __func__, con, msg);
+		con->in_base_pos = con->in_base_pos -
 				sizeof(struct ceph_msg_header) -
 				front_len -
 				middle_len -
@@ -2398,8 +2582,8 @@
 		con->in_tag = CEPH_MSGR_TAG_READY;
 		con->in_seq++;
 	} else {
-		dout("con_revoke_pages %p msg %p pages %p no-op\n",
-		     con, con->in_msg, msg);
+		dout("%s %p in_msg %p msg %p no-op\n",
+		     __func__, con, con->in_msg, msg);
 	}
 	mutex_unlock(&con->mutex);
 }
@@ -2410,9 +2594,11 @@
 void ceph_con_keepalive(struct ceph_connection *con)
 {
 	dout("con_keepalive %p\n", con);
+	mutex_lock(&con->mutex);
 	clear_standby(con);
-	if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 &&
-	    test_and_set_bit(WRITE_PENDING, &con->state) == 0)
+	mutex_unlock(&con->mutex);
+	if (test_and_set_bit(CON_FLAG_KEEPALIVE_PENDING, &con->flags) == 0 &&
+	    test_and_set_bit(CON_FLAG_WRITE_PENDING, &con->flags) == 0)
 		queue_con(con);
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
@@ -2431,6 +2617,8 @@
 	if (m == NULL)
 		goto out;
 	kref_init(&m->kref);
+
+	m->con = NULL;
 	INIT_LIST_HEAD(&m->list_head);
 
 	m->hdr.tid = 0;
@@ -2526,46 +2714,77 @@
 }
 
 /*
- * Generic message allocator, for incoming messages.
+ * Allocate a message for receiving an incoming message on a
+ * connection, and save the result in con->in_msg.  Uses the
+ * connection's private alloc_msg op if available.
+ *
+ * Returns 0 on success, or a negative error code.
+ *
+ * On success, if we set *skip = 1:
+ *  - the next message should be skipped and ignored.
+ *  - con->in_msg == NULL
+ * or if we set *skip = 0, con->in_msg is non-null.
+ * On error (ENOMEM, EAGAIN, ...), con->in_msg == NULL.  EAGAIN means
+ * the con left the OPEN state while con->mutex was dropped for alloc_msg.
  */
-static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
-				struct ceph_msg_header *hdr,
-				int *skip)
+static int ceph_con_in_msg_alloc(struct ceph_connection *con, int *skip)
 {
+	struct ceph_msg_header *hdr = &con->in_hdr;
 	int type = le16_to_cpu(hdr->type);
 	int front_len = le32_to_cpu(hdr->front_len);
 	int middle_len = le32_to_cpu(hdr->middle_len);
-	struct ceph_msg *msg = NULL;
-	int ret;
+	int ret = 0;
+
+	BUG_ON(con->in_msg != NULL);
 
 	if (con->ops->alloc_msg) {
+		struct ceph_msg *msg;
+
 		mutex_unlock(&con->mutex);
 		msg = con->ops->alloc_msg(con, hdr, skip);
 		mutex_lock(&con->mutex);
-		if (!msg || *skip)
-			return NULL;
+		if (con->state != CON_STATE_OPEN) {
+			if (msg)	/* alloc_msg may have returned NULL */
+				ceph_msg_put(msg);
+			return -EAGAIN;
+		}
+		con->in_msg = msg;
+		if (con->in_msg) {
+			con->in_msg->con = con->ops->get(con);
+			BUG_ON(con->in_msg->con == NULL);
+		}
+		if (*skip) {
+			con->in_msg = NULL;
+			return 0;
+		}
+		if (!con->in_msg) {
+			con->error_msg =
+				"error allocating memory for incoming message";
+			return -ENOMEM;
+		}
 	}
-	if (!msg) {
-		*skip = 0;
-		msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
-		if (!msg) {
+	if (!con->in_msg) {
+		con->in_msg = ceph_msg_new(type, front_len, GFP_NOFS, false);
+		if (!con->in_msg) {
 			pr_err("unable to allocate msg type %d len %d\n",
 			       type, front_len);
-			return NULL;
+			return -ENOMEM;
 		}
-		msg->page_alignment = le16_to_cpu(hdr->data_off);
+		con->in_msg->con = con->ops->get(con);
+		BUG_ON(con->in_msg->con == NULL);
+		con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
 	}
-	memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
+	memcpy(&con->in_msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 
-	if (middle_len && !msg->middle) {
-		ret = ceph_alloc_middle(con, msg);
+	if (middle_len && !con->in_msg->middle) {
+		ret = ceph_alloc_middle(con, con->in_msg);
 		if (ret < 0) {
-			ceph_msg_put(msg);
-			return NULL;
+			ceph_msg_put(con->in_msg);
+			con->in_msg = NULL;
 		}
 	}
 
-	return msg;
+	return ret;
 }
 
 
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index d0649a9..105d533 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -106,9 +106,9 @@
 	monc->pending_auth = 1;
 	monc->m_auth->front.iov_len = len;
 	monc->m_auth->hdr.front_len = cpu_to_le32(len);
-	ceph_con_revoke(monc->con, monc->m_auth);
+	ceph_msg_revoke(monc->m_auth);
 	ceph_msg_get(monc->m_auth);  /* keep our ref */
-	ceph_con_send(monc->con, monc->m_auth);
+	ceph_con_send(&monc->con, monc->m_auth);
 }
 
 /*
@@ -117,8 +117,11 @@
 static void __close_session(struct ceph_mon_client *monc)
 {
 	dout("__close_session closing mon%d\n", monc->cur_mon);
-	ceph_con_revoke(monc->con, monc->m_auth);
-	ceph_con_close(monc->con);
+	ceph_msg_revoke(monc->m_auth);
+	ceph_msg_revoke_incoming(monc->m_auth_reply);
+	ceph_msg_revoke(monc->m_subscribe);
+	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
+	ceph_con_close(&monc->con);
 	monc->cur_mon = -1;
 	monc->pending_auth = 0;
 	ceph_auth_reset(monc->auth);
@@ -142,9 +145,8 @@
 		monc->want_next_osdmap = !!monc->want_next_osdmap;
 
 		dout("open_session mon%d opening\n", monc->cur_mon);
-		monc->con->peer_name.type = CEPH_ENTITY_TYPE_MON;
-		monc->con->peer_name.num = cpu_to_le64(monc->cur_mon);
-		ceph_con_open(monc->con,
+		ceph_con_open(&monc->con,
+			      CEPH_ENTITY_TYPE_MON, monc->cur_mon,
 			      &monc->monmap->mon_inst[monc->cur_mon].addr);
 
 		/* initiatiate authentication handshake */
@@ -226,8 +228,8 @@
 
 		msg->front.iov_len = p - msg->front.iov_base;
 		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
-		ceph_con_revoke(monc->con, msg);
-		ceph_con_send(monc->con, ceph_msg_get(msg));
+		ceph_msg_revoke(msg);
+		ceph_con_send(&monc->con, ceph_msg_get(msg));
 
 		monc->sub_sent = jiffies | 1;  /* never 0 */
 	}
@@ -247,7 +249,7 @@
 	if (monc->hunting) {
 		pr_info("mon%d %s session established\n",
 			monc->cur_mon,
-			ceph_pr_addr(&monc->con->peer_addr.in_addr));
+			ceph_pr_addr(&monc->con.peer_addr.in_addr));
 		monc->hunting = false;
 	}
 	dout("handle_subscribe_ack after %d seconds\n", seconds);
@@ -439,6 +441,7 @@
 		m = NULL;
 	} else {
 		dout("get_generic_reply %lld got %p\n", tid, req->reply);
+		*skip = 0;
 		m = ceph_msg_get(req->reply);
 		/*
 		 * we don't need to track the connection reading into
@@ -461,7 +464,7 @@
 	req->request->hdr.tid = cpu_to_le64(req->tid);
 	__insert_generic_request(monc, req);
 	monc->num_generic_requests++;
-	ceph_con_send(monc->con, ceph_msg_get(req->request));
+	ceph_con_send(&monc->con, ceph_msg_get(req->request));
 	mutex_unlock(&monc->mutex);
 
 	err = wait_for_completion_interruptible(&req->completion);
@@ -684,8 +687,9 @@
 
 	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
 		req = rb_entry(p, struct ceph_mon_generic_request, node);
-		ceph_con_revoke(monc->con, req->request);
-		ceph_con_send(monc->con, ceph_msg_get(req->request));
+		ceph_msg_revoke(req->request);
+		ceph_msg_revoke_incoming(req->reply);
+		ceph_con_send(&monc->con, ceph_msg_get(req->request));
 	}
 }
 
@@ -705,7 +709,7 @@
 		__close_session(monc);
 		__open_session(monc);  /* continue hunting */
 	} else {
-		ceph_con_keepalive(monc->con);
+		ceph_con_keepalive(&monc->con);
 
 		__validate_auth(monc);
 
@@ -760,19 +764,12 @@
 		goto out;
 
 	/* connection */
-	monc->con = kmalloc(sizeof(*monc->con), GFP_KERNEL);
-	if (!monc->con)
-		goto out_monmap;
-	ceph_con_init(monc->client->msgr, monc->con);
-	monc->con->private = monc;
-	monc->con->ops = &mon_con_ops;
-
 	/* authentication */
 	monc->auth = ceph_auth_init(cl->options->name,
 				    cl->options->key);
 	if (IS_ERR(monc->auth)) {
 		err = PTR_ERR(monc->auth);
-		goto out_con;
+		goto out_monmap;
 	}
 	monc->auth->want_keys =
 		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
@@ -801,6 +798,9 @@
 	if (!monc->m_auth)
 		goto out_auth_reply;
 
+	ceph_con_init(&monc->con, monc, &mon_con_ops,
+		      &monc->client->msgr);
+
 	monc->cur_mon = -1;
 	monc->hunting = true;
 	monc->sub_renew_after = jiffies;
@@ -824,8 +824,6 @@
 	ceph_msg_put(monc->m_subscribe_ack);
 out_auth:
 	ceph_auth_destroy(monc->auth);
-out_con:
-	monc->con->ops->put(monc->con);
 out_monmap:
 	kfree(monc->monmap);
 out:
@@ -841,10 +839,6 @@
 	mutex_lock(&monc->mutex);
 	__close_session(monc);
 
-	monc->con->private = NULL;
-	monc->con->ops->put(monc->con);
-	monc->con = NULL;
-
 	mutex_unlock(&monc->mutex);
 
 	/*
@@ -888,8 +882,8 @@
 	} else if (!was_auth && monc->auth->ops->is_authenticated(monc->auth)) {
 		dout("authenticated, starting session\n");
 
-		monc->client->msgr->inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
-		monc->client->msgr->inst.name.num =
+		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
+		monc->client->msgr.inst.name.num =
 					cpu_to_le64(monc->auth->global_id);
 
 		__send_subscribe(monc);
@@ -1000,6 +994,8 @@
 	case CEPH_MSG_MDS_MAP:
 	case CEPH_MSG_OSD_MAP:
 		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
+		if (!m)
+			return NULL;	/* ENOMEM--return skip == 0 */
 		break;
 	}
 
@@ -1029,7 +1025,7 @@
 	if (!monc->hunting)
 		pr_info("mon%d %s session lost, "
 			"hunting for new mon\n", monc->cur_mon,
-			ceph_pr_addr(&monc->con->peer_addr.in_addr));
+			ceph_pr_addr(&monc->con.peer_addr.in_addr));
 
 	__close_session(monc);
 	if (!monc->hunting) {
@@ -1044,9 +1040,23 @@
 	mutex_unlock(&monc->mutex);
 }
 
+/*
+ * We can ignore refcounting on the connection struct, as all references
+ * will come from the messenger workqueue, which is drained prior to
+ * mon_client destruction.
+ */
+static struct ceph_connection *con_get(struct ceph_connection *con)
+{
+	return con;
+}
+
+static void con_put(struct ceph_connection *con)
+{
+}
+
 static const struct ceph_connection_operations mon_con_ops = {
-	.get = ceph_con_get,
-	.put = ceph_con_put,
+	.get = con_get,
+	.put = con_put,
 	.dispatch = dispatch,
 	.fault = mon_fault,
 	.alloc_msg = mon_alloc_msg,
diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c
index 11d5f41..ddec1c1 100644
--- a/net/ceph/msgpool.c
+++ b/net/ceph/msgpool.c
@@ -12,7 +12,7 @@
 	struct ceph_msgpool *pool = arg;
 	struct ceph_msg *msg;
 
-	msg = ceph_msg_new(0, pool->front_len, gfp_mask, true);
+	msg = ceph_msg_new(pool->type, pool->front_len, gfp_mask, true);
 	if (!msg) {
 		dout("msgpool_alloc %s failed\n", pool->name);
 	} else {
@@ -32,10 +32,11 @@
 	ceph_msg_put(msg);
 }
 
-int ceph_msgpool_init(struct ceph_msgpool *pool,
+int ceph_msgpool_init(struct ceph_msgpool *pool, int type,
 		      int front_len, int size, bool blocking, const char *name)
 {
 	dout("msgpool %s init\n", name);
+	pool->type = type;
 	pool->front_len = front_len;
 	pool->pool = mempool_create(size, msgpool_alloc, msgpool_free, pool);
 	if (!pool->pool)
@@ -61,7 +62,7 @@
 		WARN_ON(1);
 
 		/* try to alloc a fresh message */
-		return ceph_msg_new(0, front_len, GFP_NOFS, false);
+		return ceph_msg_new(pool->type, front_len, GFP_NOFS, false);
 	}
 
 	msg = mempool_alloc(pool->pool, GFP_NOFS);
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index ca59e66..42119c0 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -140,10 +140,9 @@
 	if (req->r_request)
 		ceph_msg_put(req->r_request);
 	if (req->r_con_filling_msg) {
-		dout("release_request revoking pages %p from con %p\n",
+		dout("%s revoking pages %p from con %p\n", __func__,
 		     req->r_pages, req->r_con_filling_msg);
-		ceph_con_revoke_message(req->r_con_filling_msg,
-				      req->r_reply);
+		ceph_msg_revoke_incoming(req->r_reply);
 		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
 	}
 	if (req->r_reply)
@@ -214,10 +213,13 @@
 	kref_init(&req->r_kref);
 	init_completion(&req->r_completion);
 	init_completion(&req->r_safe_completion);
+	rb_init_node(&req->r_node);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 	INIT_LIST_HEAD(&req->r_linger_item);
 	INIT_LIST_HEAD(&req->r_linger_osd);
 	INIT_LIST_HEAD(&req->r_req_lru_item);
+	INIT_LIST_HEAD(&req->r_osd_item);
+
 	req->r_flags = flags;
 
 	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
@@ -243,6 +245,7 @@
 		}
 		ceph_pagelist_init(req->r_trail);
 	}
+
 	/* create request message; allow space for oid */
 	msg_size += MAX_OBJ_NAME_SIZE;
 	if (snapc)
@@ -256,7 +259,6 @@
 		return NULL;
 	}
 
-	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
 	memset(msg->front.iov_base, 0, msg->front.iov_len);
 
 	req->r_request = msg;
@@ -624,7 +626,7 @@
 /*
  * Track open sessions with osds.
  */
-static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
+static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
 {
 	struct ceph_osd *osd;
 
@@ -634,15 +636,13 @@
 
 	atomic_set(&osd->o_ref, 1);
 	osd->o_osdc = osdc;
+	osd->o_osd = onum;
 	INIT_LIST_HEAD(&osd->o_requests);
 	INIT_LIST_HEAD(&osd->o_linger_requests);
 	INIT_LIST_HEAD(&osd->o_osd_lru);
 	osd->o_incarnation = 1;
 
-	ceph_con_init(osdc->client->msgr, &osd->o_con);
-	osd->o_con.private = osd;
-	osd->o_con.ops = &osd_con_ops;
-	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
+	ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);
 
 	INIT_LIST_HEAD(&osd->o_keepalive_item);
 	return osd;
@@ -688,7 +688,7 @@
 
 static void remove_all_osds(struct ceph_osd_client *osdc)
 {
-	dout("__remove_old_osds %p\n", osdc);
+	dout("%s %p\n", __func__, osdc);
 	mutex_lock(&osdc->request_mutex);
 	while (!RB_EMPTY_ROOT(&osdc->osds)) {
 		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
@@ -752,7 +752,8 @@
 		ret = -EAGAIN;
 	} else {
 		ceph_con_close(&osd->o_con);
-		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
+		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
+			      &osdc->osdmap->osd_addr[osd->o_osd]);
 		osd->o_incarnation++;
 	}
 	return ret;
@@ -853,7 +854,7 @@
 
 	if (req->r_osd) {
 		/* make sure the original request isn't in flight. */
-		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+		ceph_msg_revoke(req->r_request);
 
 		list_del_init(&req->r_osd_item);
 		if (list_empty(&req->r_osd->o_requests) &&
@@ -880,7 +881,7 @@
 static void __cancel_request(struct ceph_osd_request *req)
 {
 	if (req->r_sent && req->r_osd) {
-		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
+		ceph_msg_revoke(req->r_request);
 		req->r_sent = 0;
 	}
 }
@@ -890,7 +891,9 @@
 {
 	dout("__register_linger_request %p\n", req);
 	list_add_tail(&req->r_linger_item, &osdc->req_linger);
-	list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests);
+	if (req->r_osd)
+		list_add_tail(&req->r_linger_osd,
+			      &req->r_osd->o_linger_requests);
 }
 
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
@@ -998,18 +1001,18 @@
 	req->r_osd = __lookup_osd(osdc, o);
 	if (!req->r_osd && o >= 0) {
 		err = -ENOMEM;
-		req->r_osd = create_osd(osdc);
+		req->r_osd = create_osd(osdc, o);
 		if (!req->r_osd) {
 			list_move(&req->r_req_lru_item, &osdc->req_notarget);
 			goto out;
 		}
 
 		dout("map_request osd %p is osd%d\n", req->r_osd, o);
-		req->r_osd->o_osd = o;
-		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
 		__insert_osd(osdc, req->r_osd);
 
-		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
+		ceph_con_open(&req->r_osd->o_con,
+			      CEPH_ENTITY_TYPE_OSD, o,
+			      &osdc->osdmap->osd_addr[o]);
 	}
 
 	if (req->r_osd) {
@@ -1304,8 +1307,9 @@
 
 	dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
 	mutex_lock(&osdc->request_mutex);
-	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
+	for (p = rb_first(&osdc->requests); p; ) {
 		req = rb_entry(p, struct ceph_osd_request, r_node);
+		p = rb_next(p);
 		err = __map_request(osdc, req, force_resend);
 		if (err < 0)
 			continue;  /* error */
@@ -1313,10 +1317,23 @@
 			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
 			needmap++;  /* request a newer map */
 		} else if (err > 0) {
-			dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
-			     req->r_osd ? req->r_osd->o_osd : -1);
-			if (!req->r_linger)
+			if (!req->r_linger) {
+				dout("%p tid %llu requeued on osd%d\n", req,
+				     req->r_tid,
+				     req->r_osd ? req->r_osd->o_osd : -1);
 				req->r_flags |= CEPH_OSD_FLAG_RETRY;
+			}
+		}
+		if (req->r_linger && list_empty(&req->r_linger_item)) {
+			/*
+			 * register as a linger so that we will
+			 * re-submit below and get a new tid
+			 */
+			dout("%p tid %llu restart on osd%d\n",
+			     req, req->r_tid,
+			     req->r_osd ? req->r_osd->o_osd : -1);
+			__register_linger_request(osdc, req);
+			__unregister_request(osdc, req);
 		}
 	}
 
@@ -1391,7 +1408,7 @@
 			     epoch, maplen);
 			newmap = osdmap_apply_incremental(&p, next,
 							  osdc->osdmap,
-							  osdc->client->msgr);
+							  &osdc->client->msgr);
 			if (IS_ERR(newmap)) {
 				err = PTR_ERR(newmap);
 				goto bad;
@@ -1839,11 +1856,12 @@
 	if (!osdc->req_mempool)
 		goto out;
 
-	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
+	err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
+				OSD_OP_FRONT_LEN, 10, true,
 				"osd_op");
 	if (err < 0)
 		goto out_mempool;
-	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
+	err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
 				OSD_OPREPLY_FRONT_LEN, 10, true,
 				"osd_op_reply");
 	if (err < 0)
@@ -2019,15 +2037,15 @@
 	if (!req) {
 		*skip = 1;
 		m = NULL;
-		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
-			osd->o_osd);
+		dout("get_reply unknown tid %llu from osd%d\n", tid,
+		     osd->o_osd);
 		goto out;
 	}
 
 	if (req->r_con_filling_msg) {
-		dout("get_reply revoking msg %p from old con %p\n",
+		dout("%s revoking msg %p from old con %p\n", __func__,
 		     req->r_reply, req->r_con_filling_msg);
-		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+		ceph_msg_revoke_incoming(req->r_reply);
 		req->r_con_filling_msg->ops->put(req->r_con_filling_msg);
 		req->r_con_filling_msg = NULL;
 	}
@@ -2080,6 +2098,7 @@
 	int type = le16_to_cpu(hdr->type);
 	int front = le32_to_cpu(hdr->front_len);
 
+	*skip = 0;
 	switch (type) {
 	case CEPH_MSG_OSD_MAP:
 	case CEPH_MSG_WATCH_NOTIFY:
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 81e3b84..3124b71 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -135,6 +135,21 @@
 	return -EINVAL;
 }
 
+/* Skip one name map (u32 count; per entry a u32 and a u32-sized string); 0 or -EINVAL. */
+static int skip_name_map(void **p, void *end)
+{
+	u32 len, slen;	/* unsigned: a bogus length must never move *p backwards */
+	ceph_decode_32_safe(p, end, len, bad);
+	while (len--) {
+		ceph_decode_32_safe(p, end, slen, bad);	/* entry's first u32: discarded */
+		ceph_decode_32_safe(p, end, slen, bad);	/* length of the string */
+		ceph_decode_need(p, end, slen, bad);	/* string must fit before end */
+		*p += slen;
+	}
+	return 0;
+bad:	return -EINVAL;	/* a bounds check failed */
+}
+
 static struct crush_map *crush_decode(void *pbyval, void *end)
 {
 	struct crush_map *c;
@@ -143,6 +158,7 @@
 	void **p = &pbyval;
 	void *start = pbyval;
 	u32 magic;
+	u32 num_name_maps;
 
 	dout("crush_decode %p to %p len %d\n", *p, end, (int)(end - *p));
 
@@ -150,6 +166,11 @@
 	if (c == NULL)
 		return ERR_PTR(-ENOMEM);
 
+        /* set tunables to default values */
+        c->choose_local_tries = 2;
+        c->choose_local_fallback_tries = 5;
+        c->choose_total_tries = 19;
+
 	ceph_decode_need(p, end, 4*sizeof(u32), bad);
 	magic = ceph_decode_32(p);
 	if (magic != CRUSH_MAGIC) {
@@ -297,7 +318,25 @@
 	}
 
 	/* ignore trailing name maps. */
+        for (num_name_maps = 0; num_name_maps < 3; num_name_maps++) {
+                err = skip_name_map(p, end);
+                if (err < 0)
+                        goto done;
+        }
 
+        /* tunables */
+        ceph_decode_need(p, end, 3*sizeof(u32), done);
+        c->choose_local_tries = ceph_decode_32(p);
+        c->choose_local_fallback_tries =  ceph_decode_32(p);
+        c->choose_total_tries = ceph_decode_32(p);
+        dout("crush decode tunable choose_local_tries = %d",
+             c->choose_local_tries);
+        dout("crush decode tunable choose_local_fallback_tries = %d",
+             c->choose_local_fallback_tries);
+        dout("crush decode tunable choose_total_tries = %d",
+             c->choose_total_tries);
+
+done:
 	dout("crush_decode success\n");
 	return c;
 
@@ -488,15 +527,16 @@
 		ceph_decode_32_safe(p, end, pool, bad);
 		ceph_decode_32_safe(p, end, len, bad);
 		dout("  pool %d len %d\n", pool, len);
+		ceph_decode_need(p, end, len, bad);
 		pi = __lookup_pg_pool(&map->pg_pools, pool);
 		if (pi) {
+			char *name = kstrndup(*p, len, GFP_NOFS);
+
+			if (!name)
+				return -ENOMEM;
 			kfree(pi->name);
-			pi->name = kmalloc(len + 1, GFP_NOFS);
-			if (pi->name) {
-				memcpy(pi->name, *p, len);
-				pi->name[len] = '\0';
-				dout("  name is %s\n", pi->name);
-			}
+			pi->name = name;
+			dout("  name is %s\n", pi->name);
 		}
 		*p += len;
 	}
@@ -666,6 +706,9 @@
 		ceph_decode_need(p, end, sizeof(u32) + sizeof(u64), bad);
 		ceph_decode_copy(p, &pgid, sizeof(pgid));
 		n = ceph_decode_32(p);
+		err = -EINVAL;
+		if (n > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
+			goto bad;
 		ceph_decode_need(p, end, n * sizeof(u32), bad);
 		err = -ENOMEM;
 		pg = kmalloc(sizeof(*pg) + n*sizeof(u32), GFP_NOFS);
@@ -889,6 +932,10 @@
 			(void) __remove_pg_mapping(&map->pg_temp, pgid);
 
 			/* insert */
+			if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) {
+				err = -EINVAL;
+				goto bad;
+			}
 			pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
 			if (!pg) {
 				err = -ENOMEM;