Support batching ops across Add*Blocks() call

Performance of V3 COW writer is now on-par with V2 in both incremental
OTA and full OTA.

Test: th
Bug: 313962438
Change-Id: If56e0fe42367f947c513fc4c93119c3825763cb9
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
index 6b34152..9401c66 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
@@ -107,7 +107,7 @@
 static constexpr uint8_t kNumResumePoints = 4;
 
 struct CowHeaderV3 : public CowHeader {
-    // Number of sequence data stored (each of which is a 32 byte integer)
+    // Number of sequence data stored (each of which is a 32 bit integer)
     uint64_t sequence_data_count;
     // Number of currently written resume points &&
     uint32_t resume_point_count;
@@ -311,6 +311,8 @@
 
 std::ostream& operator<<(std::ostream& os, ResumePoint const& arg);
 
+std::ostream& operator<<(std::ostream& os, CowOperationType cow_type);
+
 int64_t GetNextOpOffset(const CowOperationV2& op, uint32_t cluster_size);
 int64_t GetNextDataOffset(const CowOperationV2& op, uint32_t cluster_size);
 
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
index b0eb723..8d1786c 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
@@ -52,6 +52,10 @@
     }
 }
 
+std::ostream& operator<<(std::ostream& os, CowOperationType cow_type) {
+    return EmitCowTypeString(os, cow_type);
+}
+
 std::ostream& operator<<(std::ostream& os, CowOperationV2 const& op) {
     os << "CowOperationV2(";
     EmitCowTypeString(os, op.type) << ", ";
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v3.cpp
index ce68b39..036d335 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/parser_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/parser_v3.cpp
@@ -114,6 +114,12 @@
     for (auto op : *ops_) {
         if (op.type() == kCowXorOp) {
             xor_data_loc_->insert({op.new_block, data_pos});
+        } else if (op.type() == kCowReplaceOp) {
+            if (data_pos != op.source()) {
+                LOG(ERROR) << "Invalid data location for operation " << op
+                           << ", expected: " << data_pos;
+                return false;
+            }
         }
         data_pos += op.data_length;
     }
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
index 500f8d0..3383a58 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
@@ -688,5 +688,15 @@
     }
 }
 
+TEST_F(CowTestV3, CheckOpCount) {
+    CowOptions options;
+    options.op_count_max = 20;
+    options.batch_write = true;
+    options.cluster_ops = 200;
+    auto writer = CreateCowWriter(3, options, GetCowFd());
+    ASSERT_TRUE(writer->AddZeroBlocks(0, 19));
+    ASSERT_FALSE(writer->AddZeroBlocks(0, 19));
+}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
index c56e22a..d99e6e6 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
@@ -71,9 +71,6 @@
     header_.block_size = options_.block_size;
     header_.num_merge_ops = options_.num_merge_ops;
     header_.cluster_ops = 0;
-    if (batch_size_ > 1) {
-        LOG(INFO) << "Batch writes enabled with batch size of " << batch_size_;
-    }
     if (options_.scratch_space) {
         header_.buffer_size = BUFFER_REGION_DEFAULT_SIZE;
     }
@@ -82,6 +79,7 @@
     // WIP: not quite sure how some of these are calculated yet, assuming buffer_size is determined
     // during COW size estimation
     header_.sequence_data_count = 0;
+
     header_.resume_point_count = 0;
     header_.resume_point_max = kNumResumePoints;
     header_.op_count = 0;
@@ -123,6 +121,19 @@
             LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm;
             return false;
         }
+        if (options_.cluster_ops &&
+            (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
+             options_.batch_write)) {
+            batch_size_ = std::max<size_t>(options_.cluster_ops, 1);
+            data_vec_.reserve(batch_size_);
+            cached_data_.reserve(batch_size_);
+            cached_ops_.reserve(batch_size_);
+        }
+    }
+    if (batch_size_ > 1) {
+        LOG(INFO) << "Batch writes: enabled with batch size " << batch_size_;
+    } else {
+        LOG(INFO) << "Batch writes: disabled";
     }
     return true;
 }
