libceph, rbd: ceph_osd_linger_request, watch/notify v2

This adds support and switches rbd to a new, more reliable version of
watch/notify protocol.  As with the OSD client update, this is mostly
about getting the right structures linked into the right places so that
reconnects are properly sent when needed.  watch/notify v2 also
requires sending regular pings to the OSDs - send_linger_ping().

A major change from the old watch/notify implementation is the
introduction of ceph_osd_linger_request - linger requests no longer
piggy back on ceph_osd_request.  ceph_osd_event has been merged into
ceph_osd_linger_request.

All the details are now hidden within libceph, the interface consists
of a simple pair of watch/unwatch functions and ceph_osdc_notify_ack().
ceph_osdc_watch() does return ceph_osd_linger_request, but only to keep
the lifetime management simple.

ceph_osdc_notify_ack() accepts an optional data payload, which is
relayed back to the notifier.

Portions of this patch are loosely based on work by Douglas Fuller
<dfuller@redhat.com> and Mike Christie <michaelc@cs.wisc.edu>.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index fce23dc..d0834c4 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -351,11 +351,11 @@
 	struct rbd_options	*opts;
 
 	struct ceph_object_id	header_oid;
+	struct ceph_object_locator header_oloc;
 
 	struct ceph_file_layout	layout;
 
-	struct ceph_osd_event   *watch_event;
-	struct rbd_obj_request	*watch_request;
+	struct ceph_osd_linger_request *watch_handle;
 
 	struct rbd_spec		*parent_spec;
 	u64			parent_overlap;
@@ -1596,12 +1596,6 @@
 	return __rbd_obj_request_wait(obj_request, 0);
 }
 
-static int rbd_obj_request_wait_timeout(struct rbd_obj_request *obj_request,
-					unsigned long timeout)
-{
-	return __rbd_obj_request_wait(obj_request, timeout);
-}
-
 static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
 
@@ -1751,12 +1745,6 @@
 		complete_all(&obj_request->completion);
 }
 
-static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
-{
-	dout("%s: obj %p\n", __func__, obj_request);
-	obj_request_done_set(obj_request);
-}
-
 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 {
 	struct rbd_img_request *img_request = NULL;
@@ -1877,10 +1865,6 @@
 	case CEPH_OSD_OP_CALL:
 		rbd_osd_call_callback(obj_request);
 		break;
-	case CEPH_OSD_OP_NOTIFY_ACK:
-	case CEPH_OSD_OP_WATCH:
-		rbd_osd_trivial_callback(obj_request);
-		break;
 	default:
 		rbd_warn(NULL, "%s: unsupported op %hu",
 			obj_request->object_name, (unsigned short) opcode);
@@ -3100,45 +3084,18 @@
 	obj_request_done_set(obj_request);
 }
 
-static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
+static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
+
+static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
+			 u64 notifier_id, void *data, size_t data_len)
 {
-	struct rbd_obj_request *obj_request;
+	struct rbd_device *rbd_dev = arg;
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	int ret;
 
-	obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0,
-							OBJ_REQUEST_NODATA);
-	if (!obj_request)
-		return -ENOMEM;
-
-	ret = -ENOMEM;
-	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
-						  obj_request);
-	if (!obj_request->osd_req)
-		goto out;
-
-	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
-					notify_id, 0, 0);
-	rbd_osd_req_format_read(obj_request);
-
-	ret = rbd_obj_request_submit(osdc, obj_request);
-	if (ret)
-		goto out;
-	ret = rbd_obj_request_wait(obj_request);
-out:
-	rbd_obj_request_put(obj_request);
-
-	return ret;
-}
-
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
-{
-	struct rbd_device *rbd_dev = (struct rbd_device *)data;
-	int ret;
-
-	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
-		rbd_dev->header_oid.name, (unsigned long long)notify_id,
-		(unsigned int)opcode);
+	dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev,
+	     cookie, notify_id);
 
 	/*
 	 * Until adequate refresh error handling is in place, there is
@@ -3150,63 +3107,31 @@
 	if (ret)
 		rbd_warn(rbd_dev, "refresh failed: %d", ret);
 
-	ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
+				   &rbd_dev->header_oloc, notify_id, cookie,
+				   NULL, 0);
 	if (ret)
 		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
 }
 
-/*
- * Send a (un)watch request and wait for the ack.  Return a request
- * with a ref held on success or error.
- */
-static struct rbd_obj_request *rbd_obj_watch_request_helper(
-						struct rbd_device *rbd_dev,
-						bool watch)
+static void rbd_watch_errcb(void *arg, u64 cookie, int err)
 {
-	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-	struct ceph_options *opts = osdc->client->options;
-	struct rbd_obj_request *obj_request;
+	struct rbd_device *rbd_dev = arg;
 	int ret;
 
-	obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0,
-					     OBJ_REQUEST_NODATA);
-	if (!obj_request)
-		return ERR_PTR(-ENOMEM);
+	rbd_warn(rbd_dev, "encountered watch error: %d", err);
 
-	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1,
-						  obj_request);
-	if (!obj_request->osd_req) {
-		ret = -ENOMEM;
-		goto out;
-	}
+	__rbd_dev_header_unwatch_sync(rbd_dev);
 
-	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			      rbd_dev->watch_event->cookie, 0, watch);
-	rbd_osd_req_format_write(obj_request);
-
-	if (watch)
-		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
-
-	ret = rbd_obj_request_submit(osdc, obj_request);
-	if (ret)
-		goto out;
-
-	ret = rbd_obj_request_wait_timeout(obj_request, opts->mount_timeout);
-	if (ret)
-		goto out;
-
-	ret = obj_request->result;
+	ret = rbd_dev_header_watch_sync(rbd_dev);
 	if (ret) {
-		if (watch)
-			rbd_obj_request_end(obj_request);
-		goto out;
+		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
+		return;
 	}
 
-	return obj_request;
-
-out:
-	rbd_obj_request_put(obj_request);
-	return ERR_PTR(ret);
+	ret = rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
 }
 
 /*
@@ -3215,57 +3140,33 @@
 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-	struct rbd_obj_request *obj_request;
-	int ret;
+	struct ceph_osd_linger_request *handle;
 
-	rbd_assert(!rbd_dev->watch_event);
-	rbd_assert(!rbd_dev->watch_request);
+	rbd_assert(!rbd_dev->watch_handle);
 
-	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-				     &rbd_dev->watch_event);
-	if (ret < 0)
-		return ret;
+	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
+				 &rbd_dev->header_oloc, rbd_watch_cb,
+				 rbd_watch_errcb, rbd_dev);
+	if (IS_ERR(handle))
+		return PTR_ERR(handle);
 
-	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
-	if (IS_ERR(obj_request)) {
-		ceph_osdc_cancel_event(rbd_dev->watch_event);
-		rbd_dev->watch_event = NULL;
-		return PTR_ERR(obj_request);
-	}
-
-	/*
-	 * A watch request is set to linger, so the underlying osd
-	 * request won't go away until we unregister it.  We retain
-	 * a pointer to the object request during that time (in
-	 * rbd_dev->watch_request), so we'll keep a reference to it.
-	 * We'll drop that reference after we've unregistered it in
-	 * rbd_dev_header_unwatch_sync().
-	 */
-	rbd_dev->watch_request = obj_request;
-
+	rbd_dev->watch_handle = handle;
 	return 0;
 }
 
 static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 {
-	struct rbd_obj_request *obj_request;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	int ret;
 
-	rbd_assert(rbd_dev->watch_event);
-	rbd_assert(rbd_dev->watch_request);
+	if (!rbd_dev->watch_handle)
+		return;
 
-	rbd_obj_request_end(rbd_dev->watch_request);
-	rbd_obj_request_put(rbd_dev->watch_request);
-	rbd_dev->watch_request = NULL;
+	ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
+	if (ret)
+		rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
 
-	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
-	if (!IS_ERR(obj_request))
-		rbd_obj_request_put(obj_request);
-	else
-		rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
-			 PTR_ERR(obj_request));
-
-	ceph_osdc_cancel_event(rbd_dev->watch_event);
-	rbd_dev->watch_event = NULL;
+	rbd_dev->watch_handle = NULL;
 }
 
 /*
@@ -4081,6 +3982,7 @@
 	init_rwsem(&rbd_dev->header_rwsem);
 
 	ceph_oid_init(&rbd_dev->header_oid);
+	ceph_oloc_init(&rbd_dev->header_oloc);
 
 	rbd_dev->dev.bus = &rbd_bus_type;
 	rbd_dev->dev.type = &rbd_device_type;
@@ -5285,6 +5187,7 @@
 
 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 
+	rbd_dev->header_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
 	if (rbd_dev->image_format == 1)
 		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
 				       spec->image_name, RBD_SUFFIX);
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index 37f28bf..3b911ff 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -153,8 +153,9 @@
 
 /* watch-notify operations */
 enum {
-  WATCH_NOTIFY				= 1, /* notifying watcher */
-  WATCH_NOTIFY_COMPLETE			= 2, /* notifier notified when done */
+	CEPH_WATCH_EVENT_NOTIFY		  = 1, /* notifying watcher */
+	CEPH_WATCH_EVENT_NOTIFY_COMPLETE  = 2, /* notifier notified when done */
+	CEPH_WATCH_EVENT_DISCONNECT       = 3, /* we were disconnected */
 };
 
 
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 342f22f..cd2dcb8 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -34,7 +34,7 @@
 	struct rb_node o_node;
 	struct ceph_connection o_con;
 	struct rb_root o_requests;
