Merge remote-tracking branch 'remotes/aosp/upstream-master' into merge-cros

Merge the recent update_engine changes back to Android.

Created by:
$ git merge remotes/aosp/upstream-master --commit -s recursive

No special conflict to resolve.

Bug: 163153182
Test: None
Change-Id: I4c65eb9c57448847857e2339935a5d47c8cb690a
diff --git a/Android.bp b/Android.bp
index a8b5fc2..178b7da 100644
--- a/Android.bp
+++ b/Android.bp
@@ -82,6 +82,28 @@
     },
 }
 
+// libcow_operation_convert (type: library)
+// ========================================================
+cc_library {
+    name: "libcow_operation_convert",
+    host_supported: true,
+    recovery_available: true,
+    defaults: [
+        "ue_defaults",
+        "update_metadata-protos_exports",
+    ],
+    srcs: [
+        "common/cow_operation_convert.cc",
+    ],
+    static_libs: [
+        "libsnapshot_cow",
+        "update_metadata-protos",
+        "libpayload_extent_ranges",
+        "libbrotli",
+        "libz",
+    ],
+}
+
 // update_metadata-protos (type: static_library)
 // ========================================================
 // Protobufs.
@@ -124,6 +146,11 @@
         "libfec_rs",
         "libpuffpatch",
         "libverity_tree",
+        "libsnapshot_cow",
+        "libbrotli",
+        "libz",
+        "libpayload_extent_ranges",
+        "libcow_operation_convert",
     ],
     shared_libs: [
         "libbase",
@@ -178,6 +205,10 @@
         "payload_consumer/payload_constants.cc",
         "payload_consumer/payload_metadata.cc",
         "payload_consumer/payload_verifier.cc",
+        "payload_consumer/partition_writer.cc",
+        "payload_consumer/partition_writer_factory_android.cc",
+        "payload_consumer/vabc_partition_writer.cc",
+        "payload_consumer/snapshot_extent_writer.cc",
         "payload_consumer/postinstall_runner_action.cc",
         "payload_consumer/verity_writer_android.cc",
         "payload_consumer/xz_extent_writer.cc",
@@ -199,6 +230,8 @@
         "libgsi",
         "libpayload_consumer",
         "libsnapshot",
+        "libsnapshot_cow",
+        "libz",
         "update_metadata-protos",
     ],
     shared_libs: [
@@ -260,7 +293,7 @@
     ],
 
     static_libs: [
-        "libkver",
+        "gkiprops",
         "libpayload_consumer",
         "libupdate_engine_boot_control",
     ],
@@ -383,6 +416,7 @@
         // We add the static versions of the shared libraries that are not installed to
         // recovery image due to size concerns. Need to include all the static library
         // dependencies of these static libraries.
+        "gkiprops",
         "libevent",
         "libmodpb64",
         "libgtest_prod",
@@ -390,7 +424,6 @@
         "libbrillo-stream",
         "libbrillo",
         "libchrome",
-        "libkver",
     ],
     target: {
         recovery: {
@@ -476,6 +509,7 @@
         "ue_defaults",
     ],
     host_supported: true,
+    recovery_available: true,
     srcs: [
         "payload_generator/extent_ranges.cc",
     ],
@@ -669,6 +703,7 @@
         "common/action_pipe_unittest.cc",
         "common/action_processor_unittest.cc",
         "common/action_unittest.cc",
+        "common/cow_operation_convert_unittest.cc",
         "common/cpu_limiter_unittest.cc",
         "common/fake_prefs.cc",
         "common/file_fetcher_unittest.cc",
@@ -684,12 +719,12 @@
         "common/utils_unittest.cc",
         "dynamic_partition_control_android_unittest.cc",
         "libcurl_http_fetcher_unittest.cc",
-        "hardware_android_unittest.cc",
         "payload_consumer/bzip_extent_writer_unittest.cc",
         "payload_consumer/cached_file_descriptor_unittest.cc",
         "payload_consumer/certificate_parser_android_unittest.cc",
         "payload_consumer/delta_performer_integration_test.cc",
         "payload_consumer/delta_performer_unittest.cc",
+        "payload_consumer/partition_writer_unittest.cc",
         "payload_consumer/download_action_android_unittest.cc",
         "payload_consumer/extent_reader_unittest.cc",
         "payload_consumer/extent_writer_unittest.cc",
@@ -752,9 +787,25 @@
 // update_engine header library
 cc_library_headers {
     name: "libupdate_engine_headers",
+
+    // This header library is available to core and product modules.
+    // Right now, vendor_available is the only way to specify this.
+    // vendor modules should NOT use this library.
+    // TODO(b/150902910): change this to product_available.
+    vendor_available: true,
+
     export_include_dirs: ["."],
     apex_available: [
         "com.android.gki.*",
+        "//apex_available:platform",
     ],
     host_supported: true,
+    recovery_available: true,
+    ramdisk_available: true,
+
+    target: {
+        darwin: {
+            enabled: false,
+        },
+    }
 }
diff --git a/BUILD.gn b/BUILD.gn
index 8f06513..aec447c 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -163,6 +163,8 @@
     "payload_consumer/install_plan.cc",
     "payload_consumer/mount_history.cc",
     "payload_consumer/partition_update_generator_stub.cc",
+    "payload_consumer/partition_writer_factory_chromeos.cc",
+    "payload_consumer/partition_writer.cc",
     "payload_consumer/payload_constants.cc",
     "payload_consumer/payload_metadata.cc",
     "payload_consumer/payload_verifier.cc",
diff --git a/cleanup_previous_update_action.cc b/cleanup_previous_update_action.cc
index 1a2476f..89ed6f8 100644
--- a/cleanup_previous_update_action.cc
+++ b/cleanup_previous_update_action.cc
@@ -67,30 +67,28 @@
       last_percentage_(0),
       merge_stats_(nullptr) {}
 
+CleanupPreviousUpdateAction::~CleanupPreviousUpdateAction() {
+  StopActionInternal();
+}
+
 void CleanupPreviousUpdateAction::PerformAction() {
-  ResumeAction();
+  StartActionInternal();
 }
 
 void CleanupPreviousUpdateAction::TerminateProcessing() {
-  SuspendAction();
+  StopActionInternal();
 }
 
 void CleanupPreviousUpdateAction::ResumeAction() {
-  CHECK(prefs_);
-  CHECK(boot_control_);
-
-  LOG(INFO) << "Starting/resuming CleanupPreviousUpdateAction";
-  running_ = true;
   StartActionInternal();
 }
 
 void CleanupPreviousUpdateAction::SuspendAction() {
-  LOG(INFO) << "Stopping/suspending CleanupPreviousUpdateAction";
-  running_ = false;
+  StopActionInternal();
 }
 
 void CleanupPreviousUpdateAction::ActionCompleted(ErrorCode error_code) {
-  running_ = false;
+  StopActionInternal();
   ReportMergeStats();
   metadata_device_ = nullptr;
 }
@@ -103,7 +101,52 @@
   return "CleanupPreviousUpdateAction";
 }
 
