Btrfs: Optimize compressed writeback and reads

When reading compressed extents, try to put pages into the page cache
for any pages covered by the compressed extent that readpages didn't already
preload.

Add an async work queue to handle transformations at delayed allocation processing
time.  Right now this is just compression.  The workflow is:

1) Find offsets in the file marked for delayed allocation
2) Lock the pages
3) Lock the state bits
4) Call the async delalloc code

The async delalloc code clears the state lock bits and delalloc bits.  It is
important this happens before the range goes into the work queue because
otherwise it might deadlock with other work queue items that try to lock
those extent bits.

The file pages are compressed, and if the compression doesn't work the
pages are written back directly.

An ordered work queue is used to make sure the inodes are written in the same
order that pdflush or writepages sent them down.

This changes extent_write_cache_pages to let the writepage function
update the wbc nr_written count.

Signed-off-by: Chris Mason <chris.mason@oracle.com>

diff --git a/fs/btrfs/compression.c b/fs/btrfs/compression.c
index 3549131..284f210 100644
--- a/fs/btrfs/compression.c
+++ b/fs/btrfs/compression.c
@@ -33,6 +33,7 @@
 #include <linux/writeback.h>
 #include <linux/bit_spinlock.h>
 #include <linux/version.h>
+#include <linux/pagevec.h>
 #include "ctree.h"
 #include "disk-io.h"
 #include "transaction.h"
@@ -145,9 +146,9 @@
 	}
 
 	/* do io completion on the original bio */
-	if (cb->errors)
+	if (cb->errors) {
 		bio_io_error(cb->orig_bio);
-	else
+	} else
 		bio_endio(cb->orig_bio, 0);
 
 	/* finally free the cb struct */
@@ -333,6 +334,7 @@
 		}
 		bytes_left -= PAGE_CACHE_SIZE;
 		first_byte += PAGE_CACHE_SIZE;
+		cond_resched();
 	}
 	bio_get(bio);
 
@@ -346,6 +348,130 @@
 	return 0;
 }
 
+static noinline int add_ra_bio_pages(struct inode *inode,
+				     u64 compressed_end,
+				     struct compressed_bio *cb)
+{
+	unsigned long end_index;
+	unsigned long page_index;
+	u64 last_offset;
+	u64 isize = i_size_read(inode);
+	int ret;
+	struct page *page;
+	unsigned long nr_pages = 0;
+	struct extent_map *em;
+	struct address_space *mapping = inode->i_mapping;
+	struct pagevec pvec;
+	struct extent_map_tree *em_tree;
+	struct extent_io_tree *tree;
+	u64 end;
+	int misses = 0;
+
+	page = cb->orig_bio->bi_io_vec[cb->orig_bio->bi_vcnt - 1].bv_page;
+	last_offset = (page_offset(page) + PAGE_CACHE_SIZE);
+	em_tree = &BTRFS_I(inode)->extent_tree;
+	tree = &BTRFS_I(inode)->io_tree;
+
+	if (isize == 0)
+		return 0;
+
+	end_index = (i_size_read(inode) - 1) >> PAGE_CACHE_SHIFT;
+
+	pagevec_init(&pvec, 0);
+	while(last_offset < compressed_end) {
+		page_index = last_offset >> PAGE_CACHE_SHIFT;
+
+		if (page_index > end_index)
+			break;
+
+		rcu_read_lock();
+		page = radix_tree_lookup(&mapping->page_tree, page_index);
+		rcu_read_unlock();
+		if (page) {
+			misses++;
+			if (misses > 4)
+				break;
+			goto next;
+		}
+
+		page = alloc_page(mapping_gfp_mask(mapping) | GFP_NOFS);
+		if (!page)
+			break;
+
+		page->index = page_index;
+		/*
+		 * what we want to do here is call add_to_page_cache_lru,
+		 * but that isn't exported, so we reproduce it here
+		 */
+		if (add_to_page_cache(page, mapping,
+				      page->index, GFP_NOFS)) {
+			page_cache_release(page);
+			goto next;
+		}
+
+		/* open coding of lru_cache_add, also not exported */
+		page_cache_get(page);
+		if (!pagevec_add(&pvec, page))
+			__pagevec_lru_add(&pvec);
+
+		end = last_offset + PAGE_CACHE_SIZE - 1;
+		/*
+		 * at this point, we have a locked page in the page cache
+		 * for these bytes in the file.  But, we have to make
+		 * sure they map to this compressed extent on disk.
+		 */
+		set_page_extent_mapped(page);
+		lock_extent(tree, last_offset, end, GFP_NOFS);
+		spin_lock(&em_tree->lock);
+		em = lookup_extent_mapping(em_tree, last_offset,
+					   PAGE_CACHE_SIZE);
+		spin_unlock(&em_tree->lock);
+
+		if (!em || last_offset < em->start ||
+		    (last_offset + PAGE_CACHE_SIZE > extent_map_end(em)) ||
+		    (em->block_start >> 9) != cb->orig_bio->bi_sector) {
+			free_extent_map(em);
+			unlock_extent(tree, last_offset, end, GFP_NOFS);
+			unlock_page(page);
+			page_cache_release(page);
+			break;
+		}
+		free_extent_map(em);
+
+		if (page->index == end_index) {
+			char *userpage;
+			size_t zero_offset = isize & (PAGE_CACHE_SIZE - 1);
+
+			if (zero_offset) {
+				int zeros;
+				zeros = PAGE_CACHE_SIZE - zero_offset;
+				userpage = kmap_atomic(page, KM_USER0);
+				memset(userpage + zero_offset, 0, zeros);
+				flush_dcache_page(page);
+				kunmap_atomic(userpage, KM_USER0);
+			}
+		}
+
+		ret = bio_add_page(cb->orig_bio, page,
+				   PAGE_CACHE_SIZE, 0);
+
+		if (ret == PAGE_CACHE_SIZE) {
+			nr_pages++;
+			page_cache_release(page);
+		} else {
+			unlock_extent(tree, last_offset, end, GFP_NOFS);
+			unlock_page(page);
+			page_cache_release(page);
+			break;
+		}
+next:
+		last_offset += PAGE_CACHE_SIZE;
+	}
+	if (pagevec_count(&pvec))
+		__pagevec_lru_add(&pvec);
+	return 0;
+}
+
 /*
  * for a compressed read, the bio we get passed has all the inode pages
  * in it.  We don't actually do IO on those pages but allocate new ones
@@ -373,6 +499,7 @@
 	struct block_device *bdev;
 	struct bio *comp_bio;
 	u64 cur_disk_byte = (u64)bio->bi_sector << 9;
+	u64 em_len;
 	struct extent_map *em;
 	int ret;
 
@@ -393,6 +520,7 @@
 
 	cb->start = em->start;
 	compressed_len = em->block_len;
+	em_len = em->len;
 	free_extent_map(em);
 
 	cb->len = uncompressed_len;
@@ -411,6 +539,17 @@
 	}
 	cb->nr_pages = nr_pages;
 
+	add_ra_bio_pages(inode, cb->start + em_len, cb);
+
+	if (!btrfs_test_opt(root, NODATASUM) &&
+	    !btrfs_test_flag(inode, NODATASUM)) {
+		btrfs_lookup_bio_sums(root, inode, cb->orig_bio);
+	}
+
+	/* include any pages we added in add_ra-bio_pages */
+	uncompressed_len = bio->bi_vcnt * PAGE_CACHE_SIZE;
+	cb->len = uncompressed_len;
+
 	comp_bio = compressed_bio_alloc(bdev, cur_disk_byte, GFP_NOFS);
 	comp_bio->bi_private = cb;
 	comp_bio->bi_end_io = end_compressed_bio_read;
@@ -442,9 +581,10 @@
 			comp_bio = compressed_bio_alloc(bdev, cur_disk_byte,
 							GFP_NOFS);
 			atomic_inc(&cb->pending_bios);
-			bio->bi_private = cb;
-			bio->bi_end_io = end_compressed_bio_write;
-			bio_add_page(bio, page, PAGE_CACHE_SIZE, 0);
+			comp_bio->bi_private = cb;
+			comp_bio->bi_end_io = end_compressed_bio_read;
+
+			bio_add_page(comp_bio, page, PAGE_CACHE_SIZE, 0);
 		}
 		cur_disk_byte += PAGE_CACHE_SIZE;
 	}
