Move some SOURCE_COPY op processing out of Init() function

Currently, all COPY ops are processed upfront in
VABCPartitionWriter::Init() call. While update_engine is processing copy
ops, it can't respond to user actions such as pausing the OTA. To
improve UX, move some of the processing logic out of Init() function,
and spread the work through out the OTA install ops.

Before:
    1. Write all COPY blocks through cow_writer
    2. For blocks that gets converted to COW_REPLACE, compress them and
       write to userspace snapshots using cow_writer

Item #2 take the most time, as compression + writing large blocks of
data to disk take a lot of time. This CL moves #2 to
VABCPartitionWriter::PerformSourceCopyOperation, so that the work is
done in chunks as we install the OTA.

Result:
    Tested on O6, time spent on VABCPartitionWriter::Init() reduced from
    177.6s to 2.3s

Test: Install oriole-UP1A.220930.002-to-UP1A.221007.001.zip
Bug: 248404111
Change-Id: I44737b7115e0ad7616ec49b8934fd6d70dc07447
diff --git a/common/cow_operation_convert.cc b/common/cow_operation_convert.cc
index a8f7541..adbfb7d 100644
--- a/common/cow_operation_convert.cc
+++ b/common/cow_operation_convert.cc
@@ -24,23 +24,14 @@
 
 namespace chromeos_update_engine {
 
-namespace {
-
-bool IsConsecutive(const CowOperation& op1, const CowOperation& op2) {
-  return op1.op == op2.op && op1.dst_block + op1.block_count == op2.dst_block &&
-         op1.src_block + op1.block_count == op2.src_block;
-}
-
 void push_back(std::vector<CowOperation>* converted, const CowOperation& op) {
   if (!converted->empty() && IsConsecutive(converted->back(), op)) {
-    converted->back().block_count++;
+    converted->back().block_count += op.block_count;
   } else {
     converted->push_back(op);
   }
 }
 
-}  // namespace
-
 std::vector<CowOperation> ConvertToCowOperations(
     const ::google::protobuf::RepeatedPtrField<
         ::chromeos_update_engine::InstallOperation>& operations,
diff --git a/common/cow_operation_convert.h b/common/cow_operation_convert.h
index 60c820f..a260a4a 100644
--- a/common/cow_operation_convert.h
+++ b/common/cow_operation_convert.h
@@ -29,7 +29,7 @@
     CowCopy = android::snapshot::kCowCopyOp,
     CowReplace = android::snapshot::kCowReplaceOp,
   };
-  Type op;
+  Type op{};
   uint64_t src_block{};
   uint64_t dst_block{};
   uint64_t block_count{1};
@@ -53,5 +53,13 @@
         ::chromeos_update_engine::InstallOperation>& operations,
     const ::google::protobuf::RepeatedPtrField<CowMergeOperation>&
         merge_operations);
+
+constexpr bool IsConsecutive(const CowOperation& op1, const CowOperation& op2) {
+  return op1.op == op2.op && op1.dst_block + op1.block_count == op2.dst_block &&
+         op1.src_block + op1.block_count == op2.src_block;
+}
+
+void push_back(std::vector<CowOperation>* converted, const CowOperation& op);
+
 }  // namespace chromeos_update_engine
 #endif
diff --git a/payload_consumer/partition_writer_factory_android.cc b/payload_consumer/partition_writer_factory_android.cc
index 6736620..a04d726 100644
--- a/payload_consumer/partition_writer_factory_android.cc
+++ b/payload_consumer/partition_writer_factory_android.cc
@@ -19,6 +19,7 @@
 
 #include <base/logging.h>
 