+// This function is called at the beginning of all delayed functions. By
+// resetting |scheduled_task_|, the delayed function acknowledges that the task
+// has already been executed, therefore there's no need to cancel it in the
+// future. This prevents StopActionInternal() from resetting task IDs in an
+// unexpected way because task IDs could be reused.
+void CleanupPreviousUpdateAction::AcknowledgeTaskExecuted() {
+  if (scheduled_task_ != MessageLoop::kTaskIdNull) {
+    LOG(INFO) << "Executing task " << scheduled_task_;
+  }
+  scheduled_task_ = MessageLoop::kTaskIdNull;
+}
+
+// Check that scheduled_task_ is a valid task ID. Otherwise, terminate the
+// action.
+void CleanupPreviousUpdateAction::CheckTaskScheduled(std::string_view name) {
+  if (scheduled_task_ == MessageLoop::kTaskIdNull) {
+    LOG(ERROR) << "Unable to schedule " << name;
+    processor_->ActionComplete(this, ErrorCode::kError);
+  } else {
+    LOG(INFO) << "CleanupPreviousUpdateAction scheduled task ID "
+              << scheduled_task_ << " for " << name;
+  }
+}
+
+void CleanupPreviousUpdateAction::StopActionInternal() {
+  LOG(INFO) << "Stopping/suspending/completing CleanupPreviousUpdateAction";
+  running_ = false;
+
+  if (scheduled_task_ != MessageLoop::kTaskIdNull) {
+    if (MessageLoop::current()->CancelTask(scheduled_task_)) {
+      LOG(INFO) << "CleanupPreviousUpdateAction cancelled pending task ID "
+                << scheduled_task_;
+    } else {
+      LOG(ERROR) << "CleanupPreviousUpdateAction unable to cancel task ID "
+                 << scheduled_task_;
+    }
+  }
+  scheduled_task_ = MessageLoop::kTaskIdNull;
+}
+
 void CleanupPreviousUpdateAction::StartActionInternal() {
+  CHECK(prefs_);
+  CHECK(boot_control_);
+
+  LOG(INFO) << "Starting/resuming CleanupPreviousUpdateAction";
+  running_ = true;
   // Do nothing on non-VAB device.
   if (!boot_control_->GetDynamicPartitionControl()
            ->GetVirtualAbFeatureFlag()
@@ -120,14 +163,16 @@
 
 void CleanupPreviousUpdateAction::ScheduleWaitBootCompleted() {
   TEST_AND_RETURN(running_);
-  MessageLoop::current()->PostDelayedTask(
+  scheduled_task_ = MessageLoop::current()->PostDelayedTask(
       FROM_HERE,
       base::Bind(&CleanupPreviousUpdateAction::WaitBootCompletedOrSchedule,
                  base::Unretained(this)),
       kCheckBootCompletedInterval);
+  CheckTaskScheduled("WaitBootCompleted");
 }
 
 void CleanupPreviousUpdateAction::WaitBootCompletedOrSchedule() {
+  AcknowledgeTaskExecuted();
   TEST_AND_RETURN(running_);
   if (!kIsRecovery &&
       !android::base::GetBoolProperty(kBootCompletedProp, false)) {
@@ -142,15 +187,17 @@
 
 void CleanupPreviousUpdateAction::ScheduleWaitMarkBootSuccessful() {
   TEST_AND_RETURN(running_);
-  MessageLoop::current()->PostDelayedTask(
+  scheduled_task_ = MessageLoop::current()->PostDelayedTask(
       FROM_HERE,
       base::Bind(
           &CleanupPreviousUpdateAction::CheckSlotMarkedSuccessfulOrSchedule,
           base::Unretained(this)),
       kCheckSlotMarkedSuccessfulInterval);
+  CheckTaskScheduled("WaitMarkBootSuccessful");
 }
 
 void CleanupPreviousUpdateAction::CheckSlotMarkedSuccessfulOrSchedule() {
+  AcknowledgeTaskExecuted();
   TEST_AND_RETURN(running_);
   if (!kIsRecovery &&
       !boot_control_->IsSlotMarkedSuccessful(boot_control_->GetCurrentSlot())) {
@@ -212,14 +259,16 @@
 
 void CleanupPreviousUpdateAction::ScheduleWaitForMerge() {
   TEST_AND_RETURN(running_);
-  MessageLoop::current()->PostDelayedTask(
+  scheduled_task_ = MessageLoop::current()->PostDelayedTask(
       FROM_HERE,
       base::Bind(&CleanupPreviousUpdateAction::WaitForMergeOrSchedule,
                  base::Unretained(this)),
       kWaitForMergeInterval);
+  CheckTaskScheduled("WaitForMerge");
 }
 
 void CleanupPreviousUpdateAction::WaitForMergeOrSchedule() {
+  AcknowledgeTaskExecuted();
   TEST_AND_RETURN(running_);
   auto state = snapshot_->ProcessUpdateState(
       std::bind(&CleanupPreviousUpdateAction::OnMergePercentageUpdate, this),
diff --git a/cleanup_previous_update_action.h b/cleanup_previous_update_action.h
index 6f6ce07..fe65e60 100644
--- a/cleanup_previous_update_action.h
+++ b/cleanup_previous_update_action.h
@@ -20,6 +20,7 @@
 #include <chrono>  // NOLINT(build/c++11) -- for merge times
 #include <memory>
 #include <string>
+#include <string_view>
 
 #include <brillo/message_loops/message_loop.h>
 #include <libsnapshot/snapshot.h>
@@ -51,6 +52,7 @@
       BootControlInterface* boot_control,
       android::snapshot::ISnapshotManager* snapshot,
       CleanupPreviousUpdateActionDelegateInterface* delegate);
+  ~CleanupPreviousUpdateAction();
 
   void PerformAction() override;
   void SuspendAction() override;
@@ -74,7 +76,13 @@
   bool cancel_failed_{false};
   unsigned int last_percentage_{0};
   android::snapshot::ISnapshotMergeStats* merge_stats_;
+  brillo::MessageLoop::TaskId scheduled_task_{brillo::MessageLoop::kTaskIdNull};
 
+  // Helpers for task management.
+  void AcknowledgeTaskExecuted();
+  void CheckTaskScheduled(std::string_view name);
+
+  void StopActionInternal();
   void StartActionInternal();
   void ScheduleWaitBootCompleted();
   void WaitBootCompletedOrSchedule();
diff --git a/common/cow_operation_convert.cc b/common/cow_operation_convert.cc
new file mode 100644
index 0000000..db17b5f
--- /dev/null
+++ b/common/cow_operation_convert.cc
@@ -0,0 +1,73 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/common/cow_operation_convert.h"
+
+#include <base/logging.h>
+
+#include "update_engine/payload_generator/extent_ranges.h"
+#include "update_engine/payload_generator/extent_utils.h"
+
+namespace chromeos_update_engine {
+
+std::vector<CowOperation> ConvertToCowOperations(
+    const ::google::protobuf::RepeatedPtrField<
+        ::chromeos_update_engine::InstallOperation>& operations,
+    const ::google::protobuf::RepeatedPtrField<CowMergeOperation>&
+        merge_operations) {
+  ExtentRanges merge_extents;
+  std::vector<CowOperation> converted;
+
+  // We want all CowCopy ops to be done first, before any COW_REPLACE happens.
+  // Therefore we add these ops in 2 separate loops. This is because during
+  // merge, a CowReplace might modify a block needed by CowCopy, so we always
+  // perform CowCopy first.
+
+  // This loop handles CowCopy blocks within SOURCE_COPY, and the next loop
+  // converts the leftover blocks to CowReplace.
+  for (const auto& merge_op : merge_operations) {
+    merge_extents.AddExtent(merge_op.dst_extent());
+    const auto& src_extent = merge_op.src_extent();
+    const auto& dst_extent = merge_op.dst_extent();
+    for (uint64_t i = 0; i < src_extent.num_blocks(); i++) {
+      converted.push_back({CowOperation::CowCopy,
+                           src_extent.start_block() + i,
+                           dst_extent.start_block() + i});
+    }
+  }
+  // COW_REPLACE are added after COW_COPY, because replace might modify blocks
+  // needed by COW_COPY. Please don't merge this loop with the previous one.
+  for (const auto& operation : operations) {
+    if (operation.type() != InstallOperation::SOURCE_COPY) {
+      continue;
+    }
+    const auto& src_extents = operation.src_extents();
+    const auto& dst_extents = operation.dst_extents();
+    BlockIterator it1{src_extents};
+    BlockIterator it2{dst_extents};
+    while (!it1.is_end() && !it2.is_end()) {
+      auto src_block = *it1;
+      auto dst_block = *it2;
+      if (!merge_extents.ContainsBlock(dst_block)) {
+        converted.push_back({CowOperation::CowReplace, src_block, dst_block});
+      }
+      ++it1;
+      ++it2;
+    }
+  }
+  return converted;
+}
+}  // namespace chromeos_update_engine
diff --git a/common/cow_operation_convert.h b/common/cow_operation_convert.h
new file mode 100644
index 0000000..c0543f7
--- /dev/null
+++ b/common/cow_operation_convert.h
@@ -0,0 +1,56 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef __COW_OPERATION_CONVERT_H
+#define __COW_OPERATION_CONVERT_H
+
+#include <vector>
+
+#include <libsnapshot/cow_format.h>
+
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+struct CowOperation {
+  enum Type {
+    CowCopy = android::snapshot::kCowCopyOp,
+    CowReplace = android::snapshot::kCowReplaceOp,
+  };
+  Type op;
+  uint64_t src_block;
+  uint64_t dst_block;
+};
+
+// Convert SOURCE_COPY operations in `operations` list to a list of
+// CowOperations according to the merge sequence. This function only converts
+// SOURCE_COPY, other operations are ignored. If there's a merge conflict in
+// SOURCE_COPY operations, some blocks may be converted to COW_REPLACE instead
+// of COW_COPY.
+
+// The list returned does not necessarily preserve the order of
+// SOURCE_COPY in `operations`. The only guarantee about ordering in the
+// returned list is that if operations are applied in such order, there would be
+// no merge conflicts.
+
+// This function is intended to be used by delta_performer to perform
+// SOURCE_COPY operations on Virtual AB Compression devices.
+std::vector<CowOperation> ConvertToCowOperations(
+    const ::google::protobuf::RepeatedPtrField<
+        ::chromeos_update_engine::InstallOperation>& operations,
+    const ::google::protobuf::RepeatedPtrField<CowMergeOperation>&
+        merge_operations);
+}  // namespace chromeos_update_engine
+#endif
diff --git a/common/cow_operation_convert_unittest.cc b/common/cow_operation_convert_unittest.cc
new file mode 100644
index 0000000..b70dcdf
--- /dev/null
+++ b/common/cow_operation_convert_unittest.cc
@@ -0,0 +1,220 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <algorithm>
+#include <array>
+#include <initializer_list>
+
+#include <gtest/gtest.h>
+
+#include "update_engine/common/cow_operation_convert.h"
+#include "update_engine/payload_generator/extent_ranges.h"
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+using OperationList = ::google::protobuf::RepeatedPtrField<
+    ::chromeos_update_engine::InstallOperation>;
+using MergeOplist = ::google::protobuf::RepeatedPtrField<
+    ::chromeos_update_engine::CowMergeOperation>;
+
+std::ostream& operator<<(std::ostream& out, CowOperation::Type op) {
+  switch (op) {
+    case CowOperation::Type::CowCopy:
+      out << "CowCopy";
+      break;
+    case CowOperation::Type::CowReplace:
+      out << "CowReplace";
+      break;
+    default:
+      out << static_cast<int>(op);  // Cast, or this overload recurses.
+      break;
+  }
+  return out;
+}
+
+std::ostream& operator<<(std::ostream& out, const CowOperation& c) {
+  out << "{" << c.op << ", " << c.src_block << ", " << c.dst_block << "}";
+  return out;
+}
+
+class CowOperationConvertTest : public testing::Test {
+ public:
+  void VerifyCowMergeOp(const std::vector<CowOperation>& cow_ops) {
+    // Build a set of all extents covered by InstallOps.
+    ExtentRanges src_extent_set;
+    ExtentRanges dst_extent_set;
+    for (auto&& op : operations_) {
+      src_extent_set.AddRepeatedExtents(op.src_extents());
+      dst_extent_set.AddRepeatedExtents(op.dst_extents());
+    }
+    ExtentRanges modified_extents;
+    for (auto&& cow_op : cow_ops) {
+      if (cow_op.op == CowOperation::CowCopy) {
+        EXPECT_TRUE(src_extent_set.ContainsBlock(cow_op.src_block));
+        // converted operations should be conflict free.
+        EXPECT_FALSE(modified_extents.ContainsBlock(cow_op.src_block))
+            << "SOURCE_COPY operation " << cow_op
+            << " read from a modified block";
+        src_extent_set.SubtractExtent(ExtentForRange(cow_op.src_block, 1));
+      }
+      EXPECT_TRUE(dst_extent_set.ContainsBlock(cow_op.dst_block));
+      dst_extent_set.SubtractExtent(ExtentForRange(cow_op.dst_block, 1));
+      modified_extents.AddBlock(cow_op.dst_block);
+    }
+    // The generated CowOps should cover all extents in InstallOps.
+    EXPECT_EQ(dst_extent_set.blocks(), 0UL);
+    // It's possible that src_extent_set is non-empty, because some operations
+    // will be converted to CowReplace, and we don't count the source extent for
+    // those.
+  }
+  OperationList operations_;
+  MergeOplist merge_operations_;
+};
+
+void AddOperation(OperationList* operations,
+                  ::chromeos_update_engine::InstallOperation_Type op_type,
+                  std::initializer_list<std::array<int, 2>> src_extents,
+                  std::initializer_list<std::array<int, 2>> dst_extents) {
+  auto&& op = operations->Add();
+  op->set_type(op_type);
+  for (const auto& extent : src_extents) {
+    *op->add_src_extents() = ExtentForRange(extent[0], extent[1]);
+  }
+  for (const auto& extent : dst_extents) {
+    *op->add_dst_extents() = ExtentForRange(extent[0], extent[1]);
+  }
+}
+
+void AddMergeOperation(MergeOplist* operations,
+                       ::chromeos_update_engine::CowMergeOperation_Type op_type,
+                       std::array<int, 2> src_extent,
+                       std::array<int, 2> dst_extent) {
+  auto&& op = operations->Add();
+  op->set_type(op_type);
+  *op->mutable_src_extent() = ExtentForRange(src_extent[0], src_extent[1]);
+  *op->mutable_dst_extent() = ExtentForRange(dst_extent[0], dst_extent[1]);
+}
+
+TEST_F(CowOperationConvertTest, NoConflict) {
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{20, 1}}, {{30, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{10, 1}}, {{20, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{0, 1}}, {{10, 1}});
+
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {20, 1}, {30, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {10, 1}, {20, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {0, 1}, {10, 1});
+
+  auto cow_ops = ConvertToCowOperations(operations_, merge_operations_);
+  ASSERT_EQ(cow_ops.size(), 3UL);
+  ASSERT_TRUE(std::all_of(cow_ops.begin(), cow_ops.end(), [](auto&& cow_op) {
+    return cow_op.op == CowOperation::CowCopy;
+  }));
+  VerifyCowMergeOp(cow_ops);
+}
+
+TEST_F(CowOperationConvertTest, CowReplace) {
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{30, 1}}, {{0, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{20, 1}}, {{30, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{10, 1}}, {{20, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{0, 1}}, {{10, 1}});
+
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {20, 1}, {30, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {10, 1}, {20, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {0, 1}, {10, 1});
+
+  auto cow_ops = ConvertToCowOperations(operations_, merge_operations_);
+  ASSERT_EQ(cow_ops.size(), 4UL);
+  // Expect 3 COW_COPY and 1 COW_REPLACE
+  ASSERT_EQ(std::count_if(cow_ops.begin(),
+                          cow_ops.end(),
+                          [](auto&& cow_op) {
+                            return cow_op.op == CowOperation::CowCopy;
+                          }),
+            3);
+  ASSERT_EQ(std::count_if(cow_ops.begin(),
+                          cow_ops.end(),
+                          [](auto&& cow_op) {
+                            return cow_op.op == CowOperation::CowReplace;
+                          }),
+            1);
+  VerifyCowMergeOp(cow_ops);
+}
+
+TEST_F(CowOperationConvertTest, ReOrderSourceCopy) {
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{30, 1}}, {{20, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{20, 1}}, {{10, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{10, 1}}, {{0, 1}});
+
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {10, 1}, {0, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {20, 1}, {10, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {30, 1}, {20, 1});
+
+  auto cow_ops = ConvertToCowOperations(operations_, merge_operations_);
+  ASSERT_EQ(cow_ops.size(), 3UL);
+  // Expect 3 COW_COPY
+  ASSERT_TRUE(std::all_of(cow_ops.begin(), cow_ops.end(), [](auto&& cow_op) {
+    return cow_op.op == CowOperation::CowCopy;
+  }));
+  VerifyCowMergeOp(cow_ops);
+}
+
+TEST_F(CowOperationConvertTest, InterleavingSrcExtent) {
+  AddOperation(&operations_,
+               InstallOperation::SOURCE_COPY,
+               {{30, 5}, {35, 5}},
+               {{20, 10}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{20, 1}}, {{10, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{10, 1}}, {{0, 1}});
+
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {10, 1}, {0, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {20, 1}, {10, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {30, 5}, {20, 5});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {35, 5}, {25, 5});
+
+  auto cow_ops = ConvertToCowOperations(operations_, merge_operations_);
+  // Expect 12 COW_COPY: the 4 merge operations cover 1 + 1 + 5 + 5 blocks.
+  ASSERT_EQ(cow_ops.size(), 12UL);
+  ASSERT_TRUE(std::all_of(cow_ops.begin(), cow_ops.end(), [](auto&& cow_op) {
+    return cow_op.op == CowOperation::CowCopy;
+  }));
+  VerifyCowMergeOp(cow_ops);
+}
+}  // namespace chromeos_update_engine
diff --git a/common/dynamic_partition_control_interface.h b/common/dynamic_partition_control_interface.h
index 7c2d0b0..530b0af 100644
--- a/common/dynamic_partition_control_interface.h
+++ b/common/dynamic_partition_control_interface.h
@@ -26,8 +26,14 @@
 #include "update_engine/common/action.h"
 #include "update_engine/common/cleanup_previous_update_action_delegate.h"
 #include "update_engine/common/error_code.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
 #include "update_engine/update_metadata.pb.h"
 
+// Forward declaration for libsnapshot/snapshot_writer.h
+namespace android::snapshot {
+class ISnapshotWriter;
+}
+
 namespace chromeos_update_engine {
 
 struct FeatureFlag {
@@ -56,6 +62,8 @@
 
   // Return the feature flags of Virtual A/B on this device.
   virtual FeatureFlag GetVirtualAbFeatureFlag() = 0;
+  // Return the feature flags of Virtual A/B Compression on this device.
+  virtual FeatureFlag GetVirtualAbCompressionFeatureFlag() = 0;
 
   // Attempt to optimize |operation|.
   // If successful, |optimized| contains an operation with extents that
@@ -137,6 +145,15 @@
       uint32_t source_slot,
       uint32_t target_slot,
       const std::vector<std::string>& partitions) = 0;
+  // Partition name is expected to be unsuffixed, e.g. system, vendor.
+  // Return an interface to write to a snapshotted partition.
+  // If `is_append` is false, then existing COW data will be overwritten.
+  // Otherwise the COW writer will be opened in APPEND mode and existing COW
+  // data is preserved.
+  virtual std::unique_ptr<android::snapshot::ISnapshotWriter> OpenCowWriter(
+      const std::string& unsuffixed_partition_name,
+      const std::optional<std::string>&,
+      bool is_append = false) = 0;
 };
 
 }  // namespace chromeos_update_engine
diff --git a/common/dynamic_partition_control_stub.cc b/common/dynamic_partition_control_stub.cc
index 5a8ca43..64ab201 100644
--- a/common/dynamic_partition_control_stub.cc
+++ b/common/dynamic_partition_control_stub.cc
@@ -20,6 +20,7 @@
 #include <string>
 
 #include <base/logging.h>
+#include <libsnapshot/cow_writer.h>
 
 #include "update_engine/common/dynamic_partition_control_stub.h"
 
@@ -33,6 +34,10 @@
   return FeatureFlag(FeatureFlag::Value::NONE);
 }
 
+FeatureFlag DynamicPartitionControlStub::GetVirtualAbCompressionFeatureFlag() {
+  return FeatureFlag(FeatureFlag::Value::NONE);
+}
+
 bool DynamicPartitionControlStub::OptimizeOperation(
     const std::string& partition_name,
     const InstallOperation& operation,
@@ -83,4 +88,12 @@
   return true;
 }
 
+std::unique_ptr<android::snapshot::ISnapshotWriter>
+DynamicPartitionControlStub::OpenCowWriter(
+    const std::string& /*unsuffixed_partition_name*/,
+    const std::optional<std::string>& /*source_path*/,
+    bool /*is_append*/) {
+  return nullptr;
+}
+
 }  // namespace chromeos_update_engine
diff --git a/common/dynamic_partition_control_stub.h b/common/dynamic_partition_control_stub.h
index 94dba1b..a939cfb 100644
--- a/common/dynamic_partition_control_stub.h
+++ b/common/dynamic_partition_control_stub.h
@@ -31,6 +31,7 @@
  public:
   FeatureFlag GetDynamicPartitionsFeatureFlag() override;
   FeatureFlag GetVirtualAbFeatureFlag() override;
+  FeatureFlag GetVirtualAbCompressionFeatureFlag() override;
   bool OptimizeOperation(const std::string& partition_name,
                          const InstallOperation& operation,
                          InstallOperation* optimized) override;
@@ -56,8 +57,12 @@
       uint32_t source_slot,
       uint32_t target_slot,
       const std::vector<std::string>& partitions) override;
-};
 
+  std::unique_ptr<android::snapshot::ISnapshotWriter> OpenCowWriter(
+      const std::string& unsuffixed_partition_name,
+      const std::optional<std::string>&,
+      bool is_append) override;
+};
 }  // namespace chromeos_update_engine
 
 #endif  // UPDATE_ENGINE_COMMON_DYNAMIC_PARTITION_CONTROL_STUB_H_
diff --git a/common/platform_constants.h b/common/platform_constants.h
index 243af69..c060133 100644
--- a/common/platform_constants.h
+++ b/common/platform_constants.h
@@ -58,6 +58,12 @@
 // postinstall.
 extern const char kPostinstallMountOptions[];
 
+#ifdef __ANDROID_RECOVERY__
+constexpr bool kIsRecovery = true;
+#else
+constexpr bool kIsRecovery = false;
+#endif
+
 }  // namespace constants
 }  // namespace chromeos_update_engine
 
diff --git a/dynamic_partition_control_android.cc b/dynamic_partition_control_android.cc
index ccb99ba..06e5745 100644
--- a/dynamic_partition_control_android.cc
+++ b/dynamic_partition_control_android.cc
@@ -17,6 +17,7 @@
 #include "update_engine/dynamic_partition_control_android.h"
 
 #include <chrono>  // NOLINT(build/c++11) - using libsnapshot / liblp API
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <set>
@@ -36,6 +37,8 @@
 #include <fs_mgr_overlayfs.h>
 #include <libavb/libavb.h>
 #include <libdm/dm.h>
+#include <liblp/liblp.h>
+#include <libsnapshot/cow_writer.h>
 #include <libsnapshot/snapshot.h>
 #include <libsnapshot/snapshot_stub.h>
 
@@ -71,6 +74,14 @@
     "ro.boot.dynamic_partitions_retrofit";
 constexpr char kVirtualAbEnabled[] = "ro.virtual_ab.enabled";
 constexpr char kVirtualAbRetrofit[] = "ro.virtual_ab.retrofit";
+constexpr char kVirtualAbCompressionEnabled[] =
+    "ro.virtual_ab.compression.enabled";
+
+// Currently, android doesn't have a retrofit prop for VAB Compression. However,
+// struct FeatureFlag forces us to determine if a feature is 'retrofit'. So this
+// is here just to simplify code. Replace it with real retrofit prop name once
+// there is one.
+constexpr char kVirtualAbCompressionRetrofit[] = "";
 constexpr char kPostinstallFstabPrefix[] = "ro.postinstall.fstab.prefix";
 // Map timeout for dynamic partitions.
 constexpr std::chrono::milliseconds kMapTimeout{1000};
@@ -78,19 +89,15 @@
 // needs to be mapped, this timeout is longer than |kMapTimeout|.
 constexpr std::chrono::milliseconds kMapSnapshotTimeout{5000};
 
-#ifdef __ANDROID_RECOVERY__
-constexpr bool kIsRecovery = true;
-#else
-constexpr bool kIsRecovery = false;
-#endif
-
 DynamicPartitionControlAndroid::~DynamicPartitionControlAndroid() {
   Cleanup();
 }
 
 static FeatureFlag GetFeatureFlag(const char* enable_prop,
                                   const char* retrofit_prop) {
-  bool retrofit = GetBoolProperty(retrofit_prop, false);
+  // Default retrofit to false if retrofit_prop is empty.
+  bool retrofit = retrofit_prop && retrofit_prop[0] != '\0' &&
+                  GetBoolProperty(retrofit_prop, false);
   bool enabled = GetBoolProperty(enable_prop, false);
   if (retrofit && !enabled) {
     LOG(ERROR) << retrofit_prop << " is true but " << enable_prop
@@ -109,7 +116,9 @@
 DynamicPartitionControlAndroid::DynamicPartitionControlAndroid()
     : dynamic_partitions_(
           GetFeatureFlag(kUseDynamicPartitions, kRetrfoitDynamicPartitions)),
-      virtual_ab_(GetFeatureFlag(kVirtualAbEnabled, kVirtualAbRetrofit)) {
+      virtual_ab_(GetFeatureFlag(kVirtualAbEnabled, kVirtualAbRetrofit)),
+      virtual_ab_compression_(GetFeatureFlag(kVirtualAbCompressionEnabled,
+                                             kVirtualAbCompressionRetrofit)) {
   if (GetVirtualAbFeatureFlag().IsEnabled()) {
     snapshot_ = SnapshotManager::New();
   } else {
@@ -126,6 +135,11 @@
   return virtual_ab_;
 }
 
+FeatureFlag
+DynamicPartitionControlAndroid::GetVirtualAbCompressionFeatureFlag() {
+  return virtual_ab_compression_;
+}
+
 bool DynamicPartitionControlAndroid::OptimizeOperation(
     const std::string& partition_name,
     const InstallOperation& operation,
@@ -1068,7 +1082,7 @@
 }
 
 bool DynamicPartitionControlAndroid::IsRecovery() {
-  return kIsRecovery;
+  return constants::kIsRecovery;
 }
 
 static bool IsIncrementalUpdate(const DeltaArchiveManifest& manifest) {
@@ -1219,4 +1233,30 @@
   return metadata_device_ != nullptr;
 }
 
+std::unique_ptr<android::snapshot::ISnapshotWriter>
+DynamicPartitionControlAndroid::OpenCowWriter(
+    const std::string& partition_name,
+    const std::optional<std::string>& source_path,
+    bool is_append) {
+  auto suffix = SlotSuffixForSlotNumber(target_slot_);
+
+  std::string device_dir_str;
+  if (!GetDeviceDir(&device_dir_str)) {
+    LOG(ERROR) << "Failed to get device dir!";
+    return nullptr;
+  }
+  base::FilePath device_dir(device_dir_str);
+  auto super_device =
+      device_dir.Append(GetSuperPartitionName(target_slot_)).value();
+  CreateLogicalPartitionParams params = {
+      .block_device = super_device,
+      .metadata_slot = target_slot_,
+      .partition_name = partition_name + suffix,
+      .force_writable = true,
+  };
+  // TODO(zhangkelvin) Open an APPEND mode CowWriter once there's an API to do
+  // it.
+  return snapshot_->OpenSnapshotWriter(params, std::move(source_path));
+}
+
 }  // namespace chromeos_update_engine
diff --git a/dynamic_partition_control_android.h b/dynamic_partition_control_android.h
index 49967f6..9bffb59 100644
--- a/dynamic_partition_control_android.h
+++ b/dynamic_partition_control_android.h
@@ -25,6 +25,7 @@
 #include <base/files/file_util.h>
 #include <libsnapshot/auto_device.h>
 #include <libsnapshot/snapshot.h>
+#include <libsnapshot/snapshot_writer.h>
 
 #include "update_engine/common/dynamic_partition_control_interface.h"
 
@@ -36,6 +37,7 @@
   ~DynamicPartitionControlAndroid();
   FeatureFlag GetDynamicPartitionsFeatureFlag() override;
   FeatureFlag GetVirtualAbFeatureFlag() override;
+  FeatureFlag GetVirtualAbCompressionFeatureFlag() override;
   bool OptimizeOperation(const std::string& partition_name,
                          const InstallOperation& operation,
                          InstallOperation* optimized) override;
@@ -81,6 +83,13 @@
                           uint32_t current_slot,
                           std::string* device);
 
+  // Partition name is expected to be unsuffixed. e.g. system, vendor
+  // Returns an interface to write to a snapshotted partition.
+  std::unique_ptr<android::snapshot::ISnapshotWriter> OpenCowWriter(
+      const std::string& unsuffixed_partition_name,
+      const std::optional<std::string>& source_path,
+      bool is_append) override;
+
  protected:
   // These functions are exposed for testing.
 
@@ -277,6 +286,7 @@
   std::set<std::string> mapped_devices_;
   const FeatureFlag dynamic_partitions_;
   const FeatureFlag virtual_ab_;
+  const FeatureFlag virtual_ab_compression_;
   std::unique_ptr<android::snapshot::ISnapshotManager> snapshot_;
   std::unique_ptr<android::snapshot::AutoDevice> metadata_device_;
   bool target_supports_snapshot_ = false;
diff --git a/hardware_android.cc b/hardware_android.cc
index 8d1fdfd..28c139a 100644
--- a/hardware_android.cc
+++ b/hardware_android.cc
@@ -17,18 +17,16 @@
 #include "update_engine/hardware_android.h"
 
 #include <sys/types.h>
-#include <sys/utsname.h>
 
 #include <memory>
 #include <string>
 #include <string_view>
 
+#include <android/sysprop/GkiProperties.sysprop.h>
 #include <android-base/parseint.h>
 #include <android-base/properties.h>
 #include <base/files/file_util.h>
 #include <bootloader_message/bootloader_message.h>
-#include <kver/kernel_release.h>
-#include <kver/utils.h>
 
 #include "update_engine/common/error_code_utils.h"
 #include "update_engine/common/hardware.h"
@@ -38,8 +36,6 @@
 using android::base::GetBoolProperty;
 using android::base::GetIntProperty;
 using android::base::GetProperty;
-using android::kver::IsKernelUpdateValid;
-using android::kver::KernelRelease;
 using std::string;
 
 namespace chromeos_update_engine {
@@ -59,6 +55,19 @@
                                     "");
 }
 
+ErrorCode IsTimestampNewerLogged(const std::string& partition_name,
+                                 const std::string& old_version,
+                                 const std::string& new_version) {
+  auto error_code = utils::IsTimestampNewer(old_version, new_version);
+  if (error_code != ErrorCode::kSuccess) {
+    LOG(WARNING) << "Timestamp check failed with "
+                 << utils::ErrorCodeToString(error_code) << ": "
+                 << partition_name << " Partition timestamp: " << old_version
+                 << " Update timestamp: " << new_version;
+  }
+  return error_code;
+}
+
 }  // namespace
 
 namespace hardware {
@@ -222,23 +231,19 @@
 }
 
 void HardwareAndroid::SetWarmReset(bool warm_reset) {
-  constexpr char warm_reset_prop[] = "ota.warm_reset";
-  if (!android::base::SetProperty(warm_reset_prop, warm_reset ? "1" : "0")) {
-    LOG(WARNING) << "Failed to set prop " << warm_reset_prop;
+  if constexpr (!constants::kIsRecovery) {
+    constexpr char warm_reset_prop[] = "ota.warm_reset";
+    if (!android::base::SetProperty(warm_reset_prop, warm_reset ? "1" : "0")) {
+      LOG(WARNING) << "Failed to set prop " << warm_reset_prop;
+    }
   }
 }
 
 string HardwareAndroid::GetVersionForLogging(
     const string& partition_name) const {
   if (partition_name == "boot") {
-    struct utsname buf;
-    if (uname(&buf) != 0) {
-      PLOG(ERROR) << "Unable to call uname()";
-      return "";
-    }
-    auto kernel_release =
-        KernelRelease::Parse(buf.release, true /* allow_suffix */);
-    return kernel_release.has_value() ? kernel_release->string() : "";
+    // ro.bootimage.build.date.utc
+    return GetPartitionBuildDate("bootimage");
   }
   return GetPartitionBuildDate(partition_name);
 }
@@ -246,50 +251,33 @@
 ErrorCode HardwareAndroid::IsPartitionUpdateValid(
     const string& partition_name, const string& new_version) const {
   if (partition_name == "boot") {
-    struct utsname buf;
-    if (uname(&buf) != 0) {
-      PLOG(ERROR) << "Unable to call uname()";
-      return ErrorCode::kError;
+    const auto old_version = GetPartitionBuildDate("bootimage");
+    auto error_code =
+        IsTimestampNewerLogged(partition_name, old_version, new_version);
+    if (error_code == ErrorCode::kPayloadTimestampError) {
+      bool prevent_downgrade =
+          android::sysprop::GkiProperties::prevent_downgrade_version().value_or(
+              false);
+      if (!prevent_downgrade) {
+        LOG(WARNING) << "Downgrade of boot image is detected, but permitting "
+                        "update because device does not prevent boot image "
+                        "downgrade";
+        // If prevent_downgrade_version sysprop is unset or false, permit
+        // downgrade in boot image version.
+        // Even though error_code is overridden here, always call
+        // IsTimestampNewerLogged to produce log messages.
+        error_code = ErrorCode::kSuccess;
+      }
     }
-    return IsKernelUpdateValid(buf.release, new_version);
+    return error_code;
   }
 
   const auto old_version = GetPartitionBuildDate(partition_name);
   // TODO(zhangkelvin)  for some partitions, missing a current timestamp should
   // be an error, e.g. system, vendor, product etc.
-  auto error_code = utils::IsTimestampNewer(old_version, new_version);
-  if (error_code != ErrorCode::kSuccess) {
-    LOG(ERROR) << "Timestamp check failed with "
-               << utils::ErrorCodeToString(error_code)
-               << " Partition timestamp: " << old_version
-               << " Update timestamp: " << new_version;
-  }
+  auto error_code =
+      IsTimestampNewerLogged(partition_name, old_version, new_version);
   return error_code;
 }
 
-ErrorCode HardwareAndroid::IsKernelUpdateValid(const string& old_release,
-                                               const string& new_release) {
-  // Check that the package either contain an empty version (indicating that the
-  // new build does not use GKI), or a valid GKI kernel release.
-  std::optional<KernelRelease> new_kernel_release;
-  if (new_release.empty()) {
-    LOG(INFO) << "New build does not contain GKI.";
-  } else {
-    new_kernel_release =
-        KernelRelease::Parse(new_release, true /* allow_suffix */);
-    if (!new_kernel_release.has_value()) {
-      LOG(ERROR) << "New kernel release is not valid GKI kernel release: "
-                 << new_release;
-      return ErrorCode::kDownloadManifestParseError;
-    }
-  }
-
-  auto old_kernel_release =
-      KernelRelease::Parse(old_release, true /* allow_suffix */);
-  return android::kver::IsKernelUpdateValid(old_kernel_release,
-                                            new_kernel_release)
-             ? ErrorCode::kSuccess
-             : ErrorCode::kPayloadTimestampError;
-}
-
 }  // namespace chromeos_update_engine
diff --git a/hardware_android.h b/hardware_android.h
index d7e39f3..4d10835 100644
--- a/hardware_android.h
+++ b/hardware_android.h
@@ -22,7 +22,6 @@
 
 #include <base/macros.h>
 #include <base/time/time.h>
-#include <gtest/gtest_prod.h>
 
 #include "update_engine/common/error_code.h"
 #include "update_engine/common/hardware.h"
@@ -66,12 +65,6 @@
       const std::string& new_version) const override;
 
  private:
-  FRIEND_TEST(HardwareAndroidTest, IsKernelUpdateValid);
-
-  // Helper for IsPartitionUpdateValid.
-  static ErrorCode IsKernelUpdateValid(const std::string& old_release,
-                                       const std::string& new_release);
-
   DISALLOW_COPY_AND_ASSIGN(HardwareAndroid);
 };
 
diff --git a/hardware_android_unittest.cc b/hardware_android_unittest.cc
deleted file mode 100644
index 9a491f3..0000000
--- a/hardware_android_unittest.cc
+++ /dev/null
@@ -1,67 +0,0 @@
-//
-// Copyright (C) 2020 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-//
-
-#include <gtest/gtest.h>
-
-#include "update_engine/common/error_code.h"
-#include "update_engine/common/test_utils.h"
-#include "update_engine/hardware_android.h"
-
-namespace chromeos_update_engine {
-
-TEST(HardwareAndroidTest, IsKernelUpdateValid) {
-  EXPECT_EQ(ErrorCode::kSuccess,
-            HardwareAndroid::IsKernelUpdateValid("5.4.42-not-gki", ""))
-      << "Legacy update should be fine";
-
-  EXPECT_EQ(ErrorCode::kSuccess,
-            HardwareAndroid::IsKernelUpdateValid("5.4.42-not-gki",
-                                                 "5.4.42-android12-0"))
-      << "Update to GKI should be fine";
-
-  EXPECT_EQ(
-      ErrorCode::kDownloadManifestParseError,
-      HardwareAndroid::IsKernelUpdateValid("5.4.42-not-gki", "5.4.42-not-gki"))
-      << "Should report parse error for invalid version field";
-
-  EXPECT_EQ(ErrorCode::kSuccess,
-            HardwareAndroid::IsKernelUpdateValid(
-                "5.4.42-android12-0-something", "5.4.42-android12-0-something"))
-      << "Self update should be fine";
-
-  EXPECT_EQ(ErrorCode::kSuccess,
-            HardwareAndroid::IsKernelUpdateValid(
-                "5.4.42-android12-0-something", "5.4.43-android12-0-something"))
-      << "Sub-level update should be fine";
-
-  EXPECT_EQ(
-      ErrorCode::kSuccess,
-      HardwareAndroid::IsKernelUpdateValid("5.4.42-android12-0-something",
-                                           "5.10.10-android12-0-something"))
-      << "KMI version update should be fine";
-
-  EXPECT_EQ(ErrorCode::kPayloadTimestampError,
-            HardwareAndroid::IsKernelUpdateValid("5.4.42-android12-0-something",
-                                                 "5.4.5-android12-0-something"))
-      << "Should detect sub-level downgrade";
-
-  EXPECT_EQ(ErrorCode::kPayloadTimestampError,
-            HardwareAndroid::IsKernelUpdateValid("5.4.42-android12-0-something",
-                                                 "5.1.5-android12-0-something"))
-      << "Should detect KMI version downgrade";
-}
-
-}  // namespace chromeos_update_engine
diff --git a/mock_dynamic_partition_control.h b/mock_dynamic_partition_control.h
index e85df32..5144cbb 100644
--- a/mock_dynamic_partition_control.h
+++ b/mock_dynamic_partition_control.h
@@ -22,6 +22,9 @@
 
 #include <gmock/gmock.h>
 
+#include <libsnapshot/cow_writer.h>
+
+#include "libsnapshot/snapshot_writer.h"
 #include "update_engine/common/boot_control_interface.h"
 #include "update_engine/common/dynamic_partition_control_interface.h"
 #include "update_engine/dynamic_partition_control_android.h"
@@ -81,6 +84,12 @@
               PrepareDynamicPartitionsForUpdate,
               (uint32_t, uint32_t, const DeltaArchiveManifest&, bool),
               (override));
+  MOCK_METHOD(std::unique_ptr<android::snapshot::ISnapshotWriter>,
+              OpenCowWriter,
+              (const std::string& unsuffixed_partition_name,
+               const std::optional<std::string>& source_path,
+               bool is_append),
+              (override));
 
   void set_fake_mapped_devices(const std::set<std::string>& fake) override {
     DynamicPartitionControlAndroid::set_fake_mapped_devices(fake);
diff --git a/payload_consumer/delta_performer.cc b/payload_consumer/delta_performer.cc
index d2ed24a..a3989d6 100644
--- a/payload_consumer/delta_performer.cc
+++ b/payload_consumer/delta_performer.cc
@@ -54,6 +54,7 @@
 #include "update_engine/payload_consumer/extent_reader.h"
 #include "update_engine/payload_consumer/extent_writer.h"
 #include "update_engine/payload_consumer/partition_update_generator_interface.h"
+#include "update_engine/payload_consumer/partition_writer.h"
 #if USE_FEC
 #include "update_engine/payload_consumer/fec_file_descriptor.h"
 #endif  // USE_FEC
@@ -79,65 +80,6 @@
 const int kUpdateStateOperationInvalid = -1;
 const int kMaxResumedUpdateFailures = 10;
 
-const uint64_t kCacheSize = 1024 * 1024;  // 1MB
-
-// Opens path for read/write. On success returns an open FileDescriptor
-// and sets *err to 0. On failure, sets *err to errno and returns nullptr.
-FileDescriptorPtr OpenFile(const char* path,
-                           int mode,
-                           bool cache_writes,
-                           int* err) {
-  // Try to mark the block device read-only based on the mode. Ignore any
-  // failure since this won't work when passing regular files.
-  bool read_only = (mode & O_ACCMODE) == O_RDONLY;
-  utils::SetBlockDeviceReadOnly(path, read_only);
-
-  FileDescriptorPtr fd(new EintrSafeFileDescriptor());
-  if (cache_writes && !read_only) {
-    fd = FileDescriptorPtr(new CachedFileDescriptor(fd, kCacheSize));
-    LOG(INFO) << "Caching writes.";
-  }
-  if (!fd->Open(path, mode, 000)) {
-    *err = errno;
-    PLOG(ERROR) << "Unable to open file " << path;
-    return nullptr;
-  }
-  *err = 0;
-  return fd;
-}
-
-// Discard the tail of the block device referenced by |fd|, from the offset
-// |data_size| until the end of the block device. Returns whether the data was
-// discarded.
-bool DiscardPartitionTail(const FileDescriptorPtr& fd, uint64_t data_size) {
-  uint64_t part_size = fd->BlockDevSize();
-  if (!part_size || part_size <= data_size)
-    return false;
-
-  struct blkioctl_request {
-    int number;
-    const char* name;
-  };
-  const vector<blkioctl_request> blkioctl_requests = {
-      {BLKDISCARD, "BLKDISCARD"},
-      {BLKSECDISCARD, "BLKSECDISCARD"},
-#ifdef BLKZEROOUT
-      {BLKZEROOUT, "BLKZEROOUT"},
-#endif
-  };
-  for (const auto& req : blkioctl_requests) {
-    int error = 0;
-    if (fd->BlkIoctl(req.number, data_size, part_size - data_size, &error) &&
-        error == 0) {
-      return true;
-    }
-    LOG(WARNING) << "Error discarding the last "
-                 << (part_size - data_size) / 1024 << " KiB using ioctl("
-                 << req.name << ")";
-  }
-  return false;
-}
-
 }  // namespace
 
 // Computes the ratio of |part| and |total|, scaled to |norm|, using integer
@@ -282,33 +224,12 @@
 }
 
 int DeltaPerformer::CloseCurrentPartition() {
-  int err = 0;
-  if (source_fd_ && !source_fd_->Close()) {
-    err = errno;
-    PLOG(ERROR) << "Error closing source partition";
-    if (!err)
-      err = 1;
+  if (!partition_writer_) {
+    return 0;
   }
-  source_fd_.reset();
-  if (source_ecc_fd_ && !source_ecc_fd_->Close()) {
-    err = errno;
-    PLOG(ERROR) << "Error closing ECC source partition";
-    if (!err)
-      err = 1;
-  }
-  source_ecc_fd_.reset();
-  source_ecc_open_failure_ = false;
-  source_path_.clear();
-
-  if (target_fd_ && !target_fd_->Close()) {
-    err = errno;
-    PLOG(ERROR) << "Error closing target partition";
-    if (!err)
-      err = 1;
-  }
-  target_fd_.reset();
-  target_path_.clear();
-  return -err;
+  int err = partition_writer_->Close();
+  partition_writer_ = nullptr;
+  return err;
 }
 
 bool DeltaPerformer::OpenCurrentPartition() {
@@ -320,92 +241,19 @@
       install_plan_->partitions.size() - partitions_.size();
   const InstallPlan::Partition& install_part =
       install_plan_->partitions[num_previous_partitions + current_partition_];
+  auto dynamic_control = boot_control_->GetDynamicPartitionControl();
+  partition_writer_ = partition_writer::CreatePartitionWriter(
+      partition,
+      install_part,
+      dynamic_control,
+      block_size_,
+      interactive_,
+      IsDynamicPartition(install_part.name));
   // Open source fds if we have a delta payload, or for partitions in the
   // partial update.
   bool source_may_exist = manifest_.partial_update() ||
                           payload_->type == InstallPayloadType::kDelta;
-  // We shouldn't open the source partition in certain cases, e.g. some dynamic
-  // partitions in delta payload, partitions included in the full payload for
-  // partial updates. Use the source size as the indicator.
-  if (source_may_exist && install_part.source_size > 0) {
-    source_path_ = install_part.source_path;
-    int err;
-    source_fd_ = OpenFile(source_path_.c_str(), O_RDONLY, false, &err);
-    if (!source_fd_) {
-      LOG(ERROR) << "Unable to open source partition "
-                 << partition.partition_name() << " on slot "
-                 << BootControlInterface::SlotName(install_plan_->source_slot)
-                 << ", file " << source_path_;
-      return false;
-    }
-  }
-
-  target_path_ = install_part.target_path;
-  int err;
-
-  int flags = O_RDWR;
-  if (!interactive_)
-    flags |= O_DSYNC;
-
-  LOG(INFO) << "Opening " << target_path_ << " partition with"
-            << (interactive_ ? "out" : "") << " O_DSYNC";
-
-  target_fd_ = OpenFile(target_path_.c_str(), flags, true, &err);
-  if (!target_fd_) {
-    LOG(ERROR) << "Unable to open target partition "
-               << partition.partition_name() << " on slot "
-               << BootControlInterface::SlotName(install_plan_->target_slot)
-               << ", file " << target_path_;
-    return false;
-  }
-
-  LOG(INFO) << "Applying " << partition.operations().size()
-            << " operations to partition \"" << partition.partition_name()
-            << "\"";
-
-  // Discard the end of the partition, but ignore failures.
-  DiscardPartitionTail(target_fd_, install_part.target_size);
-
-  return true;
-}
-
-bool DeltaPerformer::OpenCurrentECCPartition() {
-  if (source_ecc_fd_)
-    return true;
-
-  if (source_ecc_open_failure_)
-    return false;
-
-  if (current_partition_ >= partitions_.size())
-    return false;
-
-  // No support for ECC for full payloads.
-  if (payload_->type == InstallPayloadType::kFull)
-    return false;
-
-#if USE_FEC
-  const PartitionUpdate& partition = partitions_[current_partition_];
-  size_t num_previous_partitions =
-      install_plan_->partitions.size() - partitions_.size();
-  const InstallPlan::Partition& install_part =
-      install_plan_->partitions[num_previous_partitions + current_partition_];
-  string path = install_part.source_path;
-  FileDescriptorPtr fd(new FecFileDescriptor());
-  if (!fd->Open(path.c_str(), O_RDONLY, 0)) {
-    PLOG(ERROR) << "Unable to open ECC source partition "
-                << partition.partition_name() << " on slot "
-                << BootControlInterface::SlotName(install_plan_->source_slot)
-                << ", file " << path;
-    source_ecc_open_failure_ = true;
-    return false;
-  }
-  source_ecc_fd_ = fd;
-#else
-  // No support for ECC compiled.
-  source_ecc_open_failure_ = true;
-#endif  // USE_FEC
-
-  return !source_ecc_open_failure_;
+  return partition_writer_->Init(install_plan_, source_may_exist);
 }
 
 namespace {
@@ -724,10 +572,6 @@
     if (!HandleOpResult(op_result, InstallOperationTypeName(op.type()), error))
       return false;
 
-    if (!target_fd_->Flush()) {
-      return false;
-    }
-
     next_operation_num_++;
     UpdateOverallProgress(false, "Completed ");
     CheckpointUpdateProgress(false);
@@ -790,6 +634,11 @@
     }
   }
 
+  auto dynamic_control = boot_control_->GetDynamicPartitionControl();
+  CHECK_NE(dynamic_control, nullptr);
+  TEST_AND_RETURN_FALSE(dynamic_control->ListDynamicPartitionsForSlot(
+      install_plan_->target_slot, &dynamic_partitions_));
+
   // Partitions in manifest are no longer needed after preparing partitions.
   manifest_.clear_partitions();
   // TODO(xunchang) TBD: allow partial update only on devices with dynamic
@@ -994,22 +843,10 @@
 
   // Since we delete data off the beginning of the buffer as we use it,
   // the data we need should be exactly at the beginning of the buffer.
-  TEST_AND_RETURN_FALSE(buffer_offset_ == operation.data_offset());
   TEST_AND_RETURN_FALSE(buffer_.size() >= operation.data_length());
 
-  // Setup the ExtentWriter stack based on the operation type.
-  std::unique_ptr<ExtentWriter> writer = std::make_unique<DirectExtentWriter>();
-
-  if (operation.type() == InstallOperation::REPLACE_BZ) {
-    writer.reset(new BzipExtentWriter(std::move(writer)));
-  } else if (operation.type() == InstallOperation::REPLACE_XZ) {
-    writer.reset(new XzExtentWriter(std::move(writer)));
-  }
-
-  TEST_AND_RETURN_FALSE(
-      writer->Init(target_fd_, operation.dst_extents(), block_size_));
-  TEST_AND_RETURN_FALSE(writer->Write(buffer_.data(), operation.data_length()));
-
+  TEST_AND_RETURN_FALSE(partition_writer_->PerformReplaceOperation(
+      operation, buffer_.data(), buffer_.size()));
   // Update buffer
   DiscardBuffer(true, buffer_.size());
   return true;
@@ -1024,41 +861,13 @@
   TEST_AND_RETURN_FALSE(!operation.has_data_offset());
   TEST_AND_RETURN_FALSE(!operation.has_data_length());
 
-#ifdef BLKZEROOUT
-  bool attempt_ioctl = true;
-  int request =
-      (operation.type() == InstallOperation::ZERO ? BLKZEROOUT : BLKDISCARD);
-#else   // !defined(BLKZEROOUT)
-  bool attempt_ioctl = false;
-  int request = 0;
-#endif  // !defined(BLKZEROOUT)
-
-  brillo::Blob zeros;
-  for (const Extent& extent : operation.dst_extents()) {
-    const uint64_t start = extent.start_block() * block_size_;
-    const uint64_t length = extent.num_blocks() * block_size_;
-    if (attempt_ioctl) {
-      int result = 0;
-      if (target_fd_->BlkIoctl(request, start, length, &result) && result == 0)
-        continue;
-      attempt_ioctl = false;
-    }
-    // In case of failure, we fall back to writing 0 to the selected region.
-    zeros.resize(16 * block_size_);
-    for (uint64_t offset = 0; offset < length; offset += zeros.size()) {
-      uint64_t chunk_length =
-          min(length - offset, static_cast<uint64_t>(zeros.size()));
-      TEST_AND_RETURN_FALSE(utils::PWriteAll(
-          target_fd_, zeros.data(), chunk_length, start + offset));
-    }
-  }
-  return true;
+  return partition_writer_->PerformZeroOrDiscardOperation(operation);
 }
 