diff --git a/fs/btrfs/ctree.h b/fs/btrfs/ctree.h
index 689df07..c83cc5b 100644
--- a/fs/btrfs/ctree.h
+++ b/fs/btrfs/ctree.h
@@ -625,8 +625,8 @@
 	struct btrfs_transaction *running_transaction;
 	wait_queue_head_t transaction_throttle;
 	wait_queue_head_t transaction_wait;
-	wait_queue_head_t async_submit_wait;
 
+	wait_queue_head_t async_submit_wait;
 	wait_queue_head_t tree_log_wait;
 
 	struct btrfs_super_block super_copy;
@@ -653,6 +653,7 @@
 	atomic_t nr_async_submits;
 	atomic_t async_submit_draining;
 	atomic_t nr_async_bios;
+	atomic_t async_delalloc_pages;
 	atomic_t tree_log_writers;
 	atomic_t tree_log_commit;
 	unsigned long tree_log_batch;
@@ -677,6 +678,7 @@
 	 * two
 	 */
 	struct btrfs_workers workers;
+	struct btrfs_workers delalloc_workers;
 	struct btrfs_workers endio_workers;
 	struct btrfs_workers endio_write_workers;
 	struct btrfs_workers submit_workers;
diff --git a/fs/btrfs/disk-io.c b/fs/btrfs/disk-io.c
index e0a28f7..8efc123 100644
--- a/fs/btrfs/disk-io.c
+++ b/fs/btrfs/disk-io.c
@@ -539,6 +539,13 @@
 			   (atomic_read(&fs_info->nr_async_bios) < limit),
 			   HZ/10);
 	}
+
+	while(atomic_read(&fs_info->async_submit_draining) &&
+	      atomic_read(&fs_info->nr_async_submits)) {
+		wait_event(fs_info->async_submit_wait,
+			   (atomic_read(&fs_info->nr_async_submits) == 0));
+	}
+
 	return 0;
 }
 
@@ -1437,6 +1444,7 @@
 	INIT_LIST_HEAD(&fs_info->space_info);
 	btrfs_mapping_init(&fs_info->mapping_tree);
 	atomic_set(&fs_info->nr_async_submits, 0);
+	atomic_set(&fs_info->async_delalloc_pages, 0);
 	atomic_set(&fs_info->async_submit_draining, 0);
 	atomic_set(&fs_info->nr_async_bios, 0);
 	atomic_set(&fs_info->throttles, 0);
@@ -1550,6 +1558,9 @@
 	btrfs_init_workers(&fs_info->workers, "worker",
 			   fs_info->thread_pool_size);
 
+	btrfs_init_workers(&fs_info->delalloc_workers, "delalloc",
+			   fs_info->thread_pool_size);
+
 	btrfs_init_workers(&fs_info->submit_workers, "submit",
 			   min_t(u64, fs_devices->num_devices,
 			   fs_info->thread_pool_size));
@@ -1560,15 +1571,12 @@
 	 */
 	fs_info->submit_workers.idle_thresh = 64;
 
-	/* fs_info->workers is responsible for checksumming file data
-	 * blocks and metadata.  Using a larger idle thresh allows each
-	 * worker thread to operate on things in roughly the order they
-	 * were sent by the writeback daemons, improving overall locality
-	 * of the IO going down the pipe.
-	 */
-	fs_info->workers.idle_thresh = 8;
+	fs_info->workers.idle_thresh = 16;
 	fs_info->workers.ordered = 1;
 
+	fs_info->delalloc_workers.idle_thresh = 2;
+	fs_info->delalloc_workers.ordered = 1;
+
 	btrfs_init_workers(&fs_info->fixup_workers, "fixup", 1);
 	btrfs_init_workers(&fs_info->endio_workers, "endio",
 			   fs_info->thread_pool_size);
@@ -1584,6 +1592,7 @@
 
 	btrfs_start_workers(&fs_info->workers, 1);
 	btrfs_start_workers(&fs_info->submit_workers, 1);
+	btrfs_start_workers(&fs_info->delalloc_workers, 1);
 	btrfs_start_workers(&fs_info->fixup_workers, 1);
 	btrfs_start_workers(&fs_info->endio_workers, fs_info->thread_pool_size);
 	btrfs_start_workers(&fs_info->endio_write_workers,
@@ -1732,6 +1741,7 @@
 fail_sys_array:
 fail_sb_buffer:
 	btrfs_stop_workers(&fs_info->fixup_workers);
+	btrfs_stop_workers(&fs_info->delalloc_workers);
 	btrfs_stop_workers(&fs_info->workers);
 	btrfs_stop_workers(&fs_info->endio_workers);
 	btrfs_stop_workers(&fs_info->endio_write_workers);
@@ -1988,6 +1998,7 @@
 	truncate_inode_pages(fs_info->btree_inode->i_mapping, 0);
 
 	btrfs_stop_workers(&fs_info->fixup_workers);
+	btrfs_stop_workers(&fs_info->delalloc_workers);
 	btrfs_stop_workers(&fs_info->workers);
 	btrfs_stop_workers(&fs_info->endio_workers);
 	btrfs_stop_workers(&fs_info->endio_write_workers);
@@ -2062,7 +2073,7 @@
 	struct extent_io_tree *tree;
 	u64 num_dirty;
 	u64 start = 0;
-	unsigned long thresh = 96 * 1024 * 1024;
+	unsigned long thresh = 32 * 1024 * 1024;
 	tree = &BTRFS_I(root->fs_info->btree_inode)->io_tree;
 
 	if (current_is_pdflush() || current->flags & PF_MEMALLOC)
diff --git a/fs/btrfs/extent-tree.c b/fs/btrfs/extent-tree.c
index 8af3952..ebd8275 100644
--- a/fs/btrfs/extent-tree.c
+++ b/fs/btrfs/extent-tree.c
@@ -768,7 +768,11 @@
 	l = path->nodes[0];
 
 	btrfs_item_key_to_cpu(l, &key, path->slots[0]);
-	BUG_ON(key.objectid != bytenr);
+	if (key.objectid != bytenr) {
+		btrfs_print_leaf(root->fs_info->extent_root, path->nodes[0]);
+		printk("wanted %Lu found %Lu\n", bytenr, key.objectid);
+		BUG();
+	}
 	BUG_ON(key.type != BTRFS_EXTENT_ITEM_KEY);
 
 	item = btrfs_item_ptr(l, path->slots[0], struct btrfs_extent_item);
diff --git a/fs/btrfs/extent_io.c b/fs/btrfs/extent_io.c
index 9b37ce6..bbe3bcf 100644
--- a/fs/btrfs/extent_io.c
+++ b/fs/btrfs/extent_io.c
@@ -47,6 +47,11 @@
 	struct bio *bio;
 	struct extent_io_tree *tree;
 	get_extent_t *get_extent;
+
+	/* tells writepage not to lock the state bits for this range
+	 * it still does the unlocking
+	 */
+	int extent_locked;
 };
 
 int __init extent_io_init(void)
@@ -1198,11 +1203,18 @@
 			 * the caller is taking responsibility for
 			 * locked_page
 			 */
-			if (pages[i] != locked_page)
+			if (pages[i] != locked_page) {
 				lock_page(pages[i]);
+				if (pages[i]->mapping != inode->i_mapping) {
+					ret = -EAGAIN;
+					unlock_page(pages[i]);
+					page_cache_release(pages[i]);
+					goto done;
+				}
+			}
 			page_cache_release(pages[i]);
+			pages_locked++;
 		}
-		pages_locked += ret;
 		nrpages -= ret;
 		index += ret;
 		cond_resched();