+#include "update_engine/payload_consumer/partition_writer.h"
 #include "update_engine/payload_consumer/vabc_partition_writer.h"
 
 namespace chromeos_update_engine::partition_writer {
diff --git a/payload_consumer/partition_writer_interface.h b/payload_consumer/partition_writer_interface.h
index e346292..8080795 100644
--- a/payload_consumer/partition_writer_interface.h
+++ b/payload_consumer/partition_writer_interface.h
@@ -23,9 +23,6 @@
 #include <brillo/secure_blob.h>
 #include <gtest/gtest_prod.h>
 
-#include "update_engine/common/dynamic_partition_control_interface.h"
-#include "update_engine/payload_consumer/extent_writer.h"
-#include "update_engine/payload_consumer/file_descriptor.h"
 #include "update_engine/payload_consumer/install_plan.h"
 #include "update_engine/update_metadata.pb.h"
 
diff --git a/payload_consumer/vabc_partition_writer.cc b/payload_consumer/vabc_partition_writer.cc
index c3b2e41..df9c0a6 100644
--- a/payload_consumer/vabc_partition_writer.cc
+++ b/payload_consumer/vabc_partition_writer.cc
@@ -29,13 +29,9 @@
 
 #include "update_engine/common/cow_operation_convert.h"
 #include "update_engine/common/utils.h"
-#include "update_engine/payload_consumer/block_extent_writer.h"
 #include "update_engine/payload_consumer/extent_map.h"
-#include "update_engine/payload_consumer/extent_reader.h"
 #include "update_engine/payload_consumer/file_descriptor.h"
-#include "update_engine/payload_consumer/file_descriptor_utils.h"
 #include "update_engine/payload_consumer/install_plan.h"
-#include "update_engine/payload_consumer/partition_writer.h"
 #include "update_engine/payload_consumer/snapshot_extent_writer.h"
 #include "update_engine/payload_consumer/xor_extent_writer.h"
 #include "update_engine/payload_generator/extent_ranges.h"
@@ -92,7 +88,14 @@
       dynamic_control_(dynamic_control),
       block_size_(block_size),
       executor_(block_size),
-      verified_source_fd_(block_size, install_part.source_path) {}
+      verified_source_fd_(block_size, install_part.source_path) {
+  for (const auto& cow_op : partition_update_.merge_operations()) {
+    if (cow_op.type() != CowMergeOperation::COW_COPY) {
+      continue;
+    }
+    copy_blocks_.AddExtent(cow_op.dst_extent());
+  }
+}
 
 bool VABCPartitionWriter::Init(const InstallPlan* install_plan,
                                bool source_may_exist,
@@ -144,23 +147,33 @@
       TEST_AND_RETURN_FALSE(WriteMergeSequence(
           partition_update_.merge_operations(), cow_writer_.get()));
     }
-  }
+    const bool userSnapshots = android::base::GetBoolProperty(
+        "ro.virtual_ab.userspace.snapshots.enabled", false);
 
-  // TODO(zhangkelvin) Rewrite this in C++20 coroutine once that's available.
-  // TODO(177104308) Don't write all COPY ops up-front if merge sequence is
-  // written
-  const auto converted = ConvertToCowOperations(
-      partition_update_.operations(), partition_update_.merge_operations());
-
-  if (!converted.empty()) {
-    // Use source fd directly. Ideally we want to verify all extents used in
-    // source copy, but then what do we do if some extents contain correct
-    // hashes and some don't?
-    auto source_fd = std::make_shared<EintrSafeFileDescriptor>();
-    TEST_AND_RETURN_FALSE_ERRNO(
-        source_fd->Open(install_part_.source_path.c_str(), O_RDONLY));
-    TEST_AND_RETURN_FALSE(WriteSourceCopyCowOps(
-        block_size_, converted, cow_writer_.get(), source_fd));
+    for (const auto& cow_op : partition_update_.merge_operations()) {
+      if (cow_op.type() != CowMergeOperation::COW_COPY) {
+        continue;
+      }
+      if (cow_op.dst_extent() == cow_op.src_extent()) {
+        continue;
+      }
+      if (userSnapshots) {
+        TEST_AND_RETURN_FALSE(cow_op.src_extent().num_blocks() != 0);
+        TEST_AND_RETURN_FALSE(
+            cow_writer_->AddCopy(cow_op.dst_extent().start_block(),
+                                 cow_op.src_extent().start_block(),
+                                 cow_op.src_extent().num_blocks()));
+      } else {
+        // Add blocks in reverse order, because snapused specifically prefers
+        // this ordering. Since we already eliminated all self-overlapping
+        // SOURCE_COPY during delta generation, this should be safe to do.
+        for (size_t i = cow_op.src_extent().num_blocks(); i > 0; i--) {
+          TEST_AND_RETURN_FALSE(
+              cow_writer_->AddCopy(cow_op.dst_extent().start_block() + i - 1,
+                                   cow_op.src_extent().start_block() + i - 1));
+        }
+      }
+    }
     cow_writer_->AddLabel(0);
   }
   return true;
@@ -221,60 +234,6 @@
                                      blocks_merge_order.data());
 }
 
