Handle resume of VABC updates by emitting labels

To support resuming an update with Virtual AB Compression, we emit
labels in between operations. After writing all SOURCE_COPY, we
emit label 0. Each time we finished writing an InstallOp, we emit
a label incremented by 1. When resuming, we pass the label to CowWriter.

Test: treehugger
     1. update_device.py ota.zip
     --extra-headers="SWITCH_SLOT_ON_REBOOT=0"
     2. update_device.py ota.zip
     3. Verify that 2 did not re-start the entire update,
        only fs verification and postinstall may re-run.
Change-Id: I785cd04a35457181621ed7b8c0be9a46b6004b7b
diff --git a/payload_consumer/delta_performer.cc b/payload_consumer/delta_performer.cc
index ada1372..c3a321e 100644
--- a/payload_consumer/delta_performer.cc
+++ b/payload_consumer/delta_performer.cc
@@ -48,6 +48,7 @@
 #include "update_engine/common/prefs_interface.h"
 #include "update_engine/common/subprocess.h"
 #include "update_engine/common/terminator.h"
+#include "update_engine/common/utils.h"
 #include "update_engine/payload_consumer/bzip_extent_writer.h"
 #include "update_engine/payload_consumer/cached_file_descriptor.h"
 #include "update_engine/payload_consumer/certificate_parser_interface.h"
@@ -197,12 +198,9 @@
   if (op_result)
     return true;
 
-  size_t partition_first_op_num =
-      current_partition_ ? acc_num_operations_[current_partition_ - 1] : 0;
   LOG(ERROR) << "Failed to perform " << op_type_name << " operation "
              << next_operation_num_ << ", which is the operation "
-             << next_operation_num_ - partition_first_op_num
-             << " in partition \""
+             << GetPartitionOperationNum() << " in partition \""
              << partitions_[current_partition_].partition_name() << "\"";
   if (*error == ErrorCode::kSuccess)
     *error = ErrorCode::kDownloadOperationExecutionError;
@@ -253,7 +251,17 @@
   // partial update.
   bool source_may_exist = manifest_.partial_update() ||
                           payload_->type == InstallPayloadType::kDelta;
-  return partition_writer_->Init(install_plan_, source_may_exist);
+  const size_t partition_operation_num = GetPartitionOperationNum();
+
+  TEST_AND_RETURN_FALSE(partition_writer_->Init(
+      install_plan_, source_may_exist, partition_operation_num));
+  CheckpointUpdateProgress(true);
+  return true;
+}
+
+size_t DeltaPerformer::GetPartitionOperationNum() {
+  return next_operation_num_ -
+         (current_partition_ ? acc_num_operations_[current_partition_ - 1] : 0);
 }
 
 namespace {
@@ -502,12 +510,9 @@
         return false;
       }
     }
-    const size_t partition_operation_num =
-        next_operation_num_ -
-        (current_partition_ ? acc_num_operations_[current_partition_ - 1] : 0);
 
     const InstallOperation& op =
-        partitions_[current_partition_].operations(partition_operation_num);
+        partitions_[current_partition_].operations(GetPartitionOperationNum());
 
     CopyDataToBuffer(&c_bytes, &count, op.data_length());
 
@@ -1438,6 +1443,7 @@
       TEST_AND_RETURN_FALSE(
           prefs_->SetInt64(kPrefsUpdateStateNextDataLength, 0));
     }
+    partition_writer_->CheckpointUpdateProgress(GetPartitionOperationNum());
   }
   TEST_AND_RETURN_FALSE(
       prefs_->SetInt64(kPrefsUpdateStateNextOperation, next_operation_num_));
diff --git a/payload_consumer/delta_performer.h b/payload_consumer/delta_performer.h
index 4990bf8..7df2bd3 100644
--- a/payload_consumer/delta_performer.h
+++ b/payload_consumer/delta_performer.h
@@ -203,6 +203,12 @@
   FRIEND_TEST(DeltaPerformerTest, BrilloParsePayloadMetadataTest);
   FRIEND_TEST(DeltaPerformerTest, UsePublicKeyFromResponse);
 
+  // Obtain the operation index for current partition. If all operations for
+  // current partition is are finished, return # of operations. This is mostly
+  // intended to be used by CheckpointUpdateProgress, where partition writer
+  // needs to know the current operation number to properly checkpoint update.
+  size_t GetPartitionOperationNum();
+
   // Parse and move the update instructions of all partitions into our local
   // |partitions_| variable based on the version of the payload. Requires the
   // manifest to be parsed and valid.
diff --git a/payload_consumer/file_descriptor.cc b/payload_consumer/file_descriptor.cc
index 5b27c38..7c69c1b 100644
--- a/payload_consumer/file_descriptor.cc
+++ b/payload_consumer/file_descriptor.cc
@@ -134,6 +134,7 @@
   CHECK_GE(fd_, 0);
   // Implemented as a No-Op, as delta_performer typically uses |O_DSYNC|, except
   // in interactive settings.
