ceph: set i_head_snapc when getting CEPH_CAP_FILE_WR reference

In most cases that snap context is needed, we are holding
reference of CEPH_CAP_FILE_WR. So we can set ceph inode's
i_head_snapc when getting the CEPH_CAP_FILE_WR reference,
and make codes get snap context from i_head_snapc. This makes
the code simpler.

Another benefit of this change is that we can handle snap
notification more elegantly. Especially when snap context
is updated while someone else is doing write. The old queue
cap_snap code may set cap_snap's context to ether the old
context or the new snap context, depending on if i_head_snapc
is set. The new queue capp_snap code always set cap_snap's
context to the old snap context.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index ccc4325..5f53ac0 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -87,17 +87,21 @@
 	inode = mapping->host;
 	ci = ceph_inode(inode);
 
-	/*
-	 * Note that we're grabbing a snapc ref here without holding
-	 * any locks!
-	 */
-	snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
-
 	/* dirty the head */
 	spin_lock(&ci->i_ceph_lock);
-	if (ci->i_head_snapc == NULL)
-		ci->i_head_snapc = ceph_get_snap_context(snapc);
-	++ci->i_wrbuffer_ref_head;
+	BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
+	if (__ceph_have_pending_cap_snap(ci)) {
+		struct ceph_cap_snap *capsnap =
+				list_last_entry(&ci->i_cap_snaps,
+						struct ceph_cap_snap,
+						ci_item);
+		snapc = ceph_get_snap_context(capsnap->context);
+		capsnap->dirty_pages++;
+	} else {
+		BUG_ON(!ci->i_head_snapc);
+		snapc = ceph_get_snap_context(ci->i_head_snapc);
+		++ci->i_wrbuffer_ref_head;
+	}
 	if (ci->i_wrbuffer_ref == 0)
 		ihold(inode);
 	++ci->i_wrbuffer_ref;
