Support multi-threaded compression in COW v3

Performance of COW v3 is now on par with v2 in both multi-threaded and
single-threaded configurations. Note that the v2 COW writer can cache up
to 1024 blocks in memory when multi-threaded compression is enabled
(even though the batch size is configured as 200). For a fair
comparison, benchmarks are run with a batch size of 256. At batch sizes
of 256 or greater, v2 and v3 have similar multi-threaded performance.
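
For reference, a minimal sketch of the writer configuration assumed for
the benchmark runs (field names follow CowOptions as used in this
change; the thread count and the already-open fd are illustrative):

    CowOptions options;
    options.cluster_ops = 256;         // batch size used for the comparison
    options.num_compress_threads = 2;  // enable multi-threaded compression
    CowWriterV3 writer(options, std::move(fd));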

Test: th
Bug: 313962438
Change-Id: I377c8291689a7a038bb00b09d7371a155e6972e9
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
index e736847..be6b6da 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
@@ -34,6 +34,7 @@
 #include <zlib.h>
 
 #include <fcntl.h>
+#include <libsnapshot/cow_compress.h>
 #include <libsnapshot_cow/parser_v3.h>
 #include <linux/fs.h>
 #include <sys/ioctl.h>
@@ -55,11 +56,35 @@
 
 using android::base::unique_fd;
 
+// Divide |x| by |y| and round up to the nearest integer.
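+// e.g. DivRoundUp(10, 4) == 3.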
+constexpr uint64_t DivRoundUp(uint64_t x, uint64_t y) {
+    return (x + y - 1) / y;
+}
+
 CowWriterV3::CowWriterV3(const CowOptions& options, unique_fd&& fd)
     : CowWriterBase(options, std::move(fd)), batch_size_(std::max<size_t>(options.cluster_ops, 1)) {
     SetupHeaders();
 }
 
+void CowWriterV3::InitWorkers() {
+    if (num_compress_threads_ <= 1) {
+        LOG(INFO) << "Not creating new threads for compression.";
+        return;
+    }
+    compress_threads_.clear();
+    compress_threads_.reserve(num_compress_threads_);
+    threads_.clear();
+    threads_.reserve(num_compress_threads_);
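+    // Each worker owns its own compressor instance; RunThread() services the
+    // worker's queue until Finalize() is called from the destructor.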
+    for (size_t i = 0; i < num_compress_threads_; i++) {
+        std::unique_ptr<ICompressor> compressor =
+                ICompressor::Create(compression_, header_.block_size);
+        auto&& wt = compress_threads_.emplace_back(
+                std::make_unique<CompressWorker>(std::move(compressor), header_.block_size));
+        threads_.emplace_back(std::thread([wt = wt.get()]() { wt->RunThread(); }));
+    }
+    LOG(INFO) << num_compress_threads_ << " threads used for compression";
+}
+
 void CowWriterV3::SetupHeaders() {
     header_ = {};
     header_.prefix.magic = kCowMagicNumber;
@@ -135,10 +160,24 @@
     } else {
         LOG(INFO) << "Batch writes: disabled";
     }
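+    // Multi-threaded compression is opt-in: it requires both the
+    // ro.virtual_ab.compression.threads property and a non-zero
+    // num_compress_threads in the writer options.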
+    if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false) &&
+        options_.num_compress_threads) {
+        num_compress_threads_ = options_.num_compress_threads;
+    }
+    InitWorkers();
     return true;
 }
 
-CowWriterV3::~CowWriterV3() {}
+CowWriterV3::~CowWriterV3() {
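+    // Tell each worker to finish any queued work and exit, then join the
+    // threads before the writer is destroyed.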
+    for (const auto& t : compress_threads_) {
+        t->Finalize();
+    }
+    for (auto& t : threads_) {
+        if (t.joinable()) {
+            t.join();
+        }
+    }
+}
 
 bool CowWriterV3::Initialize(std::optional<uint64_t> label) {
     if (!InitFd() || !ParseOptions()) {
@@ -289,19 +328,24 @@
                    << " but compressor is uninitialized.";
         return false;
     }
+    const auto bytes = reinterpret_cast<const uint8_t*>(data);
     const size_t num_blocks = (size / header_.block_size);
 
     for (size_t i = 0; i < num_blocks;) {
         const auto blocks_to_write =
                 std::min<size_t>(batch_size_ - cached_data_.size(), num_blocks - i);
         size_t compressed_bytes = 0;
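+        // Compress the whole batch at once; CompressBlocks() fans the work out
+        // across the worker threads when multi-threaded compression is enabled.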
+        auto&& blocks = CompressBlocks(blocks_to_write, bytes + header_.block_size * i);
+        if (blocks.size() != blocks_to_write) {
+            LOG(ERROR) << "Failed to compress blocks " << new_block_start + i << ", "
+                       << blocks_to_write << ", actual number of blocks received from compressor "
+                       << blocks.size();
+            return false;
+        }
         for (size_t j = 0; j < blocks_to_write; j++) {
-            const uint8_t* const iter =
-                    reinterpret_cast<const uint8_t*>(data) + (header_.block_size * (i + j));
-
             CowOperation& op = cached_ops_.emplace_back();
             auto& vec = data_vec_.emplace_back();
-            auto& compressed_data = cached_data_.emplace_back();
+            auto& compressed_data = cached_data_.emplace_back(std::move(blocks[j]));
             op.new_block = new_block_start + i + j;
 
             op.set_type(type);
@@ -310,20 +354,6 @@
             } else {
                 op.set_source(next_data_pos_ + compressed_bytes);
             }
-            if (compression_.algorithm == kCowCompressNone) {
-                compressed_data.resize(header_.block_size);
-            } else {
-                compressed_data = compressor_->Compress(iter, header_.block_size);
-                if (compressed_data.empty()) {
-                    LOG(ERROR) << "Compression failed during EmitBlocks(" << new_block_start << ", "
-                               << num_blocks << ");";
-                    return false;
-                }
-            }
-            if (compressed_data.size() >= header_.block_size) {
-                compressed_data.resize(header_.block_size);
-                std::memcpy(compressed_data.data(), iter, header_.block_size);
-            }
             vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
             op.data_length = vec.iov_len;
             compressed_bytes += op.data_length;
@@ -443,6 +473,57 @@
     return true;
 }
 
+std::vector<std::basic_string<uint8_t>> CowWriterV3::CompressBlocks(const size_t num_blocks,
+                                                                    const void* data) {
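+    // A single block is not worth fanning out to the workers; compress it on
+    // the calling thread.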
+    const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
+    const size_t blocks_per_thread = DivRoundUp(num_blocks, num_threads);
+    std::vector<std::basic_string<uint8_t>> compressed_buf;
+    const uint8_t* const iter = reinterpret_cast<const uint8_t*>(data);
+    if (compression_.algorithm == kCowCompressNone) {
+        for (size_t i = 0; i < num_blocks; i++) {
+            auto& buf = compressed_buf.emplace_back();
+            buf.resize(header_.block_size);
+            std::memcpy(buf.data(), iter + i * header_.block_size, header_.block_size);
+        }
+        return compressed_buf;
+    }
+    if (num_threads <= 1) {
+        if (!CompressWorker::CompressBlocks(compressor_.get(), header_.block_size, data, num_blocks,
+                                            &compressed_buf)) {
+            return {};
+        }
+    } else {
+        // Submit a contiguous range of blocks to each worker. Compressed
+        // buffers must be retrieved in the same order they were submitted;
+        // polling the workers in a different order would break the mapping
+        // between buffers and block numbers.
+        for (size_t i = 0; i < num_threads; i++) {
+            CompressWorker* worker = compress_threads_[i].get();
+            const auto blocks_in_batch =
+                    std::min(num_blocks - i * blocks_per_thread, blocks_per_thread);
+            worker->EnqueueCompressBlocks(iter + i * blocks_per_thread * header_.block_size,
+                                          blocks_in_batch);
+        }
+
+        for (size_t i = 0; i < num_threads; i++) {
+            CompressWorker* worker = compress_threads_[i].get();
+            if (!worker->GetCompressedBuffers(&compressed_buf)) {
+                return {};
+            }
+        }
+    }
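+    // Fall back to the raw block data when compression did not make the block
+    // smaller than block_size.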
+    for (size_t i = 0; i < num_blocks; i++) {
+        auto& block = compressed_buf[i];
+        if (block.size() >= header_.block_size) {
+            block.resize(header_.block_size);
+            std::memcpy(block.data(), iter + header_.block_size * i, header_.block_size);
+        }
+    }
+
+    return compressed_buf;
+}
+
 bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
                                  std::basic_string_view<struct iovec> data) {
     const auto total_data_size =
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
index 73ac520..b19af60 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
@@ -16,6 +16,7 @@
 
 #include <android-base/logging.h>
 #include <string_view>
+#include <thread>
 #include <vector>
 
 #include "writer_base.h"
@@ -51,12 +52,14 @@
                         std::basic_string_view<struct iovec> data);
     bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                     uint16_t offset, CowOperationType type);
-    bool CompressBlocks(size_t num_blocks, const void* data);
     bool CheckOpCount(size_t op_count);
 
   private:
+    std::vector<std::basic_string<uint8_t>> CompressBlocks(const size_t num_blocks,
+                                                           const void* data);
     bool ReadBackVerification();
     bool FlushCacheOps();
+    void InitWorkers();
     CowHeaderV3 header_{};
     CowCompression compression_;
     // in the case that we are using one thread for compression, we can store and re-use the same
@@ -75,6 +78,8 @@
     std::vector<CowOperationV3> cached_ops_;
     std::vector<std::basic_string<uint8_t>> cached_data_;
     std::vector<struct iovec> data_vec_;
+
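+    // Threads running CompressWorker::RunThread(); joined in the destructor.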
+    std::vector<std::thread> threads_;
 };
 
 }  // namespace snapshot