Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull ceph updates from Sage Weil:
 "This includes both the first pile of Ceph patches (which I sent to
  torvalds@vger, sigh) and a few new patches that add support for
  fscache for Ceph.  That includes a few fscache core fixes that David
  Howells asked to go through the Ceph tree.  (Thanks go to Milosz Tanski
  for putting this feature together)

  This first batch of patches (included here) had (has) several
  important RBD bug fixes, hole punch support, several different
  cleanups in the page cache interactions, improvements in the truncate
  code (new truncate mutex to avoid shenanigans with i_mutex), and a
  series of fixes in the synchronous striping read/write code.

  On top of that is a random collection of small fixes all across the
  tree (error code checks and error path cleanup, obsolete wq flags,
  etc)"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (43 commits)
  ceph: use d_invalidate() to invalidate aliases
  ceph: remove ceph_lookup_inode()
  ceph: trivial buildbot warnings fix
  ceph: Do not do invalidate if the filesystem is mounted nofsc
  ceph: page still marked private_2
  ceph: ceph_readpage_to_fscache didn't check if marked
  ceph: clean PgPrivate2 on returning from readpages
  ceph: use fscache as a local presisent cache
  fscache: Netfs function for cleanup post readpages
  FS-Cache: Fix heading in documentation
  CacheFiles: Implement interface to check cache consistency
  FS-Cache: Add interface to check consistency of a cached object
  rbd: fix null dereference in dout
  rbd: fix buffer size for writes to images with snapshots
  libceph: use pg_num_mask instead of pgp_num_mask for pg.seed calc
  rbd: fix I/O error propagation for reads
  ceph: use vfs __set_page_dirty_nobuffers interface instead of doing it inside filesystem
  ceph: allow sync_read/write return partial successed size of read/write.
  ceph: fix bugs about handling short-read for sync read mode.
  ceph: remove useless variable revoked_rdcache
  ...
diff --git a/Documentation/filesystems/caching/backend-api.txt b/Documentation/filesystems/caching/backend-api.txt
index d78bab9..277d1e8 100644
--- a/Documentation/filesystems/caching/backend-api.txt
+++ b/Documentation/filesystems/caching/backend-api.txt
@@ -299,6 +299,15 @@
      enough space in the cache to permit this.
 
 
+ (*) Check coherency state of an object [mandatory]:
+
+	int (*check_consistency)(struct fscache_object *object)
+
+     This method is called to have the cache check the saved auxiliary data of
+     the object against the netfs's idea of the state.  0 should be returned
+     if they're consistent and -ESTALE otherwise.  -ENOMEM and -ERESTARTSYS
+     may also be returned.
+
  (*) Update object [mandatory]:
 
 	int (*update_object)(struct fscache_object *object)
diff --git a/Documentation/filesystems/caching/netfs-api.txt b/Documentation/filesystems/caching/netfs-api.txt
index 97e6c0e..11a0a40 100644
--- a/Documentation/filesystems/caching/netfs-api.txt
+++ b/Documentation/filesystems/caching/netfs-api.txt
@@ -32,7 +32,7 @@
 	 (9) Setting the data file size
 	(10) Page alloc/read/write
 	(11) Page uncaching
-	(12) Index and data file update
+	(12) Index and data file consistency
 	(13) Miscellaneous cookie operations
 	(14) Cookie unregistration
 	(15) Index invalidation
@@ -433,7 +433,7 @@
 
 
 =====================
-PAGE READ/ALLOC/WRITE
+PAGE ALLOC/READ/WRITE
 =====================
 
 And the sixth step is to store and retrieve pages in the cache.  There are
@@ -499,7 +499,7 @@
      (*) An argument that's 0 on success or negative for an error code.
 
      If an error occurs, it should be assumed that the page contains no usable
-     data.
+     data.  fscache_readpages_cancel() may need to be called.
 
      end_io_func() will be called in process context if the read is results in
      an error, but it might be called in interrupt context if the read is
@@ -623,6 +623,22 @@
 been marked appropriately and will need uncaching.
 
 
+CANCELLATION OF UNREAD PAGES
+----------------------------
+
+If one or more pages are passed to fscache_read_or_alloc_pages() but not then
+read from the cache and also not read from the underlying filesystem then
+those pages will need to have any marks and reservations removed.  This can be
+done by calling:
+
+	void fscache_readpages_cancel(struct fscache_cookie *cookie,
+				      struct list_head *pages);
+
+prior to returning to the caller.  The cookie argument should be as passed to
+fscache_read_or_alloc_pages().  Every page in the pages list will be examined
+and any that have PG_fscache set will be uncached.
+
+
 ==============
 PAGE UNCACHING
 ==============
@@ -690,9 +706,18 @@
 error is returned.
 
 
-==========================
-INDEX AND DATA FILE UPDATE
-==========================
+===============================
+INDEX AND DATA FILE CONSISTENCY
+===============================
+
+To find out whether auxiliary data for an object is up to date within the
+cache, the following function can be called:
+
+	int fscache_check_consistency(struct fscache_cookie *cookie)
+
+This will call back to the netfs to check whether the auxiliary data associated
+with a cookie is correct.  It returns 0 if it is and -ESTALE if it isn't; it
+may also return -ENOMEM and -ERESTARTSYS.
 
 To request an update of the index data for an index or other object, the
 following function should be called:
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 191cd17..39c51cc 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1561,11 +1561,12 @@
 		obj_request, obj_request->img_request, obj_request->result,
 		xferred, length);
 	/*
-	 * ENOENT means a hole in the image.  We zero-fill the
-	 * entire length of the request.  A short read also implies
-	 * zero-fill to the end of the request.  Either way we
-	 * update the xferred count to indicate the whole request
-	 * was satisfied.
+	 * ENOENT means a hole in the image.  We zero-fill the entire
+	 * length of the request.  A short read also implies zero-fill
+	 * to the end of the request.  An error requires the whole
+	 * length of the request to be reported finished with an error
+	 * to the block layer.  In each case we update the xferred
+	 * count to indicate the whole request was satisfied.
 	 */
 	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
 	if (obj_request->result == -ENOENT) {
@@ -1574,14 +1575,13 @@
 		else
 			zero_pages(obj_request->pages, 0, length);
 		obj_request->result = 0;
-		obj_request->xferred = length;
 	} else if (xferred < length && !obj_request->result) {
 		if (obj_request->type == OBJ_REQUEST_BIO)
 			zero_bio_chain(obj_request->bio_list, xferred);
 		else
 			zero_pages(obj_request->pages, xferred, length);
-		obj_request->xferred = length;
 	}
+	obj_request->xferred = length;
 	obj_request_done_set(obj_request);
 }
 
@@ -2167,9 +2167,9 @@
 	struct rbd_obj_request *obj_request = NULL;
 	struct rbd_obj_request *next_obj_request;
 	bool write_request = img_request_write_test(img_request);
-	struct bio *bio_list = 0;
+	struct bio *bio_list = NULL;
 	unsigned int bio_offset = 0;
-	struct page **pages = 0;
+	struct page **pages = NULL;
 	u64 img_offset;
 	u64 resid;
 	u16 opcode;
@@ -2207,6 +2207,11 @@
 		rbd_segment_name_free(object_name);
 		if (!obj_request)
 			goto out_unwind;
+		/*
+		 * set obj_request->img_request before creating the
+		 * osd_request so that it gets the right snapc
+		 */
+		rbd_img_obj_request_add(img_request, obj_request);
 
 		if (type == OBJ_REQUEST_BIO) {
 			unsigned int clone_size;
@@ -2248,11 +2253,6 @@
 					obj_request->pages, length,
 					offset & ~PAGE_MASK, false, false);
 
-		/*
-		 * set obj_request->img_request before formatting
-		 * the osd_request so that it gets the right snapc
-		 */
-		rbd_img_obj_request_add(img_request, obj_request);
 		if (write_request)
 			rbd_osd_req_format_write(obj_request);
 		else
@@ -3706,12 +3706,14 @@
 	if (ret < sizeof (size_buf))
 		return -ERANGE;
 
-	if (order)
+	if (order) {
 		*order = size_buf.order;
+		dout("  order %u", (unsigned int)*order);
+	}
 	*snap_size = le64_to_cpu(size_buf.size);
 
-	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
-		(unsigned long long)snap_id, (unsigned int)*order,
+	dout("  snap_id 0x%016llx snap_size = %llu\n",
+		(unsigned long long)snap_id,
 		(unsigned long long)*snap_size);
 
 	return 0;
diff --git a/fs/cachefiles/interface.c b/fs/cachefiles/interface.c
index d4c1206..43eb559 100644
--- a/fs/cachefiles/interface.c
+++ b/fs/cachefiles/interface.c
@@ -378,6 +378,31 @@
 }
 
 /*
+ * check whether the backing cache object is consistent with FS-Cache's view
+ * - returns 0 if consistent, or a negative error code (-ESTALE, -ENOMEM) if not
+ */
+static int cachefiles_check_consistency(struct fscache_operation *op)
+{
+	struct cachefiles_object *object;
+	struct cachefiles_cache *cache;
+	const struct cred *saved_cred;
+	int ret;
+
+	_enter("{OBJ%x}", op->object->debug_id);
+
+	object = container_of(op->object, struct cachefiles_object, fscache);
+	cache = container_of(object->fscache.cache,
+			     struct cachefiles_cache, cache);
+
+	cachefiles_begin_secure(cache, &saved_cred);
+	ret = cachefiles_check_auxdata(object);
+	cachefiles_end_secure(cache, saved_cred);
+
+	_leave(" = %d", ret);
+	return ret;
+}
+
+/*
  * notification the attributes on an object have changed
  * - called with reads/writes excluded by FS-Cache
  */
@@ -522,4 +547,5 @@
 	.write_page		= cachefiles_write_page,
 	.uncache_page		= cachefiles_uncache_page,
 	.dissociate_pages	= cachefiles_dissociate_pages,
+	.check_consistency	= cachefiles_check_consistency,
 };
diff --git a/fs/cachefiles/internal.h b/fs/cachefiles/internal.h
index 4938251..5349473 100644
--- a/fs/cachefiles/internal.h
+++ b/fs/cachefiles/internal.h
@@ -235,6 +235,7 @@
 				       struct cachefiles_xattr *auxdata);
 extern int cachefiles_update_object_xattr(struct cachefiles_object *object,
 					  struct cachefiles_xattr *auxdata);
+extern int cachefiles_check_auxdata(struct cachefiles_object *object);
 extern int cachefiles_check_object_xattr(struct cachefiles_object *object,
 					 struct cachefiles_xattr *auxdata);
 extern int cachefiles_remove_object_xattr(struct cachefiles_cache *cache,
diff --git a/fs/cachefiles/xattr.c b/fs/cachefiles/xattr.c
index 2476e51..34c88b8 100644
--- a/fs/cachefiles/xattr.c
+++ b/fs/cachefiles/xattr.c
@@ -157,6 +157,42 @@
 }
 
 /*
+ * check the consistency between the backing cache and the FS-Cache cookie
+ * - returns 0 if consistent, -ESTALE if not, -ENOMEM if allocation fails
+ */
+int cachefiles_check_auxdata(struct cachefiles_object *object)
+{
+	struct cachefiles_xattr *auxbuf;
+	struct dentry *dentry = object->dentry;
+	unsigned int dlen;
+	int ret;
+
+	ASSERT(dentry);
+	ASSERT(dentry->d_inode);
+	ASSERT(object->fscache.cookie->def->check_aux);
+
+	auxbuf = kmalloc(sizeof(struct cachefiles_xattr) + 512, GFP_KERNEL);
+	if (!auxbuf)
+		return -ENOMEM;
+
+	auxbuf->len = vfs_getxattr(dentry, cachefiles_xattr_cache,
+				   &auxbuf->type, 512 + 1);
+	ret = -ESTALE;
+	if (auxbuf->len < 1 ||
+	    auxbuf->type != object->fscache.cookie->def->type)
+		goto out;
+
+	dlen = auxbuf->len - 1;
+	if (fscache_check_aux(&object->fscache, &auxbuf->data,
+			      dlen) == FSCACHE_CHECKAUX_OKAY)
+		ret = 0;
+out:
+	/* common exit: auxbuf must be freed on the stale paths too */
+	kfree(auxbuf);
+	return ret;
+}
+
+/*
  * check the state xattr on a cache file
  * - return -ESTALE if the object should be deleted
  */