@@ -1033,7 +1037,6 @@
 {
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	loff_t page_off = pos & PAGE_CACHE_MASK;
 	int pos_in_page = pos & ~PAGE_CACHE_MASK;
 	int end_in_page = pos_in_page + len;
@@ -1045,10 +1048,6 @@
 	/* writepages currently holds page lock, but if we change that later, */
 	wait_on_page_writeback(page);
 
-	/* check snap context */
-	BUG_ON(!ci->i_snap_realm);
-	down_read(&mdsc->snap_rwsem);
-	BUG_ON(!ci->i_snap_realm->cached_context);
 	snapc = page_snap_context(page);
 	if (snapc && snapc != ci->i_head_snapc) {
 		/*
@@ -1056,7 +1055,6 @@
 		 * context!  is it writeable now?
 		 */
 		oldest = get_oldest_context(inode, NULL);
-		up_read(&mdsc->snap_rwsem);
 
 		if (snapc->seq > oldest->seq) {
 			ceph_put_snap_context(oldest);
@@ -1113,7 +1111,6 @@
 	}
 
 	/* we need to read it. */
-	up_read(&mdsc->snap_rwsem);
 	r = readpage_nounlock(file, page);
 	if (r < 0)
 		goto fail_nosnap;
@@ -1158,16 +1155,13 @@
 
 /*
  * we don't do anything in here that simple_write_end doesn't do
- * except adjust dirty page accounting and drop read lock on
- * mdsc->snap_rwsem.
+ * except adjust dirty page accounting
  */
 static int ceph_write_end(struct file *file, struct address_space *mapping,
 			  loff_t pos, unsigned len, unsigned copied,
 			  struct page *page, void *fsdata)
 {
 	struct inode *inode = file_inode(file);
-	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_mds_client *mdsc = fsc->mdsc;
 	unsigned from = pos & (PAGE_CACHE_SIZE - 1);
 	int check_cap = 0;
 
@@ -1189,7 +1183,6 @@
 	set_page_dirty(page);
 
 	unlock_page(page);
-	up_read(&mdsc->snap_rwsem);
 	page_cache_release(page);
 
 	if (check_cap)
@@ -1315,7 +1308,6 @@
 	struct inode *inode = file_inode(vma->vm_file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_file_info *fi = vma->vm_file->private_data;
-	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	struct page *page = vmf->page;
 	loff_t off = page_offset(page);
 	loff_t size = i_size_read(inode);
@@ -1374,7 +1366,6 @@
 	if (ret == 0) {
 		/* success.  we'll keep the page locked. */
 		set_page_dirty(page);
-		up_read(&mdsc->snap_rwsem);
 		ret = VM_FAULT_LOCKED;
 	} else {
 		if (ret == -ENOMEM)
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 7c8e93a..feb8ec9 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -2073,7 +2073,8 @@
  *
  * Protected by i_ceph_lock.
  */
-static void __take_cap_refs(struct ceph_inode_info *ci, int got)
+static void __take_cap_refs(struct ceph_inode_info *ci, int got,
+			    bool snap_rwsem_locked)
 {
 	if (got & CEPH_CAP_PIN)
 		ci->i_pin_ref++;
@@ -2081,8 +2082,14 @@
 		ci->i_rd_ref++;
 	if (got & CEPH_CAP_FILE_CACHE)
 		ci->i_rdcache_ref++;
-	if (got & CEPH_CAP_FILE_WR)
+	if (got & CEPH_CAP_FILE_WR) {
+		if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
+			BUG_ON(!snap_rwsem_locked);
+			ci->i_head_snapc = ceph_get_snap_context(
+					ci->i_snap_realm->cached_context);
+		}
 		ci->i_wr_ref++;
+	}
 	if (got & CEPH_CAP_FILE_BUFFER) {
 		if (ci->i_wb_ref == 0)
 			ihold(&ci->vfs_inode);
@@ -2100,16 +2107,19 @@
  * requested from the MDS.
  */
 static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
-			    loff_t endoff, int *got, int *check_max, int *err)
+			    loff_t endoff, bool nonblock, int *got, int *err)
 {
 	struct inode *inode = &ci->vfs_inode;
+	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
 	int ret = 0;
 	int have, implemented;
 	int file_wanted;
+	bool snap_rwsem_locked = false;
 
 	dout("get_cap_refs %p need %s want %s\n", inode,
 	     ceph_cap_string(need), ceph_cap_string(want));
 
+again:
 	spin_lock(&ci->i_ceph_lock);
 
 	/* make sure file is actually open */
@@ -2125,6 +2135,10 @@
 	/* finish pending truncate */
 	while (ci->i_truncate_pending) {
 		spin_unlock(&ci->i_ceph_lock);
+		if (snap_rwsem_locked) {
+			up_read(&mdsc->snap_rwsem);
+			snap_rwsem_locked = false;
+		}
 		__ceph_do_pending_vmtruncate(inode);
 		spin_lock(&ci->i_ceph_lock);
 	}
@@ -2136,7 +2150,7 @@
 			dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
 			     inode, endoff, ci->i_max_size);
 			if (endoff > ci->i_requested_max_size) {
-				*check_max = 1;
+				*err = -EAGAIN;
 				ret = 1;
 			}
 			goto out_unlock;
@@ -2164,8 +2178,29 @@
 		     inode, ceph_cap_string(have), ceph_cap_string(not),
 		     ceph_cap_string(revoking));
 		if ((revoking & not) == 0) {
+			if (!snap_rwsem_locked &&
+			    !ci->i_head_snapc &&
+			    (need & CEPH_CAP_FILE_WR)) {
+				if (!down_read_trylock(&mdsc->snap_rwsem)) {
+					/*
+					 * we can not call down_read() when
+					 * task isn't in TASK_RUNNING state
+					 */
+					if (nonblock) {
+						*err = -EAGAIN;
+						ret = 1;
+						goto out_unlock;
+					}
+
+					spin_unlock(&ci->i_ceph_lock);
+					down_read(&mdsc->snap_rwsem);
+					snap_rwsem_locked = true;
+					goto again;
+				}
+				snap_rwsem_locked = true;
+			}
 			*got = need | (have & want);
-			__take_cap_refs(ci, *got);
+			__take_cap_refs(ci, *got, true);
 			ret = 1;
 		}
 	} else {
@@ -2189,6 +2224,8 @@
 	}
 out_unlock:
 	spin_unlock(&ci->i_ceph_lock);
+	if (snap_rwsem_locked)
+		up_read(&mdsc->snap_rwsem);
 
 	dout("get_cap_refs %p ret %d got %s\n", inode,
 	     ret, ceph_cap_string(*got));
@@ -2231,54 +2268,70 @@
 int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
 		  loff_t endoff, int *got, struct page **pinned_page)
 {
-	int _got, check_max, ret, err = 0;
+	int _got, ret, err = 0;
 
 	ret = ceph_pool_perm_check(ci, need);
 	if (ret < 0)
 		return ret;
 
-retry:
-	if (endoff > 0)
-		check_max_size(&ci->vfs_inode, endoff);
-	_got = 0;
-	check_max = 0;
-	ret = wait_event_interruptible(ci->i_cap_wq,
-				try_get_cap_refs(ci, need, want, endoff,
-						 &_got, &check_max, &err));
-	if (err)
-		ret = err;
-	if (ret < 0)
-		return ret;
+	while (true) {
+		if (endoff > 0)
+			check_max_size(&ci->vfs_inode, endoff);
 
-	if (check_max)
-		goto retry;
-
-	if (ci->i_inline_version != CEPH_INLINE_NONE &&
-	    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
-	    i_size_read(&ci->vfs_inode) > 0) {
-		struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0);
-		if (page) {
-			if (PageUptodate(page)) {
-				*pinned_page = page;
-				goto out;
-			}
-			page_cache_release(page);
-		}
-		/*
-		 * drop cap refs first because getattr while holding
-		 * caps refs can cause deadlock.
-		 */
-		ceph_put_cap_refs(ci, _got);
+		err = 0;
 		_got = 0;
+		ret = try_get_cap_refs(ci, need, want, endoff,
+				       false, &_got, &err);
+		if (ret) {
+			if (err == -EAGAIN)
+				continue;
+			if (err < 0)
+				return err;
+		} else {
+			ret = wait_event_interruptible(ci->i_cap_wq,
+					try_get_cap_refs(ci, need, want, endoff,
+							 true, &_got, &err));
+			if (err == -EAGAIN)
+				continue;
+			if (err < 0)
+				ret = err;
+			if (ret < 0)
+				return ret;
+		}
 
-		/* getattr request will bring inline data into page cache */
-		ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
-					CEPH_STAT_CAP_INLINE_DATA, true);
-		if (ret < 0)
-			return ret;
-		goto retry;
+		if (ci->i_inline_version != CEPH_INLINE_NONE &&
+		    (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+		    i_size_read(&ci->vfs_inode) > 0) {
+			struct page *page =
+				find_get_page(ci->vfs_inode.i_mapping, 0);
+			if (page) {
+				if (PageUptodate(page)) {
+					*pinned_page = page;
+					break;
+				}
+				page_cache_release(page);
+			}
+			/*
+			 * drop cap refs first because getattr while
+			 * holding * caps refs can cause deadlock.
+			 */
+			ceph_put_cap_refs(ci, _got);
+			_got = 0;
+
+			/*
+			 * getattr request will bring inline data into
+			 * page cache
+			 */
+			ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+						CEPH_STAT_CAP_INLINE_DATA,
+						true);
+			if (ret < 0)
+				return ret;
+			continue;
+		}
+		break;
 	}
-out:
+
 	*got = _got;
 	return 0;
 }
