Support batch writes for non-compressed ops

This also improves atomicity of ops. If a single Add*Blocks() call
with 100 blocks failed in the middle, partially written blocks would be
discarded, and op count on disk stays unchanged. Previously wew ould
update the op count on disk with partially written blocks, causing
labels to be inaccurate.

Test: th
Bug: 313962438
Change-Id: If175a705f6ec46c1b25c52d0d9f02f01a540ce55
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
index 2c4d40f..b0eb723 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/cow_format.cpp
@@ -14,7 +14,6 @@
 // limitations under the License.
 //
 
-#include <inttypes.h>
 #include <libsnapshot/cow_format.h>
 #include <sys/types.h>
 #include <unistd.h>
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
index 27accdc..44b7344 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/test_v3.cpp
@@ -97,7 +97,7 @@
     options.op_count_max = 20;
     auto writer = CreateCowWriter(3, options, GetCowFd());
     ASSERT_FALSE(writer->AddZeroBlocks(1, 21));
-    ASSERT_FALSE(writer->AddZeroBlocks(1, 1));
+    ASSERT_TRUE(writer->AddZeroBlocks(1, 20));
     std::string data = "This is some data, believe it";
     data.resize(options.block_size, '\0');
 
@@ -184,7 +184,7 @@
     std::string data;
     data.resize(options.block_size * 5);
     for (int i = 0; i < data.size(); i++) {
-        data[i] = char(rand() % 256);
+        data[i] = static_cast<char>('A' + i / options.block_size);
     }
 
     ASSERT_TRUE(writer->AddRawBlocks(5, data.data(), data.size()));
@@ -205,19 +205,20 @@
     ASSERT_FALSE(iter->AtEnd());
 
     size_t i = 0;
-    std::string sink(data.size(), '\0');
 
     while (!iter->AtEnd()) {
         auto op = iter->Get();
+        std::string sink(options.block_size, '\0');
         ASSERT_EQ(op->type(), kCowReplaceOp);
         ASSERT_EQ(op->data_length, options.block_size);
         ASSERT_EQ(op->new_block, 5 + i);
-        ASSERT_TRUE(
-                ReadData(reader, op, sink.data() + (i * options.block_size), options.block_size));
+        ASSERT_TRUE(ReadData(reader, op, sink.data(), options.block_size));
+        ASSERT_EQ(std::string_view(sink),
+                  std::string_view(data).substr(i * options.block_size, options.block_size))
+                << " readback data for " << i << "th block does not match";
         iter->Next();
         i++;
     }
-    ASSERT_EQ(sink, data);
 
     ASSERT_EQ(i, 5);
 }
@@ -372,41 +373,33 @@
     ASSERT_NE(iter, nullptr);
     ASSERT_FALSE(iter->AtEnd());
 
-    size_t i = 0;
-
-    while (i < 5) {
+    for (size_t i = 0; i < 5; i++) {
         auto op = iter->Get();
         ASSERT_EQ(op->type(), kCowZeroOp);
         ASSERT_EQ(op->new_block, 10 + i);
         iter->Next();
-        i++;
     }
-    i = 0;
-    while (i < 5) {
+    for (size_t i = 0; i < 5; i++) {
         auto op = iter->Get();
         ASSERT_EQ(op->type(), kCowCopyOp);
         ASSERT_EQ(op->new_block, 15 + i);
         ASSERT_EQ(op->source(), 3 + i);
         iter->Next();
-        i++;
     }
-    i = 0;
     std::string sink(data.size(), '\0');
 
-    while (i < 5) {
+    for (size_t i = 0; i < 5; i++) {
         auto op = iter->Get();
         ASSERT_EQ(op->type(), kCowReplaceOp);
         ASSERT_EQ(op->new_block, 18 + i);
-        ASSERT_TRUE(
-                ReadData(reader, op, sink.data() + (i * options.block_size), options.block_size));
+        ASSERT_EQ(reader.ReadData(op, sink.data() + (i * options.block_size), options.block_size),
+                  options.block_size);
         iter->Next();
-        i++;
     }
     ASSERT_EQ(sink, data);
 
-    i = 0;
     std::fill(sink.begin(), sink.end(), '\0');
-    while (i < 5) {
+    for (size_t i = 0; i < 5; i++) {
         auto op = iter->Get();
         ASSERT_EQ(op->type(), kCowXorOp);
         ASSERT_EQ(op->new_block, 50 + i);
@@ -414,7 +407,6 @@
         ASSERT_TRUE(
                 ReadData(reader, op, sink.data() + (i * options.block_size), options.block_size));
         iter->Next();
-        i++;
     }
     ASSERT_EQ(sink, data);
 }
@@ -671,5 +663,25 @@
     ASSERT_LE(writer.GetCowSize(), cow_size);
 }
 
+TEST_F(CowTestV3, CopyOpMany) {
+    CowOptions options;
+    options.op_count_max = 100;
+    CowWriterV3 writer(options, GetCowFd());
+    writer.Initialize();
+    ASSERT_TRUE(writer.AddCopy(100, 50, 50));
+    ASSERT_TRUE(writer.AddCopy(150, 100, 50));
+    ASSERT_TRUE(writer.Finalize());
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(GetCowFd()));
+    auto it = reader.GetOpIter();
+    for (size_t i = 0; i < 100; i++) {
+        ASSERT_FALSE(it->AtEnd()) << " op iterator ended at " << i;
+        const auto op = *it->Get();
+        ASSERT_EQ(op.type(), kCowCopyOp);
+        ASSERT_EQ(op.new_block, 100 + i);
+        it->Next();
+    }
+}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
index 07f6f00..4df0e76 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.cpp
@@ -113,7 +113,13 @@
     }
 
     compression_.algorithm = *algorithm;