@@ -1262,8 +1274,7 @@
 	 * if we're looping.
 	 */
 	if (delalloc_end + 1 - delalloc_start > max_bytes && loops) {
-		delalloc_end = (delalloc_start + PAGE_CACHE_SIZE - 1) &
-			~((u64)PAGE_CACHE_SIZE - 1);
+		delalloc_end = delalloc_start + PAGE_CACHE_SIZE - 1;
 	}
 	/* step two, lock all the pages after the page that has start */
 	ret = lock_delalloc_pages(inode, locked_page,
@@ -1306,7 +1317,10 @@
 int extent_clear_unlock_delalloc(struct inode *inode,
 				struct extent_io_tree *tree,
 				u64 start, u64 end, struct page *locked_page,
-				int clear_dirty, int set_writeback,
+				int unlock_pages,
+				int clear_unlock,
+				int clear_delalloc, int clear_dirty,
+				int set_writeback,
 				int end_writeback)
 {
 	int ret;
@@ -1315,12 +1329,19 @@
 	unsigned long end_index = end >> PAGE_CACHE_SHIFT;
 	unsigned long nr_pages = end_index - index + 1;
 	int i;
-	int clear_bits = EXTENT_LOCKED | EXTENT_DELALLOC;
+	int clear_bits = 0;
 
+	if (clear_unlock)
+		clear_bits |= EXTENT_LOCKED;
 	if (clear_dirty)
 		clear_bits |= EXTENT_DIRTY;
 
+	if (clear_delalloc)
+		clear_bits |= EXTENT_DELALLOC;
+
 	clear_extent_bit(tree, start, end, clear_bits, 1, 0, GFP_NOFS);
+	if (!(unlock_pages || clear_dirty || set_writeback || end_writeback))
+		return 0;
 
 	while(nr_pages > 0) {
 		ret = find_get_pages_contig(inode->i_mapping, index,
@@ -1336,7 +1357,8 @@
 				set_page_writeback(pages[i]);
 			if (end_writeback)
 				end_page_writeback(pages[i]);
-			unlock_page(pages[i]);
+			if (unlock_pages)
+				unlock_page(pages[i]);
 			page_cache_release(pages[i]);
 		}
 		nr_pages -= ret;
@@ -1741,9 +1763,10 @@
 			}
 		}
 
-		if (uptodate)
+		if (uptodate) {
 			set_extent_uptodate(tree, start, end,
 					    GFP_ATOMIC);
+		}
 		unlock_extent(tree, start, end, GFP_ATOMIC);
 
 		if (whole_page) {
@@ -1925,6 +1948,7 @@
 		set_page_private(page, EXTENT_PAGE_PRIVATE);
 	}
 }
+EXPORT_SYMBOL(set_page_extent_mapped);
 
 void set_page_extent_head(struct page *page, unsigned long len)
 {
@@ -2143,12 +2167,17 @@
 	u64 delalloc_end;
 	int page_started;
 	int compressed;
+	unsigned long nr_written = 0;
 
 	WARN_ON(!PageLocked(page));
 	pg_offset = i_size & (PAGE_CACHE_SIZE - 1);
 	if (page->index > end_index ||
 	   (page->index == end_index && !pg_offset)) {
-		page->mapping->a_ops->invalidatepage(page, 0);
+		if (epd->extent_locked) {
+			if (tree->ops && tree->ops->writepage_end_io_hook)
+				tree->ops->writepage_end_io_hook(page, start,
+							 page_end, NULL, 1);
+		}
 		unlock_page(page);
 		return 0;
 	}
@@ -2169,27 +2198,33 @@
 	delalloc_start = start;
 	delalloc_end = 0;
 	page_started = 0;
-	while(delalloc_end < page_end) {
-		nr_delalloc = find_lock_delalloc_range(inode, tree,
+	if (!epd->extent_locked) {
+		while(delalloc_end < page_end) {
+			nr_delalloc = find_lock_delalloc_range(inode, tree,
 						       page,
 						       &delalloc_start,
 						       &delalloc_end,
 						       128 * 1024 * 1024);
-		if (nr_delalloc == 0) {
+			if (nr_delalloc == 0) {
+				delalloc_start = delalloc_end + 1;
+				continue;
+			}
+			tree->ops->fill_delalloc(inode, page, delalloc_start,
+						 delalloc_end, &page_started,
+						 &nr_written);
 			delalloc_start = delalloc_end + 1;
-			continue;
 		}
-		tree->ops->fill_delalloc(inode, page, delalloc_start,
-					 delalloc_end, &page_started);
-		delalloc_start = delalloc_end + 1;
-	}
 
-	/* did the fill delalloc function already unlock and start the IO? */
-	if (page_started) {
-		return 0;
+		/* did the fill delalloc function already unlock and start
+		 * the IO?
+		 */
+		if (page_started) {
+			ret = 0;
+			goto update_nr_written;
+		}
 	}
-
 	lock_extent(tree, start, page_end, GFP_NOFS);
+
 	unlock_start = start;
 
 	if (tree->ops && tree->ops->writepage_start_hook) {
@@ -2199,10 +2234,13 @@
 			unlock_extent(tree, start, page_end, GFP_NOFS);
 			redirty_page_for_writepage(wbc, page);
 			unlock_page(page);
-			return 0;
+			ret = 0;
+			goto update_nr_written;
 		}
 	}
 
+	nr_written++;
+
 	end = page_end;
 	if (test_range_bit(tree, start, page_end, EXTENT_DELALLOC, 0)) {
 		printk("found delalloc bits after lock_extent\n");
@@ -2333,6 +2371,12 @@
 	if (unlock_start <= page_end)
 		unlock_extent(tree, unlock_start, page_end, GFP_NOFS);
 	unlock_page(page);
+
+update_nr_written:
+	wbc->nr_to_write -= nr_written;
+	if (wbc->range_cyclic || (wbc->nr_to_write > 0 &&
+	    wbc->range_start == 0 && wbc->range_end == LLONG_MAX))
+		page->mapping->writeback_index = page->index + nr_written;
 	return 0;
 }
 
@@ -2431,7 +2475,7 @@
 				unlock_page(page);
 				ret = 0;
 			}
-			if (ret || (--(wbc->nr_to_write) <= 0))
+			if (ret || wbc->nr_to_write <= 0)
 				done = 1;
 			if (wbc->nonblocking && bdi_write_congested(bdi)) {
 				wbc->encountered_congestion = 1;
@@ -2452,6 +2496,8 @@
 	}
 	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
 		mapping->writeback_index = index;
+		if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
+			range_whole = 1;
 
 	if (wbc->range_cont)
 		wbc->range_start = index << PAGE_CACHE_SHIFT;
@@ -2469,6 +2515,7 @@
 		.bio = NULL,
 		.tree = tree,
 		.get_extent = get_extent,
+		.extent_locked = 0,
 	};
 	struct writeback_control wbc_writepages = {
 		.bdi		= wbc->bdi,
@@ -2491,6 +2538,52 @@
 }
 EXPORT_SYMBOL(extent_write_full_page);
 
+int extent_write_locked_range(struct extent_io_tree *tree, struct inode *inode,
+			      u64 start, u64 end, get_extent_t *get_extent,
+			      int mode)
+{
+	int ret = 0;
+	struct address_space *mapping = inode->i_mapping;
+	struct page *page;
+	unsigned long nr_pages = (end - start + PAGE_CACHE_SIZE) >>
+		PAGE_CACHE_SHIFT;
+
+	struct extent_page_data epd = {
+		.bio = NULL,
+		.tree = tree,
+		.get_extent = get_extent,
+		.extent_locked = 1,
+	};
+	struct writeback_control wbc_writepages = {
+		.bdi		= inode->i_mapping->backing_dev_info,
+		.sync_mode	= mode,
+		.older_than_this = NULL,
+		.nr_to_write	= nr_pages * 2,
+		.range_start	= start,
+		.range_end	= end + 1,
+	};
+
+	while(start <= end) {
+		page = find_get_page(mapping, start >> PAGE_CACHE_SHIFT);
+		if (clear_page_dirty_for_io(page))
+			ret = __extent_writepage(page, &wbc_writepages, &epd);
+		else {
+			if (tree->ops && tree->ops->writepage_end_io_hook)
+				tree->ops->writepage_end_io_hook(page, start,
+						 start + PAGE_CACHE_SIZE - 1,
+						 NULL, 1);
+			unlock_page(page);
+		}
+		page_cache_release(page);
+		start += PAGE_CACHE_SIZE;
+	}
+
+	if (epd.bio)
+		submit_one_bio(WRITE, epd.bio, 0, 0);
+	return ret;
+}
+EXPORT_SYMBOL(extent_write_locked_range);
+
 
 int extent_writepages(struct extent_io_tree *tree,
 		      struct address_space *mapping,
@@ -2502,6 +2595,7 @@
 		.bio = NULL,
 		.tree = tree,
 		.get_extent = get_extent,
+		.extent_locked = 0,
 	};
 
 	ret = extent_write_cache_pages(tree, mapping, wbc,
diff --git a/fs/btrfs/extent_io.h b/fs/btrfs/extent_io.h
index 283110e..2d5f670 100644
--- a/fs/btrfs/extent_io.h
+++ b/fs/btrfs/extent_io.h
@@ -35,7 +35,8 @@
 				       unsigned long bio_flags);
 struct extent_io_ops {
 	int (*fill_delalloc)(struct inode *inode, struct page *locked_page,
-			     u64 start, u64 end, int *page_started);
+			     u64 start, u64 end, int *page_started,
+			     unsigned long *nr_written);
 	int (*writepage_start_hook)(struct page *page, u64 start, u64 end);
 	int (*writepage_io_hook)(struct page *page, u64 start, u64 end);
 	extent_submit_bio_hook_t *submit_bio_hook;
@@ -172,6 +173,9 @@
 int extent_write_full_page(struct extent_io_tree *tree, struct page *page,
 			  get_extent_t *get_extent,
 			  struct writeback_control *wbc);
+int extent_write_locked_range(struct extent_io_tree *tree, struct inode *inode,
+			      u64 start, u64 end, get_extent_t *get_extent,
+			      int mode);
 int extent_writepages(struct extent_io_tree *tree,
 		      struct address_space *mapping,
 		      get_extent_t *get_extent,
@@ -256,6 +260,9 @@
 int extent_clear_unlock_delalloc(struct inode *inode,
 				struct extent_io_tree *tree,
 				u64 start, u64 end, struct page *locked_page,
-				int clear_dirty, int set_writeback,
-				int clear_writeback);
+				int unlock_page,
+				int clear_unlock,
+				int clear_delalloc, int clear_dirty,
+				int set_writeback,
+				int end_writeback);
 #endif
diff --git a/fs/btrfs/file.c b/fs/btrfs/file.c
index 0c8cc35..337221e 100644
--- a/fs/btrfs/file.c
+++ b/fs/btrfs/file.c
@@ -368,6 +368,8 @@
 	u64 search_start = start;
 	u64 leaf_start;
 	u64 ram_bytes = 0;
+	u64 orig_parent = 0;
+	u64 disk_bytenr = 0;
 	u8 compression;
 	u8 encryption;
 	u16 other_encoding = 0;
@@ -500,17 +502,31 @@
 				keep = 1;
 		}
 