@@ -2290,7 +2343,7 @@
 void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
 {
 	spin_lock(&ci->i_ceph_lock);
-	__take_cap_refs(ci, caps);
+	__take_cap_refs(ci, caps, false);
 	spin_unlock(&ci->i_ceph_lock);
 }
 
@@ -2341,6 +2394,13 @@
 					wake = 1;
 				}
 			}
+			if (ci->i_wrbuffer_ref_head == 0 &&
+			    ci->i_dirty_caps == 0 &&
+			    ci->i_flushing_caps == 0) {
+				BUG_ON(!ci->i_head_snapc);
+				ceph_put_snap_context(ci->i_head_snapc);
+				ci->i_head_snapc = NULL;
+			}
 			/* see comment in __ceph_remove_cap() */
 			if (!__ceph_is_any_caps(ci) && ci->i_snap_realm)
 				drop_inode_snap_realm(ci);
@@ -2384,7 +2444,9 @@
 	if (ci->i_head_snapc == snapc) {
 		ci->i_wrbuffer_ref_head -= nr;
 		if (ci->i_wrbuffer_ref_head == 0 &&
-		    ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) {
+		    ci->i_wr_ref == 0 &&
+		    ci->i_dirty_caps == 0 &&
+		    ci->i_flushing_caps == 0) {
 			BUG_ON(!ci->i_head_snapc);
 			ceph_put_snap_context(ci->i_head_snapc);
 			ci->i_head_snapc = NULL;
@@ -2775,7 +2837,8 @@
 			dout(" inode %p now clean\n", inode);
 			BUG_ON(!list_empty(&ci->i_dirty_item));
 			drop = 1;
-			if (ci->i_wrbuffer_ref_head == 0) {
+			if (ci->i_wr_ref == 0 &&
+			    ci->i_wrbuffer_ref_head == 0) {
 				BUG_ON(!ci->i_head_snapc);
 				ceph_put_snap_context(ci->i_head_snapc);
 				ci->i_head_snapc = NULL;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 777eb21..0a76a37 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -557,13 +557,13 @@
  * objects, rollback on failure, etc.)
  */
 static ssize_t
-ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
+ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
+		       struct ceph_snap_context *snapc)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_snap_context *snapc;
 	struct ceph_vino vino;
 	struct ceph_osd_request *req;
 	struct page **pages;
@@ -600,7 +600,6 @@
 		size_t start;
 		ssize_t n;
 
-		snapc = ci->i_snap_realm->cached_context;
 		vino = ceph_vino(inode);
 		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 					    vino, pos, &len, 0,
@@ -674,13 +673,13 @@
  * objects, rollback on failure, etc.)
  */
 static ssize_t
-ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
+ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
+		struct ceph_snap_context *snapc)
 {
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file_inode(file);
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-	struct ceph_snap_context *snapc;
 	struct ceph_vino vino;
 	struct ceph_osd_request *req;
 	struct page **pages;
@@ -717,7 +716,6 @@
 		size_t left;
 		int n;
 
-		snapc = ci->i_snap_realm->cached_context;
 		vino = ceph_vino(inode);
 		req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
 					    vino, pos, &len, 0, 1,
@@ -996,14 +994,30 @@
 
 	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
 	    (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
+		struct ceph_snap_context *snapc;
 		struct iov_iter data;
 		mutex_unlock(&inode->i_mutex);
+
+		spin_lock(&ci->i_ceph_lock);
+		if (__ceph_have_pending_cap_snap(ci)) {
+			struct ceph_cap_snap *capsnap =
+					list_last_entry(&ci->i_cap_snaps,
+							struct ceph_cap_snap,
+							ci_item);
+			snapc = ceph_get_snap_context(capsnap->context);
+		} else {
+			BUG_ON(!ci->i_head_snapc);
+			snapc = ceph_get_snap_context(ci->i_head_snapc);
+		}
+		spin_unlock(&ci->i_ceph_lock);
+
 		/* we might need to revert back to that point */
 		data = *from;
 		if (iocb->ki_flags & IOCB_DIRECT)
-			written = ceph_sync_direct_write(iocb, &data, pos);
+			written = ceph_sync_direct_write(iocb, &data, pos,
+							 snapc);
 		else
-			written = ceph_sync_write(iocb, &data, pos);
+			written = ceph_sync_write(iocb, &data, pos, snapc);
 		if (written == -EOLDSNAPC) {
 			dout("aio_write %p %llx.%llx %llu~%u"
 				"got EOLDSNAPC, retrying\n",
@@ -1014,6 +1028,7 @@
 		}
 		if (written > 0)
 			iov_iter_advance(from, written);
+		ceph_put_snap_context(snapc);
 	} else {
 		loff_t old_size = inode->i_size;
 		/*
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index b2a9453..5bfdab9a 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -455,6 +455,7 @@
 {
 	struct inode *inode = &ci->vfs_inode;
 	struct ceph_cap_snap *capsnap;
+	struct ceph_snap_context *old_snapc;
 	int used, dirty;
 
 	capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
@@ -467,6 +468,8 @@
 	used = __ceph_caps_used(ci);
 	dirty = __ceph_caps_dirty(ci);
 
+	old_snapc = ci->i_head_snapc;
+
 	/*
 	 * If there is a write in progress, treat that as a dirty Fw,
 	 * even though it hasn't completed yet; by the time we finish
@@ -481,76 +484,79 @@
 		   writes in progress now were started before the previous
 		   cap_snap.  lucky us. */
 		dout("queue_cap_snap %p already pending\n", inode);
-		kfree(capsnap);
-	} else if (ci->i_snap_realm->cached_context == ceph_empty_snapc) {
+		goto update_snapc;
+	}
+	if (ci->i_snap_realm->cached_context == ceph_empty_snapc) {
 		dout("queue_cap_snap %p empty snapc\n", inode);
-		kfree(capsnap);
-	} else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
-			    CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
-		struct ceph_snap_context *snapc = ci->i_head_snapc;
-
-		/*
-		 * if we are a sync write, we may need to go to the snaprealm
-		 * to get the current snapc.
-		 */
-		if (!snapc)
-			snapc = ci->i_snap_realm->cached_context;
-
-		dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
-		     inode, capsnap, snapc, ceph_cap_string(dirty));
-		ihold(inode);
-
-		atomic_set(&capsnap->nref, 1);
-		capsnap->ci = ci;
-		INIT_LIST_HEAD(&capsnap->ci_item);
-		INIT_LIST_HEAD(&capsnap->flushing_item);
-
-		capsnap->follows = snapc->seq;
-		capsnap->issued = __ceph_caps_issued(ci, NULL);
-		capsnap->dirty = dirty;
-
-		capsnap->mode = inode->i_mode;
-		capsnap->uid = inode->i_uid;
-		capsnap->gid = inode->i_gid;
-
-		if (dirty & CEPH_CAP_XATTR_EXCL) {
-			__ceph_build_xattrs_blob(ci);
-			capsnap->xattr_blob =
-				ceph_buffer_get(ci->i_xattrs.blob);
-			capsnap->xattr_version = ci->i_xattrs.version;
-		} else {
-			capsnap->xattr_blob = NULL;
-			capsnap->xattr_version = 0;
-		}
-
-		capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
-
-		/* dirty page count moved from _head to this cap_snap;
-		   all subsequent writes page dirties occur _after_ this
-		   snapshot. */
-		capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
-		ci->i_wrbuffer_ref_head = 0;
-		capsnap->context = snapc;
-		ci->i_head_snapc =
-			ceph_get_snap_context(ci->i_snap_realm->cached_context);
-		dout(" new snapc is %p\n", ci->i_head_snapc);
-		list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
-
-		if (used & CEPH_CAP_FILE_WR) {
-			dout("queue_cap_snap %p cap_snap %p snapc %p"
-			     " seq %llu used WR, now pending\n", inode,
-			     capsnap, snapc, snapc->seq);
-			capsnap->writing = 1;
-		} else {
-			/* note mtime, size NOW. */
-			__ceph_finish_cap_snap(ci, capsnap);
-		}
-	} else {
+		goto update_snapc;
+	}
+	if (!(dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
+		       CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) {
 		dout("queue_cap_snap %p nothing dirty|writing\n", inode);
-		kfree(capsnap);
+		goto update_snapc;
 	}
 
+	BUG_ON(!old_snapc);
+
+	dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
+	     inode, capsnap, old_snapc, ceph_cap_string(dirty));
+	ihold(inode);
+
+	atomic_set(&capsnap->nref, 1);
+	capsnap->ci = ci;
+	INIT_LIST_HEAD(&capsnap->ci_item);
+	INIT_LIST_HEAD(&capsnap->flushing_item);
+
+	capsnap->follows = old_snapc->seq;
+	capsnap->issued = __ceph_caps_issued(ci, NULL);
+	capsnap->dirty = dirty;
+
+	capsnap->mode = inode->i_mode;
+	capsnap->uid = inode->i_uid;
+	capsnap->gid = inode->i_gid;
+
+	if (dirty & CEPH_CAP_XATTR_EXCL) {
+		__ceph_build_xattrs_blob(ci);
+		capsnap->xattr_blob =
+			ceph_buffer_get(ci->i_xattrs.blob);
+		capsnap->xattr_version = ci->i_xattrs.version;
+	} else {
+		capsnap->xattr_blob = NULL;
+		capsnap->xattr_version = 0;
+	}
+
+	capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
+	/* dirty page count moved from _head to this cap_snap;
+	   all subsequent writes page dirties occur _after_ this
+	   snapshot. */
+	capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
+	ci->i_wrbuffer_ref_head = 0;
+	capsnap->context = old_snapc;
+	list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
+	old_snapc = NULL;
+
+	if (used & CEPH_CAP_FILE_WR) {
+		dout("queue_cap_snap %p cap_snap %p snapc %p"
+		     " seq %llu used WR, now pending\n", inode,
+		     capsnap, old_snapc, old_snapc->seq);
+		capsnap->writing = 1;
+	} else {
+		/* note mtime, size NOW. */
+		__ceph_finish_cap_snap(ci, capsnap);
+	}
+	capsnap = NULL;
+
+update_snapc:
+	if (ci->i_head_snapc) {
+		ci->i_head_snapc = ceph_get_snap_context(
+				ci->i_snap_realm->cached_context);
+		dout(" new snapc is %p\n", ci->i_head_snapc);
+	}
 	spin_unlock(&ci->i_ceph_lock);
+
+	kfree(capsnap);
+	ceph_put_snap_context(old_snapc);
 }
 
 /*