diff --git a/fs/ceph/Kconfig b/fs/ceph/Kconfig
index 49bc782..ac9a2ef 100644
--- a/fs/ceph/Kconfig
+++ b/fs/ceph/Kconfig
@@ -16,3 +16,12 @@
 
 	  If unsure, say N.
 
+if CEPH_FS
+config CEPH_FSCACHE
+	bool "Enable Ceph client caching support"
+	depends on CEPH_FS=m && FSCACHE || CEPH_FS=y && FSCACHE=y
+	help
+	  Choose Y here to enable persistent, read-only local
+	  caching support for Ceph clients using FS-Cache
+
+endif
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index bd35212..32e3010 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -9,3 +9,4 @@
 	mds_client.o mdsmap.o strings.o ceph_frag.o \
 	debugfs.o
 
+ceph-$(CONFIG_CEPH_FSCACHE) += cache.o
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 5318a3b..6df8bd4 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -11,6 +11,7 @@
 
 #include "super.h"
 #include "mds_client.h"
+#include "cache.h"
 #include <linux/ceph/osd_client.h>
 
 /*
@@ -70,15 +71,16 @@
 	struct address_space *mapping = page->mapping;
 	struct inode *inode;
 	struct ceph_inode_info *ci;
-	int undo = 0;
 	struct ceph_snap_context *snapc;
+	int ret;
 
 	if (unlikely(!mapping))
 		return !TestSetPageDirty(page);
 
-	if (TestSetPageDirty(page)) {
+	if (PageDirty(page)) {
 		dout("%p set_page_dirty %p idx %lu -- already dirty\n",
 		     mapping->host, page, page->index);
+		BUG_ON(!PagePrivate(page));
 		return 0;
 	}
 
@@ -107,35 +109,19 @@
 	     snapc, snapc->seq, snapc->num_snaps);
 	spin_unlock(&ci->i_ceph_lock);
 
-	/* now adjust page */
-	spin_lock_irq(&mapping->tree_lock);
-	if (page->mapping) {	/* Race with truncate? */
-		WARN_ON_ONCE(!PageUptodate(page));
-		account_page_dirtied(page, page->mapping);
-		radix_tree_tag_set(&mapping->page_tree,
-				page_index(page), PAGECACHE_TAG_DIRTY);
+	/*
+	 * Reference snap context in page->private.  Also set
+	 * PagePrivate so that we get invalidatepage callback.
+	 */
+	BUG_ON(PagePrivate(page));
+	page->private = (unsigned long)snapc;
+	SetPagePrivate(page);
 
-		/*
-		 * Reference snap context in page->private.  Also set
-		 * PagePrivate so that we get invalidatepage callback.
-		 */
-		page->private = (unsigned long)snapc;
-		SetPagePrivate(page);
-	} else {
-		dout("ANON set_page_dirty %p (raced truncate?)\n", page);
-		undo = 1;
-	}
+	ret = __set_page_dirty_nobuffers(page);
+	WARN_ON(!PageLocked(page));
+	WARN_ON(!page->mapping);
 
