Merge changes from topic "libsnapshot-batch-writes"

* changes:
  libsnapshot: Test batch writes and threaded compression
  libsnapshot: Batch write COW operations in a cluster
  libsnapshot: Use two threads to run compression
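
For orientation, a minimal sketch of how a caller might opt into the new
options (WriteSampleCow and its path argument are illustrative; the CowWriter
calls match the headers below):

    #include <fcntl.h>

    #include <string>

    #include <android-base/unique_fd.h>
    #include <libsnapshot/cow_writer.h>

    using namespace android::snapshot;

    bool WriteSampleCow(const std::string& path) {
        CowOptions options;
        options.compression = "gz";
        options.num_compress_threads = 2;  // 0 selects the default
        options.batch_write = true;        // stage cluster ops, flush via pwritev()

        android::base::unique_fd fd(open(path.c_str(), O_RDWR | O_CREAT, 0664));
        if (fd < 0) return false;

        std::string data(options.block_size * 8, '\0');  // eight zeroed blocks
        CowWriter writer(options);
        return writer.Initialize(std::move(fd)) &&
               writer.AddRawBlocks(100, data.data(), data.size()) &&
               writer.Finalize();
    }
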
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index a9682a1..798bc73 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -16,10 +16,17 @@
 
 #include <stdint.h>
 
+#include <condition_variable>
 #include <cstdint>
+#include <future>
 #include <memory>
+#include <mutex>
 #include <optional>
+#include <queue>
 #include <string>
+#include <thread>
+#include <utility>
+#include <vector>
 
 #include <android-base/unique_fd.h>
 #include <libsnapshot/cow_format.h>
@@ -42,6 +49,12 @@
 
     // Preset the number of merged ops. Only useful for testing.
     uint64_t num_merge_ops = 0;
+
+    // Number of threads to use for compression; 0 selects the default.
+    int num_compress_threads = 0;
+
+    // Batch write COW operations in a cluster.
+    bool batch_write = false;
 };
 
 // Interface for writing to a snapuserd COW. All operations are ordered; merges
@@ -100,9 +113,40 @@
     CowOptions options_;
 };
 
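+// CompressWorker compresses blocks on a dedicated thread. The writer and the
+// worker communicate through a pair of queues guarded by a single mutex: the
+// writer enqueues raw buffers with EnqueueCompressBlocks(), collects results
+// in submission order with GetCompressedBuffers(), and stops the thread with
+// Finalize().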
+class CompressWorker {
+  public:
+    CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size);
+    bool RunThread();
+    void EnqueueCompressBlocks(const void* buffer, size_t num_blocks);
+    bool GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf);
+    void Finalize();
+
+  private:
+    struct CompressWork {
+        const void* buffer;
+        size_t num_blocks;
+        bool compression_status = false;
+        std::vector<std::basic_string<uint8_t>> compressed_data;
+    };
+
+    CowCompressionAlgorithm compression_;
+    uint32_t block_size_;
+
+    std::queue<CompressWork> work_queue_;
+    std::queue<CompressWork> compressed_queue_;
+    std::mutex lock_;
+    std::condition_variable cv_;
+    bool stopped_ = false;
+
+    std::basic_string<uint8_t> Compress(const void* data, size_t length);
+    bool CompressBlocks(const void* buffer, size_t num_blocks,
+                        std::vector<std::basic_string<uint8_t>>* compressed_data);
+};
+
 class CowWriter : public ICowWriter {
   public:
     explicit CowWriter(const CowOptions& options);
+    ~CowWriter();
 
     // Set up the writer.
     // The file starts from the beginning.
@@ -138,6 +182,7 @@
     bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                     uint16_t offset, uint8_t type);
     void SetupHeaders();
+    void SetupWriteOptions();
     bool ParseOptions();
     bool OpenForWrite();
     bool OpenForAppend(uint64_t label);
@@ -145,9 +190,12 @@
     bool WriteRawData(const void* data, size_t size);
     bool WriteOperation(const CowOperation& op, const void* data = nullptr, size_t size = 0);
     void AddOperation(const CowOperation& op);
-    std::basic_string<uint8_t> Compress(const void* data, size_t length);
     void InitPos();
+    void InitBatchWrites();
+    void InitWorkers();
+    bool FlushCluster();
 
+    bool CompressBlocks(size_t num_blocks, const void* data);
     bool SetFd(android::base::borrowed_fd fd);
     bool Sync();
     bool Truncate(off_t length);
@@ -159,8 +207,11 @@
     CowHeader header_{};
     CowFooter footer_{};
     CowCompressionAlgorithm compression_ = kCowCompressNone;
+    uint64_t current_op_pos_ = 0;
     uint64_t next_op_pos_ = 0;
     uint64_t next_data_pos_ = 0;
+    uint64_t current_data_pos_ = 0;
+    ssize_t total_data_written_ = 0;
     uint32_t cluster_size_ = 0;
     uint32_t current_cluster_size_ = 0;
     uint64_t current_data_size_ = 0;
@@ -168,6 +219,21 @@
     bool merge_in_progress_ = false;
     bool is_block_device_ = false;
     uint64_t cow_image_size_ = INT64_MAX;
+
+    int num_compress_threads_ = 1;
+    std::vector<std::unique_ptr<CompressWorker>> compress_threads_;
+    std::vector<std::future<bool>> threads_;
+    std::vector<std::basic_string<uint8_t>> compressed_buf_;
+    std::vector<std::basic_string<uint8_t>>::iterator buf_iter_;
+
+    std::vector<std::unique_ptr<CowOperation>> opbuffer_vec_;
+    std::vector<std::unique_ptr<uint8_t[]>> databuffer_vec_;
+    std::unique_ptr<struct iovec[]> cowop_vec_;
+    int op_vec_index_ = 0;
+
+    std::unique_ptr<struct iovec[]> data_vec_;
+    int data_vec_index_ = 0;
+    bool batch_write_ = false;
 };
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
index 2c1187f..862ce55 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_api_test.cpp
@@ -298,6 +298,150 @@
     ASSERT_TRUE(iter->Done());
 }
 
+class CompressionRWTest : public CowTest, public testing::WithParamInterface<const char*> {};
+
+TEST_P(CompressionRWTest, ThreadedBatchWrites) {
+    CowOptions options;
+    options.compression = GetParam();
+    options.num_compress_threads = 2;
+
+    CowWriter writer(options);
+
+    ASSERT_TRUE(writer.Initialize(cow_->fd));
+
+    std::string xor_data = "This is test data-1. Testing xor";
+    xor_data.resize(options.block_size, '\0');
+    ASSERT_TRUE(writer.AddXorBlocks(50, xor_data.data(), xor_data.size(), 24, 10));
+
+    std::string data = "This is test data-2. Testing replace ops";
+    data.resize(options.block_size * 2048, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(100, data.data(), data.size()));
+
+    std::string data2 = "This is test data-3. Testing replace ops";
+    data2.resize(options.block_size * 259, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(6000, data2.data(), data2.size()));
+
+    std::string data3 = "This is test data-4. Testing replace ops";
+    data3.resize(options.block_size, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(9000, data3.data(), data3.size()));
+
+    ASSERT_TRUE(writer.Finalize());
+
+    int expected_blocks = (1 + 2048 + 259 + 1);
+    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+
+    auto iter = reader.GetOpIter();
+    ASSERT_NE(iter, nullptr);
+
+    int total_blocks = 0;
+    while (!iter->Done()) {
+        auto op = &iter->Get();
+
+        if (op->type == kCowXorOp) {
+            total_blocks += 1;
+            StringSink sink;
+            ASSERT_EQ(op->new_block, 50);
+            ASSERT_EQ(op->source, 98314);  // 4096 * 24 + 10
+            ASSERT_TRUE(reader.ReadData(*op, &sink));
+            ASSERT_EQ(sink.stream(), xor_data);
+        }
+
+        if (op->type == kCowReplaceOp) {
+            total_blocks += 1;
+            if (op->new_block == 100) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                data.resize(options.block_size);
+                ASSERT_EQ(sink.stream(), data);
+            }
+            if (op->new_block == 6000) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                data2.resize(options.block_size);
+                ASSERT_EQ(sink.stream(), data2);
+            }
+            if (op->new_block == 9000) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                ASSERT_EQ(sink.stream(), data3);
+            }
+        }
+
+        iter->Next();
+    }
+
+    ASSERT_EQ(total_blocks, expected_blocks);
+}
+
+TEST_P(CompressionRWTest, NoBatchWrites) {
+    CowOptions options;
+    options.compression = GetParam();
+    options.num_compress_threads = 1;
+    options.cluster_ops = 0;
+
+    CowWriter writer(options);
+
+    ASSERT_TRUE(writer.Initialize(cow_->fd));
+
+    std::string data = "Testing replace ops without batch writes";
+    data.resize(options.block_size * 1024, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(50, data.data(), data.size()));
+
+    std::string data2 = "Testing odd blocks without batch writes";
+    data2.resize(options.block_size * 111, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(3000, data2.data(), data2.size()));
+
+    std::string data3 = "Testing single 4k block";
+    data3.resize(options.block_size, '\0');
+    ASSERT_TRUE(writer.AddRawBlocks(5000, data3.data(), data3.size()));
+
+    ASSERT_TRUE(writer.Finalize());
+
+    int expected_blocks = (1024 + 111 + 1);
+    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+
+    auto iter = reader.GetOpIter();
+    ASSERT_NE(iter, nullptr);
+
+    int total_blocks = 0;
+    while (!iter->Done()) {
+        auto op = &iter->Get();
+
+        if (op->type == kCowReplaceOp) {
+            total_blocks += 1;
+            if (op->new_block == 50) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                data.resize(options.block_size);
+                ASSERT_EQ(sink.stream(), data);
+            }
+            if (op->new_block == 3000) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                data2.resize(options.block_size);
+                ASSERT_EQ(sink.stream(), data2);
+            }
+            if (op->new_block == 5000) {
+                StringSink sink;
+                ASSERT_TRUE(reader.ReadData(*op, &sink));
+                ASSERT_EQ(sink.stream(), data3);
+            }
+        }
+
+        iter->Next();
+    }
+
+    ASSERT_EQ(total_blocks, expected_blocks);
+}
+
+INSTANTIATE_TEST_SUITE_P(CowApi, CompressionRWTest, testing::Values("none", "gz", "brotli", "lz4"));
+
 TEST_F(CowTest, ClusterCompressGz) {
     CowOptions options;
     options.compression = "gz";
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
index 0eb231b..4d9b748 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_compress.cpp
@@ -33,7 +33,7 @@
 namespace android {
 namespace snapshot {
 
-std::basic_string<uint8_t> CowWriter::Compress(const void* data, size_t length) {
+std::basic_string<uint8_t> CompressWorker::Compress(const void* data, size_t length) {
     switch (compression_) {
         case kCowCompressGz: {
             const auto bound = compressBound(length);
@@ -100,5 +100,119 @@
     return {};
 }
 
+bool CompressWorker::CompressBlocks(const void* buffer, size_t num_blocks,
+                                    std::vector<std::basic_string<uint8_t>>* compressed_data) {
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(buffer);
+    while (num_blocks) {
+        auto data = Compress(iter, block_size_);
+        if (data.empty()) {
+            PLOG(ERROR) << "CompressBlocks: Compression failed";
+            return false;
+        }
+        if (data.size() > std::numeric_limits<uint16_t>::max()) {
+            LOG(ERROR) << "Compressed block is too large: " << data.size();
+            return false;
+        }
+
+        compressed_data->emplace_back(std::move(data));
+        num_blocks -= 1;
+        iter += block_size_;
+    }
+    return true;
+}
+
+bool CompressWorker::RunThread() {
+    while (true) {
+        // Wait for work
+        CompressWork blocks;
+        {
+            std::unique_lock<std::mutex> lock(lock_);
+            while (work_queue_.empty() && !stopped_) {
+                cv_.wait(lock);
+            }
+
+            if (stopped_) {
+                return true;
+            }
+
+            blocks = std::move(work_queue_.front());
+            work_queue_.pop();
+        }
+
+        // Compress blocks
+        bool ret = CompressBlocks(blocks.buffer, blocks.num_blocks, &blocks.compressed_data);
+        blocks.compression_status = ret;
+        {
+            std::lock_guard<std::mutex> lock(lock_);
+            compressed_queue_.push(std::move(blocks));
+        }
+
+        // Notify completion
+        cv_.notify_all();
+
+        if (!ret) {
+            LOG(ERROR) << "CompressBlocks failed";
+            return false;
+        }
+    }
+
+    return true;
+}
+
+void CompressWorker::EnqueueCompressBlocks(const void* buffer, size_t num_blocks) {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+
+        CompressWork blocks = {};
+        blocks.buffer = buffer;
+        blocks.num_blocks = num_blocks;
+        work_queue_.push(std::move(blocks));
+    }
+    cv_.notify_all();
+}
+
+bool CompressWorker::GetCompressedBuffers(std::vector<std::basic_string<uint8_t>>* compressed_buf) {
+    {
+        std::unique_lock<std::mutex> lock(lock_);
+        while (compressed_queue_.empty() && !stopped_) {
+            cv_.wait(lock);
+        }
+
+        if (stopped_) {
+            return true;
+        }
+    }
+
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        while (compressed_queue_.size() > 0) {
+            CompressWork blocks = std::move(compressed_queue_.front());
+            compressed_queue_.pop();
+
+            if (blocks.compression_status) {
+                compressed_buf->insert(compressed_buf->end(),
+                                       std::make_move_iterator(blocks.compressed_data.begin()),
+                                       std::make_move_iterator(blocks.compressed_data.end()));
+            } else {
+                LOG(ERROR) << "Block compression failed";
+                return false;
+            }
+        }
+    }
+
+    return true;
+}
+
+void CompressWorker::Finalize() {
+    {
+        std::unique_lock<std::mutex> lock(lock_);
+        stopped_ = true;
+    }
+    cv_.notify_all();
+}
+
+CompressWorker::CompressWorker(CowCompressionAlgorithm compression, uint32_t block_size)
+    : compression_(compression), block_size_(block_size) {}
+
 }  // namespace snapshot
 }  // namespace android
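
For reference, a minimal sketch of the producer/consumer contract implemented
above (CompressSixteenBlocks and its buf argument are illustrative):

    #include <future>
    #include <memory>
    #include <vector>

    #include <libsnapshot/cow_writer.h>

    using namespace android::snapshot;

    // buf holds 16 contiguous 4K blocks to be compressed.
    bool CompressSixteenBlocks(const void* buf,
                               std::vector<std::basic_string<uint8_t>>* out) {
        // One worker per compression thread; RunThread() loops until Finalize().
        auto worker = std::make_unique<CompressWorker>(kCowCompressGz, 4096);
        auto future = std::async(std::launch::async, &CompressWorker::RunThread,
                                 worker.get());

        // Producer: hand off a contiguous run of blocks.
        worker->EnqueueCompressBlocks(buf, 16);

        // Consumer: results arrive in submission order.
        bool ok = worker->GetCompressedBuffers(out);

        // Shutdown: wake the thread, let RunThread() return, join the future.
        worker->Finalize();
        return future.get() && ok;
    }
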
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
index 43c17a6..2d5e4bc 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_writer.cpp
@@ -15,6 +15,7 @@
 //
 
 #include <sys/types.h>
+#include <sys/uio.h>
 #include <unistd.h>
 
 #include <limits>
@@ -22,6 +23,7 @@
 
 #include <android-base/file.h>
 #include <android-base/logging.h>
+#include <android-base/properties.h>
 #include <android-base/unique_fd.h>
 #include <brotli/encode.h>
 #include <libsnapshot/cow_format.h>
@@ -132,6 +134,46 @@
 
 CowWriter::CowWriter(const CowOptions& options) : ICowWriter(options), fd_(-1) {
     SetupHeaders();
+    SetupWriteOptions();
+}
+
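+// Stop all compression workers and join their futures before the writer's
+// queues and buffers are destroyed.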
+CowWriter::~CowWriter() {
+    for (size_t i = 0; i < compress_threads_.size(); i++) {
+        CompressWorker* worker = compress_threads_[i].get();
+        if (worker) {
+            worker->Finalize();
+        }
+    }
+
+    bool ret = true;
+    for (auto& t : threads_) {
+        ret = t.get() && ret;
+    }
+
+    if (!ret) {
+        LOG(ERROR) << "Compression failed";
+    }
+    compress_threads_.clear();
+}
+
+void CowWriter::SetupWriteOptions() {
+    num_compress_threads_ = options_.num_compress_threads;
+
+    if (!num_compress_threads_) {
+        num_compress_threads_ = 1;
+        // We prefer not to have more than two threads, as the overhead of
+        // additional threads outweighs the reduction in compression time.
+        if (header_.cluster_ops &&
+            android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) {
+            num_compress_threads_ = 2;
+        }
+    }
+
+    if (header_.cluster_ops &&
+        (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
+         options_.batch_write)) {
+        batch_write_ = true;
+    }
 }
 
 void CowWriter::SetupHeaders() {
@@ -206,6 +248,42 @@
     return true;
 }
 
+void CowWriter::InitBatchWrites() {
+    if (batch_write_) {
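+        // Pre-allocate one op slot and one data buffer per cluster entry. Data
+        // buffers are two blocks wide to leave headroom for payloads that can
+        // exceed one block (e.g. sequence ops and worst-case compressed data).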
+        cowop_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
+        data_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
+        struct iovec* cowop_ptr = cowop_vec_.get();
+        struct iovec* data_ptr = data_vec_.get();
+        for (size_t i = 0; i < header_.cluster_ops; i++) {
+            std::unique_ptr<CowOperation> op = std::make_unique<CowOperation>();
+            cowop_ptr[i].iov_base = op.get();
+            cowop_ptr[i].iov_len = sizeof(CowOperation);
+            opbuffer_vec_.push_back(std::move(op));
+
+            std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(header_.block_size * 2);
+            data_ptr[i].iov_base = buffer.get();
+            data_ptr[i].iov_len = header_.block_size * 2;
+            databuffer_vec_.push_back(std::move(buffer));
+        }
+
+        current_op_pos_ = next_op_pos_;
+        current_data_pos_ = next_data_pos_;
+    }
+
+    std::string batch_write = batch_write_ ? "enabled" : "disabled";
+    LOG(INFO) << "Batch writes: " << batch_write;
+}
+
+void CowWriter::InitWorkers() {
+    for (int i = 0; i < num_compress_threads_; i++) {
+        auto wt = std::make_unique<CompressWorker>(compression_, header_.block_size);
+        threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
+        compress_threads_.push_back(std::move(wt));
+    }
+
+    LOG(INFO) << num_compress_threads_ << " threads used for compression";
+}
+
 bool CowWriter::Initialize(unique_fd&& fd) {
     owned_fd_ = std::move(fd);
     return Initialize(borrowed_fd{owned_fd_});
@@ -216,7 +294,13 @@
         return false;
     }
 
-    return OpenForWrite();
+    bool ret = OpenForWrite();
+
+    if (ret) {
+        InitWorkers();
+    }
+
+    return ret;
 }
 
 bool CowWriter::InitializeAppend(android::base::unique_fd&& fd, uint64_t label) {
@@ -229,7 +313,13 @@
         return false;
     }
 
-    return OpenForAppend(label);
+    bool ret = OpenForAppend(label);
+
+    if (ret && !compress_threads_.size()) {
+        InitWorkers();
+    }
+
+    return ret;
 }
 
 void CowWriter::InitPos() {
@@ -287,6 +377,7 @@
     }
 
     InitPos();
+    InitBatchWrites();
 
     return true;
 }
@@ -320,6 +411,9 @@
         PLOG(ERROR) << "lseek failed";
         return false;
     }
+
+    InitBatchWrites();
+
     return EmitClusterIfNeeded();
 }
 
@@ -348,47 +442,99 @@
     return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp);
 }
 
+bool CowWriter::CompressBlocks(size_t num_blocks, const void* data) {
+    size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
+    size_t num_blocks_per_thread = num_blocks / num_threads;
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+    compressed_buf_.clear();
+
+    // Submit the blocks to each thread. Compressed buffers must be retrieved
+    // in the same order the work was submitted; polling for completed buffers
+    // out of order would break the tight coupling between buffer order and
+    // block order.
+    for (size_t i = 0; i < num_threads; i++) {
+        CompressWorker* worker = compress_threads_[i].get();
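+        // The last thread also takes the remainder when the block count does
+        // not divide evenly across threads.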
+        if (i == num_threads - 1) {
+            num_blocks_per_thread = num_blocks;
+        }
+        worker->EnqueueCompressBlocks(iter, num_blocks_per_thread);
+        iter += (num_blocks_per_thread * header_.block_size);
+        num_blocks -= num_blocks_per_thread;
+    }
+
+    for (size_t i = 0; i < num_threads; i++) {
+        CompressWorker* worker = compress_threads_[i].get();
+        if (!worker->GetCompressedBuffers(&compressed_buf_)) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
 bool CowWriter::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
                            uint64_t old_block, uint16_t offset, uint8_t type) {
-    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
     CHECK(!merge_in_progress_);
-    for (size_t i = 0; i < size / header_.block_size; i++) {
-        CowOperation op = {};
-        op.new_block = new_block_start + i;
-        op.type = type;
-        if (type == kCowXorOp) {
-            op.source = (old_block + i) * header_.block_size + offset;
-        } else {
-            op.source = next_data_pos_;
-        }
+    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
+
+    // The update engine can potentially send ~100MB of blocks at a time.
+    // Processing all of them in one shot would put too much pressure on
+    // memory, so process the blocks in chunks.
+    //
+    // 1024 blocks is reasonable, as it caps peak memory use at ~4MB.
+    const size_t kProcessingBlocks = 1024;
+    size_t num_blocks = (size / header_.block_size);
+    size_t i = 0;
+
+    while (num_blocks) {
+        size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));
 
         if (compression_) {
-            auto data = Compress(iter, header_.block_size);
-            if (data.empty()) {
-                PLOG(ERROR) << "AddRawBlocks: compression failed";
+            if (!CompressBlocks(pending_blocks, iter)) {
                 return false;
             }
-            if (data.size() > std::numeric_limits<uint16_t>::max()) {
-                LOG(ERROR) << "Compressed block is too large: " << data.size() << " bytes";
-                return false;
-            }
-            op.compression = compression_;
-            op.data_length = static_cast<uint16_t>(data.size());
-
-            if (!WriteOperation(op, data.data(), data.size())) {
-                PLOG(ERROR) << "AddRawBlocks: write failed, bytes requested: " << size
-                            << ", bytes written: " << i * header_.block_size;
-                return false;
-            }
-        } else {
-            op.data_length = static_cast<uint16_t>(header_.block_size);
-            if (!WriteOperation(op, iter, header_.block_size)) {
-                PLOG(ERROR) << "AddRawBlocks: write failed";
-                return false;
-            }
+            buf_iter_ = compressed_buf_.begin();
+            CHECK(pending_blocks == compressed_buf_.size());
+            iter += (pending_blocks * header_.block_size);
         }
 
-        iter += header_.block_size;
+        num_blocks -= pending_blocks;
+
+        while (i < size / header_.block_size && pending_blocks) {
+            CowOperation op = {};
+            op.new_block = new_block_start + i;
+            op.type = type;
+            if (type == kCowXorOp) {
+                op.source = (old_block + i) * header_.block_size + offset;
+            } else {
+                op.source = next_data_pos_;
+            }
+
+            if (compression_) {
+                auto data = std::move(*buf_iter_);
+                op.compression = compression_;
+                op.data_length = static_cast<uint16_t>(data.size());
+
+                if (!WriteOperation(op, data.data(), data.size())) {
+                    PLOG(ERROR) << "AddRawBlocks: write failed";
+                    return false;
+                }
+                buf_iter_++;
+            } else {
+                op.data_length = static_cast<uint16_t>(header_.block_size);
+                if (!WriteOperation(op, iter, header_.block_size)) {
+                    PLOG(ERROR) << "AddRawBlocks: write failed";
+                    return false;
+                }
+                iter += header_.block_size;
+            }
+
+            i += 1;
+            pending_blocks -= 1;
+        }
+
+        CHECK(pending_blocks == 0);
     }
     return true;
 }
@@ -416,7 +562,7 @@
 bool CowWriter::EmitSequenceData(size_t num_ops, const uint32_t* data) {
     CHECK(!merge_in_progress_);
     size_t to_add = 0;
-    size_t max_ops = std::numeric_limits<uint16_t>::max() / sizeof(uint32_t);
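+    // Cap each op's sequence data to the size of the preallocated batch-write
+    // data buffers (block_size * 2).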
+    size_t max_ops = (header_.block_size * 2) / sizeof(uint32_t);
     while (num_ops > 0) {
         CowOperation op = {};
         op.type = kCowSequenceOp;
@@ -461,6 +607,11 @@
 }
 
 bool CowWriter::Finalize() {
+    if (!FlushCluster()) {
+        LOG(ERROR) << "Finalize: FlushCluster() failed";
+        return false;
+    }
+
     auto continue_cluster_size = current_cluster_size_;
     auto continue_data_size = current_data_size_;
     auto continue_data_pos = next_data_pos_;
@@ -525,6 +676,9 @@
         next_op_pos_ = continue_op_pos;
         footer_.op.num_ops = continue_num_ops;
     }
+
+    if (!FlushCluster()) {
+        LOG(ERROR) << "Finalize: failed to flush footer writes";
+        return false;
+    }
+
     return Sync();
 }
 
@@ -556,6 +710,35 @@
     return true;
 }
 
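+// Write out everything staged by WriteOperation() with two pwritev() calls:
+// one for the buffered CowOperation metadata, one for the buffered payloads.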
+bool CowWriter::FlushCluster() {
+    ssize_t ret;
+
+    if (op_vec_index_) {
+        ret = pwritev(fd_.get(), cowop_vec_.get(), op_vec_index_, current_op_pos_);
+        if (ret != (op_vec_index_ * sizeof(CowOperation))) {
+            PLOG(ERROR) << "pwritev failed for CowOperation. Expected: "
+                        << (op_vec_index_ * sizeof(CowOperation));
+            return false;
+        }
+    }
+
+    if (data_vec_index_) {
+        ret = pwritev(fd_.get(), data_vec_.get(), data_vec_index_, current_data_pos_);
+        if (ret != total_data_written_) {
+            PLOG(ERROR) << "pwritev failed for data. Expected: " << total_data_written_;
+            return false;
+        }
+    }
+
+    total_data_written_ = 0;
+    op_vec_index_ = 0;
+    data_vec_index_ = 0;
+    current_op_pos_ = next_op_pos_;
+    current_data_pos_ = next_data_pos_;
+
+    return true;
+}
+
 bool CowWriter::WriteOperation(const CowOperation& op, const void* data, size_t size) {
     if (!EnsureSpaceAvailable(next_op_pos_ + sizeof(op))) {
         return false;
@@ -564,14 +747,43 @@
         return false;
     }
 
-    if (!android::base::WriteFullyAtOffset(fd_, reinterpret_cast<const uint8_t*>(&op), sizeof(op),
-                                           next_op_pos_)) {
-        return false;
+    if (batch_write_) {
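+        // Batch mode: stage the op (and its payload) in the pre-allocated
+        // iovec buffers; FlushCluster() writes them out later.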
+        CowOperation* cow_op = reinterpret_cast<CowOperation*>(cowop_vec_[op_vec_index_].iov_base);
+        std::memcpy(cow_op, &op, sizeof(CowOperation));
+        op_vec_index_ += 1;
+
+        if (data != nullptr && size > 0) {
+            struct iovec* data_ptr = data_vec_.get();
+            std::memcpy(data_ptr[data_vec_index_].iov_base, data, size);
+            data_ptr[data_vec_index_].iov_len = size;
+            data_vec_index_ += 1;
+            total_data_written_ += size;
+        }
+    } else {
+        if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
+            PLOG(ERROR) << "lseek failed for writing operation.";
+            return false;
+        }
+        if (!android::base::WriteFully(fd_, reinterpret_cast<const uint8_t*>(&op), sizeof(op))) {
+            return false;
+        }
+        if (data != nullptr && size > 0) {
+            if (!WriteRawData(data, size)) return false;
+        }
     }
-    if (data != nullptr && size > 0) {
-        if (!WriteRawData(data, size)) return false;
-    }
+
     AddOperation(op);
+
+    if (batch_write_) {
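+        // Flush when either buffer is full, or when the op is a label or
+        // cluster marker that readers rely on finding on disk in order.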
+        if (op_vec_index_ == header_.cluster_ops || data_vec_index_ == header_.cluster_ops ||
+            op.type == kCowLabelOp || op.type == kCowClusterOp) {
+            if (!FlushCluster()) {
+                LOG(ERROR) << "Failed to flush cluster data";
+                return false;
+            }
+        }
+    }
+
     return EmitClusterIfNeeded();
 }