-		if (bookend && found_extent && locked_end < extent_end) {
-			ret = try_lock_extent(&BTRFS_I(inode)->io_tree,
-					locked_end, extent_end - 1, GFP_NOFS);
-			if (!ret) {
-				btrfs_release_path(root, path);
-				lock_extent(&BTRFS_I(inode)->io_tree,
-					locked_end, extent_end - 1, GFP_NOFS);
+		if (bookend && found_extent) {
+			if (locked_end < extent_end) {
+				ret = try_lock_extent(&BTRFS_I(inode)->io_tree,
+						locked_end, extent_end - 1,
+						GFP_NOFS);
+				if (!ret) {
+					btrfs_release_path(root, path);
+					lock_extent(&BTRFS_I(inode)->io_tree,
+						locked_end, extent_end - 1,
+						GFP_NOFS);
+					locked_end = extent_end;
+					continue;
+				}
 				locked_end = extent_end;
-				continue;
 			}
-			locked_end = extent_end;
+			orig_parent = path->nodes[0]->start;
+			disk_bytenr = le64_to_cpu(old.disk_bytenr);
+			if (disk_bytenr != 0) {
+				ret = btrfs_inc_extent_ref(trans, root,
+					   disk_bytenr,
+					   le64_to_cpu(old.disk_num_bytes),
+					   orig_parent, root->root_key.objectid,
+					   trans->transid, inode->i_ino);
+				BUG_ON(ret);
+			}
 		}
 
 		if (found_inline) {
@@ -537,8 +553,12 @@
 					inode_sub_bytes(inode, old_num -
 							new_num);
 				}
-				btrfs_set_file_extent_num_bytes(leaf, extent,
-								new_num);
+				if (!compression && !encryption) {
+					btrfs_set_file_extent_ram_bytes(leaf,
+							extent, new_num);
+				}
+				btrfs_set_file_extent_num_bytes(leaf,
+							extent, new_num);
 				btrfs_mark_buffer_dirty(leaf);
 			} else if (key.offset < inline_limit &&
 				   (end > extent_end) &&
@@ -582,11 +602,11 @@
 		}
 		/* create bookend, splitting the extent in two */
 		if (bookend && found_extent) {
-			u64 disk_bytenr;
 			struct btrfs_key ins;
 			ins.objectid = inode->i_ino;
 			ins.offset = end;
 			btrfs_set_key_type(&ins, BTRFS_EXTENT_DATA_KEY);
+
 			btrfs_release_path(root, path);
 			ret = btrfs_insert_empty_item(trans, root, path, &ins,
 						      sizeof(*extent));
@@ -623,14 +643,13 @@
 
 			btrfs_mark_buffer_dirty(path->nodes[0]);
 
-			disk_bytenr = le64_to_cpu(old.disk_bytenr);
 			if (disk_bytenr != 0) {
-				ret = btrfs_inc_extent_ref(trans, root,
-						disk_bytenr,
-						le64_to_cpu(old.disk_num_bytes),
-						leaf->start,
+				ret = btrfs_update_extent_ref(trans, root,
+						disk_bytenr, orig_parent,
+					        leaf->start,
 						root->root_key.objectid,
 						trans->transid, ins.objectid);
+
 				BUG_ON(ret);
 			}
 			btrfs_release_path(root, path);
diff --git a/fs/btrfs/inode.c b/fs/btrfs/inode.c
index 3df0ffa..e01c0d0 100644
--- a/fs/btrfs/inode.c
+++ b/fs/btrfs/inode.c
@@ -86,6 +86,10 @@
 
 static void btrfs_truncate(struct inode *inode);
 static int btrfs_finish_ordered_io(struct inode *inode, u64 start, u64 end);
+static noinline int cow_file_range(struct inode *inode,
+				   struct page *locked_page,
+				   u64 start, u64 end, int *page_started,
+				   unsigned long *nr_written, int unlock);
 
 /*
  * a very lame attempt at stopping writes when the FS is 85% full.  There
@@ -262,35 +266,72 @@
 	return 0;
 }
 
+struct async_extent {
+	u64 start;
+	u64 ram_size;
+	u64 compressed_size;
+	struct page **pages;
+	unsigned long nr_pages;
+	struct list_head list;
+};
+
+struct async_cow {
+	struct inode *inode;
+	struct btrfs_root *root;
+	struct page *locked_page;
+	u64 start;
+	u64 end;
+	struct list_head extents;
+	struct btrfs_work work;
+};
+
+static noinline int add_async_extent(struct async_cow *cow,
+				     u64 start, u64 ram_size,
+				     u64 compressed_size,
+				     struct page **pages,
+				     unsigned long nr_pages)
+{
+	struct async_extent *async_extent;
+
+	async_extent = kmalloc(sizeof(*async_extent), GFP_NOFS);
+	async_extent->start = start;
+	async_extent->ram_size = ram_size;
+	async_extent->compressed_size = compressed_size;
+	async_extent->pages = pages;
+	async_extent->nr_pages = nr_pages;
+	list_add_tail(&async_extent->list, &cow->extents);
+	return 0;
+}
+
 /*
- * when extent_io.c finds a delayed allocation range in the file,
- * the call backs end up in this code.  The basic idea is to
- * allocate extents on disk for the range, and create ordered data structs
- * in ram to track those extents.
+ * we create compressed extents in two phases.  The first
+ * phase compresses a range of pages that have already been
+ * locked (both pages and state bits are locked).
  *
- * locked_page is the page that writepage had locked already.  We use
- * it to make sure we don't do extra locks or unlocks.
+ * This is done inside an ordered work queue, and the compression
+ * is spread across many cpus.  The actual IO submission is step
+ * two, and the ordered work queue takes care of making sure that
+ * happens in the same order things were put onto the queue by
+ * writepages and friends.
  *
- * *page_started is set to one if we unlock locked_page and do everything
- * required to start IO on it.  It may be clean and already done with
- * IO when we return.
+ * If this code finds it can't get good compression, it puts an
+ * entry onto the work queue to write the uncompressed bytes.  This
+ * makes sure that both compressed inodes and uncompressed inodes
+ * are written in the same order that pdflush sent them down.
  */
-static int cow_file_range(struct inode *inode, struct page *locked_page,
-			  u64 start, u64 end, int *page_started)
+static noinline int compress_file_range(struct inode *inode,
+					struct page *locked_page,
+					u64 start, u64 end,
+					struct async_cow *async_cow,
+					int *num_added)
 {
 	struct btrfs_root *root = BTRFS_I(inode)->root;
 	struct btrfs_trans_handle *trans;
-	u64 alloc_hint = 0;
 	u64 num_bytes;
-	unsigned long ram_size;
 	u64 orig_start;
 	u64 disk_num_bytes;
-	u64 cur_alloc_size;
 	u64 blocksize = root->sectorsize;
 	u64 actual_end;
-	struct btrfs_key ins;
-	struct extent_map *em;
-	struct extent_map_tree *em_tree = &BTRFS_I(inode)->extent_tree;
 	int ret = 0;
 	struct page **pages = NULL;
 	unsigned long nr_pages;
@@ -298,22 +339,12 @@
 	unsigned long total_compressed = 0;
 	unsigned long total_in = 0;
 	unsigned long max_compressed = 128 * 1024;
-	unsigned long max_uncompressed = 256 * 1024;
+	unsigned long max_uncompressed = 128 * 1024;
 	int i;
-	int ordered_type;
 	int will_compress;
 
-	trans = btrfs_join_transaction(root, 1);
-	BUG_ON(!trans);
-	btrfs_set_trans_block_group(trans, inode);
 	orig_start = start;
 
-	/*
-	 * compression made this loop a bit ugly, but the basic idea is to
-	 * compress some pages but keep the total size of the compressed
-	 * extent relatively small.  If compression is off, this goto target
-	 * is never used.
-	 */
 again:
 	will_compress = 0;
 	nr_pages = (end >> PAGE_CACHE_SHIFT) - (start >> PAGE_CACHE_SHIFT) + 1;
@@ -324,7 +355,13 @@
 
 	/* we want to make sure that amount of ram required to uncompress
 	 * an extent is reasonable, so we limit the total size in ram
-	 * of a compressed extent to 256k
+	 * of a compressed extent to 128k.  This is a crucial number
+	 * because it also controls how easily we can spread reads across
+	 * cpus for decompression.
+	 *
+	 * We also want to make sure the amount of IO required to do
+	 * a random read is reasonably small, so we limit the size of
+	 * a compressed extent to 128k.
 	 */
 	total_compressed = min(total_compressed, max_uncompressed);
 	num_bytes = (end - start + blocksize) & ~(blocksize - 1);
@@ -333,18 +370,16 @@
 	total_in = 0;
 	ret = 0;
 
-	/* we do compression for mount -o compress and when the
-	 * inode has not been flagged as nocompress
+	/*
+	 * we do compression for mount -o compress and when the
+	 * inode has not been flagged as nocompress.  This flag can
+	 * change at any time if we discover bad compression ratios.
 	 */
 	if (!btrfs_test_flag(inode, NOCOMPRESS) &&
 	    btrfs_test_opt(root, COMPRESS)) {
 		WARN_ON(pages);
 		pages = kzalloc(sizeof(struct page *) * nr_pages, GFP_NOFS);
 
-		/* we want to make sure the amount of IO required to satisfy
-		 * a random read is reasonably small, so we limit the size
-		 * of a compressed extent to 128k
-		 */
 		ret = btrfs_zlib_compress_pages(inode->i_mapping, start,
 						total_compressed, pages,
 						nr_pages, &nr_pages_ret,
@@ -371,26 +406,34 @@
 		}
 	}
 	if (start == 0) {
+		trans = btrfs_join_transaction(root, 1);
+		BUG_ON(!trans);
+		btrfs_set_trans_block_group(trans, inode);
+
 		/* lets try to make an inline extent */
-		if (ret || total_in < (end - start + 1)) {
+		if (ret || total_in < (actual_end - start)) {
 			/* we didn't compress the entire range, try
-			 * to make an uncompressed inline extent.  This
-			 * is almost sure to fail, but maybe inline sizes
-			 * will get bigger later
+			 * to make an uncompressed inline extent.
 			 */
 			ret = cow_file_range_inline(trans, root, inode,
 						    start, end, 0, NULL);
 		} else {
+			/* try making a compressed inline extent */
 			ret = cow_file_range_inline(trans, root, inode,
 						    start, end,
 						    total_compressed, pages);
 		}
+		btrfs_end_transaction(trans, root);
 		if (ret == 0) {
+			/*
+			 * inline extent creation worked, we don't need
+			 * to create any more async work items.  Unlock
+			 * and free up our temp pages.
+			 */
 			extent_clear_unlock_delalloc(inode,
 						     &BTRFS_I(inode)->io_tree,
-						     start, end, NULL,
-						     1, 1, 1);
-			*page_started = 1;
+						     start, end, NULL, 1, 0,
+						     0, 1, 1, 1);
 			ret = 0;
 			goto free_pages_out;
 		}
@@ -435,6 +478,255 @@
 		/* flag the file so we don't compress in the future */
 		btrfs_set_flag(inode, NOCOMPRESS);
 	}
+	if (will_compress) {
+		*num_added += 1;
+
+		/* the async work queues will take care of doing actual
+		 * allocation on disk for these compressed pages,
+		 * and will submit them to the elevator.
+		 */
+		add_async_extent(async_cow, start, num_bytes,
+				 total_compressed, pages, nr_pages_ret);
+
+		if (start + num_bytes < end) {
+			start += num_bytes;
+			pages = NULL;
+			cond_resched();
+			goto again;
+		}
+	} else {
+		/*
+		 * No compression, but we still need to write the pages in
+		 * the file we've been given so far.  redirty the locked
+		 * page if it corresponds to our extent and set things up
+		 * for the async work queue to run cow_file_range to do
+		 * the normal delalloc dance
+		 */
+		if (page_offset(locked_page) >= start &&
+		    page_offset(locked_page) <= end) {
+			__set_page_dirty_nobuffers(locked_page);
+			/* unlocked later on in the async handlers */
+		}
+		add_async_extent(async_cow, start, end - start + 1, 0, NULL, 0);
+		*num_added += 1;
+	}
+
+out:
+	return 0;
+
+free_pages_out:
+	for (i = 0; i < nr_pages_ret; i++) {
+		WARN_ON(pages[i]->mapping);
+		page_cache_release(pages[i]);
+	}
+	if (pages)
+		kfree(pages);
+
+	goto out;
+}
+
+/*
+ * phase two of compressed writeback.  This is the ordered portion
+ * of the code, which only gets called in the order the work was
+ * queued.  We walk all the async extents created by compress_file_range
+ * and send them down to the disk.
+ */
+static noinline int submit_compressed_extents(struct inode *inode,
+					      struct async_cow *async_cow)
+{
+	struct async_extent *async_extent;
+	u64 alloc_hint = 0;
+	struct btrfs_trans_handle *trans;
+	struct btrfs_key ins;
+	struct extent_map *em;
+	struct btrfs_root *root = BTRFS_I(inode)->root;
+	struct extent_map_tree *em_tree = &BTRFS_I(inode)->extent_tree;
+	struct extent_io_tree *io_tree;
+	int ret;
+
+	if (list_empty(&async_cow->extents))
+		return 0;
+
+	trans = btrfs_join_transaction(root, 1);
+
+	while(!list_empty(&async_cow->extents)) {
+		async_extent = list_entry(async_cow->extents.next,
+					  struct async_extent, list);
+		list_del(&async_extent->list);
+
+		io_tree = &BTRFS_I(inode)->io_tree;
+
+		/* did the compression code fall back to uncompressed IO? */
+		if (!async_extent->pages) {
+			int page_started = 0;
+			unsigned long nr_written = 0;
+
+			lock_extent(io_tree, async_extent->start,
+				    async_extent->start + async_extent->ram_size - 1,
+				    GFP_NOFS);
+
+			/* allocate blocks */
+			cow_file_range(inode, async_cow->locked_page,
+				       async_extent->start,
+				       async_extent->start +
+				       async_extent->ram_size - 1,
+				       &page_started, &nr_written, 0);
+
+			/*
+			 * if page_started, cow_file_range inserted an
+			 * inline extent and took care of all the unlocking
+			 * and IO for us.  Otherwise, we need to submit
+			 * all those pages down to the drive.
+			 */
+			if (!page_started)
+				extent_write_locked_range(io_tree,
+						  inode, async_extent->start,
+					          async_extent->start +
+						  async_extent->ram_size - 1,
+						  btrfs_get_extent,
+						  WB_SYNC_ALL);
+			kfree(async_extent);
+			cond_resched();
+			continue;
+		}
+
+		lock_extent(io_tree, async_extent->start,
+			    async_extent->start + async_extent->ram_size - 1,
+			    GFP_NOFS);
+		/*
+		 * here we're doing allocation and writeback of the
+		 * compressed pages
+		 */
+		btrfs_drop_extent_cache(inode, async_extent->start,
+					async_extent->start +
+					async_extent->ram_size - 1, 0);
+
+		ret = btrfs_reserve_extent(trans, root,
+					   async_extent->compressed_size,
+					   async_extent->compressed_size,
+					   0, alloc_hint,
+					   (u64)-1, &ins, 1);
+		BUG_ON(ret);
+		em = alloc_extent_map(GFP_NOFS);
+		em->start = async_extent->start;
+		em->len = async_extent->ram_size;
+
+		em->block_start = ins.objectid;
+		em->block_len = ins.offset;
+		em->bdev = root->fs_info->fs_devices->latest_bdev;
+		set_bit(EXTENT_FLAG_PINNED, &em->flags);
+		set_bit(EXTENT_FLAG_COMPRESSED, &em->flags);
+
+		while(1) {
+			spin_lock(&em_tree->lock);
+			ret = add_extent_mapping(em_tree, em);
+			spin_unlock(&em_tree->lock);
+			if (ret != -EEXIST) {
+				free_extent_map(em);
+				break;
+			}
+			btrfs_drop_extent_cache(inode, async_extent->start,
+						async_extent->start +
+						async_extent->ram_size - 1, 0);
+		}
+
+		ret = btrfs_add_ordered_extent(inode, async_extent->start,
+					       ins.objectid,
+					       async_extent->ram_size,
+					       ins.offset,
+					       BTRFS_ORDERED_COMPRESSED);
+		BUG_ON(ret);
+
+		btrfs_end_transaction(trans, root);
+
+		/*
+		 * clear dirty, set writeback and unlock the pages.
+		 */
+		extent_clear_unlock_delalloc(inode,
+					     &BTRFS_I(inode)->io_tree,
+					     async_extent->start,
+					     async_extent->start +
+					     async_extent->ram_size - 1,
+					     NULL, 1, 1, 0, 1, 1, 0);
+
+		ret = btrfs_submit_compressed_write(inode,
+				         async_extent->start,
+					 async_extent->ram_size,
+					 ins.objectid,
+					 ins.offset, async_extent->pages,
+					 async_extent->nr_pages);
+
+		BUG_ON(ret);
+		trans = btrfs_join_transaction(root, 1);
+		alloc_hint = ins.objectid + ins.offset;
+		kfree(async_extent);
+		cond_resched();
+	}
+
+	btrfs_end_transaction(trans, root);
+	return 0;
+}
+
+/*
+ * when extent_io.c finds a delayed allocation range in the file,
+ * the call backs end up in this code.  The basic idea is to
+ * allocate extents on disk for the range, and create ordered data structs
+ * in ram to track those extents.
+ *
+ * locked_page is the page that writepage had locked already.  We use
+ * it to make sure we don't do extra locks or unlocks.
+ *
+ * *page_started is set to one if we unlock locked_page and do everything
+ * required to start IO on it.  It may be clean and already done with
+ * IO when we return.
+ */
+static noinline int cow_file_range(struct inode *inode,
+				   struct page *locked_page,
+				   u64 start, u64 end, int *page_started,
+				   unsigned long *nr_written,
+				   int unlock)
+{
+	struct btrfs_root *root = BTRFS_I(inode)->root;
+	struct btrfs_trans_handle *trans;
+	u64 alloc_hint = 0;
+	u64 num_bytes;
+	unsigned long ram_size;
+	u64 disk_num_bytes;
+	u64 cur_alloc_size;
+	u64 blocksize = root->sectorsize;
+	u64 actual_end;
+	struct btrfs_key ins;
+	struct extent_map *em;
+	struct extent_map_tree *em_tree = &BTRFS_I(inode)->extent_tree;
+	int ret = 0;
+
+	trans = btrfs_join_transaction(root, 1);
+	BUG_ON(!trans);
+	btrfs_set_trans_block_group(trans, inode);
+
+	actual_end = min_t(u64, i_size_read(inode), end + 1);
+
+	num_bytes = (end - start + blocksize) & ~(blocksize - 1);
+	num_bytes = max(blocksize,  num_bytes);
+	disk_num_bytes = num_bytes;
+	ret = 0;
+
+	if (start == 0) {
+		/* lets try to make an inline extent */
+		ret = cow_file_range_inline(trans, root, inode,
+					    start, end, 0, NULL);
+		if (ret == 0) {
+			extent_clear_unlock_delalloc(inode,
+						     &BTRFS_I(inode)->io_tree,
+						     start, end, NULL, 1, 1,
+						     1, 1, 1, 1);
+			*nr_written = *nr_written +
+			     (end - start + PAGE_CACHE_SIZE) / PAGE_CACHE_SIZE;
+			*page_started = 1;
+			ret = 0;
+			goto out;
+		}
+	}
 
 	BUG_ON(disk_num_bytes >
 	       btrfs_super_total_bytes(&root->fs_info->super_copy));
@@ -442,46 +734,24 @@
 	btrfs_drop_extent_cache(inode, start, start + num_bytes - 1, 0);
 
 	while(disk_num_bytes > 0) {
-		unsigned long min_bytes;
-
-		/*
-		 * the max size of a compressed extent is pretty small,
-		 * make the code a little less complex by forcing
-		 * the allocator to find a whole compressed extent at once
-		 */
-		if (will_compress)
-			min_bytes = disk_num_bytes;
-		else
-			min_bytes = root->sectorsize;
-
 		cur_alloc_size = min(disk_num_bytes, root->fs_info->max_extent);
 		ret = btrfs_reserve_extent(trans, root, cur_alloc_size,
-					   min_bytes, 0, alloc_hint,
+					   root->sectorsize, 0, alloc_hint,
 					   (u64)-1, &ins, 1);
 		if (ret) {
-			WARN_ON(1);
-			goto free_pages_out_fail;
+			BUG();
 		}
 		em = alloc_extent_map(GFP_NOFS);
 		em->start = start;
 
-		if (will_compress) {
-			ram_size = num_bytes;
-			em->len = num_bytes;
-		} else {
-			/* ramsize == disk size */
-			ram_size = ins.offset;
-			em->len = ins.offset;
-		}
+		ram_size = ins.offset;
+		em->len = ins.offset;
 
 		em->block_start = ins.objectid;
 		em->block_len = ins.offset;
 		em->bdev = root->fs_info->fs_devices->latest_bdev;
 		set_bit(EXTENT_FLAG_PINNED, &em->flags);
 
-		if (will_compress)
-			set_bit(EXTENT_FLAG_COMPRESSED, &em->flags);
-
 		while(1) {
 			spin_lock(&em_tree->lock);
 			ret = add_extent_mapping(em_tree, em);
@@ -495,10 +765,8 @@
 		}
 
 		cur_alloc_size = ins.offset;
-		ordered_type = will_compress ? BTRFS_ORDERED_COMPRESSED : 0;
 		ret = btrfs_add_ordered_extent(inode, start, ins.objectid,
-					       ram_size, cur_alloc_size,
-					       ordered_type);
+					       ram_size, cur_alloc_size, 0);
 		BUG_ON(ret);
 
 		if (disk_num_bytes < cur_alloc_size) {
@@ -506,82 +774,145 @@
 			       cur_alloc_size);
 			break;
 		}
-
-		if (will_compress) {
-			/*
-			 * we're doing compression, we and we need to
-			 * submit the compressed extents down to the device.
-			 *
-			 * We lock down all the file pages, clearing their
-			 * dirty bits and setting them writeback.  Everyone
-			 * that wants to modify the page will wait on the
-			 * ordered extent above.
-			 *
-			 * The writeback bits on the file pages are
-			 * cleared when the compressed pages are on disk
-			 */
-			btrfs_end_transaction(trans, root);
-
-			if (start <= page_offset(locked_page) &&
-			    page_offset(locked_page) < start + ram_size) {
-				*page_started = 1;
-			}
-
-			extent_clear_unlock_delalloc(inode,
-						     &BTRFS_I(inode)->io_tree,
-						     start,
-						     start + ram_size - 1,
-						     NULL, 1, 1, 0);
-
-			ret = btrfs_submit_compressed_write(inode, start,
-						 ram_size, ins.objectid,
-						 cur_alloc_size, pages,
-						 nr_pages_ret);
-
-			BUG_ON(ret);
-			trans = btrfs_join_transaction(root, 1);
-			if (start + ram_size < end) {
-				start += ram_size;
-				alloc_hint = ins.objectid + ins.offset;
-				/* pages will be freed at end_bio time */
-				pages = NULL;
-				goto again;
-			} else {
-				/* we've written everything, time to go */
-				break;
-			}
-		}
 		/* we're not doing compressed IO, don't unlock the first
 		 * page (which the caller expects to stay locked), don't
 		 * clear any dirty bits and don't set any writeback bits
 		 */
 		extent_clear_unlock_delalloc(inode, &BTRFS_I(inode)->io_tree,
 					     start, start + ram_size - 1,
-					     locked_page, 0, 0, 0);
+					     locked_page, unlock, 1,
+					     1, 0, 0, 0);
 		disk_num_bytes -= cur_alloc_size;
 		num_bytes -= cur_alloc_size;
 		alloc_hint = ins.objectid + ins.offset;
 		start += cur_alloc_size;
 	}
-
-	ret = 0;
 out:
+	ret = 0;
 	btrfs_end_transaction(trans, root);
 
 	return ret;
+}
 