-	spin_unlock_irq(&mapping->tree_lock);
-
-	if (undo)
-		/* whoops, we failed to dirty the page */
-		ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
-
-	__mark_inode_dirty(mapping->host, I_DIRTY_PAGES);
-
-	BUG_ON(!PageDirty(page));
-	return 1;
+	return ret;
 }
 
 /*
@@ -150,11 +136,19 @@
 	struct ceph_inode_info *ci;
 	struct ceph_snap_context *snapc = page_snap_context(page);
 
-	BUG_ON(!PageLocked(page));
-	BUG_ON(!PagePrivate(page));
-	BUG_ON(!page->mapping);
-
 	inode = page->mapping->host;
+	ci = ceph_inode(inode);
+
+	if (offset != 0 || length != PAGE_CACHE_SIZE) {
+		dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
+		     inode, page, page->index, offset, length);
+		return;
+	}
+
+	ceph_invalidate_fscache_page(inode, page);
+
+	if (!PagePrivate(page))
+		return;
 
 	/*
 	 * We can get non-dirty pages here due to races between
@@ -164,31 +158,28 @@
 	if (!PageDirty(page))
 		pr_err("%p invalidatepage %p page not dirty\n", inode, page);
 
-	if (offset == 0 && length == PAGE_CACHE_SIZE)
-		ClearPageChecked(page);
+	ClearPageChecked(page);
 
-	ci = ceph_inode(inode);
-	if (offset == 0 && length == PAGE_CACHE_SIZE) {
-		dout("%p invalidatepage %p idx %lu full dirty page\n",
-		     inode, page, page->index);
-		ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
-		ceph_put_snap_context(snapc);
-		page->private = 0;
-		ClearPagePrivate(page);
-	} else {
-		dout("%p invalidatepage %p idx %lu partial dirty page %u(%u)\n",
-		     inode, page, page->index, offset, length);
-	}
+	dout("%p invalidatepage %p idx %lu full dirty page\n",
+	     inode, page, page->index);
+
+	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
+	ceph_put_snap_context(snapc);
+	page->private = 0;
+	ClearPagePrivate(page);
 }
 
-/* just a sanity check */
 static int ceph_releasepage(struct page *page, gfp_t g)
 {
 	struct inode *inode = page->mapping ? page->mapping->host : NULL;
 	dout("%p releasepage %p idx %lu\n", inode, page, page->index);
 	WARN_ON(PageDirty(page));
-	WARN_ON(PagePrivate(page));
-	return 0;
+
+	/* Can we release the page from the cache? */
+	if (!ceph_release_fscache_page(page, g))
+		return 0;
+
+	return !PagePrivate(page);
 }
 
 /*
@@ -198,11 +189,16 @@
 {
 	struct inode *inode = file_inode(filp);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_osd_client *osdc = 
+	struct ceph_osd_client *osdc =
 		&ceph_inode_to_client(inode)->client->osdc;
 	int err = 0;
 	u64 len = PAGE_CACHE_SIZE;
 
+	err = ceph_readpage_from_fscache(inode, page);
+
+	if (err == 0)
+		goto out;
+
 	dout("readpage inode %p file %p page %p index %lu\n",
 	     inode, filp, page, page->index);
 	err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
@@ -220,6 +216,9 @@
 	}
 	SetPageUptodate(page);
 
+	if (err == 0)
+		ceph_readpage_to_fscache(inode, page);
+
 out:
 	return err < 0 ? err : 0;
 }
@@ -262,6 +261,7 @@
 		     page->index);
 		flush_dcache_page(page);
 		SetPageUptodate(page);
+		ceph_readpage_to_fscache(inode, page);
 		unlock_page(page);
 		page_cache_release(page);
 		bytes -= PAGE_CACHE_SIZE;
@@ -331,11 +331,12 @@
 		page = list_entry(page_list->prev, struct page, lru);
 		BUG_ON(PageLocked(page));
 		list_del(&page->lru);
-		
+
  		dout("start_read %p adding %p idx %lu\n", inode, page,
 		     page->index);
 		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
 					  GFP_NOFS)) {
+			ceph_fscache_uncache_page(inode, page);
 			page_cache_release(page);
 			dout("start_read %p add_to_page_cache failed %p\n",
 			     inode, page);
@@ -378,6 +379,12 @@
 	int rc = 0;
 	int max = 0;
 
+	rc = ceph_readpages_from_fscache(mapping->host, mapping, page_list,
+					 &nr_pages);
+
+	if (rc == 0)
+		goto out;
+
 	if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
 		max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
 			>> PAGE_SHIFT;
@@ -392,6 +399,8 @@
 		BUG_ON(rc == 0);
 	}
 out:
+	ceph_fscache_readpages_cancel(inode, page_list);
+
 	dout("readpages %p file %p ret %d\n", inode, file, rc);
 	return rc;
 }
@@ -497,6 +506,8 @@
 	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
 		set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);
 
+	ceph_readpage_to_fscache(inode, page);
+
 	set_page_writeback(page);
 	err = ceph_osdc_writepages(osdc, ceph_vino(inode),
 				   &ci->i_layout, snapc,
@@ -552,7 +563,6 @@
 	pagevec_release(&pvec);
 }
 
-
 /*
  * async writeback completion handler.
  *
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
new file mode 100644
index 0000000..6bfe65e
--- /dev/null
+++ b/fs/ceph/cache.c
@@ -0,0 +1,398 @@
+/*
+ * Ceph cache definitions.
+ *
+ *  Copyright (C) 2013 by Adfin Solutions, Inc. All Rights Reserved.
+ *  Written by Milosz Tanski (milosz@adfin.com)
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License version 2
+ *  as published by the Free Software Foundation.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to:
+ *  Free Software Foundation
+ *  51 Franklin Street, Fifth Floor
+ *  Boston, MA  02111-1301  USA
+ *
+ */
+
+#include "super.h"
+#include "cache.h"
+
+struct ceph_aux_inode {
+	struct timespec	mtime;
+	loff_t          size;
+};
+
+struct fscache_netfs ceph_cache_netfs = {
+	.name		= "ceph",
+	.version	= 0,
+};
+
+static uint16_t ceph_fscache_session_get_key(const void *cookie_netfs_data,
+					     void *buffer, uint16_t maxbuf)
+{
+	const struct ceph_fs_client* fsc = cookie_netfs_data;
+	uint16_t klen;
+
+	klen = sizeof(fsc->client->fsid);
+	if (klen > maxbuf)
+		return 0;
+
+	memcpy(buffer, &fsc->client->fsid, klen);
+	return klen;
+}
+
+static const struct fscache_cookie_def ceph_fscache_fsid_object_def = {
+	.name		= "CEPH.fsid",
+	.type		= FSCACHE_COOKIE_TYPE_INDEX,
+	.get_key	= ceph_fscache_session_get_key,
+};
+
+int ceph_fscache_register(void)
+{
+	return fscache_register_netfs(&ceph_cache_netfs);
+}
+
+void ceph_fscache_unregister(void)
+{
+	fscache_unregister_netfs(&ceph_cache_netfs);
+}
+
+/* Acquire the per-fsid index cookie and create the revalidation workqueue. */
+int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
+{
+	fsc->fscache = fscache_acquire_cookie(ceph_cache_netfs.primary_index,
+					      &ceph_fscache_fsid_object_def,
+					      fsc);
+	if (fsc->fscache == NULL) {
+		pr_err("Unable to register fsid: %p fscache cookie\n", fsc);
+		return 0;
+	}
+
+	fsc->revalidate_wq = alloc_workqueue("ceph-revalidate", 0, 1);
+	if (fsc->revalidate_wq == NULL)
+		return -ENOMEM;
+
+	return 0;
+}
+
+static uint16_t ceph_fscache_inode_get_key(const void *cookie_netfs_data,
+					   void *buffer, uint16_t maxbuf)
+{
+	const struct ceph_inode_info* ci = cookie_netfs_data;
+	uint16_t klen;
+
+	/* key is the ceph virtual inode (id + snapshot); 0 if it won't fit */
+	klen = sizeof(ci->i_vino);
+	if (klen > maxbuf)
+		return 0;
+
+	memcpy(buffer, &ci->i_vino, klen);
+	return klen;
+}
+
+static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data,
+					   void *buffer, uint16_t bufmax)
+{
+	struct ceph_aux_inode aux;
+	const struct ceph_inode_info* ci = cookie_netfs_data;
+	const struct inode* inode = &ci->vfs_inode;
+
+	memset(&aux, 0, sizeof(aux));
+	aux.mtime = inode->i_mtime;
+	aux.size = inode->i_size;
+
+	memcpy(buffer, &aux, sizeof(aux));
+
+	return sizeof(aux);
+}
+
+static void ceph_fscache_inode_get_attr(const void *cookie_netfs_data,
+					uint64_t *size)
+{
+	const struct ceph_inode_info* ci = cookie_netfs_data;
+	const struct inode* inode = &ci->vfs_inode;
+
+	*size = inode->i_size;
+}
+
+static enum fscache_checkaux ceph_fscache_inode_check_aux(
+	void *cookie_netfs_data, const void *data, uint16_t dlen)
+{
+	struct ceph_aux_inode aux;
+	struct ceph_inode_info* ci = cookie_netfs_data;
+	struct inode* inode = &ci->vfs_inode;
+
+	if (dlen != sizeof(aux))
+		return FSCACHE_CHECKAUX_OBSOLETE;
+
+	memset(&aux, 0, sizeof(aux));
+	aux.mtime = inode->i_mtime;
+	aux.size = inode->i_size;
+
+	if (memcmp(data, &aux, sizeof(aux)) != 0)
+		return FSCACHE_CHECKAUX_OBSOLETE;
+
+	dout("ceph inode 0x%p cached okay", ci);
+	return FSCACHE_CHECKAUX_OKAY;
+}
+
+static void ceph_fscache_inode_now_uncached(void* cookie_netfs_data)
+{
+	struct ceph_inode_info* ci = cookie_netfs_data;
+	struct pagevec pvec;
+	pgoff_t first;
+	int loop, nr_pages;
+
+	pagevec_init(&pvec, 0);
+	first = 0;
+
+	dout("ceph inode 0x%p now uncached", ci);
+
+	while (1) {
+		nr_pages = pagevec_lookup(&pvec, ci->vfs_inode.i_mapping, first,
+					  PAGEVEC_SIZE - pagevec_count(&pvec));
+
+		if (!nr_pages)
+			break;
+
+		for (loop = 0; loop < nr_pages; loop++)
+			ClearPageFsCache(pvec.pages[loop]);
+
+		first = pvec.pages[nr_pages - 1]->index + 1;
+
+		pvec.nr = nr_pages;
+		pagevec_release(&pvec);
+		cond_resched();
+	}
+}
+
+static const struct fscache_cookie_def ceph_fscache_inode_object_def = {
+	.name		= "CEPH.inode",
+	.type		= FSCACHE_COOKIE_TYPE_DATAFILE,
+	.get_key	= ceph_fscache_inode_get_key,
+	.get_attr	= ceph_fscache_inode_get_attr,
+	.get_aux	= ceph_fscache_inode_get_aux,
+	.check_aux	= ceph_fscache_inode_check_aux,
+	.now_uncached	= ceph_fscache_inode_now_uncached,
+};
+
+/* Acquire the fscache cookie for a regular file's inode (at most once). */
+void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc,
+					struct ceph_inode_info* ci)
+{
+	struct inode* inode = &ci->vfs_inode;
+
+	/* No fscache cookie for this filesystem, so nothing to attach to */
+	if (fsc->fscache == NULL)
+		return;
+
+	/* Only regular files are cached; S_ISREG() tests the whole type field */
+	if (!S_ISREG(inode->i_mode))
+		return;
+
+	/* Avoid multiple racing open requests */
+	mutex_lock(&inode->i_mutex);
+
+	if (ci->fscache)
+		goto done;
+
+	ci->fscache = fscache_acquire_cookie(fsc->fscache,
+					     &ceph_fscache_inode_object_def,
+					     ci);
+done:
+	mutex_unlock(&inode->i_mutex);
+}
+
+void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
+{
+	struct fscache_cookie* cookie;
+
+	if ((cookie = ci->fscache) == NULL)
+		return;
+
+	ci->fscache = NULL;
+
+	fscache_uncache_all_inode_pages(cookie, &ci->vfs_inode);
+	fscache_relinquish_cookie(cookie, 0);
+}
+
+static void ceph_vfs_readpage_complete(struct page *page, void *data, int error)
+{
+	if (!error)
+		SetPageUptodate(page);
+}
+
+static void ceph_vfs_readpage_complete_unlock(struct page *page, void *data, int error)
+{
+	if (!error)
+		SetPageUptodate(page);
+
+	unlock_page(page);
+}
+
+static inline int cache_valid(struct ceph_inode_info *ci)
+{
+	return ((ceph_caps_issued(ci) & CEPH_CAP_FILE_CACHE) &&
+		(ci->i_fscache_gen == ci->i_rdcache_gen));
+}
+
+
+/* Attempt to read a page from the fscache.
+ * Returns 0 if the read was submitted, else -ENOBUFS, -ENODATA or other error.
+ * This function is called from the readpage_nounlock context. DO NOT attempt to
+ * unlock the page here (or in the callback).
+ */
+int ceph_readpage_from_fscache(struct inode *inode, struct page *page)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int ret;
+
+	if (!cache_valid(ci))
+		return -ENOBUFS;
+
+	ret = fscache_read_or_alloc_page(ci->fscache, page,
+					 ceph_vfs_readpage_complete, NULL,
+					 GFP_KERNEL);
+
+	switch (ret) {
+		case 0: /* Page found */
+			dout("page read submitted\n");
+			return 0;
+		case -ENOBUFS: /* Pages were not found, and can't be */
+		case -ENODATA: /* Pages were not found */
+			dout("page/inode not in cache\n");
+			return ret;
+		default:
+			dout("%s: unknown error ret = %i\n", __func__, ret);
+			return ret;
+	}
+}
+
+int ceph_readpages_from_fscache(struct inode *inode,
+				  struct address_space *mapping,
+				  struct list_head *pages,
+				  unsigned *nr_pages)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int ret;
+
+	if (!cache_valid(ci))
+		return -ENOBUFS;
+
+	ret = fscache_read_or_alloc_pages(ci->fscache, mapping, pages, nr_pages,
+					  ceph_vfs_readpage_complete_unlock,
+					  NULL, mapping_gfp_mask(mapping));
+
+	switch (ret) {
+		case 0: /* All pages found */
+			dout("all-page read submitted\n");
+			return 0;
+		case -ENOBUFS: /* Some pages were not found, and can't be */
+		case -ENODATA: /* some pages were not found */
+			dout("page/inode not in cache\n");
+			return ret;
+		default:
+			dout("%s: unknown error ret = %i\n", __func__, ret);
+			return ret;
+	}
+}
+
+void ceph_readpage_to_fscache(struct inode *inode, struct page *page)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	int ret;
+
+	if (!PageFsCache(page))
+		return;
+
+	if (!cache_valid(ci))
+		return;
+
+	ret = fscache_write_page(ci->fscache, page, GFP_KERNEL);
+	if (ret)
+		 fscache_uncache_page(ci->fscache, page);
+}
+
+void ceph_invalidate_fscache_page(struct inode* inode, struct page *page)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+
+	fscache_wait_on_page_write(ci->fscache, page);
+	fscache_uncache_page(ci->fscache, page);
+}
+
+void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc)
+{
+	if (fsc->revalidate_wq)
+		destroy_workqueue(fsc->revalidate_wq);
+
+	fscache_relinquish_cookie(fsc->fscache, 0);
+	fsc->fscache = NULL;
+}
+
+/* Work handler: check the inode's fscache object and invalidate it if stale. */
+static void ceph_revalidate_work(struct work_struct *work)
+{
+	int issued;
+	u32 orig_gen;
+	struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
+						  i_revalidate_work);
+	struct inode *inode = &ci->vfs_inode;
+
+	spin_lock(&ci->i_ceph_lock);
+	issued = __ceph_caps_issued(ci, NULL);
+	orig_gen = ci->i_rdcache_gen;
+	spin_unlock(&ci->i_ceph_lock);
+
+	if (!(issued & CEPH_CAP_FILE_CACHE)) {
+		dout("revalidate_work lost cache before validation %p\n",
+		     inode);
+		goto out;
+	}
+
+	/* check_consistency returns 0 when consistent; invalidate only if not */
+	if (fscache_check_consistency(ci->fscache))
+		fscache_invalidate(ci->fscache);
+
+	spin_lock(&ci->i_ceph_lock);
+	/* Update the new valid generation (backwards sanity check too) */
+	if (orig_gen > ci->i_fscache_gen)
+		ci->i_fscache_gen = orig_gen;
+	spin_unlock(&ci->i_ceph_lock);
+out:
+	iput(&ci->vfs_inode);
+}
+
+/* Queue ceph_revalidate_work() for an inode that has an fscache cookie. */
+void ceph_queue_revalidate(struct inode *inode)
+{
+	struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
+	struct ceph_inode_info *ci = ceph_inode(inode);
+
+	if (fsc->revalidate_wq == NULL || ci->fscache == NULL)
+		return;
+
+	/* pin the inode for the work handler; dropped below if not queued */
+	ihold(inode);
+	if (queue_work(fsc->revalidate_wq, &ci->i_revalidate_work)) {
+		dout("ceph_queue_revalidate %p\n", inode);
+	} else {
+		dout("ceph_queue_revalidate %p failed\n", inode);
+		iput(inode);
+	}
+}
+
+void ceph_fscache_inode_init(struct ceph_inode_info *ci)
+{
+	ci->fscache = NULL;
+	/* The first load is verified at cookie open time */
+	ci->i_fscache_gen = 1;
+	INIT_WORK(&ci->i_revalidate_work, ceph_revalidate_work);
+}
diff --git a/fs/ceph/cache.h b/fs/ceph/cache.h
new file mode 100644
index 0000000..ba94940
--- /dev/null
+++ b/fs/ceph/cache.h
@@ -0,0 +1,159 @@
+/*
+ * Ceph cache definitions.
+ *
+ *  Copyright (C) 2013 by Adfin Solutions, Inc. All Rights Reserved.
+ *  Written by Milosz Tanski (milosz@adfin.com)
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License version 2
+ *  as published by the Free Software Foundation.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to:
+ *  Free Software Foundation
+ *  51 Franklin Street, Fifth Floor
+ *  Boston, MA  02111-1301  USA
+ *
+ */
+
+#ifndef _CEPH_CACHE_H
+#define _CEPH_CACHE_H
+
+#ifdef CONFIG_CEPH_FSCACHE
+
+extern struct fscache_netfs ceph_cache_netfs;
+
+int ceph_fscache_register(void);
+void ceph_fscache_unregister(void);
+
+int ceph_fscache_register_fs(struct ceph_fs_client* fsc);
+void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc);
+
+void ceph_fscache_inode_init(struct ceph_inode_info *ci);
+void ceph_fscache_register_inode_cookie(struct ceph_fs_client* fsc,
+					struct ceph_inode_info* ci);
+void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci);
+
+int ceph_readpage_from_fscache(struct inode *inode, struct page *page);
+int ceph_readpages_from_fscache(struct inode *inode,
+				struct address_space *mapping,
+				struct list_head *pages,
+				unsigned *nr_pages);
+void ceph_readpage_to_fscache(struct inode *inode, struct page *page);
+void ceph_invalidate_fscache_page(struct inode* inode, struct page *page);
+void ceph_queue_revalidate(struct inode *inode);
+
+static inline void ceph_fscache_invalidate(struct inode *inode)
+{
+	fscache_invalidate(ceph_inode(inode)->fscache);
+}
+
+/* Uncache @page via fscache_uncache_page() using @inode's fscache cookie. */
+static inline void ceph_fscache_uncache_page(struct inode *inode,
+					     struct page *page)
+{
+	fscache_uncache_page(ceph_inode(inode)->fscache, page);
+}
+
+static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
+{
+	struct inode* inode = page->mapping->host;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	return fscache_maybe_release_page(ci->fscache, page, gfp);
+}
+
+/* Cancel fscache marks on unread @pages via fscache_readpages_cancel(). */
+static inline void ceph_fscache_readpages_cancel(struct inode *inode,
+						 struct list_head *pages)
+{
+	fscache_readpages_cancel(ceph_inode(inode)->fscache, pages);
+}
+
+#else
+
+static inline int ceph_fscache_register(void)
+{
+	return 0;
+}
+
+static inline void ceph_fscache_unregister(void)
+{
+}
+
+static inline int ceph_fscache_register_fs(struct ceph_fs_client* fsc)
+{
+	return 0;
+}
+
+static inline void ceph_fscache_unregister_fs(struct ceph_fs_client* fsc)
+{
+}
+
+static inline void ceph_fscache_inode_init(struct ceph_inode_info *ci)
+{
+}
+
+static inline void ceph_fscache_register_inode_cookie(struct ceph_fs_client* parent_fsc,
+						      struct ceph_inode_info* ci)
+{
+}
+
+static inline void ceph_fscache_uncache_page(struct inode *inode,
+					     struct page *pages)
+{
+}
+
+static inline int ceph_readpage_from_fscache(struct inode* inode,
+					     struct page *page)
+{
+	return -ENOBUFS;
+}
+
+static inline int ceph_readpages_from_fscache(struct inode *inode,
+					      struct address_space *mapping,
+					      struct list_head *pages,
+					      unsigned *nr_pages)
+{
+	return -ENOBUFS;
+}
+
+static inline void ceph_readpage_to_fscache(struct inode *inode,
+					    struct page *page)
+{
+}
+
+static inline void ceph_fscache_invalidate(struct inode *inode)
+{
+}
+
+static inline void ceph_invalidate_fscache_page(struct inode *inode,
+						struct page *page)
+{
+}
+
+static inline void ceph_fscache_unregister_inode_cookie(struct ceph_inode_info* ci)
+{
+}
+
+static inline int ceph_release_fscache_page(struct page *page, gfp_t gfp)
+{
+	return 1;
+}
+
+static inline void ceph_fscache_readpages_cancel(struct inode *inode,
+						 struct list_head *pages)
+{
+}
+
+static inline void ceph_queue_revalidate(struct inode *inode)
+{
+}
+
+#endif
+
+#endif
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 25442b4..13976c3 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -10,6 +10,7 @@
 
 #include "super.h"
 #include "mds_client.h"