+  fsync(fd_);
   return true;
 }
 
diff --git a/payload_consumer/partition_writer.cc b/payload_consumer/partition_writer.cc
index bec2594..6f06dd2 100644
--- a/payload_consumer/partition_writer.cc
+++ b/payload_consumer/partition_writer.cc
@@ -274,7 +274,8 @@
 }
 
 bool PartitionWriter::Init(const InstallPlan* install_plan,
-                           bool source_may_exist) {
+                           bool source_may_exist,
+                           size_t next_op_index) {
   const PartitionUpdate& partition = partition_update_;
   uint32_t source_slot = install_plan->source_slot;
   uint32_t target_slot = install_plan->target_slot;
@@ -329,7 +330,7 @@
       writer->Init(target_fd_, operation.dst_extents(), block_size_));
   TEST_AND_RETURN_FALSE(writer->Write(data, operation.data_length()));
 
-  return Flush();
+  return true;
 }
 
 bool PartitionWriter::PerformZeroOrDiscardOperation(
@@ -362,7 +363,7 @@
           target_fd_, zeros.data(), chunk_length, start + offset));
     }
   }
-  return Flush();
+  return true;
 }
 
 bool PartitionWriter::PerformSourceCopyOperation(
@@ -473,7 +474,7 @@
                                                        block_size_,
                                                        nullptr));
   }
-  return Flush();
+  return true;
 }
 
 bool PartitionWriter::PerformSourceBsdiffOperation(
@@ -502,7 +503,7 @@
                                         std::move(dst_file),
                                         reinterpret_cast<const uint8_t*>(data),
                                         count) == 0);
-  return Flush();
+  return true;
 }
 
 bool PartitionWriter::PerformPuffDiffOperation(
@@ -534,7 +535,7 @@
                         reinterpret_cast<const uint8_t*>(data),
                         count,
                         kMaxCacheSize));
-  return Flush();
+  return true;
 }
 
 FileDescriptorPtr PartitionWriter::ChooseSourceFD(
@@ -652,12 +653,12 @@
   return -err;
 }
 
-std::unique_ptr<ExtentWriter> PartitionWriter::CreateBaseExtentWriter() {
-  return std::make_unique<DirectExtentWriter>();
+void PartitionWriter::CheckpointUpdateProgress(size_t next_op_index) {
+  target_fd_->Flush();
 }
 
-bool PartitionWriter::Flush() {
-  return target_fd_->Flush();
+std::unique_ptr<ExtentWriter> PartitionWriter::CreateBaseExtentWriter() {
+  return std::make_unique<DirectExtentWriter>();
 }
 
 }  // namespace chromeos_update_engine
diff --git a/payload_consumer/partition_writer.h b/payload_consumer/partition_writer.h
index 79ba665..4b420d2 100644
--- a/payload_consumer/partition_writer.h
+++ b/payload_consumer/partition_writer.h
@@ -46,7 +46,15 @@
   // Perform necessary initialization work before InstallOperation can be
   // applied to this partition
   [[nodiscard]] virtual bool Init(const InstallPlan* install_plan,
-                                  bool source_may_exist);
+                                  bool source_may_exist,
+                                  size_t next_op_index);
+
+  // |CheckpointUpdateProgress| will be called after SetNextOpIndex(), but it's
+  // optional. DeltaPerformer may or may not call this everytime an operation is
+  // applied.
+  //   |next_op_index| is index of next operation that should be applied.
+  // |next_op_index-1| is the last operation that is already applied.
+  virtual void CheckpointUpdateProgress(size_t next_op_index);
 
   // Close partition writer, when calling this function there's no guarantee
   // that all |InstallOperations| are sent to |PartitionWriter|. This function
@@ -73,7 +81,6 @@
       ErrorCode* error,
       const void* data,
       size_t count);
-  [[nodiscard]] virtual bool Flush();
 
   // |DeltaPerformer| calls this when all Install Ops are sent to partition
   // writer. No |Perform*Operation| methods will be called in the future, and
diff --git a/payload_consumer/partition_writer_unittest.cc b/payload_consumer/partition_writer_unittest.cc
index 1ef4783..91e5e26 100644
--- a/payload_consumer/partition_writer_unittest.cc
+++ b/payload_consumer/partition_writer_unittest.cc
@@ -96,8 +96,9 @@
     install_part_.target_size = blob_data.size();
 
     ErrorCode error;
-    EXPECT_TRUE(writer_.Init(&install_plan_, true));
+    EXPECT_TRUE(writer_.Init(&install_plan_, true, 0));
     EXPECT_TRUE(writer_.PerformSourceCopyOperation(op, &error));