-free_pages_out_fail:
-	extent_clear_unlock_delalloc(inode, &BTRFS_I(inode)->io_tree,
-				     start, end, locked_page, 0, 0, 0);
-free_pages_out:
-	for (i = 0; i < nr_pages_ret; i++) {
-		WARN_ON(pages[i]->mapping);
-		page_cache_release(pages[i]);
+/*
+ * work queue call back to started compression on a file and pages
+ */
+static noinline void async_cow_start(struct btrfs_work *work)
+{
+	struct async_cow *async_cow;
+	int num_added = 0;
+	async_cow = container_of(work, struct async_cow, work);
+
+	compress_file_range(async_cow->inode, async_cow->locked_page,
+			    async_cow->start, async_cow->end, async_cow,
+			    &num_added);
+	if (num_added == 0)
+		async_cow->inode = NULL;
+}
+
+/*
+ * work queue call back to submit previously compressed pages
+ */
+static noinline void async_cow_submit(struct btrfs_work *work)
+{
+	struct async_cow *async_cow;
+	struct btrfs_root *root;
+	unsigned long nr_pages;
+
+	async_cow = container_of(work, struct async_cow, work);
+
+	root = async_cow->root;
+	nr_pages = (async_cow->end - async_cow->start + PAGE_CACHE_SIZE) >>
+		PAGE_CACHE_SHIFT;
+
+	atomic_sub(nr_pages, &root->fs_info->async_delalloc_pages);
+
+	if (atomic_read(&root->fs_info->async_delalloc_pages) <
+	    5 * 1042 * 1024 &&
+	    waitqueue_active(&root->fs_info->async_submit_wait))
+		wake_up(&root->fs_info->async_submit_wait);
+
+	if (async_cow->inode) {
+		submit_compressed_extents(async_cow->inode, async_cow);
 	}
-	if (pages)
-		kfree(pages);
+}
 
-	goto out;
+static noinline void async_cow_free(struct btrfs_work *work)
+{
+	struct async_cow *async_cow;
+	async_cow = container_of(work, struct async_cow, work);
+	kfree(async_cow);
+}
+
+static int cow_file_range_async(struct inode *inode, struct page *locked_page,
+				u64 start, u64 end, int *page_started,
+				unsigned long *nr_written)
+{
+	struct async_cow *async_cow;
+	struct btrfs_root *root = BTRFS_I(inode)->root;
+	unsigned long nr_pages;
+	u64 cur_end;
+	int limit = 10 * 1024 * 1042;
+
+	if (!btrfs_test_opt(root, COMPRESS)) {
+		return cow_file_range(inode, locked_page, start, end,
+				      page_started, nr_written, 1);
+	}
+
+	clear_extent_bit(&BTRFS_I(inode)->io_tree, start, end, EXTENT_LOCKED |
+			 EXTENT_DELALLOC, 1, 0, GFP_NOFS);
+	while(start < end) {
+		async_cow = kmalloc(sizeof(*async_cow), GFP_NOFS);
+		async_cow->inode = inode;
+		async_cow->root = root;
+		async_cow->locked_page = locked_page;
+		async_cow->start = start;
+
+		if (btrfs_test_flag(inode, NOCOMPRESS))
+			cur_end = end;
+		else
+			cur_end = min(end, start + 512 * 1024 - 1);
+
+		async_cow->end = cur_end;
+		INIT_LIST_HEAD(&async_cow->extents);
+
+		async_cow->work.func = async_cow_start;
+		async_cow->work.ordered_func = async_cow_submit;
+		async_cow->work.ordered_free = async_cow_free;
+		async_cow->work.flags = 0;
+
+		while(atomic_read(&root->fs_info->async_submit_draining) &&
+		      atomic_read(&root->fs_info->async_delalloc_pages)) {
+			wait_event(root->fs_info->async_submit_wait,
+			     (atomic_read(&root->fs_info->async_delalloc_pages)
+			      == 0));
+		}
+
+		nr_pages = (cur_end - start + PAGE_CACHE_SIZE) >>
+			PAGE_CACHE_SHIFT;
+		atomic_add(nr_pages, &root->fs_info->async_delalloc_pages);
+
+		btrfs_queue_worker(&root->fs_info->delalloc_workers,
+				   &async_cow->work);
+
+		if (atomic_read(&root->fs_info->async_delalloc_pages) > limit) {
+			wait_event(root->fs_info->async_submit_wait,
+			   (atomic_read(&root->fs_info->async_delalloc_pages) <
+			    limit));
+		}
+
+		while(atomic_read(&root->fs_info->async_submit_draining) &&
+		      atomic_read(&root->fs_info->async_delalloc_pages)) {
+			wait_event(root->fs_info->async_submit_wait,
+			  (atomic_read(&root->fs_info->async_delalloc_pages) ==
+			   0));
+		}
+
+		*nr_written += nr_pages;
+		start = cur_end + 1;
+	}
+	*page_started = 1;
+	return 0;
 }
 
 /*
@@ -592,7 +923,8 @@
  * blocks on disk
  */
 static int run_delalloc_nocow(struct inode *inode, struct page *locked_page,
-			      u64 start, u64 end, int *page_started, int force)
+			      u64 start, u64 end, int *page_started, int force,
+			      unsigned long *nr_written)
 {
 	struct btrfs_root *root = BTRFS_I(inode)->root;
 	struct btrfs_trans_handle *trans;
@@ -711,7 +1043,8 @@
 		btrfs_release_path(root, path);
 		if (cow_start != (u64)-1) {
 			ret = cow_file_range(inode, locked_page, cow_start,
-					found_key.offset - 1, page_started);
+					found_key.offset - 1, page_started,
+					nr_written, 1);
 			BUG_ON(ret);
 			cow_start = (u64)-1;
 		}