+#include "cache.h"
 #include <linux/ceph/decode.h>
 #include <linux/ceph/messenger.h>
 
@@ -479,8 +480,9 @@
 	 * i_rdcache_gen.
 	 */
 	if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
-	    (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
+	    (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
 		ci->i_rdcache_gen++;
+	}
 
 	/*
 	 * if we are newly issued FILE_SHARED, mark dir not complete; we
@@ -2072,19 +2074,17 @@
 	/* finish pending truncate */
 	while (ci->i_truncate_pending) {
 		spin_unlock(&ci->i_ceph_lock);
-		if (!(need & CEPH_CAP_FILE_WR))
-			mutex_lock(&inode->i_mutex);
 		__ceph_do_pending_vmtruncate(inode);
-		if (!(need & CEPH_CAP_FILE_WR))
-			mutex_unlock(&inode->i_mutex);
 		spin_lock(&ci->i_ceph_lock);
 	}
 
-	if (need & CEPH_CAP_FILE_WR) {
+	have = __ceph_caps_issued(ci, &implemented);
+
+	if (have & need & CEPH_CAP_FILE_WR) {
 		if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
 			dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
 			     inode, endoff, ci->i_max_size);
-			if (endoff > ci->i_wanted_max_size) {
+			if (endoff > ci->i_requested_max_size) {
 				*check_max = 1;
 				ret = 1;
 			}
@@ -2099,7 +2099,6 @@
 			goto out;
 		}
 	}
-	have = __ceph_caps_issued(ci, &implemented);
 
 	if ((have & need) == need) {
 		/*
@@ -2141,14 +2140,17 @@
 
 	/* do we need to explicitly request a larger max_size? */
 	spin_lock(&ci->i_ceph_lock);
-	if ((endoff >= ci->i_max_size ||
-	     endoff > (inode->i_size << 1)) &&
-	    endoff > ci->i_wanted_max_size) {
+	if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) {
 		dout("write %p at large endoff %llu, req max_size\n",
 		     inode, endoff);
 		ci->i_wanted_max_size = endoff;
-		check = 1;
 	}
+	/* duplicate ceph_check_caps()'s logic */
+	if (ci->i_auth_cap &&
+	    (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) &&
+	    ci->i_wanted_max_size > ci->i_max_size &&
+	    ci->i_wanted_max_size > ci->i_requested_max_size)
+		check = 1;
 	spin_unlock(&ci->i_ceph_lock);
 	if (check)
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
@@ -2334,6 +2336,38 @@
 }
 
 /*
+ * Invalidate unlinked inode's aliases, so we can drop the inode ASAP.
+ */
+static void invalidate_aliases(struct inode *inode)
+{
+	struct dentry *dn, *prev = NULL;
+
+	dout("invalidate_aliases inode %p\n", inode);
+	d_prune_aliases(inode);
+	/*
+	 * For a non-directory inode, d_find_alias() only returns
+	 * connected dentries, and a dentry becomes disconnected
+	 * once d_invalidate() has been called on it.
+	 *
+	 * For a directory inode, d_find_alias() can return a
+	 * disconnected dentry, but a directory inode should have
+	 * at most one alias; seeing 'prev' again ends the loop.
+	 */
+	while ((dn = d_find_alias(inode))) {
+		if (dn == prev) {
+			dput(dn);	/* release the dentry just returned */
+			break;
+		}
+		d_invalidate(dn);
+		if (prev)
+			dput(prev);
+		prev = dn;	/* held until the next pass or loop exit */
+	}
+	if (prev)
+		dput(prev);
+}
+
+/*
  * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
  * actually be a revocation if it specifies a smaller cap set.)
  *
@@ -2361,8 +2395,9 @@
 	int check_caps = 0;
 	int wake = 0;
 	int writeback = 0;
-	int revoked_rdcache = 0;
 	int queue_invalidate = 0;
+	int deleted_inode = 0;
+	int queue_revalidate = 0;
 
 	dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
 	     inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2377,9 +2412,7 @@
 	if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
 	    (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
 	    !ci->i_wrbuffer_ref) {
-		if (try_nonblocking_invalidate(inode) == 0) {
-			revoked_rdcache = 1;
-		} else {
+		if (try_nonblocking_invalidate(inode)) {
 			/* there were locked pages.. invalidate later
 			   in a separate thread. */
 			if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
@@ -2387,6 +2420,8 @@
 				ci->i_rdcache_revoking = ci->i_rdcache_gen;
 			}
 		}
+
+		ceph_fscache_invalidate(inode);
 	}
 
 	/* side effects now are allowed */
@@ -2407,8 +2442,12 @@
 		     from_kgid(&init_user_ns, inode->i_gid));
 	}
 