-	struct list_head o_linger_requests;
+	struct rb_root o_linger_requests;
 	struct list_head o_osd_lru;
 	struct ceph_auth_handshake o_auth;
 	unsigned long lru_ttl;
@@ -108,12 +108,13 @@
 		} cls;
 		struct {
 			u64 cookie;
-			u64 ver;
-			u32 prot_ver;
-			u32 timeout;
-			__u8 flag;
+			__u8 op;           /* CEPH_OSD_WATCH_OP_ */
+			u32 gen;
 		} watch;
 		struct {
+			struct ceph_osd_data request_data;
+		} notify_ack;
+		struct {
 			u64 expected_object_size;
 			u64 expected_write_size;
 		} alloc_hint;
@@ -145,8 +146,6 @@
 struct ceph_osd_request {
 	u64             r_tid;              /* unique for this client */
 	struct rb_node  r_node;
-	struct list_head r_linger_item;
-	struct list_head r_linger_osd_item;
 	struct ceph_osd *r_osd;
 
 	struct ceph_osd_request_target r_t;
@@ -162,7 +161,6 @@
 
 	int               r_result;
 	bool              r_got_reply;
-	int		  r_linger;
 
 	struct ceph_osd_client *r_osdc;
 	struct kref       r_kref;
@@ -181,6 +179,7 @@
 	struct ceph_snap_context *r_snapc;    /* for writes */
 	struct timespec r_mtime;              /* ditto */
 	u64 r_data_offset;                    /* ditto */
+	bool r_linger;                        /* don't resend on failure */
 
 	/* internal */
 	unsigned long r_stamp;                /* jiffies, send or check time */
@@ -195,23 +194,40 @@
 	struct ceph_object_locator oloc;
 };
 
-struct ceph_osd_event {
-	u64 cookie;
-	int one_shot;
-	struct ceph_osd_client *osdc;
-	void (*cb)(u64, u64, u8, void *);
-	void *data;
-	struct rb_node node;
-	struct list_head osd_node;
-	struct kref kref;
-};
+typedef void (*rados_watchcb2_t)(void *arg, u64 notify_id, u64 cookie,
+				 u64 notifier_id, void *data, size_t data_len);
+typedef void (*rados_watcherrcb_t)(void *arg, u64 cookie, int err);
 
-struct ceph_osd_event_work {
-	struct work_struct work;
-	struct ceph_osd_event *event;
-        u64 ver;
-        u64 notify_id;
-        u8 opcode;
+struct ceph_osd_linger_request {
+	struct ceph_osd_client *osdc;
+	u64 linger_id;
+	bool committed;
+
+	struct ceph_osd *osd;
+	struct ceph_osd_request *reg_req;
+	struct ceph_osd_request *ping_req;
+	unsigned long ping_sent;
+
+	struct ceph_osd_request_target t;
+	u32 last_force_resend;
+
+	struct timespec mtime;
+
+	struct kref kref;
+	struct mutex lock;
+	struct rb_node node;            /* osd */
+	struct rb_node osdc_node;       /* osdc */
+	struct list_head scan_item;
+
+	struct completion reg_commit_wait;
+	int reg_commit_error;
+	int last_error;
+
+	u32 register_gen;
+
+	rados_watchcb2_t wcb;
+	rados_watcherrcb_t errcb;
+	void *data;
 };
 
 struct ceph_osd_client {
@@ -223,9 +239,10 @@
 	struct rb_root         osds;          /* osds */
 	struct list_head       osd_lru;       /* idle osds */
 	spinlock_t             osd_lru_lock;
-	struct list_head       req_linger;    /* lingering requests */
 	struct ceph_osd        homeless_osd;
 	atomic64_t             last_tid;      /* tid of last request */
+	u64                    last_linger_id;
+	struct rb_root         linger_requests; /* lingering requests */
 	atomic_t               num_requests;
 	atomic_t               num_homeless;
 	struct delayed_work    timeout_work;
@@ -239,10 +256,6 @@
 	struct ceph_msgpool	msgpool_op;
 	struct ceph_msgpool	msgpool_op_reply;
 
-	spinlock_t		event_lock;
-	struct rb_root		event_tree;
-	u64			event_count;
-
 	struct workqueue_struct	*notify_wq;
 };
 
@@ -314,9 +327,6 @@
 extern int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
 				 u16 opcode, const char *name, const void *value,
 				 size_t size, u8 cmp_op, u8 cmp_mode);
-extern void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
-					unsigned int which, u16 opcode,
-					u64 cookie, u64 version, int flag);
 extern void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 				       unsigned int which,
 				       u64 expected_object_size,
@@ -339,9 +349,6 @@
 				      u32 truncate_seq, u64 truncate_size,
 				      bool use_mempool);
 
-extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
-					 struct ceph_osd_request *req);
-
 extern void ceph_osdc_get_request(struct ceph_osd_request *req);
 extern void ceph_osdc_put_request(struct ceph_osd_request *req);
 
@@ -372,11 +379,23 @@
 				struct timespec *mtime,
 				struct page **pages, int nr_pages);
 
-/* watch/notify events */
-extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-				  void (*event_cb)(u64, u64, u8, void *),
-				  void *data, struct ceph_osd_event **pevent);
-extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
-extern void ceph_osdc_put_event(struct ceph_osd_event *event);
+/* watch/notify */
+struct ceph_osd_linger_request *
+ceph_osdc_watch(struct ceph_osd_client *osdc,
+		struct ceph_object_id *oid,
+		struct ceph_object_locator *oloc,
+		rados_watchcb2_t wcb,
+		rados_watcherrcb_t errcb,
+		void *data);
+int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
+		      struct ceph_osd_linger_request *lreq);
+
+int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
+			 struct ceph_object_id *oid,
+			 struct ceph_object_locator *oloc,
+			 u64 notify_id,
+			 u64 cookie,
+			 void *payload,
+			 size_t payload_len);
 #endif
 
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index 28740a5..204c8c9 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -427,7 +427,17 @@
 	CEPH_OSD_CMPXATTR_MODE_U64    = 2
 };
 
