Merge "create_snapshot: Enable v3 writer + variable block size" into main am: c65b6e62cb am: 321c71426a

Original change: https://android-review.googlesource.com/c/platform/system/core/+/2954450

Change-Id: I1d7a912c530532d3b6e37ec2b4e749820c6dcd8a
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/create_cow.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/create_cow.cpp
index efb1035..5497b72 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/create_cow.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/create_cow.cpp
@@ -71,6 +71,13 @@
 
     const int kNumThreads = 6;
     const size_t kBlockSizeToRead = 1_MiB;
+    // Upper bound on the bytes of target data handed to one AddRawBlocks() call.
+    const size_t compression_factor_ = 64_KiB;
+    // Op counters, logged once the snapshot has been written.
+    size_t replace_ops_ = 0;
+    size_t copy_ops_ = 0;
+    size_t zero_ops_ = 0;
+    size_t in_place_ops_ = 0;
 
     std::unordered_map<std::string, int> source_block_hash_;
     std::mutex source_block_hash_lock_;
@@ -81,7 +83,12 @@
     std::unique_ptr<uint8_t[]> zblock_;
 
     std::string compression_ = "lz4";
-    unique_fd fd_;
+    unique_fd cow_fd_;     // COW patch output; ownership moves to writer_
+    unique_fd target_fd_;  // read-only fd on parsing_file_; replace data is read from it
+
+    std::vector<uint64_t> zero_blocks_;     // target blocks that are all zeroes
+    std::vector<uint64_t> replace_blocks_;  // target blocks written as raw data
+    std::unordered_map<uint64_t, uint64_t> copy_blocks_;  // target block -> source block
 
     const int BLOCK_SZ = 4_KiB;
     void SHA256(const void* data, size_t length, uint8_t out[32]);
@@ -93,7 +100,14 @@
     bool FindSourceBlockHash();
     bool PrepareParse(std::string& parsing_file, const bool createSnapshot);
     bool ParsePartition();
-    bool WriteSnapshot(const void* buffer, uint64_t block, std::string& block_hash);
+    void PrepareMergeBlock(const void* buffer, uint64_t block, std::string& block_hash);
+    bool WriteV3Snapshots();  // writer setup, copy ops, zero/replace ops, merge check
+    size_t PrepareWrite(size_t* pending_ops, size_t start_index);  // contiguous run length
+
+    bool CreateSnapshotWriter();      // v3 writer on cow_fd_
+    bool WriteOrderedSnapshots();     // copy ops
+    bool WriteNonOrderedSnapshots();  // zero + replace ops, then Finalize()
+    bool VerifyMergeOrder();          // re-parse the patch, check op order
 };
 
 void CreateSnapshotLogger(android::base::LogId, android::base::LogSeverity severity, const char*,
@@ -118,21 +132,21 @@
     create_snapshot_patch_ = createSnapshot;
 
     if (createSnapshot) {
-        fd_.reset(open(patch_file_.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666));
-        if (fd_ < 0) {
+        cow_fd_.reset(open(patch_file_.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666));
+        if (cow_fd_ < 0) {
             PLOG(ERROR) << "Failed to open the snapshot-patch file: " << patch_file_;
             return false;
         }
 
+        // The target image stays open: the COW writer is sized from it and
+        // replace-op data is read back from it when the snapshot is written.
+        target_fd_.reset(open(parsing_file_.c_str(), O_RDONLY));
+        if (target_fd_ < 0) {
+            PLOG(ERROR) << "open failed: " << parsing_file_;
+            return false;
+        }
         zblock_ = std::make_unique<uint8_t[]>(BLOCK_SZ);
         std::memset(zblock_.get(), 0, BLOCK_SZ);
-
-        CowOptions options;
-        options.compression = compression_;
-        options.num_compress_threads = 2;
-        options.batch_write = true;
-        options.cluster_ops = 600;
-        writer_ = CreateCowWriter(2, options, std::move(fd_));
     }
     return true;
 }
@@ -187,19 +199,200 @@
     return out;
 }
 