-	if ((issued & CEPH_CAP_LINK_EXCL) == 0)
+	if ((issued & CEPH_CAP_LINK_EXCL) == 0) {
 		set_nlink(inode, le32_to_cpu(grant->nlink));
+		if (inode->i_nlink == 0 &&
+		    (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
+			deleted_inode = 1;
+	}
 
 	if ((issued & CEPH_CAP_XATTR_EXCL) == 0 && grant->xattr_len) {
 		int len = le32_to_cpu(grant->xattr_len);
@@ -2424,6 +2463,11 @@
 		}
 	}
 
+	/* Do we need to revalidate our fscache cookie?  Don't bother on the
+	 * first cache cap, as we already validate at cookie creation time. */
+	if ((issued & CEPH_CAP_FILE_CACHE) && ci->i_rdcache_gen > 1)
+		queue_revalidate = 1;
+
 	/* size/ctime/mtime/atime? */
 	ceph_fill_file_size(inode, issued,
 			    le32_to_cpu(grant->truncate_seq),
@@ -2508,6 +2552,7 @@
 	BUG_ON(cap->issued & ~cap->implemented);
 
 	spin_unlock(&ci->i_ceph_lock);
+
 	if (writeback)
 		/*
 		 * queue inode for writeback: we can't actually call
@@ -2517,6 +2562,10 @@
 		ceph_queue_writeback(inode);
 	if (queue_invalidate)
 		ceph_queue_invalidate(inode);
+	if (deleted_inode)
+		invalidate_aliases(inode);
+	if (queue_revalidate)
+		ceph_queue_revalidate(inode);
 	if (wake)
 		wake_up_all(&ci->i_cap_wq);
 
@@ -2673,8 +2722,10 @@
 					  truncate_seq, truncate_size, size);
 	spin_unlock(&ci->i_ceph_lock);
 
-	if (queue_trunc)
+	if (queue_trunc) {
 		ceph_queue_vmtruncate(inode);
+		ceph_fscache_invalidate(inode);
+	}
 }
 
 /*
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index a40ceda..868b61d 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -793,6 +793,8 @@
 	req->r_locked_dir = dir;
 	req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
 	req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
+	/* release LINK_SHARED on source inode (mds will lock it) */
+	req->r_old_inode_drop = CEPH_CAP_LINK_SHARED;
 	err = ceph_mdsc_do_request(mdsc, dir, req);
 	if (err) {
 		d_drop(dentry);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 2ddf061..3de8982 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -8,9 +8,11 @@
 #include <linux/namei.h>
 #include <linux/writeback.h>
 #include <linux/aio.h>
+#include <linux/falloc.h>
 
 #include "super.h"
 #include "mds_client.h"
+#include "cache.h"
 
 /*
  * Ceph file operations
@@ -68,9 +70,23 @@
 {
 	struct ceph_file_info *cf;
 	int ret = 0;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_sb_to_client(inode->i_sb);
+	struct ceph_mds_client *mdsc = fsc->mdsc;
 
 	switch (inode->i_mode & S_IFMT) {
 	case S_IFREG:
+		/* First file open request creates the cookie, we want to keep
+		 * this cookie around for the lifetime of the inode so as not to
+		 * have to worry about fscache register / revoke / operation
+		 * races.
+		 *
+		 * Also, if we know the operation is going to invalidate data
+		 * (non readonly) just nuke the cache right away.
+		 */
+		ceph_fscache_register_inode_cookie(mdsc->fsc, ci);
+		if ((fmode & CEPH_FILE_MODE_WR))
+			ceph_fscache_invalidate(inode);
 	case S_IFDIR:
 		dout("init_file %p %p 0%o (regular)\n", inode, file,
 		     inode->i_mode);
@@ -181,6 +197,7 @@
 		spin_unlock(&ci->i_ceph_lock);
 		return ceph_init_file(inode, file, fmode);
 	}
+
 	spin_unlock(&ci->i_ceph_lock);
 
 	dout("open fmode %d wants %s\n", fmode, ceph_cap_string(wanted));
@@ -191,6 +208,7 @@
 	}
 	req->r_inode = inode;
 	ihold(inode);
+
 	req->r_num_caps = 1;
 	if (flags & (O_CREAT|O_TRUNC))
 		parent_inode = ceph_get_dentry_parent_inode(file->f_dentry);
@@ -313,9 +331,9 @@
 {
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	u64 pos, this_len;
+	u64 pos, this_len, left;
 	int io_align, page_align;
-	int left, pages_left;
+	int pages_left;
 	int read;
 	struct page **page_pos;
 	int ret;
@@ -346,47 +364,40 @@
 		ret = 0;
 	hit_stripe = this_len < left;
 	was_short = ret >= 0 && ret < this_len;
-	dout("striped_read %llu~%u (read %u) got %d%s%s\n", pos, left, read,
+	dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
 	     ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
 
-	if (ret > 0) {
-		int didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
-
-		if (read < pos - off) {
-			dout(" zero gap %llu to %llu\n", off + read, pos);
-			ceph_zero_page_vector_range(page_align + read,
-						    pos - off - read, pages);
+	if (ret >= 0) {
+		int didpages;
+		if (was_short && (pos + ret < inode->i_size)) {
+			u64 tmp = min(this_len - ret,
+					inode->i_size - pos - ret);
+			dout(" zero gap %llu to %llu\n",
+				pos + ret, pos + ret + tmp);
+			ceph_zero_page_vector_range(page_align + read + ret,
+							tmp, pages);
+			ret += tmp;
 		}
+
+		didpages = (page_align + ret) >> PAGE_CACHE_SHIFT;
 		pos += ret;
 		read = pos - off;
 		left -= ret;
 		page_pos += didpages;
 		pages_left -= didpages;
 
-		/* hit stripe? */
-		if (left && hit_stripe)
+		/* hit stripe and need to continue */
+		if (left && hit_stripe && pos < inode->i_size)
 			goto more;
 	}
 
-	if (was_short) {
+	if (read > 0) {
+		ret = read;
 		/* did we bounce off eof? */
 		if (pos + left > inode->i_size)
 			*checkeof = 1;
-
-		/* zero trailing bytes (inside i_size) */
-		if (left > 0 && pos < inode->i_size) {
-			if (pos + left > inode->i_size)
-				left = inode->i_size - pos;
-
-			dout("zero tail %d\n", left);
-			ceph_zero_page_vector_range(page_align + read, left,
-						    pages);
-			read += left;
-		}
 	}
 
-	if (ret >= 0)
-		ret = read;
 	dout("striped_read returns %d\n", ret);
 	return ret;
 }
@@ -618,6 +629,8 @@
 		if (check_caps)
 			ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
 					NULL);
+	} else if (ret != -EOLDSNAPC && written > 0) {
+		ret = written;
 	}
 	return ret;
 }
@@ -659,7 +672,6 @@
 
 	if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
 	    (iocb->ki_filp->f_flags & O_DIRECT) ||
-	    (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
 	    (fi->flags & CEPH_F_SYNC))
 		/* hmm, this isn't really async... */
 		ret = ceph_sync_read(filp, base, len, ppos, &checkeof);
@@ -711,13 +723,11 @@
 		&ceph_sb_to_client(inode->i_sb)->client->osdc;
 	ssize_t count, written = 0;
 	int err, want, got;
-	bool hold_mutex;
 
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EROFS;
 
 	mutex_lock(&inode->i_mutex);
-	hold_mutex = true;
 
 	err = generic_segment_checks(iov, &nr_segs, &count, VERIFY_READ);
 	if (err)
@@ -763,18 +773,31 @@
 
 	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
 	    (iocb->ki_filp->f_flags & O_DIRECT) ||
-	    (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
 	    (fi->flags & CEPH_F_SYNC)) {
 		mutex_unlock(&inode->i_mutex);
 		written = ceph_sync_write(file, iov->iov_base, count,
 					  pos, &iocb->ki_pos);
+		if (written == -EOLDSNAPC) {
+			dout("aio_write %p %llx.%llx %llu~%u "
+				"got EOLDSNAPC, retrying\n",
+				inode, ceph_vinop(inode),
+				pos, (unsigned)iov->iov_len);
+			mutex_lock(&inode->i_mutex);	/* dropped above; retake to retry */
+			goto retry_snap;
+		}
 	} else {
+		/*
+		 * No need to acquire the i_truncate_mutex: the MDS
+		 * revokes Fwb caps before sending a truncate message
+		 * to us, and we can't get the Fwb caps while a
+		 * vmtruncate is pending, so write and vmtruncate
+		 * cannot run at the same time.
+		 */
 		written = generic_file_buffered_write(iocb, iov, nr_segs,
 						      pos, &iocb->ki_pos,
 						      count, 0);
 		mutex_unlock(&inode->i_mutex);
 	}
-	hold_mutex = false;
 
 	if (written >= 0) {
 		int dirty;
@@ -798,18 +821,12 @@
 			written = err;
 	}
 
-	if (written == -EOLDSNAPC) {
-		dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
-		     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len);
-		mutex_lock(&inode->i_mutex);
-		hold_mutex = true;
-		goto retry_snap;
-	}
-out:
-	if (hold_mutex)
-		mutex_unlock(&inode->i_mutex);
-	current->backing_dev_info = NULL;
+	goto out_unlocked;
 
+out:
+	mutex_unlock(&inode->i_mutex);
+out_unlocked:
+	current->backing_dev_info = NULL;
 	return written ? written : err;
 }
 
@@ -822,7 +839,6 @@
 	int ret;
 
 	mutex_lock(&inode->i_mutex);
-	__ceph_do_pending_vmtruncate(inode);
 
 	if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
 		ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
@@ -871,6 +887,204 @@
 	return offset;
 }
 
+static inline void ceph_zero_partial_page(
+	struct inode *inode, loff_t offset, unsigned size)
+{
+	struct page *page;
+	pgoff_t index = offset >> PAGE_CACHE_SHIFT;	/* page index of @offset */
+
+	page = find_lock_page(inode->i_mapping, index);
+	if (page) {	/* nothing is done when no page is returned */
+		wait_on_page_writeback(page);
+		zero_user(page, offset & (PAGE_CACHE_SIZE - 1), size);
+		unlock_page(page);
+		page_cache_release(page);
+	}
+}
+
+static void ceph_zero_pagecache_range(struct inode *inode, loff_t offset,
+				      loff_t length)
+{
+	loff_t nearly = round_up(offset, PAGE_CACHE_SIZE);
+	if (offset < nearly) {	/* partial first page */
+		loff_t size = nearly - offset;
+		if (length < size)
+			size = length;
+		ceph_zero_partial_page(inode, offset, size);
+		offset += size;
+		length -= size;
+	}
+	if (length >= PAGE_CACHE_SIZE) {	/* run of whole pages */
+		loff_t size = round_down(length, PAGE_CACHE_SIZE);
+		truncate_pagecache_range(inode, offset, offset + size - 1);
+		offset += size;
+		length -= size;
+	}
+	if (length)	/* partial last page */
+		ceph_zero_partial_page(inode, offset, length);
+}
+
+static int ceph_zero_partial_object(struct inode *inode,
+				    loff_t offset, loff_t *length)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+	struct ceph_osd_request *req;
+	int ret = 0;
+	loff_t zero = 0;
+	int op;
+
+	if (!length) {	/* no length: delete, or truncate at offset 0 */
+		op = offset ? CEPH_OSD_OP_DELETE : CEPH_OSD_OP_TRUNCATE;
+		length = &zero;
+	} else {
+		op = CEPH_OSD_OP_ZERO;
+	}
+
+	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+					ceph_vino(inode),
+					offset, length,
+					1, op,
+					CEPH_OSD_FLAG_WRITE |
+					CEPH_OSD_FLAG_ONDISK,
+					NULL, 0, 0, false);
+	if (IS_ERR(req)) {
+		ret = PTR_ERR(req);
+		goto out;
+	}
+
+	ceph_osdc_build_request(req, offset, NULL, ceph_vino(inode).snap,
+				&inode->i_mtime);
+
+	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+	if (!ret) {
+		ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+		if (ret == -ENOENT)	/* -ENOENT is reported as success */
+			ret = 0;
+	}
+	ceph_osdc_put_request(req);
+
+out:
+	return ret;
+}
+
+static int ceph_zero_objects(struct inode *inode, loff_t offset, loff_t length)
+{
+	int ret = 0;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	s32 stripe_unit = ceph_file_layout_su(ci->i_layout);
+	s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
+	s32 object_size = ceph_file_layout_object_size(ci->i_layout);
+	u64 object_set_size = (u64)object_size * stripe_count;
+	u64 nearly, t;
+
+	/* round offset up to next period boundary (64-bit object set size) */
+	nearly = offset + object_set_size - 1;
+	t = nearly;
+	nearly -= do_div(t, object_set_size);
+
+	while (length && offset < nearly) {	/* head, up to the boundary */
+		loff_t size = length;
+		ret = ceph_zero_partial_object(inode, offset, &size);
+		if (ret < 0)
+			return ret;
+		offset += size;
+		length -= size;
+	}
+	while (length >= object_set_size) {	/* whole object sets */
+		int i;
+		loff_t pos = offset;
+		for (i = 0; i < stripe_count; ++i) {	/* NULL length op each */
+			ret = ceph_zero_partial_object(inode, pos, NULL);
+			if (ret < 0)
+				return ret;
+			pos += stripe_unit;
+		}
+		offset += object_set_size;
+		length -= object_set_size;
+	}
+	while (length) {	/* tail shorter than one object set */
+		loff_t size = length;
+		ret = ceph_zero_partial_object(inode, offset, &size);
+		if (ret < 0)
+			return ret;
+		offset += size;
+		length -= size;
+	}
+	return ret;
+}
+
+static long ceph_fallocate(struct file *file, int mode,
+				loff_t offset, loff_t length)
+{
+	struct ceph_file_info *fi = file->private_data;
+	struct inode *inode = file->f_dentry->d_inode;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_osd_client *osdc =
+		&ceph_inode_to_client(inode)->client->osdc;
+	int want, got = 0;
+	int dirty;
+	int ret = 0;
+	loff_t endoff = 0;	/* stays 0 with FALLOC_FL_KEEP_SIZE */
+	loff_t size;
+
+	if (!S_ISREG(inode->i_mode))
+		return -EOPNOTSUPP;
+
+	if (IS_SWAPFILE(inode))
+		return -ETXTBSY;
+
+	mutex_lock(&inode->i_mutex);
+
+	if (ceph_snap(inode) != CEPH_NOSNAP) {
+		ret = -EROFS;	/* anything but CEPH_NOSNAP is refused */
+		goto unlock;
+	}
+
+	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) &&
+		!(mode & FALLOC_FL_PUNCH_HOLE)) {
+		ret = -ENOSPC;	/* FULL osdmap: only hole punching proceeds */
+		goto unlock;
+	}
+
+	size = i_size_read(inode);
+	if (!(mode & FALLOC_FL_KEEP_SIZE))
+		endoff = offset + length;
+
+	if (fi->fmode & CEPH_FILE_MODE_LAZY)
+		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+	else
+		want = CEPH_CAP_FILE_BUFFER;
+
+	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
+	if (ret < 0)
+		goto unlock;
+
+	if (mode & FALLOC_FL_PUNCH_HOLE) {
+		if (offset < size)	/* page cache is touched only below i_size */
+			ceph_zero_pagecache_range(inode, offset, length);
+		ret = ceph_zero_objects(inode, offset, length);
+	} else if (endoff > size) {	/* grow i_size; no object I/O here */
+		truncate_pagecache_range(inode, size, -1);
+		if (ceph_inode_set_size(inode, endoff))
+			ceph_check_caps(ceph_inode(inode),
+				CHECK_CAPS_AUTHONLY, NULL);
+	}
+
+	if (!ret) {	/* success: mark CEPH_CAP_FILE_WR dirty */
+		spin_lock(&ci->i_ceph_lock);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+		spin_unlock(&ci->i_ceph_lock);
+		if (dirty)
+			__mark_inode_dirty(inode, dirty);
+	}
+
+	ceph_put_cap_refs(ci, got);
+unlock:
+	mutex_unlock(&inode->i_mutex);
+	return ret;
+}
+
 const struct file_operations ceph_file_fops = {
 	.open = ceph_open,
 	.release = ceph_release,
@@ -887,5 +1101,6 @@
 	.splice_write = generic_file_splice_write,
 	.unlocked_ioctl = ceph_ioctl,
 	.compat_ioctl	= ceph_ioctl,
+	.fallocate	= ceph_fallocate,
 };
 
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index f3a2abf..8549a48 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -12,6 +12,7 @@
 
 #include "super.h"
 #include "mds_client.h"