@@ -748,9 +1081,10 @@
 		ret = btrfs_add_ordered_extent(inode, cur_offset, disk_bytenr,
 					       num_bytes, num_bytes, type);
 		BUG_ON(ret);
+
 		extent_clear_unlock_delalloc(inode, &BTRFS_I(inode)->io_tree,
 					cur_offset, cur_offset + num_bytes - 1,
-					locked_page, 0, 0, 0);
+					locked_page, 1, 1, 1, 0, 0, 0);
 		cur_offset = extent_end;
 		if (cur_offset > end)
 			break;
@@ -761,7 +1095,7 @@
 		cow_start = cur_offset;
 	if (cow_start != (u64)-1) {
 		ret = cow_file_range(inode, locked_page, cow_start, end,
-				     page_started);
+				     page_started, nr_written, 1);
 		BUG_ON(ret);
 	}
 
@@ -775,7 +1109,8 @@
  * extent_io.c call back to do delayed allocation processing
  */
 static int run_delalloc_range(struct inode *inode, struct page *locked_page,
-			      u64 start, u64 end, int *page_started)
+			      u64 start, u64 end, int *page_started,
+			      unsigned long *nr_written)
 {
 	struct btrfs_root *root = BTRFS_I(inode)->root;
 	int ret;
@@ -783,13 +1118,13 @@
 	if (btrfs_test_opt(root, NODATACOW) ||
 	    btrfs_test_flag(inode, NODATACOW))
 		ret = run_delalloc_nocow(inode, locked_page, start, end,
-					 page_started, 0);
+					 page_started, 0, nr_written);
 	else if (btrfs_test_flag(inode, PREALLOC))
 		ret = run_delalloc_nocow(inode, locked_page, start, end,
-					 page_started, 1);
+					 page_started, 1, nr_written);
 	else