-bool CreateSnapshot::WriteSnapshot(const void* buffer, uint64_t block, std::string& block_hash) {
+// Classifies one target block and records how it should be written:
+//  - an all-zero block is appended to zero_blocks_;
+//  - a block whose hash matches a source block is recorded in copy_blocks_,
+//    unless the matching source is the same block number (data unchanged), in
+//    which case it is only counted in in_place_ops_;
+//  - any other block is appended to replace_blocks_.
+// Nothing is written to the COW here; WriteV3Snapshots() emits the ops later.
+// Updates are serialized with write_lock_ as ReadBlocks() may run on several threads.
+void CreateSnapshot::PrepareMergeBlock(const void* buffer, uint64_t block,
+                                       std::string& block_hash) {
     if (std::memcmp(zblock_.get(), buffer, BLOCK_SZ) == 0) {
         std::lock_guard<std::mutex> lock(write_lock_);
-        return writer_->AddZeroBlocks(block, 1);
+        zero_blocks_.push_back(block);
+        return;
     }
 
     auto iter = source_block_hash_.find(block_hash);
     if (iter != source_block_hash_.end()) {
         std::lock_guard<std::mutex> lock(write_lock_);
-        return writer_->AddCopy(block, iter->second, 1);
+        // Same data at the same block number: nothing to write, only count it.
+        if (block != iter->second) {
+            copy_blocks_[block] = iter->second;
+        } else {
+            in_place_ops_ += 1;
+        }
+        return;
     }
     std::lock_guard<std::mutex> lock(write_lock_);
-    return writer_->AddRawBlocks(block, buffer, BLOCK_SZ);
+    replace_blocks_.push_back(block);
+}
+
+// Returns how many entries of replace_blocks_, starting at |start_index|, hold
+// consecutive block numbers (start, start + 1, ...), capped at *pending_ops.
+// Requires start_index < replace_blocks_.size() and
+// start_index + *pending_ops <= replace_blocks_.size(); the result is >= 1.
+// *pending_ops is only read, never modified.
+size_t CreateSnapshot::PrepareWrite(size_t* pending_ops, size_t start_index) {
+    const size_t max_blocks = *pending_ops;
+    const uint64_t start_block = replace_blocks_[start_index];
+    size_t nr_consecutive = 1;
+    // Bounded by comparison instead of a decrementing counter, so a zero
+    // *pending_ops cannot underflow and walk past the end of the vector.
+    while (nr_consecutive < max_blocks &&
+           replace_blocks_[start_index + nr_consecutive] == start_block + nr_consecutive) {
+        nr_consecutive += 1;
+    }
+    return nr_consecutive;
+}
+
+// Creates the v3 COW writer, handing it ownership of cow_fd_. The size of the
+// target image (target_fd_) is passed as CowOptions::max_blocks, in units of
+// options.block_size.
+bool CreateSnapshot::CreateSnapshotWriter() {
+    const off_t dev_sz = lseek(target_fd_.get(), 0, SEEK_END);
+    if (dev_sz < 0) {
+        PLOG(ERROR) << "lseek failed: " << parsing_file_;
+        return false;
+    }
+    CowOptions options;
+    options.compression = compression_;
+    options.num_compress_threads = 2;
+    options.batch_write = true;
+    options.cluster_ops = 600;
+    options.compression_factor = compression_factor_;
+    options.max_blocks = {static_cast<uint64_t>(dev_sz) / options.block_size};
+    writer_ = CreateCowWriter(3, options, std::move(cow_fd_));
+    if (!writer_) {
+        LOG(ERROR) << "Failed to create the COW writer for: " << patch_file_;
+        return false;
+    }
+    return true;
+}
+
+// Writes the zero ops, then the replace ops, then finalizes the COW. Replace
+// data is read back from target_fd_; each run of consecutive block numbers in
+// replace_blocks_, up to compression_factor_ bytes, becomes one AddRawBlocks()
+// call. Must run after WriteOrderedSnapshots(), which sorts replace_blocks_.
+bool CreateSnapshot::WriteNonOrderedSnapshots() {
+    // zero_blocks_ may be filled from several reader threads in no fixed order;
+    // sort it so the generated COW does not depend on thread scheduling.
+    std::sort(zero_blocks_.begin(), zero_blocks_.end());
+    zero_ops_ = zero_blocks_.size();
+    for (const uint64_t block : zero_blocks_) {
+        if (!writer_->AddZeroBlocks(block, 1)) {
+            LOG(ERROR) << "AddZeroBlocks failed for block: " << block;
+            return false;
+        }
+    }
+
+    std::string buffer(compression_factor_, '\0');
+    const size_t max_blocks_per_op = compression_factor_ / BLOCK_SZ;
+    replace_ops_ = replace_blocks_.size();
+    size_t blocks_to_compress = replace_blocks_.size();
+    size_t block_index = 0;
+    while (blocks_to_compress) {
+        size_t num_ops = std::min(max_blocks_per_op, blocks_to_compress);
+        const size_t linear_blocks = PrepareWrite(&num_ops, block_index);
+        const uint64_t offset = replace_blocks_[block_index] * BLOCK_SZ;
+        const size_t size = linear_blocks * BLOCK_SZ;
+        if (!android::base::ReadFullyAtOffset(target_fd_.get(), buffer.data(), size, offset)) {
+            PLOG(ERROR) << "Failed to read at offset: " << offset << " size: " << size;
+            return false;
+        }
+        if (!writer_->AddRawBlocks(replace_blocks_[block_index], buffer.data(), size)) {
+            LOG(ERROR) << "AddRawBlocks failed";
+            return false;
+        }
+        block_index += linear_blocks;
+        blocks_to_compress -= linear_blocks;
+    }
+    if (!writer_->Finalize()) {
+        LOG(ERROR) << "Finalize failed";
+        return false;
+    }
+    return true;
+}
+
+// Emits the copy ops. A copy whose source block is the target of an earlier
+// copy in the sequence would no longer see the original data when merged, so
+// it is demoted to a replace op instead. replace_blocks_ (now including the
+// demoted blocks) is then sorted for WriteNonOrderedSnapshots().
+bool CreateSnapshot::WriteOrderedSnapshots() {
+    // copy_blocks_ is an unordered_map, possibly filled from several threads;
+    // walk it in ascending target-block order so the generated COW is reproducible.
+    std::vector<std::pair<uint64_t, uint64_t>> copies(copy_blocks_.begin(), copy_blocks_.end());
+    std::sort(copies.begin(), copies.end());
+
+    std::unordered_map<uint64_t, uint64_t> overwritten_blocks;
+    std::vector<std::pair<uint64_t, uint64_t>> merge_sequence;
+    for (const auto& [target, source] : copies) {
+        if (overwritten_blocks.count(source)) {
+            replace_blocks_.push_back(target);
+            continue;
+        }
+        overwritten_blocks[target] = source;
+        merge_sequence.emplace_back(target, source);
+    }
+    // Sort the blocks so that if the blocks are contiguous, it would help
+    // compress multiple blocks in one shot based on the compression factor.
+    std::sort(replace_blocks_.begin(), replace_blocks_.end());
+
+    copy_ops_ = merge_sequence.size();
+    for (const auto& [target, source] : merge_sequence) {
+        if (!writer_->AddCopy(target, source, 1)) {
+            LOG(ERROR) << "AddCopy failed for block: " << target << " source: " << source;
+            return false;
+        }
+    }
+
+    return true;
+}
+
+// Re-opens the finished patch file read-only, parses it with CowReader and
+// checks the op ordering with CowReader::VerifyMergeOps().
+bool CreateSnapshot::VerifyMergeOrder() {
+    unique_fd read_fd(open(patch_file_.c_str(), O_RDONLY));
+    if (read_fd < 0) {
+        PLOG(ERROR) << "Failed to open the snapshot-patch file: " << patch_file_;
+        return false;
+    }
+    CowReader reader;
+    if (!reader.Parse(read_fd)) {
+        LOG(ERROR) << "Parse failed";
+        return false;
+    }
+
+    if (!reader.VerifyMergeOps()) {
+        LOG(ERROR) << "MergeOps Order is wrong";
+        return false;
+    }
+    return true;
+}
+
+// Writes every collected op as a v3 COW and verifies the result. The order is
+// significant: WriteOrderedSnapshots() may demote copies into replace_blocks_,
+// WriteNonOrderedSnapshots() writes those and finalizes the file, and
+// VerifyMergeOrder() then re-reads the finalized file.
+bool CreateSnapshot::WriteV3Snapshots() {
+    if (!CreateSnapshotWriter()) {
+        return false;
+    }
+    if (!WriteOrderedSnapshots()) {
+        return false;
+    }
+    if (!WriteNonOrderedSnapshots()) {
+        return false;
+    }
+    if (!VerifyMergeOrder()) {
+        return false;
+    }
+
+    LOG(INFO) << "In-place: " << in_place_ops_ << " Zero: " << zero_ops_
+              << " Replace: " << replace_ops_ << " copy: " << copy_ops_;
+    return true;
 }
 
 bool CreateSnapshot::ReadBlocks(off_t offset, const int skip_blocks, const uint64_t dev_sz) {
@@ -241,10 +392,7 @@
             std::string hash = ToHexString(checksum, sizeof(checksum));
 
             if (create_snapshot_patch_) {
-                if (!WriteSnapshot(bufptr, blkindex, hash)) {
-                    LOG(ERROR) << "WriteSnapshot failed for block: " << blkindex;
-                    return false;
-                }
+                PrepareMergeBlock(bufptr, blkindex, hash);  // record only; no COW I/O here
             } else {
                 std::lock_guard<std::mutex> lock(source_block_hash_lock_);
                 {
@@ -306,8 +454,8 @@
         ret = t.get() && ret;
     }
 
-    if (ret && create_snapshot_patch_ && !writer_->Finalize()) {
-        LOG(ERROR) << "Finzalize failed";
+    if (ret && create_snapshot_patch_ && !WriteV3Snapshots()) {
+        LOG(ERROR) << "Snapshot Write failed";
         return false;
     }