+#include "cache.h"
 #include <linux/ceph/decode.h>
 
 /*
@@ -344,6 +345,7 @@
 	for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
 		ci->i_nr_by_mode[i] = 0;
 
+	mutex_init(&ci->i_truncate_mutex);
 	ci->i_truncate_seq = 0;
 	ci->i_truncate_size = 0;
 	ci->i_truncate_pending = 0;
@@ -377,6 +379,8 @@
 
 	INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
 
+	ceph_fscache_inode_init(ci);
+
 	return &ci->vfs_inode;
 }
 
@@ -396,6 +400,8 @@
 
 	dout("destroy_inode %p ino %llx.%llx\n", inode, ceph_vinop(inode));
 
+	ceph_fscache_unregister_inode_cookie(ci);
+
 	ceph_queue_caps_release(inode);
 
 	/*
@@ -430,7 +436,6 @@
 	call_rcu(&inode->i_rcu, ceph_i_callback);
 }
 
-
 /*
  * Helpers to fill in size, ctime, mtime, and atime.  We have to be
  * careful because either the client or MDS may have more up to date
@@ -455,16 +460,20 @@
 			dout("truncate_seq %u -> %u\n",
 			     ci->i_truncate_seq, truncate_seq);
 			ci->i_truncate_seq = truncate_seq;
+
+			/* the MDS should have revoked these caps */
+			WARN_ON_ONCE(issued & (CEPH_CAP_FILE_EXCL |
+					       CEPH_CAP_FILE_RD |
+					       CEPH_CAP_FILE_WR |
+					       CEPH_CAP_FILE_LAZYIO));
 			/*
 			 * If we hold relevant caps, or in the case where we're
 			 * not the only client referencing this file and we
 			 * don't hold those caps, then we need to check whether
 			 * the file is either opened or mmaped
 			 */
-			if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_RD|
-				       CEPH_CAP_FILE_WR|CEPH_CAP_FILE_BUFFER|
-				       CEPH_CAP_FILE_EXCL|
-				       CEPH_CAP_FILE_LAZYIO)) ||
+			if ((issued & (CEPH_CAP_FILE_CACHE|
+				       CEPH_CAP_FILE_BUFFER)) ||
 			    mapping_mapped(inode->i_mapping) ||
 			    __ceph_caps_file_wanted(ci)) {
 				ci->i_truncate_pending++;
@@ -478,6 +487,10 @@
 		     truncate_size);
 		ci->i_truncate_size = truncate_size;
 	}
+
+	if (queue_trunc)
+		ceph_fscache_invalidate(inode);
+
 	return queue_trunc;
 }
 
@@ -1066,7 +1079,7 @@
 			 * complete.
 			 */
 			ceph_set_dentry_offset(req->r_old_dentry);
-			dout("dn %p gets new offset %lld\n", req->r_old_dentry, 
+			dout("dn %p gets new offset %lld\n", req->r_old_dentry,
 			     ceph_dentry(req->r_old_dentry)->offset);
 
 			dn = req->r_old_dentry;  /* use old_dentry */
@@ -1419,18 +1432,20 @@
 	u32 orig_gen;
 	int check = 0;
 
+	mutex_lock(&ci->i_truncate_mutex);
 	spin_lock(&ci->i_ceph_lock);
 	dout("invalidate_pages %p gen %d revoking %d\n", inode,
 	     ci->i_rdcache_gen, ci->i_rdcache_revoking);
 	if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
 		/* nevermind! */
 		spin_unlock(&ci->i_ceph_lock);
+		mutex_unlock(&ci->i_truncate_mutex);
 		goto out;
 	}
 	orig_gen = ci->i_rdcache_gen;
 	spin_unlock(&ci->i_ceph_lock);
 
-	truncate_inode_pages(&inode->i_data, 0);
+	truncate_inode_pages(inode->i_mapping, 0);
 
 	spin_lock(&ci->i_ceph_lock);
 	if (orig_gen == ci->i_rdcache_gen &&
@@ -1445,6 +1460,7 @@
 		     ci->i_rdcache_revoking);
 	}
 	spin_unlock(&ci->i_ceph_lock);
+	mutex_unlock(&ci->i_truncate_mutex);
 
 	if (check)
 		ceph_check_caps(ci, 0, NULL);
@@ -1465,9 +1481,7 @@
 	struct inode *inode = &ci->vfs_inode;
 
 	dout("vmtruncate_work %p\n", inode);
-	mutex_lock(&inode->i_mutex);
 	__ceph_do_pending_vmtruncate(inode);
-	mutex_unlock(&inode->i_mutex);
 	iput(inode);
 }
 
@@ -1480,6 +1494,7 @@
 	struct ceph_inode_info *ci = ceph_inode(inode);
 
 	ihold(inode);
+
 	if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
 		       &ci->i_vmtruncate_work)) {
 		dout("ceph_queue_vmtruncate %p\n", inode);
@@ -1500,11 +1515,13 @@
 	u64 to;
 	int wrbuffer_refs, finish = 0;
 
+	mutex_lock(&ci->i_truncate_mutex);
 retry:
 	spin_lock(&ci->i_ceph_lock);
 	if (ci->i_truncate_pending == 0) {
 		dout("__do_pending_vmtruncate %p none pending\n", inode);
 		spin_unlock(&ci->i_ceph_lock);
+		mutex_unlock(&ci->i_truncate_mutex);
 		return;
 	}
 
@@ -1521,6 +1538,9 @@
 		goto retry;
 	}
 
+	/* there should be no reader or writer */
+	WARN_ON_ONCE(ci->i_rd_ref || ci->i_wr_ref);
+
 	to = ci->i_truncate_size;
 	wrbuffer_refs = ci->i_wrbuffer_ref;
 	dout("__do_pending_vmtruncate %p (%d) to %lld\n", inode,
@@ -1538,13 +1558,14 @@
 	if (!finish)
 		goto retry;
 
+	mutex_unlock(&ci->i_truncate_mutex);
+
 	if (wrbuffer_refs == 0)
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 
 	wake_up_all(&ci->i_cap_wq);
 }
 
-
 /*
  * symlinks
  */
@@ -1586,8 +1607,6 @@
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EROFS;
 
-	__ceph_do_pending_vmtruncate(inode);
-
 	err = inode_change_ok(inode, attr);
 	if (err != 0)
 		return err;
@@ -1768,7 +1787,8 @@
 	     ceph_cap_string(dirtied), mask);
 
 	ceph_mdsc_put_request(req);
-	__ceph_do_pending_vmtruncate(inode);
+	if (mask & CEPH_SETATTR_SIZE)
+		__ceph_do_pending_vmtruncate(inode);
 	return err;
 out:
 	spin_unlock(&ci->i_ceph_lock);
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index e0b4ef3..669622f 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -196,8 +196,10 @@
 	r = ceph_calc_file_object_mapping(&ci->i_layout, dl.file_offset, len,
 					  &dl.object_no, &dl.object_offset,
 					  &olen);
-	if (r < 0)
+	if (r < 0) {
+		up_read(&osdc->map_sem);
 		return -EIO;
+	}
 	dl.file_offset -= dl.object_offset;
 	dl.object_size = ceph_file_layout_object_size(ci->i_layout);
 	dl.block_size = ceph_file_layout_su(ci->i_layout);
@@ -209,8 +211,12 @@
 	snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx",
 		 ceph_ino(inode), dl.object_no);
 