-#define RADOS_NOTIFY_VER	1
+enum {
+	CEPH_OSD_WATCH_OP_UNWATCH = 0,
+	CEPH_OSD_WATCH_OP_LEGACY_WATCH = 1,
+	/* note: use only ODD ids to prevent pre-giant code from
+	   interpreting the op as UNWATCH */
+	CEPH_OSD_WATCH_OP_WATCH = 3,
+	CEPH_OSD_WATCH_OP_RECONNECT = 5,
+	CEPH_OSD_WATCH_OP_PING = 7,
+};
+
+const char *ceph_osd_watch_op_name(int o);
 
 /*
  * an individual object operation.  each may be accompanied by some data
@@ -462,8 +472,9 @@
 	        } __attribute__ ((packed)) snap;
 		struct {
 			__le64 cookie;
-			__le64 ver;
-			__u8 flag;	/* 0 = unwatch, 1 = watch */
+			__le64 ver;     /* no longer used */
+			__u8 op;	/* CEPH_OSD_WATCH_OP_* */
+			__le32 gen;     /* registration generation */
 		} __attribute__ ((packed)) watch;
 		struct {
 			__le64 offset, length;
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 139a9cb..3773a4f 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -27,6 +27,22 @@
 	}
 }
 
+const char *ceph_osd_watch_op_name(int o)
+{
+	switch (o) {
+	case CEPH_OSD_WATCH_OP_UNWATCH:
+		return "unwatch";
+	case CEPH_OSD_WATCH_OP_WATCH:
+		return "watch";
+	case CEPH_OSD_WATCH_OP_RECONNECT:
+		return "reconnect";
+	case CEPH_OSD_WATCH_OP_PING:
+		return "ping";
+	default:
+		return "???";
+	}
+}
+
 const char *ceph_osd_state_name(int s)
 {
 	switch (s) {
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 61dbd9d..e64cb85 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -177,6 +177,9 @@
 
 		seq_printf(s, "%s%s", (i == 0 ? "\t" : ","),
 			   ceph_osd_op_name(op->op));
+		if (op->op == CEPH_OSD_OP_WATCH)
+			seq_printf(s, "-%s",
+				   ceph_osd_watch_op_name(op->watch.op));
 	}
 
 	seq_putc(s, '\n');
@@ -197,6 +200,31 @@
 	mutex_unlock(&osd->lock);
 }
 
+static void dump_linger_request(struct seq_file *s,
+				struct ceph_osd_linger_request *lreq)
+{
+	seq_printf(s, "%llu\t", lreq->linger_id);
+	dump_target(s, &lreq->t);
+
+	seq_printf(s, "\t%u\t%s/%d\n", lreq->register_gen,
+		   lreq->committed ? "C" : "", lreq->last_error);
+}
+
+static void dump_linger_requests(struct seq_file *s, struct ceph_osd *osd)
+{
+	struct rb_node *n;
+
+	mutex_lock(&osd->lock);
+	for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
+		struct ceph_osd_linger_request *lreq =
+		    rb_entry(n, struct ceph_osd_linger_request, node);
+
+		dump_linger_request(s, lreq);
+	}
+
+	mutex_unlock(&osd->lock);
+}
+
 static int osdc_show(struct seq_file *s, void *pp)
 {
 	struct ceph_client *client = s->private;
@@ -214,6 +242,14 @@
 	}
 	dump_requests(s, &osdc->homeless_osd);
 
+	seq_puts(s, "LINGER REQUESTS\n");
+	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+		dump_linger_requests(s, osd);
+	}
+	dump_linger_requests(s, &osdc->homeless_osd);
+
 	up_read(&osdc->lock);
 	return 0;
 }
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index ef1bcbe..ca0a7b5 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -45,6 +45,10 @@
 
 static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
 static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
+static void link_linger(struct ceph_osd *osd,
+			struct ceph_osd_linger_request *lreq);
+static void unlink_linger(struct ceph_osd *osd,
+			  struct ceph_osd_linger_request *lreq);
 
 #if 1
 static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
@@ -74,10 +78,15 @@
 		  rwsem_is_locked(&osdc->lock)) &&
 		!rwsem_is_wrlocked(&osdc->lock));
 }
+static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
+{
+	WARN_ON(!mutex_is_locked(&lreq->lock));
+}
 #else
 static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
 static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
 static inline void verify_osd_locked(struct ceph_osd *osd) { }
+static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
 #endif
 
 /*
@@ -322,6 +331,9 @@
 	case CEPH_OSD_OP_STAT:
 		ceph_osd_data_release(&op->raw_data_in);
 		break;
+	case CEPH_OSD_OP_NOTIFY_ACK:
+		ceph_osd_data_release(&op->notify_ack.request_data);
+		break;
 	default:
 		break;
 	}
@@ -345,6 +357,29 @@
 	t->osd = CEPH_HOMELESS_OSD;
 }
 
+static void target_copy(struct ceph_osd_request_target *dest,
+			const struct ceph_osd_request_target *src)
+{
+	ceph_oid_copy(&dest->base_oid, &src->base_oid);
+	ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
+	ceph_oid_copy(&dest->target_oid, &src->target_oid);
+	ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);
+
+	dest->pgid = src->pgid; /* struct */
+	dest->pg_num = src->pg_num;
+	dest->pg_num_mask = src->pg_num_mask;
+	ceph_osds_copy(&dest->acting, &src->acting);
+	ceph_osds_copy(&dest->up, &src->up);
+	dest->size = src->size;
+	dest->min_size = src->min_size;
+	dest->sort_bitwise = src->sort_bitwise;
+
+	dest->flags = src->flags;
+	dest->paused = src->paused;
+
+	dest->osd = src->osd;
+}
+
 static void target_destroy(struct ceph_osd_request_target *t)
 {
 	ceph_oid_destroy(&t->base_oid);
@@ -357,8 +392,6 @@
 static void request_release_checks(struct ceph_osd_request *req)
 {
 	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
-	WARN_ON(!list_empty(&req->r_linger_item));
-	WARN_ON(!list_empty(&req->r_linger_osd_item));
 	WARN_ON(!list_empty(&req->r_unsafe_item));
 	WARN_ON(req->r_osd);
 }
@@ -419,13 +452,48 @@
 	init_completion(&req->r_completion);
 	init_completion(&req->r_safe_completion);
 	RB_CLEAR_NODE(&req->r_node);
-	INIT_LIST_HEAD(&req->r_linger_item);
-	INIT_LIST_HEAD(&req->r_linger_osd_item);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 
 	target_init(&req->r_t);
 }
 
+/*
+ * This is ugly, but it allows us to reuse linger registration and ping
+ * requests, keeping the structure of the code around send_linger{_ping}()
+ * reasonable.  Setting up a min_nr=2 mempool for each linger request
+ * and dealing with copying ops (this blasts req only, watch op remains
+ * intact) isn't any better.
+ */
+static void request_reinit(struct ceph_osd_request *req)
+{
+	struct ceph_osd_client *osdc = req->r_osdc;
+	bool mempool = req->r_mempool;
+	unsigned int num_ops = req->r_num_ops;
+	u64 snapid = req->r_snapid;
+	struct ceph_snap_context *snapc = req->r_snapc;
+	bool linger = req->r_linger;
+	struct ceph_msg *request_msg = req->r_request;
+	struct ceph_msg *reply_msg = req->r_reply;
+
+	dout("%s req %p\n", __func__, req);
+	WARN_ON(atomic_read(&req->r_kref.refcount) != 1);
+	request_release_checks(req);
+
+	WARN_ON(atomic_read(&request_msg->kref.refcount) != 1);
+	WARN_ON(atomic_read(&reply_msg->kref.refcount) != 1);
+	target_destroy(&req->r_t);
+
+	request_init(req);
+	req->r_osdc = osdc;
+	req->r_mempool = mempool;
+	req->r_num_ops = num_ops;
+	req->r_snapid = snapid;
+	req->r_snapc = snapc;
+	req->r_linger = linger;
+	req->r_request = request_msg;
+	req->r_reply = reply_msg;
+}
+
 struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
 					       struct ceph_snap_context *snapc,
 					       unsigned int num_ops,