@@ -220,8 +231,9 @@
     if (IsEstimating()) {
         return true;
     }
-    if (header_.op_count + op_count > header_.op_count_max) {
+    if (header_.op_count + cached_ops_.size() + op_count > header_.op_count_max) {
         LOG(ERROR) << "Current number of ops on disk: " << header_.op_count
+                   << ", number of ops cached in memory: " << cached_ops_.size()
                    << ", number of ops attempting to write: " << op_count
                    << ", this will exceed max op count " << header_.op_count_max;
         return false;
@@ -233,17 +245,18 @@
     if (!CheckOpCount(num_blocks)) {
         return false;
     }
-    std::vector<CowOperationV3> ops(num_blocks);
     for (size_t i = 0; i < num_blocks; i++) {
-        CowOperationV3& op = ops[i];
+        CowOperationV3& op = cached_ops_.emplace_back();
         op.set_type(kCowCopyOp);
         op.new_block = new_block + i;
         op.set_source(old_block + i);
     }
-    if (!WriteOperation({ops.data(), ops.size()}, {})) {
-        return false;
-    }
 
+    if (NeedsFlush()) {
+        if (!FlushCacheOps()) {
+            return false;
+        }
+    }
     return true;
 }
 
@@ -262,6 +275,13 @@
     return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp);
 }
 
+bool CowWriterV3::NeedsFlush() const {
+    // Allow bigger batch sizes for ops without data. A single CowOperationV3
+    // struct uses 14 bytes of memory, even if we cache 200 * 16 ops in memory,
+    // it's only ~44K.
+    return cached_data_.size() >= batch_size_ || cached_ops_.size() >= batch_size_ * 16;
+}
+
 bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
                              uint64_t old_block, uint16_t offset, CowOperationType type) {
     if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) {
@@ -270,34 +290,18 @@
         return false;
     }
     const size_t num_blocks = (size / header_.block_size);