-	ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap,
-		ceph_file_layout_pg_pool(ci->i_layout));
+	r = ceph_calc_ceph_pg(&pgid, dl.object_name, osdc->osdmap,
+				ceph_file_layout_pg_pool(ci->i_layout));
+	if (r < 0) {
+		up_read(&osdc->map_sem);
+		return r;
+	}
 
 	dl.osd = ceph_calc_pg_primary(osdc->osdmap, pgid);
 	if (dl.osd >= 0) {
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 187bf21..b7bda5d 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -414,6 +414,9 @@
 {
 	struct ceph_mds_session *s;
 
+	if (mds >= mdsc->mdsmap->m_max_mds)
+		return ERR_PTR(-EINVAL);
+
 	s = kzalloc(sizeof(*s), GFP_NOFS);
 	if (!s)
 		return ERR_PTR(-ENOMEM);
@@ -1028,6 +1031,37 @@
 {
 	dout("remove_session_caps on %p\n", session);
 	iterate_session_caps(session, remove_session_caps_cb, NULL);
+
+	spin_lock(&session->s_cap_lock);
+	if (session->s_nr_caps > 0) {
+		struct super_block *sb = session->s_mdsc->fsc->sb;
+		struct inode *inode;
+		struct ceph_cap *cap, *prev = NULL;
+		struct ceph_vino vino;
+		/*
+		 * iterate_session_caps() skips inodes that are being
+		 * deleted; we need to wait until deletions are complete.
+		 * __wait_on_freeing_inode() is designed for the job,
+		 * but it is not exported, so use lookup inode function
+		 * to access it.
+		 */
+		while (!list_empty(&session->s_caps)) {
+			cap = list_entry(session->s_caps.next,
+					 struct ceph_cap, session_caps);
+			if (cap == prev)
+				break;
+			prev = cap;
+			vino = cap->ci->i_vino;
+			spin_unlock(&session->s_cap_lock);
+
+			inode = ceph_find_inode(sb, vino);
+			iput(inode);
+
+			spin_lock(&session->s_cap_lock);
+		}
+	}
+	spin_unlock(&session->s_cap_lock);
+
 	BUG_ON(session->s_nr_caps > 0);
 	BUG_ON(!list_empty(&session->s_cap_flushing));
 	cleanup_cap_releases(session);
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 6627b26..6a0951e 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -17,6 +17,7 @@
 
 #include "super.h"
 #include "mds_client.h"
+#include "cache.h"
 
 #include <linux/ceph/ceph_features.h>
 #include <linux/ceph/decode.h>
@@ -142,6 +143,8 @@
 	Opt_nodcache,
 	Opt_ino32,
 	Opt_noino32,
+	Opt_fscache,
+	Opt_nofscache
 };
 
 static match_table_t fsopt_tokens = {
@@ -167,6 +170,8 @@
 	{Opt_nodcache, "nodcache"},
 	{Opt_ino32, "ino32"},
 	{Opt_noino32, "noino32"},
+	{Opt_fscache, "fsc"},
+	{Opt_nofscache, "nofsc"},
 	{-1, NULL}
 };
 
@@ -260,6 +265,12 @@
 	case Opt_noino32:
 		fsopt->flags &= ~CEPH_MOUNT_OPT_INO32;
 		break;
+	case Opt_fscache:
+		fsopt->flags |= CEPH_MOUNT_OPT_FSCACHE;
+		break;
+	case Opt_nofscache:
+		fsopt->flags &= ~CEPH_MOUNT_OPT_FSCACHE;
+		break;
 	default:
 		BUG_ON(token);
 	}
@@ -422,6 +433,10 @@
 		seq_puts(m, ",dcache");
 	else
 		seq_puts(m, ",nodcache");
+	if (fsopt->flags & CEPH_MOUNT_OPT_FSCACHE)
+		seq_puts(m, ",fsc");
+	else
+		seq_puts(m, ",nofsc");
 
 	if (fsopt->wsize)
 		seq_printf(m, ",wsize=%d", fsopt->wsize);
@@ -530,11 +545,18 @@
 	if (!fsc->wb_pagevec_pool)
 		goto fail_trunc_wq;
 
+	/* setup fscache */
+	if ((fsopt->flags & CEPH_MOUNT_OPT_FSCACHE) &&
+	    (ceph_fscache_register_fs(fsc) != 0))
+		goto fail_fscache;
+
 	/* caps */
 	fsc->min_caps = fsopt->max_readdir;
 
 	return fsc;
 
+fail_fscache:
+	ceph_fscache_unregister_fs(fsc);
 fail_trunc_wq:
 	destroy_workqueue(fsc->trunc_wq);
 fail_pg_inv_wq:
@@ -554,6 +576,8 @@
 {
 	dout("destroy_fs_client %p\n", fsc);
 
+	ceph_fscache_unregister_fs(fsc);
+
 	destroy_workqueue(fsc->wb_wq);
 	destroy_workqueue(fsc->pg_inv_wq);
 	destroy_workqueue(fsc->trunc_wq);
@@ -588,6 +612,8 @@
 
 static int __init init_caches(void)
 {
+	int error = -ENOMEM;
+
 	ceph_inode_cachep = kmem_cache_create("ceph_inode_info",
 				      sizeof(struct ceph_inode_info),
 				      __alignof__(struct ceph_inode_info),
@@ -611,15 +637,17 @@
 	if (ceph_file_cachep == NULL)
 		goto bad_file;
 
-	return 0;
+	if ((error = ceph_fscache_register()))
+		goto bad_file;
 
+	return 0;
 bad_file:
 	kmem_cache_destroy(ceph_dentry_cachep);
 bad_dentry:
 	kmem_cache_destroy(ceph_cap_cachep);
 bad_cap:
 	kmem_cache_destroy(ceph_inode_cachep);
-	return -ENOMEM;
+	return error;
 }
 
 static void destroy_caches(void)
@@ -629,10 +657,13 @@
 	 * destroy cache.
 	 */
 	rcu_barrier();
+
 	kmem_cache_destroy(ceph_inode_cachep);
 	kmem_cache_destroy(ceph_cap_cachep);
 	kmem_cache_destroy(ceph_dentry_cachep);
 	kmem_cache_destroy(ceph_file_cachep);
+
+	ceph_fscache_unregister();
 }
 
 
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index cbded57..6014b0a 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -16,6 +16,10 @@
 
 #include <linux/ceph/libceph.h>
 
+#ifdef CONFIG_CEPH_FSCACHE
+#include <linux/fscache.h>
+#endif
+
 /* f_type in struct statfs */
 #define CEPH_SUPER_MAGIC 0x00c36400
 
@@ -29,6 +33,7 @@
 #define CEPH_MOUNT_OPT_NOASYNCREADDIR  (1<<7) /* no dcache readdir */
 #define CEPH_MOUNT_OPT_INO32           (1<<8) /* 32 bit inos */
 #define CEPH_MOUNT_OPT_DCACHE          (1<<9) /* use dcache for readdir etc */
+#define CEPH_MOUNT_OPT_FSCACHE         (1<<10) /* use fscache */
 
 #define CEPH_MOUNT_OPT_DEFAULT    (CEPH_MOUNT_OPT_RBYTES)
 
@@ -90,6 +95,11 @@
 	struct dentry *debugfs_bdi;
 	struct dentry *debugfs_mdsc, *debugfs_mdsmap;
 #endif
+
+#ifdef CONFIG_CEPH_FSCACHE
+	struct fscache_cookie *fscache;
+	struct workqueue_struct *revalidate_wq;
+#endif
 };
 
 
@@ -288,6 +298,7 @@
 
 	int i_nr_by_mode[CEPH_FILE_MODE_NUM];  /* open file counts */
 
+	struct mutex i_truncate_mutex;
 	u32 i_truncate_seq;        /* last truncate to smaller size */
 	u64 i_truncate_size;       /*  and the size we last truncated down to */
 	int i_truncate_pending;    /*  still need to call vmtruncate */
@@ -319,6 +330,12 @@
 
 	struct work_struct i_vmtruncate_work;
 
+#ifdef CONFIG_CEPH_FSCACHE
+	struct fscache_cookie *fscache;
+	u32 i_fscache_gen; /* sequence, for delayed fscache validate */
+	struct work_struct i_revalidate_work;
+#endif
+
 	struct inode vfs_inode; /* at end */
 };
 
diff --git a/fs/fscache/cookie.c b/fs/fscache/cookie.c
index 0e91a3c..318e843 100644
--- a/fs/fscache/cookie.c
+++ b/fs/fscache/cookie.c
@@ -558,3 +558,74 @@
 
 	_leave("");
 }
+
+/*
+ * Check the consistency between the netfs inode and the backing cache.
+ * Only data-file cookies are served (asserted below).  Returns the cache's
+ * verdict, 0 if there is nothing to check, -ENOMEM, -ERESTARTSYS or -ESTALE.
+ */
+int __fscache_check_consistency(struct fscache_cookie *cookie)
+{
+	struct fscache_operation *op;
+	struct fscache_object *object;
+	int ret;
+
+	_enter("%p,", cookie);
+
+	ASSERTCMP(cookie->def->type, ==, FSCACHE_COOKIE_TYPE_DATAFILE);
+
+	if (fscache_wait_for_deferred_lookup(cookie) < 0)
+		return -ERESTARTSYS;
+
+	if (hlist_empty(&cookie->backing_objects))
+		return 0;	/* no backing object: nothing to check */
+
+	op = kzalloc(sizeof(*op), GFP_NOIO | __GFP_NOMEMALLOC | __GFP_NORETRY);
+	if (!op)
+		return -ENOMEM;
+
+	fscache_operation_init(op, NULL, NULL);
+	op->flags = FSCACHE_OP_MYTHREAD |
+		(1 << FSCACHE_OP_WAITING);
+
+	spin_lock(&cookie->lock);
+
+	if (hlist_empty(&cookie->backing_objects))	/* re-check under lock */
+		goto inconsistent;
+	object = hlist_entry(cookie->backing_objects.first,
+			     struct fscache_object, cookie_link);
+	if (test_bit(FSCACHE_IOERROR, &object->cache->flags))
+		goto inconsistent;
+
+	op->debug_id = atomic_inc_return(&fscache_op_debug_id);
+
+	atomic_inc(&cookie->n_active);
+	if (fscache_submit_op(object, op) < 0)
+		goto submit_failed;
+
+	/* the work queue now carries its own ref on the object */
+	spin_unlock(&cookie->lock);
+
+	ret = fscache_wait_for_operation_activation(object, op,
+						    NULL, NULL, NULL);
+	if (ret == 0) {
+		/* ask the cache to honour the operation */
+		ret = object->cache->ops->check_consistency(op);
+		fscache_op_complete(op, false);
+	} else if (ret == -ENOBUFS) {	/* reported to the caller as 0 */
+		ret = 0;
+	}
+
+	fscache_put_operation(op);
+	_leave(" = %d", ret);
+	return ret;
+
+submit_failed:
+	atomic_dec(&cookie->n_active);	/* undo the increment made before submit */
+inconsistent:
+	spin_unlock(&cookie->lock);
+	kfree(op);
+	_leave(" = -ESTALE");
+	return -ESTALE;
+}
+EXPORT_SYMBOL(__fscache_check_consistency);
diff --git a/fs/fscache/internal.h b/fs/fscache/internal.h
index 12d505b..4226f66 100644
--- a/fs/fscache/internal.h
+++ b/fs/fscache/internal.h
@@ -130,6 +130,12 @@
 /*
  * page.c
  */
+extern int fscache_wait_for_deferred_lookup(struct fscache_cookie *);
+extern int fscache_wait_for_operation_activation(struct fscache_object *,
+						 struct fscache_operation *,
+						 atomic_t *,
+						 atomic_t *,
+						 void (*)(struct fscache_operation *));
 extern void fscache_invalidate_writes(struct fscache_cookie *);
 
 /*
diff --git a/fs/fscache/page.c b/fs/fscache/page.c
index d479ab3..8702b73 100644
--- a/fs/fscache/page.c
+++ b/fs/fscache/page.c
@@ -278,7 +278,7 @@
 /*
  * wait for a deferred lookup to complete
  */
