Merge "libsnapshot: stride compression" into main am: 4edb9c0088 am: f8dc763dd6

Original change: https://android-review.googlesource.com/c/platform/system/core/+/2958927

Change-Id: Ief2deb7ea1d29c9d53ee808ed663be3db67e1c34
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
index 30c5135..de2e528 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
@@ -603,41 +603,47 @@
 std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreadedCompression(
         const size_t num_blocks, const void* data, CowOperationType type) {
     const size_t num_threads = num_compress_threads_;
-    const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
     const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
 
+    // We will alternate which thread to send compress work to. E.g. alternate between T1 and T2
+    // until all blocks are processed
     std::vector<CompressedBuffer> compressed_vec;
-    // Submit the blocks per thread. The retrieval of
-    // compressed buffers has to be done in the same order.
-    // We should not poll for completed buffers in a different order as the
-    // buffers are tightly coupled with block ordering.
-    for (size_t i = 0; i < num_threads; i++) {
-        CompressWorker* worker = compress_threads_[i].get();
-        auto blocks_in_batch = std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
-        // Enqueue the blocks to be compressed for each thread.
-        while (blocks_in_batch) {
-            CompressedBuffer buffer;
+    int iteration = 0;
+    int blocks_to_compress = static_cast<int>(num_blocks);
+    while (blocks_to_compress) {
+        CompressedBuffer buffer;
+        CompressWorker* worker = compress_threads_[iteration % num_threads].get();
 
-            const size_t compression_factor = GetCompressionFactor(blocks_in_batch, type);
-            size_t num_blocks = compression_factor / header_.block_size;
+        const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
+        size_t num_blocks = compression_factor / header_.block_size;
 
-            buffer.compression_factor = compression_factor;
-            worker->EnqueueCompressBlocks(iter, compression_factor, 1);
-            compressed_vec.push_back(std::move(buffer));
-            blocks_in_batch -= num_blocks;
-            iter += compression_factor;
-        }
+        worker->EnqueueCompressBlocks(iter, compression_factor, 1);
+        buffer.compression_factor = compression_factor;
+        compressed_vec.push_back(std::move(buffer));
+
+        iteration++;
+        iter += compression_factor;
+        blocks_to_compress -= num_blocks;
     }
 
-    // Fetch compressed buffers from the threads
     std::vector<std::vector<uint8_t>> compressed_buf;
+    std::vector<std::vector<std::vector<uint8_t>>> worker_buffers(num_threads);
     compressed_buf.clear();
     for (size_t i = 0; i < num_threads; i++) {
         CompressWorker* worker = compress_threads_[i].get();
-        if (!worker->GetCompressedBuffers(&compressed_buf)) {
+        if (!worker->GetCompressedBuffers(&worker_buffers[i])) {
             return {};
         }
     }
+    // compressed_vec | CB 1 | CB 2 | CB 3 | CB 4 | <-compressed buffers
+    //                   t1     t2     t1     t2    <- processed by these threads
+    // Ordering is important here. We need to retrieve the compressed data in the same order we
+    // processed it and assume that that we submit data beginning with the first thread and then
+    // round robin the consecutive data calls. We need to Fetch compressed buffers from the threads
+    // via the same ordering
+    for (size_t i = 0; i < compressed_vec.size(); i++) {
+        compressed_buf.emplace_back(worker_buffers[i % num_threads][i / num_threads]);
+    }
 
     if (compressed_vec.size() != compressed_buf.size()) {
         LOG(ERROR) << "Compressed buffer size: " << compressed_buf.size()