-bool VABCPartitionWriter::WriteSourceCopyCowOps(
-    size_t block_size,
-    const std::vector<CowOperation>& converted,
-    ICowWriter* cow_writer,
-    FileDescriptorPtr source_fd) {
-  for (const auto& cow_op : converted) {
-    std::vector<uint8_t> buffer;
-    switch (cow_op.op) {
-      case CowOperation::CowCopy: {
-        if (cow_op.src_block == cow_op.dst_block) {
-          continue;
-        }
-
-        const bool userSnapshots = android::base::GetBoolProperty(
-            "ro.virtual_ab.userspace.snapshots.enabled", false);
-
-        if (userSnapshots) {
-          TEST_AND_RETURN_FALSE(cow_op.block_count != 0);
-          TEST_AND_RETURN_FALSE(cow_writer->AddCopy(
-              cow_op.dst_block, cow_op.src_block, cow_op.block_count));
-        } else {
-          // Add blocks in reverse order, because snapused specifically prefers
-          // this ordering. Since we already eliminated all self-overlapping
-          // SOURCE_COPY during delta generation, this should be safe to do.
-          for (size_t i = cow_op.block_count; i > 0; i--) {
-            TEST_AND_RETURN_FALSE(cow_writer->AddCopy(
-                cow_op.dst_block + i - 1, cow_op.src_block + i - 1));
-          }
-        }
-        break;
-      }
-      case CowOperation::CowReplace: {
-        buffer.resize(block_size * cow_op.block_count);
-        ssize_t bytes_read = 0;
-        TEST_AND_RETURN_FALSE(utils::ReadAll(source_fd,
-                                             buffer.data(),
-                                             block_size * cow_op.block_count,
-                                             cow_op.src_block * block_size,
-                                             &bytes_read));
-        if (bytes_read <= 0 ||
-            static_cast<size_t>(bytes_read) != buffer.size()) {
-          LOG(ERROR) << "source_fd->Read failed: " << bytes_read;
-          return false;
-        }
-        TEST_AND_RETURN_FALSE(cow_writer->AddRawBlocks(
-            cow_op.dst_block, buffer.data(), buffer.size()));
-        break;
-      }
-    }
-  }
-
-  return true;
-}
-
 std::unique_ptr<ExtentWriter> VABCPartitionWriter::CreateBaseExtentWriter() {
   return std::make_unique<SnapshotExtentWriter>(cow_writer_.get());
 }