@@ -681,21 +749,19 @@
 }
 EXPORT_SYMBOL(osd_req_op_xattr_init);
 
-void osd_req_op_watch_init(struct ceph_osd_request *osd_req,
-				unsigned int which, u16 opcode,
-				u64 cookie, u64 version, int flag)
+/*
+ * @watch_opcode: CEPH_OSD_WATCH_OP_*
+ */
+static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
+				  u64 cookie, u8 watch_opcode)
 {
-	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
-						      opcode, 0);
+	struct ceph_osd_req_op *op;
 
-	BUG_ON(opcode != CEPH_OSD_OP_NOTIFY_ACK && opcode != CEPH_OSD_OP_WATCH);
-
+	op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
 	op->watch.cookie = cookie;
-	op->watch.ver = version;
-	if (opcode == CEPH_OSD_OP_WATCH && flag)
-		op->watch.flag = (u8)1;
+	op->watch.op = watch_opcode;
+	op->watch.gen = 0;
 }
-EXPORT_SYMBOL(osd_req_op_watch_init);
 
 void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
 				unsigned int which,
@@ -771,11 +837,13 @@
 		break;
 	case CEPH_OSD_OP_STARTSYNC:
 		break;
-	case CEPH_OSD_OP_NOTIFY_ACK:
 	case CEPH_OSD_OP_WATCH:
 		dst->watch.cookie = cpu_to_le64(src->watch.cookie);
-		dst->watch.ver = cpu_to_le64(src->watch.ver);
-		dst->watch.flag = src->watch.flag;
+		dst->watch.ver = cpu_to_le64(0);
+		dst->watch.op = src->watch.op;
+		dst->watch.gen = cpu_to_le32(src->watch.gen);
+		break;
+	case CEPH_OSD_OP_NOTIFY_ACK:
 		break;
 	case CEPH_OSD_OP_SETALLOCHINT:
 		dst->alloc_hint.expected_object_size =
@@ -915,7 +983,7 @@
 	atomic_set(&osd->o_ref, 1);
 	RB_CLEAR_NODE(&osd->o_node);
 	osd->o_requests = RB_ROOT;
-	INIT_LIST_HEAD(&osd->o_linger_requests);
+	osd->o_linger_requests = RB_ROOT;
 	INIT_LIST_HEAD(&osd->o_osd_lru);
 	INIT_LIST_HEAD(&osd->o_keepalive_item);
 	osd->o_incarnation = 1;
@@ -926,7 +994,7 @@
 {
 	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
 	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
-	WARN_ON(!list_empty(&osd->o_linger_requests));
+	WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
 	WARN_ON(!list_empty(&osd->o_osd_lru));
 	WARN_ON(!list_empty(&osd->o_keepalive_item));
 
@@ -996,7 +1064,7 @@
 static void maybe_move_osd_to_lru(struct ceph_osd *osd)
 {
 	if (RB_EMPTY_ROOT(&osd->o_requests) &&
-	    list_empty(&osd->o_linger_requests))
+	    RB_EMPTY_ROOT(&osd->o_linger_requests))
 		__move_osd_to_lru(osd);
 }
 
@@ -1036,6 +1104,17 @@
 		unlink_request(osd, req);
 		link_request(&osdc->homeless_osd, req);
 	}
+	for (n = rb_first(&osd->o_linger_requests); n; ) {
+		struct ceph_osd_linger_request *lreq =
+		    rb_entry(n, struct ceph_osd_linger_request, node);
+
+		n = rb_next(n); /* unlink_linger() */
+
+		dout(" reassigning lreq %p linger_id %llu\n", lreq,
+		     lreq->linger_id);
+		unlink_linger(osd, lreq);
+		link_linger(&osdc->homeless_osd, lreq);
+	}
 
 	__remove_osd_from_lru(osd);
 	erase_osd(&osdc->osds, osd);
@@ -1052,7 +1131,7 @@
 	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
 
 	if (RB_EMPTY_ROOT(&osd->o_requests) &&
-	    list_empty(&osd->o_linger_requests)) {
+	    RB_EMPTY_ROOT(&osd->o_linger_requests)) {
 		close_osd(osd);
 		return -ENODEV;
 	}
@@ -1148,52 +1227,6 @@
 		atomic_dec(&osd->o_osdc->num_homeless);
 }
 
-static void __register_linger_request(struct ceph_osd *osd,
-				    struct ceph_osd_request *req)
-{
-	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
-	WARN_ON(!req->r_linger);
-
-	ceph_osdc_get_request(req);
-	list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger);
-	list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests);
-	__remove_osd_from_lru(osd);
-	req->r_osd = osd;
-}
-
-static void __unregister_linger_request(struct ceph_osd_client *osdc,
-					struct ceph_osd_request *req)
-{
-	WARN_ON(!req->r_linger);
-
-	if (list_empty(&req->r_linger_item)) {
-		dout("%s %p tid %llu not registered\n", __func__, req,
-		     req->r_tid);
-		return;
-	}
-
-	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
-	list_del_init(&req->r_linger_item);
-
-	if (req->r_osd) {
-		list_del_init(&req->r_linger_osd_item);
-		maybe_move_osd_to_lru(req->r_osd);
-		if (RB_EMPTY_ROOT(&req->r_osd->o_requests))
-			req->r_osd = NULL;
-	}
-	ceph_osdc_put_request(req);
-}
-
-void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
-				  struct ceph_osd_request *req)
-{
-	if (!req->r_linger) {
-		dout("set_request_linger %p\n", req);
-		req->r_linger = 1;
-	}
-}
-EXPORT_SYMBOL(ceph_osdc_set_request_linger);
-
 static bool __pool_full(struct ceph_pg_pool_info *pi)
 {
 	return pi->flags & CEPH_POOL_FLAG_FULL;
@@ -1379,6 +1412,10 @@
 						  op->xattr.value_len);
 			ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
 			break;
+		case CEPH_OSD_OP_NOTIFY_ACK:
+			ceph_osdc_msg_data_add(msg,
+					       &op->notify_ack.request_data);
+			break;
 
 		/* reply */
 		case CEPH_OSD_OP_STAT:
@@ -1684,6 +1721,460 @@
 }
 
 /*
+ * lingering requests, watch/notify v2 infrastructure
+ */
+static void linger_release(struct kref *kref)
+{
+	struct ceph_osd_linger_request *lreq =
+	    container_of(kref, struct ceph_osd_linger_request, kref);
+
+	dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
+	     lreq->reg_req, lreq->ping_req);
+	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
+	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
+	WARN_ON(!list_empty(&lreq->scan_item));
+	WARN_ON(lreq->osd);
+
+	if (lreq->reg_req)
+		ceph_osdc_put_request(lreq->reg_req);
+	if (lreq->ping_req)
+		ceph_osdc_put_request(lreq->ping_req);
+	target_destroy(&lreq->t);
+	kfree(lreq);
+}
+
+static void linger_put(struct ceph_osd_linger_request *lreq)
+{
+	if (lreq)
+		kref_put(&lreq->kref, linger_release);
+}
+
+static struct ceph_osd_linger_request *
+linger_get(struct ceph_osd_linger_request *lreq)
+{
+	kref_get(&lreq->kref);
+	return lreq;
+}
+
+static struct ceph_osd_linger_request *
+linger_alloc(struct ceph_osd_client *osdc)
+{
+	struct ceph_osd_linger_request *lreq;
+
+	lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
+	if (!lreq)
+		return NULL;
+
+	kref_init(&lreq->kref);
+	mutex_init(&lreq->lock);
+	RB_CLEAR_NODE(&lreq->node);
+	RB_CLEAR_NODE(&lreq->osdc_node);
+	INIT_LIST_HEAD(&lreq->scan_item);
+	init_completion(&lreq->reg_commit_wait);
+
+	lreq->osdc = osdc;
+	target_init(&lreq->t);
+
+	dout("%s lreq %p\n", __func__, lreq);
+	return lreq;
+}
+
+DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
+DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
+
+/*
+ * Create linger request <-> OSD session relation.
+ *
+ * @lreq has to be registered, @osd may be homeless.
+ */
+static void link_linger(struct ceph_osd *osd,
+			struct ceph_osd_linger_request *lreq)
+{
+	verify_osd_locked(osd);
+	WARN_ON(!lreq->linger_id || lreq->osd);
+	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
+	     osd->o_osd, lreq, lreq->linger_id);
+
+	if (!osd_homeless(osd))
+		__remove_osd_from_lru(osd);
+	else
+		atomic_inc(&osd->o_osdc->num_homeless);
+
+	get_osd(osd);
+	insert_linger(&osd->o_linger_requests, lreq);
+	lreq->osd = osd;
+}
+
+static void unlink_linger(struct ceph_osd *osd,
+			  struct ceph_osd_linger_request *lreq)
+{
+	verify_osd_locked(osd);
+	WARN_ON(lreq->osd != osd);
+	dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
+	     osd->o_osd, lreq, lreq->linger_id);
+
+	lreq->osd = NULL;
+	erase_linger(&osd->o_linger_requests, lreq);
+	put_osd(osd);
+
+	if (!osd_homeless(osd))
+		maybe_move_osd_to_lru(osd);
+	else
+		atomic_dec(&osd->o_osdc->num_homeless);
+}
+
+static bool __linger_registered(struct ceph_osd_linger_request *lreq)
+{
+	verify_osdc_locked(lreq->osdc);
+
+	return !RB_EMPTY_NODE(&lreq->osdc_node);
+}
+
+static bool linger_registered(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_client *osdc = lreq->osdc;
+	bool registered;
+
+	down_read(&osdc->lock);
+	registered = __linger_registered(lreq);
+	up_read(&osdc->lock);
+
+	return registered;
+}
+
+static void linger_register(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_client *osdc = lreq->osdc;
+
+	verify_osdc_wrlocked(osdc);
+	WARN_ON(lreq->linger_id);
+
+	linger_get(lreq);
+	lreq->linger_id = ++osdc->last_linger_id;
+	insert_linger_osdc(&osdc->linger_requests, lreq);
+}
+
+static void linger_unregister(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_client *osdc = lreq->osdc;
+
+	verify_osdc_wrlocked(osdc);
+
+	erase_linger_osdc(&osdc->linger_requests, lreq);
+	linger_put(lreq);
+}
+
+static void cancel_linger_request(struct ceph_osd_request *req)
+{
+	struct ceph_osd_linger_request *lreq = req->r_priv;
+
+	WARN_ON(!req->r_linger);
+	cancel_request(req);
+	linger_put(lreq);
+}
+
+struct linger_work {
+	struct work_struct work;
+	struct ceph_osd_linger_request *lreq;
+
+	union {
+		struct {
+			u64 notify_id;
+			u64 notifier_id;
+			void *payload; /* points into @msg front */
+			size_t payload_len;
+
+			struct ceph_msg *msg; /* for ceph_msg_put() */
+		} notify;
+		struct {
+			int err;
+		} error;
+	};
+};
+
+static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
+				       work_func_t workfn)
+{
+	struct linger_work *lwork;
+
+	lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
+	if (!lwork)
+		return NULL;
+
+	INIT_WORK(&lwork->work, workfn);
+	lwork->lreq = linger_get(lreq);
+
+	return lwork;
+}
+
+static void lwork_free(struct linger_work *lwork)
+{
+	struct ceph_osd_linger_request *lreq = lwork->lreq;
+
+	linger_put(lreq);
+	kfree(lwork);
+}
+
+static void lwork_queue(struct linger_work *lwork)
+{
+	struct ceph_osd_linger_request *lreq = lwork->lreq;
+	struct ceph_osd_client *osdc = lreq->osdc;
+
+	verify_lreq_locked(lreq);
+	queue_work(osdc->notify_wq, &lwork->work);
+}
+
+static void do_watch_notify(struct work_struct *w)
+{
+	struct linger_work *lwork = container_of(w, struct linger_work, work);
+	struct ceph_osd_linger_request *lreq = lwork->lreq;
+
+	if (!linger_registered(lreq)) {
+		dout("%s lreq %p not registered\n", __func__, lreq);
+		goto out;
+	}
+
+	dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
+	     __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
+	     lwork->notify.payload_len);
+	lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
+		  lwork->notify.notifier_id, lwork->notify.payload,
+		  lwork->notify.payload_len);
+
+out:
+	ceph_msg_put(lwork->notify.msg);
+	lwork_free(lwork);
+}
+
+static void do_watch_error(struct work_struct *w)
+{
+	struct linger_work *lwork = container_of(w, struct linger_work, work);
+	struct ceph_osd_linger_request *lreq = lwork->lreq;
+
+	if (!linger_registered(lreq)) {
+		dout("%s lreq %p not registered\n", __func__, lreq);
+		goto out;
+	}
+
+	dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
+	lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
+
+out:
+	lwork_free(lwork);
+}
+
+static void queue_watch_error(struct ceph_osd_linger_request *lreq)
+{
+	struct linger_work *lwork;
+
+	lwork = lwork_alloc(lreq, do_watch_error);
+	if (!lwork) {
+		pr_err("failed to allocate error-lwork\n");
+		return;
+	}
+
+	lwork->error.err = lreq->last_error;
+	lwork_queue(lwork);
+}
+
+static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
+				       int result)
+{
+	if (!completion_done(&lreq->reg_commit_wait)) {
+		lreq->reg_commit_error = (result <= 0 ? result : 0);
+		complete_all(&lreq->reg_commit_wait);
+	}
+}
+
+static void linger_commit_cb(struct ceph_osd_request *req)
+{
+	struct ceph_osd_linger_request *lreq = req->r_priv;
+
+	mutex_lock(&lreq->lock);
+	dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
+	     lreq->linger_id, req->r_result);
+	WARN_ON(!__linger_registered(lreq));
+	linger_reg_commit_complete(lreq, req->r_result);
+	lreq->committed = true;
+
+	mutex_unlock(&lreq->lock);
+	linger_put(lreq);
+}
+
+static int normalize_watch_error(int err)
+{
+	/*
+	 * Translate ENOENT -> ENOTCONN so that a delete->disconnection
+	 * notification and a failure to reconnect because we raced with
+	 * the delete appear the same to the user.
+	 */
+	if (err == -ENOENT)
+		err = -ENOTCONN;
+
+	return err;
+}
+
+static void linger_reconnect_cb(struct ceph_osd_request *req)
+{
+	struct ceph_osd_linger_request *lreq = req->r_priv;
+
+	mutex_lock(&lreq->lock);
+	dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
+	     lreq, lreq->linger_id, req->r_result, lreq->last_error);
+	if (req->r_result < 0) {
+		if (!lreq->last_error) {
+			lreq->last_error = normalize_watch_error(req->r_result);
+			queue_watch_error(lreq);
+		}
+	}
+
+	mutex_unlock(&lreq->lock);
+	linger_put(lreq);
+}
+
+static void send_linger(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_request *req = lreq->reg_req;
+	struct ceph_osd_req_op *op = &req->r_ops[0];
+
+	verify_osdc_wrlocked(req->r_osdc);
+	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
+
+	if (req->r_osd)
+		cancel_linger_request(req);
+
+	request_reinit(req);
+	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
+	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
+	req->r_flags = lreq->t.flags;
+	req->r_mtime = lreq->mtime;
+
+	mutex_lock(&lreq->lock);
+	if (lreq->committed) {
+		WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
+			op->watch.cookie != lreq->linger_id);
+		op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
+		op->watch.gen = ++lreq->register_gen;
+		dout("lreq %p reconnect register_gen %u\n", lreq,
+		     op->watch.gen);
+		req->r_callback = linger_reconnect_cb;
+	} else {
+		WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
+		dout("lreq %p register\n", lreq);
+		req->r_callback = linger_commit_cb;
+	}
+	mutex_unlock(&lreq->lock);
+
+	req->r_priv = linger_get(lreq);
+	req->r_linger = true;
+
+	submit_request(req, true);
+}
+
+static void linger_ping_cb(struct ceph_osd_request *req)
+{
+	struct ceph_osd_linger_request *lreq = req->r_priv;
+
+	mutex_lock(&lreq->lock);
+	dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
+	     __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
+	     lreq->last_error);
+	if (lreq->register_gen == req->r_ops[0].watch.gen) {
+		if (req->r_result && !lreq->last_error) {
+			lreq->last_error = normalize_watch_error(req->r_result);
+			queue_watch_error(lreq);
+		}
+	} else {
+		dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
+		     lreq->register_gen, req->r_ops[0].watch.gen);
+	}
+
+	mutex_unlock(&lreq->lock);
+	linger_put(lreq);
+}
+
+static void send_linger_ping(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_client *osdc = lreq->osdc;
+	struct ceph_osd_request *req = lreq->ping_req;
+	struct ceph_osd_req_op *op = &req->r_ops[0];
+
+	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
+		dout("%s PAUSERD\n", __func__);
+		return;
+	}
+
+	lreq->ping_sent = jiffies;
+	dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
+	     __func__, lreq, lreq->linger_id, lreq->ping_sent,
+	     lreq->register_gen);
+
+	if (req->r_osd)
+		cancel_linger_request(req);
+
+	request_reinit(req);
+	target_copy(&req->r_t, &lreq->t);
+
+	WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
+		op->watch.cookie != lreq->linger_id ||
+		op->watch.op != CEPH_OSD_WATCH_OP_PING);
+	op->watch.gen = lreq->register_gen;
+	req->r_callback = linger_ping_cb;
+	req->r_priv = linger_get(lreq);
+	req->r_linger = true;
+
+	ceph_osdc_get_request(req);
+	account_request(req);
+	req->r_tid = atomic64_inc_return(&osdc->last_tid);
+	link_request(lreq->osd, req);
+	send_request(req);
+}
+
+static void linger_submit(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_client *osdc = lreq->osdc;
+	struct ceph_osd *osd;
+
+	calc_target(osdc, &lreq->t, &lreq->last_force_resend, false);
+	osd = lookup_create_osd(osdc, lreq->t.osd, true);
+	link_linger(osd, lreq);
+
+	send_linger(lreq);
+}
+
+/*
+ * @lreq has to be both registered and linked.
+ */
+static void __linger_cancel(struct ceph_osd_linger_request *lreq)
+{
+	if (lreq->ping_req->r_osd)
+		cancel_linger_request(lreq->ping_req);
+	if (lreq->reg_req->r_osd)
+		cancel_linger_request(lreq->reg_req);
+	unlink_linger(lreq->osd, lreq);
+	linger_unregister(lreq);
+}
+
+static void linger_cancel(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_client *osdc = lreq->osdc;
+
+	down_write(&osdc->lock);
+	if (__linger_registered(lreq))
+		__linger_cancel(lreq);
+	up_write(&osdc->lock);
+}
+
+static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
+{
+	int ret;
+
+	dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
+	ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
+	return ret ?: lreq->reg_commit_error;
+}
+
+/*
  * Timeout callback, called every N seconds.  When 1 or more OSD
  * requests has been active for more than N seconds, we send a keepalive
  * (tag + timestamp) to its OSD to ensure any communications channel
@@ -1720,6 +2211,19 @@
 				found = true;
 			}
 		}
+		for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
+			struct ceph_osd_linger_request *lreq =
+			    rb_entry(p, struct ceph_osd_linger_request, node);
+
+			dout(" lreq %p linger_id %llu is served by osd%d\n",
+			     lreq, lreq->linger_id, osd->o_osd);
+			found = true;
+
+			mutex_lock(&lreq->lock);
+			if (lreq->committed && !lreq->last_error)
+				send_linger_ping(lreq);
+			mutex_unlock(&lreq->lock);
+		}
 
 		if (found)
 			list_move_tail(&osd->o_keepalive_item, &slow_osds);
@@ -1756,7 +2260,7 @@
 			break;
 
 		WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
-		WARN_ON(!list_empty(&osd->o_linger_requests));
+		WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
 		close_osd(osd);
 	}
 
@@ -2082,7 +2586,8 @@
 		__finish_request(req);
 		if (req->r_linger) {
 			WARN_ON(req->r_unsafe_callback);
-			__register_linger_request(osd, req);
+			dout("req %p tid %llu cb (locked)\n", req, req->r_tid);
+			__complete_request(req);
 		}
 	}
 
@@ -2093,7 +2598,7 @@
 		if (already_acked && req->r_unsafe_callback) {
 			dout("req %p tid %llu safe-cb\n", req, req->r_tid);
 			req->r_unsafe_callback(req, false);
-		} else {
+		} else if (!req->r_linger) {
 			dout("req %p tid %llu cb\n", req, req->r_tid);
 			__complete_request(req);
 		}
@@ -2145,6 +2650,26 @@
 	return pi->was_full && !__pool_full(pi);
 }
 
+static enum calc_target_result
+recalc_linger_target(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_client *osdc = lreq->osdc;
+	enum calc_target_result ct_res;
+
+	ct_res = calc_target(osdc, &lreq->t, &lreq->last_force_resend, true);
+	if (ct_res == CALC_TARGET_NEED_RESEND) {
+		struct ceph_osd *osd;
+
+		osd = lookup_create_osd(osdc, lreq->t.osd, true);
+		if (osd != lreq->osd) {
+			unlink_linger(lreq->osd, lreq);
+			link_linger(osd, lreq);
+		}
+	}
+
+	return ct_res;
+}
+
 /*
  * Requeue requests whose mapping to an OSD has changed.
  */