-		ret = cow_file_range(inode, locked_page, start, end,
-				     page_started);
+		ret = cow_file_range_async(inode, locked_page, start, end,
+				     page_started, nr_written);
 
 	return ret;
 }
@@ -861,6 +1196,9 @@
 	u64 map_length;
 	int ret;
 
+	if (bio_flags & EXTENT_BIO_COMPRESSED)
+		return 0;
+
 	length = bio->bi_size;
 	map_tree = &root->fs_info->mapping_tree;
 	map_length = length;
@@ -925,12 +1263,12 @@
 		btrfs_test_flag(inode, NODATASUM);
 
 	if (!(rw & (1 << BIO_RW))) {
-		if (!skip_sum)
-			btrfs_lookup_bio_sums(root, inode, bio);
 
 		if (bio_flags & EXTENT_BIO_COMPRESSED)
 			return btrfs_submit_compressed_read(inode, bio,
 						    mirror_num, bio_flags);
+		else if (!skip_sum)
+			btrfs_lookup_bio_sums(root, inode, bio);
 		goto mapit;
 	} else if (!skip_sum) {
 		/* we're doing a write, do the async checksumming */
@@ -966,6 +1304,9 @@
 
 int btrfs_set_extent_delalloc(struct inode *inode, u64 start, u64 end)
 {
+	if ((end & (PAGE_CACHE_SIZE - 1)) == 0) {
+		WARN_ON(1);
+	}
 	return set_extent_delalloc(&BTRFS_I(inode)->io_tree, start, end,
 				   GFP_NOFS);
 }
@@ -2105,6 +2446,7 @@
 	int pending_del_nr = 0;
 	int pending_del_slot = 0;
 	int extent_type = -1;
+	int encoding;
 	u64 mask = root->sectorsize - 1;
 
 	if (root->ref_cows)
@@ -2144,6 +2486,7 @@
 		leaf = path->nodes[0];
 		btrfs_item_key_to_cpu(leaf, &found_key, path->slots[0]);
 		found_type = btrfs_key_type(&found_key);
+		encoding = 0;
 
 		if (found_key.objectid != inode->i_ino)
 			break;
@@ -2156,6 +2499,10 @@
 			fi = btrfs_item_ptr(leaf, path->slots[0],
 					    struct btrfs_file_extent_item);
 			extent_type = btrfs_file_extent_type(leaf, fi);
+			encoding = btrfs_file_extent_compression(leaf, fi);
+			encoding |= btrfs_file_extent_encryption(leaf, fi);
+			encoding |= btrfs_file_extent_other_encoding(leaf, fi);
+
 			if (extent_type != BTRFS_FILE_EXTENT_INLINE) {
 				item_end +=
 				    btrfs_file_extent_num_bytes(leaf, fi);
@@ -2200,7 +2547,7 @@
 		if (extent_type != BTRFS_FILE_EXTENT_INLINE) {
 			u64 num_dec;
 			extent_start = btrfs_file_extent_disk_bytenr(leaf, fi);
-			if (!del_item) {
+			if (!del_item && !encoding) {
 				u64 orig_num_bytes =
 					btrfs_file_extent_num_bytes(leaf, fi);
 				extent_num_bytes = new_size -
@@ -2436,7 +2783,14 @@
 		last_byte = min(extent_map_end(em), block_end);
 		last_byte = (last_byte + mask) & ~mask;
 		if (test_bit(EXTENT_FLAG_VACANCY, &em->flags)) {
+			u64 hint_byte = 0;
 			hole_size = last_byte - cur_offset;
+			err = btrfs_drop_extents(trans, root, inode,
+						 cur_offset,
+						 cur_offset + hole_size,
+						 cur_offset, &hint_byte);
+			if (err)
+				break;
 			err = btrfs_insert_file_extent(trans, root,
 					inode->i_ino, cur_offset, 0,
 					0, hole_size, 0, hole_size,
@@ -3785,6 +4139,7 @@
 		     struct writeback_control *wbc)
 {
 	struct extent_io_tree *tree;
+
 	tree = &BTRFS_I(mapping->host)->io_tree;
 	return extent_writepages(tree, mapping, btrfs_get_extent, wbc);
 }
@@ -4285,9 +4640,11 @@
 	 * ordered extents get created before we return
 	 */
 	atomic_inc(&root->fs_info->async_submit_draining);
-	while(atomic_read(&root->fs_info->nr_async_submits)) {
+	while(atomic_read(&root->fs_info->nr_async_submits) ||
+	      atomic_read(&root->fs_info->async_delalloc_pages)) {
 		wait_event(root->fs_info->async_submit_wait,
-		   (atomic_read(&root->fs_info->nr_async_submits) == 0));
+		   (atomic_read(&root->fs_info->nr_async_submits) == 0 &&
+		    atomic_read(&root->fs_info->async_delalloc_pages) == 0));
 	}
 	atomic_dec(&root->fs_info->async_submit_draining);
 	return 0;
diff --git a/fs/btrfs/ordered-data.c b/fs/btrfs/ordered-data.c
index 370bb42..027ad6b 100644
--- a/fs/btrfs/ordered-data.c
+++ b/fs/btrfs/ordered-data.c
@@ -390,7 +390,7 @@
 	 * start IO on any dirty ones so the wait doesn't stall waiting
 	 * for pdflush to find them
 	 */
-	btrfs_fdatawrite_range(inode->i_mapping, start, end, WB_SYNC_NONE);
+	btrfs_fdatawrite_range(inode->i_mapping, start, end, WB_SYNC_ALL);
 	if (wait) {
 		wait_event(entry->wait, test_bit(BTRFS_ORDERED_COMPLETE,
 						 &entry->flags));
@@ -421,6 +421,12 @@
 	 */
 	btrfs_fdatawrite_range(inode->i_mapping, start, orig_end, WB_SYNC_NONE);
 
+	/* The compression code will leave pages locked but return from
+	 * writepage without setting the page writeback.  Starting again
+	 * with WB_SYNC_ALL will end up waiting for the IO to actually start.
+	 */
+	btrfs_fdatawrite_range(inode->i_mapping, start, orig_end, WB_SYNC_ALL);
+
 	btrfs_wait_on_page_writeback_range(inode->i_mapping,
 					   start >> PAGE_CACHE_SHIFT,
 					   orig_end >> PAGE_CACHE_SHIFT);
@@ -448,10 +454,7 @@
 	}
 	if (test_range_bit(&BTRFS_I(inode)->io_tree, start, orig_end,
 			   EXTENT_ORDERED | EXTENT_DELALLOC, 0)) {
-		printk("inode %lu still ordered or delalloc after wait "
-		       "%llu %llu\n", inode->i_ino,
-		       (unsigned long long)start,
-		       (unsigned long long)orig_end);
+		schedule_timeout(1);
 		goto again;
 	}
 	return 0;
diff --git a/fs/btrfs/super.c b/fs/btrfs/super.c
index 431fdf1..ab9d5e8 100644
--- a/fs/btrfs/super.c
+++ b/fs/btrfs/super.c
@@ -375,6 +375,10 @@
 		filemap_flush(root->fs_info->btree_inode->i_mapping);
 		return 0;
 	}
+
+	btrfs_start_delalloc_inodes(root);
+	btrfs_wait_ordered_extents(root, 0);
+
 	btrfs_clean_old_snapshots(root);
 	trans = btrfs_start_transaction(root, 1);
 	ret = btrfs_commit_transaction(trans, root);
diff --git a/fs/btrfs/zlib.c b/fs/btrfs/zlib.c
index e993091..ba2527d 100644
--- a/fs/btrfs/zlib.c
+++ b/fs/btrfs/zlib.c
@@ -423,8 +423,9 @@
 			/* we didn't make progress in this inflate
 			 * call, we're done
 			 */
-			if (ret != Z_STREAM_END)
+			if (ret != Z_STREAM_END) {
 				ret = -1;
+			}
 			break;
 		}