@@ -292,14 +251,44 @@
     const InstallOperation& operation, ErrorCode* error) {
   // COPY ops are already handled during Init(), no need to do actual work, but
   // we still want to verify that all blocks contain expected data.
-  auto source_fd = std::make_shared<EintrSafeFileDescriptor>();
-  TEST_AND_RETURN_FALSE_ERRNO(
-      source_fd->Open(install_part_.source_path.c_str(), O_RDONLY));
-  if (!operation.has_src_sha256_hash()) {
-    return true;
+  auto source_fd = verified_source_fd_.ChooseSourceFD(operation, error);
+  TEST_AND_RETURN_FALSE(source_fd != nullptr);
+  std::vector<CowOperation> converted;
+
+  const auto& src_extents = operation.src_extents();
+  const auto& dst_extents = operation.dst_extents();
+  BlockIterator it1{src_extents};
+  BlockIterator it2{dst_extents};
+  while (!it1.is_end() && !it2.is_end()) {
+    const auto src_block = *it1;
+    const auto dst_block = *it2;
+    ++it1;
+    ++it2;
+    if (src_block == dst_block) {
+      continue;
+    }
+    if (!copy_blocks_.ContainsBlock(dst_block)) {
+      push_back(&converted,
+                {CowOperation::CowReplace, src_block, dst_block, 1});
+    }
   }
-  return PartitionWriter::ValidateSourceHash(
-      operation, source_fd, block_size_, error);
+  std::vector<uint8_t> buffer;
+  for (const auto& cow_op : converted) {
+    buffer.resize(block_size_ * cow_op.block_count);
+    ssize_t bytes_read = 0;
+    TEST_AND_RETURN_FALSE(utils::ReadAll(source_fd,
+                                         buffer.data(),
+                                         block_size_ * cow_op.block_count,
+                                         cow_op.src_block * block_size_,
+                                         &bytes_read));
+    if (bytes_read <= 0 || static_cast<size_t>(bytes_read) != buffer.size()) {
+      LOG(ERROR) << "source_fd->Read failed: " << bytes_read;
+      return false;
+    }
+    TEST_AND_RETURN_FALSE(cow_writer_->AddRawBlocks(
+        cow_op.dst_block, buffer.data(), buffer.size()));
+  }
+  return true;
 }
 
 bool VABCPartitionWriter::PerformReplaceOperation(const InstallOperation& op,
@@ -354,6 +343,8 @@
 
 int VABCPartitionWriter::Close() {
   if (cow_writer_) {
+    LOG(INFO) << "Finalizing " << partition_update_.partition_name()
+              << " COW image";
     cow_writer_->Finalize();
     cow_writer_ = nullptr;
   }
diff --git a/payload_consumer/vabc_partition_writer.h b/payload_consumer/vabc_partition_writer.h
index 4df5151..8393fe5 100644
--- a/payload_consumer/vabc_partition_writer.h
+++ b/payload_consumer/vabc_partition_writer.h
@@ -24,11 +24,11 @@
 
 #include <libsnapshot/snapshot_writer.h>
 
-#include "update_engine/common/cow_operation_convert.h"
 #include "update_engine/payload_consumer/extent_map.h"
 #include "update_engine/payload_consumer/install_operation_executor.h"
 #include "update_engine/payload_consumer/install_plan.h"
-#include "update_engine/payload_consumer/partition_writer.h"
+#include "update_engine/payload_consumer/partition_writer_interface.h"
+#include "update_engine/payload_consumer/verified_source_fd.h"
 #include "update_engine/payload_generator/extent_ranges.h"
 
 namespace chromeos_update_engine {
@@ -62,12 +62,6 @@
 
   void CheckpointUpdateProgress(size_t next_op_index) override;
 
-  [[nodiscard]] static bool WriteSourceCopyCowOps(
-      size_t block_size,
-      const std::vector<CowOperation>& converted,
-      android::snapshot::ICowWriter* cow_writer,
-      FileDescriptorPtr source_fd);
-
   [[nodiscard]] bool FinishedInstallOps() override;
   int Close() override;
   // Send merge sequence data to cow writer
@@ -91,6 +85,7 @@
   InstallOperationExecutor executor_;
   VerifiedSourceFd verified_source_fd_;
   ExtentMap<const CowMergeOperation*, ExtentLess> xor_map_;
+  ExtentRanges copy_blocks_;
 };
 
 }  // namespace chromeos_update_engine
diff --git a/payload_consumer/vabc_partition_writer_unittest.cc b/payload_consumer/vabc_partition_writer_unittest.cc
index 49362ca..0bfa23b 100644
--- a/payload_consumer/vabc_partition_writer_unittest.cc
+++ b/payload_consumer/vabc_partition_writer_unittest.cc
@@ -157,8 +157,7 @@
             EXPECT_CALL(*cow_writer, EmitCopy(10, 5, 1)).InSequence(s);
             EXPECT_CALL(*cow_writer, EmitCopy(15, 10, 1)).InSequence(s);
             // libsnapshot want blocks in reverser order, so 21 goes before 20
-            EXPECT_CALL(*cow_writer, EmitCopy(21, 16, 1)).InSequence(s);
-            EXPECT_CALL(*cow_writer, EmitCopy(20, 15, 1)).InSequence(s);
+            EXPECT_CALL(*cow_writer, EmitCopy(20, 15, 2)).InSequence(s);
 
             EXPECT_CALL(*cow_writer, EmitCopy(25, 20, 1)).InSequence(s);
             return cow_writer;
diff --git a/payload_generator/extent_utils.h b/payload_generator/extent_utils.h
index bd9fc8b..17995b1 100644
--- a/payload_generator/extent_utils.h
+++ b/payload_generator/extent_utils.h
@@ -105,25 +105,25 @@
 // }
 struct BlockIterator {
   explicit BlockIterator(
-      const google::protobuf::RepeatedPtrField<Extent>& src_extents)
-      : src_extents_(src_extents) {}
+      const google::protobuf::RepeatedPtrField<Extent>& extents)
+      : extents_(extents) {}
 
   BlockIterator& operator++() {
-    CHECK_LT(cur_extent_, src_extents_.size());
+    CHECK_LT(cur_extent_, extents_.size());
     block_offset_++;
-    if (block_offset_ >= src_extents_[cur_extent_].num_blocks()) {
+    if (block_offset_ >= extents_[cur_extent_].num_blocks()) {
       cur_extent_++;
       block_offset_ = 0;
     }
     return *this;
   }
 
-  [[nodiscard]] bool is_end() { return cur_extent_ >= src_extents_.size(); }
+  [[nodiscard]] bool is_end() { return cur_extent_ >= extents_.size(); }
   [[nodiscard]] uint64_t operator*() {
-    return src_extents_[cur_extent_].start_block() + block_offset_;
+    return extents_[cur_extent_].start_block() + block_offset_;
   }
 
-  const google::protobuf::RepeatedPtrField<Extent>& src_extents_;
+  const google::protobuf::RepeatedPtrField<Extent>& extents_;
   int cur_extent_ = 0;
   size_t block_offset_ = 0;
 };