-bool DeltaPerformer::ValidateSourceHash(const brillo::Blob& calculated_hash,
-                                        const InstallOperation& operation,
-                                        const FileDescriptorPtr source_fd,
-                                        ErrorCode* error) {
+bool PartitionWriter::ValidateSourceHash(const brillo::Blob& calculated_hash,
+                                         const InstallOperation& operation,
+                                         const FileDescriptorPtr source_fd,
+                                         ErrorCode* error) {
   brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
                                     operation.src_sha256_hash().end());
   if (calculated_hash != expected_source_hash) {
@@ -1099,169 +908,7 @@
     TEST_AND_RETURN_FALSE(operation.src_length() % block_size_ == 0);
   if (operation.has_dst_length())
     TEST_AND_RETURN_FALSE(operation.dst_length() % block_size_ == 0);
-
-  TEST_AND_RETURN_FALSE(source_fd_ != nullptr);
-
-  // The device may optimize the SOURCE_COPY operation.
-  // Being this a device-specific optimization let DynamicPartitionController
-  // decide it the operation should be skipped.
-  const PartitionUpdate& partition = partitions_[current_partition_];
-  const auto& partition_control = boot_control_->GetDynamicPartitionControl();
-
-  InstallOperation buf;
-  bool should_optimize = partition_control->OptimizeOperation(
-      partition.partition_name(), operation, &buf);
-  const InstallOperation& optimized = should_optimize ? buf : operation;
-
-  if (operation.has_src_sha256_hash()) {
-    bool read_ok;
-    brillo::Blob source_hash;
-    brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
-                                      operation.src_sha256_hash().end());
-
-    // We fall back to use the error corrected device if the hash of the raw
-    // device doesn't match or there was an error reading the source partition.
-    // Note that this code will also fall back if writing the target partition
-    // fails.
-    if (should_optimize) {
-      // Hash operation.src_extents(), then copy optimized.src_extents to
-      // optimized.dst_extents.
-      read_ok =
-          fd_utils::ReadAndHashExtents(
-              source_fd_, operation.src_extents(), block_size_, &source_hash) &&
-          fd_utils::CopyAndHashExtents(source_fd_,
-                                       optimized.src_extents(),
-                                       target_fd_,
-                                       optimized.dst_extents(),
-                                       block_size_,
-                                       nullptr /* skip hashing */);
-    } else {
-      read_ok = fd_utils::CopyAndHashExtents(source_fd_,
-                                             operation.src_extents(),
-                                             target_fd_,
-                                             operation.dst_extents(),
-                                             block_size_,
-                                             &source_hash);
-    }
-    if (read_ok && expected_source_hash == source_hash)
-      return true;
-    LOG(WARNING) << "Source hash from RAW device mismatched, attempting to "
-                    "correct using ECC";
-    if (!OpenCurrentECCPartition()) {
-      // The following function call will return false since the source hash
-      // mismatches, but we still want to call it so it prints the appropriate
-      // log message.
-      return ValidateSourceHash(source_hash, operation, source_fd_, error);
-    }
-
-    LOG(WARNING) << "Source hash from RAW device mismatched: found "
-                 << base::HexEncode(source_hash.data(), source_hash.size())
-                 << ", expected "
-                 << base::HexEncode(expected_source_hash.data(),
-                                    expected_source_hash.size());
-    if (should_optimize) {
-      TEST_AND_RETURN_FALSE(fd_utils::ReadAndHashExtents(
-          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash));
-      TEST_AND_RETURN_FALSE(
-          fd_utils::CopyAndHashExtents(source_ecc_fd_,
-                                       optimized.src_extents(),
-                                       target_fd_,
-                                       optimized.dst_extents(),
-                                       block_size_,
-                                       nullptr /* skip hashing */));
-    } else {
-      TEST_AND_RETURN_FALSE(
-          fd_utils::CopyAndHashExtents(source_ecc_fd_,
-                                       operation.src_extents(),
-                                       target_fd_,
-                                       operation.dst_extents(),
-                                       block_size_,
-                                       &source_hash));
-    }
-    TEST_AND_RETURN_FALSE(
-        ValidateSourceHash(source_hash, operation, source_ecc_fd_, error));
-    // At this point reading from the the error corrected device worked, but
-    // reading from the raw device failed, so this is considered a recovered
-    // failure.
-    source_ecc_recovered_failures_++;
-  } else {
-    // When the operation doesn't include a source hash, we attempt the error
-    // corrected device first since we can't verify the block in the raw device
-    // at this point, but we fall back to the raw device since the error
-    // corrected device can be shorter or not available.
-
-    if (OpenCurrentECCPartition() &&
-        fd_utils::CopyAndHashExtents(source_ecc_fd_,
-                                     optimized.src_extents(),
-                                     target_fd_,
-                                     optimized.dst_extents(),
-                                     block_size_,
-                                     nullptr)) {
-      return true;
-    }
-    TEST_AND_RETURN_FALSE(fd_utils::CopyAndHashExtents(source_fd_,
-                                                       optimized.src_extents(),
-                                                       target_fd_,
-                                                       optimized.dst_extents(),
-                                                       block_size_,
-                                                       nullptr));
-  }
-  return true;
-}
-
-FileDescriptorPtr DeltaPerformer::ChooseSourceFD(
-    const InstallOperation& operation, ErrorCode* error) {
-  if (source_fd_ == nullptr) {
-    LOG(ERROR) << "ChooseSourceFD fail: source_fd_ == nullptr";
-    return nullptr;
-  }
-
-  if (!operation.has_src_sha256_hash()) {
-    // When the operation doesn't include a source hash, we attempt the error
-    // corrected device first since we can't verify the block in the raw device
-    // at this point, but we first need to make sure all extents are readable
-    // since the error corrected device can be shorter or not available.
-    if (OpenCurrentECCPartition() &&
-        fd_utils::ReadAndHashExtents(
-            source_ecc_fd_, operation.src_extents(), block_size_, nullptr)) {
-      return source_ecc_fd_;
-    }
-    return source_fd_;
-  }
-
-  brillo::Blob source_hash;
-  brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
-                                    operation.src_sha256_hash().end());
-  if (fd_utils::ReadAndHashExtents(
-          source_fd_, operation.src_extents(), block_size_, &source_hash) &&
-      source_hash == expected_source_hash) {
-    return source_fd_;
-  }
-  // We fall back to use the error corrected device if the hash of the raw
-  // device doesn't match or there was an error reading the source partition.
-  if (!OpenCurrentECCPartition()) {
-    // The following function call will return false since the source hash
-    // mismatches, but we still want to call it so it prints the appropriate
-    // log message.
-    ValidateSourceHash(source_hash, operation, source_fd_, error);
-    return nullptr;
-  }
-  LOG(WARNING) << "Source hash from RAW device mismatched: found "
-               << base::HexEncode(source_hash.data(), source_hash.size())
-               << ", expected "
-               << base::HexEncode(expected_source_hash.data(),
-                                  expected_source_hash.size());
-
-  if (fd_utils::ReadAndHashExtents(
-          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash) &&
-      ValidateSourceHash(source_hash, operation, source_ecc_fd_, error)) {
-    // At this point reading from the the error corrected device worked, but
-    // reading from the raw device failed, so this is considered a recovered
-    // failure.
-    source_ecc_recovered_failures_++;
-    return source_ecc_fd_;
-  }
-  return nullptr;
+  return partition_writer_->PerformSourceCopyOperation(operation, error);
 }
 
 bool DeltaPerformer::ExtentsToBsdiffPositionsString(
@@ -1286,69 +933,6 @@
   return true;
 }
 