@@ -2159,6 +2684,39 @@
 	struct rb_node *n;
 	bool force_resend_writes;
 
+	for (n = rb_first(&osd->o_linger_requests); n; ) {
+		struct ceph_osd_linger_request *lreq =
+		    rb_entry(n, struct ceph_osd_linger_request, node);
+		enum calc_target_result ct_res;
+
+		n = rb_next(n); /* recalc_linger_target() */
+
+		dout("%s lreq %p linger_id %llu\n", __func__, lreq,
+		     lreq->linger_id);
+		ct_res = recalc_linger_target(lreq);
+		switch (ct_res) {
+		case CALC_TARGET_NO_ACTION:
+			force_resend_writes = cleared_full ||
+			    (check_pool_cleared_full &&
+			     pool_cleared_full(osdc, lreq->t.base_oloc.pool));
+			if (!force_resend && !force_resend_writes)
+				break;
+
+			/* fall through */
+		case CALC_TARGET_NEED_RESEND:
+			/*
+			 * scan_requests() for the previous epoch(s)
+			 * may have already added it to the list, since
+			 * it's not unlinked here.
+			 */
+			if (list_empty(&lreq->scan_item))
+				list_add_tail(&lreq->scan_item, need_resend_linger);
+			break;
+		case CALC_TARGET_POOL_DNE:
+			break;
+		}
+	}
+
 	for (n = rb_first(&osd->o_requests); n; ) {
 		struct ceph_osd_request *req =
 		    rb_entry(n, struct ceph_osd_request, r_node);
@@ -2263,6 +2821,7 @@
 			  struct rb_root *need_resend,
 			  struct list_head *need_resend_linger)
 {
+	struct ceph_osd_linger_request *lreq, *nlreq;
 	struct rb_node *n;
 
 	for (n = rb_first(need_resend); n; ) {
@@ -2280,8 +2839,17 @@
 		if (!req->r_linger) {
 			if (!osd_homeless(osd) && !req->r_t.paused)
 				send_request(req);
+		} else {
+			cancel_linger_request(req);
 		}
 	}
+
+	list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
+		if (!osd_homeless(lreq->osd))
+			send_linger(lreq);
+
+		list_del_init(&lreq->scan_item);
+	}
 }
 
 /*
@@ -2406,15 +2974,25 @@
 {
 	struct rb_node *n;
 
-	for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+	for (n = rb_first(&osd->o_requests); n; ) {
 		struct ceph_osd_request *req =
 		    rb_entry(n, struct ceph_osd_request, r_node);
 
+		n = rb_next(n); /* cancel_linger_request() */
+
 		if (!req->r_linger) {
 			if (!req->r_t.paused)
 				send_request(req);
+		} else {
+			cancel_linger_request(req);
 		}
 	}