-    compressor_ = ICompressor::Create(compression_, header_.block_size);
+    if (compression_.algorithm != kCowCompressNone) {
+        compressor_ = ICompressor::Create(compression_, header_.block_size);
+        if (compressor_ == nullptr) {
+            LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm;
+            return false;
+        }
+    }
     return true;
 }
 
@@ -207,14 +213,15 @@
 }
 
 bool CowWriterV3::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_blocks) {
+    std::vector<CowOperationV3> ops(num_blocks);
     for (size_t i = 0; i < num_blocks; i++) {
-        CowOperationV3 op{};
+        CowOperationV3& op = ops[i];
         op.set_type(kCowCopyOp);
         op.new_block = new_block + i;
         op.set_source(old_block + i);
-        if (!WriteOperation(op)) {
-            return false;
-        }
+    }
+    if (!WriteOperation({ops.data(), ops.size()}, {})) {
+        return false;
     }
 
     return true;
@@ -231,12 +238,37 @@
 
 bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
                              uint64_t old_block, uint16_t offset, CowOperationType type) {
+    if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) {
+        LOG(ERROR) << "Compression algorithm is " << compression_.algorithm
+                   << " but compressor is uninitialized.";
+        return false;
+    }
     const size_t num_blocks = (size / header_.block_size);
+    if (compression_.algorithm == kCowCompressNone) {
+        std::vector<CowOperationV3> ops(num_blocks);
+        for (size_t i = 0; i < num_blocks; i++) {
+            CowOperation& op = ops[i];
+            op.new_block = new_block_start + i;
+
+            op.set_type(type);
+            if (type == kCowXorOp) {
+                op.set_source((old_block + i) * header_.block_size + offset);
+            } else {
+                op.set_source(next_data_pos_ + header_.block_size * i);
+            }
+            op.data_length = header_.block_size;
+        }
+        return WriteOperation({ops.data(), ops.size()},
+                              {reinterpret_cast<const uint8_t*>(data), size});
+    }
+
+    const auto saved_op_count = header_.op_count;
+    const auto saved_data_pos = next_data_pos_;
     for (size_t i = 0; i < num_blocks; i++) {
         const uint8_t* const iter =
                 reinterpret_cast<const uint8_t*>(data) + (header_.block_size * i);
 
-        CowOperation op = {};
+        CowOperation op{};
         op.new_block = new_block_start + i;
 
         op.set_type(type);
@@ -245,25 +277,21 @@
         } else {
             op.set_source(next_data_pos_);
         }
-        std::basic_string<uint8_t> compressed_data;
         const void* out_data = iter;
 
         op.data_length = header_.block_size;
 
-        if (compression_.algorithm) {
-            if (!compressor_) {
-                PLOG(ERROR) << "Compressor not initialized";
-                return false;
-            }
-            compressed_data = compressor_->Compress(out_data, header_.block_size);
-            if (compressed_data.size() < op.data_length) {
-                out_data = compressed_data.data();
-                op.data_length = compressed_data.size();
-            }
+        const std::basic_string<uint8_t> compressed_data =
+                compressor_->Compress(out_data, header_.block_size);
+        if (compressed_data.size() < op.data_length) {
+            out_data = compressed_data.data();
+            op.data_length = compressed_data.size();
         }
         if (!WriteOperation(op, out_data, op.data_length)) {
             PLOG(ERROR) << "AddRawBlocks with compression: write failed. new block: "
                         << new_block_start << " compression: " << compression_.algorithm;
+            header_.op_count = saved_op_count;
+            next_data_pos_ = saved_data_pos;
             return false;
         }
     }