-namespace {
-
-class BsdiffExtentFile : public bsdiff::FileInterface {
- public:
-  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader, size_t size)
-      : BsdiffExtentFile(std::move(reader), nullptr, size) {}
-  BsdiffExtentFile(std::unique_ptr<ExtentWriter> writer, size_t size)
-      : BsdiffExtentFile(nullptr, std::move(writer), size) {}
-
-  ~BsdiffExtentFile() override = default;
-
-  bool Read(void* buf, size_t count, size_t* bytes_read) override {
-    TEST_AND_RETURN_FALSE(reader_->Read(buf, count));
-    *bytes_read = count;
-    offset_ += count;
-    return true;
-  }
-
-  bool Write(const void* buf, size_t count, size_t* bytes_written) override {
-    TEST_AND_RETURN_FALSE(writer_->Write(buf, count));
-    *bytes_written = count;
-    offset_ += count;
-    return true;
-  }
-
-  bool Seek(off_t pos) override {
-    if (reader_ != nullptr) {
-      TEST_AND_RETURN_FALSE(reader_->Seek(pos));
-      offset_ = pos;
-    } else {
-      // For writes technically there should be no change of position, or it
-      // should be equivalent of current offset.
-      TEST_AND_RETURN_FALSE(offset_ == static_cast<uint64_t>(pos));
-    }
-    return true;
-  }
-
-  bool Close() override { return true; }
-
-  bool GetSize(uint64_t* size) override {
-    *size = size_;
-    return true;
-  }
-
- private:
-  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader,
-                   std::unique_ptr<ExtentWriter> writer,
-                   size_t size)
-      : reader_(std::move(reader)),
-        writer_(std::move(writer)),
-        size_(size),
-        offset_(0) {}
-
-  std::unique_ptr<ExtentReader> reader_;
-  std::unique_ptr<ExtentWriter> writer_;
-  uint64_t size_;
-  uint64_t offset_;
-
-  DISALLOW_COPY_AND_ASSIGN(BsdiffExtentFile);
-};
-
-}  // namespace
-
 bool DeltaPerformer::PerformSourceBsdiffOperation(
     const InstallOperation& operation, ErrorCode* error) {
   // Since we delete data off the beginning of the buffer as we use it,
@@ -1360,136 +944,20 @@
   if (operation.has_dst_length())
     TEST_AND_RETURN_FALSE(operation.dst_length() % block_size_ == 0);
 
-  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
-  TEST_AND_RETURN_FALSE(source_fd != nullptr);
-
-  auto reader = std::make_unique<DirectExtentReader>();
-  TEST_AND_RETURN_FALSE(
-      reader->Init(source_fd, operation.src_extents(), block_size_));
-  auto src_file = std::make_unique<BsdiffExtentFile>(
-      std::move(reader),
-      utils::BlocksInExtents(operation.src_extents()) * block_size_);
-
-  auto writer = std::make_unique<DirectExtentWriter>();
-  TEST_AND_RETURN_FALSE(
-      writer->Init(target_fd_, operation.dst_extents(), block_size_));
-  auto dst_file = std::make_unique<BsdiffExtentFile>(
-      std::move(writer),
-      utils::BlocksInExtents(operation.dst_extents()) * block_size_);
-
-  TEST_AND_RETURN_FALSE(bsdiff::bspatch(std::move(src_file),
-                                        std::move(dst_file),
-                                        buffer_.data(),
-                                        buffer_.size()) == 0);
+  TEST_AND_RETURN_FALSE(partition_writer_->PerformSourceBsdiffOperation(
+      operation, error, buffer_.data(), buffer_.size()));
   DiscardBuffer(true, buffer_.size());
   return true;
 }
 
-namespace {
-
-// A class to be passed to |puffpatch| for reading from |source_fd_| and writing
-// into |target_fd_|.
-class PuffinExtentStream : public puffin::StreamInterface {
- public:
-  // Constructor for creating a stream for reading from an |ExtentReader|.
-  PuffinExtentStream(std::unique_ptr<ExtentReader> reader, uint64_t size)
-      : PuffinExtentStream(std::move(reader), nullptr, size) {}
-
-  // Constructor for creating a stream for writing to an |ExtentWriter|.
-  PuffinExtentStream(std::unique_ptr<ExtentWriter> writer, uint64_t size)
-      : PuffinExtentStream(nullptr, std::move(writer), size) {}
-
-  ~PuffinExtentStream() override = default;
-
-  bool GetSize(uint64_t* size) const override {
-    *size = size_;
-    return true;
-  }
-
-  bool GetOffset(uint64_t* offset) const override {
-    *offset = offset_;
-    return true;
-  }
-
-  bool Seek(uint64_t offset) override {
-    if (is_read_) {
-      TEST_AND_RETURN_FALSE(reader_->Seek(offset));
-      offset_ = offset;
-    } else {
-      // For writes technically there should be no change of position, or it
-      // should equivalent of current offset.
-      TEST_AND_RETURN_FALSE(offset_ == offset);
-    }
-    return true;
-  }
-
-  bool Read(void* buffer, size_t count) override {
-    TEST_AND_RETURN_FALSE(is_read_);
-    TEST_AND_RETURN_FALSE(reader_->Read(buffer, count));
-    offset_ += count;
-    return true;
-  }
-
-  bool Write(const void* buffer, size_t count) override {
-    TEST_AND_RETURN_FALSE(!is_read_);
-    TEST_AND_RETURN_FALSE(writer_->Write(buffer, count));
-    offset_ += count;
-    return true;
-  }
-
-  bool Close() override { return true; }
-
- private:
-  PuffinExtentStream(std::unique_ptr<ExtentReader> reader,
-                     std::unique_ptr<ExtentWriter> writer,
-                     uint64_t size)
-      : reader_(std::move(reader)),
-        writer_(std::move(writer)),
-        size_(size),
-        offset_(0),
-        is_read_(reader_ ? true : false) {}
-
-  std::unique_ptr<ExtentReader> reader_;
-  std::unique_ptr<ExtentWriter> writer_;
-  uint64_t size_;
-  uint64_t offset_;
-  bool is_read_;
-
-  DISALLOW_COPY_AND_ASSIGN(PuffinExtentStream);
-};
-
-}  // namespace
-
 bool DeltaPerformer::PerformPuffDiffOperation(const InstallOperation& operation,
                                               ErrorCode* error) {
   // Since we delete data off the beginning of the buffer as we use it,
   // the data we need should be exactly at the beginning of the buffer.
   TEST_AND_RETURN_FALSE(buffer_offset_ == operation.data_offset());
   TEST_AND_RETURN_FALSE(buffer_.size() >= operation.data_length());
-
-  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
-  TEST_AND_RETURN_FALSE(source_fd != nullptr);
-
-  auto reader = std::make_unique<DirectExtentReader>();
-  TEST_AND_RETURN_FALSE(
-      reader->Init(source_fd, operation.src_extents(), block_size_));
-  puffin::UniqueStreamPtr src_stream(new PuffinExtentStream(
-      std::move(reader),
-      utils::BlocksInExtents(operation.src_extents()) * block_size_));
-
-  auto writer = std::make_unique<DirectExtentWriter>();
-  TEST_AND_RETURN_FALSE(
-      writer->Init(target_fd_, operation.dst_extents(), block_size_));
-  puffin::UniqueStreamPtr dst_stream(new PuffinExtentStream(
-      std::move(writer),
-      utils::BlocksInExtents(operation.dst_extents()) * block_size_));
-
-  const size_t kMaxCacheSize = 5 * 1024 * 1024;  // Total 5MB cache.
-  TEST_AND_RETURN_FALSE(puffin::PuffPatch(std::move(src_stream),
-                                          std::move(dst_stream),
-                                          buffer_.data(),
-                                          buffer_.size(),
-                                          kMaxCacheSize));
+  TEST_AND_RETURN_FALSE(partition_writer_->PerformPuffDiffOperation(
+      operation, error, buffer_.data(), buffer_.size()));
   DiscardBuffer(true, buffer_.size());
   return true;
 }
@@ -1502,11 +970,11 @@
       buffer_.begin(), buffer_.begin() + manifest_.signatures_size());
 
   // Save the signature blob because if the update is interrupted after the
-  // download phase we don't go through this path anymore. Some alternatives to
-  // consider:
+  // download phase we don't go through this path anymore. Some alternatives
+  // to consider:
   //
-  // 1. On resume, re-download the signature blob from the server and re-verify
-  // it.
+  // 1. On resume, re-download the signature blob from the server and
+  // re-verify it.
   //
   // 2. Verify the signature as soon as it's received and don't checkpoint the
   // blob and the signed sha-256 context.
@@ -1529,8 +997,8 @@
     return utils::ReadFile(public_key_path_, out_public_key);
   }
 
-  // If this is an official build then we are not allowed to use public key from
-  // Omaha response.
+  // If this is an official build then we are not allowed to use public key
+  // from Omaha response.
   if (!hardware_->IsOfficialBuild() && !install_plan_->public_key_rsa.empty()) {
     LOG(INFO) << "Verifying using public key from Omaha response.";
     return brillo::data_encoding::Base64Decode(install_plan_->public_key_rsa,
@@ -1642,34 +1110,41 @@
 
   // Check version field for a given PartitionUpdate object. If an error
   // is encountered, set |error_code| accordingly. If downgrade is detected,
-  // |downgrade_detected| is set. Return true if the program should continue to
-  // check the next partition or not, or false if it should exit early due to
-  // errors.
+  // |downgrade_detected| is set. Return true if the program should continue
+  // to check the next partition or not, or false if it should exit early due
+  // to errors.
   auto&& timestamp_valid = [this](const PartitionUpdate& partition,
                                   bool allow_empty_version,
                                   bool* downgrade_detected) -> ErrorCode {
+    const auto& partition_name = partition.partition_name();
     if (!partition.has_version()) {
+      if (hardware_->GetVersionForLogging(partition_name).empty()) {
+        LOG(INFO) << partition_name << " does't have version, skipping "
+                  << "downgrade check.";
+        return ErrorCode::kSuccess;
+      }
+
       if (allow_empty_version) {
         return ErrorCode::kSuccess;
       }
       LOG(ERROR)
-          << "PartitionUpdate " << partition.partition_name()
-          << " does ot have a version field. Not allowed in partial updates.";
+          << "PartitionUpdate " << partition_name
+          << " doesn't have a version field. Not allowed in partial updates.";
       return ErrorCode::kDownloadManifestParseError;
     }
 
-    auto error_code = hardware_->IsPartitionUpdateValid(
-        partition.partition_name(), partition.version());
+    auto error_code =
+        hardware_->IsPartitionUpdateValid(partition_name, partition.version());
     switch (error_code) {
       case ErrorCode::kSuccess:
         break;
       case ErrorCode::kPayloadTimestampError:
         *downgrade_detected = true;
-        LOG(WARNING) << "PartitionUpdate " << partition.partition_name()
+        LOG(WARNING) << "PartitionUpdate " << partition_name
                      << " has an older version than partition on device.";
         break;
       default:
-        LOG(ERROR) << "IsPartitionUpdateValid(" << partition.partition_name()
+        LOG(ERROR) << "IsPartitionUpdateValid(" << partition_name
                    << ") returned" << utils::ErrorCodeToString(error_code);
         break;
     }
@@ -1722,10 +1197,11 @@
     const InstallOperation& operation) {
   if (!operation.data_sha256_hash().size()) {
     if (!operation.data_length()) {
-      // Operations that do not have any data blob won't have any operation hash
-      // either. So, these operations are always considered validated since the
-      // metadata that contains all the non-data-blob portions of the operation
-      // has already been validated. This is true for both HTTP and HTTPS cases.
+      // Operations that do not have any data blob won't have any operation
+      // hash either. So, these operations are always considered validated
+      // since the metadata that contains all the non-data-blob portions of
+      // the operation has already been validated. This is true for both HTTP
+      // and HTTPS cases.
       return ErrorCode::kSuccess;
     }
 
@@ -1864,8 +1340,8 @@
     return false;
 
   int64_t resumed_update_failures;
-  // Note that storing this value is optional, but if it is there it should not
-  // be more than the limit.
+  // Note that storing this value is optional, but if it is there it should
+  // not be more than the limit.
   if (prefs->GetInt64(kPrefsResumedUpdateFailures, &resumed_update_failures) &&
       resumed_update_failures > kMaxResumedUpdateFailures)
     return false;
@@ -2024,4 +1500,10 @@
   return true;
 }
 
+bool DeltaPerformer::IsDynamicPartition(const std::string& part_name) {
+  return std::find(dynamic_partitions_.begin(),
+                   dynamic_partitions_.end(),
+                   part_name) != dynamic_partitions_.end();
+}
+
 }  // namespace chromeos_update_engine
diff --git a/payload_consumer/delta_performer.h b/payload_consumer/delta_performer.h
index e4b56c1..4990bf8 100644
--- a/payload_consumer/delta_performer.h
+++ b/payload_consumer/delta_performer.h
@@ -35,6 +35,7 @@
 #include "update_engine/payload_consumer/file_descriptor.h"
 #include "update_engine/payload_consumer/file_writer.h"
 #include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_consumer/partition_writer.h"
 #include "update_engine/payload_consumer/payload_metadata.h"
 #include "update_engine/payload_consumer/payload_verifier.h"
 #include "update_engine/update_metadata.pb.h"
@@ -101,10 +102,6 @@
   // work. Returns whether the required file descriptors were successfully open.
   bool OpenCurrentPartition();
 
-  // Attempt to open the error-corrected device for the current partition.
-  // Returns whether the operation succeeded.
-  bool OpenCurrentECCPartition();
-
   // Closes the current partition file descriptors if open. Returns 0 on success
   // or -errno on error.
   int CloseCurrentPartition();
@@ -204,7 +201,6 @@
   friend class DeltaPerformerIntegrationTest;
   FRIEND_TEST(DeltaPerformerTest, BrilloMetadataSignatureSizeTest);
   FRIEND_TEST(DeltaPerformerTest, BrilloParsePayloadMetadataTest);
-  FRIEND_TEST(DeltaPerformerTest, ChooseSourceFDTest);
   FRIEND_TEST(DeltaPerformerTest, UsePublicKeyFromResponse);
 
   // Parse and move the update instructions of all partitions into our local
@@ -258,13 +254,6 @@
   bool PerformPuffDiffOperation(const InstallOperation& operation,
                                 ErrorCode* error);
 
-  // For a given operation, choose the source fd to be used (raw device or error
-  // correction device) based on the source operation hash.
-  // Returns nullptr if the source hash mismatch cannot be corrected, and set
-  // the |error| accordingly.
-  FileDescriptorPtr ChooseSourceFD(const InstallOperation& operation,
-                                   ErrorCode* error);
-
   // Extracts the payload signature message from the current |buffer_| if the
   // offset matches the one specified by the manifest. Returns whether the
   // signature was extracted.
@@ -314,6 +303,8 @@
   //   a generic error on the device.
   ErrorCode CheckTimestampError() const;
 
+  // Check if partition `part_name` is a dynamic partition.
+  bool IsDynamicPartition(const std::string& part_name);
   // Update Engine preference store.
   PrefsInterface* prefs_;
 
@@ -331,34 +322,6 @@
   // Pointer to the current payload in install_plan_.payloads.
   InstallPlan::Payload* payload_{nullptr};
 
-  // File descriptor of the source partition. Only set while updating a
-  // partition when using a delta payload.
-  FileDescriptorPtr source_fd_{nullptr};
-
-  // File descriptor of the error corrected source partition. Only set while
-  // updating partition using a delta payload for a partition where error
-  // correction is available. The size of the error corrected device is smaller
-  // than the underlying raw device, since it doesn't include the error
-  // correction blocks.
-  FileDescriptorPtr source_ecc_fd_{nullptr};
-
-  // The total number of operations that failed source hash verification but
-  // passed after falling back to the error-corrected |source_ecc_fd_| device.
-  uint64_t source_ecc_recovered_failures_{0};
-
-  // Whether opening the current partition as an error-corrected device failed.
-  // Used to avoid re-opening the same source partition if it is not actually
-  // error corrected.
-  bool source_ecc_open_failure_{false};
-
-  // File descriptor of the target partition. Only set while performing the
-  // operations of a given partition.
-  FileDescriptorPtr target_fd_{nullptr};
-
-  // Paths the |source_fd_| and |target_fd_| refer to.
-  std::string source_path_;
-  std::string target_path_;
-
   PayloadMetadata payload_metadata_;
 
   // Parsed manifest. Set after enough bytes to parse the manifest were
@@ -379,22 +342,22 @@
   // otherwise 0.
   size_t num_total_operations_{0};
 
-  // The list of partitions to update as found in the manifest major version 2.
-  // When parsing an older manifest format, the information is converted over to
-  // this format instead.
+  // The list of partitions to update as found in the manifest major
+  // version 2. When parsing an older manifest format, the information is
+  // converted over to this format instead.
   std::vector<PartitionUpdate> partitions_;
 
   // Index in the list of partitions (|partitions_| member) of the current
   // partition being processed.
   size_t current_partition_{0};
 
-  // Index of the next operation to perform in the manifest. The index is linear
-  // on the total number of operation on the manifest.
+  // Index of the next operation to perform in the manifest. The index is
+  // linear on the total number of operation on the manifest.
   size_t next_operation_num_{0};
 
   // A buffer used for accumulating downloaded data. Initially, it stores the
-  // payload metadata; once that's downloaded and parsed, it stores data for the
-  // next update operation.
+  // payload metadata; once that's downloaded and parsed, it stores data for
+  // the next update operation.
   brillo::Blob buffer_;
   // Offset of buffer_ in the binary blobs section of the update.
   uint64_t buffer_offset_{0};
@@ -436,8 +399,9 @@
   // If |true|, the update is user initiated (vs. periodic update checks).
   bool interactive_{false};
 
-  // The timeout after which we should force emitting a progress log (constant),
-  // and the actual point in time for the next forced log to be emitted.
+  // The timeout after which we should force emitting a progress log
+  // (constant), and the actual point in time for the next forced log to be
+  // emitted.
   const base::TimeDelta forced_progress_log_wait_{
       base::TimeDelta::FromSeconds(kProgressLogTimeoutSeconds)};
   base::TimeTicks forced_progress_log_time_;
@@ -448,6 +412,10 @@
       base::TimeDelta::FromSeconds(kCheckpointFrequencySeconds)};
   base::TimeTicks update_checkpoint_time_;
 
+  std::unique_ptr<PartitionWriter> partition_writer_;
+
+  // List of dynamic partitions on device.
+  std::vector<std::string> dynamic_partitions_;
   DISALLOW_COPY_AND_ASSIGN(DeltaPerformer);
 };
 