+	for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
+		struct ceph_osd_linger_request *lreq =
+		    rb_entry(n, struct ceph_osd_linger_request, node);
+
+		send_linger(lreq);
+	}
 }
 
 /*
@@ -2442,192 +3020,76 @@
 }
 
 /*
- * watch/notify callback event infrastructure
- *
- * These callbacks are used both for watch and notify operations.
- */
-static void __release_event(struct kref *kref)
-{
-	struct ceph_osd_event *event =
-		container_of(kref, struct ceph_osd_event, kref);
-
-	dout("__release_event %p\n", event);
-	kfree(event);
-}
-
-static void get_event(struct ceph_osd_event *event)
-{
-	kref_get(&event->kref);
-}
-
-void ceph_osdc_put_event(struct ceph_osd_event *event)
-{
-	kref_put(&event->kref, __release_event);
-}
-EXPORT_SYMBOL(ceph_osdc_put_event);
-
-static void __insert_event(struct ceph_osd_client *osdc,
-			     struct ceph_osd_event *new)
-{
-	struct rb_node **p = &osdc->event_tree.rb_node;
-	struct rb_node *parent = NULL;
-	struct ceph_osd_event *event = NULL;
-
-	while (*p) {
-		parent = *p;
-		event = rb_entry(parent, struct ceph_osd_event, node);
-		if (new->cookie < event->cookie)
-			p = &(*p)->rb_left;
-		else if (new->cookie > event->cookie)
-			p = &(*p)->rb_right;
-		else
-			BUG();
-	}
-
-	rb_link_node(&new->node, parent, p);
-	rb_insert_color(&new->node, &osdc->event_tree);
-}
-
-static struct ceph_osd_event *__find_event(struct ceph_osd_client *osdc,
-					        u64 cookie)
-{
-	struct rb_node **p = &osdc->event_tree.rb_node;
-	struct rb_node *parent = NULL;
-	struct ceph_osd_event *event = NULL;
-
-	while (*p) {
-		parent = *p;
-		event = rb_entry(parent, struct ceph_osd_event, node);
-		if (cookie < event->cookie)
-			p = &(*p)->rb_left;
-		else if (cookie > event->cookie)
-			p = &(*p)->rb_right;
-		else
-			return event;
-	}
-	return NULL;
-}
-
-static void __remove_event(struct ceph_osd_event *event)
-{
-	struct ceph_osd_client *osdc = event->osdc;
-
-	if (!RB_EMPTY_NODE(&event->node)) {
-		dout("__remove_event removed %p\n", event);
-		rb_erase(&event->node, &osdc->event_tree);
-		ceph_osdc_put_event(event);
-	} else {
-		dout("__remove_event didn't remove %p\n", event);
-	}
-}
-
-int ceph_osdc_create_event(struct ceph_osd_client *osdc,
-			   void (*event_cb)(u64, u64, u8, void *),
-			   void *data, struct ceph_osd_event **pevent)
-{
-	struct ceph_osd_event *event;
-
-	event = kmalloc(sizeof(*event), GFP_NOIO);
-	if (!event)
-		return -ENOMEM;
-
-	dout("create_event %p\n", event);
-	event->cb = event_cb;
-	event->one_shot = 0;
-	event->data = data;
-	event->osdc = osdc;
-	INIT_LIST_HEAD(&event->osd_node);
-	RB_CLEAR_NODE(&event->node);
-	kref_init(&event->kref);   /* one ref for us */
-	kref_get(&event->kref);    /* one ref for the caller */
-
-	spin_lock(&osdc->event_lock);
-	event->cookie = ++osdc->event_count;
-	__insert_event(osdc, event);
-	spin_unlock(&osdc->event_lock);
-
-	*pevent = event;
-	return 0;
-}
-EXPORT_SYMBOL(ceph_osdc_create_event);
-
-void ceph_osdc_cancel_event(struct ceph_osd_event *event)
-{
-	struct ceph_osd_client *osdc = event->osdc;
-
-	dout("cancel_event %p\n", event);
-	spin_lock(&osdc->event_lock);
-	__remove_event(event);
-	spin_unlock(&osdc->event_lock);
-	ceph_osdc_put_event(event); /* caller's */
-}
-EXPORT_SYMBOL(ceph_osdc_cancel_event);
-
-
-static void do_event_work(struct work_struct *work)
-{
-	struct ceph_osd_event_work *event_work =
-		container_of(work, struct ceph_osd_event_work, work);
-	struct ceph_osd_event *event = event_work->event;
-	u64 ver = event_work->ver;
-	u64 notify_id = event_work->notify_id;
-	u8 opcode = event_work->opcode;
-
-	dout("do_event_work completing %p\n", event);
-	event->cb(ver, notify_id, opcode, event->data);
-	dout("do_event_work completed %p\n", event);
-	ceph_osdc_put_event(event);
-	kfree(event_work);
-}
-
-
-/*
  * Process osd watch notifications
  */
 static void handle_watch_notify(struct ceph_osd_client *osdc,
 				struct ceph_msg *msg)
 {
-	void *p, *end;
-	u8 proto_ver;
-	u64 cookie, ver, notify_id;
-	u8 opcode;
-	struct ceph_osd_event *event;
-	struct ceph_osd_event_work *event_work;
-
-	p = msg->front.iov_base;
-	end = p + msg->front.iov_len;
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front.iov_len;
+	struct ceph_osd_linger_request *lreq;
+	struct linger_work *lwork;
+	u8 proto_ver, opcode;
+	u64 cookie, notify_id;
+	u64 notifier_id = 0;
+	void *payload = NULL;
+	u32 payload_len = 0;
 
 	ceph_decode_8_safe(&p, end, proto_ver, bad);
 	ceph_decode_8_safe(&p, end, opcode, bad);
 	ceph_decode_64_safe(&p, end, cookie, bad);
-	ceph_decode_64_safe(&p, end, ver, bad);
+	p += 8; /* skip ver */
 	ceph_decode_64_safe(&p, end, notify_id, bad);
 
-	spin_lock(&osdc->event_lock);
-	event = __find_event(osdc, cookie);
-	if (event) {
-		BUG_ON(event->one_shot);
-		get_event(event);
+	if (proto_ver >= 1) {
+		ceph_decode_32_safe(&p, end, payload_len, bad);
+		ceph_decode_need(&p, end, payload_len, bad);
+		payload = p;
+		p += payload_len;
 	}
-	spin_unlock(&osdc->event_lock);
-	dout("handle_watch_notify cookie %lld ver %lld event %p\n",
-	     cookie, ver, event);
-	if (event) {
-		event_work = kmalloc(sizeof(*event_work), GFP_NOIO);
-		if (!event_work) {
-			pr_err("couldn't allocate event_work\n");
-			ceph_osdc_put_event(event);
-			return;
+
+	if (le16_to_cpu(msg->hdr.version) >= 2)
+		p += 4; /* skip return_code */
+
+	if (le16_to_cpu(msg->hdr.version) >= 3)
+		ceph_decode_64_safe(&p, end, notifier_id, bad);
+
+	down_read(&osdc->lock);
+	lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
+	if (!lreq) {
+		dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
+		     cookie);
+		goto out_unlock_osdc;
+	}
+
+	mutex_lock(&lreq->lock);
+	dout("%s opcode %d cookie %llu lreq %p\n", __func__, opcode, cookie,
+	     lreq);
+	if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
+		if (!lreq->last_error) {
+			lreq->last_error = -ENOTCONN;
+			queue_watch_error(lreq);
 		}
-		INIT_WORK(&event_work->work, do_event_work);
-		event_work->event = event;
-		event_work->ver = ver;
-		event_work->notify_id = notify_id;
-		event_work->opcode = opcode;
+	} else {
+		/* CEPH_WATCH_EVENT_NOTIFY */
+		lwork = lwork_alloc(lreq, do_watch_notify);
+		if (!lwork) {
+			pr_err("failed to allocate notify-lwork\n");
+			goto out_unlock_lreq;
+		}
 
-		queue_work(osdc->notify_wq, &event_work->work);
+		lwork->notify.notify_id = notify_id;
+		lwork->notify.notifier_id = notifier_id;
+		lwork->notify.payload = payload;
+		lwork->notify.payload_len = payload_len;
+		lwork->notify.msg = ceph_msg_get(msg);
+		lwork_queue(lwork);
 	}
 
+out_unlock_lreq:
+	mutex_unlock(&lreq->lock);
+out_unlock_osdc:
+	up_read(&osdc->lock);
 	return;
 
 bad:
@@ -2659,8 +3121,6 @@
 	struct ceph_osd_client *osdc = req->r_osdc;
 
 	down_write(&osdc->lock);
-	if (req->r_linger)
-		__unregister_linger_request(osdc, req);
 	if (req->r_osd)
 		cancel_request(req);
 	up_write(&osdc->lock);
@@ -2743,6 +3203,198 @@
 }
 EXPORT_SYMBOL(ceph_osdc_sync);
 
+static struct ceph_osd_request *
+alloc_linger_request(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_request *req;
+
+	req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
+	if (!req)
+		return NULL;
+
+	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
+	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
+
+	if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
+		ceph_osdc_put_request(req);
+		return NULL;
+	}
+
+	return req;
+}
+
+/*
+ * Returns a handle, caller owns a ref.
+ */
+struct ceph_osd_linger_request *
+ceph_osdc_watch(struct ceph_osd_client *osdc,
+		struct ceph_object_id *oid,
+		struct ceph_object_locator *oloc,
+		rados_watchcb2_t wcb,
+		rados_watcherrcb_t errcb,
+		void *data)
+{
+	struct ceph_osd_linger_request *lreq;
+	int ret;
+
+	lreq = linger_alloc(osdc);
+	if (!lreq)
+		return ERR_PTR(-ENOMEM);
+
+	lreq->wcb = wcb;
+	lreq->errcb = errcb;
+	lreq->data = data;
+
+	ceph_oid_copy(&lreq->t.base_oid, oid);
+	ceph_oloc_copy(&lreq->t.base_oloc, oloc);
+	lreq->t.flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+	lreq->mtime = CURRENT_TIME;
+
+	lreq->reg_req = alloc_linger_request(lreq);
+	if (!lreq->reg_req) {
+		ret = -ENOMEM;
+		goto err_put_lreq;
+	}
+
+	lreq->ping_req = alloc_linger_request(lreq);
+	if (!lreq->ping_req) {
+		ret = -ENOMEM;
+		goto err_put_lreq;
+	}
+
+	down_write(&osdc->lock);
+	linger_register(lreq); /* before osd_req_op_* */
+	osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
+			      CEPH_OSD_WATCH_OP_WATCH);
+	osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
+			      CEPH_OSD_WATCH_OP_PING);
+	linger_submit(lreq);
+	up_write(&osdc->lock);
+
+	ret = linger_reg_commit_wait(lreq);
+	if (ret) {
+		linger_cancel(lreq);
+		goto err_put_lreq;
+	}
+
+	return lreq;
+
+err_put_lreq:
+	linger_put(lreq);
+	return ERR_PTR(ret);
+}
+EXPORT_SYMBOL(ceph_osdc_watch);
+
+/*
+ * Releases a ref.
+ *
+ * Times out after mount_timeout to preserve rbd unmap behaviour
+ * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
+ * with mount_timeout").
+ */
+int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
+		      struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_options *opts = osdc->client->options;
+	struct ceph_osd_request *req;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
+	ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
+	req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
+	req->r_mtime = CURRENT_TIME;
+	osd_req_op_watch_init(req, 0, lreq->linger_id,
+			      CEPH_OSD_WATCH_OP_UNWATCH);
+
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
+	ceph_osdc_start_request(osdc, req, false);
+	linger_cancel(lreq);
+	linger_put(lreq);
+	ret = wait_request_timeout(req, opts->mount_timeout);
+
+out_put_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_unwatch);
+
+static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
+				      u64 notify_id, u64 cookie, void *payload,
+				      size_t payload_len)
+{
+	struct ceph_osd_req_op *op;
+	struct ceph_pagelist *pl;
+	int ret;
+
+	op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
+
+	pl = kmalloc(sizeof(*pl), GFP_NOIO);
+	if (!pl)
+		return -ENOMEM;
+
+	ceph_pagelist_init(pl);
+	ret = ceph_pagelist_encode_64(pl, notify_id);
+	ret |= ceph_pagelist_encode_64(pl, cookie);
+	if (payload) {
+		ret |= ceph_pagelist_encode_32(pl, payload_len);
+		ret |= ceph_pagelist_append(pl, payload, payload_len);
+	} else {
+		ret |= ceph_pagelist_encode_32(pl, 0);
+	}
+	if (ret) {
+		ceph_pagelist_release(pl);
+		return -ENOMEM;
+	}
+
+	ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
+	op->indata_len = pl->length;
+	return 0;
+}
+
+int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
+			 struct ceph_object_id *oid,
+			 struct ceph_object_locator *oloc,
+			 u64 notify_id,
+			 u64 cookie,
+			 void *payload,
+			 size_t payload_len)
+{
+	struct ceph_osd_request *req;
+	int ret;
+
+	req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
+	if (!req)
+		return -ENOMEM;
+
+	ceph_oid_copy(&req->r_base_oid, oid);
+	ceph_oloc_copy(&req->r_base_oloc, oloc);
+	req->r_flags = CEPH_OSD_FLAG_READ;
+
+	ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
+	if (ret)
+		goto out_put_req;
+
+	ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
+					 payload_len);
+	if (ret)
+		goto out_put_req;
+
+	ceph_osdc_start_request(osdc, req, false);
+	ret = ceph_osdc_wait_request(osdc, req);
+
+out_put_req:
+	ceph_osdc_put_request(req);
+	return ret;
+}
+EXPORT_SYMBOL(ceph_osdc_notify_ack);
+
 /*
  * Call all pending notify callbacks - for use after a watch is
  * unregistered, to make sure no more callbacks for it will be invoked
@@ -2767,15 +3419,12 @@
 	osdc->osds = RB_ROOT;
 	INIT_LIST_HEAD(&osdc->osd_lru);
 	spin_lock_init(&osdc->osd_lru_lock);
-	INIT_LIST_HEAD(&osdc->req_linger);
 	osd_init(&osdc->homeless_osd);
 	osdc->homeless_osd.o_osdc = osdc;
 	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
+	osdc->linger_requests = RB_ROOT;
 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
-	spin_lock_init(&osdc->event_lock);
-	osdc->event_tree = RB_ROOT;
-	osdc->event_count = 0;
 
 	err = -ENOMEM;
 	osdc->osdmap = ceph_osdmap_alloc();
@@ -2838,6 +3487,7 @@
 	osd_cleanup(&osdc->homeless_osd);
 
 	WARN_ON(!list_empty(&osdc->osd_lru));
+	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
 	WARN_ON(atomic_read(&osdc->num_requests));
 	WARN_ON(atomic_read(&osdc->num_homeless));