+    writer_.CheckpointUpdateProgress(1);
 
     brillo::Blob output_data;
     EXPECT_TRUE(utils::ReadFile(target_partition.path(), &output_data));
diff --git a/payload_consumer/vabc_partition_writer.cc b/payload_consumer/vabc_partition_writer.cc
index a869509..5cb7989 100644
--- a/payload_consumer/vabc_partition_writer.cc
+++ b/payload_consumer/vabc_partition_writer.cc
@@ -31,8 +31,33 @@
 #include "update_engine/payload_consumer/snapshot_extent_writer.h"
 
 namespace chromeos_update_engine {
+// Expected layout of COW file:
+// === Beginning of Cow Image ===
+// All Source Copy Operations
+// ========== Label 0 ==========
+// Operation 0 in PartitionUpdate
+// ========== Label 1 ==========
+// Operation 1 in PartitionUpdate
+// ========== label 2 ==========
+// Operation 2 in PartitionUpdate
+// ========== label 3 ==========
+// .
+// .
+// .
+
+// When resuming, pass |next_op_index_| as label to
+// |InitializeWithAppend|.
+// For example, suppose we finished writing SOURCE_COPY, and we finished writing
+// operation 2 completely. Update is suspended when we are half way through
+// operation 3.
+// |cnext_op_index_| would be 3, so we pass 3 as
+// label to |InitializeWithAppend|. The CowWriter will retain all data before
+// label 3, Which contains all operation 2's data, but none of operation 3's
+// data.
+
 bool VABCPartitionWriter::Init(const InstallPlan* install_plan,
-                               bool source_may_exist) {
+                               bool source_may_exist,
+                               size_t next_op_index) {
   TEST_AND_RETURN_FALSE(install_plan != nullptr);
   TEST_AND_RETURN_FALSE(
       OpenSourcePartition(install_plan->source_slot, source_may_exist));
@@ -45,11 +70,19 @@
       install_part_.name, source_path, install_plan->is_resume);
   TEST_AND_RETURN_FALSE(cow_writer_ != nullptr);
 
-  // TODO(zhangkelvin) Emit a label before writing SOURCE_COPY. When resuming,
-  // use pref or CowWriter::GetLastLabel to determine if the SOURCE_COPY ops are
-  // written. No need to handle SOURCE_COPY operations when resuming.
-
   // ===== Resume case handling code goes here ====
+  // It is possible that the SOURCE_COPY are already written but
+  // |next_op_index_| is still 0. In this case we discard previously written
+  // SOURCE_COPY, and start over.
+  if (install_plan->is_resume && next_op_index > 0) {
+    LOG(INFO) << "Resuming update on partition `"
+              << partition_update_.partition_name() << "` op index "
+              << next_op_index;
+    TEST_AND_RETURN_FALSE(cow_writer_->InitializeAppend(next_op_index));
+    return true;
+  } else {
+    TEST_AND_RETURN_FALSE(cow_writer_->Initialize());
+  }
 
   // ==============================================
 
@@ -90,6 +123,7 @@
         break;
     }
   }
+
   return true;
 }
 
@@ -113,9 +147,10 @@
   return true;
 }
 
-bool VABCPartitionWriter::Flush() {
-  // No need to do anything, as CowWriter automatically flushes every OP added.
-  return true;
+void VABCPartitionWriter::CheckpointUpdateProgress(size_t next_op_index) {
+  // No need to call fsync/sync, as CowWriter flushes after a label is added
+  // added.
+  cow_writer_->AddLabel(next_op_index);
 }
 
 [[nodiscard]] bool VABCPartitionWriter::FinishedInstallOps() {
diff --git a/payload_consumer/vabc_partition_writer.h b/payload_consumer/vabc_partition_writer.h
index 5eab23f..7fb2a2c 100644
--- a/payload_consumer/vabc_partition_writer.h
+++ b/payload_consumer/vabc_partition_writer.h
@@ -31,7 +31,8 @@
  public:
   using PartitionWriter::PartitionWriter;
   [[nodiscard]] bool Init(const InstallPlan* install_plan,
-                          bool source_may_exist) override;
+                          bool source_may_exist,
+                          size_t next_op_index) override;
   ~VABCPartitionWriter() override;
 
   [[nodiscard]] std::unique_ptr<ExtentWriter> CreateBaseExtentWriter() override;
@@ -43,7 +44,8 @@
       const InstallOperation& operation) override;
   [[nodiscard]] bool PerformSourceCopyOperation(
       const InstallOperation& operation, ErrorCode* error) override;
-  [[nodiscard]] bool Flush() override;
+
+  void CheckpointUpdateProgress(size_t next_op_index) override;
 
   static bool WriteAllCowOps(size_t block_size,
                              const std::vector<CowOperation>& converted,