diff --git a/payload_consumer/delta_performer_unittest.cc b/payload_consumer/delta_performer_unittest.cc
index 65b9dac..f742b1c 100644
--- a/payload_consumer/delta_performer_unittest.cc
+++ b/payload_consumer/delta_performer_unittest.cc
@@ -418,22 +418,7 @@
     EXPECT_EQ(payload_.metadata_size, performer_.metadata_size_);
   }
 
-  // Helper function to pretend that the ECC file descriptor was already opened.
-  // Returns a pointer to the created file descriptor.
-  FakeFileDescriptor* SetFakeECCFile(size_t size) {
-    EXPECT_FALSE(performer_.source_ecc_fd_) << "source_ecc_fd_ already open.";
-    FakeFileDescriptor* ret = new FakeFileDescriptor();
-    fake_ecc_fd_.reset(ret);
-    // Call open to simulate it was already opened.
-    ret->Open("", 0);
-    ret->SetFileSize(size);
-    performer_.source_ecc_fd_ = fake_ecc_fd_;
-    return ret;
-  }
 
-  uint64_t GetSourceEccRecoveredFailures() const {
-    return performer_.source_ecc_recovered_failures_;
-  }
 
   FakePrefs prefs_;
   InstallPlan install_plan_;
@@ -660,95 +645,6 @@
   EXPECT_EQ(actual_data, ApplyPayload(payload_data, source.path(), false));
 }
 