-static int fscache_wait_for_deferred_lookup(struct fscache_cookie *cookie)
+int fscache_wait_for_deferred_lookup(struct fscache_cookie *cookie)
 {
 	unsigned long jif;
 
@@ -322,42 +322,46 @@
 /*
  * wait for an object to become active (or dead)
  */
-static int fscache_wait_for_retrieval_activation(struct fscache_object *object,
-						 struct fscache_retrieval *op,
-						 atomic_t *stat_op_waits,
-						 atomic_t *stat_object_dead)
+int fscache_wait_for_operation_activation(struct fscache_object *object,
+					  struct fscache_operation *op,
+					  atomic_t *stat_op_waits,
+					  atomic_t *stat_object_dead,
+					  void (*do_cancel)(struct fscache_operation *))
 {
 	int ret;
 
-	if (!test_bit(FSCACHE_OP_WAITING, &op->op.flags))
+	if (!test_bit(FSCACHE_OP_WAITING, &op->flags))
 		goto check_if_dead;
 
 	_debug(">>> WT");
-	fscache_stat(stat_op_waits);
-	if (wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING,
+	if (stat_op_waits)
+		fscache_stat(stat_op_waits);
+	if (wait_on_bit(&op->flags, FSCACHE_OP_WAITING,
 			fscache_wait_bit_interruptible,
 			TASK_INTERRUPTIBLE) != 0) {
-		ret = fscache_cancel_op(&op->op, fscache_do_cancel_retrieval);
+		ret = fscache_cancel_op(op, do_cancel);
 		if (ret == 0)
 			return -ERESTARTSYS;
 
 		/* it's been removed from the pending queue by another party,
 		 * so we should get to run shortly */
-		wait_on_bit(&op->op.flags, FSCACHE_OP_WAITING,
+		wait_on_bit(&op->flags, FSCACHE_OP_WAITING,
 			    fscache_wait_bit, TASK_UNINTERRUPTIBLE);
 	}
 	_debug("<<< GO");
 
 check_if_dead:
-	if (op->op.state == FSCACHE_OP_ST_CANCELLED) {
-		fscache_stat(stat_object_dead);
+	if (op->state == FSCACHE_OP_ST_CANCELLED) {
+		if (stat_object_dead)
+			fscache_stat(stat_object_dead);
 		_leave(" = -ENOBUFS [cancelled]");
 		return -ENOBUFS;
 	}
 	if (unlikely(fscache_object_is_dead(object))) {
-		pr_err("%s() = -ENOBUFS [obj dead %d]\n", __func__, op->op.state);
-		fscache_cancel_op(&op->op, fscache_do_cancel_retrieval);
-		fscache_stat(stat_object_dead);
+		pr_err("%s() = -ENOBUFS [obj dead %d]\n", __func__, op->state);
+		fscache_cancel_op(op, do_cancel);
+		if (stat_object_dead)
+			fscache_stat(stat_object_dead);
 		return -ENOBUFS;
 	}
 	return 0;
@@ -432,10 +436,11 @@
 
 	/* we wait for the operation to become active, and then process it
 	 * *here*, in this thread, and not in the thread pool */
-	ret = fscache_wait_for_retrieval_activation(
-		object, op,
+	ret = fscache_wait_for_operation_activation(
+		object, &op->op,
 		__fscache_stat(&fscache_n_retrieval_op_waits),
-		__fscache_stat(&fscache_n_retrievals_object_dead));
+		__fscache_stat(&fscache_n_retrievals_object_dead),
+		fscache_do_cancel_retrieval);
 	if (ret < 0)
 		goto error;
 
@@ -557,10 +562,11 @@
 
 	/* we wait for the operation to become active, and then process it
 	 * *here*, in this thread, and not in the thread pool */
-	ret = fscache_wait_for_retrieval_activation(
-		object, op,
+	ret = fscache_wait_for_operation_activation(
+		object, &op->op,
 		__fscache_stat(&fscache_n_retrieval_op_waits),
-		__fscache_stat(&fscache_n_retrievals_object_dead));
+		__fscache_stat(&fscache_n_retrievals_object_dead),
+		fscache_do_cancel_retrieval);
 	if (ret < 0)
 		goto error;
 
@@ -658,10 +664,11 @@
 
 	fscache_stat(&fscache_n_alloc_ops);
 
-	ret = fscache_wait_for_retrieval_activation(
-		object, op,
+	ret = fscache_wait_for_operation_activation(
+		object, &op->op,
 		__fscache_stat(&fscache_n_alloc_op_waits),
-		__fscache_stat(&fscache_n_allocs_object_dead));
+		__fscache_stat(&fscache_n_allocs_object_dead),
+		fscache_do_cancel_retrieval);
 	if (ret < 0)
 		goto error;
 
@@ -694,6 +701,22 @@
 EXPORT_SYMBOL(__fscache_alloc_page);
 
 /*
+ * Unmark pages allocated in the readahead code path (via:
+ * fscache_readpages_or_alloc) after delegating to the base filesystem
+ */
+void __fscache_readpages_cancel(struct fscache_cookie *cookie,
+				struct list_head *pages)
+{
+	struct page *page;
+
+	list_for_each_entry(page, pages, lru) {
+		if (PageFsCache(page))
+			__fscache_uncache_page(cookie, page);
+	}
+}
+EXPORT_SYMBOL(__fscache_readpages_cancel);
+
+/*
  * release a write op reference
  */
 static void fscache_release_write_op(struct fscache_operation *_op)
diff --git a/include/linux/fscache-cache.h b/include/linux/fscache-cache.h
index a9ff9a3..7823e9e 100644
--- a/include/linux/fscache-cache.h
+++ b/include/linux/fscache-cache.h
@@ -251,6 +251,10 @@
 	/* unpin an object in the cache */
 	void (*unpin_object)(struct fscache_object *object);
 
+	/* check the consistency between the backing cache and the FS-Cache
+	 * cookie */
+	bool (*check_consistency)(struct fscache_operation *op);
+
 	/* store the updated auxiliary data on an object */
 	void (*update_object)(struct fscache_object *object);
 
diff --git a/include/linux/fscache.h b/include/linux/fscache.h
index 7a08623..19b4645 100644
--- a/include/linux/fscache.h
+++ b/include/linux/fscache.h
@@ -183,6 +183,7 @@
 	const struct fscache_cookie_def *,
 	void *);
 extern void __fscache_relinquish_cookie(struct fscache_cookie *, int);
+extern int __fscache_check_consistency(struct fscache_cookie *);
 extern void __fscache_update_cookie(struct fscache_cookie *);
 extern int __fscache_attr_changed(struct fscache_cookie *);
 extern void __fscache_invalidate(struct fscache_cookie *);
@@ -208,6 +209,8 @@
 					 gfp_t);
 extern void __fscache_uncache_all_inode_pages(struct fscache_cookie *,
 					      struct inode *);
+extern void __fscache_readpages_cancel(struct fscache_cookie *cookie,
+				       struct list_head *pages);
 
 /**
  * fscache_register_netfs - Register a filesystem as desiring caching services
@@ -326,6 +329,25 @@
 }
 
 /**
+ * fscache_check_consistency - Request a consistency check on a cache object
+ * @cookie: The cookie representing the cache object
+ *
+ * Request a consistency check from fscache, which passes the request
+ * to the backing cache.
+ *
+ * Returns 0 if consistent and -ESTALE if inconsistent.  May also
+ * return -ENOMEM and -ERESTARTSYS.
+ */
+static inline
+int fscache_check_consistency(struct fscache_cookie *cookie)
+{
+	if (fscache_cookie_valid(cookie))
+		return __fscache_check_consistency(cookie);
+	else
+		return 0;
+}
+
+/**
  * fscache_update_cookie - Request that a cache object be updated
  * @cookie: The cookie representing the cache object
  *
@@ -570,6 +592,26 @@
 }
 
 /**
+ * fscache_readpages_cancel - Cancel read/alloc on pages
+ * @cookie: The cookie representing the inode's cache object.
+ * @pages: The netfs pages that we canceled write on in readpages()
+ *
+ * Uncache/unreserve the pages reserved earlier in readpages() via
+ * fscache_readpages_or_alloc() and similar.  In most successful cases in
+ * readpages() this doesn't do anything.  In cases when the underlying netfs's
+ * readahead failed we need to clean up the pagelist (unmark and uncache).
+ *
+ * This function may sleep as it may have to clean up disk state.
+ */
+static inline
+void fscache_readpages_cancel(struct fscache_cookie *cookie,
+			      struct list_head *pages)
+{
+	if (fscache_cookie_valid(cookie))
+		__fscache_readpages_cancel(cookie, pages);
+}
+
+/**
  * fscache_write_page - Request storage of a page in the cache
  * @cookie: The cookie representing the cache object
  * @page: The netfs page to store
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3be308e..4a5df7b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -290,7 +290,7 @@
 	if (ceph_msgr_slab_init())
 		return -ENOMEM;
 
-	ceph_msgr_wq = alloc_workqueue("ceph-msgr", WQ_NON_REENTRANT, 0);
+	ceph_msgr_wq = alloc_workqueue("ceph-msgr", 0, 0);
 	if (ceph_msgr_wq)
 		return 0;
 
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index dd47889..1606f74 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -503,7 +503,9 @@
 	struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which, opcode);
 	size_t payload_len = 0;
 
-	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
+	       opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
+	       opcode != CEPH_OSD_OP_TRUNCATE);
 
 	op->extent.offset = offset;
 	op->extent.length = length;
@@ -631,6 +633,9 @@
 		break;
 	case CEPH_OSD_OP_READ:
 	case CEPH_OSD_OP_WRITE:
+	case CEPH_OSD_OP_ZERO:
+	case CEPH_OSD_OP_DELETE:
+	case CEPH_OSD_OP_TRUNCATE:
 		if (src->op == CEPH_OSD_OP_WRITE)
 			request_data_len = src->extent.length;
 		dst->extent.offset = cpu_to_le64(src->extent.offset);
@@ -715,7 +720,9 @@
 	u64 object_base;
 	int r;
 
-	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE);
+	BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
+	       opcode != CEPH_OSD_OP_DELETE && opcode != CEPH_OSD_OP_ZERO &&
+	       opcode != CEPH_OSD_OP_TRUNCATE);
 
 	req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
 					GFP_NOFS);
@@ -1488,14 +1495,14 @@
 	dout("handle_reply %p tid %llu req %p result %d\n", msg, tid,
 	     req, result);
 
-	ceph_decode_need(&p, end, 4, bad);
+	ceph_decode_need(&p, end, 4, bad_put);
 	numops = ceph_decode_32(&p);
 	if (numops > CEPH_OSD_MAX_OP)
 		goto bad_put;
 	if (numops != req->r_num_ops)
 		goto bad_put;
 	payload_len = 0;
-	ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad);
+	ceph_decode_need(&p, end, numops * sizeof(struct ceph_osd_op), bad_put);
 	for (i = 0; i < numops; i++) {
 		struct ceph_osd_op *op = p;
 		int len;
@@ -1513,7 +1520,7 @@
 		goto bad_put;
 	}
 
-	ceph_decode_need(&p, end, 4 + numops * 4, bad);
+	ceph_decode_need(&p, end, 4 + numops * 4, bad_put);
 	retry_attempt = ceph_decode_32(&p);
 	for (i = 0; i < numops; i++)
 		req->r_reply_op_result[i] = ceph_decode_32(&p);
@@ -1786,6 +1793,8 @@
 		nr_maps--;
 	}
 
+	if (!osdc->osdmap)
+		goto bad;
 done:
 	downgrade_write(&osdc->map_sem);
 	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
@@ -2129,6 +2138,8 @@
 			dout("osdc_start_request failed map, "
 				" will retry %lld\n", req->r_tid);
 			rc = 0;
+		} else {
+			__unregister_request(osdc, req);
 		}
 		goto out_unlock;
 	}
@@ -2253,12 +2264,10 @@
 	if (err < 0)
 		goto out_msgpool;
 
+	err = -ENOMEM;
 	osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
-	if (IS_ERR(osdc->notify_wq)) {
-		err = PTR_ERR(osdc->notify_wq);
-		osdc->notify_wq = NULL;
+	if (!osdc->notify_wq)
 		goto out_msgpool;
-	}
 	return 0;
 
 out_msgpool:
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 603ddd9..dbd9a47 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -1129,7 +1129,7 @@
 
 	/* pg_temp? */
 	pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num,
-				    pool->pgp_num_mask);
+				    pool->pg_num_mask);
 	pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
 	if (pg) {
 		*num = pg->len;