-    if (compression_.algorithm == kCowCompressNone) {
-        std::vector<CowOperationV3> ops(num_blocks);
-        for (size_t i = 0; i < num_blocks; i++) {
-            CowOperation& op = ops[i];
-            op.new_block = new_block_start + i;
 
-            op.set_type(type);
-            if (type == kCowXorOp) {
-                op.set_source((old_block + i) * header_.block_size + offset);
-            } else {
-                op.set_source(next_data_pos_ + header_.block_size * i);
-            }
-            op.data_length = header_.block_size;
-        }
-        return WriteOperation({ops.data(), ops.size()}, data, size);
-    }
-
-    for (size_t i = 0; i < num_blocks; i += batch_size_) {
-        const auto blocks_to_write = std::min<size_t>(batch_size_, num_blocks - i);
-        std::vector<std::basic_string<uint8_t>> compressed_blocks(blocks_to_write);
-        std::vector<CowOperationV3> ops(blocks_to_write);
-        std::vector<struct iovec> vec(blocks_to_write);
+    for (size_t i = 0; i < num_blocks;) {
+        const auto blocks_to_write =
+                std::min<size_t>(batch_size_ - cached_data_.size(), num_blocks - i);
         size_t compressed_bytes = 0;
         for (size_t j = 0; j < blocks_to_write; j++) {
             const uint8_t* const iter =
                     reinterpret_cast<const uint8_t*>(data) + (header_.block_size * (i + j));
 
-            CowOperation& op = ops[j];
+            CowOperation& op = cached_ops_.emplace_back();
+            auto& vec = data_vec_.emplace_back();
+            auto& compressed_data = cached_data_.emplace_back();
             op.new_block = new_block_start + i + j;
 
             op.set_type(type);
@@ -306,29 +310,31 @@
             } else {
                 op.set_source(next_data_pos_ + compressed_bytes);
             }
-
-            std::basic_string<uint8_t> compressed_data =
-                    compressor_->Compress(iter, header_.block_size);
-            if (compressed_data.empty()) {
-                LOG(ERROR) << "Compression failed during EmitBlocks(" << new_block_start << ", "
-                           << num_blocks << ");";
-                return false;
+            if (compression_.algorithm == kCowCompressNone) {
+                compressed_data.resize(header_.block_size);
+            } else {
+                compressed_data = compressor_->Compress(iter, header_.block_size);
+                if (compressed_data.empty()) {
+                    LOG(ERROR) << "Compression failed during EmitBlocks(" << new_block_start << ", "
+                               << num_blocks << ");";
+                    return false;
+                }
             }
             if (compressed_data.size() >= header_.block_size) {
                 compressed_data.resize(header_.block_size);
                 std::memcpy(compressed_data.data(), iter, header_.block_size);
             }
-            compressed_blocks[j] = std::move(compressed_data);
-            vec[j] = {.iov_base = compressed_blocks[j].data(),
-                      .iov_len = compressed_blocks[j].size()};
-            op.data_length = vec[j].iov_len;
+            vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
+            op.data_length = vec.iov_len;
             compressed_bytes += op.data_length;
         }
-        if (!WriteOperation({ops.data(), ops.size()}, {vec.data(), vec.size()})) {
-            PLOG(ERROR) << "AddRawBlocks with compression: write failed. new block: "
-                        << new_block_start << " compression: " << compression_.algorithm;
+        if (NeedsFlush() && !FlushCacheOps()) {
+            LOG(ERROR) << "EmitBlocks with compression: write failed. new block: "
+                       << new_block_start << " compression: " << compression_.algorithm
+                       << ", op type: " << type;
             return false;
         }
+        i += blocks_to_write;
     }
 
     return true;
@@ -338,14 +344,15 @@
     if (!CheckOpCount(num_blocks)) {
         return false;
     }
-    std::vector<CowOperationV3> ops(num_blocks);
     for (uint64_t i = 0; i < num_blocks; i++) {
-        auto& op = ops[i];
+        auto& op = cached_ops_.emplace_back();
         op.set_type(kCowZeroOp);
         op.new_block = new_block_start + i;
     }
-    if (!WriteOperation({ops.data(), ops.size()})) {
-        return false;
+    if (NeedsFlush()) {
+        if (!FlushCacheOps()) {
+            return false;
+        }
     }
     return true;
 }
@@ -354,6 +361,10 @@
     // remove all labels greater than this current one. we want to avoid the situation of adding
     // in
     // duplicate labels with differing op values
+    if (!FlushCacheOps()) {
+        LOG(ERROR) << "Failed to flush cached ops before emitting label " << label;
+        return false;
+    }
     auto remove_if_callback = [&](const auto& resume_point) -> bool {
         if (resume_point.label >= label) return true;
         return false;
@@ -382,19 +393,14 @@
 
 bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) {
     // TODO: size sequence buffer based on options
-    if (header_.op_count > 0) {
-        LOG(ERROR)
-                << "There's " << header_.op_count
-                << " operations written to disk. Writing sequence data is only allowed before all "
-                   "operation writes.";
+    if (header_.op_count > 0 || !cached_ops_.empty()) {
+        LOG(ERROR) << "There's " << header_.op_count << " operations written to disk and "
+                   << cached_ops_.size()
+                   << " ops cached in memory. Writing sequence data is only allowed before all "
+                      "operation writes.";
         return false;
     }
     header_.sequence_data_count = num_ops;
-    // In COW format v3, data section is placed after op section and sequence
-    // data section. Therefore, changing the sequence data count has the effect
-    // of moving op section and data section. Therefore we need to reset the
-    // value of |next_data_pos|. This is also the reason why writing sequence
-    // data is only allowed if there's no operation written.
     next_data_pos_ = GetDataOffset(header_);
     if (!android::base::WriteFullyAtOffset(fd_, data, sizeof(data[0]) * num_ops,
                                            GetSequenceOffset(header_))) {
@@ -404,16 +410,32 @@
     return true;
 }
 
-bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> op, const void* data,
-                                 size_t size) {
-    struct iovec vec {
-        .iov_len = size
-    };
-    // Dear C++ god, this is effectively a const_cast. I had to do this because
-    // pwritev()'s struct iovec requires a non-const pointer. The input data
-    // will not be modified, as the iovec is only used for a write operation.
-    std::memcpy(&vec.iov_base, &data, sizeof(data));
-    return WriteOperation(op, {&vec, 1});
+bool CowWriterV3::FlushCacheOps() {
+    if (cached_ops_.empty()) {
+        if (!data_vec_.empty()) {
+            LOG(ERROR) << "Cached ops is empty, but data iovec has size: " << data_vec_.size()
+                       << " this is definitely a bug.";
+            return false;
+        }
+        return true;
+    }
+    size_t bytes_written = 0;
+
+    for (auto& op : cached_ops_) {
+        if (op.type() == kCowReplaceOp) {
+            op.set_source(next_data_pos_ + bytes_written);
+        }
+        bytes_written += op.data_length;
+    }
+    if (!WriteOperation({cached_ops_.data(), cached_ops_.size()},
+                        {data_vec_.data(), data_vec_.size()})) {
+        LOG(ERROR) << "Failed to flush " << cached_ops_.size() << " ops to disk";
+        return false;
+    }
+    cached_ops_.clear();
+    cached_data_.clear();
+    data_vec_.clear();
+    return true;
 }
 
 bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
@@ -438,7 +460,6 @@
                    << ops.size() << " ops will exceed the max of " << header_.op_count_max;
         return false;
     }
-
     const off_t offset = GetOpOffset(header_.op_count, header_);
     if (!android::base::WriteFullyAtOffset(fd_, ops.data(), ops.size() * sizeof(ops[0]), offset)) {
         PLOG(ERROR) << "Write failed for " << ops.size() << " ops at " << offset;
@@ -458,13 +479,12 @@
     return true;
 }
 
-bool CowWriterV3::WriteOperation(const CowOperationV3& op, const void* data, size_t size) {
-    return WriteOperation({&op, 1}, data, size);
-}
-
 bool CowWriterV3::Finalize() {
     CHECK_GE(header_.prefix.header_size, sizeof(CowHeaderV3));
     CHECK_LE(header_.prefix.header_size, sizeof(header_));
+    if (!FlushCacheOps()) {
+        return false;
+    }
     if (!android::base::WriteFullyAtOffset(fd_, &header_, header_.prefix.header_size, 0)) {
         return false;
     }
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
index 613bbe5..3a7b877 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
@@ -16,6 +16,7 @@
 
 #include <android-base/logging.h>
 #include <string_view>
+#include <vector>
 
 #include "writer_base.h"
 
@@ -42,20 +43,20 @@
 
   private:
     void SetupHeaders();
+    bool NeedsFlush() const;
     bool ParseOptions();
     bool OpenForWrite();
     bool OpenForAppend(uint64_t label);
     bool WriteOperation(std::basic_string_view<CowOperationV3> op,
                         std::basic_string_view<struct iovec> data);
-    bool WriteOperation(std::basic_string_view<CowOperationV3> op, const void* data = nullptr,
-                        size_t size = 0);
-    bool WriteOperation(const CowOperationV3& op, const void* data = nullptr, size_t size = 0);
     bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                     uint16_t offset, CowOperationType type);
     bool CompressBlocks(size_t num_blocks, const void* data);
     bool CheckOpCount(size_t op_count);
 
   private:
+    bool ReadBackVerification();
+    bool FlushCacheOps();
     CowHeaderV3 header_{};
     CowCompression compression_;
     // in the case that we are using one thread for compression, we can store and re-use the same
@@ -66,12 +67,14 @@
     std::shared_ptr<std::vector<ResumePoint>> resume_points_;
 
     uint64_t next_data_pos_ = 0;
-    std::vector<std::basic_string<uint8_t>> compressed_buf_;
 
     // in the case that we are using one thread for compression, we can store and re-use the same
     // compressor
     int num_compress_threads_ = 1;
-    size_t batch_size_ = 0;
+    size_t batch_size_ = 1;
+    std::vector<CowOperationV3> cached_ops_;
+    std::vector<std::basic_string<uint8_t>> cached_data_;
+    std::vector<struct iovec> data_vec_;
 };
 
 }  // namespace snapshot