-// Test that the error-corrected file descriptor is used to read the partition
-// since the source partition doesn't match the operation hash.
-TEST_F(DeltaPerformerTest, ErrorCorrectionSourceCopyFallbackTest) {
-  constexpr size_t kCopyOperationSize = 4 * 4096;
-  ScopedTempFile source("Source-XXXXXX");
-  // Write invalid data to the source image, which doesn't match the expected
-  // hash.
-  brillo::Blob invalid_data(kCopyOperationSize, 0x55);
-  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), invalid_data));
-
-  // Setup the fec file descriptor as the fake stream, which matches
-  // |expected_data|.
-  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize);
-  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
-
-  PartitionConfig old_part(kPartitionNameRoot);
-  old_part.path = source.path();
-  old_part.size = invalid_data.size();
-
-  brillo::Blob payload_data =
-      GenerateSourceCopyPayload(expected_data, true, &old_part);
-  EXPECT_EQ(expected_data, ApplyPayload(payload_data, source.path(), true));
-  // Verify that the fake_fec was actually used.
-  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
-  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
-}
-
-// Test that the error-corrected file descriptor is used to read a partition
-// when no hash is available for SOURCE_COPY but it falls back to the normal
-// file descriptor when the size of the error corrected one is too small.
-TEST_F(DeltaPerformerTest, ErrorCorrectionSourceCopyWhenNoHashFallbackTest) {
-  constexpr size_t kCopyOperationSize = 4 * 4096;
-  ScopedTempFile source("Source-XXXXXX");
-  // Setup the source path with the right expected data.
-  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
-  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), expected_data));
-
-  // Setup the fec file descriptor as the fake stream, with smaller data than
-  // the expected.
-  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize / 2);
-
-  PartitionConfig old_part(kPartitionNameRoot);
-  old_part.path = source.path();
-  old_part.size = expected_data.size();
-
-  // The payload operation doesn't include an operation hash.
-  brillo::Blob payload_data =
-      GenerateSourceCopyPayload(expected_data, false, &old_part);
-  EXPECT_EQ(expected_data, ApplyPayload(payload_data, source.path(), true));
-  // Verify that the fake_fec was attempted to be used. Since the file
-  // descriptor is shorter it can actually do more than one read to realize it
-  // reached the EOF.
-  EXPECT_LE(1U, fake_fec->GetReadOps().size());
-  // This fallback doesn't count as an error-corrected operation since the
-  // operation hash was not available.
-  EXPECT_EQ(0U, GetSourceEccRecoveredFailures());
-}
-
-TEST_F(DeltaPerformerTest, ChooseSourceFDTest) {
-  constexpr size_t kSourceSize = 4 * 4096;
-  ScopedTempFile source("Source-XXXXXX");
-  // Write invalid data to the source image, which doesn't match the expected
-  // hash.
-  brillo::Blob invalid_data(kSourceSize, 0x55);
-  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), invalid_data));
-
-  performer_.source_fd_ = std::make_shared<EintrSafeFileDescriptor>();
-  performer_.source_fd_->Open(source.path().c_str(), O_RDONLY);
-  performer_.block_size_ = 4096;
-
-  // Setup the fec file descriptor as the fake stream, which matches
-  // |expected_data|.
-  FakeFileDescriptor* fake_fec = SetFakeECCFile(kSourceSize);
-  brillo::Blob expected_data = FakeFileDescriptorData(kSourceSize);
-
-  InstallOperation op;
-  *(op.add_src_extents()) = ExtentForRange(0, kSourceSize / 4096);
-  brillo::Blob src_hash;
-  EXPECT_TRUE(HashCalculator::RawHashOfData(expected_data, &src_hash));
-  op.set_src_sha256_hash(src_hash.data(), src_hash.size());
-
-  ErrorCode error = ErrorCode::kSuccess;
-  EXPECT_EQ(performer_.source_ecc_fd_, performer_.ChooseSourceFD(op, &error));
-  EXPECT_EQ(ErrorCode::kSuccess, error);
-  // Verify that the fake_fec was actually used.
-  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
-  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
-}
-
 TEST_F(DeltaPerformerTest, ExtentsToByteStringTest) {
   uint64_t test[] = {1, 1, 4, 2, 0, 1};
   static_assert(base::size(test) % 2 == 0, "Array size uneven");
diff --git a/payload_consumer/fec_file_descriptor.cc b/payload_consumer/fec_file_descriptor.cc
index de22cf3..3fee196 100644
--- a/payload_consumer/fec_file_descriptor.cc
+++ b/payload_consumer/fec_file_descriptor.cc
@@ -16,6 +16,8 @@
 
 #include "update_engine/payload_consumer/fec_file_descriptor.h"
 
+#include <base/logging.h>
+
 namespace chromeos_update_engine {
 
 bool FecFileDescriptor::Open(const char* path, int flags) {
diff --git a/payload_consumer/file_descriptor.cc b/payload_consumer/file_descriptor.cc
index 1de615c..6101c68 100644
--- a/payload_consumer/file_descriptor.cc
+++ b/payload_consumer/file_descriptor.cc
@@ -21,6 +21,7 @@
 #include <sys/ioctl.h>
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <unistd.h>
 
 #include <base/posix/eintr_wrapper.h>
 
@@ -125,11 +126,16 @@
 
 bool EintrSafeFileDescriptor::Flush() {
   CHECK_GE(fd_, 0);
+  // Implemented as a No-Op, as delta_performer typically uses |O_DSYNC|, except
+  // in interactive settings.
   return true;
 }
 
 bool EintrSafeFileDescriptor::Close() {
   CHECK_GE(fd_, 0);
+  // https://stackoverflow.com/questions/705454/does-linux-guarantee-the-contents-of-a-file-is-flushed-to-disc-after-close
+  // |close()| doesn't imply |fsync()|, so we need to do it manually.
+  fsync(fd_);
   if (IGNORE_EINTR(close(fd_)))
     return false;
   fd_ = -1;
diff --git a/payload_consumer/file_descriptor.h b/payload_consumer/file_descriptor.h
index 55f76c6..fb07ff0 100644
--- a/payload_consumer/file_descriptor.h
+++ b/payload_consumer/file_descriptor.h
@@ -21,7 +21,7 @@
 #include <sys/types.h>
 #include <memory>
 
-#include <base/logging.h>
+#include <base/macros.h>
 
 // Abstraction for managing opening, reading, writing and closing of file
 // descriptors. This includes an abstract class and one standard implementation
diff --git a/payload_consumer/partition_writer.cc b/payload_consumer/partition_writer.cc
new file mode 100644
index 0000000..b4b869c
--- /dev/null
+++ b/payload_consumer/partition_writer.cc
@@ -0,0 +1,654 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+#include <update_engine/payload_consumer/partition_writer.h>
+
+#include <fcntl.h>
+#include <linux/fs.h>
+
+#include <algorithm>
+#include <initializer_list>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include <base/strings/string_number_conversions.h>
+#include <bsdiff/bspatch.h>
+#include <puffin/puffpatch.h>
+#include <bsdiff/file_interface.h>
+#include <puffin/stream.h>
+
+#include "update_engine/common/terminator.h"
+#include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/bzip_extent_writer.h"
+#include "update_engine/payload_consumer/cached_file_descriptor.h"
+#include "update_engine/payload_consumer/extent_reader.h"
+#include "update_engine/payload_consumer/extent_writer.h"
+#include "update_engine/payload_consumer/fec_file_descriptor.h"
+#include "update_engine/payload_consumer/file_descriptor_utils.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_consumer/mount_history.h"
+#include "update_engine/payload_consumer/payload_constants.h"
+#include "update_engine/payload_consumer/xz_extent_writer.h"
+
+namespace chromeos_update_engine {
+
+namespace {
+constexpr uint64_t kCacheSize = 1024 * 1024;  // 1MB
+
+// Discard the tail of the block device referenced by |fd|, from the offset
+// |data_size| until the end of the block device. Returns whether the data was
+// discarded.
+
+bool DiscardPartitionTail(const FileDescriptorPtr& fd, uint64_t data_size) {
+  uint64_t part_size = fd->BlockDevSize();
+  if (!part_size || part_size <= data_size)
+    return false;
+
+  struct blkioctl_request {
+    int number;
+    const char* name;
+  };
+  const std::initializer_list<blkioctl_request> blkioctl_requests = {
+      {BLKDISCARD, "BLKDISCARD"},
+      {BLKSECDISCARD, "BLKSECDISCARD"},
+#ifdef BLKZEROOUT
+      {BLKZEROOUT, "BLKZEROOUT"},
+#endif
+  };
+  for (const auto& req : blkioctl_requests) {
+    int error = 0;
+    if (fd->BlkIoctl(req.number, data_size, part_size - data_size, &error) &&
+        error == 0) {
+      return true;
+    }
+    LOG(WARNING) << "Error discarding the last "
+                 << (part_size - data_size) / 1024 << " KiB using ioctl("
+                 << req.name << ")";
+  }
+  return false;
+}
+
+}  // namespace
+
+// Opens |path| using |mode|. On success returns an open FileDescriptor
+// and sets *err to 0. On failure, sets *err to errno and returns nullptr.
+FileDescriptorPtr OpenFile(const char* path,
+                           int mode,
+                           bool cache_writes,
+                           int* err) {
+  // Try to mark the block device read-only based on the mode. Ignore any
+  // failure since this won't work when passing regular files.
+  bool read_only = (mode & O_ACCMODE) == O_RDONLY;
+  utils::SetBlockDeviceReadOnly(path, read_only);
+
+  FileDescriptorPtr fd(new EintrSafeFileDescriptor());
+  if (cache_writes && !read_only) {
+    fd = FileDescriptorPtr(new CachedFileDescriptor(fd, kCacheSize));
+    LOG(INFO) << "Caching writes.";
+  }
+  if (!fd->Open(path, mode, 000)) {
+    *err = errno;
+    PLOG(ERROR) << "Unable to open file " << path;
+    return nullptr;
+  }
+  *err = 0;
+  return fd;
+}
+
+class BsdiffExtentFile : public bsdiff::FileInterface {
+ public:
+  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader, size_t size)
+      : BsdiffExtentFile(std::move(reader), nullptr, size) {}
+  BsdiffExtentFile(std::unique_ptr<ExtentWriter> writer, size_t size)
+      : BsdiffExtentFile(nullptr, std::move(writer), size) {}
+
+  ~BsdiffExtentFile() override = default;
+
+  bool Read(void* buf, size_t count, size_t* bytes_read) override {
+    TEST_AND_RETURN_FALSE(reader_->Read(buf, count));
+    *bytes_read = count;
+    offset_ += count;
+    return true;
+  }
+
+  bool Write(const void* buf, size_t count, size_t* bytes_written) override {
+    TEST_AND_RETURN_FALSE(writer_->Write(buf, count));
+    *bytes_written = count;
+    offset_ += count;
+    return true;
+  }
+
+  bool Seek(off_t pos) override {
+    if (reader_ != nullptr) {
+      TEST_AND_RETURN_FALSE(reader_->Seek(pos));
+      offset_ = pos;
+    } else {
+      // For writes technically there should be no change of position, or it
+      // should be equivalent to the current offset.
+      TEST_AND_RETURN_FALSE(offset_ == static_cast<uint64_t>(pos));
+    }
+    return true;
+  }
+
+  bool Close() override { return true; }
+
+  bool GetSize(uint64_t* size) override {
+    *size = size_;
+    return true;
+  }
+
+ private:
+  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader,
+                   std::unique_ptr<ExtentWriter> writer,
+                   size_t size)
+      : reader_(std::move(reader)),
+        writer_(std::move(writer)),
+        size_(size),
+        offset_(0) {}
+
+  std::unique_ptr<ExtentReader> reader_;
+  std::unique_ptr<ExtentWriter> writer_;
+  uint64_t size_;
+  uint64_t offset_;
+
+  DISALLOW_COPY_AND_ASSIGN(BsdiffExtentFile);
+};
+// A class to be passed to |puffpatch| for reading from |source_fd_| and writing
+// into |target_fd_|.
+class PuffinExtentStream : public puffin::StreamInterface {
+ public:
+  // Constructor for creating a stream for reading from an |ExtentReader|.
+  PuffinExtentStream(std::unique_ptr<ExtentReader> reader, uint64_t size)
+      : PuffinExtentStream(std::move(reader), nullptr, size) {}
+
+  // Constructor for creating a stream for writing to an |ExtentWriter|.
+  PuffinExtentStream(std::unique_ptr<ExtentWriter> writer, uint64_t size)
+      : PuffinExtentStream(nullptr, std::move(writer), size) {}
+
+  ~PuffinExtentStream() override = default;
+
+  bool GetSize(uint64_t* size) const override {
+    *size = size_;
+    return true;
+  }
+
+  bool GetOffset(uint64_t* offset) const override {
+    *offset = offset_;
+    return true;
+  }
+
+  bool Seek(uint64_t offset) override {
+    if (is_read_) {
+      TEST_AND_RETURN_FALSE(reader_->Seek(offset));
+      offset_ = offset;
+    } else {
+      // For writes, technically there should be no change of position, or it
+      // should be equivalent to the current offset.
+      TEST_AND_RETURN_FALSE(offset_ == offset);
+    }
+    return true;
+  }
+
+  bool Read(void* buffer, size_t count) override {
+    TEST_AND_RETURN_FALSE(is_read_);
+    TEST_AND_RETURN_FALSE(reader_->Read(buffer, count));
+    offset_ += count;
+    return true;
+  }
+
+  bool Write(const void* buffer, size_t count) override {
+    TEST_AND_RETURN_FALSE(!is_read_);
+    TEST_AND_RETURN_FALSE(writer_->Write(buffer, count));
+    offset_ += count;
+    return true;
+  }
+
+  bool Close() override { return true; }
+
+ private:
+  PuffinExtentStream(std::unique_ptr<ExtentReader> reader,
+                     std::unique_ptr<ExtentWriter> writer,
+                     uint64_t size)
+      : reader_(std::move(reader)),
+        writer_(std::move(writer)),
+        size_(size),
+        offset_(0),
+        is_read_(reader_ ? true : false) {}
+
+  std::unique_ptr<ExtentReader> reader_;
+  std::unique_ptr<ExtentWriter> writer_;
+  uint64_t size_;
+  uint64_t offset_;
+  bool is_read_;
+
+  DISALLOW_COPY_AND_ASSIGN(PuffinExtentStream);
+};
+
+PartitionWriter::PartitionWriter(
+    const PartitionUpdate& partition_update,
+    const InstallPlan::Partition& install_part,
+    DynamicPartitionControlInterface* dynamic_control,
+    size_t block_size,
+    bool is_interactive)
+    : partition_update_(partition_update),
+      install_part_(install_part),
+      dynamic_control_(dynamic_control),
+      interactive_(is_interactive),
+      block_size_(block_size) {}
+
+PartitionWriter::~PartitionWriter() {
+  Close();
+}
+
+bool PartitionWriter::Init(const InstallPlan* install_plan,
+                           bool source_may_exist) {
+  const PartitionUpdate& partition = partition_update_;
+  uint32_t source_slot = install_plan->source_slot;
+  uint32_t target_slot = install_plan->target_slot;
+
+  // We shouldn't open the source partition in certain cases, e.g. some dynamic
+  // partitions in delta payload, partitions included in the full payload for
+  // partial updates. Use the source size as the indicator.
+  if (source_may_exist && install_part_.source_size > 0) {
+    source_path_ = install_part_.source_path;
+    int err;
+    source_fd_ = OpenFile(source_path_.c_str(), O_RDONLY, false, &err);
+    if (!source_fd_) {
+      LOG(ERROR) << "Unable to open source partition "
+                 << partition.partition_name() << " on slot "
+                 << BootControlInterface::SlotName(source_slot) << ", file "
+                 << source_path_;
+      return false;
+    }
+  }
+
+  target_path_ = install_part_.target_path;
+  int err;
+
+  int flags = O_RDWR;
+  if (!interactive_)
+    flags |= O_DSYNC;
+
+  LOG(INFO) << "Opening " << target_path_ << " partition with"
+            << (interactive_ ? "out" : "") << " O_DSYNC";
+
+  target_fd_ = OpenFile(target_path_.c_str(), flags, true, &err);
+  if (!target_fd_) {
+    LOG(ERROR) << "Unable to open target partition "
+               << partition.partition_name() << " on slot "
+               << BootControlInterface::SlotName(target_slot) << ", file "
+               << target_path_;
+    return false;
+  }
+
+  LOG(INFO) << "Applying " << partition.operations().size()
+            << " operations to partition \"" << partition.partition_name()
+            << "\"";
+
+  // Discard the end of the partition, but ignore failures.
+  DiscardPartitionTail(target_fd_, install_part_.target_size);
+
+  return true;
+}
+
+bool PartitionWriter::PerformReplaceOperation(const InstallOperation& operation,
+                                              const void* data,
+                                              size_t count) {
+  // Setup the ExtentWriter stack based on the operation type.
+  std::unique_ptr<ExtentWriter> writer = CreateBaseExtentWriter();
+
+  if (operation.type() == InstallOperation::REPLACE_BZ) {
+    writer.reset(new BzipExtentWriter(std::move(writer)));
+  } else if (operation.type() == InstallOperation::REPLACE_XZ) {
+    writer.reset(new XzExtentWriter(std::move(writer)));
+  }
+
+  TEST_AND_RETURN_FALSE(
+      writer->Init(target_fd_, operation.dst_extents(), block_size_));
+  TEST_AND_RETURN_FALSE(writer->Write(data, operation.data_length()));
+
+  return Flush();
+}
+
+bool PartitionWriter::PerformZeroOrDiscardOperation(
+    const InstallOperation& operation) {
+#ifdef BLKZEROOUT
+  bool attempt_ioctl = true;
+  int request =
+      (operation.type() == InstallOperation::ZERO ? BLKZEROOUT : BLKDISCARD);
+#else   // !defined(BLKZEROOUT)
+  bool attempt_ioctl = false;
+  int request = 0;
+#endif  // !defined(BLKZEROOUT)
+
+  brillo::Blob zeros;
+  for (const Extent& extent : operation.dst_extents()) {
+    const uint64_t start = extent.start_block() * block_size_;
+    const uint64_t length = extent.num_blocks() * block_size_;
+    if (attempt_ioctl) {
+      int result = 0;
+      if (target_fd_->BlkIoctl(request, start, length, &result) && result == 0)
+        continue;
+      attempt_ioctl = false;
+    }
+    // In case of failure, we fall back to writing 0 to the selected region.
+    zeros.resize(16 * block_size_);
+    for (uint64_t offset = 0; offset < length; offset += zeros.size()) {
+      uint64_t chunk_length =
+          std::min(length - offset, static_cast<uint64_t>(zeros.size()));
+      TEST_AND_RETURN_FALSE(utils::PWriteAll(
+          target_fd_, zeros.data(), chunk_length, start + offset));
+    }
+  }
+  return Flush();
+}
+
+bool PartitionWriter::PerformSourceCopyOperation(
+    const InstallOperation& operation, ErrorCode* error) {
+  TEST_AND_RETURN_FALSE(source_fd_ != nullptr);
+
+  // The device may optimize the SOURCE_COPY operation.
+  // Since this is a device-specific optimization, let |dynamic_control_|
+  // decide whether to substitute an optimized operation.
+  const PartitionUpdate& partition = partition_update_;
+  const auto& partition_control = dynamic_control_;
+
+  InstallOperation buf;
+  bool should_optimize = partition_control->OptimizeOperation(
+      partition.partition_name(), operation, &buf);
+  const InstallOperation& optimized = should_optimize ? buf : operation;
+
+  if (operation.has_src_sha256_hash()) {
+    bool read_ok;
+    brillo::Blob source_hash;
+    brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
+                                      operation.src_sha256_hash().end());
+
+    // We fall back to use the error corrected device if the hash of the raw
+    // device doesn't match or there was an error reading the source partition.
+    // Note that this code will also fall back if writing the target partition
+    // fails.
+    if (should_optimize) {
+      // Hash operation.src_extents(), then copy optimized.src_extents to
+      // optimized.dst_extents.
+      read_ok =
+          fd_utils::ReadAndHashExtents(
+              source_fd_, operation.src_extents(), block_size_, &source_hash) &&
+          fd_utils::CopyAndHashExtents(source_fd_,
+                                       optimized.src_extents(),
+                                       target_fd_,
+                                       optimized.dst_extents(),
+                                       block_size_,
+                                       nullptr /* skip hashing */);
+    } else {
+      read_ok = fd_utils::CopyAndHashExtents(source_fd_,
+                                             operation.src_extents(),
+                                             target_fd_,
+                                             operation.dst_extents(),
+                                             block_size_,
+                                             &source_hash);
+    }
+    if (read_ok && expected_source_hash == source_hash)
+      return true;
+    LOG(WARNING) << "Source hash from RAW device mismatched, attempting to "
+                    "correct using ECC";
+    if (!OpenCurrentECCPartition()) {
+      // The following function call will return false since the source hash
+      // mismatches, but we still want to call it so it prints the appropriate
+      // log message.
+      return ValidateSourceHash(source_hash, operation, source_fd_, error);
+    }
+
+    LOG(WARNING) << "Source hash from RAW device mismatched: found "
+                 << base::HexEncode(source_hash.data(), source_hash.size())
+                 << ", expected "
+                 << base::HexEncode(expected_source_hash.data(),
+                                    expected_source_hash.size());
+    if (should_optimize) {
+      TEST_AND_RETURN_FALSE(fd_utils::ReadAndHashExtents(
+          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash));
+      TEST_AND_RETURN_FALSE(
+          fd_utils::CopyAndHashExtents(source_ecc_fd_,
+                                       optimized.src_extents(),
+                                       target_fd_,
+                                       optimized.dst_extents(),
+                                       block_size_,
+                                       nullptr /* skip hashing */));
+    } else {
+      TEST_AND_RETURN_FALSE(
+          fd_utils::CopyAndHashExtents(source_ecc_fd_,
+                                       operation.src_extents(),
+                                       target_fd_,
+                                       operation.dst_extents(),
+                                       block_size_,
+                                       &source_hash));
+    }
+    TEST_AND_RETURN_FALSE(
+        ValidateSourceHash(source_hash, operation, source_ecc_fd_, error));
+    // At this point reading from the error corrected device worked, but
+    // reading from the raw device failed, so this is considered a recovered
+    // failure.
+    source_ecc_recovered_failures_++;
+  } else {
+    // When the operation doesn't include a source hash, we attempt the error
+    // corrected device first since we can't verify the block in the raw device
+    // at this point, but we fall back to the raw device since the error
+    // corrected device can be shorter or not available.
+
+    if (OpenCurrentECCPartition() &&
+        fd_utils::CopyAndHashExtents(source_ecc_fd_,
+                                     optimized.src_extents(),
+                                     target_fd_,
+                                     optimized.dst_extents(),
+                                     block_size_,
+                                     nullptr)) {
+      return true;
+    }
+    TEST_AND_RETURN_FALSE(fd_utils::CopyAndHashExtents(source_fd_,
+                                                       optimized.src_extents(),
+                                                       target_fd_,
+                                                       optimized.dst_extents(),
+                                                       block_size_,
+                                                       nullptr));
+  }
+  return Flush();
+}
+
+bool PartitionWriter::PerformSourceBsdiffOperation(
+    const InstallOperation& operation,
+    ErrorCode* error,
+    const void* data,
+    size_t count) {
+  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
+  TEST_AND_RETURN_FALSE(source_fd != nullptr);
+
+  auto reader = std::make_unique<DirectExtentReader>();
+  TEST_AND_RETURN_FALSE(
+      reader->Init(source_fd, operation.src_extents(), block_size_));
+  auto src_file = std::make_unique<BsdiffExtentFile>(
+      std::move(reader),
+      utils::BlocksInExtents(operation.src_extents()) * block_size_);
+
+  auto writer = CreateBaseExtentWriter();
+  TEST_AND_RETURN_FALSE(
+      writer->Init(target_fd_, operation.dst_extents(), block_size_));
+  auto dst_file = std::make_unique<BsdiffExtentFile>(
+      std::move(writer),
+      utils::BlocksInExtents(operation.dst_extents()) * block_size_);
+
+  TEST_AND_RETURN_FALSE(bsdiff::bspatch(std::move(src_file),
+                                        std::move(dst_file),
+                                        reinterpret_cast<const uint8_t*>(data),
+                                        count) == 0);
+  return Flush();
+}
+
+bool PartitionWriter::PerformPuffDiffOperation(
+    const InstallOperation& operation,
+    ErrorCode* error,
+    const void* data,
+    size_t count) {
+  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
+  TEST_AND_RETURN_FALSE(source_fd != nullptr);
+
+  auto reader = std::make_unique<DirectExtentReader>();
+  TEST_AND_RETURN_FALSE(
+      reader->Init(source_fd, operation.src_extents(), block_size_));
+  puffin::UniqueStreamPtr src_stream(new PuffinExtentStream(
+      std::move(reader),
+      utils::BlocksInExtents(operation.src_extents()) * block_size_));
+
+  auto writer = CreateBaseExtentWriter();
+  TEST_AND_RETURN_FALSE(
+      writer->Init(target_fd_, operation.dst_extents(), block_size_));
+  puffin::UniqueStreamPtr dst_stream(new PuffinExtentStream(
+      std::move(writer),
+      utils::BlocksInExtents(operation.dst_extents()) * block_size_));
+
+  constexpr size_t kMaxCacheSize = 5 * 1024 * 1024;  // Total 5MB cache.
+  TEST_AND_RETURN_FALSE(
+      puffin::PuffPatch(std::move(src_stream),
+                        std::move(dst_stream),
+                        reinterpret_cast<const uint8_t*>(data),
+                        count,
+                        kMaxCacheSize));
+  return Flush();
+}
+
+FileDescriptorPtr PartitionWriter::ChooseSourceFD(
+    const InstallOperation& operation, ErrorCode* error) {
+  if (source_fd_ == nullptr) {
+    LOG(ERROR) << "ChooseSourceFD fail: source_fd_ == nullptr";
+    return nullptr;
+  }
+
+  if (!operation.has_src_sha256_hash()) {
+    // When the operation doesn't include a source hash, we attempt the error
+    // corrected device first since we can't verify the block in the raw device
+    // at this point, but we first need to make sure all extents are readable
+    // since the error corrected device can be shorter or not available.
+    if (OpenCurrentECCPartition() &&
+        fd_utils::ReadAndHashExtents(
+            source_ecc_fd_, operation.src_extents(), block_size_, nullptr)) {
+      return source_ecc_fd_;
+    }
+    return source_fd_;
+  }
+
+  brillo::Blob source_hash;
+  brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
+                                    operation.src_sha256_hash().end());
+  if (fd_utils::ReadAndHashExtents(
+          source_fd_, operation.src_extents(), block_size_, &source_hash) &&
+      source_hash == expected_source_hash) {
+    return source_fd_;
+  }
+  // We fall back to use the error corrected device if the hash of the raw
+  // device doesn't match or there was an error reading the source partition.
+  if (!OpenCurrentECCPartition()) {
+    // The following function call will return false since the source hash
+    // mismatches, but we still want to call it so it prints the appropriate
+    // log message.
+    ValidateSourceHash(source_hash, operation, source_fd_, error);
+    return nullptr;
+  }
+  LOG(WARNING) << "Source hash from RAW device mismatched: found "
+               << base::HexEncode(source_hash.data(), source_hash.size())
+               << ", expected "
+               << base::HexEncode(expected_source_hash.data(),
+                                  expected_source_hash.size());
+
+  if (fd_utils::ReadAndHashExtents(
+          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash) &&
+      ValidateSourceHash(source_hash, operation, source_ecc_fd_, error)) {
+    // At this point reading from the error corrected device worked, but
+    // reading from the raw device failed, so this is considered a recovered
+    // failure.
+    source_ecc_recovered_failures_++;
+    return source_ecc_fd_;
+  }
+  return nullptr;
+}
+
+bool PartitionWriter::OpenCurrentECCPartition() {
+  // No support for ECC for full payloads.
+  // Full payload should not have any operation that requires ECC partitions.
+  if (source_ecc_fd_)
+    return true;
+
+  if (source_ecc_open_failure_)
+    return false;
+
+#if USE_FEC
+  const PartitionUpdate& partition = partition_update_;
+  const InstallPlan::Partition& install_part = install_part_;
+  std::string path = install_part.source_path;
+  FileDescriptorPtr fd(new FecFileDescriptor());
+  if (!fd->Open(path.c_str(), O_RDONLY, 0)) {
+    PLOG(ERROR) << "Unable to open ECC source partition "
+                << partition.partition_name() << ", file " << path;
+    source_ecc_open_failure_ = true;
+    return false;
+  }
+  source_ecc_fd_ = fd;
+#else
+  // No support for ECC compiled.
+  source_ecc_open_failure_ = true;
+#endif  // USE_FEC
+
+  return !source_ecc_open_failure_;
+}
+
+int PartitionWriter::Close() {
+  int err = 0;
+  if (source_fd_ && !source_fd_->Close()) {
+    err = errno;
+    PLOG(ERROR) << "Error closing source partition";
+    if (!err)
+      err = 1;
+  }
+  source_fd_.reset();
+  source_path_.clear();
+
+  if (target_fd_ && !target_fd_->Close()) {
+    err = errno;
+    PLOG(ERROR) << "Error closing target partition";
+    if (!err)
+      err = 1;
+  }
+  target_fd_.reset();
+  target_path_.clear();
+
+  if (source_ecc_fd_ && !source_ecc_fd_->Close()) {
+    err = errno;
+    PLOG(ERROR) << "Error closing ECC source partition";
+    if (!err)
+      err = 1;
+  }
+  source_ecc_fd_.reset();
+  source_ecc_open_failure_ = false;
+  return -err;
+}
+
+std::unique_ptr<ExtentWriter> PartitionWriter::CreateBaseExtentWriter() {
+  return std::make_unique<DirectExtentWriter>();
+}
+
+bool PartitionWriter::Flush() {
+  return target_fd_->Flush();
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/partition_writer.h b/payload_consumer/partition_writer.h
new file mode 100644
index 0000000..1acbddc
--- /dev/null
+++ b/payload_consumer/partition_writer.h
@@ -0,0 +1,129 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef UPDATE_ENGINE_PARTITION_WRITER_H_
+#define UPDATE_ENGINE_PARTITION_WRITER_H_
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include <brillo/secure_blob.h>
+#include <gtest/gtest_prod.h>
+
+#include "update_engine/common/dynamic_partition_control_interface.h"
+#include "update_engine/payload_consumer/extent_writer.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/update_metadata.pb.h"
+namespace chromeos_update_engine {
+class PartitionWriter {
+ public:
+  PartitionWriter(const PartitionUpdate& partition_update,
+                  const InstallPlan::Partition& install_part,
+                  DynamicPartitionControlInterface* dynamic_control,
+                  size_t block_size,
+                  bool is_interactive);
+  virtual ~PartitionWriter();
+  static bool ValidateSourceHash(const brillo::Blob& calculated_hash,
+                                 const InstallOperation& operation,
+                                 const FileDescriptorPtr source_fd,
+                                 ErrorCode* error);
+
+  // Perform necessary initialization work before InstallOperation can be
+  // applied to this partition.
+  [[nodiscard]] virtual bool Init(const InstallPlan* install_plan,
+                                  bool source_may_exist);
+
+  int Close();
+
+  // These perform a specific type of operation and return true on success.
+  // |error| will be set on a source hash mismatch; otherwise |error| might not
+  // be set even if the operation fails.
+  [[nodiscard]] virtual bool PerformReplaceOperation(
+      const InstallOperation& operation, const void* data, size_t count);
+  [[nodiscard]] virtual bool PerformZeroOrDiscardOperation(
+      const InstallOperation& operation);
+
+  [[nodiscard]] virtual bool PerformSourceCopyOperation(
+      const InstallOperation& operation, ErrorCode* error);
+  [[nodiscard]] virtual bool PerformSourceBsdiffOperation(
+      const InstallOperation& operation,
+      ErrorCode* error,
+      const void* data,
+      size_t count);
+  [[nodiscard]] virtual bool PerformPuffDiffOperation(
+      const InstallOperation& operation,
+      ErrorCode* error,
+      const void* data,
+      size_t count);
+  [[nodiscard]] virtual bool Flush();
+
+ protected:
+  friend class PartitionWriterTest;
+  FRIEND_TEST(PartitionWriterTest, ChooseSourceFDTest);
+
+  bool OpenCurrentECCPartition();
+  // For a given operation, choose the source fd to be used (raw device or error
+  // correction device) based on the source operation hash.
+  // Returns nullptr if the source hash mismatch cannot be corrected, and sets
+  // |error| accordingly.
+  FileDescriptorPtr ChooseSourceFD(const InstallOperation& operation,
+                                   ErrorCode* error);
+  [[nodiscard]] virtual std::unique_ptr<ExtentWriter> CreateBaseExtentWriter();
+
+  const PartitionUpdate& partition_update_;
+  const InstallPlan::Partition& install_part_;
+  DynamicPartitionControlInterface* dynamic_control_;
+  // Path to source partition
+  std::string source_path_;
+  // Path to target partition
+  std::string target_path_;
+  FileDescriptorPtr source_fd_;
+  FileDescriptorPtr target_fd_;
+  const bool interactive_;
+  const size_t block_size_;
+  // File descriptor of the error corrected source partition. Only set while
+  // updating partition using a delta payload for a partition where error
+  // correction is available. The size of the error corrected device is smaller
+  // than the underlying raw device, since it doesn't include the error
+  // correction blocks.
+  FileDescriptorPtr source_ecc_fd_{nullptr};
+
+  // The total number of operations that failed source hash verification but
+  // passed after falling back to the error-corrected |source_ecc_fd_| device.
+  uint64_t source_ecc_recovered_failures_{0};
+
+  // Whether opening the current partition as an error-corrected device failed.
+  // Used to avoid re-opening the same source partition if it is not actually
+  // error corrected.
+  bool source_ecc_open_failure_{false};
+};
+
+namespace partition_writer {
+// Returns a PartitionWriter instance for performing InstallOps on this
+// partition. Uses VABCPartitionWriter for Virtual AB Compression.
+std::unique_ptr<PartitionWriter> CreatePartitionWriter(
+    const PartitionUpdate& partition_update,
+    const InstallPlan::Partition& install_part,
+    DynamicPartitionControlInterface* dynamic_control,
+    size_t block_size,
+    bool is_interactive,
+    bool is_dynamic_partition);
+}  // namespace partition_writer
+}  // namespace chromeos_update_engine
+
+#endif
diff --git a/payload_consumer/partition_writer_factory_android.cc b/payload_consumer/partition_writer_factory_android.cc
new file mode 100644
index 0000000..0c9f7ea
--- /dev/null
+++ b/payload_consumer/partition_writer_factory_android.cc
@@ -0,0 +1,54 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <cstddef>
+#include <memory>
+
+#include <base/logging.h>
+
+#include "update_engine/payload_consumer/vabc_partition_writer.h"
+
+namespace chromeos_update_engine::partition_writer {
+
+std::unique_ptr<PartitionWriter> CreatePartitionWriter(
+    const PartitionUpdate& partition_update,
+    const InstallPlan::Partition& install_part,
+    DynamicPartitionControlInterface* dynamic_control,
+    size_t block_size,
+    bool is_interactive,
+    bool is_dynamic_partition) {
+  if (dynamic_control &&
+      dynamic_control->GetVirtualAbCompressionFeatureFlag().IsEnabled() &&
+      is_dynamic_partition) {
+    LOG(INFO)
+        << "Virtual AB Compression Enabled, using VABC Partition Writer for `"
+        << install_part.name << '`';
+    return std::make_unique<VABCPartitionWriter>(partition_update,
+                                                 install_part,
+                                                 dynamic_control,
+                                                 block_size,
+                                                 is_interactive);
+  } else {
+    LOG(INFO) << "Virtual AB Compression disabled, using Partition Writer for `"
+              << install_part.name << '`';
+    return std::make_unique<PartitionWriter>(partition_update,
+                                             install_part,
+                                             dynamic_control,
+                                             block_size,
+                                             is_interactive);
+  }
+}
+}  // namespace chromeos_update_engine::partition_writer
diff --git a/payload_consumer/partition_writer_factory_chromeos.cc b/payload_consumer/partition_writer_factory_chromeos.cc
new file mode 100644
index 0000000..609f043
--- /dev/null
+++ b/payload_consumer/partition_writer_factory_chromeos.cc
@@ -0,0 +1,38 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <cstddef>
+#include <memory>
+
+#include <base/logging.h>
+
+#include "update_engine/payload_consumer/partition_writer.h"
+
+namespace chromeos_update_engine::partition_writer {
+std::unique_ptr<PartitionWriter> CreatePartitionWriter(
+    const PartitionUpdate& partition_update,
+    const InstallPlan::Partition& install_part,
+    DynamicPartitionControlInterface* dynamic_control,
+    size_t block_size,
+    bool is_interactive,
+    bool is_dynamic_partition) {
+  return std::make_unique<PartitionWriter>(partition_update,
+                                           install_part,
+                                           dynamic_control,
+                                           block_size,
+                                           is_interactive);
+}
+}  // namespace chromeos_update_engine::partition_writer
diff --git a/payload_consumer/partition_writer_unittest.cc b/payload_consumer/partition_writer_unittest.cc
new file mode 100644
index 0000000..1ef4783
--- /dev/null
+++ b/payload_consumer/partition_writer_unittest.cc
@@ -0,0 +1,203 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <memory>
+#include <vector>
+
+#include <brillo/secure_blob.h>
+#include <gtest/gtest.h>
+
+#include "update_engine/common/dynamic_partition_control_stub.h"
+#include "update_engine/common/error_code.h"
+#include "update_engine/common/fake_prefs.h"
+#include "update_engine/common/hash_calculator.h"
+#include "update_engine/common/test_utils.h"
+#include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/delta_performer.h"
+#include "update_engine/payload_consumer/extent_reader.h"
+#include "update_engine/payload_consumer/extent_writer.h"
+#include "update_engine/payload_consumer/fake_file_descriptor.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_generator/annotated_operation.h"
+#include "update_engine/payload_generator/delta_diff_generator.h"
+#include "update_engine/payload_generator/extent_ranges.h"
+#include "update_engine/payload_generator/payload_file.h"
+#include "update_engine/payload_generator/payload_generation_config.h"
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+
+class PartitionWriterTest : public testing::Test {
+ public:
+  // Helper function to pretend that the ECC file descriptor was already opened.
+  // Returns a pointer to the created file descriptor.
+  FakeFileDescriptor* SetFakeECCFile(size_t size) {
+    EXPECT_FALSE(writer_.source_ecc_fd_) << "source_ecc_fd_ already open.";
+    FakeFileDescriptor* ret = new FakeFileDescriptor();
+    fake_ecc_fd_.reset(ret);
+    // Call open to simulate it was already opened.
+    ret->Open("", 0);
+    ret->SetFileSize(size);
+    writer_.source_ecc_fd_ = fake_ecc_fd_;
+    return ret;
+  }
+
+  uint64_t GetSourceEccRecoveredFailures() const {
+    return writer_.source_ecc_recovered_failures_;
+  }
+
+  // Builds a SOURCE_COPY op over ExtentForRange(0, N) for src and dst, where N
+  // is the block count of |copied_data|; its hash is set only if |add_hash|.
+  AnnotatedOperation GenerateSourceCopyOp(const brillo::Blob& copied_data,
+                                          bool add_hash,
+                                          PartitionConfig* old_part = nullptr) {
+    const uint64_t kDefaultBlockSize = PayloadGenerationConfig{}.block_size;
+    EXPECT_EQ(0U, copied_data.size() % kDefaultBlockSize);
+    uint64_t num_blocks = copied_data.size() / kDefaultBlockSize;
+    AnnotatedOperation aop;
+    *(aop.op.add_src_extents()) = ExtentForRange(0, num_blocks);
+    *(aop.op.add_dst_extents()) = ExtentForRange(0, num_blocks);
+    aop.op.set_type(InstallOperation::SOURCE_COPY);
+    brillo::Blob src_hash;
+    EXPECT_TRUE(HashCalculator::RawHashOfData(copied_data, &src_hash));
+    if (add_hash)
+      aop.op.set_src_sha256_hash(src_hash.data(), src_hash.size());
+    return aop;
+  }
+
+  // Writes |blob_data| to a temp source partition, applies |op| via |writer_|,
+  // and returns what ends up in the temp target partition.
+  brillo::Blob PerformSourceCopyOp(const InstallOperation& op,
+                                   const brillo::Blob& blob_data) {
+    ScopedTempFile source_partition("Blob-XXXXXX");
+    DirectExtentWriter extent_writer;
+    FileDescriptorPtr fd(new EintrSafeFileDescriptor());
+    EXPECT_TRUE(fd->Open(source_partition.path().c_str(), O_RDWR));
+    EXPECT_TRUE(extent_writer.Init(fd, op.src_extents(), kBlockSize));
+    EXPECT_TRUE(extent_writer.Write(blob_data.data(), blob_data.size()));
+
+    ScopedTempFile target_partition("Blob-XXXXXX");
+    install_part_.source_path = source_partition.path();
+    install_part_.target_path = target_partition.path();
+    install_part_.source_size = blob_data.size();
+    install_part_.target_size = blob_data.size();
+
+    ErrorCode error = ErrorCode::kSuccess;
+    EXPECT_TRUE(writer_.Init(&install_plan_, true));
+    EXPECT_TRUE(writer_.PerformSourceCopyOperation(op, &error));
+    brillo::Blob output_data;
+    EXPECT_TRUE(utils::ReadFile(target_partition.path(), &output_data));
+    return output_data;
+  }
+
+  FakePrefs prefs_{};
+  InstallPlan install_plan_{};
+  InstallPlan::Payload payload_{};
+  DynamicPartitionControlStub dynamic_control_{};
+  FileDescriptorPtr fake_ecc_fd_{};
+  DeltaArchiveManifest manifest_{};
+  PartitionUpdate partition_update_{};
+  InstallPlan::Partition install_part_{};
+  PartitionWriter writer_{
+      partition_update_, install_part_, &dynamic_control_, kBlockSize, false};
+};
+// Test that the error-corrected file descriptor is tried for a SOURCE_COPY
+// that carries no source hash, and that the writer falls back to the normal
+// file descriptor when the error-corrected one is too small.
+TEST_F(PartitionWriterTest, ErrorCorrectionSourceCopyWhenNoHashFallbackTest) {
+  constexpr size_t kCopyOperationSize = 4 * 4096;
+  ScopedTempFile source("Source-XXXXXX");
+  // Setup the source path with the right expected data.
+  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
+  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), expected_data));
+
+  // Setup the fec file descriptor as the fake stream, with only half of the
+  // data the operation needs.
+  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize / 2);
+
+  PartitionConfig old_part(kPartitionNameRoot);
+  old_part.path = source.path();
+  old_part.size = expected_data.size();
+  // NOTE(review): GenerateSourceCopyOp() never reads |old_part| -- confirm.
+  // The payload operation doesn't include an operation hash.
+  auto source_copy_op = GenerateSourceCopyOp(expected_data, false, &old_part);
+
+  auto output_data = PerformSourceCopyOp(source_copy_op.op, expected_data);
+  ASSERT_EQ(output_data, expected_data);
+
+  // Verify that the fake_fec was attempted to be used. Since the file
+  // descriptor is shorter it can actually do more than one read to realize it
+  // reached the EOF.
+  EXPECT_LE(1U, fake_fec->GetReadOps().size());
+  // This fallback doesn't count as an error-corrected operation since the
+  // operation hash was not available.
+  EXPECT_EQ(0U, GetSourceEccRecoveredFailures());
+}
+
+// Test that the error-corrected file descriptor is used to read the partition
+// when the source partition doesn't match the operation hash.
+TEST_F(PartitionWriterTest, ErrorCorrectionSourceCopyFallbackTest) {
+  constexpr size_t kCopyOperationSize = 4 * 4096;
+  // Source image data that doesn't match the expected hash; it is written to
+  // the source partition by PerformSourceCopyOp() below.
+  brillo::Blob invalid_data(kCopyOperationSize, 0x55);
+
+  // Setup the fec file descriptor as the fake stream, which matches
+  // |expected_data|.
+  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize);
+  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
+
+  auto source_copy_op = GenerateSourceCopyOp(expected_data, true);
+  auto output_data = PerformSourceCopyOp(source_copy_op.op, invalid_data);
+  ASSERT_EQ(output_data, expected_data);
+
+  // Verify that the fake_fec was actually used.
+  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
+  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
+}
+
+// Test that ChooseSourceFD() picks the error-corrected descriptor when the
+// source data doesn't match the operation hash.
+TEST_F(PartitionWriterTest, ChooseSourceFDTest) {
+  constexpr size_t kSourceSize = 4 * 4096;
+  ScopedTempFile source("Source-XXXXXX");
+  // Source image content that doesn't match the operation's expected hash.
+  brillo::Blob invalid_data(kSourceSize, 0x55);
+  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), invalid_data));
+
+  writer_.source_fd_ = std::make_shared<EintrSafeFileDescriptor>();
+  EXPECT_TRUE(writer_.source_fd_->Open(source.path().c_str(), O_RDONLY));
+
+  // The fake ECC stream serves data that matches |expected_data|.
+  FakeFileDescriptor* fake_fec = SetFakeECCFile(kSourceSize);
+  brillo::Blob expected_data = FakeFileDescriptorData(kSourceSize);
+
+  InstallOperation op;
+  *(op.add_src_extents()) = ExtentForRange(0, kSourceSize / 4096);
+  brillo::Blob src_hash;
+  EXPECT_TRUE(HashCalculator::RawHashOfData(expected_data, &src_hash));
+  op.set_src_sha256_hash(src_hash.data(), src_hash.size());
+
+  ErrorCode error = ErrorCode::kSuccess;
+  EXPECT_EQ(writer_.source_ecc_fd_, writer_.ChooseSourceFD(op, &error));
+  EXPECT_EQ(ErrorCode::kSuccess, error);
+  // Verify that the fake_fec was actually used.
+  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
+  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/snapshot_extent_writer.cc b/payload_consumer/snapshot_extent_writer.cc
new file mode 100644
index 0000000..882d1f7
--- /dev/null
+++ b/payload_consumer/snapshot_extent_writer.cc
@@ -0,0 +1,54 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+#include "update_engine/payload_consumer/snapshot_extent_writer.h"
+
+#include <algorithm>
+#include <cstdint>
+
+#include <libsnapshot/cow_writer.h>
+
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+SnapshotExtentWriter::SnapshotExtentWriter(
+    android::snapshot::ICowWriter* cow_writer)
+    : cow_writer_(cow_writer) {
+  CHECK_NE(cow_writer, nullptr);  // a COW writer is mandatory
+}
+
+SnapshotExtentWriter::~SnapshotExtentWriter() {
+  CHECK(buffer_.empty());  // no bytes may still be buffered at destruction
+}
+
+bool SnapshotExtentWriter::Init(
+    FileDescriptorPtr /*fd*/,
+    const google::protobuf::RepeatedPtrField<Extent>& extents,
+    uint32_t /*block_size*/) {
+  // TODO(zhangkelvin) Implement this; for now every argument is ignored.
+  return true;
+}
+
+// Returns true on success.
+// Intended behavior: construct a COW_REPLACE operation and forward it to
+// CowWriter, so callers must not route SOURCE_COPY through this class (raw
+// data would be stored) and should prefer COW_COPY whenever possible.
+// NOTE: not implemented yet; |bytes| is currently dropped and true returned.
+bool SnapshotExtentWriter::Write(const void* bytes, size_t count) {
+  // TODO(zhangkelvin) Implement this
+  return true;
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/snapshot_extent_writer.h b/payload_consumer/snapshot_extent_writer.h
new file mode 100644
index 0000000..43a8317
--- /dev/null
+++ b/payload_consumer/snapshot_extent_writer.h
@@ -0,0 +1,47 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+#ifndef UPDATE_ENGINE_SNAPSHOT_EXTENT_WRITER_H_
+#define UPDATE_ENGINE_SNAPSHOT_EXTENT_WRITER_H_
+
+#include <cstdint>
+#include <vector>
+
+#include <libsnapshot/cow_writer.h>
+
+#include "update_engine/payload_consumer/extent_writer.h"
+
+namespace chromeos_update_engine {
+class SnapshotExtentWriter : public chromeos_update_engine::ExtentWriter {
+ public:
+  explicit SnapshotExtentWriter(android::snapshot::ICowWriter* cow_writer);
+  ~SnapshotExtentWriter() override;
+  // Returns true on success.
+  bool Init(FileDescriptorPtr fd,
+            const google::protobuf::RepeatedPtrField<Extent>& extents,
+            uint32_t block_size) override;
+  // Returns true on success. Intended to emit a COW_REPLACE through CowWriter;
+  // callers must not route SOURCE_COPY here (raw data would be stored) and
+  // should use COW_COPY whenever possible.
+  bool Write(const void* bytes, size_t count) override;
+ private:
+  // Non-owning pointer: PartitionWriter owns the CowWriter, so one CowWriter
+  // instance can serve every operation applied to the same partition.
+  [[maybe_unused]] android::snapshot::ICowWriter* cow_writer_;
+  [[maybe_unused]] google::protobuf::RepeatedPtrField<Extent> extents_;
+  [[maybe_unused]] std::vector<uint8_t> buffer_;
+};
+}  // namespace chromeos_update_engine
+#endif  // UPDATE_ENGINE_SNAPSHOT_EXTENT_WRITER_H_
diff --git a/payload_consumer/vabc_partition_writer.cc b/payload_consumer/vabc_partition_writer.cc
new file mode 100644
index 0000000..1578f29
--- /dev/null
+++ b/payload_consumer/vabc_partition_writer.cc
@@ -0,0 +1,106 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/payload_consumer/vabc_partition_writer.h"
+
+#include <memory>
+#include <vector>
+
+#include <libsnapshot/cow_writer.h>
+
+#include "update_engine/common/cow_operation_convert.h"
+#include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/extent_writer.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_consumer/partition_writer.h"
+#include "update_engine/payload_consumer/snapshot_extent_writer.h"
+
+namespace chromeos_update_engine {
+// Runs PartitionWriter::Init(), opens the COW writer, then feeds it the COW
+// operations converted from |partition_update_|. Returns false on any failure.
+bool VABCPartitionWriter::Init(const InstallPlan* install_plan,
+                               bool source_may_exist) {
+  TEST_AND_RETURN_FALSE(install_plan != nullptr);
+  TEST_AND_RETURN_FALSE(PartitionWriter::Init(install_plan, source_may_exist));
+  cow_writer_ = dynamic_control_->OpenCowWriter(
+      install_part_.name, install_part_.source_path, install_plan->is_resume);
+  TEST_AND_RETURN_FALSE(cow_writer_ != nullptr);
+
+  // TODO(zhangkelvin) Emit a label before writing SOURCE_COPY. When resuming,
+  // use pref or CowWriter::GetLastLabel to determine if the SOURCE_COPY ops are
+  // written. No need to handle SOURCE_COPY operations when resuming.
+  // ===== Resume case handling code goes here =====
+
+  // TODO(zhangkelvin) Rewrite this in C++20 coroutine once that's available.
+  auto converted = ConvertToCowOperations(partition_update_.operations(),
+                                          partition_update_.merge_operations());
+  std::vector<uint8_t> buffer(block_size_);
+  for (const auto& cow_op : converted) {
+    switch (cow_op.op) {
+      case CowOperation::CowCopy:
+        TEST_AND_RETURN_FALSE(
+            cow_writer_->AddCopy(cow_op.dst_block, cow_op.src_block));
+        break;
+      case CowOperation::CowReplace: {  // scopes |bytes_read| to this case
+        ssize_t bytes_read = 0;
+        TEST_AND_RETURN_FALSE(utils::PReadAll(source_fd_,
+                                              buffer.data(),
+                                              block_size_,
+                                              cow_op.src_block * block_size_,
+                                              &bytes_read));
+        if (bytes_read <= 0 || static_cast<size_t>(bytes_read) != block_size_) {
+          LOG(ERROR) << "source_fd->Read failed: " << bytes_read;
+          return false;
+        }
+        TEST_AND_RETURN_FALSE(cow_writer_->AddRawBlocks(
+            cow_op.dst_block, buffer.data(), block_size_));
+        break;
+      }
+    }
+  }
+  return true;
+}
+
+std::unique_ptr<ExtentWriter> VABCPartitionWriter::CreateBaseExtentWriter() {
+  return std::make_unique<SnapshotExtentWriter>(cow_writer_.get());  // borrowed
+}
+
+[[nodiscard]] bool VABCPartitionWriter::PerformZeroOrDiscardOperation(
+    const InstallOperation& operation) {  // one AddZeroBlocks() per extent
+  for (const auto& extent : operation.dst_extents()) {
+    TEST_AND_RETURN_FALSE(
+        cow_writer_->AddZeroBlocks(extent.start_block(), extent.num_blocks()));
+  }
+  return true;  // every destination extent was accepted by the COW writer
+}
+
+[[nodiscard]] bool VABCPartitionWriter::PerformSourceCopyOperation(
+    const InstallOperation& operation, ErrorCode* error) {
+  // Currently a no-op that reports success. TODO(zhangkelvin) Probably just
+  // ignore SOURCE_COPY? They should be taken care of during Init();
+  return true;
+}
+
+bool VABCPartitionWriter::Flush() {
+  // Always succeeds; CowWriter automatically flushes every OP added.
+  return true;
+}
+
+VABCPartitionWriter::~VABCPartitionWriter() {
+  if (cow_writer_) cow_writer_->Finalize();  // null if Init() never succeeded
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/vabc_partition_writer.h b/payload_consumer/vabc_partition_writer.h
new file mode 100644
index 0000000..d65ac4a
--- /dev/null
+++ b/payload_consumer/vabc_partition_writer.h
@@ -0,0 +1,52 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef UPDATE_ENGINE_VABC_PARTITION_WRITER_H_
+#define UPDATE_ENGINE_VABC_PARTITION_WRITER_H_
+
+#include <memory>
+
+#include <libsnapshot/snapshot_writer.h>
+
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_consumer/partition_writer.h"
+
+namespace chromeos_update_engine {
+class VABCPartitionWriter final : public PartitionWriter {
+ public:
+  using PartitionWriter::PartitionWriter;  // reuse the base constructors
+  [[nodiscard]] bool Init(const InstallPlan* install_plan,
+                          bool source_may_exist) override;
+  ~VABCPartitionWriter() override;  // finalizes |cow_writer_|
+
+  [[nodiscard]] std::unique_ptr<ExtentWriter> CreateBaseExtentWriter() override;
+
+  // Only ZERO and SOURCE_COPY InstallOperations are treated special by VABC
+  // Partition Writer. These operations correspond to COW_ZERO and COW_COPY. All
+  // other operations just get converted to COW_REPLACE.
+  [[nodiscard]] bool PerformZeroOrDiscardOperation(
+      const InstallOperation& operation) override;
+  [[nodiscard]] bool PerformSourceCopyOperation(
+      const InstallOperation& operation, ErrorCode* error) override;
+  [[nodiscard]] bool Flush() override;
+
+ private:
+  std::unique_ptr<android::snapshot::ISnapshotWriter> cow_writer_;  // owned
+};
+
+}  // namespace chromeos_update_engine
+
+#endif
diff --git a/payload_generator/extent_utils.h b/payload_generator/extent_utils.h
index 9763b1f..f870b29 100644
--- a/payload_generator/extent_utils.h
+++ b/payload_generator/extent_utils.h
@@ -20,6 +20,8 @@
 #include <string>
 #include <vector>
 
+#include <base/logging.h>
+
 #include "update_engine/payload_consumer/payload_constants.h"
 #include "update_engine/update_metadata.pb.h"
 
@@ -83,6 +85,43 @@
 
 bool operator==(const Extent& a, const Extent& b);
 
+// TODO(zhangkelvin) This is ugly. Rewrite using C++20's coroutine once
+// that's available. Unfortunately with C++17 this is the best I could do.
+//
+// Iterates over the blocks of a sequence of extents. Only a reference to the
+// extents is kept, so they must outlive the iterator; do not dereference or
+// advance it once is_end() is true. NOTE: a zero-length extent is not
+// skipped and still yields its start_block() once. Example usage:
+//   for (BlockIterator it{src_extents}; !it.is_end(); ++it) {
+//     auto block = *it;  // Do stuff with |block|
+//   }
+struct BlockIterator {
+  explicit BlockIterator(
+      const google::protobuf::RepeatedPtrField<Extent>& src_extents)
+      : src_extents_(src_extents) {}
+
+  BlockIterator& operator++() {
+    CHECK_LT(cur_extent_, src_extents_.size());
+    block_offset_++;
+    if (block_offset_ >= src_extents_[cur_extent_].num_blocks()) {
+      cur_extent_++;
+      block_offset_ = 0;
+    }
+    return *this;
+  }
+
+  [[nodiscard]] bool is_end() const {
+    return cur_extent_ >= src_extents_.size();
+  }
+  [[nodiscard]] uint64_t operator*() const {
+    return src_extents_[cur_extent_].start_block() + block_offset_;
+  }
+
+  const google::protobuf::RepeatedPtrField<Extent>& src_extents_;
+  int cur_extent_ = 0;
+  size_t block_offset_ = 0;
+};
+
 }  // namespace chromeos_update_engine
 
 #endif  // UPDATE_ENGINE_PAYLOAD_GENERATOR_EXTENT_UTILS_H_
diff --git a/scripts/paycheck.py b/scripts/paycheck.py
index 8eb0033..cb1713f 100755
--- a/scripts/paycheck.py
+++ b/scripts/paycheck.py
@@ -27,6 +27,7 @@
 import sys
 import tempfile
 
+# pylint: disable=redefined-builtin
 from six.moves import zip
 from update_payload import error
 
diff --git a/scripts/paycheck_unittest.py b/scripts/paycheck_unittest.py
index e54a3c0..a90d269 100755
--- a/scripts/paycheck_unittest.py
+++ b/scripts/paycheck_unittest.py
@@ -34,7 +34,7 @@
 # Previously test_paycheck.sh. Run with update_payload ebuild.
 
 # Disable check for function names to avoid errors based on old code
-# pylint: disable-msg=invalid-name
+# pylint: disable=invalid-name
 
 import filecmp
 import os
diff --git a/scripts/update_device.py b/scripts/update_device.py
index 1cd4b6a..756d443 100755
--- a/scripts/update_device.py
+++ b/scripts/update_device.py
@@ -17,6 +17,7 @@
 
 """Send an A/B update to an Android device over adb."""
 
+from __future__ import print_function
 from __future__ import absolute_import
 
 import argparse
@@ -305,6 +306,7 @@
     logging.info('Server Terminated')
 
   def StopServer(self):
+    self._httpd.shutdown()
     self._httpd.socket.close()
 
 
@@ -318,13 +320,13 @@
   """Return the command to run to start the update in the Android device."""
   ota = AndroidOTAPackage(ota_filename, secondary)
   headers = ota.properties
-  headers += 'USER_AGENT=Dalvik (something, something)\n'
-  headers += 'NETWORK_ID=0\n'
-  headers += extra_headers
+  headers += b'USER_AGENT=Dalvik (something, something)\n'
+  headers += b'NETWORK_ID=0\n'
+  headers += extra_headers.encode()
 
   return ['update_engine_client', '--update', '--follow',
           '--payload=%s' % payload_url, '--offset=%d' % ota.offset,
-          '--size=%d' % ota.size, '--headers="%s"' % headers]
+          '--size=%d' % ota.size, '--headers="%s"' % headers.decode()]
 
 
 def OmahaUpdateCommand(omaha_url):
diff --git a/scripts/update_payload/checker.py b/scripts/update_payload/checker.py
index 99a5c62..56a9370 100644
--- a/scripts/update_payload/checker.py
+++ b/scripts/update_payload/checker.py
@@ -35,6 +35,7 @@
 import os
 import subprocess
 
+# pylint: disable=redefined-builtin
 from six.moves import range
 
 from update_payload import common
diff --git a/scripts/update_payload/update_metadata_pb2.py b/scripts/update_payload/update_metadata_pb2.py
index 841cd22..ea4bc59 100644
--- a/scripts/update_payload/update_metadata_pb2.py
+++ b/scripts/update_payload/update_metadata_pb2.py
@@ -18,7 +18,7 @@
   package='chromeos_update_engine',
   syntax='proto2',
   serialized_options=b'H\003',
-  serialized_pb=b'\n\x15update_metadata.proto\x12\x16\x63hromeos_update_engine\"1\n\x06\x45xtent\x12\x13\n\x0bstart_block\x18\x01 \x01(\x04\x12\x12\n\nnum_blocks\x18\x02 \x01(\x04\"\x9f\x01\n\nSignatures\x12@\n\nsignatures\x18\x01 \x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x1aO\n\tSignature\x12\x13\n\x07version\x18\x01 \x01(\rB\x02\x18\x01\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\x1f\n\x17unpadded_signature_size\x18\x03 \x01(\x07\"+\n\rPartitionInfo\x12\x0c\n\x04size\x18\x01 \x01(\x04\x12\x0c\n\x04hash\x18\x02 \x01(\x0c\"w\n\tImageInfo\x12\r\n\x05\x62oard\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\x12\x0f\n\x07\x63hannel\x18\x03 \x01(\t\x12\x0f\n\x07version\x18\x04 \x01(\t\x12\x15\n\rbuild_channel\x18\x05 \x01(\t\x12\x15\n\rbuild_version\x18\x06 \x01(\t\"\xee\x03\n\x10InstallOperation\x12;\n\x04type\x18\x01 \x02(\x0e\x32-.chromeos_update_engine.InstallOperation.Type\x12\x13\n\x0b\x64\x61ta_offset\x18\x02 \x01(\x04\x12\x13\n\x0b\x64\x61ta_length\x18\x03 \x01(\x04\x12\x33\n\x0bsrc_extents\x18\x04 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\nsrc_length\x18\x05 \x01(\x04\x12\x33\n\x0b\x64st_extents\x18\x06 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\ndst_length\x18\x07 \x01(\x04\x12\x18\n\x10\x64\x61ta_sha256_hash\x18\x08 \x01(\x0c\x12\x17\n\x0fsrc_sha256_hash\x18\t \x01(\x0c\"\xad\x01\n\x04Type\x12\x0b\n\x07REPLACE\x10\x00\x12\x0e\n\nREPLACE_BZ\x10\x01\x12\x0c\n\x04MOVE\x10\x02\x1a\x02\x08\x01\x12\x0e\n\x06\x42SDIFF\x10\x03\x1a\x02\x08\x01\x12\x0f\n\x0bSOURCE_COPY\x10\x04\x12\x11\n\rSOURCE_BSDIFF\x10\x05\x12\x0e\n\nREPLACE_XZ\x10\x08\x12\x08\n\x04ZERO\x10\x06\x12\x0b\n\x07\x44ISCARD\x10\x07\x12\x11\n\rBROTLI_BSDIFF\x10\n\x12\x0c\n\x08PUFFDIFF\x10\t\"\xe8\x05\n\x0fPartitionUpdate\x12\x16\n\x0epartition_name\x18\x01 \x02(\t\x12\x17\n\x0frun_postinstall\x18\x02 \x01(\x08\x12\x18\n\x10postinstall_path\x18\x03 \x01(\t\x12\x17\n\x0f\x66ilesystem_type\x18\x04 \x01(\t\x12M\n\x17new_partition_signature\x18\x05 
\x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x12\x41\n\x12old_partition_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12\x41\n\x12new_partition_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12<\n\noperations\x18\x08 \x03(\x0b\x32(.chromeos_update_engine.InstallOperation\x12\x1c\n\x14postinstall_optional\x18\t \x01(\x08\x12=\n\x15hash_tree_data_extent\x18\n \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x38\n\x10hash_tree_extent\x18\x0b \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x1b\n\x13hash_tree_algorithm\x18\x0c \x01(\t\x12\x16\n\x0ehash_tree_salt\x18\r \x01(\x0c\x12\x37\n\x0f\x66\x65\x63_data_extent\x18\x0e \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x32\n\nfec_extent\x18\x0f \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x14\n\tfec_roots\x18\x10 \x01(\r:\x01\x32\x12\x0f\n\x07version\x18\x11 \x01(\t\"L\n\x15\x44ynamicPartitionGroup\x12\x0c\n\x04name\x18\x01 \x02(\t\x12\x0c\n\x04size\x18\x02 \x01(\x04\x12\x17\n\x0fpartition_names\x18\x03 \x03(\t\"s\n\x18\x44ynamicPartitionMetadata\x12=\n\x06groups\x18\x01 \x03(\x0b\x32-.chromeos_update_engine.DynamicPartitionGroup\x12\x18\n\x10snapshot_enabled\x18\x02 \x01(\x08\"\xe1\x06\n\x14\x44\x65ltaArchiveManifest\x12H\n\x12install_operations\x18\x01 \x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12O\n\x19kernel_install_operations\x18\x02 \x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12\x18\n\nblock_size\x18\x03 \x01(\r:\x04\x34\x30\x39\x36\x12\x19\n\x11signatures_offset\x18\x04 \x01(\x04\x12\x17\n\x0fsignatures_size\x18\x05 \x01(\x04\x12\x42\n\x0fold_kernel_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_kernel_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fold_rootfs_info\x18\x08 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_rootfs_info\x18\t 
\x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x39\n\x0eold_image_info\x18\n \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x39\n\x0enew_image_info\x18\x0b \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x18\n\rminor_version\x18\x0c \x01(\r:\x01\x30\x12;\n\npartitions\x18\r \x03(\x0b\x32\'.chromeos_update_engine.PartitionUpdate\x12\x15\n\rmax_timestamp\x18\x0e \x01(\x03\x12T\n\x1a\x64ynamic_partition_metadata\x18\x0f \x01(\x0b\x32\x30.chromeos_update_engine.DynamicPartitionMetadata\x12\x16\n\x0epartial_update\x18\x10 \x01(\x08\x42\x02H\x03'
+  serialized_pb=b'\n\x15update_metadata.proto\x12\x16\x63hromeos_update_engine\"1\n\x06\x45xtent\x12\x13\n\x0bstart_block\x18\x01 \x01(\x04\x12\x12\n\nnum_blocks\x18\x02 \x01(\x04\"\x9f\x01\n\nSignatures\x12@\n\nsignatures\x18\x01 \x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x1aO\n\tSignature\x12\x13\n\x07version\x18\x01 \x01(\rB\x02\x18\x01\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\x1f\n\x17unpadded_signature_size\x18\x03 \x01(\x07\"+\n\rPartitionInfo\x12\x0c\n\x04size\x18\x01 \x01(\x04\x12\x0c\n\x04hash\x18\x02 \x01(\x0c\"w\n\tImageInfo\x12\r\n\x05\x62oard\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\x12\x0f\n\x07\x63hannel\x18\x03 \x01(\t\x12\x0f\n\x07version\x18\x04 \x01(\t\x12\x15\n\rbuild_channel\x18\x05 \x01(\t\x12\x15\n\rbuild_version\x18\x06 \x01(\t\"\xee\x03\n\x10InstallOperation\x12;\n\x04type\x18\x01 \x02(\x0e\x32-.chromeos_update_engine.InstallOperation.Type\x12\x13\n\x0b\x64\x61ta_offset\x18\x02 \x01(\x04\x12\x13\n\x0b\x64\x61ta_length\x18\x03 \x01(\x04\x12\x33\n\x0bsrc_extents\x18\x04 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\nsrc_length\x18\x05 \x01(\x04\x12\x33\n\x0b\x64st_extents\x18\x06 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\ndst_length\x18\x07 \x01(\x04\x12\x18\n\x10\x64\x61ta_sha256_hash\x18\x08 \x01(\x0c\x12\x17\n\x0fsrc_sha256_hash\x18\t \x01(\x0c\"\xad\x01\n\x04Type\x12\x0b\n\x07REPLACE\x10\x00\x12\x0e\n\nREPLACE_BZ\x10\x01\x12\x0c\n\x04MOVE\x10\x02\x1a\x02\x08\x01\x12\x0e\n\x06\x42SDIFF\x10\x03\x1a\x02\x08\x01\x12\x0f\n\x0bSOURCE_COPY\x10\x04\x12\x11\n\rSOURCE_BSDIFF\x10\x05\x12\x0e\n\nREPLACE_XZ\x10\x08\x12\x08\n\x04ZERO\x10\x06\x12\x0b\n\x07\x44ISCARD\x10\x07\x12\x11\n\rBROTLI_BSDIFF\x10\n\x12\x0c\n\x08PUFFDIFF\x10\t\"\xcf\x01\n\x11\x43owMergeOperation\x12<\n\x04type\x18\x01 \x01(\x0e\x32..chromeos_update_engine.CowMergeOperation.Type\x12\x32\n\nsrc_extent\x18\x02 \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x32\n\ndst_extent\x18\x03 
\x01(\x0b\x32\x1e.chromeos_update_engine.Extent\"\x14\n\x04Type\x12\x0c\n\x08\x43OW_COPY\x10\x00\"\xc8\x06\n\x0fPartitionUpdate\x12\x16\n\x0epartition_name\x18\x01 \x02(\t\x12\x17\n\x0frun_postinstall\x18\x02 \x01(\x08\x12\x18\n\x10postinstall_path\x18\x03 \x01(\t\x12\x17\n\x0f\x66ilesystem_type\x18\x04 \x01(\t\x12M\n\x17new_partition_signature\x18\x05 \x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x12\x41\n\x12old_partition_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12\x41\n\x12new_partition_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12<\n\noperations\x18\x08 \x03(\x0b\x32(.chromeos_update_engine.InstallOperation\x12\x1c\n\x14postinstall_optional\x18\t \x01(\x08\x12=\n\x15hash_tree_data_extent\x18\n \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x38\n\x10hash_tree_extent\x18\x0b \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x1b\n\x13hash_tree_algorithm\x18\x0c \x01(\t\x12\x16\n\x0ehash_tree_salt\x18\r \x01(\x0c\x12\x37\n\x0f\x66\x65\x63_data_extent\x18\x0e \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x32\n\nfec_extent\x18\x0f \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x14\n\tfec_roots\x18\x10 \x01(\r:\x01\x32\x12\x0f\n\x07version\x18\x11 \x01(\t\x12\x43\n\x10merge_operations\x18\x12 \x03(\x0b\x32).chromeos_update_engine.CowMergeOperation\x12\x19\n\x11\x65stimate_cow_size\x18\x13 \x01(\x04\"L\n\x15\x44ynamicPartitionGroup\x12\x0c\n\x04name\x18\x01 \x02(\t\x12\x0c\n\x04size\x18\x02 \x01(\x04\x12\x17\n\x0fpartition_names\x18\x03 \x03(\t\"s\n\x18\x44ynamicPartitionMetadata\x12=\n\x06groups\x18\x01 \x03(\x0b\x32-.chromeos_update_engine.DynamicPartitionGroup\x12\x18\n\x10snapshot_enabled\x18\x02 \x01(\x08\"\xe1\x06\n\x14\x44\x65ltaArchiveManifest\x12H\n\x12install_operations\x18\x01 \x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12O\n\x19kernel_install_operations\x18\x02 
\x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12\x18\n\nblock_size\x18\x03 \x01(\r:\x04\x34\x30\x39\x36\x12\x19\n\x11signatures_offset\x18\x04 \x01(\x04\x12\x17\n\x0fsignatures_size\x18\x05 \x01(\x04\x12\x42\n\x0fold_kernel_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_kernel_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fold_rootfs_info\x18\x08 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_rootfs_info\x18\t \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x39\n\x0eold_image_info\x18\n \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x39\n\x0enew_image_info\x18\x0b \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x18\n\rminor_version\x18\x0c \x01(\r:\x01\x30\x12;\n\npartitions\x18\r \x03(\x0b\x32\'.chromeos_update_engine.PartitionUpdate\x12\x15\n\rmax_timestamp\x18\x0e \x01(\x03\x12T\n\x1a\x64ynamic_partition_metadata\x18\x0f \x01(\x0b\x32\x30.chromeos_update_engine.DynamicPartitionMetadata\x12\x16\n\x0epartial_update\x18\x10 \x01(\x08\x42\x02H\x03'
 )
 
 
@@ -81,6 +81,24 @@
 )
 _sym_db.RegisterEnumDescriptor(_INSTALLOPERATION_TYPE)
 
+_COWMERGEOPERATION_TYPE = _descriptor.EnumDescriptor(
+  name='Type',
+  full_name='chromeos_update_engine.CowMergeOperation.Type',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='COW_COPY', index=0, number=0,
+      serialized_options=None,
+      type=None),
+  ],
+  containing_type=None,
+  serialized_options=None,
+  serialized_start=1113,
+  serialized_end=1133,
+)
+_sym_db.RegisterEnumDescriptor(_COWMERGEOPERATION_TYPE)
+
 
 _EXTENT = _descriptor.Descriptor(
   name='Extent',
@@ -387,6 +405,52 @@
 )
 
 
+_COWMERGEOPERATION = _descriptor.Descriptor(
+  name='CowMergeOperation',
+  full_name='chromeos_update_engine.CowMergeOperation',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='type', full_name='chromeos_update_engine.CowMergeOperation.type', index=0,
+      number=1, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='src_extent', full_name='chromeos_update_engine.CowMergeOperation.src_extent', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='dst_extent', full_name='chromeos_update_engine.CowMergeOperation.dst_extent', index=2,
+      number=3, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+    _COWMERGEOPERATION_TYPE,
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto2',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=926,
+  serialized_end=1133,
+)
+
+
 _PARTITIONUPDATE = _descriptor.Descriptor(
   name='PartitionUpdate',
   full_name='chromeos_update_engine.PartitionUpdate',
@@ -513,6 +577,20 @@
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='merge_operations', full_name='chromeos_update_engine.PartitionUpdate.merge_operations', index=17,
+      number=18, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='estimate_cow_size', full_name='chromeos_update_engine.PartitionUpdate.estimate_cow_size', index=18,
+      number=19, type=4, cpp_type=4, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -525,8 +603,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=926,
-  serialized_end=1670,
+  serialized_start=1136,
+  serialized_end=1976,
 )
 
 
@@ -570,8 +648,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1672,
-  serialized_end=1748,
+  serialized_start=1978,
+  serialized_end=2054,
 )
 
 
@@ -608,8 +686,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1750,
-  serialized_end=1865,
+  serialized_start=2056,
+  serialized_end=2171,
 )
 
 
@@ -744,8 +822,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1868,
-  serialized_end=2733,
+  serialized_start=2174,
+  serialized_end=3039,
 )
 
 _SIGNATURES_SIGNATURE.containing_type = _SIGNATURES
@@ -754,6 +832,10 @@
 _INSTALLOPERATION.fields_by_name['src_extents'].message_type = _EXTENT
 _INSTALLOPERATION.fields_by_name['dst_extents'].message_type = _EXTENT
 _INSTALLOPERATION_TYPE.containing_type = _INSTALLOPERATION
+_COWMERGEOPERATION.fields_by_name['type'].enum_type = _COWMERGEOPERATION_TYPE
+_COWMERGEOPERATION.fields_by_name['src_extent'].message_type = _EXTENT
+_COWMERGEOPERATION.fields_by_name['dst_extent'].message_type = _EXTENT
+_COWMERGEOPERATION_TYPE.containing_type = _COWMERGEOPERATION
 _PARTITIONUPDATE.fields_by_name['new_partition_signature'].message_type = _SIGNATURES_SIGNATURE
 _PARTITIONUPDATE.fields_by_name['old_partition_info'].message_type = _PARTITIONINFO
 _PARTITIONUPDATE.fields_by_name['new_partition_info'].message_type = _PARTITIONINFO
@@ -762,6 +844,7 @@
 _PARTITIONUPDATE.fields_by_name['hash_tree_extent'].message_type = _EXTENT
 _PARTITIONUPDATE.fields_by_name['fec_data_extent'].message_type = _EXTENT
 _PARTITIONUPDATE.fields_by_name['fec_extent'].message_type = _EXTENT
+_PARTITIONUPDATE.fields_by_name['merge_operations'].message_type = _COWMERGEOPERATION
 _DYNAMICPARTITIONMETADATA.fields_by_name['groups'].message_type = _DYNAMICPARTITIONGROUP
 _DELTAARCHIVEMANIFEST.fields_by_name['install_operations'].message_type = _INSTALLOPERATION
 _DELTAARCHIVEMANIFEST.fields_by_name['kernel_install_operations'].message_type = _INSTALLOPERATION
@@ -778,6 +861,7 @@
 DESCRIPTOR.message_types_by_name['PartitionInfo'] = _PARTITIONINFO
 DESCRIPTOR.message_types_by_name['ImageInfo'] = _IMAGEINFO
 DESCRIPTOR.message_types_by_name['InstallOperation'] = _INSTALLOPERATION
+DESCRIPTOR.message_types_by_name['CowMergeOperation'] = _COWMERGEOPERATION
 DESCRIPTOR.message_types_by_name['PartitionUpdate'] = _PARTITIONUPDATE
 DESCRIPTOR.message_types_by_name['DynamicPartitionGroup'] = _DYNAMICPARTITIONGROUP
 DESCRIPTOR.message_types_by_name['DynamicPartitionMetadata'] = _DYNAMICPARTITIONMETADATA
@@ -827,6 +911,13 @@
   })
 _sym_db.RegisterMessage(InstallOperation)
 
+CowMergeOperation = _reflection.GeneratedProtocolMessageType('CowMergeOperation', (_message.Message,), {
+  'DESCRIPTOR' : _COWMERGEOPERATION,
+  '__module__' : 'update_metadata_pb2'
+  # @@protoc_insertion_point(class_scope:chromeos_update_engine.CowMergeOperation)
+  })
+_sym_db.RegisterMessage(CowMergeOperation)
+
 PartitionUpdate = _reflection.GeneratedProtocolMessageType('PartitionUpdate', (_message.Message,), {
   'DESCRIPTOR' : _PARTITIONUPDATE,
   '__module__' : 'update_metadata_pb2'
diff --git a/stable/Android.bp b/stable/Android.bp
index 337ae96..a415ac5 100644
--- a/stable/Android.bp
+++ b/stable/Android.bp
@@ -18,6 +18,13 @@
 // ========================================================
 aidl_interface {
     name: "libupdate_engine_stable",
+
+    // This header library is available to core and product modules.
+    // Right now, vendor_available is the only way to specify this.
+    // Vendor modules should NOT use this library.
+    // TODO(b/150902910): change this to product_available.
+    vendor_available: true,
+
     srcs: [
         "android/os/IUpdateEngineStable.aidl",
         "android/os/IUpdateEngineStableCallback.aidl",
@@ -40,10 +47,10 @@
 
 // update_engine_stable_client (type: executable)
 // ========================================================
-// update_engine console client installed to APEXes
+// update_engine console client installed to APEXes.
 cc_binary {
     name: "update_engine_stable_client",
-
+    product_specific: true,
     header_libs: [
         "libupdate_engine_headers",
     ],
diff --git a/stable/update_engine_stable_client.cc b/stable/update_engine_stable_client.cc
index da203c4..17f66b6 100644
--- a/stable/update_engine_stable_client.cc
+++ b/stable/update_engine_stable_client.cc
@@ -32,7 +32,6 @@
 #include <android/binder_ibinder.h>
 #include <common/error_code.h>
 #include <gflags/gflags.h>
-#include <utils/StrongPointer.h>
 
 namespace chromeos_update_engine::internal {
 
diff --git a/update_attempter_android.cc b/update_attempter_android.cc
index 7fc13e1..3578d95 100644
--- a/update_attempter_android.cc
+++ b/update_attempter_android.cc
@@ -507,7 +507,7 @@
         return LogAndSetError(
             error, FROM_HERE, "Failed to hash " + partition_path);
       }
-      if (!DeltaPerformer::ValidateSourceHash(
+      if (!PartitionWriter::ValidateSourceHash(
               source_hash, operation, fd, &errorcode)) {
         return false;
       }
diff --git a/update_metadata.proto b/update_metadata.proto
index 99bfa84..452b89d 100644
--- a/update_metadata.proto
+++ b/update_metadata.proto
@@ -314,6 +314,11 @@
   // skip writing the raw bytes for these extents. During snapshot merge, the
   // bytes will read from the source partitions instead.
   repeated CowMergeOperation merge_operations = 18;
+
+  // Estimated size for COW image. This is used by libsnapshot
+  // as a hint. If set to 0, libsnapshot should use alternative
+  // methods for estimating size.
+  optional uint64 estimate_cow_size = 19;
 }
 
 message DynamicPartitionGroup {