@@ -272,13 +300,14 @@
 }
 
 bool CowWriterV3::EmitZeroBlocks(uint64_t new_block_start, uint64_t num_blocks) {
+    std::vector<CowOperationV3> ops(num_blocks);
     for (uint64_t i = 0; i < num_blocks; i++) {
-        CowOperationV3 op{};
+        CowOperationV3& op = ops[i];
         op.set_type(kCowZeroOp);
         op.new_block = new_block_start + i;
-        if (!WriteOperation(op)) {
-            return false;
-        }
+    }
+    if (!WriteOperation({ops.data(), ops.size()}, {})) {
+        return false;
     }
     return true;
 }
@@ -324,43 +353,48 @@
     return true;
 }
 
-bool CowWriterV3::WriteOperation(const CowOperationV3& op, const void* data, size_t size) {
+bool CowWriterV3::WriteOperation(std::basic_string_view<CowOperationV3> ops,
+                                 std::basic_string_view<uint8_t> data) {
     if (IsEstimating()) {
-        header_.op_count++;
+        header_.op_count += ops.size();
         if (header_.op_count > header_.op_count_max) {
             // If we increment op_count_max, the offset of data section would
             // change. So need to update |next_data_pos_|
             next_data_pos_ += (header_.op_count - header_.op_count_max) * sizeof(CowOperationV3);
             header_.op_count_max = header_.op_count;
         }
-        next_data_pos_ += op.data_length;
+        next_data_pos_ += data.size();
         return true;
     }
 
-    if (header_.op_count + 1 > header_.op_count_max) {
-        LOG(ERROR) << "Maximum number of ops reached: " << header_.op_count_max;
+    if (header_.op_count + ops.size() > header_.op_count_max) {
+        LOG(ERROR) << "Current op count " << header_.op_count << ", attempting to write "
+                   << ops.size() << " ops will exceed the max of " << header_.op_count_max;
         return false;
     }
 
     const off_t offset = GetOpOffset(header_.op_count, header_);
-    if (!android::base::WriteFullyAtOffset(fd_, &op, sizeof(op), offset)) {
-        PLOG(ERROR) << "write failed for " << op << " at " << offset;
+    if (!android::base::WriteFullyAtOffset(fd_, ops.data(), ops.size() * sizeof(ops[0]), offset)) {
+        PLOG(ERROR) << "Write failed for " << ops.size() << " ops at " << offset;
         return false;
     }
-    if (data && size > 0) {
-        if (!android::base::WriteFullyAtOffset(fd_, data, size, next_data_pos_)) {
-            PLOG(ERROR) << "write failed for data of size: " << size
+    if (!data.empty()) {
+        if (!android::base::WriteFullyAtOffset(fd_, data.data(), data.size(), next_data_pos_)) {
+            PLOG(ERROR) << "write failed for data of size: " << data.size()
                         << " at offset: " << next_data_pos_;
             return false;
         }
     }
-    header_.op_count++;
-    next_data_pos_ += op.data_length;
-    next_op_pos_ += sizeof(CowOperationV3);
+    header_.op_count += ops.size();
+    next_data_pos_ += data.size();
 
     return true;
 }
 
+bool CowWriterV3::WriteOperation(const CowOperationV3& op, const void* data, size_t size) {
+    return WriteOperation({&op, 1}, {reinterpret_cast<const uint8_t*>(data), size});
+}
+
 bool CowWriterV3::Finalize() {
     CHECK_GE(header_.prefix.header_size, sizeof(CowHeaderV3));
     CHECK_LE(header_.prefix.header_size, sizeof(header_));
diff --git a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
index 340218f..02b4e61 100644
--- a/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
+++ b/fs_mgr/libsnapshot/libsnapshot_cow/writer_v3.h
@@ -15,6 +15,7 @@
 #pragma once
 
 #include <android-base/logging.h>
+#include <string_view>
 
 #include "writer_base.h"
 
@@ -44,6 +45,8 @@
     bool ParseOptions();
     bool OpenForWrite();
     bool OpenForAppend(uint64_t label);
+    bool WriteOperation(std::basic_string_view<CowOperationV3> op,
+                        std::basic_string_view<uint8_t> data);
     bool WriteOperation(const CowOperationV3& op, const void* data = nullptr, size_t size = 0);
     bool EmitBlocks(uint64_t new_block_start, const void* data, size_t size, uint64_t old_block,
                     uint16_t offset, CowOperationType type);
@@ -59,7 +62,6 @@
     // Resume points contain a laebl + cow_op_index.
     std::shared_ptr<std::vector<ResumePoint>> resume_points_;
 
-    uint64_t next_op_pos_ = 0;
     uint64_t next_data_pos_ = 0;
     std::vector<std::basic_string<uint8_t>> compressed_buf_;