ceph: try getting buffer capability for readahead/fadvise

For readahead/fadvise cases, caller of ceph_readpages does not
hold buffer capability. Pages can be added to page cache while
there is no buffer capability. This can cause data integrity
issue.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index ef3ebd7..dbb5f7d 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -315,7 +315,32 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 	struct page **pages;
 	pgoff_t next_index;
 	int nr_pages = 0;
-	int ret;
+	int got = 0;
+	int ret = 0;
+
+	if (!current->journal_info) {
+		/* caller of readpages does not hold buffer and read caps
+		 * (fadvise, madvise and readahead cases) */
+		int want = CEPH_CAP_FILE_CACHE;
+		ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
+		if (ret < 0) {
+			dout("start_read %p, error getting cap\n", inode);
+		} else if (!(got & want)) {
+			dout("start_read %p, no cache cap\n", inode);
+			ret = 0;
+		}
+		if (ret <= 0) {
+			if (got)
+				ceph_put_cap_refs(ci, got);
+			while (!list_empty(page_list)) {
+				page = list_entry(page_list->prev,
+						  struct page, lru);
+				list_del(&page->lru);
+				put_page(page);
+			}
+			return ret;
+		}
+	}
 
 	off = (u64) page_offset(page);
 
@@ -338,15 +363,18 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 				    CEPH_OSD_FLAG_READ, NULL,
 				    ci->i_truncate_seq, ci->i_truncate_size,
 				    false);
-	if (IS_ERR(req))
-		return PTR_ERR(req);
+	if (IS_ERR(req)) {
+		ret = PTR_ERR(req);
+		goto out;
+	}
 
 	/* build page vector */
 	nr_pages = calc_pages_for(0, len);
 	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
-	ret = -ENOMEM;
-	if (!pages)
-		goto out;
+	if (!pages) {
+		ret = -ENOMEM;
+		goto out_put;
+	}
 	for (i = 0; i < nr_pages; ++i) {
 		page = list_entry(page_list->prev, struct page, lru);
 		BUG_ON(PageLocked(page));
@@ -378,6 +406,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 	if (ret < 0)
 		goto out_pages;
 	ceph_osdc_put_request(req);
+
+	/* After adding locked pages to page cache, the inode holds cache cap.
+	 * So we can drop our cap refs. */
+	if (got)
+		ceph_put_cap_refs(ci, got);
+
 	return nr_pages;
 
 out_pages:
@@ -386,8 +420,11 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 		unlock_page(pages[i]);
 	}
 	ceph_put_page_vector(pages, nr_pages, false);
-out:
+out_put:
 	ceph_osdc_put_request(req);
+out:
+	if (got)
+		ceph_put_cap_refs(ci, got);
 	return ret;
 }
 
@@ -424,7 +461,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
 		rc = start_read(inode, page_list, max);
 		if (rc < 0)
 			goto out;
-		BUG_ON(rc == 0);
 	}
 out:
 	ceph_fscache_readpages_cancel(inode, page_list);
@@ -1371,9 +1407,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
 	     inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
 
 	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
-	    ci->i_inline_version == CEPH_INLINE_NONE)
+	    ci->i_inline_version == CEPH_INLINE_NONE) {
+		current->journal_info = vma->vm_file;
 		ret = filemap_fault(vma, vmf);
-	else
+		current->journal_info = NULL;
+	} else
 		ret = -EAGAIN;
 
 	dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 4037b38..edb407f 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -2479,6 +2479,27 @@ static void check_max_size(struct inode *inode, loff_t endoff)
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 }
 
+int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
+{
+	int ret, err = 0;
+
+	BUG_ON(need & ~CEPH_CAP_FILE_RD);
+	BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
+	ret = ceph_pool_perm_check(ci, need);
+	if (ret < 0)
+		return ret;
+
+	ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
+	if (ret) {
+		if (err == -EAGAIN) {
+			ret = 0;
+		} else if (err < 0) {
+			ret = err;
+		}
+	}
+	return ret;
+}
+
 /*
  * Wait for caps, and take cap references.  If we can't get a WR cap
  * due to a small max_size, make sure we check_max_size (and possibly
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 10cd6ac..ae3cec5 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -1249,8 +1249,9 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
 		dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
 		     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
 		     ceph_cap_string(got));
-
+		current->journal_info = filp;
 		ret = generic_file_read_iter(iocb, to);
+		current->journal_info = NULL;
 	}
 	dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
 	     inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 3e3fa916..622d5dd 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -905,6 +905,8 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
 
 extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
 			 loff_t endoff, int *got, struct page **pinned_page);
+extern int ceph_try_get_caps(struct ceph_inode_info *ci,
+			     int need, int want, int *got);
 
 /* for counting open files by mode */
 extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);