Merge changes from topic "ota-block-size-compression" into main

* changes:
  snapuserd: Add I/O path support for variable block size
  libsnapshot_cow: Support multi-block compression
diff --git a/fs_mgr/libsnapshot/Android.bp b/fs_mgr/libsnapshot/Android.bp
index a8a7716..5ceaf28 100644
--- a/fs_mgr/libsnapshot/Android.bp
+++ b/fs_mgr/libsnapshot/Android.bp
@@ -108,7 +108,7 @@
     ],
     srcs: [":libsnapshot_sources"],
     static_libs: [
-        "libfs_mgr_binder"
+        "libfs_mgr_binder",
     ],
 }
 
@@ -128,12 +128,12 @@
     static_libs: [
         "libc++fs",
         "libsnapshot_cow",
-    ]
+    ],
 }
 
 cc_library_static {
     name: "libsnapshot_init",
-    native_coverage : true,
+    native_coverage: true,
     defaults: ["libsnapshot_defaults"],
     srcs: [":libsnapshot_sources"],
     ramdisk_available: true,
@@ -204,6 +204,10 @@
         "libsnapshot_cow/writer_v2.cpp",
         "libsnapshot_cow/writer_v3.cpp",
     ],
+
+    header_libs: [
+        "libstorage_literals_headers",
+    ],
     export_include_dirs: ["include"],
     host_supported: true,
     recovery_available: true,
@@ -243,7 +247,10 @@
 
 cc_defaults {
     name: "libsnapshot_test_defaults",
-    defaults: ["libsnapshot_defaults", "libsnapshot_cow_defaults"],
+    defaults: [
+        "libsnapshot_defaults",
+        "libsnapshot_cow_defaults",
+    ],
     srcs: [
         "partition_cow_creator_test.cpp",
         "snapshot_metadata_updater_test.cpp",
@@ -283,10 +290,13 @@
 
 cc_test {
     name: "vts_libsnapshot_test",
-    defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"],
+    defaults: [
+        "libsnapshot_test_defaults",
+        "libsnapshot_hal_deps",
+    ],
     test_suites: [
         "vts",
-        "device-tests"
+        "device-tests",
     ],
     test_options: {
         min_shipping_api_level: 30,
@@ -295,12 +305,15 @@
 
 cc_test {
     name: "vab_legacy_tests",
-    defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"],
+    defaults: [
+        "libsnapshot_test_defaults",
+        "libsnapshot_hal_deps",
+    ],
     cppflags: [
         "-DLIBSNAPSHOT_TEST_VAB_LEGACY",
     ],
     test_suites: [
-        "device-tests"
+        "device-tests",
     ],
     test_options: {
         // Legacy VAB launched in Android R.
@@ -310,12 +323,15 @@
 
 cc_test {
     name: "vabc_legacy_tests",
-    defaults: ["libsnapshot_test_defaults", "libsnapshot_hal_deps"],
+    defaults: [
+        "libsnapshot_test_defaults",
+        "libsnapshot_hal_deps",
+    ],
     cppflags: [
         "-DLIBSNAPSHOT_TEST_VABC_LEGACY",
     ],
     test_suites: [
-        "device-tests"
+        "device-tests",
     ],
     test_options: {
         // Legacy VABC launched in Android S.
@@ -343,7 +359,10 @@
 
 cc_binary {
     name: "snapshotctl",
-    defaults: ["libsnapshot_cow_defaults", "libsnapshot_hal_deps"],
+    defaults: [
+        "libsnapshot_cow_defaults",
+        "libsnapshot_hal_deps",
+    ],
     srcs: [
         "snapshotctl.cpp",
     ],
@@ -412,8 +431,11 @@
         "libgtest",
         "libsnapshot_cow",
     ],
+    header_libs: [
+        "libstorage_literals_headers",
+    ],
     test_suites: [
-        "device-tests"
+        "device-tests",
     ],
     test_options: {
         min_shipping_api_level: 30,
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
index d410c14..6865b19 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
@@ -201,6 +201,12 @@
 static constexpr uint64_t kCowOpSourceInfoTypeBit = 60;
 static constexpr uint64_t kCowOpSourceInfoTypeNumBits = 4;
 static constexpr uint64_t kCowOpSourceInfoTypeMask = (1ULL << kCowOpSourceInfoTypeNumBits) - 1;
+
+static constexpr uint64_t kCowOpSourceInfoCompressionBit = 57;
+static constexpr uint64_t kCowOpSourceInfoCompressionNumBits = 3;
+static constexpr uint64_t kCowOpSourceInfoCompressionMask =
+        ((1ULL << kCowOpSourceInfoCompressionNumBits) - 1);
+
 // The on disk format of cow (currently ==  CowOperation)
 struct CowOperationV3 {
     // If this operation reads from the data section of the COW, this contains
@@ -211,8 +217,8 @@
     uint32_t new_block;
 
     // source_info will have the following layout
-    // |---4 bits ---| ---12 bits---| --- 48 bits ---|
-    // |--- type --- | -- unused -- | --- source --- |
+    // |--- 4 bits ---|------- 3 bits -------|--- 9 bits ---|---- 48 bits ----|
+    // |--- type -----|- compression factor -|--- unused ---|---- source -----|
     //
     // The value of |source| depends on the operation code.
     //
@@ -225,6 +231,17 @@
     // For ops other than Label:
     //  Bits 47-62 are reserved and must be zero.
     // A block is compressed if its data is < block_sz
+    //
+    // Bits [57-59] represent the compression factor.
+    //
+    //     Bits - Compression factor
+    // ==============================
+    //      000 - 4k
+    //      001 - 8k
+    //      010 - 16k
+    //      ...
+    //      110 - 256k
+    //
     uint64_t source_info_;
     constexpr uint64_t source() const { return source_info_ & kCowOpSourceInfoDataMask; }
     constexpr void set_source(uint64_t source) {
@@ -245,6 +262,20 @@
         source_info_ |= (static_cast<uint64_t>(type) & kCowOpSourceInfoTypeMask)
                         << kCowOpSourceInfoTypeBit;
     }
+    constexpr void set_compression_bits(uint8_t compression_factor) {
+        // Clear the three compression-factor bits [57-59]
+        source_info_ &= ~(kCowOpSourceInfoCompressionMask << kCowOpSourceInfoCompressionBit);
+        // Set the actual compression factor
+        source_info_ |=
+                (static_cast<uint64_t>(compression_factor) & kCowOpSourceInfoCompressionMask)
+                << kCowOpSourceInfoCompressionBit;
+    }
+    constexpr uint8_t compression_bits() const {
+        // Grab the 3 bits from [57-59]
+        const auto compression_factor =
+                (source_info_ >> kCowOpSourceInfoCompressionBit) & kCowOpSourceInfoCompressionMask;
+        return static_cast<uint8_t>(compression_factor);
+    }
 } __attribute__((packed));
 
 // Ensure that getters/setters added to CowOperationV3 do not increase its size
@@ -326,5 +357,11 @@
 // Convert compression name to internal value.
 std::optional<CowCompressionAlgorithm> CompressionAlgorithmFromString(std::string_view name);
 
+// Return the block size used for compression of this op
+size_t CowOpCompressionSize(const CowOperation* op, size_t block_size);
+
+// Return the relative offset of the I/O block within the CowOperation's
+// multi-block compression window
+bool GetBlockOffset(const CowOperation* op, uint64_t io_block, size_t block_size, off_t* offset);
 }  // namespace snapshot
 }  // namespace android
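
A minimal sketch (not part of the patch) of how the new compression-factor
bits round-trip through source_info_, using only the accessors defined above:

    CowOperationV3 op{};
    op.set_type(kCowReplaceOp);
    op.set_source(12345);
    // A 16 KiB window with 4 KiB blocks encodes as log2(16384 / 4096) = 2.
    op.set_compression_bits(2);
    assert(op.compression_bits() == 2);  // bits [57-59]
    assert(op.source() == 12345);        // source bits are untouched
    assert(op.type() == kCowReplaceOp);  // type bits are untouched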
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index bf4c79f..3f49c69 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -162,6 +162,9 @@
     // Creates a clone of the current CowReader without the file handlers
     std::unique_ptr<CowReader> CloneCowReader();
 
+    // Get the max compression size
+    uint32_t GetMaxCompressionSize();
+
     void UpdateMergeOpsCompleted(int num_merge_ops) { header_.num_merge_ops += num_merge_ops; }
 
   private:
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index 0194ffd..89699dc 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -119,9 +119,9 @@
 
 class CompressWorker {
   public:
-    CompressWorker(std::unique_ptr<ICompressor>&& compressor, uint32_t block_size);
+    CompressWorker(std::unique_ptr<ICompressor>&& compressor);
     bool RunThread();
-    void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
+    void EnqueueCompressBlocks(const void* buffer, size_t block_size, size_t num_blocks);
     bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
     void Finalize();
     static uint32_t GetDefaultCompressionLevel(CowCompressionAlgorithm compression);
@@ -134,20 +134,22 @@
     struct CompressWork {
         const void* buffer;
         size_t num_blocks;
+        size_t block_size;
         bool compression_status = false;
         std::vector<std::basic_string<uint8_t>> compressed_data;
     };
 
     std::unique_ptr<ICompressor> compressor_;
-    uint32_t block_size_;
 
     std::queue<CompressWork> work_queue_;
     std::queue<CompressWork> compressed_queue_;
     std::mutex lock_;
     std::condition_variable cv_;
     bool stopped_ = false;
+    size_t total_submitted_ = 0;
+    size_t total_processed_ = 0;
 
-    bool CompressBlocks(const void* buffer, size_t num_blocks,
+    bool CompressBlocks(const void* buffer, size_t num_blocks, size_t block_size,
                         std::vector<std::basic_string<uint8_t>>* compressed_data);
 };
 
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
index abc7d33..577cdbd 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
@@ -208,9 +208,9 @@
     std::unique_ptr<ZSTD_CCtx, decltype(&ZSTD_freeCCtx)> zstd_context_;
 };
 
-bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
+bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks, size_t block_size,
                                     std::vector<std::basic_string<uint8_t>>* compressed_data) {
-    return CompressBlocks(compressor_.get(), block_size_, buffer, num_blocks, compressed_data);
+    return CompressBlocks(compressor_.get(), block_size, buffer, num_blocks, compressed_data);
 }
 
 bool CompressWorker::CompressBlocks(ICompressor* compressor, size_t block_size, const void* buffer,
@@ -223,7 +223,7 @@
             PLOG(ERROR) << "CompressBlocks: Compression failed";
             return false;
         }
-        if (data.size() > std::numeric_limits<uint16_t>::max()) {
+        if (data.size() > std::numeric_limits<uint32_t>::max()) {
             LOG(ERROR) << "Compressed block is too large: " << data.size();
             return false;
         }
@@ -254,7 +254,8 @@
         }
 
         // Compress blocks
-        bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data);
+        bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, blocks.block_size,
+                                  &blocks.compressed_data);
         blocks.compression_status = ret;
         {
             std::lock_guard<std::mutex> lock(lock_);
@@ -273,35 +274,31 @@
     return true;
 }
 
-void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) {
+void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t block_size,
+                                           size_t num_blocks) {
     {
         std::lock_guard<std::mutex> lock(lock_);
 
         CompressWork blocks = {};
         blocks.buffer = buffer;
+        blocks.block_size = block_size;
         blocks.num_blocks = num_blocks;
         work_queue_.push(std::move(blocks));
+        total_submitted_ += 1;
     }
     cv_.notify_all();
 }
 
 bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf) {
-    {
+    while (true) {
         std::unique_lock<std::mutex> lock(lock_);
-        while (compressed_queue_.empty() && !stopped_) {
+        while ((total_submitted_ != total_processed_) && compressed_queue_.empty() && !stopped_) {
             cv_.wait(lock);
         }
-
-        if (stopped_) {
-            return true;
-        }
-    }
-
-    {
-        std::lock_guard<std::mutex> lock(lock_);
         while (compressed_queue_.size() > 0) {
             CompressWork blocks = std::move(compressed_queue_.front());
             compressed_queue_.pop();
+            total_processed_ += 1;
 
             if (blocks.compression_status) {
                 compressed_buf->insert(compressed_buf->end(),
@@ -312,9 +309,12 @@
                 return false;
             }
         }
+        if ((total_submitted_ == total_processed_) || stopped_) {
+            total_submitted_ = 0;
+            total_processed_ = 0;
+            return true;
+        }
     }
-
-    return true;
 }
 
 std::unique_ptr<ICompressor> ICompressor::Brotli(uint32_t compression_level,
@@ -344,8 +344,8 @@
     cv_.notify_all();
 }
 
-CompressWorker::CompressWorker(std::unique_ptr<ICompressor>&& compressor, uint32_t block_size)
-    : compressor_(std::move(compressor)), block_size_(block_size) {}
+CompressWorker::CompressWorker(std::unique_ptr<ICompressor>&& compressor)
+    : compressor_(std::move(compressor)) {}
 
 }  // namespace snapshot
 }  // namespace android
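
With variable block sizes, a writer may enqueue several batches per worker
before draining, so GetCompressedBuffers() now loops until total_processed_
catches up with total_submitted_ rather than returning after the first wakeup.
A sketch of the assumed caller pattern (buf_a/buf_b are hypothetical):

    worker->EnqueueCompressBlocks(buf_a, /*block_size=*/16384, /*num_blocks=*/1);
    worker->EnqueueCompressBlocks(buf_b, /*block_size=*/4096, /*num_blocks=*/1);
    std::vector<std::basic_string<uint8_t>> out;
    if (!worker->GetCompressedBuffers(&out)) {
        // At least one batch failed to compress.
    }
    // On success, out holds the compressed data for buf_a, then buf_b, in
    // submission order, and both counters reset for the next round.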
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
index 8d1786c..19014c0 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
@@ -21,6 +21,7 @@
 #include <android-base/logging.h>
 #include <android-base/stringprintf.h>
 #include <libsnapshot/cow_format.h>
+#include <storage_literals/storage_literals.h>
 #include "writer_v2.h"
 #include "writer_v3.h"
 
@@ -28,6 +29,7 @@
 namespace snapshot {
 
 using android::base::unique_fd;
+using namespace android::storage_literals;
 
 std::ostream& EmitCowTypeString(std::ostream& os, CowOperationType cow_type) {
     switch (cow_type) {
@@ -174,5 +176,36 @@
     return CreateCowWriter(version, options, unique_fd{-1}, std::nullopt);
 }
 
+size_t CowOpCompressionSize(const CowOperation* op, size_t block_size) {
+    uint8_t compression_bits = op->compression_bits();
+    return (block_size << compression_bits);
+}
+
+bool GetBlockOffset(const CowOperation* op, uint64_t io_block, size_t block_size, off_t* offset) {
+    const uint64_t new_block = op->new_block;
+
+    if (op->type() != kCowReplaceOp || io_block < new_block) {
+        LOG(VERBOSE) << "Invalid IO request for block: " << io_block
+                     << " CowOperation: new_block: " << new_block;
+        return false;
+    }
+
+    // Get the actual compression size
+    const size_t compression_size = CowOpCompressionSize(op, block_size);
+    // Find the number of blocks spanned
+    const size_t num_blocks = compression_size / block_size;
+    // Find the distance of the I/O block from the start of the
+    // range this CowOperation encompasses
+    const size_t block_distance = io_block - new_block;
+    // Check if this block is within this range;
+    // if so, return the relative offset
+    if (block_distance < num_blocks) {
+        *offset = block_distance * block_size;
+        return true;
+    }
+
+    return false;
+}
+
 }  // namespace snapshot
 }  // namespace android
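
A worked example of the offset arithmetic in GetBlockOffset(), assuming a
4 KiB block size and a hypothetical replace op:

    // op: new_block = 100, compression_bits() == 2, so the op covers a
    // 16 KiB window spanning blocks 100-103. An I/O for block 102 gives
    // block_distance = 102 - 100 = 2.
    off_t offset = 0;
    if (GetBlockOffset(op, /*io_block=*/102, /*block_size=*/4096, &offset)) {
        // offset == 2 * 4096 == 8192: decompress the whole window, then
        // copy the requested block from this offset.
    }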
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
index 1b4a971..6516499 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_reader.cpp
@@ -26,6 +26,7 @@
 #include <android-base/logging.h>
 #include <libsnapshot/cow_format.h>
 #include <libsnapshot/cow_reader.h>
+#include <storage_literals/storage_literals.h>
 #include <zlib.h>
 
 #include "cow_decompress.h"
@@ -35,6 +36,8 @@
 namespace android {
 namespace snapshot {
 
+using namespace android::storage_literals;
+
 bool ReadCowHeader(android::base::borrowed_fd fd, CowHeaderV3* header) {
     if (lseek(fd.get(), 0, SEEK_SET) < 0) {
         PLOG(ERROR) << "lseek header failed";
@@ -161,6 +164,21 @@
     return PrepMergeOps();
 }
 
+uint32_t CowReader::GetMaxCompressionSize() {
+    switch (header_.prefix.major_version) {
+        case 1:
+        case 2:
+            // Old versions support only 4KB compression.
+            return header_.block_size;
+        case 3:
+            return header_.max_compression_size;
+        default:
+            LOG(ERROR) << "Unknown version: " << header_.prefix.major_version;
+            return 0;
+    }
+}
+
 //
 // This sets up the data needed for MergeOpIter. MergeOpIter presents
 // data in the order we intend to merge in.
@@ -705,6 +723,11 @@
 ssize_t CowReader::ReadData(const CowOperation* op, void* buffer, size_t buffer_size,
                             size_t ignore_bytes) {
     std::unique_ptr<IDecompressor> decompressor;
+    const size_t op_buf_size = CowOpCompressionSize(op, header_.block_size);
+    if (!op_buf_size) {
+        LOG(ERROR) << "Compression size is zero. op: " << *op;
+        return -1;
+    }
     switch (GetCompressionType()) {
         case kCowCompressNone:
             break;
@@ -715,12 +738,12 @@
             decompressor = IDecompressor::Brotli();
             break;
         case kCowCompressZstd:
-            if (header_.block_size != op->data_length) {
+            if (op_buf_size != op->data_length) {
                 decompressor = IDecompressor::Zstd();
             }
             break;
         case kCowCompressLz4:
-            if (header_.block_size != op->data_length) {
+            if (op_buf_size != op->data_length) {
                 decompressor = IDecompressor::Lz4();
             }
             break;
@@ -736,14 +759,14 @@
         offset = op->source();
     }
     if (!decompressor ||
-        ((op->data_length == header_.block_size) && (header_.prefix.major_version == 3))) {
+        ((op->data_length == op_buf_size) && (header_.prefix.major_version == 3))) {
         CowDataStream stream(this, offset + ignore_bytes, op->data_length - ignore_bytes);
         return stream.ReadFully(buffer, buffer_size);
     }
 
     CowDataStream stream(this, offset, op->data_length);
     decompressor->set_stream(&stream);
-    return decompressor->Decompress(buffer, buffer_size, header_.block_size, ignore_bytes);
+    return decompressor->Decompress(buffer, buffer_size, op_buf_size, ignore_bytes);
 }
 
 bool CowReader::GetSourceOffset(const CowOperation* op, uint64_t* source_offset) {
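
Because ReadData() now decompresses whole windows, v3 callers must size the
destination buffer by CowOpCompressionSize() rather than the 4 KiB block
size. A minimal sketch, assuming reader and op are valid:

    const size_t op_buf_size = CowOpCompressionSize(op, header.block_size);
    std::string data(op_buf_size, '\0');
    if (reader.ReadData(op, data.data(), data.size()) <
        static_cast<ssize_t>(op_buf_size)) {
        // Short read: treat as an I/O or decompression failure.
    }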
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp
index fe977b7..a35b614 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v2.cpp
@@ -206,6 +206,8 @@
 
         auto& new_op = out->ops->at(i);
         new_op.set_type(v2_op.type);
+        // v2 ops always have 4k compression
+        new_op.set_compression_bits(0);
         new_op.data_length = v2_op.data_length;
 
         if (v2_op.new_block > std::numeric_limits<uint32_t>::max()) {
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/snapshot_reader.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/snapshot_reader.cpp
index 4e90a0f..12073fc 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/snapshot_reader.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/snapshot_reader.cpp
@@ -42,10 +42,21 @@
             op_iter_->Next();
             continue;
         }
-        if (op->new_block >= ops_.size()) {
-            ops_.resize(op->new_block + 1, nullptr);
+
+        size_t num_blocks = 1;
+        if (op->type() == kCowReplaceOp) {
+            num_blocks = (CowOpCompressionSize(op, block_size_) / block_size_);
         }
-        ops_[op->new_block] = op;
+        if (op->new_block >= ops_.size()) {
+            ops_.resize(op->new_block + num_blocks, nullptr);
+        }
+
+        size_t vec_index = op->new_block;
+        while (num_blocks) {
+            ops_[vec_index] = op;
+            num_blocks -= 1;
+            vec_index += 1;
+        }
         op_iter_->Next();
     }
 }
@@ -172,11 +183,20 @@
     } else if (op->type() == kCowZeroOp) {
         memset(buffer, 0, bytes_to_read);
     } else if (op->type() == kCowReplaceOp) {
-        if (cow_->ReadData(op, buffer, bytes_to_read, start_offset) < bytes_to_read) {
-            LOG(ERROR) << "CompressedSnapshotReader failed to read replace op";
+        size_t buffer_size = CowOpCompressionSize(op, block_size_);
+        uint8_t temp_buffer[buffer_size];
+        if (cow_->ReadData(op, temp_buffer, buffer_size, 0) < buffer_size) {
+            LOG(ERROR) << "CompressedSnapshotReader failed to read replace op: buffer_size: "
+                       << buffer_size << "start_offset: " << start_offset;
             errno = EIO;
             return -1;
         }
+        off_t block_offset{};
+        if (!GetBlockOffset(op, chunk, block_size_, &block_offset)) {
+            LOG(ERROR) << "GetBlockOffset failed";
+            return -1;
+        }
+        std::memcpy(buffer, (char*)temp_buffer + block_offset + start_offset, bytes_to_read);
     } else if (op->type() == kCowXorOp) {
         borrowed_fd fd = GetSourceFd();
         if (fd < 0) {
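
The ops_ vector now maps every block of a compression window to the same op,
and GetBlockOffset() recovers the intra-window position at read time. A
sketch with 4 KiB blocks and a hypothetical op at new_block = 10 with a
16 KiB window:

    // ops_[10] == ops_[11] == ops_[12] == ops_[13] == op
    off_t block_offset = 0;
    GetBlockOffset(op, /*io_block=*/12, /*block_size=*/4096, &block_offset);
    // block_offset == 8192: copy bytes_to_read from
    // temp_buffer + block_offset + start_offset.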
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
index de60213..3c5b394 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
@@ -18,6 +18,7 @@
 #include <libsnapshot/cow_format.h>
 #include <libsnapshot/cow_reader.h>
 #include <libsnapshot/cow_writer.h>
+#include <storage_literals/storage_literals.h>
 #include "writer_v2.h"
 #include "writer_v3.h"
 
@@ -29,6 +30,9 @@
 namespace android {
 namespace snapshot {
 
+using namespace android::storage_literals;
+using ::testing::TestWithParam;
+
 class CowTestV3 : public ::testing::Test {
   protected:
     virtual void SetUp() override {
@@ -484,14 +488,14 @@
     ASSERT_TRUE(reader.Parse(cow_->fd));
 
     auto header = reader.header_v3();
-    ASSERT_EQ(header.sequence_data_count, 0);
+    ASSERT_EQ(header.sequence_data_count, static_cast<uint64_t>(0));
     ASSERT_EQ(header.resume_point_count, 0);
     ASSERT_EQ(header.resume_point_max, 4);
 
     writer->AddLabel(0);
     ASSERT_TRUE(reader.Parse(cow_->fd));
     header = reader.header_v3();
-    ASSERT_EQ(header.sequence_data_count, 0);
+    ASSERT_EQ(header.sequence_data_count, static_cast<uint64_t>(0));
     ASSERT_EQ(header.resume_point_count, 1);
     ASSERT_EQ(header.resume_point_max, 4);
 
@@ -699,5 +703,189 @@
     ASSERT_FALSE(writer->AddZeroBlocks(0, 19));
 }
 
+struct TestParam {
+    std::string compression;
+    int block_size;
+    int num_threads;
+    size_t cluster_ops;
+};
+
+class VariableBlockTest : public ::testing::TestWithParam<TestParam> {
+  protected:
+    virtual void SetUp() override {
+        cow_ = std::make_unique<TemporaryFile>();
+        ASSERT_GE(cow_->fd, 0) << strerror(errno);
+    }
+
+    virtual void TearDown() override { cow_ = nullptr; }
+
+    unique_fd GetCowFd() { return unique_fd{dup(cow_->fd)}; }
+
+    std::unique_ptr<TemporaryFile> cow_;
+};
+
+// Helper to read an op's full compression window and copy out |size| bytes.
+static inline void ReadBlockData(CowReader& reader, const CowOperation* op, void* buffer,
+                                 size_t size) {
+    size_t block_size = CowOpCompressionSize(op, 4096);
+    std::string data(block_size, '\0');
+    size_t value = reader.ReadData(op, data.data(), block_size);
+    ASSERT_TRUE(value == block_size);
+    std::memcpy(buffer, data.data(), size);
+}
+
+TEST_P(VariableBlockTest, VariableBlockCompressionTest) {
+    const TestParam params = GetParam();
+
+    CowOptions options;
+    options.op_count_max = 100000;
+    options.compression = params.compression;
+    options.num_compress_threads = params.num_threads;
+    options.batch_write = true;
+    options.compression_factor = params.block_size;
+    options.cluster_ops = params.cluster_ops;
+
+    CowWriterV3 writer(options, GetCowFd());
+
+    ASSERT_TRUE(writer.Initialize());
+
+    std::string xor_data = "This is test data-1. Testing xor";
+    xor_data.resize(options.block_size, '\0');
+    ASSERT_TRUE(writer.AddXorBlocks(50, xor_data.data(), xor_data.size(), 24, 10));
+
+    // Large number of blocks
+    std::string data = "This is test data-2. Testing replace ops";
+    data.resize(options.block_size * 2048, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(100, data.data(), data.size()));
+
+    std::string data2 = "This is test data-3. Testing replace ops";
+    data2.resize(options.block_size * 259, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(6000, data2.data(), data2.size()));
+
+    // Test data size is smaller than block size
+
+    // 4k block
+    std::string data3 = "This is test data-4. Testing replace ops";
+    data3.resize(options.block_size, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(9000, data3.data(), data3.size()));
+
+    // 8k block
+    std::string data4;
+    data4.resize(options.block_size * 2, '\0');
+    for (size_t i = 0; i < data4.size(); i++) {
+        data4[i] = static_cast<char>('A' + i / options.block_size);
+    }
+    ASSERT_TRUE(writer.AddRawBlocks(10000, data4.data(), data4.size()));
+
+    // 16k block
+    std::string data5;
+    data5.resize(options.block_size * 4, '\0');
+    for (size_t i = 0; i < data5.size(); i++) {
+        data5[i] = static_cast<char>('C' + i / options.block_size);
+    }
+    ASSERT_TRUE(writer.AddRawBlocks(11000, data5.data(), data5.size()));
+
+    // 64k Random buffer which cannot be compressed
+    unique_fd rnd_fd(open("/dev/random", O_RDONLY));
+    ASSERT_GE(rnd_fd, 0);
+    std::string random_buffer;
+    random_buffer.resize(65536, '\0');
+    ASSERT_EQ(android::base::ReadFullyAtOffset(rnd_fd, random_buffer.data(), 65536, 0), true);
+    ASSERT_TRUE(writer.AddRawBlocks(12000, random_buffer.data(), 65536));
+
+    ASSERT_TRUE(writer.Finalize());
+
+    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+
+    auto iter = reader.GetOpIter();
+    ASSERT_NE(iter, nullptr);
+
+    while (!iter->AtEnd()) {
+        auto op = iter->Get();
+
+        if (op->type() == kCowXorOp) {
+            std::string sink(xor_data.size(), '\0');
+            ASSERT_EQ(op->new_block, 50);
+            ASSERT_EQ(op->source(), 98314);  // 4096 * 24 + 10
+            ReadBlockData(reader, op, sink.data(), sink.size());
+            ASSERT_EQ(sink, xor_data);
+        }
+        if (op->type() == kCowReplaceOp) {
+            if (op->new_block == 100) {
+                data.resize(options.block_size);
+                std::string sink(data.size(), '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink.size(), data.size());
+                ASSERT_EQ(sink, data);
+            }
+            if (op->new_block == 6000) {
+                data2.resize(options.block_size);
+                std::string sink(data2.size(), '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink, data2);
+            }
+            if (op->new_block == 9000) {
+                std::string sink(data3.size(), '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink, data3);
+            }
+            if (op->new_block == 10000) {
+                data4.resize(options.block_size);
+                std::string sink(options.block_size, '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink, data4);
+            }
+            if (op->new_block == 11000) {
+                data5.resize(options.block_size);
+                std::string sink(options.block_size, '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink, data5);
+            }
+            if (op->new_block == 12000) {
+                random_buffer.resize(options.block_size);
+                std::string sink(options.block_size, '\0');
+                ReadBlockData(reader, op, sink.data(), sink.size());
+                ASSERT_EQ(sink, random_buffer);
+            }
+        }
+
+        iter->Next();
+    }
+}
+
+std::vector<TestParam> GetTestConfigs() {
+    std::vector<TestParam> testParams;
+
+    std::vector<int> block_sizes = {4_KiB, 8_KiB, 16_KiB, 32_KiB, 64_KiB, 128_KiB, 256_KiB};
+    std::vector<std::string> compression_algo = {"none", "lz4", "zstd", "gz"};
+    std::vector<int> threads = {1, 2};
+    // This will also test batch size
+    std::vector<size_t> cluster_ops = {1, 256};
+
+    // This should test 112 combinations
+    for (auto block : block_sizes) {
+        for (auto compression : compression_algo) {
+            for (auto thread : threads) {
+                for (auto cluster : cluster_ops) {
+                    TestParam param;
+                    param.block_size = block;
+                    param.compression = compression;
+                    param.num_threads = thread;
+                    param.cluster_ops = cluster;
+                    testParams.push_back(std::move(param));
+                }
+            }
+        }
+    }
+
+    return testParams;
+}
+
+INSTANTIATE_TEST_SUITE_P(CompressorsWithVariableBlocks, VariableBlockTest,
+                         ::testing::ValuesIn(GetTestConfigs()));
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp
index 75cd111..d0864e0 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v2.cpp
@@ -185,7 +185,7 @@
     for (int i = 0; i < num_compress_threads_; i++) {
         std::unique_ptr<ICompressor> compressor =
                 ICompressor::Create(compression_, header_.block_size);
-        auto wt = std::make_unique<CompressWorker>(std::move(compressor), header_.block_size);
+        auto wt = std::make_unique<CompressWorker>(std::move(compressor));
         threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
         compress_threads_.push_back(std::move(wt));
     }
@@ -353,7 +353,7 @@
         if (i == num_threads - 1) {
             num_blocks_per_thread = num_blocks;
         }
-        worker->EnqueueCompressBlocks(iter, num_blocks_per_thread);
+        worker->EnqueueCompressBlocks(iter, header_.block_size, num_blocks_per_thread);
         iter += (num_blocks_per_thread * header_.block_size);
         num_blocks -= num_blocks_per_thread;
     }
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
index 251b24e..c92460a 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
@@ -37,6 +37,7 @@
 #include <libsnapshot/cow_compress.h>
 #include <libsnapshot_cow/parser_v3.h>
 #include <linux/fs.h>
+#include <storage_literals/storage_literals.h>
 #include <sys/ioctl.h>
 #include <unistd.h>
 #include <numeric>
@@ -54,6 +55,7 @@
 
 static_assert(sizeof(off_t) == sizeof(uint64_t));
 
+using namespace android::storage_literals;
 using android::base::unique_fd;
 
 // Divide |x| by |y| and round up to the nearest integer.
@@ -77,9 +79,9 @@
     threads_.clear();
     for (size_t i = 0; i < num_compress_threads_; i++) {
         std::unique_ptr<ICompressor> compressor =
-                ICompressor::Create(compression_, header_.block_size);
+                ICompressor::Create(compression_, header_.max_compression_size);
         auto&& wt = compress_threads_.emplace_back(
-                std::make_unique<CompressWorker>(std::move(compressor), header_.block_size));
+                std::make_unique<CompressWorker>(std::move(compressor)));
         threads_.emplace_back(std::thread([wt = wt.get()]() { wt->RunThread(); }));
     }
     LOG(INFO) << num_compress_threads_ << " threads used for compression";
@@ -111,10 +113,15 @@
     header_.op_count_max = 0;
     header_.compression_algorithm = kCowCompressNone;
     header_.max_compression_size = options_.compression_factor;
-    return;
 }
 
 bool CowWriterV3::ParseOptions() {
+    if (!header_.max_compression_size || !IsBlockAligned(header_.max_compression_size)) {
+        LOG(ERROR) << "Invalid compression factor: " << header_.max_compression_size;
+        return false;
+    }
+
+    LOG(INFO) << "Compression factor: " << header_.max_compression_size;
     num_compress_threads_ = std::max(int(options_.num_compress_threads), 1);
     auto parts = android::base::Split(options_.compression, ",");
     if (parts.size() > 2) {
@@ -154,20 +161,22 @@
 
     compression_.algorithm = *algorithm;
     if (compression_.algorithm != kCowCompressNone) {
-        compressor_ = ICompressor::Create(compression_, header_.block_size);
+        compressor_ = ICompressor::Create(compression_, header_.max_compression_size);
         if (compressor_ == nullptr) {
             LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm;
             return false;
         }
-        if (options_.cluster_ops &&
-            (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
-             options_.batch_write)) {
-            batch_size_ = std::max<size_t>(options_.cluster_ops, 1);
-            data_vec_.reserve(batch_size_);
-            cached_data_.reserve(batch_size_);
-            cached_ops_.reserve(batch_size_);
-        }
     }
+
+    if (options_.cluster_ops &&
+        (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
+         options_.batch_write)) {
+        batch_size_ = std::max<size_t>(options_.cluster_ops, 1);
+        data_vec_.reserve(batch_size_);
+        cached_data_.reserve(batch_size_);
+        cached_ops_.reserve(batch_size_);
+    }
+
     if (batch_size_ > 1) {
         LOG(INFO) << "Batch writes: enabled with batch size " << batch_size_;
     } else {
@@ -178,6 +187,7 @@
         num_compress_threads_ = options_.num_compress_threads;
     }
     InitWorkers();
+
     return true;
 }
 
@@ -206,6 +216,14 @@
         }
     }
 
+    // TODO: b/322279333
+    // Set compression factor to 4k during estimation.
+    // Once the COW estimator is ready to support variable
+    // block sizes, this override can be removed.
+    if (IsEstimating()) {
+        header_.max_compression_size = header_.block_size;
+    }
+
     return true;
 }
 
@@ -328,6 +346,46 @@
     return cached_data_.size() >= batch_size_ || cached_ops_.size() >= batch_size_ * 16;
 }
 
+bool CowWriterV3::ConstructCowOpCompressedBuffers(uint64_t new_block_start, const void* data,
+                                                  uint64_t old_block, uint16_t offset,
+                                                  CowOperationType type, size_t blocks_to_write) {
+    size_t compressed_bytes = 0;
+    auto&& blocks = CompressBlocks(blocks_to_write, data, type);
+    if (blocks.empty()) {
+        LOG(ERROR) << "Failed to compress blocks " << new_block_start << ", " << blocks_to_write
+                   << ", actual number of blocks received from compressor " << blocks.size();
+        return false;
+    }
+    size_t blocks_written = 0;
+    for (size_t blk_index = 0; blk_index < blocks.size(); blk_index++) {
+        CowOperation& op = cached_ops_.emplace_back();
+        auto& vec = data_vec_.emplace_back();
+        CompressedBuffer buffer = std::move(blocks[blk_index]);
+        auto& compressed_data = cached_data_.emplace_back(std::move(buffer.compressed_data));
+        op.new_block = new_block_start + blocks_written;
+
+        op.set_type(type);
+        op.set_compression_bits(std::log2(buffer.compression_factor / header_.block_size));
+
+        if (type == kCowXorOp) {
+            op.set_source((old_block + blocks_written) * header_.block_size + offset);
+        } else {
+            op.set_source(next_data_pos_ + compressed_bytes);
+        }
+
+        vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
+        op.data_length = vec.iov_len;
+        compressed_bytes += op.data_length;
+        blocks_written += (buffer.compression_factor / header_.block_size);
+    }
+    if (blocks_written != blocks_to_write) {
+        LOG(ERROR) << "Total compressed blocks: " << blocks_written
+                   << " Expected: " << blocks_to_write;
+        return false;
+    }
+    return true;
+}
+
 bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
                              uint64_t old_block, uint16_t offset, CowOperationType type) {
     if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) {
@@ -341,38 +399,21 @@
         return false;
     }
     for (size_t i = 0; i < num_blocks;) {
-        const auto blocks_to_write =
+        const size_t blocks_to_write =
                 std::min<size_t>(batch_size_ - cached_data_.size(), num_blocks - i);
-        size_t compressed_bytes = 0;
-        auto&& blocks = CompressBlocks(blocks_to_write, bytes + header_.block_size * i);
-        if (blocks.size() != blocks_to_write) {
-            LOG(ERROR) << "Failed to compress blocks " << new_block_start + i << ", "
-                       << blocks_to_write << ", actual number of blocks received from compressor "
-                       << blocks.size();
+
+        if (!ConstructCowOpCompressedBuffers(new_block_start + i, bytes + header_.block_size * i,
+                                             old_block + i, offset, type, blocks_to_write)) {
             return false;
         }
-        for (size_t j = 0; j < blocks_to_write; j++) {
-            CowOperation& op = cached_ops_.emplace_back();
-            auto& vec = data_vec_.emplace_back();
-            auto& compressed_data = cached_data_.emplace_back(std::move(blocks[j]));
-            op.new_block = new_block_start + i + j;
 
-            op.set_type(type);
-            if (type == kCowXorOp) {
-                op.set_source((old_block + i + j) * header_.block_size + offset);
-            } else {
-                op.set_source(next_data_pos_ + compressed_bytes);
-            }
-            vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
-            op.data_length = vec.iov_len;
-            compressed_bytes += op.data_length;
-        }
         if (NeedsFlush() && !FlushCacheOps()) {
             LOG(ERROR) << "EmitBlocks with compression: write failed. new block: "
                        << new_block_start << " compression: " << compression_.algorithm
                        << ", op type: " << type;
             return false;
         }
+
         i += blocks_to_write;
     }
 
@@ -482,55 +523,165 @@
     return true;
 }
 
-std::vector<std::basic_string<uint8_t>> CowWriterV3::CompressBlocks(const size_t num_blocks,
-                                                                    const void* data) {
-    const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
-    const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
-    std::vector<std::basic_string<uint8_t>> compressed_buf;
-    compressed_buf.clear();
-    const uint8_t* const iter = reinterpret_cast<const uint8_t*>(data);
-    if (compression_.algorithm == kCowCompressNone) {
-        for (size_t i = 0; i < num_blocks; i++) {
-            auto& buf = compressed_buf.emplace_back();
-            buf.resize(header_.block_size);
-            std::memcpy(buf.data(), iter + i * header_.block_size, header_.block_size);
-        }
-        return compressed_buf;
+size_t CowWriterV3::GetCompressionFactor(const size_t blocks_to_compress,
+                                         CowOperationType type) const {
+    // For XOR ops, we don't support bigger block size compression yet.
+    // For bigger block size support, snapshot-merge also has to change. We
+    // aren't there yet; hence, just stick to 4k for now until
+    // snapshot-merge is ready for XOR operations.
+    if (type == kCowXorOp) {
+        return header_.block_size;
     }
-    if (num_threads <= 1) {
-        if (!CompressWorker::CompressBlocks(compressor_.get(), header_.block_size, data, num_blocks,
-                                            &compressed_buf)) {
+
+    size_t compression_factor = header_.max_compression_size;
+    while (compression_factor > header_.block_size) {
+        size_t num_blocks = compression_factor / header_.block_size;
+        if (blocks_to_compress >= num_blocks) {
+            return compression_factor;
+        }
+        compression_factor >>= 1;
+    }
+    return header_.block_size;
+}
+
+std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithNoCompression(
+        const size_t num_blocks, const void* data, CowOperationType type) {
+    size_t blocks_to_compress = num_blocks;
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+    std::vector<CompressedBuffer> compressed_vec;
+
+    while (blocks_to_compress) {
+        CompressedBuffer buffer;
+
+        const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
+        size_t num_blocks = compression_factor / header_.block_size;
+
+        buffer.compression_factor = compression_factor;
+        buffer.compressed_data.resize(compression_factor);
+
+        // No compression. Just copy the data as-is.
+        std::memcpy(buffer.compressed_data.data(), iter, compression_factor);
+
+        compressed_vec.push_back(std::move(buffer));
+        blocks_to_compress -= num_blocks;
+        iter += compression_factor;
+    }
+    return compressed_vec;
+}
+
+std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithCompression(
+        const size_t num_blocks, const void* data, CowOperationType type) {
+    size_t blocks_to_compress = num_blocks;
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+    std::vector<CompressedBuffer> compressed_vec;
+
+    while (blocks_to_compress) {
+        CompressedBuffer buffer;
+
+        const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
+        size_t num_blocks = compression_factor / header_.block_size;
+
+        buffer.compression_factor = compression_factor;
+        // Compress the blocks
+        buffer.compressed_data = compressor_->Compress(iter, compression_factor);
+        if (buffer.compressed_data.empty()) {
+            PLOG(ERROR) << "Compression failed";
             return {};
         }
-    } else {
-        // Submit the blocks per thread. The retrieval of
-        // compressed buffers has to be done in the same order.
-        // We should not poll for completed buffers in a different order as the
-        // buffers are tightly coupled with block ordering.
-        for (size_t i = 0; i < num_threads; i++) {
-            CompressWorker* worker = compress_threads_[i].get();
-            const auto blocks_in_batch =
-                    std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
-            worker->EnqueueCompressBlocks(iter + i * blocks_per_thread * header_.block_size,
-                                          blocks_in_batch);
+
+        // Check if the buffer was indeed compressed
+        if (buffer.compressed_data.size() >= compression_factor) {
+            buffer.compressed_data.resize(compression_factor);
+            std::memcpy(buffer.compressed_data.data(), iter, compression_factor);
         }
 
-        for (size_t i = 0; i < num_threads; i++) {
-            CompressWorker* worker = compress_threads_[i].get();
-            if (!worker->GetCompressedBuffers(&compressed_buf)) {
-                return {};
-            }
+        compressed_vec.push_back(std::move(buffer));
+        blocks_to_compress -= num_blocks;
+        iter += compression_factor;
+    }
+    return compressed_vec;
+}
+
+std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreadedCompression(
+        const size_t num_blocks, const void* data, CowOperationType type) {
+    const size_t num_threads = num_compress_threads_;
+    const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+
+    std::vector<CompressedBuffer> compressed_vec;
+    // Submit the blocks per thread. The retrieval of
+    // compressed buffers has to be done in the same order.
+    // We should not poll for completed buffers in a different order as the
+    // buffers are tightly coupled with block ordering.
+    for (size_t i = 0; i < num_threads; i++) {
+        CompressWorker* worker = compress_threads_[i].get();
+        auto blocks_in_batch = std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
+        // Enqueue the blocks to be compressed for each thread.
+        while (blocks_in_batch) {
+            CompressedBuffer buffer;
+
+            const size_t compression_factor = GetCompressionFactor(blocks_in_batch, type);
+            size_t num_blocks = compression_factor / header_.block_size;
+
+            buffer.compression_factor = compression_factor;
+            worker->EnqueueCompressBlocks(iter, compression_factor, 1);
+            compressed_vec.push_back(std::move(buffer));
+            blocks_in_batch -= num_blocks;
+            iter += compression_factor;
         }
     }
-    for (size_t i = 0; i < num_blocks; i++) {
+
+    // Fetch compressed buffers from the threads
+    std::vector<std::basic_string<uint8_t>> compressed_buf;
+    for (size_t i = 0; i < num_threads; i++) {
+        CompressWorker* worker = compress_threads_[i].get();
+        if (!worker->GetCompressedBuffers(&compressed_buf)) {
+            return {};
+        }
+    }
+
+    if (compressed_vec.size() != compressed_buf.size()) {
+        LOG(ERROR) << "Compressed buffer size: " << compressed_buf.size()
+                   << " - Expected: " << compressed_vec.size();
+        return {};
+    }
+
+    iter = reinterpret_cast<const uint8_t*>(data);
+    // Walk through all the compressed buffers
+    for (size_t i = 0; i < compressed_buf.size(); i++) {
+        auto& buffer = compressed_vec[i];
         auto& block = compressed_buf[i];
-        if (block.size() >= header_.block_size) {
-            block.resize(header_.block_size);
-            std::memcpy(block.data(), iter + header_.block_size * i, header_.block_size);
+        size_t block_size = buffer.compression_factor;
+        // Check if the block was indeed compressed
+        if (block.size() >= block_size) {
+            buffer.compressed_data.resize(block_size);
+            std::memcpy(buffer.compressed_data.data(), iter, block_size);
+        } else {
+            // Compressed block
+            buffer.compressed_data.resize(block.size());
+            std::memcpy(buffer.compressed_data.data(), block.data(), block.size());
         }
+        iter += block_size;
+    }
+    return compressed_vec;
+}
+
+std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::CompressBlocks(const size_t num_blocks,
+                                                                       const void* data,
+                                                                       CowOperationType type) {
+    if (compression_.algorithm == kCowCompressNone) {
+        return ProcessBlocksWithNoCompression(num_blocks, data, type);
     }
 
-    return compressed_buf;
+    const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
+
+    // If no threads are required, just compress the blocks inline.
+    if (num_threads <= 1) {
+        return ProcessBlocksWithCompression(num_blocks, data, type);
+    }
+
+    return ProcessBlocksWithThreadedCompression(num_blocks, data, type);
 }
 
 bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
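
How GetCompressionFactor() carves a run of blocks into power-of-two windows,
assuming 4 KiB blocks and max_compression_size = 256 KiB (a sketch mirroring
the loops in the ProcessBlocks* helpers above):

    size_t remaining = 5;  // five replace blocks to compress
    while (remaining) {
        const size_t factor = GetCompressionFactor(remaining, kCowReplaceOp);
        // 1st iteration: factor == 16384 (4 blocks); 2nd: factor == 4096.
        remaining -= factor / 4096;
    }
    // XOR ops always yield factor == block_size (4 KiB) for now.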
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
index b19af60..4915e9c 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
@@ -19,11 +19,15 @@
 #include <thread>
 #include <vector>
 
+#include <libsnapshot/cow_format.h>
+#include <storage_literals/storage_literals.h>
 #include "writer_base.h"
 
 namespace android {
 namespace snapshot {
 
+using namespace android::storage_literals;
+
 class CowWriterV3 : public CowWriterBase {
   public:
     explicit CowWriterV3(const CowOptions& options, android::base::unique_fd&& fd);
@@ -43,6 +47,10 @@
     virtual bool EmitSequenceData(size_t num_ops, const uint32_t* data) override;
 
   private:
+    struct CompressedBuffer {
+        size_t compression_factor;
+        std::basic_string<uint8_t> compressed_data;
+    };
     void SetupHeaders();
     bool NeedsFlush() const;
     bool ParseOptions();
@@ -52,11 +60,38 @@
                         std::basic_string_view<struct iovec> data);
     bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                     uint16_t offset, CowOperationType type);
+    bool ConstructCowOpCompressedBuffers(uint64_t new_block_start, const void* data,
+                                         uint64_t old_block, uint16_t offset, CowOperationType type,
+                                         size_t blocks_to_write);
     bool CheckOpCount(size_t op_count);
 
   private:
-    std::vector<std::basic_string<uint8_t>> CompressBlocks(const size_t num_blocks,
-                                                           const void* data);
+    std::vector<CompressedBuffer> ProcessBlocksWithNoCompression(const size_t num_blocks,
+                                                                 const void* data,
+                                                                 CowOperationType type);
+    std::vector<CompressedBuffer> ProcessBlocksWithCompression(const size_t num_blocks,
+                                                               const void* data,
+                                                               CowOperationType type);
+    std::vector<CompressedBuffer> ProcessBlocksWithThreadedCompression(const size_t num_blocks,
+                                                                       const void* data,
+                                                                       CowOperationType type);
+    std::vector<CompressedBuffer> CompressBlocks(const size_t num_blocks, const void* data,
+                                                 CowOperationType type);
+    size_t GetCompressionFactor(const size_t blocks_to_compress, CowOperationType type) const;
+
+    constexpr bool IsBlockAligned(const size_t size) {
+        // These are the only block size supported. Block size beyond 256k
+        // may impact random read performance post OTA boot.
+        const size_t values[] = {4_KiB, 8_KiB, 16_KiB, 32_KiB, 64_KiB, 128_KiB, 256_KiB};
+
+        auto it = std::lower_bound(std::begin(values), std::end(values), size);
+
+        if (it != std::end(values) && *it == size) {
+            return true;
+        }
+        return false;
+    }
+
     bool ReadBackVerification();
     bool FlushCacheOps();
     void InitWorkers();
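
The lower_bound lookup in IsBlockAligned() accepts exactly the powers of two
from 4 KiB to 256 KiB, so it is equivalent to this sketch (IsSupportedFactor
is a hypothetical name):

    constexpr bool IsSupportedFactor(size_t size) {
        return size >= 4096 && size <= 262144 && (size & (size - 1)) == 0;
    }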
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
index 1e7d0c0..bd7eaca 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
@@ -13,10 +13,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "merge_worker.h"
 
+#include <libsnapshot/cow_format.h>
 #include <pthread.h>
 
+#include "merge_worker.h"
 #include "snapuserd_core.h"
 #include "utility.h"
 
@@ -37,6 +38,7 @@
     int num_ops = *pending_ops;
     int nr_consecutive = 0;
     bool checkOrderedOp = (replace_zero_vec == nullptr);
+    size_t num_blocks = 1;
 
     do {
         if (!cowop_iter_->AtEnd() && num_ops) {
@@ -48,11 +50,15 @@
             *source_offset = cow_op->new_block * BLOCK_SZ;
             if (!checkOrderedOp) {
                 replace_zero_vec->push_back(cow_op);
+                if (cow_op->type() == kCowReplaceOp) {
+                    // Get the number of blocks this op has compressed
+                    num_blocks = (CowOpCompressionSize(cow_op, BLOCK_SZ) / BLOCK_SZ);
+                }
             }
 
             cowop_iter_->Next();
-            num_ops -= 1;
-            nr_consecutive = 1;
+            num_ops -= num_blocks;
+            nr_consecutive = num_blocks;
 
             while (!cowop_iter_->AtEnd() && num_ops) {
                 const CowOperation* op = cowop_iter_->Get();
@@ -66,11 +72,20 @@
                 }
 
                 if (!checkOrderedOp) {
+                    if (op->type() == kCowReplaceOp) {
+                        num_blocks = (CowOpCompressionSize(op, BLOCK_SZ) / BLOCK_SZ);
+                        if (num_ops < num_blocks) {
+                            break;
+                        }
+                    } else {
+                        // zero op
+                        num_blocks = 1;
+                    }
                     replace_zero_vec->push_back(op);
                 }
 
-                nr_consecutive += 1;
-                num_ops -= 1;
+                nr_consecutive += num_blocks;
+                num_ops -= num_blocks;
                 cowop_iter_->Next();
             }
         }
@@ -108,18 +123,24 @@
 
         for (size_t i = 0; i < replace_zero_vec.size(); i++) {
             const CowOperation* cow_op = replace_zero_vec[i];
-
-            void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
-            if (!buffer) {
-                SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
-                return false;
-            }
             if (cow_op->type() == kCowReplaceOp) {
-                if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
+                size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
+                void* buffer = bufsink_.AcquireBuffer(buffer_size);
+                if (!buffer) {
+                    SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
+                    return false;
+                }
+                // Read the entire compressed buffer spanning multiple blocks
+                if (!reader_->ReadData(cow_op, buffer, buffer_size)) {
                     SNAP_LOG(ERROR) << "Failed to read COW in merge";
                     return false;
                 }
             } else {
+                void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ);
+                if (!buffer) {
+                    SNAP_LOG(ERROR) << "AcquireBuffer failed in MergeReplaceOps";
+                    return false;
+                }
                 CHECK(cow_op->type() == kCowZeroOp);
                 memset(buffer, 0, BLOCK_SZ);
             }
@@ -137,7 +158,7 @@
             return false;
         }
 
-        num_ops_merged += linear_blocks;
+        num_ops_merged += replace_zero_vec.size();
 
         if (num_ops_merged >= total_ops_merged_per_commit) {
             // Flush the data
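
The merge path now counts a multi-block replace op as every block its window
covers, so batching stays aligned to I/O size while the commit counter tracks
ops. A sketch of the accounting for a 16 KiB replace window:

    const size_t num_blocks = CowOpCompressionSize(cow_op, BLOCK_SZ) / BLOCK_SZ;
    // num_blocks == 4: nr_consecutive and num_ops advance by 4 (blocks),
    // while replace_zero_vec gains one entry, so num_ops_merged advances
    // by replace_zero_vec.size() (ops), not by linear_blocks.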
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
index f1d4065..d40b6d1 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
@@ -14,10 +14,10 @@
  * limitations under the License.
  */
 
-#include "read_worker.h"
-
+#include <libsnapshot/cow_format.h>
 #include <pthread.h>
 
+#include "read_worker.h"
 #include "snapuserd_core.h"
 #include "utility.h"
 
@@ -48,9 +48,10 @@
 // Start the replace operation. This will read the
 // internal COW format and if the block is compressed,
 // it will be de-compressed.
-bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op, void* buffer) {
-    if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
-        SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
+bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op, void* buffer, size_t buffer_size) {
+    if (!reader_->ReadData(cow_op, buffer, buffer_size)) {
+        SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block
+                        << " buffer_size: " << buffer_size;
         return false;
     }
     return true;
@@ -183,7 +184,13 @@
 
     switch (cow_op->type()) {
         case kCowReplaceOp: {
-            return ProcessReplaceOp(cow_op, buffer);
+            size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
+            uint8_t chunk[buffer_size];
+            if (!ProcessReplaceOp(cow_op, chunk, buffer_size)) {
+                return false;
+            }
+            std::memcpy(buffer, chunk, BLOCK_SZ);
+            return true;
         }
 
         case kCowZeroOp: {
@@ -209,6 +216,13 @@
         return false;
     }
 
+    const size_t compression_factor = reader_->GetMaxCompressionSize();
+    if (!compression_factor) {
+        SNAP_LOG(ERROR) << "Compression factor is set to 0 which is invalid.";
+        return false;
+    }
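+    // Scratch buffer sized to the maximum compression window so that any
+    // multi-block REPLACE op can be decompressed in a single shot.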
+    decompressed_buffer_ = std::make_unique<uint8_t[]>(compression_factor);
+
     backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
     if (backing_store_fd_ < 0) {
         SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
@@ -276,6 +290,20 @@
     return true;
 }
 
+bool ReadWorker::GetCowOpBlockOffset(const CowOperation* cow_op, uint64_t io_block,
+                                     off_t* block_offset) {
+    // If this is a REPLACE op, get the block offset of this I/O
+    // block. Multi-block compression is supported only for
+    // REPLACE ops.
+    //
+    // Note: This can be extended when we support COPY and XOR ops down the
+    // line as the blocks are mostly contiguous.
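+    //
+    // Illustratively, if cow_op->new_block is 10 and the op spans four
+    // blocks, then io_block 12 is expected to yield a *block_offset of
+    // 2 * BLOCK_SZ within the op's window.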
+    if (cow_op && cow_op->type() == kCowReplaceOp) {
+        return GetBlockOffset(cow_op, io_block, BLOCK_SZ, block_offset);
+    }
+    return false;
+}
+
 bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
     size_t remaining_size = sz;
     std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
@@ -286,7 +314,7 @@
         size_t read_size = std::min(PAYLOAD_BUFFER_SZ, remaining_size);
 
         size_t total_bytes_read = 0;
-
+        const CowOperation* prev_op = nullptr;
         while (read_size) {
             // We need to check every 4k block to verify if it is
             // present in the mapping.
@@ -294,7 +322,7 @@
 
             auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
                                        std::make_pair(sector, nullptr), SnapshotHandler::compare);
-            bool not_found = (it == chunk_vec.end() || it->first != sector);
+            const bool sector_not_found = (it == chunk_vec.end() || it->first != sector);
 
             void* buffer = block_server_->GetResponseBuffer(BLOCK_SZ, size);
             if (!buffer) {
@@ -302,15 +330,88 @@
                 return false;
             }
 
-            if (not_found) {
-                // Block not found in map - which means this block was not
-                // changed as per the OTA. Just route the I/O to the base
-                // device.
-                if (!ReadDataFromBaseDevice(sector, buffer, size)) {
-                    SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
-                    return false;
+            if (sector_not_found) {
+                // Find the 4k block
+                uint64_t io_block = SectorToChunk(sector);
+                // Step back to the previous iterator. Since the vector is
+                // sorted, the lookup of this sector can fall within the range
+                // of blocks covered by a CowOperation that compressed
+                // multiple blocks.
+                if (it != chunk_vec.begin()) {
+                    std::advance(it, -1);
                 }
 
+                bool is_mapping_present = true;
+
+                // The vector itself is empty. This can happen if the block was
+                // not changed as per the OTA or if the merge was already
+                // complete but the snapshot table was not yet collapsed.
+                if (it == chunk_vec.end()) {
+                    is_mapping_present = false;
+                }
+
+                const CowOperation* cow_op = nullptr;
+                // Relative offset within the multi-block compressed window
+                off_t block_offset = 0;
+                if (is_mapping_present) {
+                    // Get the nearest operation found in the vector
+                    cow_op = it->second;
+                    is_mapping_present = GetCowOpBlockOffset(cow_op, io_block, &block_offset);
+                }
+
+                // Thus, we have a case wherein the sector was not found in the
+                // sorted vector; however, the mapping of this sector is indeed
+                // embedded in a CowOperation that spans multiple blocks.
+                if (is_mapping_present) {
+                    // block_offset == 0 would mean that the CowOperation is
+                    // already present in the sorted vector, so the lookup
+                    // should have found it. If not, this is a bug.
+                    if (block_offset == 0) {
+                        SNAP_LOG(ERROR)
+                                << "GetBlockOffset returned offset 0 for io_block: " << io_block;
+                        return false;
+                    }
+
+                    // Get the actual compression size of this CowOperation
+                    size_t compression_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
+                    // Offset cannot be greater than the compression size
+                    if (block_offset > compression_size) {
+                        SNAP_LOG(ERROR) << "Invalid I/O block found. io_block: " << io_block
+                                        << " CowOperation-new-block: " << cow_op->new_block
+                                        << " compression-size: " << compression_size;
+                        return false;
+                    }
+
+                    // Cached copy from the previous iteration; just retrieve
+                    // the data.
+                    if (prev_op && prev_op->new_block == cow_op->new_block) {
+                        std::memcpy(buffer, (char*)decompressed_buffer_.get() + block_offset, size);
+                    } else {
+                        // Get the data from the disk based on the compression
+                        // size
+                        if (!ProcessReplaceOp(cow_op, decompressed_buffer_.get(),
+                                              compression_size)) {
+                            return false;
+                        }
+                        // Copy the data from the decompressed buffer relative
+                        // to the I/O block offset.
+                        std::memcpy(buffer, (char*)decompressed_buffer_.get() + block_offset, size);
+                        // Cache this CowOperation pointer for successive I/O
+                        // operations. Since the requests are sequential and
+                        // the block is already decompressed, subsequent I/O
+                        // blocks can fetch the data directly from this
+                        // decompressed buffer.
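+                        // Illustratively, four sequential 4 KiB reads covered
+                        // by one 16 KiB op trigger a single decompression;
+                        // the remaining reads are served from this cache.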
+                        prev_op = cow_op;
+                    }
+                } else {
+                    // Block not found in map - which means this block was not
+                    // changed as per the OTA. Just route the I/O to the base
+                    // device.
+                    if (!ReadDataFromBaseDevice(sector, buffer, size)) {
+                        SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
+                        return false;
+                    }
+                }
                 ret = size;
             } else {
                 // We found the sector in mapping. Check the type of COW OP and
@@ -341,12 +442,50 @@
     return true;
 }
 
+bool ReadWorker::IsMappingPresent(const CowOperation* cow_op, loff_t requested_offset,
+                                  loff_t cow_op_offset) {
+    const bool replace_op = (cow_op->type() == kCowReplaceOp);
+    if (replace_op) {
+        size_t max_compressed_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
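+        // A multi-block REPLACE op maps the byte range
+        // [cow_op_offset, cow_op_offset + max_compressed_size); any requested
+        // offset within that window is served by this op.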
+        if ((requested_offset >= cow_op_offset) &&
+            (requested_offset < (cow_op_offset + max_compressed_size))) {
+            return true;
+        }
+    }
+    return false;
+}
+
 int ReadWorker::ReadUnalignedSector(
         sector_t sector, size_t size,
         std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
     SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
                     << " Aligned sector: " << it->first;
 
+    loff_t requested_offset = sector << SECTOR_SHIFT;
+    loff_t final_offset = (it->first) << SECTOR_SHIFT;
+
+    const CowOperation* cow_op = it->second;
+    if (IsMappingPresent(cow_op, requested_offset, final_offset)) {
+        size_t buffer_size = CowOpCompressionSize(cow_op, BLOCK_SZ);
+        uint8_t chunk[buffer_size];
+        // Read the entire decompressed window based on the op's compression size
+        if (!ProcessReplaceOp(cow_op, chunk, buffer_size)) {
+            return -1;
+        }
+        size_t skip_offset = (requested_offset - final_offset);
+        size_t write_sz = std::min(size, buffer_size - skip_offset);
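+        // Illustratively, a request at sector 15 against an op mapped at
+        // sector 8 gives skip_offset = 7 * SECTOR_SIZE = 3584 bytes into the
+        // decompressed window.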
+
+        auto buffer =
+                reinterpret_cast<uint8_t*>(block_server_->GetResponseBuffer(BLOCK_SZ, write_sz));
+        if (!buffer) {
+            SNAP_LOG(ERROR) << "ReadUnalignedSector failed to allocate buffer";
+            return -1;
+        }
+
+        std::memcpy(buffer, (char*)chunk + skip_offset, write_sz);
+        return write_sz;
+    }
+
     int num_sectors_skip = sector - it->first;
     size_t skip_size = num_sectors_skip << SECTOR_SHIFT;
     size_t write_size = std::min(size, BLOCK_SZ - skip_size);
@@ -445,8 +584,11 @@
 
     size_t remaining_size = size;
     int ret = 0;
+
+    const CowOperation* cow_op = it->second;
     if (!merge_complete && (requested_offset >= final_offset) &&
-        (requested_offset - final_offset) < BLOCK_SZ) {
+        (((requested_offset - final_offset) < BLOCK_SZ) ||
+         IsMappingPresent(cow_op, requested_offset, final_offset))) {
         // Read the partial un-aligned data
         ret = ReadUnalignedSector(sector, remaining_size, it);
         if (ret < 0) {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
index 1aff50c..43e896a 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
@@ -44,9 +44,12 @@
     bool ProcessXorOp(const CowOperation* cow_op, void* buffer);
     bool ProcessOrderedOp(const CowOperation* cow_op, void* buffer);
     bool ProcessCopyOp(const CowOperation* cow_op, void* buffer);
-    bool ProcessReplaceOp(const CowOperation* cow_op, void* buffer);
+    bool ProcessReplaceOp(const CowOperation* cow_op, void* buffer, size_t buffer_size);
     bool ProcessZeroOp(void* buffer);
 
+    bool IsMappingPresent(const CowOperation* cow_op, loff_t requested_offset,
+                          loff_t cow_op_offset);
+    bool GetCowOpBlockOffset(const CowOperation* cow_op, uint64_t io_block, off_t* block_offset);
     bool ReadAlignedSector(sector_t sector, size_t sz);
     bool ReadUnalignedSector(sector_t sector, size_t size);
     int ReadUnalignedSector(sector_t sector, size_t size,
@@ -56,6 +59,7 @@
 
     constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
     constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
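+    // Converts a 512-byte sector index to its 4 KiB chunk (block) index.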
+    constexpr chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
 
     std::string backing_store_device_;
     unique_fd backing_store_fd_;
@@ -67,6 +71,7 @@
 
     std::basic_string<uint8_t> xor_buffer_;
     std::unique_ptr<void, decltype(&::free)> aligned_buffer_;
+    std::unique_ptr<uint8_t[]> decompressed_buffer_;
 };
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
index 8ddb0f4..76b44b4 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
@@ -64,6 +64,9 @@
 struct TestParam {
     bool io_uring;
     bool o_direct;
+    std::string compression;
+    int block_size;
+    int num_threads;
 };
 
 class SnapuserdTestBase : public ::testing::TestWithParam<TestParam> {
@@ -74,6 +77,7 @@
     void CreateCowDevice();
     void SetDeviceControlName();
     std::unique_ptr<ICowWriter> CreateCowDeviceInternal();
+    std::unique_ptr<ICowWriter> CreateV3Cow();
 
     std::unique_ptr<ITestHarness> harness_;
     size_t size_ = 10_MiB;
@@ -133,6 +137,24 @@
     return CreateCowWriter(kDefaultCowVersion, options, std::move(fd));
 }
 
+std::unique_ptr<ICowWriter> SnapuserdTestBase::CreateV3Cow() {
+    const TestParam params = GetParam();
+
+    CowOptions options;
+    options.op_count_max = 100000;
+    options.compression = params.compression;
+    options.num_compress_threads = params.num_threads;
+    options.batch_write = true;
+    options.compression_factor = params.block_size;
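+    // A compression factor larger than BLOCK_SZ (4 KiB) enables multi-block
+    // compression; params.block_size is the compression window in bytes.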
+
+    cow_system_ = std::make_unique<TemporaryFile>();
+
+    unique_fd fd(cow_system_->fd);
+    cow_system_->fd = -1;
+
+    return CreateCowWriter(3, options, std::move(fd));
+}
+
 void SnapuserdTestBase::CreateCowDevice() {
     unique_fd rnd_fd;
     loff_t offset = 0;
@@ -236,6 +258,7 @@
     void SetupOrderedOpsInverted();
     void SetupCopyOverlap_1();
     void SetupCopyOverlap_2();
+    void SetupDeviceForPassthrough();
     bool Merge();
     void ValidateMerge();
     void ReadSnapshotDeviceAndValidate();
@@ -258,6 +281,9 @@
 
     void SimulateDaemonRestart();
 
+    void CreateCowDeviceWithNoBlockChanges();
+    void ValidateDeviceWithNoBlockChanges();
+
     void CreateCowDeviceOrderedOps();
     void CreateCowDeviceOrderedOpsInverted();
     void CreateCowDeviceWithCopyOverlap_1();
@@ -307,6 +333,12 @@
     ASSERT_NO_FATAL_FAILURE(SetupDaemon());
 }
 
+void SnapuserdTest::SetupDeviceForPassthrough() {
+    ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
+    ASSERT_NO_FATAL_FAILURE(CreateCowDeviceWithNoBlockChanges());
+    ASSERT_NO_FATAL_FAILURE(SetupDaemon());
+}
+
 void SnapuserdTest::SetupOrderedOpsInverted() {
     ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
     ASSERT_NO_FATAL_FAILURE(CreateCowDeviceOrderedOpsInverted());
@@ -480,6 +512,47 @@
     }
 }
 
+void SnapuserdTest::CreateCowDeviceWithNoBlockChanges() {
+    auto writer = CreateCowDeviceInternal();
+    ASSERT_NE(writer, nullptr);
+
+    std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(BLOCK_SZ);
+    std::memset(buffer.get(), 'A', BLOCK_SZ);
+
+    // This test focuses on leaving most blocks unchanged, thereby validating
+    // the pass-through I/O path.
+
+    // Replace block 1
+    ASSERT_TRUE(writer->AddRawBlocks(1, buffer.get(), BLOCK_SZ));
+
+    // Set zero block of Block 3
+    ASSERT_TRUE(writer->AddZeroBlocks(3, 1));
+
+    ASSERT_TRUE(writer->Finalize());
+    orig_buffer_ = std::make_unique<uint8_t[]>(total_base_size_);
+
+    // Read the entire base device
+    ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, orig_buffer_.get(), total_base_size_, 0),
+              true);
+
+    off_t offset = BLOCK_SZ;
+    std::memcpy(orig_buffer_.get() + offset, buffer.get(), BLOCK_SZ);
+    offset = 3 * BLOCK_SZ;
+    std::memset(orig_buffer_.get() + offset, 0, BLOCK_SZ);
+}
+
+void SnapuserdTest::ValidateDeviceWithNoBlockChanges() {
+    unique_fd fd(open(dmuser_dev_->GetPath().c_str(), O_RDONLY));
+    ASSERT_GE(fd, 0);
+    std::unique_ptr<uint8_t[]> snapshot_buffer = std::make_unique<uint8_t[]>(size_);
+    std::memset(snapshot_buffer.get(), 'B', size_);
+
+    // All I/O requests should be a pass-through to the base device except
+    // for blocks 1 and 3.
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), size_, 0), true);
+    ASSERT_EQ(memcmp(snapshot_buffer.get(), orig_buffer_.get(), size_), 0);
+}
+
 void SnapuserdTest::CreateCowDeviceWithCopyOverlap_1() {
     auto writer = CreateCowDeviceInternal();
     ASSERT_NE(writer, nullptr);
@@ -781,6 +854,20 @@
     ASSERT_TRUE(Merge());
 }
 
+TEST_P(SnapuserdTest, Snapshot_Passthrough) {
+    if (!harness_->HasUserDevice()) {
+        GTEST_SKIP() << "Skipping snapshot read; not supported";
+    }
+    ASSERT_NO_FATAL_FAILURE(SetupDeviceForPassthrough());
+    // I/O before merge
+    ASSERT_NO_FATAL_FAILURE(ValidateDeviceWithNoBlockChanges());
+    ASSERT_TRUE(Merge());
+    ValidateMerge();
+    // I/O after merge - daemon should read directly
+    // from base device
+    ASSERT_NO_FATAL_FAILURE(ValidateDeviceWithNoBlockChanges());
+}
+
 TEST_P(SnapuserdTest, Snapshot_IO_TEST) {
     if (!harness_->HasUserDevice()) {
         GTEST_SKIP() << "Skipping snapshot read; not supported";
@@ -853,7 +940,7 @@
         GTEST_SKIP() << "Skipping snapshot read; not supported";
     }
     ASSERT_NO_FATAL_FAILURE(SetupCopyOverlap_2());
-    ASSERT_NO_FATAL_FAILURE(MergeInterruptAndValidate(2));
+    ASSERT_NO_FATAL_FAILURE(MergeInterruptFixed(300));
     ValidateMerge();
 }
 
@@ -881,11 +968,243 @@
     ValidateMerge();
 }
 
+class SnapuserdVariableBlockSizeTest : public SnapuserdTest {
+  public:
+    void SetupCowV3ForVariableBlockSize();
+    void ReadSnapshotWithVariableBlockSize();
+
+  protected:
+    void SetUp() override;
+    void TearDown() override;
+
+    void CreateV3CowDeviceForVariableBlockSize();
+};
+
+void SnapuserdVariableBlockSizeTest::SetupCowV3ForVariableBlockSize() {
+    ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
+    ASSERT_NO_FATAL_FAILURE(CreateV3CowDeviceForVariableBlockSize());
+    ASSERT_NO_FATAL_FAILURE(SetupDaemon());
+}
+
+void SnapuserdVariableBlockSizeTest::CreateV3CowDeviceForVariableBlockSize() {
+    auto writer = CreateV3Cow();
+
+    size_t total_data_to_write = size_;
+
+    size_t total_blocks_to_write = total_data_to_write / BLOCK_SZ;
+    size_t num_blocks_per_op = total_blocks_to_write / 4;
+    size_t source_block = 0;
+
+    size_t seq_len = num_blocks_per_op;
+    uint32_t sequence[seq_len];
+    size_t xor_block_start = seq_len * 3;
+    for (size_t i = 0; i < seq_len; i++) {
+        sequence[i] = xor_block_start + i;
+    }
+    ASSERT_TRUE(writer->AddSequenceData(seq_len, sequence));
+
+    size_t total_replace_blocks = num_blocks_per_op;
+    // Write some data which can be compressed
+    std::string data;
+    data.resize(total_replace_blocks * BLOCK_SZ, '\0');
+    for (size_t i = 0; i < data.size(); i++) {
+        data[i] = static_cast<char>('A' + i / BLOCK_SZ);
+    }
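+    // Each 4 KiB block is filled with a single repeating character, which
+    // compresses well and exercises the multi-block compression path.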
+    // REPLACE ops
+    ASSERT_TRUE(writer->AddRawBlocks(source_block, data.data(), data.size()));
+
+    total_blocks_to_write -= total_replace_blocks;
+    source_block = source_block + total_replace_blocks;
+
+    // ZERO ops
+    size_t total_zero_blocks = total_blocks_to_write / 3;
+    ASSERT_TRUE(writer->AddZeroBlocks(source_block, total_zero_blocks));
+
+    total_blocks_to_write -= total_zero_blocks;
+    source_block = source_block + total_zero_blocks;
+
+    // Generate some random data so that the blocks cannot be compressed.
+    // This is to test the I/O path for blocks which aren't compressed.
+    size_t total_random_data_blocks = total_blocks_to_write / 2;
+    unique_fd rnd_fd(open("/dev/random", O_RDONLY));
+
+    ASSERT_GE(rnd_fd, 0);
+    std::string random_buffer;
+    random_buffer.resize(total_random_data_blocks * BLOCK_SZ, '\0');
+    ASSERT_EQ(
+            android::base::ReadFullyAtOffset(rnd_fd, random_buffer.data(), random_buffer.size(), 0),
+            true);
+    // REPLACE ops
+    ASSERT_TRUE(writer->AddRawBlocks(source_block, random_buffer.data(), random_buffer.size()));
+
+    total_blocks_to_write -= total_random_data_blocks;
+    source_block = source_block + total_random_data_blocks;
+
+    // XOR ops will always be 4k blocks
+    std::string xor_buffer;
+    xor_buffer.resize(total_blocks_to_write * BLOCK_SZ, '\0');
+    for (size_t i = 0; i < xor_buffer.size(); i++) {
+        xor_buffer[i] = static_cast<char>('C' + i / BLOCK_SZ);
+    }
+    size_t xor_offset = 21;
+    std::string source_buffer;
+    source_buffer.resize(total_blocks_to_write * BLOCK_SZ, '\0');
+    ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, source_buffer.data(), source_buffer.size(),
+                                               size_ + xor_offset),
+              true);
+    for (size_t i = 0; i < xor_buffer.size(); i++) {
+        xor_buffer[i] ^= source_buffer[i];
+    }
+
+    ASSERT_EQ(xor_block_start, source_block);
+
+    ASSERT_TRUE(writer->AddXorBlocks(source_block, xor_buffer.data(), xor_buffer.size(),
+                                     (size_ / BLOCK_SZ), xor_offset));
+    // Flush operations
+    ASSERT_TRUE(writer->Finalize());
+
+    // Construct the buffer required for validation
+    orig_buffer_ = std::make_unique<uint8_t[]>(total_base_size_);
+
+    // Read the entire base device
+    ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, orig_buffer_.get(), total_base_size_, 0),
+              true);
+
+    // REPLACE ops which are compressed
+    std::memcpy(orig_buffer_.get(), data.data(), data.size());
+    size_t offset = data.size();
+
+    // ZERO ops
+    std::string zero_buffer(total_zero_blocks * BLOCK_SZ, 0);
+    std::memcpy((char*)orig_buffer_.get() + offset, (void*)zero_buffer.c_str(), zero_buffer.size());
+    offset += zero_buffer.size();
+
+    // REPLACE ops - Random buffers which aren't compressed
+    std::memcpy((char*)orig_buffer_.get() + offset, random_buffer.c_str(), random_buffer.size());
+    offset += random_buffer.size();
+
+    // XOR ops, which default to 4k block size compression irrespective of
+    // the compression factor
+    ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, (char*)orig_buffer_.get() + offset,
+                                               xor_buffer.size(), size_ + xor_offset),
+              true);
+    for (size_t i = 0; i < xor_buffer.size(); i++) {
+        orig_buffer_.get()[offset + i] = (uint8_t)(orig_buffer_.get()[offset + i] ^ xor_buffer[i]);
+    }
+}
+
+void SnapuserdVariableBlockSizeTest::ReadSnapshotWithVariableBlockSize() {
+    unique_fd fd(open(dmuser_dev_->GetPath().c_str(), O_RDONLY | O_DIRECT));
+    ASSERT_GE(fd, 0);
+
+    void* addr;
+    ssize_t page_size = getpagesize();
+    ASSERT_EQ(posix_memalign(&addr, page_size, size_), 0);
+    std::unique_ptr<void, decltype(&::free)> snapshot_buffer(addr, ::free);
+
+    const TestParam params = GetParam();
+
+    // Issue I/O request with various block sizes
+    size_t num_blocks = size_ / params.block_size;
+    off_t offset = 0;
+    for (size_t i = 0; i < num_blocks; i++) {
+        ASSERT_EQ(ReadFullyAtOffset(fd, (char*)snapshot_buffer.get() + offset, params.block_size,
+                                    offset),
+                  true);
+        offset += params.block_size;
+    }
+    // Validate buffer
+    ASSERT_EQ(memcmp(snapshot_buffer.get(), orig_buffer_.get(), size_), 0);
+
+    // Reset the buffer
+    std::memset(snapshot_buffer.get(), 0, size_);
+
+    // Read one full chunk in a single shot and re-validate.
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), size_, 0), true);
+    ASSERT_EQ(memcmp(snapshot_buffer.get(), orig_buffer_.get(), size_), 0);
+
+    // Reset the buffer
+    std::memset(snapshot_buffer.get(), 0, size_);
+
+    // Buffered I/O test
+    fd.reset(open(dmuser_dev_->GetPath().c_str(), O_RDONLY));
+    ASSERT_GE(fd, 0);
+
+    // Try not to cache
+    posix_fadvise(fd.get(), 0, size_, POSIX_FADV_DONTNEED);
+
+    size_t num_blocks_per_op = (size_ / BLOCK_SZ) / 4;
+    offset = num_blocks_per_op * BLOCK_SZ;
+    size_t read_size = 1019;  // bytes
+    offset -= 111;
+
+    // Issue an unaligned read which crosses the boundary between a REPLACE
+    // block and a ZERO block.
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
+
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
+
+    offset = (num_blocks_per_op * 3) * BLOCK_SZ;
+    offset -= (BLOCK_SZ - 119);
+    read_size = 8111;
+
+    // Issue an unaligned read which crosses the boundary between a REPLACE
+    // block of random uncompressed data and an XOR block.
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
+
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
+
+    // Reset the buffer
+    std::memset(snapshot_buffer.get(), 0, size_);
+
+    // Read just one byte at an odd offset which is a REPLACE op
+    offset = 19;
+    read_size = 1;
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
+
+    // Reset the buffer
+    std::memset(snapshot_buffer.get(), 0, size_);
+
+    // Read a block which has no mapping to a COW operation. This read should be
+    // a pass-through to the underlying base device.
+    offset = size_ + 9342;
+    read_size = 30;
+    ASSERT_EQ(ReadFullyAtOffset(fd, snapshot_buffer.get(), read_size, offset), true);
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapshot_buffer.get(), (char*)orig_buffer_.get() + offset, read_size), 0);
+}
+
+void SnapuserdVariableBlockSizeTest::SetUp() {
+    ASSERT_NO_FATAL_FAILURE(SnapuserdTest::SetUp());
+}
+
+void SnapuserdVariableBlockSizeTest::TearDown() {
+    SnapuserdTest::TearDown();
+}
+
+TEST_P(SnapuserdVariableBlockSizeTest, Snapshot_Test_Variable_Block_Size) {
+    if (!harness_->HasUserDevice()) {
+        GTEST_SKIP() << "Skipping snapshot read; not supported";
+    }
+    ASSERT_NO_FATAL_FAILURE(SetupCowV3ForVariableBlockSize());
+    ASSERT_NO_FATAL_FAILURE(ReadSnapshotWithVariableBlockSize());
+    ASSERT_TRUE(StartMerge());
+    CheckMergeCompletion();
+    ValidateMerge();
+    ASSERT_NO_FATAL_FAILURE(ReadSnapshotWithVariableBlockSize());
+}
+
 class HandlerTest : public SnapuserdTestBase {
   protected:
     void SetUp() override;
     void TearDown() override;
 
+    void SetUpV2Cow();
+    void InitializeDevice();
     AssertionResult ReadSectors(sector_t sector, uint64_t size, void* buffer);
 
     TestBlockServerFactory factory_;
@@ -896,10 +1215,11 @@
     std::future<bool> handler_thread_;
 };
 
-void HandlerTest::SetUp() {
-    ASSERT_NO_FATAL_FAILURE(SnapuserdTestBase::SetUp());
-    ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
+void HandlerTest::SetUpV2Cow() {
     ASSERT_NO_FATAL_FAILURE(CreateCowDevice());
+}
+
+void HandlerTest::InitializeDevice() {
     ASSERT_NO_FATAL_FAILURE(SetDeviceControlName());
 
     opener_ = factory_.CreateTestOpener(system_device_ctrl_name_);
@@ -921,6 +1241,13 @@
     handler_thread_ = std::async(std::launch::async, &SnapshotHandler::Start, handler_.get());
 }
 
+void HandlerTest::SetUp() {
+    ASSERT_NO_FATAL_FAILURE(SnapuserdTestBase::SetUp());
+    ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
+    ASSERT_NO_FATAL_FAILURE(SetUpV2Cow());
+    ASSERT_NO_FATAL_FAILURE(InitializeDevice());
+}
+
 void HandlerTest::TearDown() {
     ASSERT_TRUE(factory_.DeleteQueue(system_device_ctrl_name_));
     ASSERT_TRUE(handler_thread_.get());
@@ -986,6 +1313,147 @@
     ASSERT_EQ(memcmp(snapuserd_buffer.get(), orig_buffer_.get(), SECTOR_SIZE), 0);
 }
 
+class HandlerTestV3 : public HandlerTest {
+  public:
+    void ReadSnapshotWithVariableBlockSize();
+
+  protected:
+    void SetUp() override;
+    void TearDown() override;
+    void SetUpV3Cow();
+};
+
+void HandlerTestV3::SetUp() {
+    ASSERT_NO_FATAL_FAILURE(SnapuserdTestBase::SetUp());
+    ASSERT_NO_FATAL_FAILURE(CreateBaseDevice());
+    ASSERT_NO_FATAL_FAILURE(SetUpV3Cow());
+    ASSERT_NO_FATAL_FAILURE(InitializeDevice());
+}
+
+void HandlerTestV3::TearDown() {
+    ASSERT_NO_FATAL_FAILURE(HandlerTest::TearDown());
+}
+
+void HandlerTestV3::SetUpV3Cow() {
+    auto writer = CreateV3Cow();
+
+    size_t total_data_to_write = size_;
+
+    size_t total_blocks_to_write = total_data_to_write / BLOCK_SZ;
+    size_t num_blocks_per_op = total_blocks_to_write / 4;
+    size_t source_block = 0;
+
+    size_t total_replace_blocks = num_blocks_per_op;
+    // Write some data which can be compressed
+    std::string data;
+    data.resize(total_replace_blocks * BLOCK_SZ, '\0');
+    for (size_t i = 0; i < data.size(); i++) {
+        data[i] = static_cast<char>('A' + i / BLOCK_SZ);
+    }
+    // REPLACE ops
+    ASSERT_TRUE(writer->AddRawBlocks(source_block, data.data(), data.size()));
+
+    total_blocks_to_write -= total_replace_blocks;
+    source_block = source_block + total_replace_blocks;
+
+    // ZERO ops
+    size_t total_zero_blocks = total_blocks_to_write / 3;
+    ASSERT_TRUE(writer->AddZeroBlocks(source_block, total_zero_blocks));
+
+    total_blocks_to_write -= total_zero_blocks;
+    source_block = source_block + total_zero_blocks;
+
+    // Generate some random data so that the blocks cannot be compressed.
+    // This is to test the I/O path for blocks which aren't compressed.
+    size_t total_random_data_blocks = total_blocks_to_write;
+    unique_fd rnd_fd(open("/dev/random", O_RDONLY));
+
+    ASSERT_GE(rnd_fd, 0);
+    std::string random_buffer;
+    random_buffer.resize(total_random_data_blocks * BLOCK_SZ, '\0');
+    ASSERT_EQ(
+            android::base::ReadFullyAtOffset(rnd_fd, random_buffer.data(), random_buffer.size(), 0),
+            true);
+    // REPLACE ops
+    ASSERT_TRUE(writer->AddRawBlocks(source_block, random_buffer.data(), random_buffer.size()));
+    // Flush operations
+    ASSERT_TRUE(writer->Finalize());
+
+    // Construct the buffer required for validation
+    orig_buffer_ = std::make_unique<uint8_t[]>(total_base_size_);
+
+    // Read the entire base device
+    ASSERT_EQ(android::base::ReadFullyAtOffset(base_fd_, orig_buffer_.get(), total_base_size_, 0),
+              true);
+
+    // REPLACE ops which are compressed
+    std::memcpy(orig_buffer_.get(), data.data(), data.size());
+    size_t offset = data.size();
+
+    // ZERO ops
+    std::string zero_buffer(total_zero_blocks * BLOCK_SZ, 0);
+    std::memcpy((char*)orig_buffer_.get() + offset, (void*)zero_buffer.c_str(), zero_buffer.size());
+    offset += zero_buffer.size();
+
+    // REPLACE ops - Random buffers which aren't compressed
+    std::memcpy((char*)orig_buffer_.get() + offset, random_buffer.c_str(), random_buffer.size());
+}
+
+TEST_P(HandlerTestV3, Read) {
+    std::unique_ptr<uint8_t[]> snapuserd_buffer = std::make_unique<uint8_t[]>(size_);
+
+    size_t read_size = SECTOR_SIZE;
+    off_t offset = 0;
+    // Read the first sector
+    ASSERT_TRUE(ReadSectors(1, read_size, snapuserd_buffer.get()));
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), orig_buffer_.get(), read_size), 0);
+
+    // Read the second block at offset 7680 (Sector 15). This will map to the
+    // first COW operation for variable block size
+    offset += (((BLOCK_SZ * 2) - SECTOR_SIZE));
+    read_size = BLOCK_SZ;  // Span across two REPLACE ops
+    ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
+              0);
+
+    // Fill some other data since we are going to read zero blocks
+    std::memset(snapuserd_buffer.get(), 'Z', size_);
+
+    size_t num_blocks_per_op = (size_ / BLOCK_SZ) / 4;
+    offset = num_blocks_per_op * BLOCK_SZ;
+    // Issue read spanning between a REPLACE op and ZERO ops. The starting point
+    // is the last REPLACE op at sector 5118
+    offset -= (SECTOR_SIZE * 2);
+    // This will make sure it falls back to aligned reads after reading the
+    // first unaligned block
+    read_size = BLOCK_SZ * 6;
+    ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
+              0);
+
+    // Issue an I/O request at the last block. The first chunk of
+    // (SECTOR_SIZE * 2) will come from a REPLACE op which has random buffers.
+    offset = (size_ - (SECTOR_SIZE * 2));
+    // Request will span beyond the COW mapping, thereby fetching data from base
+    // device.
+    read_size = BLOCK_SZ * 8;
+    ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
+              0);
+
+    // Issue I/O requests which are not mapped to any COW operation
+    offset = (size_ + (SECTOR_SIZE * 3));
+    read_size = BLOCK_SZ * 3;
+    ASSERT_TRUE(ReadSectors(offset / SECTOR_SIZE, read_size, snapuserd_buffer.get()));
+    // Validate the data
+    ASSERT_EQ(std::memcmp(snapuserd_buffer.get(), (char*)orig_buffer_.get() + offset, read_size),
+              0);
+}
+
 std::vector<bool> GetIoUringConfigs() {
 #if __ANDROID__
     if (!android::base::GetBoolProperty("ro.virtual_ab.io_uring.enabled", false)) {
@@ -1018,6 +1486,37 @@
     return testParams;
 }
 
+std::vector<TestParam> GetVariableBlockTestConfigs() {
+    std::vector<TestParam> testParams;
+
+    std::vector<int> block_sizes = {4096, 8192, 16384, 32768, 65536, 131072};
+    std::vector<std::string> compression_algo = {"none", "lz4", "zstd", "gz"};
+    std::vector<int> threads = {1, 2};
+    std::vector<bool> uring_configs = GetIoUringConfigs();
+
+    // This should test 96 combinations and validate the I/O path
+    for (auto block : block_sizes) {
+        for (auto compression : compression_algo) {
+            for (auto thread : threads) {
+                for (auto io_uring : uring_configs) {
+                    TestParam param;
+                    param.block_size = block;
+                    param.compression = compression;
+                    param.num_threads = thread;
+                    param.io_uring = io_uring;
+                    param.o_direct = false;
+                    testParams.push_back(std::move(param));
+                }
+            }
+        }
+    }
+
+    return testParams;
+}
+
+INSTANTIATE_TEST_SUITE_P(Io, SnapuserdVariableBlockSizeTest,
+                         ::testing::ValuesIn(GetVariableBlockTestConfigs()));
+INSTANTIATE_TEST_SUITE_P(Io, HandlerTestV3, ::testing::ValuesIn(GetVariableBlockTestConfigs()));
 INSTANTIATE_TEST_SUITE_P(Io, SnapuserdTest, ::testing::ValuesIn(GetTestConfigs()));
 INSTANTIATE_TEST_SUITE_P(Io, HandlerTest, ::testing::ValuesIn(GetTestConfigs()));