Merge remote-tracking branch 'aosp/upstream-master' into merge

Test: treehugger
Change-Id: I4984f03fa95a753fb17779451eb458f177432d4f
diff --git a/Android.bp b/Android.bp
index a02b16f..5ae7a73 100644
--- a/Android.bp
+++ b/Android.bp
@@ -80,6 +80,28 @@
     },
 }
 
+// libcow_operation_convert (type: library)
+// ========================================================
+cc_library {
+    name: "libcow_operation_convert",
+    host_supported: true,
+    recovery_available: true,
+    defaults: [
+        "ue_defaults",
+        "update_metadata-protos_exports",
+    ],
+    srcs: [
+        "common/cow_operation_convert.cc",
+    ],
+    static_libs: [
+        "libsnapshot_cow",
+        "update_metadata-protos",
+        "libpayload_extent_ranges",
+        "libbrotli",
+        "libz",
+    ],
+}
+
 // update_metadata-protos (type: static_library)
 // ========================================================
 // Protobufs.
@@ -122,6 +144,11 @@
         "libfec_rs",
         "libpuffpatch",
         "libverity_tree",
+        "libsnapshot_cow",
+        "libbrotli",
+        "libz",
+        "libpayload_extent_ranges",
+        "libcow_operation_convert",
     ],
     shared_libs: [
         "libbase",
@@ -175,6 +202,10 @@
         "payload_consumer/payload_constants.cc",
         "payload_consumer/payload_metadata.cc",
         "payload_consumer/payload_verifier.cc",
+        "payload_consumer/partition_writer.cc",
+        "payload_consumer/partition_writer_factory_android.cc",
+        "payload_consumer/vabc_partition_writer.cc",
+        "payload_consumer/snapshot_extent_writer.cc",
         "payload_consumer/postinstall_runner_action.cc",
         "payload_consumer/verity_writer_android.cc",
         "payload_consumer/xz_extent_writer.cc",
@@ -196,6 +227,8 @@
         "libgsi",
         "libpayload_consumer",
         "libsnapshot",
+        "libsnapshot_cow",
+        "libz",
         "update_metadata-protos",
     ],
     shared_libs: [
@@ -257,7 +290,7 @@
     ],
 
     static_libs: [
-        "libkver",
+        "gkiprops",
         "libpayload_consumer",
         "libupdate_engine_boot_control",
     ],
@@ -384,6 +417,7 @@
         // We add the static versions of the shared libraries that are not installed to
         // recovery image due to size concerns. Need to include all the static library
         // dependencies of these static libraries.
+        "gkiprops",
         "libevent",
         "libmodpb64",
         "libgtest_prod",
@@ -391,7 +425,6 @@
         "libbrillo-stream",
         "libbrillo",
         "libchrome",
-        "libkver",
     ],
     target: {
         recovery: {
@@ -477,6 +510,7 @@
         "ue_defaults",
     ],
     host_supported: true,
+    recovery_available: true,
     srcs: [
         "payload_generator/extent_ranges.cc",
     ],
@@ -502,6 +536,7 @@
         "payload_generator/block_mapping.cc",
         "payload_generator/boot_img_filesystem.cc",
         "payload_generator/bzip.cc",
+        "payload_generator/cow_size_estimator.cc",
         "payload_generator/deflate_utils.cc",
         "payload_generator/delta_diff_generator.cc",
         "payload_generator/delta_diff_utils.cc",
@@ -674,6 +709,7 @@
         "common/action_pipe_unittest.cc",
         "common/action_processor_unittest.cc",
         "common/action_unittest.cc",
+        "common/cow_operation_convert_unittest.cc",
         "common/cpu_limiter_unittest.cc",
         "common/fake_prefs.cc",
         "common/file_fetcher_unittest.cc",
@@ -695,8 +731,10 @@
         "payload_consumer/certificate_parser_android_unittest.cc",
         "payload_consumer/delta_performer_integration_test.cc",
         "payload_consumer/delta_performer_unittest.cc",
+        "payload_consumer/partition_writer_unittest.cc",
         "payload_consumer/extent_reader_unittest.cc",
         "payload_consumer/extent_writer_unittest.cc",
+        "payload_consumer/snapshot_extent_writer_unittest.cc",
         "payload_consumer/fake_file_descriptor.cc",
         "payload_consumer/file_descriptor_utils_unittest.cc",
         "payload_consumer/file_writer_unittest.cc",
@@ -755,9 +793,25 @@
 // update_engine header library
 cc_library_headers {
     name: "libupdate_engine_headers",
+
+    // This header library is available to core and product modules.
+    // Right now, vendor_available is the only way to specify this.
+    // vendor modules should NOT use this library.
+    // TODO(b/150902910): change this to product_available.
+    vendor_available: true,
+
     export_include_dirs: ["."],
     apex_available: [
         "com.android.gki.*",
+        "//apex_available:platform",
     ],
     host_supported: true,
+    recovery_available: true,
+    ramdisk_available: true,
+
+    target: {
+        darwin: {
+            enabled: false,
+        },
+    },
 }
diff --git a/BUILD.gn b/BUILD.gn
index ed85594..1ddae22 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -151,6 +151,8 @@
     "payload_consumer/install_plan.cc",
     "payload_consumer/mount_history.cc",
     "payload_consumer/partition_update_generator_stub.cc",
+    "payload_consumer/partition_writer_factory_chromeos.cc",
+    "payload_consumer/partition_writer.cc",
     "payload_consumer/payload_constants.cc",
     "payload_consumer/payload_metadata.cc",
     "payload_consumer/payload_verifier.cc",
@@ -335,6 +337,7 @@
     "payload_generator/block_mapping.cc",
     "payload_generator/boot_img_filesystem_stub.cc",
     "payload_generator/bzip.cc",
+    "payload_generator/cow_size_estimator_stub.cc",
     "payload_generator/deflate_utils.cc",
     "payload_generator/delta_diff_generator.cc",
     "payload_generator/delta_diff_utils.cc",
diff --git a/aosp/boot_control_android.cc b/aosp/boot_control_android.cc
index bda65be..3b20fc2 100644
--- a/aosp/boot_control_android.cc
+++ b/aosp/boot_control_android.cc
@@ -30,8 +30,6 @@
 
 using std::string;
 
-using android::dm::DmDeviceState;
-using android::hardware::hidl_string;
 using android::hardware::Return;
 using android::hardware::boot::V1_0::BoolResult;
 using android::hardware::boot::V1_0::CommandResult;
@@ -183,4 +181,12 @@
   return dynamic_control_.get();
 }
 
+std::optional<PartitionDevice> BootControlAndroid::GetPartitionDevice(
+    const std::string& partition_name,
+    uint32_t slot,
+    uint32_t current_slot,
+    bool not_in_payload) const {
+  return dynamic_control_->GetPartitionDevice(
+      partition_name, slot, current_slot, not_in_payload);
+}
 }  // namespace chromeos_update_engine
diff --git a/aosp/boot_control_android.h b/aosp/boot_control_android.h
index e288723..926023a 100644
--- a/aosp/boot_control_android.h
+++ b/aosp/boot_control_android.h
@@ -44,6 +44,11 @@
   // BootControlInterface overrides.
   unsigned int GetNumSlots() const override;
   BootControlInterface::Slot GetCurrentSlot() const override;
+  std::optional<PartitionDevice> GetPartitionDevice(
+      const std::string& partition_name,
+      uint32_t slot,
+      uint32_t current_slot,
+      bool not_in_payload = false) const override;
   bool GetPartitionDevice(const std::string& partition_name,
                           BootControlInterface::Slot slot,
                           bool not_in_payload,
diff --git a/aosp/cleanup_previous_update_action.cc b/aosp/cleanup_previous_update_action.cc
index 278b101..16cb9fe 100644
--- a/aosp/cleanup_previous_update_action.cc
+++ b/aosp/cleanup_previous_update_action.cc
@@ -67,30 +67,28 @@
       last_percentage_(0),
       merge_stats_(nullptr) {}
 
+CleanupPreviousUpdateAction::~CleanupPreviousUpdateAction() {
+  StopActionInternal();
+}
+
 void CleanupPreviousUpdateAction::PerformAction() {
-  ResumeAction();
+  StartActionInternal();
 }
 
 void CleanupPreviousUpdateAction::TerminateProcessing() {
-  SuspendAction();
+  StopActionInternal();
 }
 
 void CleanupPreviousUpdateAction::ResumeAction() {
-  CHECK(prefs_);
-  CHECK(boot_control_);
-
-  LOG(INFO) << "Starting/resuming CleanupPreviousUpdateAction";
-  running_ = true;
   StartActionInternal();
 }
 
 void CleanupPreviousUpdateAction::SuspendAction() {
-  LOG(INFO) << "Stopping/suspending CleanupPreviousUpdateAction";
-  running_ = false;
+  StopActionInternal();
 }
 
 void CleanupPreviousUpdateAction::ActionCompleted(ErrorCode error_code) {
-  running_ = false;
+  StopActionInternal();
   ReportMergeStats();
   metadata_device_ = nullptr;
 }
@@ -103,7 +101,52 @@
   return "CleanupPreviousUpdateAction";
 }
 
+// This function is called at the beginning of all delayed functions. By
+// resetting |scheduled_task_|, the delayed function acknowledges that the task
+// has already been executed, therefore there's no need to cancel it in the
+// future. This prevents StopActionInternal() from cancelling tasks in an
+// unexpected way, because task IDs could be reused.
+void CleanupPreviousUpdateAction::AcknowledgeTaskExecuted() {
+  if (scheduled_task_ != MessageLoop::kTaskIdNull) {
+    LOG(INFO) << "Executing task " << scheduled_task_;
+  }
+  scheduled_task_ = MessageLoop::kTaskIdNull;
+}
+
+// Check that scheduled_task_ is a valid task ID. Otherwise, terminate the
+// action.
+void CleanupPreviousUpdateAction::CheckTaskScheduled(std::string_view name) {
+  if (scheduled_task_ == MessageLoop::kTaskIdNull) {
+    LOG(ERROR) << "Unable to schedule " << name;
+    processor_->ActionComplete(this, ErrorCode::kError);
+  } else {
+    LOG(INFO) << "CleanupPreviousUpdateAction scheduled task ID "
+              << scheduled_task_ << " for " << name;
+  }
+}
+
+void CleanupPreviousUpdateAction::StopActionInternal() {
+  LOG(INFO) << "Stopping/suspending/completing CleanupPreviousUpdateAction";
+  running_ = false;
+
+  if (scheduled_task_ != MessageLoop::kTaskIdNull) {
+    if (MessageLoop::current()->CancelTask(scheduled_task_)) {
+      LOG(INFO) << "CleanupPreviousUpdateAction cancelled pending task ID "
+                << scheduled_task_;
+    } else {
+      LOG(ERROR) << "CleanupPreviousUpdateAction unable to cancel task ID "
+                 << scheduled_task_;
+    }
+  }
+  scheduled_task_ = MessageLoop::kTaskIdNull;
+}
+
 void CleanupPreviousUpdateAction::StartActionInternal() {
+  CHECK(prefs_);
+  CHECK(boot_control_);
+
+  LOG(INFO) << "Starting/resuming CleanupPreviousUpdateAction";
+  running_ = true;
   // Do nothing on non-VAB device.
   if (!boot_control_->GetDynamicPartitionControl()
            ->GetVirtualAbFeatureFlag()
@@ -120,14 +163,16 @@
 
 void CleanupPreviousUpdateAction::ScheduleWaitBootCompleted() {
   TEST_AND_RETURN(running_);
-  MessageLoop::current()->PostDelayedTask(
+  scheduled_task_ = MessageLoop::current()->PostDelayedTask(
       FROM_HERE,
       base::Bind(&CleanupPreviousUpdateAction::WaitBootCompletedOrSchedule,
                  base::Unretained(this)),
       kCheckBootCompletedInterval);
+  CheckTaskScheduled("WaitBootCompleted");
 }
 
 void CleanupPreviousUpdateAction::WaitBootCompletedOrSchedule() {
+  AcknowledgeTaskExecuted();
   TEST_AND_RETURN(running_);
   if (!kIsRecovery &&
       !android::base::GetBoolProperty(kBootCompletedProp, false)) {
@@ -142,15 +187,17 @@
 
 void CleanupPreviousUpdateAction::ScheduleWaitMarkBootSuccessful() {
   TEST_AND_RETURN(running_);
-  MessageLoop::current()->PostDelayedTask(
+  scheduled_task_ = MessageLoop::current()->PostDelayedTask(
       FROM_HERE,
       base::Bind(
           &CleanupPreviousUpdateAction::CheckSlotMarkedSuccessfulOrSchedule,
           base::Unretained(this)),
       kCheckSlotMarkedSuccessfulInterval);
+  CheckTaskScheduled("WaitMarkBootSuccessful");
 }
 
 void CleanupPreviousUpdateAction::CheckSlotMarkedSuccessfulOrSchedule() {
+  AcknowledgeTaskExecuted();
   TEST_AND_RETURN(running_);
   if (!kIsRecovery &&
       !boot_control_->IsSlotMarkedSuccessful(boot_control_->GetCurrentSlot())) {
@@ -212,14 +259,16 @@
 
 void CleanupPreviousUpdateAction::ScheduleWaitForMerge() {
   TEST_AND_RETURN(running_);
-  MessageLoop::current()->PostDelayedTask(
+  scheduled_task_ = MessageLoop::current()->PostDelayedTask(
       FROM_HERE,
       base::Bind(&CleanupPreviousUpdateAction::WaitForMergeOrSchedule,
                  base::Unretained(this)),
       kWaitForMergeInterval);
+  CheckTaskScheduled("WaitForMerge");
 }
 
 void CleanupPreviousUpdateAction::WaitForMergeOrSchedule() {
+  AcknowledgeTaskExecuted();
   TEST_AND_RETURN(running_);
   auto state = snapshot_->ProcessUpdateState(
       std::bind(&CleanupPreviousUpdateAction::OnMergePercentageUpdate, this),
diff --git a/aosp/cleanup_previous_update_action.h b/aosp/cleanup_previous_update_action.h
index 73cef26..b93c557 100644
--- a/aosp/cleanup_previous_update_action.h
+++ b/aosp/cleanup_previous_update_action.h
@@ -20,6 +20,7 @@
 #include <chrono>  // NOLINT(build/c++11) -- for merge times
 #include <memory>
 #include <string>
+#include <string_view>
 
 #include <brillo/message_loops/message_loop.h>
 #include <libsnapshot/snapshot.h>
@@ -51,6 +52,7 @@
       BootControlInterface* boot_control,
       android::snapshot::ISnapshotManager* snapshot,
       CleanupPreviousUpdateActionDelegateInterface* delegate);
+  ~CleanupPreviousUpdateAction();
 
   void PerformAction() override;
   void SuspendAction() override;
@@ -74,7 +76,13 @@
   bool cancel_failed_{false};
   unsigned int last_percentage_{0};
   android::snapshot::ISnapshotMergeStats* merge_stats_;
+  brillo::MessageLoop::TaskId scheduled_task_{brillo::MessageLoop::kTaskIdNull};
 
+  // Helpers for task management.
+  void AcknowledgeTaskExecuted();
+  void CheckTaskScheduled(std::string_view name);
+
+  void StopActionInternal();
   void StartActionInternal();
   void ScheduleWaitBootCompleted();
   void WaitBootCompletedOrSchedule();
diff --git a/aosp/dynamic_partition_control_android.cc b/aosp/dynamic_partition_control_android.cc
index e045965..1575796 100644
--- a/aosp/dynamic_partition_control_android.cc
+++ b/aosp/dynamic_partition_control_android.cc
@@ -16,7 +16,9 @@
 
 #include "update_engine/aosp/dynamic_partition_control_android.h"
 
+#include <algorithm>
 #include <chrono>  // NOLINT(build/c++11) - using libsnapshot / liblp API
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <set>
@@ -36,6 +38,8 @@
 #include <fs_mgr_overlayfs.h>
 #include <libavb/libavb.h>
 #include <libdm/dm.h>
+#include <liblp/liblp.h>
+#include <libsnapshot/cow_writer.h>
 #include <libsnapshot/snapshot.h>
 #include <libsnapshot/snapshot_stub.h>
 
@@ -71,6 +75,14 @@
     "ro.boot.dynamic_partitions_retrofit";
 constexpr char kVirtualAbEnabled[] = "ro.virtual_ab.enabled";
 constexpr char kVirtualAbRetrofit[] = "ro.virtual_ab.retrofit";
+constexpr char kVirtualAbCompressionEnabled[] =
+    "ro.virtual_ab.compression.enabled";
+
+// Currently, android doesn't have a retrofit prop for VAB Compression. However,
+// struct FeatureFlag forces us to determine if a feature is 'retrofit'. So this
+// is here just to simplify code. Replace it with real retrofit prop name once
+// there is one.
+constexpr char kVirtualAbCompressionRetrofit[] = "";
 constexpr char kPostinstallFstabPrefix[] = "ro.postinstall.fstab.prefix";
 // Map timeout for dynamic partitions.
 constexpr std::chrono::milliseconds kMapTimeout{1000};
@@ -78,19 +90,15 @@
 // needs to be mapped, this timeout is longer than |kMapTimeout|.
 constexpr std::chrono::milliseconds kMapSnapshotTimeout{5000};
 
-#ifdef __ANDROID_RECOVERY__
-constexpr bool kIsRecovery = true;
-#else
-constexpr bool kIsRecovery = false;
-#endif
-
 DynamicPartitionControlAndroid::~DynamicPartitionControlAndroid() {
   Cleanup();
 }
 
 static FeatureFlag GetFeatureFlag(const char* enable_prop,
                                   const char* retrofit_prop) {
-  bool retrofit = GetBoolProperty(retrofit_prop, false);
+  // Default retrofit to false if retrofit_prop is empty.
+  bool retrofit = retrofit_prop && retrofit_prop[0] != '\0' &&
+                  GetBoolProperty(retrofit_prop, false);
   bool enabled = GetBoolProperty(enable_prop, false);
   if (retrofit && !enabled) {
     LOG(ERROR) << retrofit_prop << " is true but " << enable_prop
@@ -109,7 +117,9 @@
 DynamicPartitionControlAndroid::DynamicPartitionControlAndroid()
     : dynamic_partitions_(
           GetFeatureFlag(kUseDynamicPartitions, kRetrfoitDynamicPartitions)),
-      virtual_ab_(GetFeatureFlag(kVirtualAbEnabled, kVirtualAbRetrofit)) {
+      virtual_ab_(GetFeatureFlag(kVirtualAbEnabled, kVirtualAbRetrofit)),
+      virtual_ab_compression_(GetFeatureFlag(kVirtualAbCompressionEnabled,
+                                             kVirtualAbCompressionRetrofit)) {
   if (GetVirtualAbFeatureFlag().IsEnabled()) {
     snapshot_ = SnapshotManager::New();
   } else {
@@ -126,6 +136,11 @@
   return virtual_ab_;
 }
 
+FeatureFlag
+DynamicPartitionControlAndroid::GetVirtualAbCompressionFeatureFlag() {
+  return virtual_ab_compression_;
+}
+
 bool DynamicPartitionControlAndroid::OptimizeOperation(
     const std::string& partition_name,
     const InstallOperation& operation,
@@ -259,9 +274,9 @@
   return true;
 }
 
-void DynamicPartitionControlAndroid::UnmapAllPartitions() {
+bool DynamicPartitionControlAndroid::UnmapAllPartitions() {
   if (mapped_devices_.empty()) {
-    return;
+    return false;
   }
   // UnmapPartitionOnDeviceMapper removes objects from mapped_devices_, hence
   // a copy is needed for the loop.
@@ -270,6 +285,7 @@
   for (const auto& partition_name : mapped) {
     ignore_result(UnmapPartitionOnDeviceMapper(partition_name));
   }
+  return true;
 }
 
 void DynamicPartitionControlAndroid::Cleanup() {
@@ -945,47 +961,19 @@
     bool not_in_payload,
     std::string* device,
     bool* is_dynamic) {
-  const auto& partition_name_suffix =
-      partition_name + SlotSuffixForSlotNumber(slot);
-  std::string device_dir_str;
-  TEST_AND_RETURN_FALSE(GetDeviceDir(&device_dir_str));
-  base::FilePath device_dir(device_dir_str);
-
-  if (is_dynamic) {
-    *is_dynamic = false;
-  }
-
-  // When looking up target partition devices, treat them as static if the
-  // current payload doesn't encode them as dynamic partitions. This may happen
-  // when applying a retrofit update on top of a dynamic-partitions-enabled
-  // build.
-  if (GetDynamicPartitionsFeatureFlag().IsEnabled() &&
-      (slot == current_slot || is_target_dynamic_)) {
-    switch (GetDynamicPartitionDevice(device_dir,
-                                      partition_name_suffix,
-                                      slot,
-                                      current_slot,
-                                      not_in_payload,
-                                      device)) {
-      case DynamicPartitionDeviceStatus::SUCCESS:
-        if (is_dynamic) {
-          *is_dynamic = true;
-        }
-        return true;
-      case DynamicPartitionDeviceStatus::TRY_STATIC:
-        break;
-      case DynamicPartitionDeviceStatus::ERROR:  // fallthrough
-      default:
-        return false;
-    }
-  }
-  base::FilePath path = device_dir.Append(partition_name_suffix);
-  if (!DeviceExists(path.value())) {
-    LOG(ERROR) << "Device file " << path.value() << " does not exist.";
+  // |not_in_payload| must be forwarded explicitly: the overload called here
+  // defaults it to false.
+  auto partition_dev =
+      GetPartitionDevice(partition_name, slot, current_slot, not_in_payload);
+  if (!partition_dev.has_value()) {
     return false;
   }
-
-  *device = path.value();
+  if (device) {
+    *device = std::move(partition_dev->rw_device_path);
+  }
+  if (is_dynamic) {
+    *is_dynamic = partition_dev->is_dynamic;
+  }
   return true;
 }
 
@@ -998,6 +983,73 @@
       partition_name, slot, current_slot, false, device, nullptr);
 }
 
+static std::string GetStaticDevicePath(
+    const base::FilePath& device_dir,
+    const std::string& partition_name_suffixed) {
+  base::FilePath path = device_dir.Append(partition_name_suffixed);
+  return path.value();
+}
+
+std::optional<PartitionDevice>
+DynamicPartitionControlAndroid::GetPartitionDevice(
+    const std::string& partition_name,
+    uint32_t slot,
+    uint32_t current_slot,
+    bool not_in_payload) {
+  std::string device_dir_str;
+  if (!GetDeviceDir(&device_dir_str)) {
+    LOG(ERROR) << "Failed to GetDeviceDir()";
+    return {};
+  }
+  const base::FilePath device_dir(device_dir_str);
+  // When VABC is enabled, we can't get device path for dynamic partitions in
+  // target slot.
+  const auto& partition_name_suffix =
+      partition_name + SlotSuffixForSlotNumber(slot);
+  if (GetVirtualAbCompressionFeatureFlag().IsEnabled() &&
+      IsDynamicPartition(partition_name) && slot != current_slot) {
+    return {{.mountable_device_path =
+                 GetStaticDevicePath(device_dir, partition_name_suffix),
+             .is_dynamic = true}};
+  }
+
+  // When looking up target partition devices, treat them as static if the
+  // current payload doesn't encode them as dynamic partitions. This may happen
+  // when applying a retrofit update on top of a dynamic-partitions-enabled
+  // build.
+  std::string device;
+  if (GetDynamicPartitionsFeatureFlag().IsEnabled() &&
+      (slot == current_slot || is_target_dynamic_)) {
+    switch (GetDynamicPartitionDevice(device_dir,
+                                      partition_name_suffix,
+                                      slot,
+                                      current_slot,
+                                      not_in_payload,
+                                      &device)) {
+      case DynamicPartitionDeviceStatus::SUCCESS:
+        return {{.rw_device_path = device,
+                 .mountable_device_path = device,
+                 .is_dynamic = true}};
+
+      case DynamicPartitionDeviceStatus::TRY_STATIC:
+        break;
+      case DynamicPartitionDeviceStatus::ERROR:  // fallthrough
+      default:
+        return {};
+    }
+  }
+  // Try static partitions.
+  auto static_path = GetStaticDevicePath(device_dir, partition_name_suffix);
+  if (!DeviceExists(static_path)) {
+    LOG(ERROR) << "Device file " << static_path << " does not exist.";
+    return {};
+  }
+
+  return {{.rw_device_path = static_path,
+           .mountable_device_path = static_path,
+           .is_dynamic = false}};
+}
+
 bool DynamicPartitionControlAndroid::IsSuperBlockDevice(
     const base::FilePath& device_dir,
     uint32_t current_slot,
@@ -1068,7 +1120,7 @@
 }
 
 bool DynamicPartitionControlAndroid::IsRecovery() {
-  return kIsRecovery;
+  return constants::kIsRecovery;
 }
 
 static bool IsIncrementalUpdate(const DeltaArchiveManifest& manifest) {
@@ -1219,4 +1271,82 @@
   return metadata_device_ != nullptr;
 }
 
+// Opens a snapshot writer for |unsuffixed_partition_name| in the target slot.
+// Returns nullptr if the super device path cannot be determined; otherwise
+// returns whatever ISnapshotManager::OpenSnapshotWriter() returns.
+std::unique_ptr<android::snapshot::ISnapshotWriter>
+DynamicPartitionControlAndroid::OpenCowWriter(
+    const std::string& unsuffixed_partition_name,
+    const std::optional<std::string>& source_path,
+    bool is_append) {
+  auto suffix = SlotSuffixForSlotNumber(target_slot_);
+
+  auto super_device = GetSuperDevice();
+  if (!super_device.has_value()) {
+    return nullptr;
+  }
+  CreateLogicalPartitionParams params = {
+      .block_device = super_device->value(),
+      .metadata_slot = target_slot_,
+      .partition_name = unsuffixed_partition_name + suffix,
+      .force_writable = true,
+      .timeout_ms = kMapSnapshotTimeout};
+  // TODO(zhangkelvin) Open an APPEND mode CowWriter once there's an API to do
+  // it.
+  return snapshot_->OpenSnapshotWriter(params, source_path);
+}
+
+// Returns the reader obtained from the snapshot writer of
+// |unsuffixed_partition_name|. Returns nullptr if the writer cannot be opened
+// or if InitializeAppend(kEndOfInstallLabel) fails on it.
+FileDescriptorPtr DynamicPartitionControlAndroid::OpenCowReader(
+    const std::string& unsuffixed_partition_name,
+    const std::optional<std::string>& source_path,
+    bool is_append) {
+  auto cow_writer =
+      OpenCowWriter(unsuffixed_partition_name, source_path, is_append);
+  if (cow_writer == nullptr) {
+    return nullptr;
+  }
+  // Do not hand out a reader backed by a writer that failed to initialize.
+  if (!cow_writer->InitializeAppend(kEndOfInstallLabel)) {
+    LOG(ERROR) << "Failed to InitializeAppend for " << unsuffixed_partition_name;
+    return nullptr;
+  }
+  return cow_writer->OpenReader();
+}
+
+// Returns the path of the super partition for |target_slot_| under the device
+// directory, or nullopt if the device directory cannot be determined.
+std::optional<base::FilePath> DynamicPartitionControlAndroid::GetSuperDevice() {
+  std::string device_dir_str;
+  if (!GetDeviceDir(&device_dir_str)) {
+    LOG(ERROR) << "Failed to get device dir!";
+    return {};
+  }
+  base::FilePath device_dir(device_dir_str);
+  auto super_device = device_dir.Append(GetSuperPartitionName(target_slot_));
+  return super_device;
+}
+
+// Asks the snapshot manager to map all snapshots, passing
+// |kMapSnapshotTimeout| as the timeout.
+bool DynamicPartitionControlAndroid::MapAllPartitions() {
+  return snapshot_->MapAllSnapshots(kMapSnapshotTimeout);
+}
+
+// Returns true if |partition_name| is in the list of dynamic partitions of
+// |source_slot_|. The list is loaded on demand while it is still empty, and
+// only when dynamic partitions are enabled.
+bool DynamicPartitionControlAndroid::IsDynamicPartition(
+    const std::string& partition_name) {
+  if (dynamic_partition_list_.empty() &&
+      GetDynamicPartitionsFeatureFlag().IsEnabled()) {
+    CHECK(ListDynamicPartitionsForSlot(source_slot_, &dynamic_partition_list_));
+  }
+  return std::find(dynamic_partition_list_.begin(),
+                   dynamic_partition_list_.end(),
+                   partition_name) != dynamic_partition_list_.end();
+}
+
 }  // namespace chromeos_update_engine
diff --git a/aosp/dynamic_partition_control_android.h b/aosp/dynamic_partition_control_android.h
index 79f87d9..ecab6fa 100644
--- a/aosp/dynamic_partition_control_android.h
+++ b/aosp/dynamic_partition_control_android.h
@@ -25,6 +25,7 @@
 #include <base/files/file_util.h>
 #include <libsnapshot/auto_device.h>
 #include <libsnapshot/snapshot.h>
+#include <libsnapshot/snapshot_writer.h>
 
 #include "update_engine/common/dynamic_partition_control_interface.h"
 
@@ -34,8 +35,10 @@
  public:
   DynamicPartitionControlAndroid();
   ~DynamicPartitionControlAndroid();
+
   FeatureFlag GetDynamicPartitionsFeatureFlag() override;
   FeatureFlag GetVirtualAbFeatureFlag() override;
+  FeatureFlag GetVirtualAbCompressionFeatureFlag() override;
   bool OptimizeOperation(const std::string& partition_name,
                          const InstallOperation& operation,
                          InstallOperation* optimized) override;
@@ -69,6 +72,13 @@
   // Note: this function is only used by BootControl*::GetPartitionDevice.
   // Other callers should prefer BootControl*::GetPartitionDevice over
   // BootControl*::GetDynamicPartitionControl()->GetPartitionDevice().
+  std::optional<PartitionDevice> GetPartitionDevice(
+      const std::string& partition_name,
+      uint32_t slot,
+      uint32_t current_slot,
+      bool not_in_payload = false);
+  // Deprecated, please use GetPartitionDevice(string, uint32_t, uint32_t);
+  // TODO(zhangkelvin) Remove below deprecated APIs.
   bool GetPartitionDevice(const std::string& partition_name,
                           uint32_t slot,
                           uint32_t current_slot,
@@ -81,6 +91,20 @@
                           uint32_t current_slot,
                           std::string* device);
 
+  // Partition name is expected to be unsuffixed. e.g. system, vendor
+  // Return an interface to write to a snapshoted partition.
+  std::unique_ptr<android::snapshot::ISnapshotWriter> OpenCowWriter(
+      const std::string& unsuffixed_partition_name,
+      const std::optional<std::string>& source_path,
+      bool is_append) override;
+  FileDescriptorPtr OpenCowReader(const std::string& unsuffixed_partition_name,
+                                  const std::optional<std::string>&,
+                                  bool is_append = false) override;
+
+  bool UnmapAllPartitions() override;
+
+  bool IsDynamicPartition(const std::string& part_name) override;
+
  protected:
   // These functions are exposed for testing.
 
@@ -94,10 +118,10 @@
   virtual std::unique_ptr<android::fs_mgr::MetadataBuilder> LoadMetadataBuilder(
       const std::string& super_device, uint32_t slot);
 
-  // Retrieves metadata from |super_device| at slot |source_slot|. And modifies
-  // the metadata so that during updates, the metadata can be written to
-  // |target_slot|. In particular, on retrofit devices, the returned metadata
-  // automatically includes block devices at |target_slot|.
+  // Retrieves metadata from |super_device| at slot |source_slot|. And
+  // modifies the metadata so that during updates, the metadata can be written
+  // to |target_slot|. In particular, on retrofit devices, the returned
+  // metadata automatically includes block devices at |target_slot|.
   virtual std::unique_ptr<android::fs_mgr::MetadataBuilder> LoadMetadataBuilder(
       const std::string& super_device,
       uint32_t source_slot,
@@ -192,11 +216,17 @@
       const DeltaArchiveManifest& manifest,
       bool delete_source);
 
+  bool MapAllPartitions() override;
+
+  void SetSourceSlot(uint32_t slot) { source_slot_ = slot; }
+  void SetTargetSlot(uint32_t slot) { target_slot_ = slot; }
+
  private:
   friend class DynamicPartitionControlAndroidTest;
   friend class SnapshotPartitionTestP;
 
-  void UnmapAllPartitions();
+  std::optional<base::FilePath> GetSuperDevice();
+
   bool MapPartitionInternal(const std::string& super_device,
                             const std::string& target_partition_name,
                             uint32_t slot,
@@ -212,8 +242,8 @@
                                uint32_t target_slot,
                                const DeltaArchiveManifest& manifest);
 
-  // Helper for PreparePartitionsForUpdate. Used for snapshotted partitions for
-  // Virtual A/B update.
+  // Helper for PreparePartitionsForUpdate. Used for snapshotted partitions
+  // for Virtual A/B update.
   bool PrepareSnapshotPartitionsForUpdate(uint32_t source_slot,
                                           uint32_t target_slot,
                                           const DeltaArchiveManifest& manifest,
@@ -251,17 +281,20 @@
   // Returns true if metadata is expected to be mounted, false otherwise.
   // Note that it returns false on non-Virtual A/B devices.
   //
-  // Almost all functions of SnapshotManager depends on metadata being mounted.
+  // Almost all functions of SnapshotManager depend on metadata being
+  // mounted.
   // - In Android mode for Virtual A/B devices, assume it is mounted. If not,
   //   let caller fails when calling into SnapshotManager.
-  // - In recovery for Virtual A/B devices, it is possible that metadata is not
+  // - In recovery for Virtual A/B devices, it is possible that
+  //   metadata is not
   //   formatted, hence it cannot be mounted. Caller should not call into
   //   SnapshotManager.
-  // - On non-Virtual A/B devices, updates do not depend on metadata partition.
+  // - On non-Virtual A/B devices, updates do not depend on metadata
+  //   partition.
   //   Caller should not call into SnapshotManager.
   //
-  // This function does NOT mount metadata partition. Use EnsureMetadataMounted
-  // to mount metadata partition.
+  // This function does NOT mount metadata partition. Use
+  // EnsureMetadataMounted to mount metadata partition.
   bool ExpectMetadataMounted();
 
   // Ensure /metadata is mounted. Returns true if successful, false otherwise.
@@ -277,14 +310,17 @@
   std::set<std::string> mapped_devices_;
   const FeatureFlag dynamic_partitions_;
   const FeatureFlag virtual_ab_;
+  const FeatureFlag virtual_ab_compression_;
   std::unique_ptr<android::snapshot::ISnapshotManager> snapshot_;
   std::unique_ptr<android::snapshot::AutoDevice> metadata_device_;
   bool target_supports_snapshot_ = false;
   // Whether the target partitions should be loaded as dynamic partitions. Set
   // by PreparePartitionsForUpdate() per each update.
   bool is_target_dynamic_ = false;
+
   uint32_t source_slot_ = UINT32_MAX;
   uint32_t target_slot_ = UINT32_MAX;
+  std::vector<std::string> dynamic_partition_list_;
 
   DISALLOW_COPY_AND_ASSIGN(DynamicPartitionControlAndroid);
 };
diff --git a/aosp/dynamic_partition_control_android_unittest.cc b/aosp/dynamic_partition_control_android_unittest.cc
index 5d6463b..af5ae2c 100644
--- a/aosp/dynamic_partition_control_android_unittest.cc
+++ b/aosp/dynamic_partition_control_android_unittest.cc
@@ -16,6 +16,7 @@
 
 #include "update_engine/aosp/dynamic_partition_control_android.h"
 
+#include <algorithm>
 #include <set>
 #include <vector>
 
@@ -27,7 +28,7 @@
 #include <libsnapshot/mock_snapshot.h>
 
 #include "update_engine/aosp/dynamic_partition_test_utils.h"
-#include "update_engine/aosp/mock_dynamic_partition_control.h"
+#include "update_engine/aosp/mock_dynamic_partition_control_android.h"
 #include "update_engine/common/mock_prefs.h"
 #include "update_engine/common/test_utils.h"
 
@@ -38,6 +39,7 @@
 using testing::_;
 using testing::AnyNumber;
 using testing::AnyOf;
+using testing::AtLeast;
 using testing::Invoke;
 using testing::NiceMock;
 using testing::Not;
@@ -55,6 +57,8 @@
         .WillByDefault(Return(FeatureFlag(FeatureFlag::Value::LAUNCH)));
     ON_CALL(dynamicControl(), GetVirtualAbFeatureFlag())
         .WillByDefault(Return(FeatureFlag(FeatureFlag::Value::NONE)));
+    ON_CALL(dynamicControl(), GetVirtualAbCompressionFeatureFlag())
+        .WillByDefault(Return(FeatureFlag(FeatureFlag::Value::NONE)));
 
     ON_CALL(dynamicControl(), GetDeviceDir(_))
         .WillByDefault(Invoke([](auto path) {
@@ -217,6 +221,8 @@
   void SetUp() override {
     DynamicPartitionControlAndroidTest::SetUp();
     SetSlots(GetParam());
+    dynamicControl().SetSourceSlot(source());
+    dynamicControl().SetTargetSlot(target());
   }
 };
 
@@ -386,6 +392,84 @@
   EXPECT_EQ(GetDevice(T("bar")), bar_device);
 }
 
+TEST_P(DynamicPartitionControlAndroidTestP, GetMountableDevicePath) {
+  ON_CALL(dynamicControl(), GetDynamicPartitionsFeatureFlag())
+      .WillByDefault(Return(FeatureFlag(FeatureFlag::Value::LAUNCH)));
+  ON_CALL(dynamicControl(), GetVirtualAbFeatureFlag())
+      .WillByDefault(Return(FeatureFlag(FeatureFlag::Value::LAUNCH)));
+  ON_CALL(dynamicControl(), GetVirtualAbCompressionFeatureFlag())
+      .WillByDefault(Return(FeatureFlag(FeatureFlag::Value::NONE)));
+  ON_CALL(dynamicControl(), IsDynamicPartition(_)).WillByDefault(Return(true));
+
+  EXPECT_CALL(dynamicControl(),
+              DeviceExists(AnyOf(GetDevice(S("vendor")),
+                                 GetDevice(T("vendor")),
+                                 GetDevice(S("system")),
+                                 GetDevice(T("system")))))
+      .WillRepeatedly(Return(true));
+  EXPECT_CALL(
+      dynamicControl(),
+      GetState(AnyOf(S("vendor"), T("vendor"), S("system"), T("system"))))
+      .WillRepeatedly(Return(DmDeviceState::ACTIVE));
+
+  SetMetadata(source(), {{S("system"), 2_GiB}, {S("vendor"), 1_GiB}});
+  SetMetadata(target(), {{T("system"), 2_GiB}, {T("vendor"), 1_GiB}});
+  std::string device;
+  ASSERT_TRUE(dynamicControl().GetPartitionDevice(
+      "system", source(), source(), &device));
+  ASSERT_EQ(GetDmDevice(S("system")), device);
+
+  ASSERT_TRUE(dynamicControl().GetPartitionDevice(
+      "system", target(), source(), &device));
+  ASSERT_EQ(GetDevice(T("system")), device);
+
+  // If VABC is disabled, mountable device path should be same as device path.
+  auto device_info =
+      dynamicControl().GetPartitionDevice("system", target(), source());
+  ASSERT_TRUE(device_info.has_value());
+  ASSERT_EQ(device_info->mountable_device_path, device);
+}
+
+TEST_P(DynamicPartitionControlAndroidTestP, GetMountableDevicePathVABC) {
+  ON_CALL(dynamicControl(), GetDynamicPartitionsFeatureFlag())
+      .WillByDefault(Return(FeatureFlag(FeatureFlag::Value::LAUNCH)));
+  ON_CALL(dynamicControl(), GetVirtualAbFeatureFlag())
+      .WillByDefault(Return(FeatureFlag(FeatureFlag::Value::LAUNCH)));
+  ON_CALL(dynamicControl(), GetVirtualAbCompressionFeatureFlag())
+      .WillByDefault(Return(FeatureFlag(FeatureFlag::Value::LAUNCH)));
+  EXPECT_CALL(dynamicControl(), IsDynamicPartition(_))
+      .Times(AtLeast(1))
+      .WillRepeatedly(Return(true));
+
+  EXPECT_CALL(dynamicControl(),
+              DeviceExists(AnyOf(GetDevice(S("vendor")),
+                                 GetDevice(T("vendor")),
+                                 GetDevice(S("system")),
+                                 GetDevice(T("system")))))
+      .WillRepeatedly(Return(true));
+  EXPECT_CALL(
+      dynamicControl(),
+      GetState(AnyOf(S("vendor"), T("vendor"), S("system"), T("system"))))
+      .WillRepeatedly(Return(DmDeviceState::ACTIVE));
+
+  SetMetadata(source(), {{S("system"), 2_GiB}, {S("vendor"), 1_GiB}});
+  SetMetadata(target(), {{T("system"), 2_GiB}, {T("vendor"), 1_GiB}});
+
+  std::string device;
+  ASSERT_TRUE(dynamicControl().GetPartitionDevice(
+      "system", source(), source(), &device));
+  ASSERT_EQ(GetDmDevice(S("system")), device);
+
+  ASSERT_TRUE(dynamicControl().GetPartitionDevice(
+      "system", target(), source(), &device));
+  ASSERT_EQ("", device);
+
+  auto device_info =
+      dynamicControl().GetPartitionDevice("system", target(), source());
+  ASSERT_TRUE(device_info.has_value());
+  ASSERT_EQ(device_info->mountable_device_path, GetDevice(T("system")));
+}
+
 TEST_P(DynamicPartitionControlAndroidTestP,
        GetPartitionDeviceWhenResumingUpdate) {
   // Static partition bar_{a,b} exists.
diff --git a/aosp/dynamic_partition_test_utils.h b/aosp/dynamic_partition_test_utils.h
index c7be1cb..c518382 100644
--- a/aosp/dynamic_partition_test_utils.h
+++ b/aosp/dynamic_partition_test_utils.h
@@ -47,7 +47,7 @@
 
 constexpr const uint32_t kMaxNumSlots = 2;
 constexpr const char* kSlotSuffixes[kMaxNumSlots] = {"_a", "_b"};
-constexpr const char* kFakeDevicePath = "/fake/dev/path/";
+constexpr std::string_view kFakeDevicePath = "/fake/dev/path/";
 constexpr const char* kFakeDmDevicePath = "/fake/dm/dev/path/";
 constexpr const uint32_t kFakeMetadataSize = 65536;
 constexpr const char* kDefaultGroup = "foo";
@@ -112,7 +112,7 @@
 }
 
 inline std::string GetDevice(const std::string& name) {
-  return kFakeDevicePath + name;
+  return std::string(kFakeDevicePath) + name;
 }
 
 inline std::string GetDmDevice(const std::string& name) {
diff --git a/aosp/hardware_android.cc b/aosp/hardware_android.cc
index 3b0d9a8..6f884d4 100644
--- a/aosp/hardware_android.cc
+++ b/aosp/hardware_android.cc
@@ -17,18 +17,16 @@
 #include "update_engine/aosp/hardware_android.h"
 
 #include <sys/types.h>
-#include <sys/utsname.h>
 
 #include <memory>
 #include <string>
 #include <string_view>
 
+#include <android/sysprop/GkiProperties.sysprop.h>
 #include <android-base/parseint.h>
 #include <android-base/properties.h>
 #include <base/files/file_util.h>
 #include <bootloader_message/bootloader_message.h>
-#include <kver/kernel_release.h>
-#include <kver/utils.h>
 
 #include "update_engine/common/error_code_utils.h"
 #include "update_engine/common/hardware.h"
@@ -38,8 +36,6 @@
 using android::base::GetBoolProperty;
 using android::base::GetIntProperty;
 using android::base::GetProperty;
-using android::kver::IsKernelUpdateValid;
-using android::kver::KernelRelease;
 using std::string;
 
 namespace chromeos_update_engine {
@@ -59,6 +55,19 @@
                                     "");
 }
 
+ErrorCode IsTimestampNewerLogged(const std::string& partition_name,
+                                 const std::string& old_version,
+                                 const std::string& new_version) {
+  auto error_code = utils::IsTimestampNewer(old_version, new_version);
+  if (error_code != ErrorCode::kSuccess) {
+    LOG(WARNING) << "Timestamp check failed with "
+                 << utils::ErrorCodeToString(error_code) << ": "
+                 << partition_name << " Partition timestamp: " << old_version
+                 << " Update timestamp: " << new_version;
+  }
+  return error_code;
+}
+
 }  // namespace
 
 namespace hardware {
@@ -222,23 +231,19 @@
 }
 
 void HardwareAndroid::SetWarmReset(bool warm_reset) {
-  constexpr char warm_reset_prop[] = "ota.warm_reset";
-  if (!android::base::SetProperty(warm_reset_prop, warm_reset ? "1" : "0")) {
-    LOG(WARNING) << "Failed to set prop " << warm_reset_prop;
+  if constexpr (!constants::kIsRecovery) {
+    constexpr char warm_reset_prop[] = "ota.warm_reset";
+    if (!android::base::SetProperty(warm_reset_prop, warm_reset ? "1" : "0")) {
+      LOG(WARNING) << "Failed to set prop " << warm_reset_prop;
+    }
   }
 }
 
 string HardwareAndroid::GetVersionForLogging(
     const string& partition_name) const {
   if (partition_name == "boot") {
-    struct utsname buf;
-    if (uname(&buf) != 0) {
-      PLOG(ERROR) << "Unable to call uname()";
-      return "";
-    }
-    auto kernel_release =
-        KernelRelease::Parse(buf.release, true /* allow_suffix */);
-    return kernel_release.has_value() ? kernel_release->string() : "";
+    // ro.bootimage.build.date.utc
+    return GetPartitionBuildDate("bootimage");
   }
   return GetPartitionBuildDate(partition_name);
 }
@@ -246,50 +251,33 @@
 ErrorCode HardwareAndroid::IsPartitionUpdateValid(
     const string& partition_name, const string& new_version) const {
   if (partition_name == "boot") {
-    struct utsname buf;
-    if (uname(&buf) != 0) {
-      PLOG(ERROR) << "Unable to call uname()";
-      return ErrorCode::kError;
+    const auto old_version = GetPartitionBuildDate("bootimage");
+    auto error_code =
+        IsTimestampNewerLogged(partition_name, old_version, new_version);
+    if (error_code == ErrorCode::kPayloadTimestampError) {
+      bool prevent_downgrade =
+          android::sysprop::GkiProperties::prevent_downgrade_version().value_or(
+              false);
+      if (!prevent_downgrade) {
+        LOG(WARNING) << "Downgrade of boot image is detected, but permitting "
+                        "update because device does not prevent boot image "
+                        "downgrade";
+        // If prevent_downgrade_version sysprop is not explicitly set, permit
+        // downgrade in boot image version.
+        // Even though error_code is overridden here, always call
+        // IsTimestampNewerLogged to produce log messages.
+        error_code = ErrorCode::kSuccess;
+      }
     }
-    return IsKernelUpdateValid(buf.release, new_version);
+    return error_code;
   }
 
   const auto old_version = GetPartitionBuildDate(partition_name);
   // TODO(zhangkelvin)  for some partitions, missing a current timestamp should
   // be an error, e.g. system, vendor, product etc.
-  auto error_code = utils::IsTimestampNewer(old_version, new_version);
-  if (error_code != ErrorCode::kSuccess) {
-    LOG(ERROR) << "Timestamp check failed with "
-               << utils::ErrorCodeToString(error_code)
-               << " Partition timestamp: " << old_version
-               << " Update timestamp: " << new_version;
-  }
+  auto error_code =
+      IsTimestampNewerLogged(partition_name, old_version, new_version);
   return error_code;
 }
 
-ErrorCode HardwareAndroid::IsKernelUpdateValid(const string& old_release,
-                                               const string& new_release) {
-  // Check that the package either contain an empty version (indicating that the
-  // new build does not use GKI), or a valid GKI kernel release.
-  std::optional<KernelRelease> new_kernel_release;
-  if (new_release.empty()) {
-    LOG(INFO) << "New build does not contain GKI.";
-  } else {
-    new_kernel_release =
-        KernelRelease::Parse(new_release, true /* allow_suffix */);
-    if (!new_kernel_release.has_value()) {
-      LOG(ERROR) << "New kernel release is not valid GKI kernel release: "
-                 << new_release;
-      return ErrorCode::kDownloadManifestParseError;
-    }
-  }
-
-  auto old_kernel_release =
-      KernelRelease::Parse(old_release, true /* allow_suffix */);
-  return android::kver::IsKernelUpdateValid(old_kernel_release,
-                                            new_kernel_release)
-             ? ErrorCode::kSuccess
-             : ErrorCode::kPayloadTimestampError;
-}
-
 }  // namespace chromeos_update_engine
diff --git a/aosp/hardware_android.h b/aosp/hardware_android.h
index 5e09fb3..5ffd7c5 100644
--- a/aosp/hardware_android.h
+++ b/aosp/hardware_android.h
@@ -22,7 +22,6 @@
 
 #include <base/macros.h>
 #include <base/time/time.h>
-#include <gtest/gtest_prod.h>
 
 #include "update_engine/common/error_code.h"
 #include "update_engine/common/hardware.h"
@@ -66,12 +65,6 @@
       const std::string& new_version) const override;
 
  private:
-  FRIEND_TEST(HardwareAndroidTest, IsKernelUpdateValid);
-
-  // Helper for IsPartitionUpdateValid.
-  static ErrorCode IsKernelUpdateValid(const std::string& old_release,
-                                       const std::string& new_release);
-
   DISALLOW_COPY_AND_ASSIGN(HardwareAndroid);
 };
 
diff --git a/aosp/mock_dynamic_partition_control.h b/aosp/mock_dynamic_partition_control_android.h
similarity index 81%
rename from aosp/mock_dynamic_partition_control.h
rename to aosp/mock_dynamic_partition_control_android.h
index 382106e..1d4bb14 100644
--- a/aosp/mock_dynamic_partition_control.h
+++ b/aosp/mock_dynamic_partition_control_android.h
@@ -22,6 +22,10 @@
 
 #include <gmock/gmock.h>
 
+#include <libsnapshot/cow_writer.h>
+#include <libsnapshot/snapshot_writer.h>
+
+#include "payload_consumer/file_descriptor.h"
 #include "update_engine/aosp/dynamic_partition_control_android.h"
 #include "update_engine/common/boot_control_interface.h"
 #include "update_engine/common/dynamic_partition_control_interface.h"
@@ -66,6 +70,7 @@
   MOCK_METHOD(FeatureFlag, GetDynamicPartitionsFeatureFlag, (), (override));
   MOCK_METHOD(std::string, GetSuperPartitionName, (uint32_t), (override));
   MOCK_METHOD(FeatureFlag, GetVirtualAbFeatureFlag, (), (override));
+  MOCK_METHOD(FeatureFlag, GetVirtualAbCompressionFeatureFlag, (), (override));
   MOCK_METHOD(bool, FinishUpdate, (bool), (override));
   MOCK_METHOD(bool,
               GetSystemOtherPath,
@@ -81,6 +86,21 @@
               PrepareDynamicPartitionsForUpdate,
               (uint32_t, uint32_t, const DeltaArchiveManifest&, bool),
               (override));
+  MOCK_METHOD(std::unique_ptr<android::snapshot::ISnapshotWriter>,
+              OpenCowWriter,
+              (const std::string& unsuffixed_partition_name,
+               const std::optional<std::string>& source_path,
+               bool is_append),
+              (override));
+  MOCK_METHOD(FileDescriptorPtr,
+              OpenCowReader,
+              (const std::string& unsuffixed_partition_name,
+               const std::optional<std::string>& source_path,
+               bool is_append),
+              (override));
+  MOCK_METHOD(bool, MapAllPartitions, (), (override));
+  MOCK_METHOD(bool, UnmapAllPartitions, (), (override));
+  MOCK_METHOD(bool, IsDynamicPartition, (const std::string&), (override));
 
   void set_fake_mapped_devices(const std::set<std::string>& fake) override {
     DynamicPartitionControlAndroid::set_fake_mapped_devices(fake);
@@ -113,6 +133,8 @@
     return DynamicPartitionControlAndroid::PrepareDynamicPartitionsForUpdate(
         source_slot, target_slot, manifest, delete_source);
   }
+  using DynamicPartitionControlAndroid::SetSourceSlot;
+  using DynamicPartitionControlAndroid::SetTargetSlot;
 };
 
 }  // namespace chromeos_update_engine
diff --git a/aosp/update_attempter_android.cc b/aosp/update_attempter_android.cc
index 348f330..eb1ebe0 100644
--- a/aosp/update_attempter_android.cc
+++ b/aosp/update_attempter_android.cc
@@ -277,6 +277,7 @@
     }
   }
 
+  LOG(INFO) << "Using this install plan:";
   install_plan_.Dump();
 
   HttpFetcher* fetcher = nullptr;
@@ -506,7 +507,7 @@
         return LogAndSetError(
             error, FROM_HERE, "Failed to hash " + partition_path);
       }
-      if (!DeltaPerformer::ValidateSourceHash(
+      if (!PartitionWriter::ValidateSourceHash(
               source_hash, operation, fd, &errorcode)) {
         return false;
       }
diff --git a/common/boot_control_interface.h b/common/boot_control_interface.h
index c93de5c..3b61add 100644
--- a/common/boot_control_interface.h
+++ b/common/boot_control_interface.h
@@ -75,6 +75,11 @@
                                   Slot slot,
                                   std::string* device) const = 0;
 
+  virtual std::optional<PartitionDevice> GetPartitionDevice(
+      const std::string& partition_name,
+      uint32_t slot,
+      uint32_t current_slot,
+      bool not_in_payload = false) const = 0;
   // Returns whether the passed |slot| is marked as bootable. Returns false if
   // the slot is invalid.
   virtual bool IsSlotBootable(Slot slot) const = 0;
diff --git a/common/boot_control_stub.cc b/common/boot_control_stub.cc
index 907f670..a1cc055 100644
--- a/common/boot_control_stub.cc
+++ b/common/boot_control_stub.cc
@@ -44,6 +44,15 @@
   return false;
 }
 
+std::optional<PartitionDevice> BootControlStub::GetPartitionDevice(
+    const std::string& partition_name,
+    uint32_t slot,
+    uint32_t current_slot,
+    bool not_in_payload) const {
+  LOG(ERROR) << __FUNCTION__ << " should never be called.";
+  return {};
+}
+
 bool BootControlStub::GetPartitionDevice(const string& partition_name,
                                          Slot slot,
                                          string* device) const {
diff --git a/common/boot_control_stub.h b/common/boot_control_stub.h
index a1bdb96..dcddbae 100644
--- a/common/boot_control_stub.h
+++ b/common/boot_control_stub.h
@@ -48,6 +48,11 @@
   bool GetPartitionDevice(const std::string& partition_name,
                           BootControlInterface::Slot slot,
                           std::string* device) const override;
+  std::optional<PartitionDevice> GetPartitionDevice(
+      const std::string& partition_name,
+      uint32_t slot,
+      uint32_t current_slot,
+      bool not_in_payload = false) const override;
   bool IsSlotBootable(BootControlInterface::Slot slot) const override;
   bool MarkSlotUnbootable(BootControlInterface::Slot slot) override;
   bool SetActiveBootSlot(BootControlInterface::Slot slot) override;
diff --git a/common/constants.h b/common/constants.h
index 1e97249..64447ce 100644
--- a/common/constants.h
+++ b/common/constants.h
@@ -17,6 +17,8 @@
 #ifndef UPDATE_ENGINE_COMMON_CONSTANTS_H_
 #define UPDATE_ENGINE_COMMON_CONSTANTS_H_
 
+#include <cstdint>
+
 namespace chromeos_update_engine {
 
 // The root path of all exclusion prefs.
@@ -154,30 +156,30 @@
 } PayloadType;
 
 // Maximum number of times we'll allow using p2p for the same update payload.
-const int kMaxP2PAttempts = 10;
+constexpr int kMaxP2PAttempts = 10;
 
 // Maximum wallclock time we allow attempting to update using p2p for
 // the same update payload - five days.
-const int kMaxP2PAttemptTimeSeconds = 5 * 24 * 60 * 60;
+constexpr int kMaxP2PAttemptTimeSeconds = 5 * 24 * 60 * 60;
 
 // The maximum amount of time to spend waiting for p2p-client(1) to
 // return while waiting in line to use the LAN - six hours.
-const int kMaxP2PNetworkWaitTimeSeconds = 6 * 60 * 60;
+constexpr int kMaxP2PNetworkWaitTimeSeconds = 6 * 60 * 60;
 
 // The maximum number of payload files to keep in /var/cache/p2p.
-const int kMaxP2PFilesToKeep = 3;
+constexpr int kMaxP2PFilesToKeep = 3;
 
 // The maximum number of days to keep a p2p file;
-const int kMaxP2PFileAgeDays = 5;
+constexpr int kMaxP2PFileAgeDays = 5;
 
 // The default number of UMA buckets for metrics.
-const int kNumDefaultUmaBuckets = 50;
+constexpr int kNumDefaultUmaBuckets = 50;
 
-// General constants
-const int kNumBytesInOneMiB = 1024 * 1024;
+// General constants
+constexpr int kNumBytesInOneMiB = 1024 * 1024;
 
 // Number of redirects allowed when downloading.
-const int kDownloadMaxRedirects = 10;
+constexpr int kDownloadMaxRedirects = 10;
 
 // The minimum average speed that downloads must sustain...
 //
@@ -185,8 +187,8 @@
 // connectivity and we want to make as much forward progress as
 // possible. For p2p this is high (25 kB/second) since we can assume
 // high bandwidth (same LAN) and we want to fail fast.
-const int kDownloadLowSpeedLimitBps = 1;
-const int kDownloadP2PLowSpeedLimitBps = 25 * 1000;
+constexpr int kDownloadLowSpeedLimitBps = 1;
+constexpr int kDownloadP2PLowSpeedLimitBps = 25 * 1000;
 
 // ... measured over this period.
 //
@@ -195,18 +197,18 @@
 // for the workstation to generate the payload. For normal operation
 // and p2p, make this relatively low since we want to fail fast in
 // those cases.
-const int kDownloadLowSpeedTimeSeconds = 30;
-const int kDownloadDevModeLowSpeedTimeSeconds = 180;
-const int kDownloadP2PLowSpeedTimeSeconds = 60;
+constexpr int kDownloadLowSpeedTimeSeconds = 30;
+constexpr int kDownloadDevModeLowSpeedTimeSeconds = 180;
+constexpr int kDownloadP2PLowSpeedTimeSeconds = 60;
 
 // The maximum amount of HTTP server reconnect attempts.
 //
 // This is set high in order to maximize the attempt's chance of
 // succeeding. When using p2p, this is low in order to fail fast.
-const int kDownloadMaxRetryCount = 20;
-const int kDownloadMaxRetryCountOobeNotComplete = 3;
-const int kDownloadMaxRetryCountInteractive = 3;
-const int kDownloadP2PMaxRetryCount = 5;
+constexpr int kDownloadMaxRetryCount = 20;
+constexpr int kDownloadMaxRetryCountOobeNotComplete = 3;
+constexpr int kDownloadMaxRetryCountInteractive = 3;
+constexpr int kDownloadP2PMaxRetryCount = 5;
 
 // The connect timeout, in seconds.
 //
@@ -214,11 +216,19 @@
 // connectivity and we may be using HTTPS which involves complicated
 // multi-roundtrip setup. For p2p, this is set low because we can
 // the server is on the same LAN and we want to fail fast.
-const int kDownloadConnectTimeoutSeconds = 30;
-const int kDownloadP2PConnectTimeoutSeconds = 5;
+constexpr int kDownloadConnectTimeoutSeconds = 30;
+constexpr int kDownloadP2PConnectTimeoutSeconds = 5;
 
 // Size in bytes of SHA256 hash.
-const int kSHA256Size = 32;
+constexpr int kSHA256Size = 32;
+
+// A hardcoded label to mark end of all InstallOps
+// This number must be greater than number of install ops.
+// Number of install ops is bounded by number of blocks on any partition.
+// Currently, the block size is 4096. Using |kEndOfInstallLabel| of 2^48 will
+// allow partitions with 2^48 * 4096 = 2^60 bytes, i.e. 1 EiB. Partitions on
+// Android aren't getting that big any time soon.
+constexpr uint64_t kEndOfInstallLabel = (1ULL << 48);
 
 }  // namespace chromeos_update_engine
 
diff --git a/common/cow_operation_convert.cc b/common/cow_operation_convert.cc
new file mode 100644
index 0000000..6b64a9c
--- /dev/null
+++ b/common/cow_operation_convert.cc
@@ -0,0 +1,93 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/common/cow_operation_convert.h"
+
+#include <base/logging.h>
+
+#include "update_engine/payload_generator/extent_ranges.h"
+#include "update_engine/payload_generator/extent_utils.h"
+
+namespace chromeos_update_engine {
+
+std::vector<CowOperation> ConvertToCowOperations(
+    const ::google::protobuf::RepeatedPtrField<
+        ::chromeos_update_engine::InstallOperation>& operations,
+    const ::google::protobuf::RepeatedPtrField<CowMergeOperation>&
+        merge_operations) {
+  ExtentRanges merge_extents;
+  std::vector<CowOperation> converted;
+  ExtentRanges modified_extents;
+
+  // We want all CowCopy ops to be done first, before any COW_REPLACE happen.
+  // Therefore we add these ops in 2 separate loops. This is because during
+  // merge, a CowReplace might modify a block needed by CowCopy, so we always
+  // perform CowCopy first.
+
+  // This loop handles CowCopy blocks within SOURCE_COPY, and the next loop
+  // converts the leftover blocks to CowReplace.
+  for (const auto& merge_op : merge_operations) {
+    merge_extents.AddExtent(merge_op.dst_extent());
+    const auto& src_extent = merge_op.src_extent();
+    const auto& dst_extent = merge_op.dst_extent();
+    // Add blocks in reverse order to avoid merge conflicts on self-overlapping
+    // Ops.
+    // For example: SOURCE_COPY [20 - 30] -> [25 - 35] If blocks are added in
+    // forward order, then 20->25 is performed first, destroying block 25, which
+    // is needed by a later operation.
+    if (src_extent.start_block() < dst_extent.start_block()) {
+      for (uint64_t i = src_extent.num_blocks(); i > 0; i--) {
+        auto src_block = src_extent.start_block() + i - 1;
+        auto dst_block = dst_extent.start_block() + i - 1;
+        CHECK(!modified_extents.ContainsBlock(src_block))
+            << "block " << src_block << " is modified by previous CowCopy";
+        converted.push_back({CowOperation::CowCopy, src_block, dst_block});
+        modified_extents.AddBlock(dst_block);
+      }
+    } else {
+      for (uint64_t i = 0; i < src_extent.num_blocks(); i++) {
+        auto src_block = src_extent.start_block() + i;
+        auto dst_block = dst_extent.start_block() + i;
+        CHECK(!modified_extents.ContainsBlock(src_block))
+            << "block " << src_block << " is modified by previous CowCopy";
+        converted.push_back({CowOperation::CowCopy, src_block, dst_block});
+        modified_extents.AddBlock(dst_block);
+      }
+    }
+  }
+  // COW_REPLACE are added after COW_COPY, because replace might modify blocks
+  // needed by COW_COPY. Please don't merge this loop with the previous one.
+  for (const auto& operation : operations) {
+    if (operation.type() != InstallOperation::SOURCE_COPY) {
+      continue;
+    }
+    const auto& src_extents = operation.src_extents();
+    const auto& dst_extents = operation.dst_extents();
+    BlockIterator it1{src_extents};
+    BlockIterator it2{dst_extents};
+    while (!it1.is_end() && !it2.is_end()) {
+      auto src_block = *it1;
+      auto dst_block = *it2;
+      if (!merge_extents.ContainsBlock(dst_block)) {
+        converted.push_back({CowOperation::CowReplace, src_block, dst_block});
+      }
+      ++it1;
+      ++it2;
+    }
+  }
+  return converted;
+}
+}  // namespace chromeos_update_engine
diff --git a/common/cow_operation_convert.h b/common/cow_operation_convert.h
new file mode 100644
index 0000000..c0543f7
--- /dev/null
+++ b/common/cow_operation_convert.h
@@ -0,0 +1,56 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef UPDATE_ENGINE_COMMON_COW_OPERATION_CONVERT_H_
+#define UPDATE_ENGINE_COMMON_COW_OPERATION_CONVERT_H_
+
+#include <vector>
+
+#include <libsnapshot/cow_format.h>
+
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+struct CowOperation {
+  enum Type {
+    CowCopy = android::snapshot::kCowCopyOp,
+    CowReplace = android::snapshot::kCowReplaceOp,
+  };
+  Type op;
+  uint64_t src_block;
+  uint64_t dst_block;
+};
+
+// Convert SOURCE_COPY operations in `operations` list to a list of
+// CowOperations according to the merge sequence. This function only converts
+// SOURCE_COPY, other operations are ignored. If there's a merge conflict in
+// SOURCE_COPY operations, some blocks may be converted to COW_REPLACE instead
+// of COW_COPY.
+
+// The list returned does not necessarily preserve the order of
+// SOURCE_COPY in `operations`. The only guarantee about ordering in the
+// returned list is that if operations are applied in such order, there would be
+// no merge conflicts.
+
+// This function is intended to be used by delta_performer to perform
+// SOURCE_COPY operations on Virtual AB Compression devices.
+std::vector<CowOperation> ConvertToCowOperations(
+    const ::google::protobuf::RepeatedPtrField<
+        ::chromeos_update_engine::InstallOperation>& operations,
+    const ::google::protobuf::RepeatedPtrField<CowMergeOperation>&
+        merge_operations);
+}  // namespace chromeos_update_engine
+#endif
diff --git a/common/cow_operation_convert_unittest.cc b/common/cow_operation_convert_unittest.cc
new file mode 100644
index 0000000..93173fe
--- /dev/null
+++ b/common/cow_operation_convert_unittest.cc
@@ -0,0 +1,236 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <algorithm>
+#include <array>
+#include <initializer_list>
+
+#include <gtest/gtest.h>
+
+#include "update_engine/common/cow_operation_convert.h"
+#include "update_engine/payload_generator/extent_ranges.h"
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+using OperationList = ::google::protobuf::RepeatedPtrField<
+    ::chromeos_update_engine::InstallOperation>;
+using MergeOplist = ::google::protobuf::RepeatedPtrField<
+    ::chromeos_update_engine::CowMergeOperation>;
+
+// Streams a readable name for |op| (used when printing a CowOperation).
+// Values outside the enum are printed as their integer value.
+std::ostream& operator<<(std::ostream& out, CowOperation::Type op) {
+  switch (op) {
+    case CowOperation::Type::CowCopy:
+      return out << "CowCopy";
+    case CowOperation::Type::CowReplace:
+      return out << "CowReplace";
+    default:
+      // Cast first: streaming |op| directly would select this very overload
+      // again (exact match beats integral promotion) and recurse forever.
+      return out << static_cast<int>(op);
+  }
+}
+
+std::ostream& operator<<(std::ostream& out, const CowOperation& c) {
+  return out << "{" << c.op << ", " << c.src_block << ", " << c.dst_block
+             << "}";  // Rendered as {op, src_block, dst_block}.
+}
+
+class CowOperationConvertTest : public testing::Test {
+ public:
+  void VerifyCowMergeOp(const std::vector<CowOperation>& cow_ops) {
+    // Collect the source and destination extents of every InstallOp added.
+    ExtentRanges src_extent_set;
+    ExtentRanges dst_extent_set;
+    for (auto&& op : operations_) {
+      src_extent_set.AddRepeatedExtents(op.src_extents());
+      dst_extent_set.AddRepeatedExtents(op.dst_extents());
+    }
+    ExtentRanges modified_extents;
+    for (auto&& cow_op : cow_ops) {
+      if (cow_op.op == CowOperation::CowCopy) {
+        EXPECT_TRUE(src_extent_set.ContainsBlock(cow_op.src_block));
+        // Conflict-free: a copy must never read a block written earlier.
+        EXPECT_FALSE(modified_extents.ContainsBlock(cow_op.src_block))
+            << "SOURCE_COPY operation " << cow_op
+            << " read from a modified block";
+      }
+      EXPECT_TRUE(dst_extent_set.ContainsBlock(cow_op.dst_block));
+      dst_extent_set.SubtractExtent(ExtentForRange(cow_op.dst_block, 1));
+      modified_extents.AddBlock(cow_op.dst_block);
+    }
+    // The generated CowOps should cover all extents in InstallOps.
+    EXPECT_EQ(dst_extent_set.blocks(), 0UL);
+    // src_extent_set is deliberately not checked for emptiness: operations
+    // converted to CowReplace do not count their source extent, so it may
+    // legitimately be non-empty.
+  }
+  OperationList operations_;
+  MergeOplist merge_operations_;
+};
+
+// Appends an InstallOperation of |op_type| to |operations|. Every {a, b} pair
+// is forwarded to ExtentForRange(a, b) to build one source/destination extent.
+void AddOperation(OperationList* operations,
+                  ::chromeos_update_engine::InstallOperation_Type op_type,
+                  std::initializer_list<std::array<int, 2>> src_extents,
+                  std::initializer_list<std::array<int, 2>> dst_extents) {
+  auto* install_op = operations->Add();
+  install_op->set_type(op_type);
+  for (const auto& [start, count] : src_extents)
+    *install_op->add_src_extents() = ExtentForRange(start, count);
+  for (const auto& [start, count] : dst_extents)
+    *install_op->add_dst_extents() = ExtentForRange(start, count);
+}
+
+void AddMergeOperation(MergeOplist* operations,
+                       ::chromeos_update_engine::CowMergeOperation_Type op_type,
+                       std::array<int, 2> src_extent,
+                       std::array<int, 2> dst_extent) {
+  auto* cow = operations->Add();  // New entry appended to |operations|.
+  cow->set_type(op_type);
+  *cow->mutable_src_extent() = ExtentForRange(src_extent[0], src_extent[1]);
+  *cow->mutable_dst_extent() = ExtentForRange(dst_extent[0], dst_extent[1]);
+}
+
+TEST_F(CowOperationConvertTest, NoConflict) {
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{20, 1}}, {{30, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{10, 1}}, {{20, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{0, 1}}, {{10, 1}});
+  // The merge sequence mirrors the install operations one-to-one, in order.
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {20, 1}, {30, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {10, 1}, {20, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {0, 1}, {10, 1});
+  // Expect 3 COW_COPY: no block is read after it has been overwritten.
+  auto cow_ops = ConvertToCowOperations(operations_, merge_operations_);
+  ASSERT_EQ(cow_ops.size(), 3UL);
+  ASSERT_TRUE(std::all_of(cow_ops.begin(), cow_ops.end(), [](auto&& cow_op) {
+    return cow_op.op == CowOperation::CowCopy;
+  }));
+  VerifyCowMergeOp(cow_ops);
+}
+
+TEST_F(CowOperationConvertTest, CowReplace) {
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{30, 1}}, {{0, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{20, 1}}, {{30, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{10, 1}}, {{20, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{0, 1}}, {{10, 1}});
+  // The four copies form a cycle; the merge sequence omits the {30} -> {0} one.
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {20, 1}, {30, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {10, 1}, {20, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {0, 1}, {10, 1});
+
+  auto cow_ops = ConvertToCowOperations(operations_, merge_operations_);
+  ASSERT_EQ(cow_ops.size(), 4UL);
+  // Expect 3 COW_COPY and 1 COW_REPLACE
+  ASSERT_EQ(std::count_if(cow_ops.begin(),
+                          cow_ops.end(),
+                          [](auto&& cow_op) {
+                            return cow_op.op == CowOperation::CowCopy;
+                          }),
+            3);
+  ASSERT_EQ(std::count_if(cow_ops.begin(),
+                          cow_ops.end(),
+                          [](auto&& cow_op) {
+                            return cow_op.op == CowOperation::CowReplace;
+                          }),
+            1);
+  VerifyCowMergeOp(cow_ops);
+}
+
+TEST_F(CowOperationConvertTest, ReOrderSourceCopy) {
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{30, 1}}, {{20, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{20, 1}}, {{10, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{10, 1}}, {{0, 1}});
+  // The merge sequence lists the same copies in the reverse of install order.
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {10, 1}, {0, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {20, 1}, {10, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {30, 1}, {20, 1});
+
+  auto cow_ops = ConvertToCowOperations(operations_, merge_operations_);
+  ASSERT_EQ(cow_ops.size(), 3UL);
+  // Expect 3 COW_COPY
+  ASSERT_TRUE(std::all_of(cow_ops.begin(), cow_ops.end(), [](auto&& cow_op) {
+    return cow_op.op == CowOperation::CowCopy;
+  }));
+  VerifyCowMergeOp(cow_ops);
+}
+
+TEST_F(CowOperationConvertTest, InterleavingSrcExtent) {
+  AddOperation(&operations_,
+               InstallOperation::SOURCE_COPY,
+               {{30, 5}, {35, 5}},
+               {{20, 10}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{20, 1}}, {{10, 1}});
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{10, 1}}, {{0, 1}});
+  // The first install op is split into two merge ops of five blocks each.
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {10, 1}, {0, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {20, 1}, {10, 1});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {30, 5}, {20, 5});
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {35, 5}, {25, 5});
+
+  auto cow_ops = ConvertToCowOperations(operations_, merge_operations_);
+  // Expect 12 COW_COPY: 10 + 1 + 1 blocks, one CowOperation per block.
+  ASSERT_EQ(cow_ops.size(), 12UL);
+  ASSERT_TRUE(std::all_of(cow_ops.begin(), cow_ops.end(), [](auto&& cow_op) {
+    return cow_op.op == CowOperation::CowCopy;
+  }));
+  VerifyCowMergeOp(cow_ops);
+}
+
+TEST_F(CowOperationConvertTest, SelfOverlappingOperation) {
+  AddOperation(
+      &operations_, InstallOperation::SOURCE_COPY, {{20, 10}}, {{25, 10}});
+  // Source blocks 20-29 and destination blocks 25-34 overlap each other.
+  AddMergeOperation(
+      &merge_operations_, CowMergeOperation::COW_COPY, {20, 10}, {25, 10});
+
+  auto cow_ops = ConvertToCowOperations(operations_, merge_operations_);
+  // Expect 10 COW_COPY
+  ASSERT_EQ(cow_ops.size(), 10UL);
+  ASSERT_TRUE(std::all_of(cow_ops.begin(), cow_ops.end(), [](auto&& cow_op) {
+    return cow_op.op == CowOperation::CowCopy;
+  }));
+  VerifyCowMergeOp(cow_ops);
+}
+
+}  // namespace chromeos_update_engine
diff --git a/common/dynamic_partition_control_interface.h b/common/dynamic_partition_control_interface.h
index f47958d..4f46f74 100644
--- a/common/dynamic_partition_control_interface.h
+++ b/common/dynamic_partition_control_interface.h
@@ -27,10 +27,22 @@
 #include "update_engine/common/cleanup_previous_update_action_delegate.h"
 #include "update_engine/common/error_code.h"
 #include "update_engine/common/prefs_interface.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
 #include "update_engine/update_metadata.pb.h"
 
+// Forward declaration for libsnapshot/snapshot_writer.h
+namespace android::snapshot {
+class ISnapshotWriter;
+}
+
 namespace chromeos_update_engine {
 
+struct PartitionDevice {  // Device paths resolved for one partition.
+  std::string rw_device_path;
+  std::string mountable_device_path;
+  bool is_dynamic = false;  // Defaulted so it is never read uninitialized.
+};
+
 struct FeatureFlag {
   enum class Value { NONE = 0, RETROFIT, LAUNCH };
   constexpr explicit FeatureFlag(Value value) : value_(value) {}
@@ -56,6 +68,8 @@
 
   // Return the feature flags of Virtual A/B on this device.
   virtual FeatureFlag GetVirtualAbFeatureFlag() = 0;
+  // Return the feature flags of Virtual A/B Compression on this device.
+  virtual FeatureFlag GetVirtualAbCompressionFeatureFlag() = 0;
 
   // Attempt to optimize |operation|.
   // If successful, |optimized| contains an operation with extents that
@@ -137,6 +151,26 @@
       uint32_t source_slot,
       uint32_t target_slot,
       const std::vector<std::string>& partitions) = 0;
+  // Partition name is expected to be unsuffixed, e.g. system, vendor.
+  // Return an interface to write to a snapshotted partition.
+  // If `is_append` is false, then existing COW data will be overwritten.
+  // Otherwise the COW writer is opened in APPEND mode and existing COW data
+  // is preserved.
+  virtual std::unique_ptr<android::snapshot::ISnapshotWriter> OpenCowWriter(
+      const std::string& unsuffixed_partition_name,
+      const std::optional<std::string>&,
+      bool is_append = false) = 0;
+  virtual FileDescriptorPtr OpenCowReader(
+      const std::string& unsuffixed_partition_name,
+      const std::optional<std::string>&,
+      bool is_append = false) = 0;
+
+  virtual bool IsDynamicPartition(const std::string& part_name) = 0;
+
+  // Create virtual block devices for all partitions.
+  virtual bool MapAllPartitions() = 0;
+  // Unmap virtual block devices for all partitions.
+  virtual bool UnmapAllPartitions() = 0;
 };
 
 }  // namespace chromeos_update_engine
diff --git a/common/dynamic_partition_control_stub.cc b/common/dynamic_partition_control_stub.cc
index 5a8ca43..2c6bb1b 100644
--- a/common/dynamic_partition_control_stub.cc
+++ b/common/dynamic_partition_control_stub.cc
@@ -20,6 +20,7 @@
 #include <string>
 
 #include <base/logging.h>
+#include <libsnapshot/cow_writer.h>
 
 #include "update_engine/common/dynamic_partition_control_stub.h"
 
@@ -33,6 +34,10 @@
   return FeatureFlag(FeatureFlag::Value::NONE);
 }
 
+FeatureFlag DynamicPartitionControlStub::GetVirtualAbCompressionFeatureFlag() {
+  return FeatureFlag(FeatureFlag::Value::NONE);  // Stub: always NONE.
+}
+
 bool DynamicPartitionControlStub::OptimizeOperation(
     const std::string& partition_name,
     const InstallOperation& operation,
@@ -83,4 +88,32 @@
   return true;
 }
 
+std::unique_ptr<android::snapshot::ISnapshotWriter>
+DynamicPartitionControlStub::OpenCowWriter(
+    const std::string& /*unsuffixed_partition_name*/,
+    const std::optional<std::string>& /*source_path*/,
+    bool /*is_append*/) {
+  return nullptr;  // Stub: all arguments are ignored; no writer is provided.
+}
+
+FileDescriptorPtr DynamicPartitionControlStub::OpenCowReader(
+    const std::string& /*unsuffixed_partition_name*/,
+    const std::optional<std::string>&,
+    bool /*is_append*/) {
+  return nullptr;  // Stub: all arguments are ignored; no reader is provided.
+}
+
+bool DynamicPartitionControlStub::MapAllPartitions() {
+  return false;  // Stub: always reports failure.
+}
+
+bool DynamicPartitionControlStub::UnmapAllPartitions() {
+  return false;  // Stub: always reports failure.
+}
+
+bool DynamicPartitionControlStub::IsDynamicPartition(
+    const std::string& /*part_name*/) {
+  return false;  // Stub: no partition is ever reported as dynamic.
+}
+
 }  // namespace chromeos_update_engine
diff --git a/common/dynamic_partition_control_stub.h b/common/dynamic_partition_control_stub.h
index 94dba1b..0f428ab 100644
--- a/common/dynamic_partition_control_stub.h
+++ b/common/dynamic_partition_control_stub.h
@@ -31,6 +31,7 @@
  public:
   FeatureFlag GetDynamicPartitionsFeatureFlag() override;
   FeatureFlag GetVirtualAbFeatureFlag() override;
+  FeatureFlag GetVirtualAbCompressionFeatureFlag() override;
   bool OptimizeOperation(const std::string& partition_name,
                          const InstallOperation& operation,
                          InstallOperation* optimized) override;
@@ -56,8 +57,20 @@
       uint32_t source_slot,
       uint32_t target_slot,
       const std::vector<std::string>& partitions) override;
-};
 
+  std::unique_ptr<android::snapshot::ISnapshotWriter> OpenCowWriter(
+      const std::string& unsuffixed_partition_name,
+      const std::optional<std::string>&,
+      bool is_append) override;
+  FileDescriptorPtr OpenCowReader(const std::string& unsuffixed_partition_name,
+                                  const std::optional<std::string>&,
+                                  bool is_append = false) override;
+
+  bool MapAllPartitions() override;
+  bool UnmapAllPartitions() override;
+
+  bool IsDynamicPartition(const std::string& part_name) override;
+};
 }  // namespace chromeos_update_engine
 
 #endif  // UPDATE_ENGINE_COMMON_DYNAMIC_PARTITION_CONTROL_STUB_H_
diff --git a/common/error_code.h b/common/error_code.h
index 7d9cfff..a889888 100644
--- a/common/error_code.h
+++ b/common/error_code.h
@@ -86,6 +86,7 @@
   kNotEnoughSpace = 60,
   kDeviceCorrupted = 61,
   kPackageExcludedFromUpdate = 62,
+  kPostInstallMountError = 63,
 
   // VERY IMPORTANT! When adding new error codes:
   //
diff --git a/common/error_code_utils.cc b/common/error_code_utils.cc
index cda4c7e..421544a 100644
--- a/common/error_code_utils.cc
+++ b/common/error_code_utils.cc
@@ -173,6 +173,8 @@
       return "ErrorCode::kDeviceCorrupted";
     case ErrorCode::kPackageExcludedFromUpdate:
       return "ErrorCode::kPackageExcludedFromUpdate";
+    case ErrorCode::kPostInstallMountError:
+      return "ErrorCode::kPostInstallMountError";
       // Don't add a default case to let the compiler warn about newly added
       // error codes which should be added here.
   }
diff --git a/common/fake_boot_control.h b/common/fake_boot_control.h
index 98b93e6..fc7839d 100644
--- a/common/fake_boot_control.h
+++ b/common/fake_boot_control.h
@@ -51,14 +51,16 @@
                           bool not_in_payload,
                           std::string* device,
                           bool* is_dynamic) const override {
-    if (slot >= num_slots_)
+    auto dev =
+        GetPartitionDevice(partition_name, slot, current_slot_, not_in_payload);
+    if (!dev.has_value()) {
       return false;
-    auto part_it = devices_[slot].find(partition_name);
-    if (part_it == devices_[slot].end())
-      return false;
-    *device = part_it->second;
-    if (is_dynamic != nullptr) {
-      *is_dynamic = false;
+    }
+    if (is_dynamic) {
+      *is_dynamic = dev->is_dynamic;
+    }
+    if (device) {
+      *device = dev->rw_device_path;
     }
     return true;
   }
@@ -120,6 +122,25 @@
     return dynamic_partition_control_.get();
   }
 
+  std::optional<PartitionDevice> GetPartitionDevice(
+      const std::string& partition_name,
+      uint32_t slot,
+      uint32_t current_slot,
+      bool not_in_payload = false) const override {
+    // |current_slot| and |not_in_payload| are not consulted by this fake.
+    if (slot >= devices_.size())
+      return std::nullopt;
+    const auto& slot_devices = devices_[slot];
+    const auto entry = slot_devices.find(partition_name);
+    if (entry == slot_devices.end())
+      return std::nullopt;
+    PartitionDevice result;
+    result.rw_device_path = entry->second;
+    result.mountable_device_path = entry->second;
+    result.is_dynamic = false;
+    return result;
+  }
+
  private:
   BootControlInterface::Slot num_slots_{2};
   BootControlInterface::Slot current_slot_{0};
diff --git a/common/platform_constants.h b/common/platform_constants.h
index 243af69..c060133 100644
--- a/common/platform_constants.h
+++ b/common/platform_constants.h
@@ -58,6 +58,12 @@
 // postinstall.
 extern const char kPostinstallMountOptions[];
 
+#ifdef __ANDROID_RECOVERY__
+constexpr bool kIsRecovery = true;
+#else
+constexpr bool kIsRecovery = false;
+#endif
+
 }  // namespace constants
 }  // namespace chromeos_update_engine
 
diff --git a/common/prefs_unittest.cc b/common/prefs_unittest.cc
index e8efd8a..a5f46e5 100644
--- a/common/prefs_unittest.cc
+++ b/common/prefs_unittest.cc
@@ -130,7 +130,7 @@
 
     vector<string> fpKeys;
     EXPECT_TRUE(common_prefs_->GetSubKeys(kDlcPrefsSubDir, &fpKeys));
-    EXPECT_EQ(fpKeys.size(), 3);
+    EXPECT_EQ(fpKeys.size(), 3UL);
     EXPECT_TRUE(common_prefs_->Delete(fpKeys[0]));
     EXPECT_TRUE(common_prefs_->Delete(fpKeys[1]));
     EXPECT_TRUE(common_prefs_->Delete(fpKeys[2]));
diff --git a/common/utils.cc b/common/utils.cc
index ac9fa9c..3a89c2a 100644
--- a/common/utils.cc
+++ b/common/utils.cc
@@ -191,10 +191,10 @@
   return true;
 }
 
-bool PWriteAll(const FileDescriptorPtr& fd,
-               const void* buf,
-               size_t count,
-               off_t offset) {
+bool WriteAll(const FileDescriptorPtr& fd,
+              const void* buf,
+              size_t count,
+              off_t offset) {
   TEST_AND_RETURN_FALSE_ERRNO(fd->Seek(offset, SEEK_SET) !=
                               static_cast<off_t>(-1));
   return WriteAll(fd, buf, count);
@@ -217,11 +217,11 @@
   return true;
 }
 
-bool PReadAll(const FileDescriptorPtr& fd,
-              void* buf,
-              size_t count,
-              off_t offset,
-              ssize_t* out_bytes_read) {
+bool ReadAll(const FileDescriptorPtr& fd,
+             void* buf,
+             size_t count,
+             off_t offset,
+             ssize_t* out_bytes_read) {
   TEST_AND_RETURN_FALSE_ERRNO(fd->Seek(offset, SEEK_SET) !=
                               static_cast<off_t>(-1));
   char* c_buf = static_cast<char*>(buf);
@@ -238,6 +238,31 @@
   return true;
 }
 
+bool PReadAll(const FileDescriptorPtr& fd,
+              void* buf,
+              size_t count,
+              off_t offset,
+              ssize_t* out_bytes_read) {
+  auto old_off = fd->Seek(0, SEEK_CUR);  // Remember the current position.
+  TEST_AND_RETURN_FALSE_ERRNO(old_off >= 0);
+  // ReadAll() seeks to |offset|; the old position is restored even on failure.
+  auto success = ReadAll(fd, buf, count, offset, out_bytes_read);
+  TEST_AND_RETURN_FALSE_ERRNO(fd->Seek(old_off, SEEK_SET) == old_off);
+  return success;
+}
+
+bool PWriteAll(const FileDescriptorPtr& fd,
+               const void* buf,
+               size_t count,
+               off_t offset) {
+  auto old_off = fd->Seek(0, SEEK_CUR);  // Remember the current position.
+  TEST_AND_RETURN_FALSE_ERRNO(old_off >= 0);
+  // WriteAll() seeks to |offset|; the old position is restored even on failure.
+  auto success = WriteAll(fd, buf, count, offset);
+  TEST_AND_RETURN_FALSE_ERRNO(fd->Seek(old_off, SEEK_SET) == old_off);
+  return success;
+}
+
 // Append |nbytes| of content from |buf| to the vector pointed to by either
 // |vec_p| or |str_p|.
 static void AppendBytes(const uint8_t* buf,
diff --git a/common/utils.h b/common/utils.h
index 05a92be..616de06 100644
--- a/common/utils.h
+++ b/common/utils.h
@@ -18,6 +18,7 @@
 #define UPDATE_ENGINE_COMMON_UTILS_H_
 
 #include <errno.h>
+#include <sys/types.h>
 #include <time.h>
 #include <unistd.h>
 
@@ -63,6 +64,15 @@
 bool PWriteAll(int fd, const void* buf, size_t count, off_t offset);
 
 bool WriteAll(const FileDescriptorPtr& fd, const void* buf, size_t count);
+// WriteAll writes data at specified offset, but it modifies file position.
+bool WriteAll(const FileDescriptorPtr& fd,
+              const void* buf,
+              size_t count,
+              off_t off);
+
+// https://man7.org/linux/man-pages/man2/pread.2.html
+// PWriteAll writes data at specified offset, but it DOES NOT modify file
+// position. Behaves similarly to Linux's pwrite syscall.
 bool PWriteAll(const FileDescriptorPtr& fd,
                const void* buf,
                size_t count,
@@ -81,6 +91,16 @@
 bool PReadAll(
     int fd, void* buf, size_t count, off_t offset, ssize_t* out_bytes_read);
 
+// Reads data at specified offset, this function does change file position.
+bool ReadAll(const FileDescriptorPtr& fd,
+             void* buf,
+             size_t count,
+             off_t offset,
+             ssize_t* out_bytes_read);
+
+// https://man7.org/linux/man-pages/man2/pread.2.html
+// Reads data at specified offset, this function DOES NOT change file position.
+// Behavior is similar to linux's pread syscall.
 bool PReadAll(const FileDescriptorPtr& fd,
               void* buf,
               size_t count,
diff --git a/download_action.cc b/download_action.cc
index adae128..fa29139 100644
--- a/download_action.cc
+++ b/download_action.cc
@@ -295,6 +295,7 @@
     }
   }
 
+#ifndef __ANDROID__
   if (SystemState::Get() != nullptr) {
     const PayloadStateInterface* payload_state =
         SystemState::Get()->payload_state();
@@ -332,6 +333,7 @@
       http_fetcher_->set_connect_timeout(kDownloadP2PConnectTimeoutSeconds);
     }
   }
+#endif
 
   http_fetcher_->BeginTransfer(install_plan_.download_url);
 }
diff --git a/metrics_utils.cc b/metrics_utils.cc
index 19f274b..34da5a1 100644
--- a/metrics_utils.cc
+++ b/metrics_utils.cc
@@ -94,6 +94,7 @@
     case ErrorCode::kPostinstallRunnerError:
     case ErrorCode::kPostinstallBootedFromFirmwareB:
     case ErrorCode::kPostinstallFirmwareRONotUpdatable:
+    case ErrorCode::kPostInstallMountError:
       return metrics::AttemptResult::kPostInstallFailed;
 
     case ErrorCode::kUserCanceled:
@@ -188,6 +189,7 @@
     case ErrorCode::kOmahaResponseHandlerError:
     case ErrorCode::kFilesystemCopierError:
     case ErrorCode::kPostinstallRunnerError:
+    case ErrorCode::kPostInstallMountError:
     case ErrorCode::kPayloadMismatchedType:
     case ErrorCode::kInstallDeviceOpenError:
     case ErrorCode::kKernelDeviceOpenError:
diff --git a/payload_consumer/delta_performer.cc b/payload_consumer/delta_performer.cc
index e6ec67a..be39542 100644
--- a/payload_consumer/delta_performer.cc
+++ b/payload_consumer/delta_performer.cc
@@ -48,12 +48,14 @@
 #include "update_engine/common/prefs_interface.h"
 #include "update_engine/common/subprocess.h"
 #include "update_engine/common/terminator.h"
+#include "update_engine/common/utils.h"
 #include "update_engine/payload_consumer/bzip_extent_writer.h"
 #include "update_engine/payload_consumer/cached_file_descriptor.h"
 #include "update_engine/payload_consumer/certificate_parser_interface.h"
 #include "update_engine/payload_consumer/extent_reader.h"
 #include "update_engine/payload_consumer/extent_writer.h"
 #include "update_engine/payload_consumer/partition_update_generator_interface.h"
+#include "update_engine/payload_consumer/partition_writer.h"
 #if USE_FEC
 #include "update_engine/payload_consumer/fec_file_descriptor.h"
 #endif  // USE_FEC
@@ -79,65 +81,6 @@
 const int kUpdateStateOperationInvalid = -1;
 const int kMaxResumedUpdateFailures = 10;
 
-const uint64_t kCacheSize = 1024 * 1024;  // 1MB
-
-// Opens path for read/write. On success returns an open FileDescriptor
-// and sets *err to 0. On failure, sets *err to errno and returns nullptr.
-FileDescriptorPtr OpenFile(const char* path,
-                           int mode,
-                           bool cache_writes,
-                           int* err) {
-  // Try to mark the block device read-only based on the mode. Ignore any
-  // failure since this won't work when passing regular files.
-  bool read_only = (mode & O_ACCMODE) == O_RDONLY;
-  utils::SetBlockDeviceReadOnly(path, read_only);
-
-  FileDescriptorPtr fd(new EintrSafeFileDescriptor());
-  if (cache_writes && !read_only) {
-    fd = FileDescriptorPtr(new CachedFileDescriptor(fd, kCacheSize));
-    LOG(INFO) << "Caching writes.";
-  }
-  if (!fd->Open(path, mode, 000)) {
-    *err = errno;
-    PLOG(ERROR) << "Unable to open file " << path;
-    return nullptr;
-  }
-  *err = 0;
-  return fd;
-}
-
-// Discard the tail of the block device referenced by |fd|, from the offset
-// |data_size| until the end of the block device. Returns whether the data was
-// discarded.
-bool DiscardPartitionTail(const FileDescriptorPtr& fd, uint64_t data_size) {
-  uint64_t part_size = fd->BlockDevSize();
-  if (!part_size || part_size <= data_size)
-    return false;
-
-  struct blkioctl_request {
-    int number;
-    const char* name;
-  };
-  const vector<blkioctl_request> blkioctl_requests = {
-      {BLKDISCARD, "BLKDISCARD"},
-      {BLKSECDISCARD, "BLKSECDISCARD"},
-#ifdef BLKZEROOUT
-      {BLKZEROOUT, "BLKZEROOUT"},
-#endif
-  };
-  for (const auto& req : blkioctl_requests) {
-    int error = 0;
-    if (fd->BlkIoctl(req.number, data_size, part_size - data_size, &error) &&
-        error == 0) {
-      return true;
-    }
-    LOG(WARNING) << "Error discarding the last "
-                 << (part_size - data_size) / 1024 << " KiB using ioctl("
-                 << req.name << ")";
-  }
-  return false;
-}
-
 }  // namespace
 
 // Computes the ratio of |part| and |total|, scaled to |norm|, using integer
@@ -255,12 +198,9 @@
   if (op_result)
     return true;
 
-  size_t partition_first_op_num =
-      current_partition_ ? acc_num_operations_[current_partition_ - 1] : 0;
   LOG(ERROR) << "Failed to perform " << op_type_name << " operation "
              << next_operation_num_ << ", which is the operation "
-             << next_operation_num_ - partition_first_op_num
-             << " in partition \""
+             << GetPartitionOperationNum() << " in partition \""
              << partitions_[current_partition_].partition_name() << "\"";
   if (*error == ErrorCode::kSuccess)
     *error = ErrorCode::kDownloadOperationExecutionError;
@@ -282,33 +222,12 @@
 }
 
 int DeltaPerformer::CloseCurrentPartition() {
-  int err = 0;
-  if (source_fd_ && !source_fd_->Close()) {
-    err = errno;
-    PLOG(ERROR) << "Error closing source partition";
-    if (!err)
-      err = 1;
+  if (!partition_writer_) {
+    return 0;
   }
-  source_fd_.reset();
-  if (source_ecc_fd_ && !source_ecc_fd_->Close()) {
-    err = errno;
-    PLOG(ERROR) << "Error closing ECC source partition";
-    if (!err)
-      err = 1;
-  }
-  source_ecc_fd_.reset();
-  source_ecc_open_failure_ = false;
-  source_path_.clear();
-
-  if (target_fd_ && !target_fd_->Close()) {
-    err = errno;
-    PLOG(ERROR) << "Error closing target partition";
-    if (!err)
-      err = 1;
-  }
-  target_fd_.reset();
-  target_path_.clear();
-  return -err;
+  int err = partition_writer_->Close();
+  partition_writer_ = nullptr;
+  return err;
 }
 
 bool DeltaPerformer::OpenCurrentPartition() {
@@ -320,92 +239,29 @@
       install_plan_->partitions.size() - partitions_.size();
   const InstallPlan::Partition& install_part =
       install_plan_->partitions[num_previous_partitions + current_partition_];
+  auto dynamic_control = boot_control_->GetDynamicPartitionControl();
+  partition_writer_ = partition_writer::CreatePartitionWriter(
+      partition,
+      install_part,
+      dynamic_control,
+      block_size_,
+      interactive_,
+      IsDynamicPartition(install_part.name));
   // Open source fds if we have a delta payload, or for partitions in the
   // partial update.
   bool source_may_exist = manifest_.partial_update() ||
                           payload_->type == InstallPayloadType::kDelta;
-  // We shouldn't open the source partition in certain cases, e.g. some dynamic
-  // partitions in delta payload, partitions included in the full payload for
-  // partial updates. Use the source size as the indicator.
-  if (source_may_exist && install_part.source_size > 0) {
-    source_path_ = install_part.source_path;
-    int err;
-    source_fd_ = OpenFile(source_path_.c_str(), O_RDONLY, false, &err);
-    if (!source_fd_) {
-      LOG(ERROR) << "Unable to open source partition "
-                 << partition.partition_name() << " on slot "
-                 << BootControlInterface::SlotName(install_plan_->source_slot)
-                 << ", file " << source_path_;
-      return false;
-    }
-  }
+  const size_t partition_operation_num = GetPartitionOperationNum();
 
-  target_path_ = install_part.target_path;
-  int err;
-
-  int flags = O_RDWR;
-  if (!interactive_)
-    flags |= O_DSYNC;
-
-  LOG(INFO) << "Opening " << target_path_ << " partition with"
-            << (interactive_ ? "out" : "") << " O_DSYNC";
-
-  target_fd_ = OpenFile(target_path_.c_str(), flags, true, &err);
-  if (!target_fd_) {
-    LOG(ERROR) << "Unable to open target partition "
-               << partition.partition_name() << " on slot "
-               << BootControlInterface::SlotName(install_plan_->target_slot)
-               << ", file " << target_path_;
-    return false;
-  }
-
-  LOG(INFO) << "Applying " << partition.operations().size()
-            << " operations to partition \"" << partition.partition_name()
-            << "\"";
-
-  // Discard the end of the partition, but ignore failures.
-  DiscardPartitionTail(target_fd_, install_part.target_size);
-
+  TEST_AND_RETURN_FALSE(partition_writer_->Init(
+      install_plan_, source_may_exist, partition_operation_num));
+  CheckpointUpdateProgress(true);
   return true;
 }
 
-bool DeltaPerformer::OpenCurrentECCPartition() {
-  if (source_ecc_fd_)
-    return true;
-
-  if (source_ecc_open_failure_)
-    return false;
-
-  if (current_partition_ >= partitions_.size())
-    return false;
-
-  // No support for ECC for full payloads.
-  if (payload_->type == InstallPayloadType::kFull)
-    return false;
-
-#if USE_FEC
-  const PartitionUpdate& partition = partitions_[current_partition_];
-  size_t num_previous_partitions =
-      install_plan_->partitions.size() - partitions_.size();
-  const InstallPlan::Partition& install_part =
-      install_plan_->partitions[num_previous_partitions + current_partition_];
-  string path = install_part.source_path;
-  FileDescriptorPtr fd(new FecFileDescriptor());
-  if (!fd->Open(path.c_str(), O_RDONLY, 0)) {
-    PLOG(ERROR) << "Unable to open ECC source partition "
-                << partition.partition_name() << " on slot "
-                << BootControlInterface::SlotName(install_plan_->source_slot)
-                << ", file " << path;
-    source_ecc_open_failure_ = true;
-    return false;
-  }
-  source_ecc_fd_ = fd;
-#else
-  // No support for ECC compiled.
-  source_ecc_open_failure_ = true;
-#endif  // USE_FEC
-
-  return !source_ecc_open_failure_;
+size_t DeltaPerformer::GetPartitionOperationNum() {
+  if (current_partition_ == 0) return next_operation_num_;  // Nothing before.
+  return next_operation_num_ - acc_num_operations_[current_partition_ - 1];
+}
 
 namespace {
@@ -641,6 +497,9 @@
     // We know there are more operations to perform because we didn't reach the
     // |num_total_operations_| limit yet.
     if (next_operation_num_ >= acc_num_operations_[current_partition_]) {
+      if (partition_writer_) {
+        TEST_AND_RETURN_FALSE(partition_writer_->FinishedInstallOps());
+      }
       CloseCurrentPartition();
       // Skip until there are operations for current_partition_.
       while (next_operation_num_ >= acc_num_operations_[current_partition_]) {
@@ -651,12 +510,9 @@
         return false;
       }
     }
-    const size_t partition_operation_num =
-        next_operation_num_ -
-        (current_partition_ ? acc_num_operations_[current_partition_ - 1] : 0);
 
     const InstallOperation& op =
-        partitions_[current_partition_].operations(partition_operation_num);
+        partitions_[current_partition_].operations(GetPartitionOperationNum());
 
     CopyDataToBuffer(&c_bytes, &count, op.data_length());
 
@@ -725,10 +581,6 @@
     if (!HandleOpResult(op_result, InstallOperationTypeName(op.type()), error))
       return false;
 
-    if (!target_fd_->Flush()) {
-      return false;
-    }
-
     next_operation_num_++;
     UpdateOverallProgress(false, "Completed ");
     CheckpointUpdateProgress(false);
@@ -791,6 +643,11 @@
     }
   }
 
+  auto dynamic_control = boot_control_->GetDynamicPartitionControl();
+  CHECK_NE(dynamic_control, nullptr);
+  TEST_AND_RETURN_FALSE(dynamic_control->ListDynamicPartitionsForSlot(
+      install_plan_->target_slot, &dynamic_partitions_));
+
   // Partitions in manifest are no longer needed after preparing partitions.
   manifest_.clear_partitions();
   // TODO(xunchang) TBD: allow partial update only on devices with dynamic
@@ -995,22 +852,10 @@
 
   // Since we delete data off the beginning of the buffer as we use it,
   // the data we need should be exactly at the beginning of the buffer.
-  TEST_AND_RETURN_FALSE(buffer_offset_ == operation.data_offset());
   TEST_AND_RETURN_FALSE(buffer_.size() >= operation.data_length());
 
-  // Setup the ExtentWriter stack based on the operation type.
-  std::unique_ptr<ExtentWriter> writer = std::make_unique<DirectExtentWriter>();
-
-  if (operation.type() == InstallOperation::REPLACE_BZ) {
-    writer.reset(new BzipExtentWriter(std::move(writer)));
-  } else if (operation.type() == InstallOperation::REPLACE_XZ) {
-    writer.reset(new XzExtentWriter(std::move(writer)));
-  }
-
-  TEST_AND_RETURN_FALSE(
-      writer->Init(target_fd_, operation.dst_extents(), block_size_));
-  TEST_AND_RETURN_FALSE(writer->Write(buffer_.data(), operation.data_length()));
-
+  TEST_AND_RETURN_FALSE(partition_writer_->PerformReplaceOperation(
+      operation, buffer_.data(), buffer_.size()));
   // Update buffer
   DiscardBuffer(true, buffer_.size());
   return true;
@@ -1025,41 +870,13 @@
   TEST_AND_RETURN_FALSE(!operation.has_data_offset());
   TEST_AND_RETURN_FALSE(!operation.has_data_length());
 
-#ifdef BLKZEROOUT
-  bool attempt_ioctl = true;
-  int request =
-      (operation.type() == InstallOperation::ZERO ? BLKZEROOUT : BLKDISCARD);
-#else   // !defined(BLKZEROOUT)
-  bool attempt_ioctl = false;
-  int request = 0;
-#endif  // !defined(BLKZEROOUT)
-
-  brillo::Blob zeros;
-  for (const Extent& extent : operation.dst_extents()) {
-    const uint64_t start = extent.start_block() * block_size_;
-    const uint64_t length = extent.num_blocks() * block_size_;
-    if (attempt_ioctl) {
-      int result = 0;
-      if (target_fd_->BlkIoctl(request, start, length, &result) && result == 0)
-        continue;
-      attempt_ioctl = false;
-    }
-    // In case of failure, we fall back to writing 0 to the selected region.
-    zeros.resize(16 * block_size_);
-    for (uint64_t offset = 0; offset < length; offset += zeros.size()) {
-      uint64_t chunk_length =
-          min(length - offset, static_cast<uint64_t>(zeros.size()));
-      TEST_AND_RETURN_FALSE(utils::PWriteAll(
-          target_fd_, zeros.data(), chunk_length, start + offset));
-    }
-  }
-  return true;
+  return partition_writer_->PerformZeroOrDiscardOperation(operation);
 }
 
-bool DeltaPerformer::ValidateSourceHash(const brillo::Blob& calculated_hash,
-                                        const InstallOperation& operation,
-                                        const FileDescriptorPtr source_fd,
-                                        ErrorCode* error) {
+bool PartitionWriter::ValidateSourceHash(const brillo::Blob& calculated_hash,
+                                         const InstallOperation& operation,
+                                         const FileDescriptorPtr source_fd,
+                                         ErrorCode* error) {
   brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
                                     operation.src_sha256_hash().end());
   if (calculated_hash != expected_source_hash) {
@@ -1100,169 +917,7 @@
     TEST_AND_RETURN_FALSE(operation.src_length() % block_size_ == 0);
   if (operation.has_dst_length())
     TEST_AND_RETURN_FALSE(operation.dst_length() % block_size_ == 0);
-
-  TEST_AND_RETURN_FALSE(source_fd_ != nullptr);
-
-  // The device may optimize the SOURCE_COPY operation.
-  // Being this a device-specific optimization let DynamicPartitionController
-  // decide it the operation should be skipped.
-  const PartitionUpdate& partition = partitions_[current_partition_];
-  const auto& partition_control = boot_control_->GetDynamicPartitionControl();
-
-  InstallOperation buf;
-  bool should_optimize = partition_control->OptimizeOperation(
-      partition.partition_name(), operation, &buf);
-  const InstallOperation& optimized = should_optimize ? buf : operation;
-
-  if (operation.has_src_sha256_hash()) {
-    bool read_ok;
-    brillo::Blob source_hash;
-    brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
-                                      operation.src_sha256_hash().end());
-
-    // We fall back to use the error corrected device if the hash of the raw
-    // device doesn't match or there was an error reading the source partition.
-    // Note that this code will also fall back if writing the target partition
-    // fails.
-    if (should_optimize) {
-      // Hash operation.src_extents(), then copy optimized.src_extents to
-      // optimized.dst_extents.
-      read_ok =
-          fd_utils::ReadAndHashExtents(
-              source_fd_, operation.src_extents(), block_size_, &source_hash) &&
-          fd_utils::CopyAndHashExtents(source_fd_,
-                                       optimized.src_extents(),
-                                       target_fd_,
-                                       optimized.dst_extents(),
-                                       block_size_,
-                                       nullptr /* skip hashing */);
-    } else {
-      read_ok = fd_utils::CopyAndHashExtents(source_fd_,
-                                             operation.src_extents(),
-                                             target_fd_,
-                                             operation.dst_extents(),
-                                             block_size_,
-                                             &source_hash);
-    }
-    if (read_ok && expected_source_hash == source_hash)
-      return true;
-    LOG(WARNING) << "Source hash from RAW device mismatched, attempting to "
-                    "correct using ECC";
-    if (!OpenCurrentECCPartition()) {
-      // The following function call will return false since the source hash
-      // mismatches, but we still want to call it so it prints the appropriate
-      // log message.
-      return ValidateSourceHash(source_hash, operation, source_fd_, error);
-    }
-
-    LOG(WARNING) << "Source hash from RAW device mismatched: found "
-                 << base::HexEncode(source_hash.data(), source_hash.size())
-                 << ", expected "
-                 << base::HexEncode(expected_source_hash.data(),
-                                    expected_source_hash.size());
-    if (should_optimize) {
-      TEST_AND_RETURN_FALSE(fd_utils::ReadAndHashExtents(
-          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash));
-      TEST_AND_RETURN_FALSE(
-          fd_utils::CopyAndHashExtents(source_ecc_fd_,
-                                       optimized.src_extents(),
-                                       target_fd_,
-                                       optimized.dst_extents(),
-                                       block_size_,
-                                       nullptr /* skip hashing */));
-    } else {
-      TEST_AND_RETURN_FALSE(
-          fd_utils::CopyAndHashExtents(source_ecc_fd_,
-                                       operation.src_extents(),
-                                       target_fd_,
-                                       operation.dst_extents(),
-                                       block_size_,
-                                       &source_hash));
-    }
-    TEST_AND_RETURN_FALSE(
-        ValidateSourceHash(source_hash, operation, source_ecc_fd_, error));
-    // At this point reading from the the error corrected device worked, but
-    // reading from the raw device failed, so this is considered a recovered
-    // failure.
-    source_ecc_recovered_failures_++;
-  } else {
-    // When the operation doesn't include a source hash, we attempt the error
-    // corrected device first since we can't verify the block in the raw device
-    // at this point, but we fall back to the raw device since the error
-    // corrected device can be shorter or not available.
-
-    if (OpenCurrentECCPartition() &&
-        fd_utils::CopyAndHashExtents(source_ecc_fd_,
-                                     optimized.src_extents(),
-                                     target_fd_,
-                                     optimized.dst_extents(),
-                                     block_size_,
-                                     nullptr)) {
-      return true;
-    }
-    TEST_AND_RETURN_FALSE(fd_utils::CopyAndHashExtents(source_fd_,
-                                                       optimized.src_extents(),
-                                                       target_fd_,
-                                                       optimized.dst_extents(),
-                                                       block_size_,
-                                                       nullptr));
-  }
-  return true;
-}
-
-FileDescriptorPtr DeltaPerformer::ChooseSourceFD(
-    const InstallOperation& operation, ErrorCode* error) {
-  if (source_fd_ == nullptr) {
-    LOG(ERROR) << "ChooseSourceFD fail: source_fd_ == nullptr";
-    return nullptr;
-  }
-
-  if (!operation.has_src_sha256_hash()) {
-    // When the operation doesn't include a source hash, we attempt the error
-    // corrected device first since we can't verify the block in the raw device
-    // at this point, but we first need to make sure all extents are readable
-    // since the error corrected device can be shorter or not available.
-    if (OpenCurrentECCPartition() &&
-        fd_utils::ReadAndHashExtents(
-            source_ecc_fd_, operation.src_extents(), block_size_, nullptr)) {
-      return source_ecc_fd_;
-    }
-    return source_fd_;
-  }
-
-  brillo::Blob source_hash;
-  brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
-                                    operation.src_sha256_hash().end());
-  if (fd_utils::ReadAndHashExtents(
-          source_fd_, operation.src_extents(), block_size_, &source_hash) &&
-      source_hash == expected_source_hash) {
-    return source_fd_;
-  }
-  // We fall back to use the error corrected device if the hash of the raw
-  // device doesn't match or there was an error reading the source partition.
-  if (!OpenCurrentECCPartition()) {
-    // The following function call will return false since the source hash
-    // mismatches, but we still want to call it so it prints the appropriate
-    // log message.
-    ValidateSourceHash(source_hash, operation, source_fd_, error);
-    return nullptr;
-  }
-  LOG(WARNING) << "Source hash from RAW device mismatched: found "
-               << base::HexEncode(source_hash.data(), source_hash.size())
-               << ", expected "
-               << base::HexEncode(expected_source_hash.data(),
-                                  expected_source_hash.size());
-
-  if (fd_utils::ReadAndHashExtents(
-          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash) &&
-      ValidateSourceHash(source_hash, operation, source_ecc_fd_, error)) {
-    // At this point reading from the the error corrected device worked, but
-    // reading from the raw device failed, so this is considered a recovered
-    // failure.
-    source_ecc_recovered_failures_++;
-    return source_ecc_fd_;
-  }
-  return nullptr;
+  return partition_writer_->PerformSourceCopyOperation(operation, error);
 }
 
 bool DeltaPerformer::ExtentsToBsdiffPositionsString(
@@ -1287,69 +942,6 @@
   return true;
 }
 
-namespace {
-
-class BsdiffExtentFile : public bsdiff::FileInterface {
- public:
-  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader, size_t size)
-      : BsdiffExtentFile(std::move(reader), nullptr, size) {}
-  BsdiffExtentFile(std::unique_ptr<ExtentWriter> writer, size_t size)
-      : BsdiffExtentFile(nullptr, std::move(writer), size) {}
-
-  ~BsdiffExtentFile() override = default;
-
-  bool Read(void* buf, size_t count, size_t* bytes_read) override {
-    TEST_AND_RETURN_FALSE(reader_->Read(buf, count));
-    *bytes_read = count;
-    offset_ += count;
-    return true;
-  }
-
-  bool Write(const void* buf, size_t count, size_t* bytes_written) override {
-    TEST_AND_RETURN_FALSE(writer_->Write(buf, count));
-    *bytes_written = count;
-    offset_ += count;
-    return true;
-  }
-
-  bool Seek(off_t pos) override {
-    if (reader_ != nullptr) {
-      TEST_AND_RETURN_FALSE(reader_->Seek(pos));
-      offset_ = pos;
-    } else {
-      // For writes technically there should be no change of position, or it
-      // should be equivalent of current offset.
-      TEST_AND_RETURN_FALSE(offset_ == static_cast<uint64_t>(pos));
-    }
-    return true;
-  }
-
-  bool Close() override { return true; }
-
-  bool GetSize(uint64_t* size) override {
-    *size = size_;
-    return true;
-  }
-
- private:
-  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader,
-                   std::unique_ptr<ExtentWriter> writer,
-                   size_t size)
-      : reader_(std::move(reader)),
-        writer_(std::move(writer)),
-        size_(size),
-        offset_(0) {}
-
-  std::unique_ptr<ExtentReader> reader_;
-  std::unique_ptr<ExtentWriter> writer_;
-  uint64_t size_;
-  uint64_t offset_;
-
-  DISALLOW_COPY_AND_ASSIGN(BsdiffExtentFile);
-};
-
-}  // namespace
-
 bool DeltaPerformer::PerformSourceBsdiffOperation(
     const InstallOperation& operation, ErrorCode* error) {
   // Since we delete data off the beginning of the buffer as we use it,
@@ -1361,136 +953,20 @@
   if (operation.has_dst_length())
     TEST_AND_RETURN_FALSE(operation.dst_length() % block_size_ == 0);
 
-  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
-  TEST_AND_RETURN_FALSE(source_fd != nullptr);
-
-  auto reader = std::make_unique<DirectExtentReader>();
-  TEST_AND_RETURN_FALSE(
-      reader->Init(source_fd, operation.src_extents(), block_size_));
-  auto src_file = std::make_unique<BsdiffExtentFile>(
-      std::move(reader),
-      utils::BlocksInExtents(operation.src_extents()) * block_size_);
-
-  auto writer = std::make_unique<DirectExtentWriter>();
-  TEST_AND_RETURN_FALSE(
-      writer->Init(target_fd_, operation.dst_extents(), block_size_));
-  auto dst_file = std::make_unique<BsdiffExtentFile>(
-      std::move(writer),
-      utils::BlocksInExtents(operation.dst_extents()) * block_size_);
-
-  TEST_AND_RETURN_FALSE(bsdiff::bspatch(std::move(src_file),
-                                        std::move(dst_file),
-                                        buffer_.data(),
-                                        buffer_.size()) == 0);
+  TEST_AND_RETURN_FALSE(partition_writer_->PerformSourceBsdiffOperation(
+      operation, error, buffer_.data(), buffer_.size()));
   DiscardBuffer(true, buffer_.size());
   return true;
 }
 
-namespace {
-
-// A class to be passed to |puffpatch| for reading from |source_fd_| and writing
-// into |target_fd_|.
-class PuffinExtentStream : public puffin::StreamInterface {
- public:
-  // Constructor for creating a stream for reading from an |ExtentReader|.
-  PuffinExtentStream(std::unique_ptr<ExtentReader> reader, uint64_t size)
-      : PuffinExtentStream(std::move(reader), nullptr, size) {}
-
-  // Constructor for creating a stream for writing to an |ExtentWriter|.
-  PuffinExtentStream(std::unique_ptr<ExtentWriter> writer, uint64_t size)
-      : PuffinExtentStream(nullptr, std::move(writer), size) {}
-
-  ~PuffinExtentStream() override = default;
-
-  bool GetSize(uint64_t* size) const override {
-    *size = size_;
-    return true;
-  }
-
-  bool GetOffset(uint64_t* offset) const override {
-    *offset = offset_;
-    return true;
-  }
-
-  bool Seek(uint64_t offset) override {
-    if (is_read_) {
-      TEST_AND_RETURN_FALSE(reader_->Seek(offset));
-      offset_ = offset;
-    } else {
-      // For writes technically there should be no change of position, or it
-      // should equivalent of current offset.
-      TEST_AND_RETURN_FALSE(offset_ == offset);
-    }
-    return true;
-  }
-
-  bool Read(void* buffer, size_t count) override {
-    TEST_AND_RETURN_FALSE(is_read_);
-    TEST_AND_RETURN_FALSE(reader_->Read(buffer, count));
-    offset_ += count;
-    return true;
-  }
-
-  bool Write(const void* buffer, size_t count) override {
-    TEST_AND_RETURN_FALSE(!is_read_);
-    TEST_AND_RETURN_FALSE(writer_->Write(buffer, count));
-    offset_ += count;
-    return true;
-  }
-
-  bool Close() override { return true; }
-
- private:
-  PuffinExtentStream(std::unique_ptr<ExtentReader> reader,
-                     std::unique_ptr<ExtentWriter> writer,
-                     uint64_t size)
-      : reader_(std::move(reader)),
-        writer_(std::move(writer)),
-        size_(size),
-        offset_(0),
-        is_read_(reader_ ? true : false) {}
-
-  std::unique_ptr<ExtentReader> reader_;
-  std::unique_ptr<ExtentWriter> writer_;
-  uint64_t size_;
-  uint64_t offset_;
-  bool is_read_;
-
-  DISALLOW_COPY_AND_ASSIGN(PuffinExtentStream);
-};
-
-}  // namespace
-
 bool DeltaPerformer::PerformPuffDiffOperation(const InstallOperation& operation,
                                               ErrorCode* error) {
   // Since we delete data off the beginning of the buffer as we use it,
   // the data we need should be exactly at the beginning of the buffer.
   TEST_AND_RETURN_FALSE(buffer_offset_ == operation.data_offset());
   TEST_AND_RETURN_FALSE(buffer_.size() >= operation.data_length());
-
-  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
-  TEST_AND_RETURN_FALSE(source_fd != nullptr);
-
-  auto reader = std::make_unique<DirectExtentReader>();
-  TEST_AND_RETURN_FALSE(
-      reader->Init(source_fd, operation.src_extents(), block_size_));
-  puffin::UniqueStreamPtr src_stream(new PuffinExtentStream(
-      std::move(reader),
-      utils::BlocksInExtents(operation.src_extents()) * block_size_));
-
-  auto writer = std::make_unique<DirectExtentWriter>();
-  TEST_AND_RETURN_FALSE(
-      writer->Init(target_fd_, operation.dst_extents(), block_size_));
-  puffin::UniqueStreamPtr dst_stream(new PuffinExtentStream(
-      std::move(writer),
-      utils::BlocksInExtents(operation.dst_extents()) * block_size_));
-
-  const size_t kMaxCacheSize = 5 * 1024 * 1024;  // Total 5MB cache.
-  TEST_AND_RETURN_FALSE(puffin::PuffPatch(std::move(src_stream),
-                                          std::move(dst_stream),
-                                          buffer_.data(),
-                                          buffer_.size(),
-                                          kMaxCacheSize));
+  TEST_AND_RETURN_FALSE(partition_writer_->PerformPuffDiffOperation(
+      operation, error, buffer_.data(), buffer_.size()));
   DiscardBuffer(true, buffer_.size());
   return true;
 }
@@ -1503,11 +979,11 @@
       buffer_.begin(), buffer_.begin() + manifest_.signatures_size());
 
   // Save the signature blob because if the update is interrupted after the
-  // download phase we don't go through this path anymore. Some alternatives to
-  // consider:
+  // download phase we don't go through this path anymore. Some alternatives
+  // to consider:
   //
-  // 1. On resume, re-download the signature blob from the server and re-verify
-  // it.
+  // 1. On resume, re-download the signature blob from the server and
+  // re-verify it.
   //
   // 2. Verify the signature as soon as it's received and don't checkpoint the
   // blob and the signed sha-256 context.
@@ -1530,8 +1006,8 @@
     return utils::ReadFile(public_key_path_, out_public_key);
   }
 
-  // If this is an official build then we are not allowed to use public key from
-  // Omaha response.
+  // If this is an official build then we are not allowed to use public key
+  // from Omaha response.
   if (!hardware_->IsOfficialBuild() && !install_plan_->public_key_rsa.empty()) {
     LOG(INFO) << "Verifying using public key from Omaha response.";
     return brillo::data_encoding::Base64Decode(install_plan_->public_key_rsa,
@@ -1643,34 +1119,41 @@
 
   // Check version field for a given PartitionUpdate object. If an error
   // is encountered, set |error_code| accordingly. If downgrade is detected,
-  // |downgrade_detected| is set. Return true if the program should continue to
-  // check the next partition or not, or false if it should exit early due to
-  // errors.
+  // |downgrade_detected| is set. Return true if the program should continue
+  // to check the next partition or not, or false if it should exit early due
+  // to errors.
   auto&& timestamp_valid = [this](const PartitionUpdate& partition,
                                   bool allow_empty_version,
                                   bool* downgrade_detected) -> ErrorCode {
+    const auto& partition_name = partition.partition_name();
     if (!partition.has_version()) {
+      if (hardware_->GetVersionForLogging(partition_name).empty()) {
+        LOG(INFO) << partition_name << " does't have version, skipping "
+                  << "downgrade check.";
+        return ErrorCode::kSuccess;
+      }
+
       if (allow_empty_version) {
         return ErrorCode::kSuccess;
       }
       LOG(ERROR)
-          << "PartitionUpdate " << partition.partition_name()
-          << " does ot have a version field. Not allowed in partial updates.";
+          << "PartitionUpdate " << partition_name
+          << " doesn't have a version field. Not allowed in partial updates.";
       return ErrorCode::kDownloadManifestParseError;
     }
 
-    auto error_code = hardware_->IsPartitionUpdateValid(
-        partition.partition_name(), partition.version());
+    auto error_code =
+        hardware_->IsPartitionUpdateValid(partition_name, partition.version());
     switch (error_code) {
       case ErrorCode::kSuccess:
         break;
       case ErrorCode::kPayloadTimestampError:
         *downgrade_detected = true;
-        LOG(WARNING) << "PartitionUpdate " << partition.partition_name()
+        LOG(WARNING) << "PartitionUpdate " << partition_name
                      << " has an older version than partition on device.";
         break;
       default:
-        LOG(ERROR) << "IsPartitionUpdateValid(" << partition.partition_name()
+        LOG(ERROR) << "IsPartitionUpdateValid(" << partition_name
                    << ") returned" << utils::ErrorCodeToString(error_code);
         break;
     }
@@ -1723,10 +1206,11 @@
     const InstallOperation& operation) {
   if (!operation.data_sha256_hash().size()) {
     if (!operation.data_length()) {
-      // Operations that do not have any data blob won't have any operation hash
-      // either. So, these operations are always considered validated since the
-      // metadata that contains all the non-data-blob portions of the operation
-      // has already been validated. This is true for both HTTP and HTTPS cases.
+      // Operations that do not have any data blob won't have any operation
+      // hash either. So, these operations are always considered validated
+      // since the metadata that contains all the non-data-blob portions of
+      // the operation has already been validated. This is true for both HTTP
+      // and HTTPS cases.
       return ErrorCode::kSuccess;
     }
 
@@ -1866,8 +1350,8 @@
     return false;
 
   int64_t resumed_update_failures;
-  // Note that storing this value is optional, but if it is there it should not
-  // be more than the limit.
+  // Note that storing this value is optional, but if it is there it should
+  // not be more than the limit.
   if (prefs->GetInt64(kPrefsResumedUpdateFailures, &resumed_update_failures) &&
       resumed_update_failures > kMaxResumedUpdateFailures)
     return false;
@@ -1959,6 +1443,14 @@
       TEST_AND_RETURN_FALSE(
           prefs_->SetInt64(kPrefsUpdateStateNextDataLength, 0));
     }
+    if (partition_writer_) {
+      partition_writer_->CheckpointUpdateProgress(GetPartitionOperationNum());
+    } else {
+      CHECK_EQ(next_operation_num_, num_total_operations_)
+          << "Partition writer is null, we are expected to finish all "
+             "operations: "
+          << next_operation_num_ << "/" << num_total_operations_;
+    }
   }
   TEST_AND_RETURN_FALSE(
       prefs_->SetInt64(kPrefsUpdateStateNextOperation, next_operation_num_));
@@ -2026,4 +1518,10 @@
   return true;
 }
 
+bool DeltaPerformer::IsDynamicPartition(const std::string& part_name) {
+  // Linear search through |dynamic_partitions_| for an exact name match.
+  const auto last = dynamic_partitions_.end();
+  return std::find(dynamic_partitions_.begin(), last, part_name) != last;
+}
+
 }  // namespace chromeos_update_engine
diff --git a/payload_consumer/delta_performer.h b/payload_consumer/delta_performer.h
index e4b56c1..67ca6ce 100644
--- a/payload_consumer/delta_performer.h
+++ b/payload_consumer/delta_performer.h
@@ -35,6 +35,7 @@
 #include "update_engine/payload_consumer/file_descriptor.h"
 #include "update_engine/payload_consumer/file_writer.h"
 #include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_consumer/partition_writer.h"
 #include "update_engine/payload_consumer/payload_metadata.h"
 #include "update_engine/payload_consumer/payload_verifier.h"
 #include "update_engine/update_metadata.pb.h"
@@ -101,10 +102,6 @@
   // work. Returns whether the required file descriptors were successfully open.
   bool OpenCurrentPartition();
 
-  // Attempt to open the error-corrected device for the current partition.
-  // Returns whether the operation succeeded.
-  bool OpenCurrentECCPartition();
-
   // Closes the current partition file descriptors if open. Returns 0 on success
   // or -errno on error.
   int CloseCurrentPartition();
@@ -173,6 +170,12 @@
   // Return true if header parsing is finished and no errors occurred.
   bool IsHeaderParsed() const;
 
+  // Checkpoints the update progress into persistent storage to allow this
+  // update attempt to be resumed after reboot.
+  // If |force| is false, checkpoint may be throttled.
+  // Exposed for testing purposes.
+  bool CheckpointUpdateProgress(bool force);
+
   // Compare |calculated_hash| with source hash in |operation|, return false and
   // dump hash and set |error| if don't match.
   // |source_fd| is the file descriptor of the source partition.
@@ -204,9 +207,14 @@
   friend class DeltaPerformerIntegrationTest;
   FRIEND_TEST(DeltaPerformerTest, BrilloMetadataSignatureSizeTest);
   FRIEND_TEST(DeltaPerformerTest, BrilloParsePayloadMetadataTest);
-  FRIEND_TEST(DeltaPerformerTest, ChooseSourceFDTest);
   FRIEND_TEST(DeltaPerformerTest, UsePublicKeyFromResponse);
 
+  // Obtain the operation index for current partition. If all operations for
+  // current partition are finished, return # of operations. This is mostly
+  // intended to be used by CheckpointUpdateProgress, where partition writer
+  // needs to know the current operation number to properly checkpoint update.
+  size_t GetPartitionOperationNum();
+
   // Parse and move the update instructions of all partitions into our local
   // |partitions_| variable based on the version of the payload. Requires the
   // manifest to be parsed and valid.
@@ -258,13 +266,6 @@
   bool PerformPuffDiffOperation(const InstallOperation& operation,
                                 ErrorCode* error);
 
-  // For a given operation, choose the source fd to be used (raw device or error
-  // correction device) based on the source operation hash.
-  // Returns nullptr if the source hash mismatch cannot be corrected, and set
-  // the |error| accordingly.
-  FileDescriptorPtr ChooseSourceFD(const InstallOperation& operation,
-                                   ErrorCode* error);
-
   // Extracts the payload signature message from the current |buffer_| if the
   // offset matches the one specified by the manifest. Returns whether the
   // signature was extracted.
@@ -277,11 +278,6 @@
   // accordingly.
   void DiscardBuffer(bool do_advance_offset, size_t signed_hash_buffer_size);
 
-  // Checkpoints the update progress into persistent storage to allow this
-  // update attempt to be resumed after reboot.
-  // If |force| is false, checkpoint may be throttled.
-  bool CheckpointUpdateProgress(bool force);
-
   // Primes the required update state. Returns true if the update state was
   // successfully initialized to a saved resume state or if the update is a new
   // update. Returns false otherwise.
@@ -314,6 +310,8 @@
   //   a generic error on the device.
   ErrorCode CheckTimestampError() const;
 
+  // Check if partition `part_name` is a dynamic partition.
+  bool IsDynamicPartition(const std::string& part_name);
   // Update Engine preference store.
   PrefsInterface* prefs_;
 
@@ -331,34 +329,6 @@
   // Pointer to the current payload in install_plan_.payloads.
   InstallPlan::Payload* payload_{nullptr};
 
-  // File descriptor of the source partition. Only set while updating a
-  // partition when using a delta payload.
-  FileDescriptorPtr source_fd_{nullptr};
-
-  // File descriptor of the error corrected source partition. Only set while
-  // updating partition using a delta payload for a partition where error
-  // correction is available. The size of the error corrected device is smaller
-  // than the underlying raw device, since it doesn't include the error
-  // correction blocks.
-  FileDescriptorPtr source_ecc_fd_{nullptr};
-
-  // The total number of operations that failed source hash verification but
-  // passed after falling back to the error-corrected |source_ecc_fd_| device.
-  uint64_t source_ecc_recovered_failures_{0};
-
-  // Whether opening the current partition as an error-corrected device failed.
-  // Used to avoid re-opening the same source partition if it is not actually
-  // error corrected.
-  bool source_ecc_open_failure_{false};
-
-  // File descriptor of the target partition. Only set while performing the
-  // operations of a given partition.
-  FileDescriptorPtr target_fd_{nullptr};
-
-  // Paths the |source_fd_| and |target_fd_| refer to.
-  std::string source_path_;
-  std::string target_path_;
-
   PayloadMetadata payload_metadata_;
 
   // Parsed manifest. Set after enough bytes to parse the manifest were
@@ -379,22 +349,22 @@
   // otherwise 0.
   size_t num_total_operations_{0};
 
-  // The list of partitions to update as found in the manifest major version 2.
-  // When parsing an older manifest format, the information is converted over to
-  // this format instead.
+  // The list of partitions to update as found in the manifest major
+  // version 2. When parsing an older manifest format, the information is
+  // converted over to this format instead.
   std::vector<PartitionUpdate> partitions_;
 
   // Index in the list of partitions (|partitions_| member) of the current
   // partition being processed.
   size_t current_partition_{0};
 
-  // Index of the next operation to perform in the manifest. The index is linear
-  // on the total number of operation on the manifest.
+  // Index of the next operation to perform in the manifest. The index is
+  // linear on the total number of operation on the manifest.
   size_t next_operation_num_{0};
 
   // A buffer used for accumulating downloaded data. Initially, it stores the
-  // payload metadata; once that's downloaded and parsed, it stores data for the
-  // next update operation.
+  // payload metadata; once that's downloaded and parsed, it stores data for
+  // the next update operation.
   brillo::Blob buffer_;
   // Offset of buffer_ in the binary blobs section of the update.
   uint64_t buffer_offset_{0};
@@ -436,8 +406,9 @@
   // If |true|, the update is user initiated (vs. periodic update checks).
   bool interactive_{false};
 
-  // The timeout after which we should force emitting a progress log (constant),
-  // and the actual point in time for the next forced log to be emitted.
+  // The timeout after which we should force emitting a progress log
+  // (constant), and the actual point in time for the next forced log to be
+  // emitted.
   const base::TimeDelta forced_progress_log_wait_{
       base::TimeDelta::FromSeconds(kProgressLogTimeoutSeconds)};
   base::TimeTicks forced_progress_log_time_;
@@ -448,6 +419,10 @@
       base::TimeDelta::FromSeconds(kCheckpointFrequencySeconds)};
   base::TimeTicks update_checkpoint_time_;
 
+  std::unique_ptr<PartitionWriter> partition_writer_;
+
+  // List of dynamic partitions on device.
+  std::vector<std::string> dynamic_partitions_;
   DISALLOW_COPY_AND_ASSIGN(DeltaPerformer);
 };
 
diff --git a/payload_consumer/delta_performer_unittest.cc b/payload_consumer/delta_performer_unittest.cc
index 9269882..ba387a5 100644
--- a/payload_consumer/delta_performer_unittest.cc
+++ b/payload_consumer/delta_performer_unittest.cc
@@ -59,7 +59,6 @@
 using std::vector;
 using test_utils::GetBuildArtifactsPath;
 using test_utils::kRandomString;
-using test_utils::System;
 using testing::_;
 
 extern const char* kUnittestPrivateKeyPath;
@@ -228,13 +227,13 @@
     new_part.path = "/dev/zero";
     new_part.size = 1234;
 
-    payload.AddPartition(*old_part, new_part, aops, {});
+    payload.AddPartition(*old_part, new_part, aops, {}, 0);
 
     // We include a kernel partition without operations.
     old_part->name = kPartitionNameKernel;
     new_part.name = kPartitionNameKernel;
     new_part.size = 0;
-    payload.AddPartition(*old_part, new_part, {}, {});
+    payload.AddPartition(*old_part, new_part, {}, {}, 0);
 
     ScopedTempFile payload_file("Payload-XXXXXX");
     string private_key =
@@ -418,23 +417,6 @@
     EXPECT_EQ(payload_.metadata_size, performer_.metadata_size_);
   }
 
-  // Helper function to pretend that the ECC file descriptor was already opened.
-  // Returns a pointer to the created file descriptor.
-  FakeFileDescriptor* SetFakeECCFile(size_t size) {
-    EXPECT_FALSE(performer_.source_ecc_fd_) << "source_ecc_fd_ already open.";
-    FakeFileDescriptor* ret = new FakeFileDescriptor();
-    fake_ecc_fd_.reset(ret);
-    // Call open to simulate it was already opened.
-    ret->Open("", 0);
-    ret->SetFileSize(size);
-    performer_.source_ecc_fd_ = fake_ecc_fd_;
-    return ret;
-  }
-
-  uint64_t GetSourceEccRecoveredFailures() const {
-    return performer_.source_ecc_recovered_failures_;
-  }
-
   FakePrefs prefs_;
   InstallPlan install_plan_;
   InstallPlan::Payload payload_;
@@ -660,95 +642,6 @@
   EXPECT_EQ(actual_data, ApplyPayload(payload_data, source.path(), false));
 }
 
-// Test that the error-corrected file descriptor is used to read the partition
-// since the source partition doesn't match the operation hash.
-TEST_F(DeltaPerformerTest, ErrorCorrectionSourceCopyFallbackTest) {
-  constexpr size_t kCopyOperationSize = 4 * 4096;
-  ScopedTempFile source("Source-XXXXXX");
-  // Write invalid data to the source image, which doesn't match the expected
-  // hash.
-  brillo::Blob invalid_data(kCopyOperationSize, 0x55);
-  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), invalid_data));
-
-  // Setup the fec file descriptor as the fake stream, which matches
-  // |expected_data|.
-  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize);
-  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
-
-  PartitionConfig old_part(kPartitionNameRoot);
-  old_part.path = source.path();
-  old_part.size = invalid_data.size();
-
-  brillo::Blob payload_data =
-      GenerateSourceCopyPayload(expected_data, true, &old_part);
-  EXPECT_EQ(expected_data, ApplyPayload(payload_data, source.path(), true));
-  // Verify that the fake_fec was actually used.
-  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
-  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
-}
-
-// Test that the error-corrected file descriptor is used to read a partition
-// when no hash is available for SOURCE_COPY but it falls back to the normal
-// file descriptor when the size of the error corrected one is too small.
-TEST_F(DeltaPerformerTest, ErrorCorrectionSourceCopyWhenNoHashFallbackTest) {
-  constexpr size_t kCopyOperationSize = 4 * 4096;
-  ScopedTempFile source("Source-XXXXXX");
-  // Setup the source path with the right expected data.
-  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
-  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), expected_data));
-
-  // Setup the fec file descriptor as the fake stream, with smaller data than
-  // the expected.
-  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize / 2);
-
-  PartitionConfig old_part(kPartitionNameRoot);
-  old_part.path = source.path();
-  old_part.size = expected_data.size();
-
-  // The payload operation doesn't include an operation hash.
-  brillo::Blob payload_data =
-      GenerateSourceCopyPayload(expected_data, false, &old_part);
-  EXPECT_EQ(expected_data, ApplyPayload(payload_data, source.path(), true));
-  // Verify that the fake_fec was attempted to be used. Since the file
-  // descriptor is shorter it can actually do more than one read to realize it
-  // reached the EOF.
-  EXPECT_LE(1U, fake_fec->GetReadOps().size());
-  // This fallback doesn't count as an error-corrected operation since the
-  // operation hash was not available.
-  EXPECT_EQ(0U, GetSourceEccRecoveredFailures());
-}
-
-TEST_F(DeltaPerformerTest, ChooseSourceFDTest) {
-  constexpr size_t kSourceSize = 4 * 4096;
-  ScopedTempFile source("Source-XXXXXX");
-  // Write invalid data to the source image, which doesn't match the expected
-  // hash.
-  brillo::Blob invalid_data(kSourceSize, 0x55);
-  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), invalid_data));
-
-  performer_.source_fd_ = std::make_shared<EintrSafeFileDescriptor>();
-  performer_.source_fd_->Open(source.path().c_str(), O_RDONLY);
-  performer_.block_size_ = 4096;
-
-  // Setup the fec file descriptor as the fake stream, which matches
-  // |expected_data|.
-  FakeFileDescriptor* fake_fec = SetFakeECCFile(kSourceSize);
-  brillo::Blob expected_data = FakeFileDescriptorData(kSourceSize);
-
-  InstallOperation op;
-  *(op.add_src_extents()) = ExtentForRange(0, kSourceSize / 4096);
-  brillo::Blob src_hash;
-  EXPECT_TRUE(HashCalculator::RawHashOfData(expected_data, &src_hash));
-  op.set_src_sha256_hash(src_hash.data(), src_hash.size());
-
-  ErrorCode error = ErrorCode::kSuccess;
-  EXPECT_EQ(performer_.source_ecc_fd_, performer_.ChooseSourceFD(op, &error));
-  EXPECT_EQ(ErrorCode::kSuccess, error);
-  // Verify that the fake_fec was actually used.
-  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
-  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
-}
-
 TEST_F(DeltaPerformerTest, ExtentsToByteStringTest) {
   uint64_t test[] = {1, 1, 4, 2, 0, 1};
   static_assert(base::size(test) % 2 == 0, "Array size uneven");
@@ -1176,4 +1069,30 @@
   EXPECT_EQ(kMaxSupportedMajorPayloadVersion, major_version);
 }
 
+TEST_F(DeltaPerformerTest, FullPayloadCanResumeTest) {
+  payload_.type = InstallPayloadType::kFull;
+  brillo::Blob expected_data =
+      brillo::Blob(std::begin(kRandomString), std::end(kRandomString));
+  expected_data.resize(4096);  // block size
+  vector<AnnotatedOperation> aops;
+  AnnotatedOperation aop;
+  *(aop.op.add_dst_extents()) = ExtentForRange(0, 1);
+  aop.op.set_data_offset(0);
+  aop.op.set_data_length(expected_data.size());
+  aop.op.set_type(InstallOperation::REPLACE);
+  aops.push_back(aop);
+
+  brillo::Blob payload_data = GeneratePayload(expected_data,
+                                              aops,
+                                              false,
+                                              kBrilloMajorPayloadVersion,
+                                              kFullPayloadMinorVersion);
+
+  ASSERT_EQ(expected_data, ApplyPayload(payload_data, "/dev/null", true));
+  performer_.CheckpointUpdateProgress(true);
+  const std::string payload_id = "12345";
+  prefs_.SetString(kPrefsUpdateCheckResponseHash, payload_id);
+  ASSERT_TRUE(DeltaPerformer::CanResumeUpdate(&prefs_, payload_id));
+}
+
 }  // namespace chromeos_update_engine
diff --git a/payload_consumer/extent_reader.cc b/payload_consumer/extent_reader.cc
index ad983ae..3c7329d 100644
--- a/payload_consumer/extent_reader.cc
+++ b/payload_consumer/extent_reader.cc
@@ -77,7 +77,7 @@
         std::min(count - bytes_read, cur_extent_bytes_left);
 
     ssize_t out_bytes_read;
-    TEST_AND_RETURN_FALSE(utils::PReadAll(
+    TEST_AND_RETURN_FALSE(utils::ReadAll(
         fd_,
         bytes + bytes_read,
         bytes_to_read,
diff --git a/payload_consumer/fec_file_descriptor.cc b/payload_consumer/fec_file_descriptor.cc
index de22cf3..3fee196 100644
--- a/payload_consumer/fec_file_descriptor.cc
+++ b/payload_consumer/fec_file_descriptor.cc
@@ -16,6 +16,8 @@
 
 #include "update_engine/payload_consumer/fec_file_descriptor.h"
 
+#include <base/logging.h>
+
 namespace chromeos_update_engine {
 
 bool FecFileDescriptor::Open(const char* path, int flags) {
diff --git a/payload_consumer/file_descriptor.cc b/payload_consumer/file_descriptor.cc
index 1de615c..7c69c1b 100644
--- a/payload_consumer/file_descriptor.cc
+++ b/payload_consumer/file_descriptor.cc
@@ -21,6 +21,7 @@
 #include <sys/ioctl.h>
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <unistd.h>
 
 #include <base/posix/eintr_wrapper.h>
 
@@ -28,6 +29,12 @@
 
 namespace chromeos_update_engine {
 
+EintrSafeFileDescriptor::~EintrSafeFileDescriptor() {
+  if (IsOpen()) {
+    Close();
+  }
+}
+
 bool EintrSafeFileDescriptor::Open(const char* path, int flags, mode_t mode) {
   CHECK_EQ(fd_, -1);
   return ((fd_ = HANDLE_EINTR(open(path, flags, mode))) >= 0);
@@ -125,11 +132,17 @@
 
 bool EintrSafeFileDescriptor::Flush() {
   CHECK_GE(fd_, 0);
+  // delta_performer typically uses |O_DSYNC|, except in interactive settings,
+  // so fsync() explicitly here. The fsync() result is not checked.
+  fsync(fd_);
   return true;
 }
 
 bool EintrSafeFileDescriptor::Close() {
   CHECK_GE(fd_, 0);
+  // https://stackoverflow.com/questions/705454/does-linux-guarantee-the-contents-of-a-file-is-flushed-to-disc-after-close
+  // |close()| doesn't imply |fsync()|, we need to do it manually.
+  fsync(fd_);
   if (IGNORE_EINTR(close(fd_)))
     return false;
   fd_ = -1;
diff --git a/payload_consumer/file_descriptor.h b/payload_consumer/file_descriptor.h
index 55f76c6..faebcc1 100644
--- a/payload_consumer/file_descriptor.h
+++ b/payload_consumer/file_descriptor.h
@@ -21,7 +21,7 @@
 #include <sys/types.h>
 #include <memory>
 
-#include <base/logging.h>
+#include <base/macros.h>
 
 // Abstraction for managing opening, reading, writing and closing of file
 // descriptors. This includes an abstract class and one standard implementation
@@ -111,6 +111,7 @@
 class EintrSafeFileDescriptor : public FileDescriptor {
  public:
   EintrSafeFileDescriptor() : fd_(-1) {}
+  ~EintrSafeFileDescriptor();
 
   // Interface methods.
   bool Open(const char* path, int flags, mode_t mode) override;
diff --git a/payload_consumer/filesystem_verifier_action.cc b/payload_consumer/filesystem_verifier_action.cc
index 61917ea..634f03f 100644
--- a/payload_consumer/filesystem_verifier_action.cc
+++ b/payload_consumer/filesystem_verifier_action.cc
@@ -20,17 +20,22 @@
 #include <fcntl.h>
 #include <sys/stat.h>
 #include <sys/types.h>
+#include <unistd.h>
 
 #include <algorithm>
 #include <cstdlib>
+#include <memory>
 #include <string>
+#include <utility>
 
 #include <base/bind.h>
-#include <brillo/data_encoding.h>
-#include <brillo/streams/file_stream.h>
 #include <base/strings/string_util.h>
+#include <brillo/data_encoding.h>
+#include <brillo/message_loops/message_loop.h>
+#include <brillo/streams/file_stream.h>
 
 #include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
 
 using brillo::data_encoding::Base64Encode;
 using std::string;
@@ -59,18 +64,19 @@
     return;
   }
   install_plan_.Dump();
-
   StartPartitionHashing();
   abort_action_completer.set_should_complete(false);
 }
 
 void FilesystemVerifierAction::TerminateProcessing() {
+  brillo::MessageLoop::current()->CancelTask(pending_task_id_);
   cancelled_ = true;
   Cleanup(ErrorCode::kSuccess);  // error code is ignored if canceled_ is true.
 }
 
 void FilesystemVerifierAction::Cleanup(ErrorCode code) {
-  src_stream_.reset();
+  read_fd_.reset();
+  write_fd_.reset();
   // This memory is not used anymore.
   buffer_.clear();
 
@@ -88,6 +94,43 @@
   }
 }
 
+bool FilesystemVerifierAction::InitializeFdVABC() {
+  const InstallPlan::Partition& partition =
+      install_plan_.partitions[partition_index_];
+
+  read_fd_ = dynamic_control_->OpenCowReader(
+      partition.name, partition.source_path, true);
+  if (!read_fd_) {
+    LOG(ERROR) << "OpenCowReader(" << partition.name << ", "
+               << partition.source_path << ") failed.";
+    return false;
+  }
+  partition_size_ = partition.target_size;
+  // TODO(b/173432386): Support Verity writes for VABC.
+  CHECK_EQ(partition.fec_size, 0U);
+  CHECK_EQ(partition.hash_tree_size, 0U);
+  return true;
+}
+
+bool FilesystemVerifierAction::InitializeFd(const std::string& part_path) {
+  read_fd_ = FileDescriptorPtr(new EintrSafeFileDescriptor());
+  if (!read_fd_->Open(part_path.c_str(), O_RDONLY)) {
+    LOG(ERROR) << "Unable to open " << part_path << " for reading.";
+    return false;
+  }
+
+  // Can't re-use |read_fd_|, as verity writer may call `seek` to modify state
+  // of a file descriptor.
+  if (ShouldWriteVerity()) {
+    write_fd_ = FileDescriptorPtr(new EintrSafeFileDescriptor());
+    if (!write_fd_->Open(part_path.c_str(), O_RDWR)) {
+      LOG(ERROR) << "Unable to open " << part_path << " for Read/Write.";
+      return false;
+    }
+  }
+  return true;
+}
+
 void FilesystemVerifierAction::StartPartitionHashing() {
   if (partition_index_ == install_plan_.partitions.size()) {
     if (!install_plan_.untouched_dynamic_partitions.empty()) {
@@ -109,7 +152,6 @@
   }
   const InstallPlan::Partition& partition =
       install_plan_.partitions[partition_index_];
-
   string part_path;
   switch (verifier_step_) {
     case VerifierStep::kVerifySourceHash:
@@ -122,44 +164,40 @@
       break;
   }
 
-  if (part_path.empty()) {
-    if (partition_size_ == 0) {
-      LOG(INFO) << "Skip hashing partition " << partition_index_ << " ("
-                << partition.name << ") because size is 0.";
-      partition_index_++;
-      StartPartitionHashing();
-      return;
-    }
-    LOG(ERROR) << "Cannot hash partition " << partition_index_ << " ("
-               << partition.name
-               << ") because its device path cannot be determined.";
-    Cleanup(ErrorCode::kFilesystemVerifierError);
-    return;
-  }
-
   LOG(INFO) << "Hashing partition " << partition_index_ << " ("
             << partition.name << ") on device " << part_path;
-
-  brillo::ErrorPtr error;
-  src_stream_ =
-      brillo::FileStream::Open(base::FilePath(part_path),
-                               brillo::Stream::AccessMode::READ,
-                               brillo::FileStream::Disposition::OPEN_EXISTING,
-                               &error);
-
-  if (!src_stream_) {
-    LOG(ERROR) << "Unable to open " << part_path << " for reading";
+  auto success = false;
+  if (dynamic_control_->GetVirtualAbCompressionFeatureFlag().IsEnabled() &&
+      dynamic_control_->IsDynamicPartition(partition.name) &&
+      verifier_step_ == VerifierStep::kVerifyTargetHash) {
+    success = InitializeFdVABC();
+  } else {
+    if (part_path.empty()) {
+      if (partition_size_ == 0) {
+        LOG(INFO) << "Skip hashing partition " << partition_index_ << " ("
+                  << partition.name << ") because size is 0.";
+        partition_index_++;
+        StartPartitionHashing();
+        return;
+      }
+      LOG(ERROR) << "Cannot hash partition " << partition_index_ << " ("
+                 << partition.name
+                 << ") because its device path cannot be determined.";
+      Cleanup(ErrorCode::kFilesystemVerifierError);
+      return;
+    }
+    success = InitializeFd(part_path);
+  }
+  if (!success) {
     Cleanup(ErrorCode::kFilesystemVerifierError);
     return;
   }
-
   buffer_.resize(kReadFileBufferSize);
   hasher_ = std::make_unique<HashCalculator>();
 
   offset_ = 0;
-  if (verifier_step_ == VerifierStep::kVerifyTargetHash &&
-      install_plan_.write_verity) {
-    if (!verity_writer_->Init(partition)) {
+  if (ShouldWriteVerity()) {
+    if (!verity_writer_->Init(partition, read_fd_, write_fd_)) {
       Cleanup(ErrorCode::kVerityCalculationError);
       return;
     }
@@ -169,6 +207,14 @@
   ScheduleRead();
 }
 
+bool FilesystemVerifierAction::ShouldWriteVerity() {
+  const InstallPlan::Partition& partition =
+      install_plan_.partitions[partition_index_];
+  return verifier_step_ == VerifierStep::kVerifyTargetHash &&
+         install_plan_.write_verity &&
+         (partition.hash_tree_size > 0 || partition.fec_size > 0);
+}
+
 void FilesystemVerifierAction::ScheduleRead() {
   const InstallPlan::Partition& partition =
       install_plan_.partitions[partition_index_];
@@ -190,22 +236,21 @@
     return;
   }
 
-  bool read_async_ok = src_stream_->ReadAsync(
-      buffer_.data(),
-      bytes_to_read,
-      base::Bind(&FilesystemVerifierAction::OnReadDoneCallback,
-                 base::Unretained(this)),
-      base::Bind(&FilesystemVerifierAction::OnReadErrorCallback,
-                 base::Unretained(this)),
-      nullptr);
-
-  if (!read_async_ok) {
+  auto bytes_read = read_fd_->Read(buffer_.data(), bytes_to_read);
+  if (bytes_read < 0) {
     LOG(ERROR) << "Unable to schedule an asynchronous read from the stream.";
     Cleanup(ErrorCode::kError);
+  } else {
+    // We could just invoke |OnReadDone()| directly, it works. But |PostTask|
+    // is used so that users can cancel updates.
+    pending_task_id_ = brillo::MessageLoop::current()->PostTask(
+        base::Bind(&FilesystemVerifierAction::OnReadDone,
+                   base::Unretained(this),
+                   bytes_read));
   }
 }
 
-void FilesystemVerifierAction::OnReadDoneCallback(size_t bytes_read) {
+void FilesystemVerifierAction::OnReadDone(size_t bytes_read) {
   if (cancelled_) {
     Cleanup(ErrorCode::kError);
     return;
@@ -231,8 +276,7 @@
   UpdateProgress(
       (static_cast<double>(offset_) / partition_size_ + partition_index_) /
       install_plan_.partitions.size());
-  if (verifier_step_ == VerifierStep::kVerifyTargetHash &&
-      install_plan_.write_verity) {
+  if (ShouldWriteVerity()) {
     if (!verity_writer_->Update(offset_, buffer_.data(), bytes_read)) {
       Cleanup(ErrorCode::kVerityCalculationError);
       return;
@@ -249,12 +293,6 @@
   ScheduleRead();
 }
 
-void FilesystemVerifierAction::OnReadErrorCallback(const brillo::Error* error) {
-  // TODO(deymo): Transform the read-error into an specific ErrorCode.
-  LOG(ERROR) << "Asynchronous read failed.";
-  Cleanup(ErrorCode::kError);
-}
-
 void FilesystemVerifierAction::FinishPartitionHashing() {
   if (!hasher_->Finalize()) {
     LOG(ERROR) << "Unable to finalize the hash.";
@@ -278,8 +316,8 @@
         }
         // If we have not verified source partition yet, now that the target
         // partition does not match, and it's not a full payload, we need to
-        // switch to kVerifySourceHash step to check if it's because the source
-        // partition does not match either.
+        // switch to kVerifySourceHash step to check if it's because the
+        // source partition does not match either.
         verifier_step_ = VerifierStep::kVerifySourceHash;
       } else {
         partition_index_++;
@@ -315,17 +353,22 @@
       }
       // The action will skip kVerifySourceHash step if target partition hash
       // matches, if we are in this step, it means target hash does not match,
-      // and now that the source partition hash matches, we should set the error
-      // code to reflect the error in target partition.
-      // We only need to verify the source partition which the target hash does
-      // not match, the rest of the partitions don't matter.
+      // and now that the source partition hash matches, we should set the
+      // error code to reflect the error in target partition. We only need to
+      // verify the source partition which the target hash does not match, the
+      // rest of the partitions don't matter.
       Cleanup(ErrorCode::kNewRootfsVerificationError);
       return;
   }
   // Start hashing the next partition, if any.
   hasher_.reset();
   buffer_.clear();
-  src_stream_->CloseBlocking(nullptr);
+  if (read_fd_) {
+    read_fd_.reset();
+  }
+  if (write_fd_) {
+    write_fd_.reset();
+  }
   StartPartitionHashing();
 }
 
diff --git a/payload_consumer/filesystem_verifier_action.h b/payload_consumer/filesystem_verifier_action.h
index 6a8823a..b6df4b8 100644
--- a/payload_consumer/filesystem_verifier_action.h
+++ b/payload_consumer/filesystem_verifier_action.h
@@ -24,10 +24,11 @@
 #include <string>
 #include <vector>
 
-#include <brillo/streams/stream.h>
+#include <brillo/message_loops/message_loop.h>
 
 #include "update_engine/common/action.h"
 #include "update_engine/common/hash_calculator.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
 #include "update_engine/payload_consumer/install_plan.h"
 #include "update_engine/payload_consumer/verity_writer_interface.h"
 
@@ -83,6 +84,9 @@
 
  private:
   friend class FilesystemVerifierActionTestDelegate;
+
+  // Return true if we need to write verity bytes.
+  bool ShouldWriteVerity();
   // Starts the hashing of the current partition. If there aren't any partitions
   // remaining to be hashed, it finishes the action.
   void StartPartitionHashing();
@@ -92,8 +96,7 @@
 
   // Called from the main loop when a single read from |src_stream_| succeeds or
   // fails, calling OnReadDoneCallback() and OnReadErrorCallback() respectively.
-  void OnReadDoneCallback(size_t bytes_read);
-  void OnReadErrorCallback(const brillo::Error* error);
+  void OnReadDone(size_t bytes_read);
 
   // When the read is done, finalize the hash checking of the current partition
   // and continue checking the next one.
@@ -107,6 +110,10 @@
   // Invoke delegate callback to report progress, if delegate is not null
   void UpdateProgress(double progress);
 
+  // Initialize read_fd_ and write_fd_
+  bool InitializeFd(const std::string& part_path);
+  bool InitializeFdVABC();
+
   // The type of the partition that we are verifying.
   VerifierStep verifier_step_ = VerifierStep::kVerifyTargetHash;
 
@@ -114,8 +121,15 @@
   // being hashed.
   size_t partition_index_{0};
 
-  // If not null, the FileStream used to read from the device.
-  brillo::StreamPtr src_stream_;
+  // If not null, the FileDescriptor used to read from the device.
+  // |read_fd_| and |write_fd_| will be initialized when we begin hashing a
+  // partition. They will be deallocated once we encounter an error or
+  // successfully finished hashing.
+  FileDescriptorPtr read_fd_;
+  // If not null, the FileDescriptor used to write to the device. Opened by
+  // |InitializeFd| as a separate descriptor from |read_fd_|, and only when
+  // verity needs to be written; |InitializeFdVABC| leaves it unset.
+  FileDescriptorPtr write_fd_;
 
   // Buffer for storing data we read.
   brillo::Blob buffer_;
@@ -144,6 +158,11 @@
   // An observer that observes progress updates of this action.
   FilesystemVerifyDelegate* delegate_{};
 
+  // Id of the task that should be cancelled on |TerminateProcessing|. This is
+  // the pending |OnReadDone| task posted to the message loop by ScheduleRead.
+  brillo::MessageLoop::TaskId pending_task_id_{
+      brillo::MessageLoop::kTaskIdNull};
+
   DISALLOW_COPY_AND_ASSIGN(FilesystemVerifierAction);
 };
 
diff --git a/payload_consumer/filesystem_verifier_action_unittest.cc b/payload_consumer/filesystem_verifier_action_unittest.cc
index 2c29b44..925fdab 100644
--- a/payload_consumer/filesystem_verifier_action_unittest.cc
+++ b/payload_consumer/filesystem_verifier_action_unittest.cc
@@ -72,7 +72,7 @@
     if (action->Type() == FilesystemVerifierAction::StaticType()) {
       ran_ = true;
       code_ = code;
-      EXPECT_FALSE(static_cast<FilesystemVerifierAction*>(action)->src_stream_);
+      EXPECT_FALSE(static_cast<FilesystemVerifierAction*>(action)->read_fd_);
     } else if (action->Type() ==
                ObjectCollectorAction<InstallPlan>::StaticType()) {
       auto collector_action =
@@ -384,4 +384,5 @@
   EXPECT_TRUE(delegate.ran());
   EXPECT_EQ(ErrorCode::kSuccess, delegate.code());
 }
+
 }  // namespace chromeos_update_engine
diff --git a/payload_consumer/install_plan.cc b/payload_consumer/install_plan.cc
index c399d02..39827a4 100644
--- a/payload_consumer/install_plan.cc
+++ b/payload_consumer/install_plan.cc
@@ -153,18 +153,19 @@
   for (Partition& partition : partitions) {
     if (source_slot != BootControlInterface::kInvalidSlot &&
         partition.source_size > 0) {
-      result = boot_control->GetPartitionDevice(
-                   partition.name, source_slot, &partition.source_path) &&
-               result;
+      TEST_AND_RETURN_FALSE(boot_control->GetPartitionDevice(
+          partition.name, source_slot, &partition.source_path));
     } else {
       partition.source_path.clear();
     }
 
     if (target_slot != BootControlInterface::kInvalidSlot &&
         partition.target_size > 0) {
-      result = boot_control->GetPartitionDevice(
-                   partition.name, target_slot, &partition.target_path) &&
-               result;
+      auto device = boot_control->GetPartitionDevice(
+          partition.name, target_slot, source_slot);
+      TEST_AND_RETURN_FALSE(device.has_value());
+      partition.target_path = device->rw_device_path;
+      partition.postinstall_mount_device = device->mountable_device_path;
     } else {
       partition.target_path.clear();
     }
diff --git a/payload_consumer/install_plan.h b/payload_consumer/install_plan.h
index 2026635..43b94fc 100644
--- a/payload_consumer/install_plan.h
+++ b/payload_consumer/install_plan.h
@@ -102,9 +102,17 @@
     uint64_t source_size{0};
     brillo::Blob source_hash;
 
+    // |target_path| is intended to be a path to a block device, which you can
+    // open with the |open| syscall and perform regular unix style read/write.
+    // For VABC, this will be empty, as you can't read/write VABC devices with
+    // regular syscalls.
     std::string target_path;
+    // |postinstall_mount_device| is intended to be a path to a block device
+    // which can be used for mounting this device's underlying filesystem.
+    std::string postinstall_mount_device;
     uint64_t target_size{0};
     brillo::Blob target_hash;
+
     uint32_t block_size{0};
 
     // Whether we should run the postinstall script from this partition and the
diff --git a/payload_consumer/mount_history.cc b/payload_consumer/mount_history.cc
index 43a75b3..1d2ec76 100644
--- a/payload_consumer/mount_history.cc
+++ b/payload_consumer/mount_history.cc
@@ -37,7 +37,7 @@
   brillo::Blob block0_buffer(kBlockSize);
   ssize_t bytes_read;
 
-  if (!utils::PReadAll(
+  if (!utils::ReadAll(
           blockdevice_fd, block0_buffer.data(), kBlockSize, 0, &bytes_read)) {
     LOG(WARNING) << "PReadAll failed";
     return;
diff --git a/payload_consumer/partition_update_generator_android.cc b/payload_consumer/partition_update_generator_android.cc
index 25771e1..4467182 100644
--- a/payload_consumer/partition_update_generator_android.cc
+++ b/payload_consumer/partition_update_generator_android.cc
@@ -41,6 +41,11 @@
         BootControlInterface::Slot target_slot,
         const std::set<std::string>& partitions_in_payload,
         std::vector<PartitionUpdate>* update_list) {
+#ifndef __ANDROID__
+  // Skip copying partitions for host verification.
+  return true;
+#endif
+
   auto ab_partitions = GetAbPartitionsOnDevice();
   if (ab_partitions.empty()) {
     LOG(ERROR) << "Failed to load static a/b partitions";
diff --git a/payload_consumer/partition_writer.cc b/payload_consumer/partition_writer.cc
new file mode 100644
index 0000000..6f06dd2
--- /dev/null
+++ b/payload_consumer/partition_writer.cc
@@ -0,0 +1,664 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+#include <update_engine/payload_consumer/partition_writer.h>
+
+#include <fcntl.h>
+#include <linux/fs.h>
+
+#include <algorithm>
+#include <initializer_list>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include <base/strings/string_number_conversions.h>
+#include <bsdiff/bspatch.h>
+#include <puffin/puffpatch.h>
+#include <bsdiff/file_interface.h>
+#include <puffin/stream.h>
+
+#include "update_engine/common/terminator.h"
+#include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/bzip_extent_writer.h"
+#include "update_engine/payload_consumer/cached_file_descriptor.h"
+#include "update_engine/payload_consumer/extent_reader.h"
+#include "update_engine/payload_consumer/extent_writer.h"
+#include "update_engine/payload_consumer/fec_file_descriptor.h"
+#include "update_engine/payload_consumer/file_descriptor_utils.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_consumer/mount_history.h"
+#include "update_engine/payload_consumer/payload_constants.h"
+#include "update_engine/payload_consumer/xz_extent_writer.h"
+
+namespace chromeos_update_engine {
+
+namespace {
+constexpr uint64_t kCacheSize = 1024 * 1024;  // 1MB
+
+// Discard the tail of the block device referenced by |fd|, from the offset
+// |data_size| until the end of the block device. Returns whether the data was
+// discarded.
+
+bool DiscardPartitionTail(const FileDescriptorPtr& fd, uint64_t data_size) {
+  uint64_t part_size = fd->BlockDevSize();
+  if (!part_size || part_size <= data_size)
+    return false;
+
+  struct blkioctl_request {
+    int number;
+    const char* name;
+  };
+  const std::initializer_list<blkioctl_request> blkioctl_requests = {
+      {BLKDISCARD, "BLKDISCARD"},
+      {BLKSECDISCARD, "BLKSECDISCARD"},
+#ifdef BLKZEROOUT
+      {BLKZEROOUT, "BLKZEROOUT"},
+#endif
+  };
+  for (const auto& req : blkioctl_requests) {
+    int error = 0;
+    if (fd->BlkIoctl(req.number, data_size, part_size - data_size, &error) &&
+        error == 0) {
+      return true;
+    }
+    LOG(WARNING) << "Error discarding the last "
+                 << (part_size - data_size) / 1024 << " KiB using ioctl("
+                 << req.name << ")";
+  }
+  return false;
+}
+
+}  // namespace
+
+// Opens path for read/write. On success returns an open FileDescriptor
+// and sets *err to 0. On failure, sets *err to errno and returns nullptr.
+FileDescriptorPtr OpenFile(const char* path,
+                           int mode,
+                           bool cache_writes,
+                           int* err) {
+  // Try to mark the block device read-only based on the mode. Ignore any
+  // failure since this won't work when passing regular files.
+  bool read_only = (mode & O_ACCMODE) == O_RDONLY;
+  utils::SetBlockDeviceReadOnly(path, read_only);
+
+  FileDescriptorPtr fd(new EintrSafeFileDescriptor());
+  if (cache_writes && !read_only) {
+    fd = FileDescriptorPtr(new CachedFileDescriptor(fd, kCacheSize));
+    LOG(INFO) << "Caching writes.";
+  }
+  if (!fd->Open(path, mode, 000)) {
+    *err = errno;
+    PLOG(ERROR) << "Unable to open file " << path;
+    return nullptr;
+  }
+  *err = 0;
+  return fd;
+}
+
+class BsdiffExtentFile : public bsdiff::FileInterface {
+ public:
+  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader, size_t size)
+      : BsdiffExtentFile(std::move(reader), nullptr, size) {}
+  BsdiffExtentFile(std::unique_ptr<ExtentWriter> writer, size_t size)
+      : BsdiffExtentFile(nullptr, std::move(writer), size) {}
+
+  ~BsdiffExtentFile() override = default;
+
+  bool Read(void* buf, size_t count, size_t* bytes_read) override {
+    TEST_AND_RETURN_FALSE(reader_->Read(buf, count));
+    *bytes_read = count;
+    offset_ += count;
+    return true;
+  }
+
+  bool Write(const void* buf, size_t count, size_t* bytes_written) override {
+    TEST_AND_RETURN_FALSE(writer_->Write(buf, count));
+    *bytes_written = count;
+    offset_ += count;
+    return true;
+  }
+
+  bool Seek(off_t pos) override {
+    if (reader_ != nullptr) {
+      TEST_AND_RETURN_FALSE(reader_->Seek(pos));
+      offset_ = pos;
+    } else {
+      // For writes technically there should be no change of position, or it
+      // should be equivalent of current offset.
+      TEST_AND_RETURN_FALSE(offset_ == static_cast<uint64_t>(pos));
+    }
+    return true;
+  }
+
+  bool Close() override { return true; }
+
+  bool GetSize(uint64_t* size) override {
+    *size = size_;
+    return true;
+  }
+
+ private:
+  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader,
+                   std::unique_ptr<ExtentWriter> writer,
+                   size_t size)
+      : reader_(std::move(reader)),
+        writer_(std::move(writer)),
+        size_(size),
+        offset_(0) {}
+
+  std::unique_ptr<ExtentReader> reader_;
+  std::unique_ptr<ExtentWriter> writer_;
+  uint64_t size_;
+  uint64_t offset_;
+
+  DISALLOW_COPY_AND_ASSIGN(BsdiffExtentFile);
+};
+// A class to be passed to |puffpatch| for reading from |source_fd_| and writing
+// into |target_fd_|.
+class PuffinExtentStream : public puffin::StreamInterface {
+ public:
+  // Constructor for creating a stream for reading from an |ExtentReader|.
+  PuffinExtentStream(std::unique_ptr<ExtentReader> reader, uint64_t size)
+      : PuffinExtentStream(std::move(reader), nullptr, size) {}
+
+  // Constructor for creating a stream for writing to an |ExtentWriter|.
+  PuffinExtentStream(std::unique_ptr<ExtentWriter> writer, uint64_t size)
+      : PuffinExtentStream(nullptr, std::move(writer), size) {}
+
+  ~PuffinExtentStream() override = default;
+
+  bool GetSize(uint64_t* size) const override {
+    *size = size_;
+    return true;
+  }
+
+  bool GetOffset(uint64_t* offset) const override {
+    *offset = offset_;
+    return true;
+  }
+
+  bool Seek(uint64_t offset) override {
+    if (is_read_) {
+      TEST_AND_RETURN_FALSE(reader_->Seek(offset));
+      offset_ = offset;
+    } else {
+      // For writes technically there should be no change of position, or it
+      // should be equivalent to the current offset.
+      TEST_AND_RETURN_FALSE(offset_ == offset);
+    }
+    return true;
+  }
+
+  bool Read(void* buffer, size_t count) override {
+    TEST_AND_RETURN_FALSE(is_read_);
+    TEST_AND_RETURN_FALSE(reader_->Read(buffer, count));
+    offset_ += count;
+    return true;
+  }
+
+  bool Write(const void* buffer, size_t count) override {
+    TEST_AND_RETURN_FALSE(!is_read_);
+    TEST_AND_RETURN_FALSE(writer_->Write(buffer, count));
+    offset_ += count;
+    return true;
+  }
+
+  bool Close() override { return true; }
+
+ private:
+  PuffinExtentStream(std::unique_ptr<ExtentReader> reader,
+                     std::unique_ptr<ExtentWriter> writer,
+                     uint64_t size)
+      : reader_(std::move(reader)),
+        writer_(std::move(writer)),
+        size_(size),
+        offset_(0),
+        is_read_(reader_ ? true : false) {}
+
+  std::unique_ptr<ExtentReader> reader_;
+  std::unique_ptr<ExtentWriter> writer_;
+  uint64_t size_;
+  uint64_t offset_;
+  bool is_read_;
+
+  DISALLOW_COPY_AND_ASSIGN(PuffinExtentStream);
+};
+
+PartitionWriter::PartitionWriter(
+    const PartitionUpdate& partition_update,
+    const InstallPlan::Partition& install_part,
+    DynamicPartitionControlInterface* dynamic_control,
+    size_t block_size,
+    bool is_interactive)
+    : partition_update_(partition_update),
+      install_part_(install_part),
+      dynamic_control_(dynamic_control),
+      interactive_(is_interactive),
+      block_size_(block_size) {}
+
+PartitionWriter::~PartitionWriter() {
+  Close();
+}
+
+bool PartitionWriter::OpenSourcePartition(uint32_t source_slot,
+                                          bool source_may_exist) {
+  source_path_.clear();
+  if (!source_may_exist) {
+    return true;
+  }
+  if (install_part_.source_size > 0 && !install_part_.source_path.empty()) {
+    source_path_ = install_part_.source_path;
+    int err;
+    source_fd_ = OpenFile(source_path_.c_str(), O_RDONLY, false, &err);
+    if (source_fd_ == nullptr) {
+      LOG(ERROR) << "Unable to open source partition " << install_part_.name
+                 << " on slot " << BootControlInterface::SlotName(source_slot)
+                 << ", file " << source_path_;
+      return false;
+    }
+  }
+  return true;
+}
+
+bool PartitionWriter::Init(const InstallPlan* install_plan,
+                           bool source_may_exist,
+                           size_t next_op_index) {
+  const PartitionUpdate& partition = partition_update_;
+  uint32_t source_slot = install_plan->source_slot;
+  uint32_t target_slot = install_plan->target_slot;
+  TEST_AND_RETURN_FALSE(OpenSourcePartition(source_slot, source_may_exist));
+
+  // We shouldn't open the source partition in certain cases, e.g. some dynamic
+  // partitions in delta payload, partitions included in the full payload for
+  // partial updates. Use the source size as the indicator.
+
+  target_path_ = install_part_.target_path;
+  int err;
+
+  int flags = O_RDWR;
+  if (!interactive_)
+    flags |= O_DSYNC;
+
+  LOG(INFO) << "Opening " << target_path_ << " partition with"
+            << (interactive_ ? "out" : "") << " O_DSYNC";
+
+  target_fd_ = OpenFile(target_path_.c_str(), flags, true, &err);
+  if (!target_fd_) {
+    LOG(ERROR) << "Unable to open target partition "
+               << partition.partition_name() << " on slot "
+               << BootControlInterface::SlotName(target_slot) << ", file "
+               << target_path_;
+    return false;
+  }
+
+  LOG(INFO) << "Applying " << partition.operations().size()
+            << " operations to partition \"" << partition.partition_name()
+            << "\"";
+
+  // Discard the end of the partition, but ignore failures.
+  DiscardPartitionTail(target_fd_, install_part_.target_size);
+
+  return true;
+}
+
+bool PartitionWriter::PerformReplaceOperation(const InstallOperation& operation,
+                                              const void* data,
+                                              size_t count) {
+  // Setup the ExtentWriter stack based on the operation type.
+  std::unique_ptr<ExtentWriter> writer = CreateBaseExtentWriter();
+
+  if (operation.type() == InstallOperation::REPLACE_BZ) {
+    writer.reset(new BzipExtentWriter(std::move(writer)));
+  } else if (operation.type() == InstallOperation::REPLACE_XZ) {
+    writer.reset(new XzExtentWriter(std::move(writer)));
+  }
+
+  TEST_AND_RETURN_FALSE(
+      writer->Init(target_fd_, operation.dst_extents(), block_size_));
+  TEST_AND_RETURN_FALSE(writer->Write(data, operation.data_length()));
+
+  return true;
+}
+
+bool PartitionWriter::PerformZeroOrDiscardOperation(
+    const InstallOperation& operation) {
+#ifdef BLKZEROOUT
+  bool attempt_ioctl = true;
+  int request =
+      (operation.type() == InstallOperation::ZERO ? BLKZEROOUT : BLKDISCARD);
+#else   // !defined(BLKZEROOUT)
+  bool attempt_ioctl = false;
+  int request = 0;
+#endif  // !defined(BLKZEROOUT)
+
+  brillo::Blob zeros;
+  for (const Extent& extent : operation.dst_extents()) {
+    const uint64_t start = extent.start_block() * block_size_;
+    const uint64_t length = extent.num_blocks() * block_size_;
+    if (attempt_ioctl) {
+      int result = 0;
+      if (target_fd_->BlkIoctl(request, start, length, &result) && result == 0)
+        continue;
+      attempt_ioctl = false;
+    }
+    // In case of failure, we fall back to writing 0 to the selected region.
+    zeros.resize(16 * block_size_);
+    for (uint64_t offset = 0; offset < length; offset += zeros.size()) {
+      uint64_t chunk_length =
+          std::min(length - offset, static_cast<uint64_t>(zeros.size()));
+      TEST_AND_RETURN_FALSE(utils::WriteAll(
+          target_fd_, zeros.data(), chunk_length, start + offset));
+    }
+  }
+  return true;
+}
+
+bool PartitionWriter::PerformSourceCopyOperation(
+    const InstallOperation& operation, ErrorCode* error) {
+  TEST_AND_RETURN_FALSE(source_fd_ != nullptr);
+
+  // The device may optimize the SOURCE_COPY operation.
+  // Since this is a device-specific optimization, let
+  // DynamicPartitionController decide if the operation should be skipped.
+  const PartitionUpdate& partition = partition_update_;
+  const auto& partition_control = dynamic_control_;
+
+  InstallOperation buf;
+  bool should_optimize = partition_control->OptimizeOperation(
+      partition.partition_name(), operation, &buf);
+  const InstallOperation& optimized = should_optimize ? buf : operation;
+
+  if (operation.has_src_sha256_hash()) {
+    bool read_ok;
+    brillo::Blob source_hash;
+    brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
+                                      operation.src_sha256_hash().end());
+
+    // We fall back to use the error corrected device if the hash of the raw
+    // device doesn't match or there was an error reading the source partition.
+    // Note that this code will also fall back if writing the target partition
+    // fails.
+    if (should_optimize) {
+      // Hash operation.src_extents(), then copy optimized.src_extents to
+      // optimized.dst_extents.
+      read_ok =
+          fd_utils::ReadAndHashExtents(
+              source_fd_, operation.src_extents(), block_size_, &source_hash) &&
+          fd_utils::CopyAndHashExtents(source_fd_,
+                                       optimized.src_extents(),
+                                       target_fd_,
+                                       optimized.dst_extents(),
+                                       block_size_,
+                                       nullptr /* skip hashing */);
+    } else {
+      read_ok = fd_utils::CopyAndHashExtents(source_fd_,
+                                             operation.src_extents(),
+                                             target_fd_,
+                                             operation.dst_extents(),
+                                             block_size_,
+                                             &source_hash);
+    }
+    if (read_ok && expected_source_hash == source_hash)
+      return true;
+    LOG(WARNING) << "Source hash from RAW device mismatched, attempting to "
+                    "correct using ECC";
+    if (!OpenCurrentECCPartition()) {
+      // The following function call will return false since the source hash
+      // mismatches, but we still want to call it so it prints the appropriate
+      // log message.
+      return ValidateSourceHash(source_hash, operation, source_fd_, error);
+    }
+
+    LOG(WARNING) << "Source hash from RAW device mismatched: found "
+                 << base::HexEncode(source_hash.data(), source_hash.size())
+                 << ", expected "
+                 << base::HexEncode(expected_source_hash.data(),
+                                    expected_source_hash.size());
+    if (should_optimize) {
+      TEST_AND_RETURN_FALSE(fd_utils::ReadAndHashExtents(
+          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash));
+      TEST_AND_RETURN_FALSE(
+          fd_utils::CopyAndHashExtents(source_ecc_fd_,
+                                       optimized.src_extents(),
+                                       target_fd_,
+                                       optimized.dst_extents(),
+                                       block_size_,
+                                       nullptr /* skip hashing */));
+    } else {
+      TEST_AND_RETURN_FALSE(
+          fd_utils::CopyAndHashExtents(source_ecc_fd_,
+                                       operation.src_extents(),
+                                       target_fd_,
+                                       operation.dst_extents(),
+                                       block_size_,
+                                       &source_hash));
+    }
+    TEST_AND_RETURN_FALSE(
+        ValidateSourceHash(source_hash, operation, source_ecc_fd_, error));
+    // At this point reading from the error corrected device worked, but
+    // reading from the raw device failed, so this is considered a recovered
+    // failure.
+    source_ecc_recovered_failures_++;
+  } else {
+    // When the operation doesn't include a source hash, we attempt the error
+    // corrected device first since we can't verify the block in the raw device
+    // at this point, but we fall back to the raw device since the error
+    // corrected device can be shorter or not available.
+
+    if (OpenCurrentECCPartition() &&
+        fd_utils::CopyAndHashExtents(source_ecc_fd_,
+                                     optimized.src_extents(),
+                                     target_fd_,
+                                     optimized.dst_extents(),
+                                     block_size_,
+                                     nullptr)) {
+      return true;
+    }
+    TEST_AND_RETURN_FALSE(fd_utils::CopyAndHashExtents(source_fd_,
+                                                       optimized.src_extents(),
+                                                       target_fd_,
+                                                       optimized.dst_extents(),
+                                                       block_size_,
+                                                       nullptr));
+  }
+  return true;
+}
+
+bool PartitionWriter::PerformSourceBsdiffOperation(
+    const InstallOperation& operation,
+    ErrorCode* error,
+    const void* data,
+    size_t count) {
+  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
+  TEST_AND_RETURN_FALSE(source_fd != nullptr);
+
+  auto reader = std::make_unique<DirectExtentReader>();
+  TEST_AND_RETURN_FALSE(
+      reader->Init(source_fd, operation.src_extents(), block_size_));
+  auto src_file = std::make_unique<BsdiffExtentFile>(
+      std::move(reader),
+      utils::BlocksInExtents(operation.src_extents()) * block_size_);
+
+  auto writer = CreateBaseExtentWriter();
+  TEST_AND_RETURN_FALSE(
+      writer->Init(target_fd_, operation.dst_extents(), block_size_));
+  auto dst_file = std::make_unique<BsdiffExtentFile>(
+      std::move(writer),
+      utils::BlocksInExtents(operation.dst_extents()) * block_size_);
+
+  TEST_AND_RETURN_FALSE(bsdiff::bspatch(std::move(src_file),
+                                        std::move(dst_file),
+                                        reinterpret_cast<const uint8_t*>(data),
+                                        count) == 0);
+  return true;
+}
+
+bool PartitionWriter::PerformPuffDiffOperation(
+    const InstallOperation& operation,
+    ErrorCode* error,
+    const void* data,
+    size_t count) {
+  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
+  TEST_AND_RETURN_FALSE(source_fd != nullptr);
+
+  auto reader = std::make_unique<DirectExtentReader>();
+  TEST_AND_RETURN_FALSE(
+      reader->Init(source_fd, operation.src_extents(), block_size_));
+  puffin::UniqueStreamPtr src_stream(new PuffinExtentStream(
+      std::move(reader),
+      utils::BlocksInExtents(operation.src_extents()) * block_size_));
+
+  auto writer = CreateBaseExtentWriter();
+  TEST_AND_RETURN_FALSE(
+      writer->Init(target_fd_, operation.dst_extents(), block_size_));
+  puffin::UniqueStreamPtr dst_stream(new PuffinExtentStream(
+      std::move(writer),
+      utils::BlocksInExtents(operation.dst_extents()) * block_size_));
+
+  constexpr size_t kMaxCacheSize = 5 * 1024 * 1024;  // Total 5MB cache.
+  TEST_AND_RETURN_FALSE(
+      puffin::PuffPatch(std::move(src_stream),
+                        std::move(dst_stream),
+                        reinterpret_cast<const uint8_t*>(data),
+                        count,
+                        kMaxCacheSize));
+  return true;
+}
+
+FileDescriptorPtr PartitionWriter::ChooseSourceFD(
+    const InstallOperation& operation, ErrorCode* error) {
+  if (source_fd_ == nullptr) {
+    LOG(ERROR) << "ChooseSourceFD fail: source_fd_ == nullptr";
+    return nullptr;
+  }
+
+  if (!operation.has_src_sha256_hash()) {
+    // When the operation doesn't include a source hash, we attempt the error
+    // corrected device first since we can't verify the block in the raw device
+    // at this point, but we first need to make sure all extents are readable
+    // since the error corrected device can be shorter or not available.
+    if (OpenCurrentECCPartition() &&
+        fd_utils::ReadAndHashExtents(
+            source_ecc_fd_, operation.src_extents(), block_size_, nullptr)) {
+      return source_ecc_fd_;
+    }
+    return source_fd_;
+  }
+
+  brillo::Blob source_hash;
+  brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
+                                    operation.src_sha256_hash().end());
+  if (fd_utils::ReadAndHashExtents(
+          source_fd_, operation.src_extents(), block_size_, &source_hash) &&
+      source_hash == expected_source_hash) {
+    return source_fd_;
+  }
+  // We fall back to use the error corrected device if the hash of the raw
+  // device doesn't match or there was an error reading the source partition.
+  if (!OpenCurrentECCPartition()) {
+    // The following function call will return false since the source hash
+    // mismatches, but we still want to call it so it prints the appropriate
+    // log message.
+    ValidateSourceHash(source_hash, operation, source_fd_, error);
+    return nullptr;
+  }
+  LOG(WARNING) << "Source hash from RAW device mismatched: found "
+               << base::HexEncode(source_hash.data(), source_hash.size())
+               << ", expected "
+               << base::HexEncode(expected_source_hash.data(),
+                                  expected_source_hash.size());
+
+  if (fd_utils::ReadAndHashExtents(
+          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash) &&
+      ValidateSourceHash(source_hash, operation, source_ecc_fd_, error)) {
+    // At this point reading from the error corrected device worked, but
+    // reading from the raw device failed, so this is considered a recovered
+    // failure.
+    source_ecc_recovered_failures_++;
+    return source_ecc_fd_;
+  }
+  return nullptr;
+}
+
+bool PartitionWriter::OpenCurrentECCPartition() {
+  // No support for ECC for full payloads.
+  // Full payload should not have any operation that requires ECC partitions.
+  if (source_ecc_fd_)
+    return true;
+
+  if (source_ecc_open_failure_)
+    return false;
+
+#if USE_FEC
+  const PartitionUpdate& partition = partition_update_;
+  const InstallPlan::Partition& install_part = install_part_;
+  std::string path = install_part.source_path;
+  FileDescriptorPtr fd(new FecFileDescriptor());
+  if (!fd->Open(path.c_str(), O_RDONLY, 0)) {
+    PLOG(ERROR) << "Unable to open ECC source partition "
+                << partition.partition_name() << ", file " << path;
+    source_ecc_open_failure_ = true;
+    return false;
+  }
+  source_ecc_fd_ = fd;
+#else
+  // No support for ECC compiled.
+  source_ecc_open_failure_ = true;
+#endif  // USE_FEC
+
+  return !source_ecc_open_failure_;
+}
+
+int PartitionWriter::Close() {
+  int err = 0;
+  if (source_fd_ && !source_fd_->Close()) {
+    err = errno;
+    PLOG(ERROR) << "Error closing source partition";
+    if (!err)
+      err = 1;
+  }
+  source_fd_.reset();
+  source_path_.clear();
+
+  if (target_fd_ && !target_fd_->Close()) {
+    err = errno;
+    PLOG(ERROR) << "Error closing target partition";
+    if (!err)
+      err = 1;
+  }
+  target_fd_.reset();
+  target_path_.clear();
+
+  if (source_ecc_fd_ && !source_ecc_fd_->Close()) {
+    err = errno;
+    PLOG(ERROR) << "Error closing ECC source partition";
+    if (!err)
+      err = 1;
+  }
+  source_ecc_fd_.reset();
+  source_ecc_open_failure_ = false;
+  return -err;
+}
+
+void PartitionWriter::CheckpointUpdateProgress(size_t next_op_index) {
+  target_fd_->Flush();
+}
+
+std::unique_ptr<ExtentWriter> PartitionWriter::CreateBaseExtentWriter() {
+  return std::make_unique<DirectExtentWriter>();
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/partition_writer.h b/payload_consumer/partition_writer.h
new file mode 100644
index 0000000..4b420d2
--- /dev/null
+++ b/payload_consumer/partition_writer.h
@@ -0,0 +1,146 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef UPDATE_ENGINE_PARTITION_WRITER_H_
+#define UPDATE_ENGINE_PARTITION_WRITER_H_
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include <brillo/secure_blob.h>
+#include <gtest/gtest_prod.h>
+
+#include "update_engine/common/dynamic_partition_control_interface.h"
+#include "update_engine/payload_consumer/extent_writer.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/update_metadata.pb.h"
+namespace chromeos_update_engine {
+class PartitionWriter {
+ public:
+  PartitionWriter(const PartitionUpdate& partition_update,
+                  const InstallPlan::Partition& install_part,
+                  DynamicPartitionControlInterface* dynamic_control,
+                  size_t block_size,
+                  bool is_interactive);
+  virtual ~PartitionWriter();
+  static bool ValidateSourceHash(const brillo::Blob& calculated_hash,
+                                 const InstallOperation& operation,
+                                 const FileDescriptorPtr source_fd,
+                                 ErrorCode* error);
+
+  // Perform necessary initialization work before InstallOperation can be
+  // applied to this partition
+  [[nodiscard]] virtual bool Init(const InstallPlan* install_plan,
+                                  bool source_may_exist,
+                                  size_t next_op_index);
+
+  // |CheckpointUpdateProgress| will be called after SetNextOpIndex(), but it's
+  // optional. DeltaPerformer may or may not call this every time an operation
+  // is applied.
+  //   |next_op_index| is index of next operation that should be applied.
+  // |next_op_index-1| is the last operation that is already applied.
+  virtual void CheckpointUpdateProgress(size_t next_op_index);
+
+  // Close partition writer, when calling this function there's no guarantee
+  // that all |InstallOperations| are sent to |PartitionWriter|. This function
+  // will be called even if we are pausing/aborting the update.
+  int Close();
+
+  // These perform a specific type of operation and return true on success.
+  // |error| will be set if source hash mismatch, otherwise |error| might not be
+  // set even if it fails.
+  [[nodiscard]] virtual bool PerformReplaceOperation(
+      const InstallOperation& operation, const void* data, size_t count);
+  [[nodiscard]] virtual bool PerformZeroOrDiscardOperation(
+      const InstallOperation& operation);
+
+  [[nodiscard]] virtual bool PerformSourceCopyOperation(
+      const InstallOperation& operation, ErrorCode* error);
+  [[nodiscard]] virtual bool PerformSourceBsdiffOperation(
+      const InstallOperation& operation,
+      ErrorCode* error,
+      const void* data,
+      size_t count);
+  [[nodiscard]] virtual bool PerformPuffDiffOperation(
+      const InstallOperation& operation,
+      ErrorCode* error,
+      const void* data,
+      size_t count);
+
+  // |DeltaPerformer| calls this when all Install Ops are sent to partition
+  // writer. No |Perform*Operation| methods will be called in the future, and
+  // the partition writer is expected to be closed soon.
+  [[nodiscard]] virtual bool FinishedInstallOps() { return true; }
+
+ protected:
+  friend class PartitionWriterTest;
+  FRIEND_TEST(PartitionWriterTest, ChooseSourceFDTest);
+
+  bool OpenSourcePartition(uint32_t source_slot, bool source_may_exist);
+
+  bool OpenCurrentECCPartition();
+  // For a given operation, choose the source fd to be used (raw device or error
+  // correction device) based on the source operation hash.
+  // Returns nullptr if the source hash mismatch cannot be corrected, and set
+  // the |error| accordingly.
+  FileDescriptorPtr ChooseSourceFD(const InstallOperation& operation,
+                                   ErrorCode* error);
+  [[nodiscard]] virtual std::unique_ptr<ExtentWriter> CreateBaseExtentWriter();
+
+  const PartitionUpdate& partition_update_;
+  const InstallPlan::Partition& install_part_;
+  DynamicPartitionControlInterface* dynamic_control_;
+  // Path to source partition
+  std::string source_path_;
+  // Path to target partition
+  std::string target_path_;
+  FileDescriptorPtr source_fd_;
+  FileDescriptorPtr target_fd_;
+  const bool interactive_;
+  const size_t block_size_;
+  // File descriptor of the error corrected source partition. Only set while
+  // updating partition using a delta payload for a partition where error
+  // correction is available. The size of the error corrected device is smaller
+  // than the underlying raw device, since it doesn't include the error
+  // correction blocks.
+  FileDescriptorPtr source_ecc_fd_{nullptr};
+
+  // The total number of operations that failed source hash verification but
+  // passed after falling back to the error-corrected |source_ecc_fd_| device.
+  uint64_t source_ecc_recovered_failures_{0};
+
+  // Whether opening the current partition as an error-corrected device failed.
+  // Used to avoid re-opening the same source partition if it is not actually
+  // error corrected.
+  bool source_ecc_open_failure_{false};
+};
+
+namespace partition_writer {
+// Return a PartitionWriter instance for performing InstallOps on this partition.
+// Uses VABCPartitionWriter for Virtual AB Compression
+std::unique_ptr<PartitionWriter> CreatePartitionWriter(
+    const PartitionUpdate& partition_update,
+    const InstallPlan::Partition& install_part,
+    DynamicPartitionControlInterface* dynamic_control,
+    size_t block_size,
+    bool is_interactive,
+    bool is_dynamic_partition);
+}  // namespace partition_writer
+}  // namespace chromeos_update_engine
+
+#endif
diff --git a/payload_consumer/partition_writer_factory_android.cc b/payload_consumer/partition_writer_factory_android.cc
new file mode 100644
index 0000000..0c9f7ea
--- /dev/null
+++ b/payload_consumer/partition_writer_factory_android.cc
@@ -0,0 +1,54 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <cstddef>
+#include <memory>
+
+#include <base/logging.h>
+
+#include "update_engine/payload_consumer/vabc_partition_writer.h"
+
+namespace chromeos_update_engine::partition_writer {
+
+std::unique_ptr<PartitionWriter> CreatePartitionWriter(
+    const PartitionUpdate& partition_update,
+    const InstallPlan::Partition& install_part,
+    DynamicPartitionControlInterface* dynamic_control,
+    size_t block_size,
+    bool is_interactive,
+    bool is_dynamic_partition) {
+  const bool use_vabc = dynamic_control &&
+      dynamic_control->GetVirtualAbCompressionFeatureFlag().IsEnabled() &&
+      is_dynamic_partition;
+  if (!use_vabc) {
+    LOG(INFO) << "Virtual AB Compression disabled, using Partition Writer for `"
+              << install_part.name << '`';
+    return std::make_unique<PartitionWriter>(partition_update,
+                                             install_part,
+                                             dynamic_control,
+                                             block_size,
+                                             is_interactive);
+  }
+  LOG(INFO)
+      << "Virtual AB Compression Enabled, using VABC Partition Writer for `"
+      << install_part.name << '`';
+  return std::make_unique<VABCPartitionWriter>(partition_update,
+                                               install_part,
+                                               dynamic_control,
+                                               block_size,
+                                               is_interactive);
+}
+}  // namespace chromeos_update_engine::partition_writer
diff --git a/payload_consumer/partition_writer_factory_chromeos.cc b/payload_consumer/partition_writer_factory_chromeos.cc
new file mode 100644
index 0000000..609f043
--- /dev/null
+++ b/payload_consumer/partition_writer_factory_chromeos.cc
@@ -0,0 +1,38 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <cstddef>
+#include <memory>
+
+#include <base/logging.h>
+
+#include "update_engine/payload_consumer/partition_writer.h"
+
+namespace chromeos_update_engine::partition_writer {
+std::unique_ptr<PartitionWriter> CreatePartitionWriter(
+    const PartitionUpdate& partition_update,
+    const InstallPlan::Partition& install_part,
+    DynamicPartitionControlInterface* dynamic_control,
+    size_t block_size,
+    bool is_interactive,
+    bool is_dynamic_partition) {
+  return std::make_unique<PartitionWriter>(partition_update,
+                                           install_part,
+                                           dynamic_control,
+                                           block_size,
+                                           is_interactive);
+}
+}  // namespace chromeos_update_engine::partition_writer
diff --git a/payload_consumer/partition_writer_unittest.cc b/payload_consumer/partition_writer_unittest.cc
new file mode 100644
index 0000000..91e5e26
--- /dev/null
+++ b/payload_consumer/partition_writer_unittest.cc
@@ -0,0 +1,204 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <memory>
+#include <vector>
+
+#include <brillo/secure_blob.h>
+#include <gtest/gtest.h>
+
+#include "update_engine/common/dynamic_partition_control_stub.h"
+#include "update_engine/common/error_code.h"
+#include "update_engine/common/fake_prefs.h"
+#include "update_engine/common/hash_calculator.h"
+#include "update_engine/common/test_utils.h"
+#include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/delta_performer.h"
+#include "update_engine/payload_consumer/extent_reader.h"
+#include "update_engine/payload_consumer/extent_writer.h"
+#include "update_engine/payload_consumer/fake_file_descriptor.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_generator/annotated_operation.h"
+#include "update_engine/payload_generator/delta_diff_generator.h"
+#include "update_engine/payload_generator/extent_ranges.h"
+#include "update_engine/payload_generator/payload_file.h"
+#include "update_engine/payload_generator/payload_generation_config.h"
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+
+class PartitionWriterTest : public testing::Test {
+ public:
+  // Helper function to pretend that the ECC file descriptor was already opened.
+  // Returns a pointer to the created file descriptor.
+  FakeFileDescriptor* SetFakeECCFile(size_t size) {
+    EXPECT_FALSE(writer_.source_ecc_fd_) << "source_ecc_fd_ already open.";
+    FakeFileDescriptor* ret = new FakeFileDescriptor();
+    fake_ecc_fd_.reset(ret);
+    // Call open to simulate it was already opened.
+    ret->Open("", 0);
+    ret->SetFileSize(size);
+    writer_.source_ecc_fd_ = fake_ecc_fd_;
+    return ret;
+  }
+
+  uint64_t GetSourceEccRecoveredFailures() const {
+    return writer_.source_ecc_recovered_failures_;
+  }
+
+  AnnotatedOperation GenerateSourceCopyOp(const brillo::Blob& copied_data,
+                                          bool add_hash,
+                                          PartitionConfig* old_part = nullptr) {
+    PayloadGenerationConfig config;
+    const uint64_t kDefaultBlockSize = config.block_size;
+    EXPECT_EQ(0U, copied_data.size() % kDefaultBlockSize);
+    uint64_t num_blocks = copied_data.size() / kDefaultBlockSize;
+    AnnotatedOperation aop;
+    *(aop.op.add_src_extents()) = ExtentForRange(0, num_blocks);
+    *(aop.op.add_dst_extents()) = ExtentForRange(0, num_blocks);
+    aop.op.set_type(InstallOperation::SOURCE_COPY);
+    brillo::Blob src_hash;
+    EXPECT_TRUE(HashCalculator::RawHashOfData(copied_data, &src_hash));
+    if (add_hash)
+      aop.op.set_src_sha256_hash(src_hash.data(), src_hash.size());
+
+    return aop;
+  }
+
+  brillo::Blob PerformSourceCopyOp(const InstallOperation& op,
+                                   const brillo::Blob& blob_data) {
+    ScopedTempFile source_partition("Blob-XXXXXX");
+    DirectExtentWriter extent_writer;
+    FileDescriptorPtr fd(new EintrSafeFileDescriptor());
+    EXPECT_TRUE(fd->Open(source_partition.path().c_str(), O_RDWR));
+    EXPECT_TRUE(extent_writer.Init(fd, op.src_extents(), kBlockSize));
+    EXPECT_TRUE(extent_writer.Write(blob_data.data(), blob_data.size()));
+
+    ScopedTempFile target_partition("Blob-XXXXXX");
+    // |writer_| keeps a reference to |install_part_|, so set the paths first.
+    install_part_.source_path = source_partition.path();
+    install_part_.target_path = target_partition.path();
+    install_part_.source_size = blob_data.size();
+    install_part_.target_size = blob_data.size();
+
+    ErrorCode error = ErrorCode::kSuccess;
+    EXPECT_TRUE(writer_.Init(&install_plan_, true, 0));
+    EXPECT_TRUE(writer_.PerformSourceCopyOperation(op, &error));
+    writer_.CheckpointUpdateProgress(1);
+
+    brillo::Blob output_data;
+    EXPECT_TRUE(utils::ReadFile(target_partition.path(), &output_data));
+    return output_data;
+  }
+
+  FakePrefs prefs_{};
+  InstallPlan install_plan_{};
+  InstallPlan::Payload payload_{};
+  DynamicPartitionControlStub dynamic_control_{};
+  FileDescriptorPtr fake_ecc_fd_{};
+  DeltaArchiveManifest manifest_{};
+  PartitionUpdate partition_update_{};
+  InstallPlan::Partition install_part_{};
+  PartitionWriter writer_{
+      partition_update_, install_part_, &dynamic_control_, kBlockSize, false};
+};
+// Test that the error-corrected file descriptor is used to read a partition
+// when no hash is available for SOURCE_COPY but it falls back to the normal
+// file descriptor when the size of the error corrected one is too small.
+TEST_F(PartitionWriterTest, ErrorCorrectionSourceCopyWhenNoHashFallbackTest) {
+  constexpr size_t kCopyOperationSize = 4 * 4096;
+  ScopedTempFile source("Source-XXXXXX");
+  // Setup the source path with the right expected data.
+  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
+  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), expected_data));
+
+  // Setup the fec file descriptor as the fake stream, with smaller data than
+  // the expected.
+  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize / 2);
+
+  PartitionConfig old_part(kPartitionNameRoot);
+  old_part.path = source.path();
+  old_part.size = expected_data.size();
+
+  // The payload operation doesn't include an operation hash.
+  auto source_copy_op = GenerateSourceCopyOp(expected_data, false, &old_part);
+
+  auto output_data = PerformSourceCopyOp(source_copy_op.op, expected_data);
+  ASSERT_EQ(output_data, expected_data);
+
+  // Verify that the fake_fec was attempted to be used. Since the file
+  // descriptor is shorter it can actually do more than one read to realize it
+  // reached the EOF.
+  EXPECT_LE(1U, fake_fec->GetReadOps().size());
+  // This fallback doesn't count as an error-corrected operation since the
+  // operation hash was not available.
+  EXPECT_EQ(0U, GetSourceEccRecoveredFailures());
+}
+
+// Test that the error-corrected file descriptor is used to read the partition
+// since the source partition doesn't match the operation hash.
+TEST_F(PartitionWriterTest, ErrorCorrectionSourceCopyFallbackTest) {
+  constexpr size_t kCopyOperationSize = 4 * 4096;
+  // Write invalid data to the source image, which doesn't match the expected
+  // hash.
+  brillo::Blob invalid_data(kCopyOperationSize, 0x55);
+
+  // Setup the fec file descriptor as the fake stream, which matches
+  // |expected_data|.
+  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize);
+  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
+
+  auto source_copy_op = GenerateSourceCopyOp(expected_data, true);
+  auto output_data = PerformSourceCopyOp(source_copy_op.op, invalid_data);
+  ASSERT_EQ(output_data, expected_data);
+
+  // Verify that the fake_fec was actually used.
+  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
+  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
+}
+
+TEST_F(PartitionWriterTest, ChooseSourceFDTest) {
+  constexpr size_t kSourceSize = 4 * 4096;
+  ScopedTempFile source("Source-XXXXXX");
+  // Write invalid data to the source image, which doesn't match the expected
+  // hash.
+  brillo::Blob invalid_data(kSourceSize, 0x55);
+  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), invalid_data));
+
+  writer_.source_fd_ = std::make_shared<EintrSafeFileDescriptor>();
+  ASSERT_TRUE(writer_.source_fd_->Open(source.path().c_str(), O_RDONLY));
+
+  // Setup the fec file descriptor as the fake stream, which matches
+  // |expected_data|.
+  FakeFileDescriptor* fake_fec = SetFakeECCFile(kSourceSize);
+  brillo::Blob expected_data = FakeFileDescriptorData(kSourceSize);
+
+  InstallOperation op;
+  *(op.add_src_extents()) = ExtentForRange(0, kSourceSize / 4096);
+  brillo::Blob src_hash;
+  EXPECT_TRUE(HashCalculator::RawHashOfData(expected_data, &src_hash));
+  op.set_src_sha256_hash(src_hash.data(), src_hash.size());
+
+  ErrorCode error = ErrorCode::kSuccess;
+  EXPECT_EQ(writer_.source_ecc_fd_, writer_.ChooseSourceFD(op, &error));
+  EXPECT_EQ(ErrorCode::kSuccess, error);
+  // Verify that the fake_fec was read exactly once and counted as a recovery.
+  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
+  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/postinstall_runner_action.cc b/payload_consumer/postinstall_runner_action.cc
index 72d1a22..d452a64 100644
--- a/payload_consumer/postinstall_runner_action.cc
+++ b/payload_consumer/postinstall_runner_action.cc
@@ -50,14 +50,27 @@
 
 namespace chromeos_update_engine {
 
-using brillo::MessageLoop;
 using std::string;
 using std::vector;
 
 void PostinstallRunnerAction::PerformAction() {
   CHECK(HasInputObject());
+  CHECK(boot_control_);
   install_plan_ = GetInputObject();
 
+  auto dynamic_control = boot_control_->GetDynamicPartitionControl();
+  CHECK(dynamic_control);
+
+  // Mount snapshot partitions for Virtual AB Compression.
+  if (dynamic_control->GetVirtualAbCompressionFeatureFlag().IsEnabled()) {
+    // Before calling MapAllPartitions to map snapshot devices, all CowWriters
+    // must be closed, and UnmapAllPartitions() should be called.
+    dynamic_control->UnmapAllPartitions();
+    if (!dynamic_control->MapAllPartitions()) {
+      return CompletePostinstall(ErrorCode::kPostInstallMountError);
+    }
+  }
+
   // We always powerwash when rolling back, however policy can determine
   // if this is a full/normal powerwash, or a special rollback powerwash
   // that retains a small amount of system state such as enrollment and
@@ -113,7 +126,7 @@
   const InstallPlan::Partition& partition =
       install_plan_.partitions[current_partition_];
 
-  const string mountable_device = partition.target_path;
+  const string mountable_device = partition.postinstall_mount_device;
   if (mountable_device.empty()) {
     LOG(ERROR) << "Cannot make mountable device from " << partition.target_path;
     return CompletePostinstall(ErrorCode::kPostinstallRunnerError);
@@ -130,6 +143,11 @@
   fs_mount_dir_ = temp_dir.value();
 #endif  // __ANDROID__
 
+  if (!utils::FileExists(fs_mount_dir_.c_str())) {
+    LOG(ERROR) << "Mount point " << fs_mount_dir_
+               << " does not exist, mount call will fail";
+    return CompletePostinstall(ErrorCode::kPostinstallRunnerError);
+  }
   // Double check that the fs_mount_dir is not busy with a previous mounted
   // filesystem from a previous crashed postinstall step.
   if (utils::IsMountpoint(fs_mount_dir_)) {
@@ -284,10 +302,14 @@
 void PostinstallRunnerAction::Cleanup() {
   utils::UnmountFilesystem(fs_mount_dir_);
 #ifndef __ANDROID__
+#if BASE_VER < 800000
+  if (!base::DeleteFile(base::FilePath(fs_mount_dir_), true)) {
+#else
   if (!base::DeleteFile(base::FilePath(fs_mount_dir_))) {
+#endif
     PLOG(WARNING) << "Not removing temporary mountpoint " << fs_mount_dir_;
   }
-#endif  // !__ANDROID__
+#endif  // !__ANDROID__
   fs_mount_dir_.clear();
 
   progress_fd_ = -1;
diff --git a/payload_consumer/postinstall_runner_action_unittest.cc b/payload_consumer/postinstall_runner_action_unittest.cc
index cce86e9..9b330d9 100644
--- a/payload_consumer/postinstall_runner_action_unittest.cc
+++ b/payload_consumer/postinstall_runner_action_unittest.cc
@@ -195,6 +195,7 @@
   InstallPlan::Partition part;
   part.name = "part";
   part.target_path = device_path;
+  part.postinstall_mount_device = device_path;
   part.run_postinstall = true;
   part.postinstall_path = postinstall_program;
   InstallPlan install_plan;
@@ -356,6 +357,7 @@
   InstallPlan::Partition part;
   part.name = "part";
   part.target_path = "/dev/null";
+  part.postinstall_mount_device = "/dev/null";
   part.run_postinstall = true;
   part.postinstall_path = kPostinstallDefaultScript;
   part.postinstall_optional = true;
diff --git a/payload_consumer/snapshot_extent_writer.cc b/payload_consumer/snapshot_extent_writer.cc
new file mode 100644
index 0000000..c9e6f31
--- /dev/null
+++ b/payload_consumer/snapshot_extent_writer.cc
@@ -0,0 +1,124 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/payload_consumer/snapshot_extent_writer.h"
+
+#include <algorithm>
+#include <cstdint>
+
+#include <libsnapshot/cow_writer.h>
+
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+
+SnapshotExtentWriter::SnapshotExtentWriter(
+    android::snapshot::ICowWriter* cow_writer)
+    : cow_writer_(cow_writer) {
+  CHECK_NE(cow_writer, nullptr);
+}
+
+SnapshotExtentWriter::~SnapshotExtentWriter() {
+  CHECK(buffer_.empty()) << buffer_.size();
+}
+
+bool SnapshotExtentWriter::Init(
+    FileDescriptorPtr /*fd*/,
+    const google::protobuf::RepeatedPtrField<Extent>& extents,
+    uint32_t block_size) {
+  extents_ = extents;
+  cur_extent_idx_ = 0;
+  buffer_.clear();
+  buffer_.reserve(block_size);
+  block_size_ = block_size;
+  return true;
+}
+
+size_t SnapshotExtentWriter::ConsumeWithBuffer(const uint8_t* data,
+                                               size_t count) {
+  CHECK_LT(cur_extent_idx_, static_cast<size_t>(extents_.size()));
+  const auto& cur_extent = extents_[cur_extent_idx_];
+  const auto cur_extent_size = cur_extent.num_blocks() * block_size_;
+  // Fast path: nothing buffered and |data| covers the extent; skip the copy.
+  if (buffer_.empty() && count >= cur_extent_size) {
+    if (!cow_writer_->AddRawBlocks(
+            cur_extent.start_block(), data, cur_extent_size)) {
+      // Log the address (not a C string); returning 0 signals failure.
+      LOG(ERROR) << "AddRawBlocks(" << cur_extent.start_block() << ", "
+                 << static_cast<const void*>(data) << ", " << cur_extent_size
+                 << ") failed.";
+      return 0;
+    }
+    if (!next_extent()) {
+      CHECK_EQ(count, cur_extent_size)
+          << "Exhausted all blocks, but still have " << count - cur_extent_size
+          << " bytes left";
+    }
+    return cur_extent_size;
+  }
+  CHECK_LT(buffer_.size(), cur_extent_size)
+      << "Data left in buffer should never be >= cur_extent_size, otherwise "
+         "we should have send that data to CowWriter. Buffer size: "
+      << buffer_.size() << " current extent size: " << cur_extent_size;
+  size_t bytes_to_copy =
+      std::min<size_t>(count, cur_extent_size - buffer_.size());
+  CHECK_GT(bytes_to_copy, 0U);
+  buffer_.insert(buffer_.end(), data, data + bytes_to_copy);
+  CHECK_LE(buffer_.size(), cur_extent_size);
+  // Flush the buffer to the CowWriter once it holds the whole extent.
+  if (buffer_.size() == cur_extent_size) {
+    if (!cow_writer_->AddRawBlocks(
+            cur_extent.start_block(), buffer_.data(), buffer_.size())) {
+      LOG(ERROR) << "AddRawBlocks(" << cur_extent.start_block() << ", "
+                 << static_cast<const void*>(buffer_.data()) << ", "
+                 << buffer_.size() << ") failed.";
+      return 0;
+    }
+    buffer_.clear();
+    if (!next_extent()) {
+      CHECK_EQ(count, bytes_to_copy) << "Exhausted all blocks, but still have "
+                                     << count - bytes_to_copy << " bytes left";
+    }
+  }
+  return bytes_to_copy;
+}
+
+// Returns true on success.
+// This will construct a COW_REPLACE operation and forward it to CowWriter. It
+// is important that caller does not perform SOURCE_COPY operation on this
+// class, otherwise raw data will be stored. Caller should find ways to use
+// COW_COPY whenever possible.
+bool SnapshotExtentWriter::Write(const void* bytes, size_t count) {
+  if (count == 0) {
+    return true;
+  }
+  CHECK_NE(extents_.size(), 0);
+
+  auto data = static_cast<const uint8_t*>(bytes);
+  while (count > 0) {
+    auto bytes_written = ConsumeWithBuffer(data, count);
+    TEST_AND_RETURN_FALSE(bytes_written > 0);
+    data += bytes_written;
+    count -= bytes_written;
+  }
+  return true;
+}
+
+bool SnapshotExtentWriter::next_extent() {
+  cur_extent_idx_++;
+  return cur_extent_idx_ < static_cast<size_t>(extents_.size());
+}
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/snapshot_extent_writer.h b/payload_consumer/snapshot_extent_writer.h
new file mode 100644
index 0000000..6d9fe7d
--- /dev/null
+++ b/payload_consumer/snapshot_extent_writer.h
@@ -0,0 +1,55 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <cstdint>
+#include <vector>
+
+#include <libsnapshot/cow_writer.h>
+
+#include "update_engine/payload_consumer/extent_writer.h"
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+
+class SnapshotExtentWriter : public chromeos_update_engine::ExtentWriter {
+ public:
+  explicit SnapshotExtentWriter(android::snapshot::ICowWriter* cow_writer);
+  ~SnapshotExtentWriter() override;
+  // Resets state for |extents|; |fd| is ignored. Returns true on success.
+  bool Init(FileDescriptorPtr fd,
+            const google::protobuf::RepeatedPtrField<Extent>& extents,
+            uint32_t block_size) override;
+  // Returns true on success.
+  // This will construct a COW_REPLACE operation and forward it to CowWriter. It
+  // is important that caller does not perform SOURCE_COPY operation on this
+  // class, otherwise raw data will be stored. Caller should find ways to use
+  // COW_COPY whenever possible.
+  bool Write(const void* bytes, size_t count) override;
+
+ private:
+  bool next_extent();
+  [[nodiscard]] size_t ConsumeWithBuffer(const uint8_t* bytes, size_t count);
+  // It's a non-owning pointer, because PartitionWriter owns the CowWriter. This
+  // allows us to use a single instance of CowWriter for all operations applied
+  // to the same partition.
+  android::snapshot::ICowWriter* cow_writer_;
+  google::protobuf::RepeatedPtrField<Extent> extents_;
+  size_t cur_extent_idx_{0};
+  std::vector<uint8_t> buffer_;
+  size_t block_size_{0};
+};
+
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/snapshot_extent_writer_unittest.cc b/payload_consumer/snapshot_extent_writer_unittest.cc
new file mode 100644
index 0000000..0e22482
--- /dev/null
+++ b/payload_consumer/snapshot_extent_writer_unittest.cc
@@ -0,0 +1,180 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <array>
+#include <cstring>
+#include <map>
+#include <numeric>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include <google/protobuf/message_lite.h>
+#include <libsnapshot/cow_writer.h>
+
+#include "update_engine/payload_consumer/snapshot_extent_writer.h"
+#include "update_engine/payload_generator/delta_diff_generator.h"
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+
+// In-memory ICowWriter fake; records each emitted op keyed by its new block.
+class FakeCowWriter : public android::snapshot::ICowWriter {
+ public:
+  struct CowOp {
+    enum { COW_COPY, COW_REPLACE, COW_ZERO } type;
+    std::vector<unsigned char> data;
+    union {
+      size_t source_block;
+      size_t num_blocks;
+    };
+  };
+  using ICowWriter::ICowWriter;
+  ~FakeCowWriter() = default;
+
+  bool EmitCopy(uint64_t new_block, uint64_t old_block) override {
+    operations_[new_block] = {.type = CowOp::COW_COPY,
+                              .source_block = static_cast<size_t>(old_block)};
+    return true;
+  }
+  bool EmitRawBlocks(uint64_t new_block_start,
+                     const void* data,
+                     size_t size) override {
+    auto&& op = operations_[new_block_start];
+    op.type = CowOp::COW_REPLACE;  // value-init would leave it as COW_COPY
+    const auto uint8_ptr = static_cast<const unsigned char*>(data);
+    op.data.insert(op.data.end(), uint8_ptr, uint8_ptr + size);
+    return true;
+  }
+  bool EmitZeroBlocks(uint64_t new_block_start, uint64_t num_blocks) override {
+    operations_[new_block_start] = {.type = CowOp::COW_ZERO};
+    return true;
+  }
+  bool Finalize() override {
+    finalize_called_ = true;
+    return true;
+  }
+  bool EmitLabel(uint64_t label) {
+    label_count_++;
+    return true;
+  }
+  // Return number of bytes the cow image occupies on disk.
+  uint64_t GetCowSize() override {
+    uint64_t total_bytes = 0;  // 64-bit sum; an int seed would truncate
+    for (const auto& entry : operations_)
+      total_bytes += entry.second.data.size();
+    return total_bytes;
+  }
+  bool Contains(size_t block) {
+    return operations_.find(block) != operations_.end();
+  }
+  bool finalize_called_ = false;
+  size_t label_count_ = 0;
+  std::map<size_t, CowOp> operations_;
+};
+
+class SnapshotExtentWriterTest : public ::testing::Test {
+ public:
+  void SetUp() override {}
+
+ protected:
+  android::snapshot::CowOptions options_ = {
+      .block_size = static_cast<uint32_t>(kBlockSize)};
+  FakeCowWriter cow_writer_{options_};
+  SnapshotExtentWriter writer_{&cow_writer_};
+};
+
+void AddExtent(google::protobuf::RepeatedPtrField<Extent>* extents,
+               size_t start_block,
+               size_t num_blocks) {
+  auto&& extent = extents->Add();
+  extent->set_start_block(start_block);
+  extent->set_num_blocks(num_blocks);
+}
+
+TEST_F(SnapshotExtentWriterTest, BufferWrites) {
+  google::protobuf::RepeatedPtrField<Extent> extents;
+  AddExtent(&extents, 123, 1);
+  ASSERT_TRUE(writer_.Init(nullptr, extents, kBlockSize));
+
+  std::vector<uint8_t> buf(kBlockSize, 0);
+  buf[123] = 231;
+  buf[231] = 123;
+  buf[buf.size() - 1] = 255;
+  // Stop one byte short of a full block so the data has to stay buffered.
+  ASSERT_TRUE(writer_.Write(buf.data(), kBlockSize - 1));
+  ASSERT_TRUE(cow_writer_.operations_.empty())
+      << "Haven't send data of a complete block yet, CowWriter should not be "
+         "invoked.";
+  ASSERT_TRUE(writer_.Write(buf.data() + kBlockSize - 1, 1));
+  ASSERT_TRUE(cow_writer_.Contains(123))
+      << "Once a block of data is sent to SnapshotExtentWriter, it should "
+         "forward data to cow_writer.";
+  ASSERT_EQ(cow_writer_.operations_.size(), 1U);
+  ASSERT_EQ(buf, cow_writer_.operations_[123].data);
+}
+
+TEST_F(SnapshotExtentWriterTest, NonBufferedWrites) {
+  google::protobuf::RepeatedPtrField<Extent> extents;
+  AddExtent(&extents, 123, 1);
+  AddExtent(&extents, 125, 1);
+  ASSERT_TRUE(writer_.Init(nullptr, extents, kBlockSize));
+
+  std::vector<uint8_t> buf(kBlockSize * 2, 0);
+  buf[123] = 231;
+  buf[231] = 123;
+  buf[buf.size() - 1] = 255;
+  // Both whole blocks arrive in one call, one block per extent.
+  ASSERT_TRUE(writer_.Write(buf.data(), buf.size()));
+  ASSERT_TRUE(cow_writer_.Contains(123));
+  ASSERT_TRUE(cow_writer_.Contains(125));
+
+  ASSERT_EQ(cow_writer_.operations_.size(), 2U);
+  auto actual_data = cow_writer_.operations_[123].data;
+  actual_data.insert(actual_data.end(),
+                     cow_writer_.operations_[125].data.begin(),
+                     cow_writer_.operations_[125].data.end());
+  ASSERT_EQ(buf, actual_data);
+}
+
+TEST_F(SnapshotExtentWriterTest, WriteAcrossBlockBoundary) {
+  google::protobuf::RepeatedPtrField<Extent> extents;
+  AddExtent(&extents, 123, 1);
+  AddExtent(&extents, 125, 2);
+  ASSERT_TRUE(writer_.Init(nullptr, extents, kBlockSize));
+
+  std::vector<uint8_t> buf(kBlockSize * 3);
+  std::memset(buf.data(), 0, buf.size());
+  buf[123] = 231;
+  buf[231] = 123;
+  buf[buf.size() - 1] = 255;
+  buf[kBlockSize - 1] = 254;
+  // The first write is one byte short of the first extent, so it is buffered.
+  ASSERT_TRUE(writer_.Write(buf.data(), kBlockSize - 1));
+  ASSERT_TRUE(cow_writer_.operations_.empty())
+      << "Haven't send data of a complete block yet, CowWriter should not be "
+         "invoked.";
+  ASSERT_TRUE(writer_.Write(buf.data() + kBlockSize - 1, 1 + kBlockSize * 2));
+  ASSERT_TRUE(cow_writer_.Contains(123));
+  ASSERT_TRUE(cow_writer_.Contains(125));
+
+  ASSERT_EQ(cow_writer_.operations_.size(), 2U);
+  auto actual_data = cow_writer_.operations_[123].data;
+  actual_data.insert(actual_data.end(),
+                     cow_writer_.operations_[125].data.begin(),
+                     cow_writer_.operations_[125].data.end());
+  ASSERT_EQ(buf, actual_data);
+}
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/vabc_partition_writer.cc b/payload_consumer/vabc_partition_writer.cc
new file mode 100644
index 0000000..5cb7989
--- /dev/null
+++ b/payload_consumer/vabc_partition_writer.cc
@@ -0,0 +1,166 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/payload_consumer/vabc_partition_writer.h"
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <libsnapshot/cow_writer.h>
+
+#include "update_engine/common/cow_operation_convert.h"
+#include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/extent_writer.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_consumer/partition_writer.h"
+#include "update_engine/payload_consumer/snapshot_extent_writer.h"
+
+namespace chromeos_update_engine {
+// Expected layout of COW file:
+// === Beginning of Cow Image ===
+// All Source Copy Operations
+// ========== Label 0 ==========
+// Operation 0 in PartitionUpdate
+// ========== Label 1 ==========
+// Operation 1 in PartitionUpdate
+// ========== label 2 ==========
+// Operation 2 in PartitionUpdate
+// ========== label 3 ==========
+// .
+// .
+// .
+
+// When resuming, pass |next_op_index_| as the label to
+// |InitializeAppend|.
+// For example, suppose we finished writing SOURCE_COPY, and we finished writing
+// operation 2 completely. Update is suspended when we are half way through
+// operation 3.
+// |next_op_index_| would be 3, so we pass 3 as the
+// label to |InitializeAppend|. The CowWriter will retain all data before
+// label 3, which contains all operation 2's data, but none of operation 3's
+// data.
+
+// Opens the source partition and the COW writer, then either resumes at label
+// |next_op_index| or starts a fresh COW and writes the converted COW
+// operations up front. Returns false if any step fails.
+bool VABCPartitionWriter::Init(const InstallPlan* install_plan,
+                               bool source_may_exist,
+                               size_t next_op_index) {
+  TEST_AND_RETURN_FALSE(install_plan != nullptr);
+  TEST_AND_RETURN_FALSE(
+      OpenSourcePartition(install_plan->source_slot, source_may_exist));
+  std::optional<std::string> source_path;
+  if (!install_part_.source_path.empty()) {
+    // TODO(zhangkelvin) Make |source_path| a std::optional<std::string>
+    source_path = install_part_.source_path;
+  }
+  cow_writer_ = dynamic_control_->OpenCowWriter(
+      install_part_.name, source_path, install_plan->is_resume);
+  TEST_AND_RETURN_FALSE(cow_writer_ != nullptr);
+
+  // ===== Resume case handling code goes here ====
+  // It is possible that the SOURCE_COPY are already written but
+  // |next_op_index_| is still 0. In this case we discard previously written
+  // SOURCE_COPY, and start over.
+  if (install_plan->is_resume && next_op_index > 0) {
+    LOG(INFO) << "Resuming update on partition `"
+              << partition_update_.partition_name() << "` op index "
+              << next_op_index;
+    TEST_AND_RETURN_FALSE(cow_writer_->InitializeAppend(next_op_index));
+    return true;
+  }
+  TEST_AND_RETURN_FALSE(cow_writer_->Initialize());
+  // ==============================================
+
+  // TODO(zhangkelvin) Rewrite this in C++20 coroutine once that's available.
+  auto converted = ConvertToCowOperations(partition_update_.operations(),
+                                          partition_update_.merge_operations());
+  // Propagate failure instead of reporting a half-written COW as success.
+  return WriteAllCowOps(block_size_, converted, cow_writer_.get(), source_fd_);
+}
+
+// Replays |converted| into |cow_writer|. CowCopy ops become AddCopy() calls;
+// CowReplace ops read one block from |source_fd| at |src_block| and emit it
+// through AddRawBlocks(). Returns false as soon as any step fails.
+bool VABCPartitionWriter::WriteAllCowOps(
+    size_t block_size,
+    const std::vector<CowOperation>& converted,
+    android::snapshot::ICowWriter* cow_writer,
+    FileDescriptorPtr source_fd) {
+  std::vector<uint8_t> buffer(block_size);
+  for (const auto& cow_op : converted) {
+    if (cow_op.op == CowOperation::CowCopy) {
+      TEST_AND_RETURN_FALSE(
+          cow_writer->AddCopy(cow_op.dst_block, cow_op.src_block));
+    } else if (cow_op.op == CowOperation::CowReplace) {
+      ssize_t bytes_read = 0;
+      TEST_AND_RETURN_FALSE(utils::ReadAll(source_fd,
+                                           buffer.data(),
+                                           block_size,
+                                           cow_op.src_block * block_size,
+                                           &bytes_read));
+      const bool got_whole_block =
+          bytes_read > 0 && static_cast<size_t>(bytes_read) == block_size;
+      if (!got_whole_block) {
+        LOG(ERROR) << "source_fd->Read failed: " << bytes_read;
+        return false;
+      }
+      TEST_AND_RETURN_FALSE(cow_writer->AddRawBlocks(
+          cow_op.dst_block, buffer.data(), block_size));
+    }
+  }
+  return true;
+}
+
+std::unique_ptr<ExtentWriter> VABCPartitionWriter::CreateBaseExtentWriter() {
+  return std::make_unique<SnapshotExtentWriter>(cow_writer_.get());
+}
+
+[[nodiscard]] bool VABCPartitionWriter::PerformZeroOrDiscardOperation(
+    const InstallOperation& operation) {
+  for (const auto& extent : operation.dst_extents()) {
+    TEST_AND_RETURN_FALSE(
+        cow_writer_->AddZeroBlocks(extent.start_block(), extent.num_blocks()));
+  }
+  return true;
+}
+
+[[nodiscard]] bool VABCPartitionWriter::PerformSourceCopyOperation(
+    const InstallOperation& operation, ErrorCode* error) {
+  // TODO(zhangkelvin) Probably just ignore SOURCE_COPY? They should be taken
+  // care of during Init();
+  return true;
+}
+
+void VABCPartitionWriter::CheckpointUpdateProgress(size_t next_op_index) {
+  // No need to call fsync/sync, as CowWriter flushes after a label is added.
+  // NOTE(review): the AddLabel() result is ignored; failures go unnoticed.
+  cow_writer_->AddLabel(next_op_index);
+}
+
+[[nodiscard]] bool VABCPartitionWriter::FinishedInstallOps() {
+  // Add a hardcoded magic label to indicate end of all install ops. This label
+  // is needed by filesystem verification, don't remove.
+  return cow_writer_->AddLabel(kEndOfInstallLabel);
+}
+
+VABCPartitionWriter::~VABCPartitionWriter() {
+  if (cow_writer_) cow_writer_->Finalize();  // Null if Init() failed early.
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/vabc_partition_writer.h b/payload_consumer/vabc_partition_writer.h
new file mode 100644
index 0000000..7fb2a2c
--- /dev/null
+++ b/payload_consumer/vabc_partition_writer.h
@@ -0,0 +1,63 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef UPDATE_ENGINE_VABC_PARTITION_WRITER_H_
+#define UPDATE_ENGINE_VABC_PARTITION_WRITER_H_
+
+#include <memory>
+#include <vector>
+
+#include <libsnapshot/snapshot_writer.h>
+
+#include "update_engine/common/cow_operation_convert.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_consumer/partition_writer.h"
+
+namespace chromeos_update_engine {
+class VABCPartitionWriter final : public PartitionWriter {
+ public:
+  using PartitionWriter::PartitionWriter;
+  [[nodiscard]] bool Init(const InstallPlan* install_plan,
+                          bool source_may_exist,
+                          size_t next_op_index) override;
+  ~VABCPartitionWriter() override;
+
+  [[nodiscard]] std::unique_ptr<ExtentWriter> CreateBaseExtentWriter() override;
+
+  // Only ZERO and SOURCE_COPY InstallOperations are treated specially by VABC
+  // Partition Writer. These operations correspond to COW_ZERO and COW_COPY. All
+  // other operations just get converted to COW_REPLACE.
+  [[nodiscard]] bool PerformZeroOrDiscardOperation(
+      const InstallOperation& operation) override;
+  [[nodiscard]] bool PerformSourceCopyOperation(
+      const InstallOperation& operation, ErrorCode* error) override;
+
+  void CheckpointUpdateProgress(size_t next_op_index) override;
+
+  static bool WriteAllCowOps(size_t block_size,
+                             const std::vector<CowOperation>& converted,
+                             android::snapshot::ICowWriter* cow_writer,
+                             FileDescriptorPtr source_fd);
+
+  [[nodiscard]] bool FinishedInstallOps() override;
+
+ private:
+  std::unique_ptr<android::snapshot::ISnapshotWriter> cow_writer_;
+};
+
+}  // namespace chromeos_update_engine
+
+#endif
diff --git a/payload_consumer/verity_writer_android.cc b/payload_consumer/verity_writer_android.cc
index d5437b6..864d9a1 100644
--- a/payload_consumer/verity_writer_android.cc
+++ b/payload_consumer/verity_writer_android.cc
@@ -29,6 +29,7 @@
 }
 
 #include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
 
 namespace chromeos_update_engine {
 
@@ -39,7 +40,16 @@
 }  // namespace verity_writer
 
 bool VerityWriterAndroid::Init(const InstallPlan::Partition& partition) {
+  auto read_fd = FileDescriptorPtr(new EintrSafeFileDescriptor());
+  TEST_AND_RETURN_FALSE(read_fd->Open(partition.target_path.c_str(), O_RDWR));
+  return Init(partition, read_fd, read_fd);
+}
+bool VerityWriterAndroid::Init(const InstallPlan::Partition& partition,
+                               FileDescriptorPtr read_fd,
+                               FileDescriptorPtr write_fd) {
   partition_ = &partition;
+  read_fd_ = read_fd;
+  write_fd_ = write_fd;
 
   if (partition_->hash_tree_size != 0 || partition_->fec_size != 0) {
     utils::SetBlockDeviceReadOnly(partition_->target_path, false);
@@ -82,18 +92,18 @@
 
       if (end_offset == hash_tree_data_end) {
         // All hash tree data blocks has been hashed, write hash tree to disk.
-        int fd = HANDLE_EINTR(open(partition_->target_path.c_str(), O_WRONLY));
-        if (fd < 0) {
-          PLOG(ERROR) << "Failed to open " << partition_->target_path
-                      << " to write hash tree.";
-          return false;
-        }
-        ScopedFdCloser fd_closer(&fd);
-
         LOG(INFO) << "Writing verity hash tree to " << partition_->target_path;
         TEST_AND_RETURN_FALSE(hash_tree_builder_->BuildHashTree());
-        TEST_AND_RETURN_FALSE(hash_tree_builder_->WriteHashTreeToFd(
-            fd, partition_->hash_tree_offset));
+        TEST_AND_RETURN_FALSE_ERRNO(
+            write_fd_->Seek(partition_->hash_tree_offset, SEEK_SET));
+        auto success = hash_tree_builder_->WriteHashTree(
+            [write_fd_(this->write_fd_)](auto data, auto size) {
+              return utils::WriteAll(write_fd_, data, size);
+            });
+        // hashtree builder already prints error messages.
+        if (!success) {
+          return false;
+        }
         hash_tree_builder_.reset();
       }
     }
@@ -103,7 +113,8 @@
         partition_->fec_data_offset + partition_->fec_data_size;
     if (offset < fec_data_end && offset + size >= fec_data_end) {
       LOG(INFO) << "Writing verity FEC to " << partition_->target_path;
-      TEST_AND_RETURN_FALSE(EncodeFEC(partition_->target_path,
+      TEST_AND_RETURN_FALSE(EncodeFEC(read_fd_,
+                                      write_fd_,
                                       partition_->fec_data_offset,
                                       partition_->fec_data_size,
                                       partition_->fec_offset,
@@ -116,7 +127,8 @@
   return true;
 }
 
-bool VerityWriterAndroid::EncodeFEC(const std::string& path,
+bool VerityWriterAndroid::EncodeFEC(FileDescriptorPtr read_fd,
+                                    FileDescriptorPtr write_fd,
                                     uint64_t data_offset,
                                     uint64_t data_size,
                                     uint64_t fec_offset,
@@ -135,13 +147,6 @@
       init_rs_char(FEC_PARAMS(fec_roots)), &free_rs_char);
   TEST_AND_RETURN_FALSE(rs_char != nullptr);
 
-  int fd = HANDLE_EINTR(open(path.c_str(), verify_mode ? O_RDONLY : O_RDWR));
-  if (fd < 0) {
-    PLOG(ERROR) << "Failed to open " << path << " to write FEC.";
-    return false;
-  }
-  ScopedFdCloser fd_closer(&fd);
-
   for (size_t i = 0; i < rounds; i++) {
     // Encodes |block_size| number of rs blocks each round so that we can read
     // one block each time instead of 1 byte to increase random read
@@ -154,13 +159,13 @@
       // Don't read past |data_size|, treat them as 0.
       if (offset < data_size) {
         ssize_t bytes_read = 0;
-        TEST_AND_RETURN_FALSE(utils::PReadAll(fd,
+        TEST_AND_RETURN_FALSE(utils::PReadAll(read_fd,
                                               buffer.data(),
                                               buffer.size(),
                                               data_offset + offset,
                                               &bytes_read));
-        TEST_AND_RETURN_FALSE(bytes_read ==
-                              static_cast<ssize_t>(buffer.size()));
+        TEST_AND_RETURN_FALSE(bytes_read >= 0);
+        TEST_AND_RETURN_FALSE(static_cast<size_t>(bytes_read) == buffer.size());
       }
       for (size_t k = 0; k < buffer.size(); k++) {
         rs_blocks[k * rs_n + j] = buffer[k];
@@ -179,17 +184,42 @@
       brillo::Blob fec_read(fec.size());
       ssize_t bytes_read = 0;
       TEST_AND_RETURN_FALSE(utils::PReadAll(
-          fd, fec_read.data(), fec_read.size(), fec_offset, &bytes_read));
-      TEST_AND_RETURN_FALSE(bytes_read ==
-                            static_cast<ssize_t>(fec_read.size()));
+          read_fd, fec_read.data(), fec_read.size(), fec_offset, &bytes_read));
+      TEST_AND_RETURN_FALSE(bytes_read >= 0);
+      TEST_AND_RETURN_FALSE(static_cast<size_t>(bytes_read) == fec_read.size());
       TEST_AND_RETURN_FALSE(fec == fec_read);
     } else {
-      TEST_AND_RETURN_FALSE(
-          utils::PWriteAll(fd, fec.data(), fec.size(), fec_offset));
+      CHECK(write_fd);
+      if (!utils::PWriteAll(write_fd, fec.data(), fec.size(), fec_offset)) {
+        PLOG(ERROR) << "EncodeFEC write() failed";
+        return false;
+      }
     }
     fec_offset += fec.size();
   }
 
   return true;
 }
+
+bool VerityWriterAndroid::EncodeFEC(const std::string& path,
+                                    uint64_t data_offset,
+                                    uint64_t data_size,
+                                    uint64_t fec_offset,
+                                    uint64_t fec_size,
+                                    uint32_t fec_roots,
+                                    uint32_t block_size,
+                                    bool verify_mode) {
+  FileDescriptorPtr fd(new EintrSafeFileDescriptor());
+  TEST_AND_RETURN_FALSE(
+      fd->Open(path.c_str(), verify_mode ? O_RDONLY : O_RDWR));
+  return EncodeFEC(fd,
+                   fd,
+                   data_offset,
+                   data_size,
+                   fec_offset,
+                   fec_size,
+                   fec_roots,
+                   block_size,
+                   verify_mode);
+}
 }  // namespace chromeos_update_engine
diff --git a/payload_consumer/verity_writer_android.h b/payload_consumer/verity_writer_android.h
index 05a5856..7dfac0f 100644
--- a/payload_consumer/verity_writer_android.h
+++ b/payload_consumer/verity_writer_android.h
@@ -22,6 +22,7 @@
 
 #include <verity/hash_tree_builder.h>
 
+#include "payload_consumer/file_descriptor.h"
 #include "update_engine/payload_consumer/verity_writer_interface.h"
 
 namespace chromeos_update_engine {
@@ -31,7 +32,10 @@
   VerityWriterAndroid() = default;
   ~VerityWriterAndroid() override = default;
 
-  bool Init(const InstallPlan::Partition& partition) override;
+  bool Init(const InstallPlan::Partition& partition,
+            FileDescriptorPtr read_fd,
+            FileDescriptorPtr write_fd) override;
+  bool Init(const InstallPlan::Partition& partition);
   bool Update(uint64_t offset, const uint8_t* buffer, size_t size) override;
 
   // Read [data_offset : data_offset + data_size) from |path| and encode FEC
@@ -40,6 +44,15 @@
   // in each Update() like hash tree, because for every rs block, its data are
   // spreaded across entire |data_size|, unless we can cache all data in
   // memory, we have to re-read them from disk.
+  static bool EncodeFEC(FileDescriptorPtr read_fd,
+                        FileDescriptorPtr write_fd,
+                        uint64_t data_offset,
+                        uint64_t data_size,
+                        uint64_t fec_offset,
+                        uint64_t fec_size,
+                        uint32_t fec_roots,
+                        uint32_t block_size,
+                        bool verify_mode);
   static bool EncodeFEC(const std::string& path,
                         uint64_t data_offset,
                         uint64_t data_size,
@@ -52,6 +65,8 @@
  private:
   const InstallPlan::Partition* partition_ = nullptr;
 
+  FileDescriptorPtr read_fd_;
+  FileDescriptorPtr write_fd_;
   std::unique_ptr<HashTreeBuilder> hash_tree_builder_;
 
   DISALLOW_COPY_AND_ASSIGN(VerityWriterAndroid);
diff --git a/payload_consumer/verity_writer_interface.h b/payload_consumer/verity_writer_interface.h
index a3ecef3..db7988e 100644
--- a/payload_consumer/verity_writer_interface.h
+++ b/payload_consumer/verity_writer_interface.h
@@ -22,6 +22,7 @@
 
 #include <base/macros.h>
 
+#include "payload_consumer/file_descriptor.h"
 #include "update_engine/payload_consumer/install_plan.h"
 
 namespace chromeos_update_engine {
@@ -30,6 +31,9 @@
  public:
   virtual ~VerityWriterInterface() = default;
 
+  virtual bool Init(const InstallPlan::Partition& partition,
+                    FileDescriptorPtr read_fd,
+                    FileDescriptorPtr write_fd) = 0;
   virtual bool Init(const InstallPlan::Partition& partition) = 0;
   // Update partition data at [offset : offset + size) stored in |buffer|.
   // Data not in |hash_tree_data_extent| or |fec_data_extent| is ignored.
diff --git a/payload_consumer/verity_writer_stub.cc b/payload_consumer/verity_writer_stub.cc
index a0e2467..314ec7e 100644
--- a/payload_consumer/verity_writer_stub.cc
+++ b/payload_consumer/verity_writer_stub.cc
@@ -26,7 +26,9 @@
 }
 }  // namespace verity_writer
 
-bool VerityWriterStub::Init(const InstallPlan::Partition& partition) {
+bool VerityWriterStub::Init(const InstallPlan::Partition& partition,
+                            FileDescriptorPtr read_fd,
+                            FileDescriptorPtr write_fd) {
   return partition.hash_tree_size == 0 && partition.fec_size == 0;
 }
 
diff --git a/payload_consumer/verity_writer_stub.h b/payload_consumer/verity_writer_stub.h
index ea5e574..f8d68ca 100644
--- a/payload_consumer/verity_writer_stub.h
+++ b/payload_consumer/verity_writer_stub.h
@@ -26,7 +26,9 @@
   VerityWriterStub() = default;
   ~VerityWriterStub() override = default;
 
-  bool Init(const InstallPlan::Partition& partition) override;
+  bool Init(const InstallPlan::Partition& partition,
+            FileDescriptorPtr read_fd,
+            FileDescriptorPtr write_fd) override;
   bool Update(uint64_t offset, const uint8_t* buffer, size_t size) override;
 
  private:
diff --git a/payload_generator/cow_size_estimator.cc b/payload_generator/cow_size_estimator.cc
new file mode 100644
index 0000000..3eb0aca
--- /dev/null
+++ b/payload_generator/cow_size_estimator.cc
@@ -0,0 +1,110 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/payload_generator/cow_size_estimator.h"
+
+#include <utility>
+#include <vector>
+
+#include <libsnapshot/cow_writer.h>
+
+#include "android-base/unique_fd.h"
+#include "update_engine/common/cow_operation_convert.h"
+#include "update_engine/payload_consumer/vabc_partition_writer.h"
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+using android::snapshot::CowWriter;
+
+// Feeds the target-image bytes of every destination extent of |op| into
+// |writer| as raw blocks. Aborts (CHECK) if the read or the COW write fails.
+void PerformReplaceOp(const InstallOperation& op,
+                      CowWriter* writer,
+                      FileDescriptorPtr target_fd,
+                      size_t block_size) {
+  std::vector<unsigned char> buffer;
+  for (const auto& extent : op.dst_extents()) {
+    buffer.resize(extent.num_blocks() * block_size);
+    // Read straight from the target image; no need to decompress payload.bin.
+    const auto offset = extent.start_block() * block_size;
+    ssize_t bytes_read = 0;
+    auto success = utils::ReadAll(
+        target_fd, buffer.data(), buffer.size(), offset, &bytes_read);
+    CHECK(success);
+    CHECK_EQ(static_cast<size_t>(bytes_read), buffer.size());
+    CHECK(writer->AddRawBlocks(
+        extent.start_block(), buffer.data(), buffer.size()));
+  }
+}
+
+void PerformZeroOp(const InstallOperation& op,
+                   CowWriter* writer,
+                   size_t block_size) {  // |block_size| is unused here.
+  for (const auto& extent : op.dst_extents()) {
+    CHECK(writer->AddZeroBlocks(extent.start_block(), extent.num_blocks()));
+  }
+}
+
+size_t EstimateCowSize(
+    FileDescriptorPtr source_fd,
+    FileDescriptorPtr target_fd,
+    const google::protobuf::RepeatedPtrField<InstallOperation>& operations,
+    const google::protobuf::RepeatedPtrField<CowMergeOperation>&
+        merge_operations,
+    size_t block_size) {
+  android::snapshot::CowWriter cow_writer{
+      {.block_size = static_cast<uint32_t>(block_size), .compression = "gz"}};
+  // CowWriter treats fd -1 as a special value: it discards all the data but
+  // still reports the COW size, which is all an estimate needs.
+  cow_writer.Initialize(android::base::borrowed_fd{-1});
+
+  const auto converted = ConvertToCowOperations(operations, merge_operations);
+  VABCPartitionWriter::WriteAllCowOps(
+      block_size, converted, &cow_writer, source_fd);
+  cow_writer.AddLabel(0);
+  for (const auto& op : operations) {
+    switch (op.type()) {
+      case InstallOperation::REPLACE:
+      case InstallOperation::REPLACE_BZ:
+      case InstallOperation::REPLACE_XZ:
+        PerformReplaceOp(op, &cow_writer, target_fd, block_size);
+        break;
+      case InstallOperation::ZERO:
+      case InstallOperation::DISCARD:
+        PerformZeroOp(op, &cow_writer, block_size);
+        break;
+      case InstallOperation::SOURCE_COPY:
+      case InstallOperation::MOVE:
+        // Already handled by WriteAllCowOps.
+        break;
+      case InstallOperation::SOURCE_BSDIFF:
+      case InstallOperation::BROTLI_BSDIFF:
+      case InstallOperation::PUFFDIFF:
+      case InstallOperation::BSDIFF:
+        // We might do something special by adding CowBsdiff to CowWriter.
+        // For now proceed the same way as normal REPLACE operation.
+        PerformReplaceOp(op, &cow_writer, target_fd, block_size);
+        break;
+    }
+    // Arbitrary label number; nothing ever resumes from these labels.
+    // They are emitted just to keep size estimates accurate, as update_engine
+    // emits 1 label for every op.
+    cow_writer.AddLabel(2);
+  }
+  // TODO(zhangkelvin) Take FEC extents into account once VABC stabilizes
+  return cow_writer.GetCowSize();
+}
+}  // namespace chromeos_update_engine
diff --git a/payload_generator/cow_size_estimator.h b/payload_generator/cow_size_estimator.h
new file mode 100644
index 0000000..cba89b5
--- /dev/null
+++ b/payload_generator/cow_size_estimator.h
@@ -0,0 +1,42 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef UPDATE_ENGINE_PAYLOAD_GENERATOR_COW_SIZE_ESTIMATOR_H_
+#define UPDATE_ENGINE_PAYLOAD_GENERATOR_COW_SIZE_ESTIMATOR_H_
+
+#include <cstddef>
+
+#include <update_engine/update_metadata.pb.h>
+
+#include "update_engine/payload_consumer/file_descriptor.h"
+
+namespace chromeos_update_engine {
+// Given file descriptors to the source image and target image, and a list of
+// operations, estimate the size of the COW image if the operations are applied
+// on a Virtual AB Compression enabled device. This is intended to be used by
+// update generators to put an estimated COW size in the OTA payload. When
+// installing an OTA update, libsnapshot takes this estimate as a hint to
+// allocate space.
+size_t EstimateCowSize(
+    FileDescriptorPtr source_fd,
+    FileDescriptorPtr target_fd,
+    const google::protobuf::RepeatedPtrField<InstallOperation>& operations,
+    const google::protobuf::RepeatedPtrField<CowMergeOperation>&
+        merge_operations,
+    size_t block_size);
+
+}  // namespace chromeos_update_engine
+
+#endif  // UPDATE_ENGINE_PAYLOAD_GENERATOR_COW_SIZE_ESTIMATOR_H_
diff --git a/payload_generator/cow_size_estimator_stub.cc b/payload_generator/cow_size_estimator_stub.cc
new file mode 100644
index 0000000..9d94d63
--- /dev/null
+++ b/payload_generator/cow_size_estimator_stub.cc
@@ -0,0 +1,31 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/payload_generator/cow_size_estimator.h"
+
+namespace chromeos_update_engine {
+
+size_t EstimateCowSize(
+    FileDescriptorPtr source_fd,
+    FileDescriptorPtr target_fd,
+    const google::protobuf::RepeatedPtrField<InstallOperation>& operations,
+    const google::protobuf::RepeatedPtrField<CowMergeOperation>&
+        merge_operations,
+    size_t block_size) {
+  return 0;
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_generator/delta_diff_generator.cc b/payload_generator/delta_diff_generator.cc
index ff8b0da..74d43fd 100644
--- a/payload_generator/delta_diff_generator.cc
+++ b/payload_generator/delta_diff_generator.cc
@@ -33,14 +33,17 @@
 
 #include "update_engine/common/utils.h"
 #include "update_engine/payload_consumer/delta_performer.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
 #include "update_engine/payload_consumer/payload_constants.h"
 #include "update_engine/payload_generator/ab_generator.h"
 #include "update_engine/payload_generator/annotated_operation.h"
 #include "update_engine/payload_generator/blob_file_writer.h"
+#include "update_engine/payload_generator/cow_size_estimator.h"
 #include "update_engine/payload_generator/delta_diff_utils.h"
 #include "update_engine/payload_generator/full_update_generator.h"
 #include "update_engine/payload_generator/merge_sequence_generator.h"
 #include "update_engine/payload_generator/payload_file.h"
+#include "update_engine/update_metadata.pb.h"
 
 using std::string;
 using std::unique_ptr;
@@ -53,6 +56,18 @@
 const size_t kBlockSize = 4096;  // bytes
 
 class PartitionProcessor : public base::DelegateSimpleThread::Delegate {
+  // Returns true iff |partition_name| appears in one of the partition groups
+  // of the target's dynamic partition metadata. Dereferences
+  // |dynamic_partition_metadata| unconditionally, so it must be set.
+  bool IsDynamicPartition(const std::string& partition_name) {
+    const auto& groups = config_.target.dynamic_partition_metadata->groups();
+    return std::any_of(groups.begin(), groups.end(), [&](const auto& group) {
+      const auto& names = group.partition_names();
+      return std::find(names.begin(), names.end(), partition_name) !=
+             names.end();
+    });
+  }
+
  public:
   explicit PartitionProcessor(
       const PayloadGenerationConfig& config,
@@ -61,6 +76,7 @@
       BlobFileWriter* file_writer,
       std::vector<AnnotatedOperation>* aops,
       std::vector<CowMergeOperation>* cow_merge_sequence,
+      size_t* cow_size,
       std::unique_ptr<chromeos_update_engine::OperationsGenerator> strategy)
       : config_(config),
         old_part_(old_part),
@@ -68,11 +84,13 @@
         file_writer_(file_writer),
         aops_(aops),
         cow_merge_sequence_(cow_merge_sequence),
+        cow_size_(cow_size),
         strategy_(std::move(strategy)) {}
   PartitionProcessor(PartitionProcessor&&) noexcept = default;
+
   void Run() override {
     LOG(INFO) << "Started an async task to process partition "
-              << old_part_.name;
+              << new_part_.name;
     bool success = strategy_->GenerateOperations(
         config_, old_part_, new_part_, file_writer_, aops_);
     if (!success) {
@@ -85,13 +103,38 @@
     bool snapshot_enabled =
         config_.target.dynamic_partition_metadata &&
         config_.target.dynamic_partition_metadata->snapshot_enabled();
-    if (old_part_.path.empty() || !snapshot_enabled) {
+    if (!snapshot_enabled || !IsDynamicPartition(new_part_.name)) {
       return;
     }
-    auto generator = MergeSequenceGenerator::Create(*aops_);
-    if (!generator || !generator->Generate(cow_merge_sequence_)) {
-      LOG(FATAL) << "Failed to generate merge sequence";
+    if (!old_part_.path.empty()) {
+      auto generator = MergeSequenceGenerator::Create(*aops_);
+      if (!generator || !generator->Generate(cow_merge_sequence_)) {
+        LOG(FATAL) << "Failed to generate merge sequence";
+      }
     }
+
+    LOG(INFO) << "Estimating COW size for partition: " << new_part_.name;
+    // Need the contents of source/target image bytes when doing
+    // dry run.
+    FileDescriptorPtr source_fd{new EintrSafeFileDescriptor()};
+    source_fd->Open(old_part_.path.c_str(), O_RDONLY);
+
+    auto target_fd = std::make_unique<EintrSafeFileDescriptor>();
+    target_fd->Open(new_part_.path.c_str(), O_RDONLY);
+
+    google::protobuf::RepeatedPtrField<InstallOperation> operations;
+
+    for (const AnnotatedOperation& aop : *aops_) {
+      *operations.Add() = aop.op;
+    }
+    *cow_size_ = EstimateCowSize(
+        source_fd,
+        std::move(target_fd),
+        operations,
+        {cow_merge_sequence_->begin(), cow_merge_sequence_->end()},
+        config_.block_size);
+    LOG(INFO) << "Estimated COW size for partition: " << new_part_.name << " "
+              << *cow_size_;
   }
 
  private:
@@ -101,6 +144,7 @@
   BlobFileWriter* file_writer_;
   std::vector<AnnotatedOperation>* aops_;
   std::vector<CowMergeOperation>* cow_merge_sequence_;
+  size_t* cow_size_;
   std::unique_ptr<chromeos_update_engine::OperationsGenerator> strategy_;
   DISALLOW_COPY_AND_ASSIGN(PartitionProcessor);
 };
@@ -130,8 +174,12 @@
     PartitionConfig empty_part("");
     std::vector<std::vector<AnnotatedOperation>> all_aops;
     all_aops.resize(config.target.partitions.size());
+
     std::vector<std::vector<CowMergeOperation>> all_merge_sequences;
     all_merge_sequences.resize(config.target.partitions.size());
+
+    std::vector<size_t> all_cow_sizes(config.target.partitions.size(), 0);
+
     std::vector<PartitionProcessor> partition_tasks{};
     auto thread_count = std::min<int>(diff_utils::GetMaxThreads(),
                                       config.target.partitions.size());
@@ -149,10 +197,12 @@
       unique_ptr<OperationsGenerator> strategy;
       if (!old_part.path.empty()) {
         // Delta update.
-        LOG(INFO) << "Using generator ABGenerator().";
+        LOG(INFO) << "Using generator ABGenerator() for partition "
+                  << new_part.name;
         strategy.reset(new ABGenerator());
       } else {
-        LOG(INFO) << "Using generator FullUpdateGenerator().";
+        LOG(INFO) << "Using generator FullUpdateGenerator() for partition "
+                  << new_part.name;
         strategy.reset(new FullUpdateGenerator());
       }
 
@@ -163,6 +213,7 @@
                                                    &blob_file,
                                                    &all_aops[i],
                                                    &all_merge_sequences[i],
+                                                   &all_cow_sizes[i],
                                                    std::move(strategy)));
     }
     thread_pool.Start();
@@ -179,7 +230,8 @@
           payload.AddPartition(old_part,
                                new_part,
                                std::move(all_aops[i]),
-                               std::move(all_merge_sequences[i])));
+                               std::move(all_merge_sequences[i]),
+                               all_cow_sizes[i]));
     }
   }
   data_file.CloseFd();
diff --git a/payload_generator/extent_utils.h b/payload_generator/extent_utils.h
index 9763b1f..f870b29 100644
--- a/payload_generator/extent_utils.h
+++ b/payload_generator/extent_utils.h
@@ -20,6 +20,8 @@
 #include <string>
 #include <vector>
 
+#include <base/logging.h>
+
 #include "update_engine/payload_consumer/payload_constants.h"
 #include "update_engine/update_metadata.pb.h"
 
@@ -83,6 +85,43 @@
 
 bool operator==(const Extent& a, const Extent& b);
 
+// TODO(zhangkelvin) This is ugly. Rewrite using C++20's coroutine once
+// that's available. Unfortunately with C++17 this is the best I could do.
+
+// An iterator that takes a sequence of extents and iterates over the blocks
+// inside this sequence of extents.
+// Example usage:
+
+// BlockIterator it1{src_extents};
+// for (; !it1.is_end(); ++it1) {
+//    auto block = *it1;
+//    Do stuff with |block|
+// }
+struct BlockIterator {
+  explicit BlockIterator(
+      const google::protobuf::RepeatedPtrField<Extent>& src_extents)
+      : src_extents_(src_extents) {}
+
+  BlockIterator& operator++() {
+    CHECK_LT(cur_extent_, src_extents_.size());
+    block_offset_++;
+    if (block_offset_ >= src_extents_[cur_extent_].num_blocks()) {
+      cur_extent_++;
+      block_offset_ = 0;
+    }
+    return *this;
+  }
+
+  [[nodiscard]] bool is_end() { return cur_extent_ >= src_extents_.size(); }
+  [[nodiscard]] uint64_t operator*() {
+    return src_extents_[cur_extent_].start_block() + block_offset_;
+  }
+
+  const google::protobuf::RepeatedPtrField<Extent>& src_extents_;
+  int cur_extent_ = 0;
+  size_t block_offset_ = 0;
+};
+
 }  // namespace chromeos_update_engine
 
 #endif  // UPDATE_ENGINE_PAYLOAD_GENERATOR_EXTENT_UTILS_H_
diff --git a/payload_generator/generate_delta_main.cc b/payload_generator/generate_delta_main.cc
index 7724c9c..a187b32 100644
--- a/payload_generator/generate_delta_main.cc
+++ b/payload_generator/generate_delta_main.cc
@@ -18,6 +18,7 @@
 #include <string>
 #include <vector>
 
+#include <base/bind.h>
 #include <base/files/file_path.h>
 #include <base/files/file_util.h>
 #include <base/logging.h>
@@ -182,8 +183,11 @@
   install_plan.source_slot =
       config.is_delta ? 0 : BootControlInterface::kInvalidSlot;
   install_plan.target_slot = 1;
-  payload.type =
-      config.is_delta ? InstallPayloadType::kDelta : InstallPayloadType::kFull;
+  // For partial updates, we always write kDelta to the payload. Make it
+  // consistent for host simulation.
+  payload.type = config.is_delta || config.is_partial_update
+                     ? InstallPayloadType::kDelta
+                     : InstallPayloadType::kFull;
   payload.size = utils::FileSize(payload_file);
   // TODO(senj): This hash is only correct for unsigned payload, need to support
   // signed payload using PayloadSigner.
@@ -233,7 +237,9 @@
   processor.EnqueueAction(std::move(install_plan_action));
   processor.EnqueueAction(std::move(download_action));
   processor.EnqueueAction(std::move(filesystem_verifier_action));
-  processor.StartProcessing();
+  loop.PostTask(FROM_HERE,
+                base::Bind(&ActionProcessor::StartProcessing,
+                           base::Unretained(&processor)));
   loop.Run();
   CHECK_EQ(delegate.code_, ErrorCode::kSuccess);
   LOG(INFO) << "Completed applying " << (config.is_delta ? "delta" : "full")
@@ -404,6 +410,9 @@
   DEFINE_bool(disable_fec_computation,
               false,
               "Disables the fec data computation on device.");
+  DEFINE_bool(disable_verity_computation,
+              false,
+              "Disables the verity data computation on device.");
   DEFINE_string(
       out_maximum_signature_size_file,
       "",
@@ -572,6 +581,10 @@
     }
   }
 
+  if (FLAGS_is_partial_update) {
+    payload_config.is_partial_update = true;
+  }
+
   if (!FLAGS_in_file.empty()) {
     return ApplyPayload(FLAGS_in_file, payload_config) ? 0 : 1;
   }
@@ -600,10 +613,6 @@
     CHECK(payload_config.target.ValidateDynamicPartitionMetadata());
   }
 
-  if (FLAGS_is_partial_update) {
-    payload_config.is_partial_update = true;
-  }
-
   CHECK(!FLAGS_out_file.empty());
 
   payload_config.rootfs_partition_size = FLAGS_rootfs_partition_size;
@@ -664,7 +673,8 @@
   }
 
   if (payload_config.is_delta &&
-      payload_config.version.minor >= kVerityMinorPayloadVersion)
+      payload_config.version.minor >= kVerityMinorPayloadVersion &&
+      !FLAGS_disable_verity_computation)
     CHECK(payload_config.target.LoadVerityConfig());
 
   LOG(INFO) << "Generating " << (payload_config.is_delta ? "delta" : "full")
diff --git a/payload_generator/merge_sequence_generator_unittest.cc b/payload_generator/merge_sequence_generator_unittest.cc
index 567ede1..1f0c2ea 100644
--- a/payload_generator/merge_sequence_generator_unittest.cc
+++ b/payload_generator/merge_sequence_generator_unittest.cc
@@ -116,6 +116,20 @@
             merge_after.at(transfers[2]));
 }
 
+TEST_F(MergeSequenceGeneratorTest, FindDependencyEdgeCase) {
+  std::vector<CowMergeOperation> transfers = {
+      CreateCowMergeOperation(ExtentForRange(10, 10), ExtentForRange(15, 10)),
+      CreateCowMergeOperation(ExtentForRange(40, 10), ExtentForRange(50, 10)),
+      CreateCowMergeOperation(ExtentForRange(59, 10), ExtentForRange(60, 10)),
+  };
+
+  std::map<CowMergeOperation, std::set<CowMergeOperation>> merge_after;
+  FindDependency(transfers, &merge_after);
+  ASSERT_EQ(std::set<CowMergeOperation>(), merge_after.at(transfers[0]));
+  ASSERT_EQ(std::set<CowMergeOperation>(), merge_after.at(transfers[1]));
+  ASSERT_EQ(merge_after[transfers[2]].size(), 1U);
+}
+
 TEST_F(MergeSequenceGeneratorTest, FindDependency_ReusedSourceBlocks) {
   std::vector<CowMergeOperation> transfers = {
       CreateCowMergeOperation(ExtentForRange(5, 10), ExtentForRange(15, 10)),
diff --git a/payload_generator/payload_file.cc b/payload_generator/payload_file.cc
index 74423d1..33c0749 100644
--- a/payload_generator/payload_file.cc
+++ b/payload_generator/payload_file.cc
@@ -80,8 +80,10 @@
 bool PayloadFile::AddPartition(const PartitionConfig& old_conf,
                                const PartitionConfig& new_conf,
                                vector<AnnotatedOperation> aops,
-                               vector<CowMergeOperation> merge_sequence) {
+                               vector<CowMergeOperation> merge_sequence,
+                               size_t cow_size) {
   Partition part;
+  part.cow_size = cow_size;
   part.name = new_conf.name;
   part.aops = std::move(aops);
   part.cow_merge_sequence = std::move(merge_sequence);
@@ -129,6 +131,9 @@
     if (!part.version.empty()) {
       partition->set_version(part.version);
     }
+    if (part.cow_size > 0) {
+      partition->set_estimate_cow_size(part.cow_size);
+    }
     if (part.postinstall.run) {
       partition->set_run_postinstall(true);
       if (!part.postinstall.path.empty())
diff --git a/payload_generator/payload_file.h b/payload_generator/payload_file.h
index 8b17956..3a45793 100644
--- a/payload_generator/payload_file.h
+++ b/payload_generator/payload_file.h
@@ -44,7 +44,8 @@
   bool AddPartition(const PartitionConfig& old_conf,
                     const PartitionConfig& new_conf,
                     std::vector<AnnotatedOperation> aops,
-                    std::vector<CowMergeOperation> merge_sequence);
+                    std::vector<CowMergeOperation> merge_sequence,
+                    size_t cow_size);
 
   // Write the payload to the |payload_file| file. The operations reference
   // blobs in the |data_blobs_path| file and the blobs will be reordered in the
@@ -100,6 +101,7 @@
     VerityConfig verity;
     // Per partition timestamp.
     std::string version;
+    size_t cow_size;
   };
 
   std::vector<Partition> part_vec_;
diff --git a/payload_generator/payload_properties_unittest.cc b/payload_generator/payload_properties_unittest.cc
index ed936ff..0ff364f 100644
--- a/payload_generator/payload_properties_unittest.cc
+++ b/payload_generator/payload_properties_unittest.cc
@@ -88,7 +88,7 @@
     EXPECT_TRUE(strategy->GenerateOperations(
         config, old_part, new_part, &blob_file_writer, &aops));
 
-    payload.AddPartition(old_part, new_part, aops, {});
+    payload.AddPartition(old_part, new_part, aops, {}, 0);
 
     uint64_t metadata_size;
     EXPECT_TRUE(payload.WritePayload(
diff --git a/payload_generator/payload_signer.cc b/payload_generator/payload_signer.cc
index dd87ab7..d9f0dd7 100644
--- a/payload_generator/payload_signer.cc
+++ b/payload_generator/payload_signer.cc
@@ -241,8 +241,6 @@
                                            DeltaArchiveManifest* manifest) {
   LOG(INFO) << "Making room for signature in file";
   manifest->set_signatures_offset(signature_blob_offset);
-  LOG(INFO) << "set? " << manifest->has_signatures_offset();
-  manifest->set_signatures_offset(signature_blob_offset);
   manifest->set_signatures_size(signature_blob_length);
 }
 
diff --git a/scripts/brillo_update_payload b/scripts/brillo_update_payload
index 3bc87bd..1e729bd 100755
--- a/scripts/brillo_update_payload
+++ b/scripts/brillo_update_payload
@@ -193,9 +193,13 @@
   DEFINE_string disable_fec_computation "" \
     "Optional: Disables the on device fec data computation for incremental \
 update. This feature is enabled by default."
+  DEFINE_string disable_verity_computation "" \
+    "Optional: Disables the on device verity computation for incremental \
+update. This feature is enabled by default."
   DEFINE_string is_partial_update "" \
     "Optional: True if the payload is for partial update. i.e. it only updates \
 a subset of partitions on device."
+  DEFINE_string full_boot "" "Will include full boot image"
 fi
 if [[ "${COMMAND}" == "hash" || "${COMMAND}" == "sign" ]]; then
   DEFINE_string unsigned_payload "" "Path to the input unsigned payload."
@@ -649,7 +653,12 @@
     fi
     partition_names+="${part}"
     new_partitions+="${DST_PARTITIONS[${part}]}"
-    old_partitions+="${SRC_PARTITIONS[${part}]:-}"
+    if [ "${FLAGS_full_boot}" == "true" ] && [ "${part}" == "boot" ]; then
+      # Pass no source image for boot so it is generated as a full update.
+      old_partitions+=""
+    else
+      old_partitions+="${SRC_PARTITIONS[${part}]:-}"
+    fi
     new_mapfiles+="${DST_PARTITIONS_MAP[${part}]:-}"
     old_mapfiles+="${SRC_PARTITIONS_MAP[${part}]:-}"
   done
@@ -681,6 +690,10 @@
       GENERATOR_ARGS+=(
         --disable_fec_computation="${FLAGS_disable_fec_computation}" )
     fi
+    if [[ -n "${FLAGS_disable_verity_computation}" ]]; then
+      GENERATOR_ARGS+=(
+        --disable_verity_computation="${FLAGS_disable_verity_computation}" )
+    fi
   fi
 
   # minor version is set only for delta or partial payload.
diff --git a/scripts/paycheck.py b/scripts/paycheck.py
index 8eb0033..cb1713f 100755
--- a/scripts/paycheck.py
+++ b/scripts/paycheck.py
@@ -27,6 +27,7 @@
 import sys
 import tempfile
 
+# pylint: disable=redefined-builtin
 from six.moves import zip
 from update_payload import error
 
diff --git a/scripts/paycheck_unittest.py b/scripts/paycheck_unittest.py
index e54a3c0..a90d269 100755
--- a/scripts/paycheck_unittest.py
+++ b/scripts/paycheck_unittest.py
@@ -34,7 +34,7 @@
 # Previously test_paycheck.sh. Run with update_payload ebuild.
 
 # Disable check for function names to avoid errors based on old code
-# pylint: disable-msg=invalid-name
+# pylint: disable=invalid-name
 
 import filecmp
 import os
diff --git a/scripts/payload_info.py b/scripts/payload_info.py
index 7625ee8..8343d21 100755
--- a/scripts/payload_info.py
+++ b/scripts/payload_info.py
@@ -75,8 +75,11 @@
       DisplayValue('  Number of "%s" ops' % partition.partition_name,
                    len(partition.operations))
     for partition in manifest.partitions:
-      DisplayValue("Timestamp for " +
+      DisplayValue("  Timestamp for " +
                    partition.partition_name, partition.version)
+    for partition in manifest.partitions:
+      DisplayValue("  COW Size for " +
+                   partition.partition_name, partition.estimate_cow_size)
     DisplayValue('Block size', manifest.block_size)
     DisplayValue('Minor version', manifest.minor_version)
 
diff --git a/scripts/update_device.py b/scripts/update_device.py
index 1cd4b6a..354972b 100755
--- a/scripts/update_device.py
+++ b/scripts/update_device.py
@@ -17,6 +17,7 @@
 
 """Send an A/B update to an Android device over adb."""
 
+from __future__ import print_function
 from __future__ import absolute_import
 
 import argparse
@@ -103,7 +104,7 @@
 
     if payload_info.compress_type != 0:
       logging.error(
-          "Expected layload to be uncompressed, got compression method %d",
+          "Expected payload to be uncompressed, got compression method %d",
           payload_info.compress_type)
     # Don't use len(payload_info.extra). Because that returns size of extra
     # fields in central directory. We need to look at local file directory,
@@ -124,10 +125,10 @@
       payload_header = fp.read(4)
       if payload_header != self.PAYLOAD_MAGIC_HEADER:
         logging.warning(
-            "Invalid header, expeted %s, got %s."
+            "Invalid header, expected %s, got %s."
             "Either the offset is not correct, or payload is corrupted",
             binascii.hexlify(self.PAYLOAD_MAGIC_HEADER),
-            payload_header)
+            binascii.hexlify(payload_header))
 
     property_entry = (self.SECONDARY_OTA_PAYLOAD_PROPERTIES_TXT if
                       secondary_payload else self.OTA_PAYLOAD_PROPERTIES_TXT)
@@ -305,6 +306,7 @@
     logging.info('Server Terminated')
 
   def StopServer(self):
+    self._httpd.shutdown()
     self._httpd.socket.close()
 
 
@@ -318,13 +320,13 @@
   """Return the command to run to start the update in the Android device."""
   ota = AndroidOTAPackage(ota_filename, secondary)
   headers = ota.properties
-  headers += 'USER_AGENT=Dalvik (something, something)\n'
-  headers += 'NETWORK_ID=0\n'
-  headers += extra_headers
+  headers += b'USER_AGENT=Dalvik (something, something)\n'
+  headers += b'NETWORK_ID=0\n'
+  headers += extra_headers.encode()
 
   return ['update_engine_client', '--update', '--follow',
           '--payload=%s' % payload_url, '--offset=%d' % ota.offset,
-          '--size=%d' % ota.size, '--headers="%s"' % headers]
+          '--size=%d' % ota.size, '--headers="%s"' % headers.decode()]
 
 
 def OmahaUpdateCommand(omaha_url):
@@ -401,6 +403,8 @@
                       help='Extra headers to pass to the device.')
   parser.add_argument('--secondary', action='store_true',
                       help='Update with the secondary payload in the package.')
+  parser.add_argument('--no-slot-switch', action='store_true',
+                      help='Do not perform slot switch after the update.')
   args = parser.parse_args()
   logging.basicConfig(
       level=logging.WARNING if args.no_verbose else logging.INFO)
@@ -418,6 +422,9 @@
   help_cmd = ['shell', 'su', '0', 'update_engine_client', '--help']
   use_omaha = 'omaha' in dut.adb_output(help_cmd)
 
+  if args.no_slot_switch:
+    args.extra_headers += "\nSWITCH_SLOT_ON_REBOOT=0"
+
   if args.file:
     # Update via pushing a file to /data.
     device_ota_file = os.path.join(OTA_PACKAGE_PATH, 'debug.zip')
diff --git a/scripts/update_payload/checker.py b/scripts/update_payload/checker.py
index 99a5c62..56a9370 100644
--- a/scripts/update_payload/checker.py
+++ b/scripts/update_payload/checker.py
@@ -35,6 +35,7 @@
 import os
 import subprocess
 
+# pylint: disable=redefined-builtin
 from six.moves import range
 
 from update_payload import common
diff --git a/scripts/update_payload/update_metadata_pb2.py b/scripts/update_payload/update_metadata_pb2.py
index 841cd22..ea4bc59 100644
--- a/scripts/update_payload/update_metadata_pb2.py
+++ b/scripts/update_payload/update_metadata_pb2.py
@@ -18,7 +18,7 @@
   package='chromeos_update_engine',
   syntax='proto2',
   serialized_options=b'H\003',
-  serialized_pb=b'\n\x15update_metadata.proto\x12\x16\x63hromeos_update_engine\"1\n\x06\x45xtent\x12\x13\n\x0bstart_block\x18\x01 \x01(\x04\x12\x12\n\nnum_blocks\x18\x02 \x01(\x04\"\x9f\x01\n\nSignatures\x12@\n\nsignatures\x18\x01 \x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x1aO\n\tSignature\x12\x13\n\x07version\x18\x01 \x01(\rB\x02\x18\x01\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\x1f\n\x17unpadded_signature_size\x18\x03 \x01(\x07\"+\n\rPartitionInfo\x12\x0c\n\x04size\x18\x01 \x01(\x04\x12\x0c\n\x04hash\x18\x02 \x01(\x0c\"w\n\tImageInfo\x12\r\n\x05\x62oard\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\x12\x0f\n\x07\x63hannel\x18\x03 \x01(\t\x12\x0f\n\x07version\x18\x04 \x01(\t\x12\x15\n\rbuild_channel\x18\x05 \x01(\t\x12\x15\n\rbuild_version\x18\x06 \x01(\t\"\xee\x03\n\x10InstallOperation\x12;\n\x04type\x18\x01 \x02(\x0e\x32-.chromeos_update_engine.InstallOperation.Type\x12\x13\n\x0b\x64\x61ta_offset\x18\x02 \x01(\x04\x12\x13\n\x0b\x64\x61ta_length\x18\x03 \x01(\x04\x12\x33\n\x0bsrc_extents\x18\x04 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\nsrc_length\x18\x05 \x01(\x04\x12\x33\n\x0b\x64st_extents\x18\x06 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\ndst_length\x18\x07 \x01(\x04\x12\x18\n\x10\x64\x61ta_sha256_hash\x18\x08 \x01(\x0c\x12\x17\n\x0fsrc_sha256_hash\x18\t \x01(\x0c\"\xad\x01\n\x04Type\x12\x0b\n\x07REPLACE\x10\x00\x12\x0e\n\nREPLACE_BZ\x10\x01\x12\x0c\n\x04MOVE\x10\x02\x1a\x02\x08\x01\x12\x0e\n\x06\x42SDIFF\x10\x03\x1a\x02\x08\x01\x12\x0f\n\x0bSOURCE_COPY\x10\x04\x12\x11\n\rSOURCE_BSDIFF\x10\x05\x12\x0e\n\nREPLACE_XZ\x10\x08\x12\x08\n\x04ZERO\x10\x06\x12\x0b\n\x07\x44ISCARD\x10\x07\x12\x11\n\rBROTLI_BSDIFF\x10\n\x12\x0c\n\x08PUFFDIFF\x10\t\"\xe8\x05\n\x0fPartitionUpdate\x12\x16\n\x0epartition_name\x18\x01 \x02(\t\x12\x17\n\x0frun_postinstall\x18\x02 \x01(\x08\x12\x18\n\x10postinstall_path\x18\x03 \x01(\t\x12\x17\n\x0f\x66ilesystem_type\x18\x04 \x01(\t\x12M\n\x17new_partition_signature\x18\x05 
\x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x12\x41\n\x12old_partition_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12\x41\n\x12new_partition_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12<\n\noperations\x18\x08 \x03(\x0b\x32(.chromeos_update_engine.InstallOperation\x12\x1c\n\x14postinstall_optional\x18\t \x01(\x08\x12=\n\x15hash_tree_data_extent\x18\n \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x38\n\x10hash_tree_extent\x18\x0b \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x1b\n\x13hash_tree_algorithm\x18\x0c \x01(\t\x12\x16\n\x0ehash_tree_salt\x18\r \x01(\x0c\x12\x37\n\x0f\x66\x65\x63_data_extent\x18\x0e \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x32\n\nfec_extent\x18\x0f \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x14\n\tfec_roots\x18\x10 \x01(\r:\x01\x32\x12\x0f\n\x07version\x18\x11 \x01(\t\"L\n\x15\x44ynamicPartitionGroup\x12\x0c\n\x04name\x18\x01 \x02(\t\x12\x0c\n\x04size\x18\x02 \x01(\x04\x12\x17\n\x0fpartition_names\x18\x03 \x03(\t\"s\n\x18\x44ynamicPartitionMetadata\x12=\n\x06groups\x18\x01 \x03(\x0b\x32-.chromeos_update_engine.DynamicPartitionGroup\x12\x18\n\x10snapshot_enabled\x18\x02 \x01(\x08\"\xe1\x06\n\x14\x44\x65ltaArchiveManifest\x12H\n\x12install_operations\x18\x01 \x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12O\n\x19kernel_install_operations\x18\x02 \x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12\x18\n\nblock_size\x18\x03 \x01(\r:\x04\x34\x30\x39\x36\x12\x19\n\x11signatures_offset\x18\x04 \x01(\x04\x12\x17\n\x0fsignatures_size\x18\x05 \x01(\x04\x12\x42\n\x0fold_kernel_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_kernel_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fold_rootfs_info\x18\x08 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_rootfs_info\x18\t 
\x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x39\n\x0eold_image_info\x18\n \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x39\n\x0enew_image_info\x18\x0b \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x18\n\rminor_version\x18\x0c \x01(\r:\x01\x30\x12;\n\npartitions\x18\r \x03(\x0b\x32\'.chromeos_update_engine.PartitionUpdate\x12\x15\n\rmax_timestamp\x18\x0e \x01(\x03\x12T\n\x1a\x64ynamic_partition_metadata\x18\x0f \x01(\x0b\x32\x30.chromeos_update_engine.DynamicPartitionMetadata\x12\x16\n\x0epartial_update\x18\x10 \x01(\x08\x42\x02H\x03'
+  serialized_pb=b'\n\x15update_metadata.proto\x12\x16\x63hromeos_update_engine\"1\n\x06\x45xtent\x12\x13\n\x0bstart_block\x18\x01 \x01(\x04\x12\x12\n\nnum_blocks\x18\x02 \x01(\x04\"\x9f\x01\n\nSignatures\x12@\n\nsignatures\x18\x01 \x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x1aO\n\tSignature\x12\x13\n\x07version\x18\x01 \x01(\rB\x02\x18\x01\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\x1f\n\x17unpadded_signature_size\x18\x03 \x01(\x07\"+\n\rPartitionInfo\x12\x0c\n\x04size\x18\x01 \x01(\x04\x12\x0c\n\x04hash\x18\x02 \x01(\x0c\"w\n\tImageInfo\x12\r\n\x05\x62oard\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\x12\x0f\n\x07\x63hannel\x18\x03 \x01(\t\x12\x0f\n\x07version\x18\x04 \x01(\t\x12\x15\n\rbuild_channel\x18\x05 \x01(\t\x12\x15\n\rbuild_version\x18\x06 \x01(\t\"\xee\x03\n\x10InstallOperation\x12;\n\x04type\x18\x01 \x02(\x0e\x32-.chromeos_update_engine.InstallOperation.Type\x12\x13\n\x0b\x64\x61ta_offset\x18\x02 \x01(\x04\x12\x13\n\x0b\x64\x61ta_length\x18\x03 \x01(\x04\x12\x33\n\x0bsrc_extents\x18\x04 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\nsrc_length\x18\x05 \x01(\x04\x12\x33\n\x0b\x64st_extents\x18\x06 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\ndst_length\x18\x07 \x01(\x04\x12\x18\n\x10\x64\x61ta_sha256_hash\x18\x08 \x01(\x0c\x12\x17\n\x0fsrc_sha256_hash\x18\t \x01(\x0c\"\xad\x01\n\x04Type\x12\x0b\n\x07REPLACE\x10\x00\x12\x0e\n\nREPLACE_BZ\x10\x01\x12\x0c\n\x04MOVE\x10\x02\x1a\x02\x08\x01\x12\x0e\n\x06\x42SDIFF\x10\x03\x1a\x02\x08\x01\x12\x0f\n\x0bSOURCE_COPY\x10\x04\x12\x11\n\rSOURCE_BSDIFF\x10\x05\x12\x0e\n\nREPLACE_XZ\x10\x08\x12\x08\n\x04ZERO\x10\x06\x12\x0b\n\x07\x44ISCARD\x10\x07\x12\x11\n\rBROTLI_BSDIFF\x10\n\x12\x0c\n\x08PUFFDIFF\x10\t\"\xcf\x01\n\x11\x43owMergeOperation\x12<\n\x04type\x18\x01 \x01(\x0e\x32..chromeos_update_engine.CowMergeOperation.Type\x12\x32\n\nsrc_extent\x18\x02 \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x32\n\ndst_extent\x18\x03 
\x01(\x0b\x32\x1e.chromeos_update_engine.Extent\"\x14\n\x04Type\x12\x0c\n\x08\x43OW_COPY\x10\x00\"\xc8\x06\n\x0fPartitionUpdate\x12\x16\n\x0epartition_name\x18\x01 \x02(\t\x12\x17\n\x0frun_postinstall\x18\x02 \x01(\x08\x12\x18\n\x10postinstall_path\x18\x03 \x01(\t\x12\x17\n\x0f\x66ilesystem_type\x18\x04 \x01(\t\x12M\n\x17new_partition_signature\x18\x05 \x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x12\x41\n\x12old_partition_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12\x41\n\x12new_partition_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12<\n\noperations\x18\x08 \x03(\x0b\x32(.chromeos_update_engine.InstallOperation\x12\x1c\n\x14postinstall_optional\x18\t \x01(\x08\x12=\n\x15hash_tree_data_extent\x18\n \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x38\n\x10hash_tree_extent\x18\x0b \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x1b\n\x13hash_tree_algorithm\x18\x0c \x01(\t\x12\x16\n\x0ehash_tree_salt\x18\r \x01(\x0c\x12\x37\n\x0f\x66\x65\x63_data_extent\x18\x0e \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x32\n\nfec_extent\x18\x0f \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x14\n\tfec_roots\x18\x10 \x01(\r:\x01\x32\x12\x0f\n\x07version\x18\x11 \x01(\t\x12\x43\n\x10merge_operations\x18\x12 \x03(\x0b\x32).chromeos_update_engine.CowMergeOperation\x12\x19\n\x11\x65stimate_cow_size\x18\x13 \x01(\x04\"L\n\x15\x44ynamicPartitionGroup\x12\x0c\n\x04name\x18\x01 \x02(\t\x12\x0c\n\x04size\x18\x02 \x01(\x04\x12\x17\n\x0fpartition_names\x18\x03 \x03(\t\"s\n\x18\x44ynamicPartitionMetadata\x12=\n\x06groups\x18\x01 \x03(\x0b\x32-.chromeos_update_engine.DynamicPartitionGroup\x12\x18\n\x10snapshot_enabled\x18\x02 \x01(\x08\"\xe1\x06\n\x14\x44\x65ltaArchiveManifest\x12H\n\x12install_operations\x18\x01 \x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12O\n\x19kernel_install_operations\x18\x02 
\x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12\x18\n\nblock_size\x18\x03 \x01(\r:\x04\x34\x30\x39\x36\x12\x19\n\x11signatures_offset\x18\x04 \x01(\x04\x12\x17\n\x0fsignatures_size\x18\x05 \x01(\x04\x12\x42\n\x0fold_kernel_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_kernel_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fold_rootfs_info\x18\x08 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_rootfs_info\x18\t \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x39\n\x0eold_image_info\x18\n \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x39\n\x0enew_image_info\x18\x0b \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x18\n\rminor_version\x18\x0c \x01(\r:\x01\x30\x12;\n\npartitions\x18\r \x03(\x0b\x32\'.chromeos_update_engine.PartitionUpdate\x12\x15\n\rmax_timestamp\x18\x0e \x01(\x03\x12T\n\x1a\x64ynamic_partition_metadata\x18\x0f \x01(\x0b\x32\x30.chromeos_update_engine.DynamicPartitionMetadata\x12\x16\n\x0epartial_update\x18\x10 \x01(\x08\x42\x02H\x03'
 )
 
 
@@ -81,6 +81,24 @@
 )
 _sym_db.RegisterEnumDescriptor(_INSTALLOPERATION_TYPE)
 
+_COWMERGEOPERATION_TYPE = _descriptor.EnumDescriptor(
+  name='Type',
+  full_name='chromeos_update_engine.CowMergeOperation.Type',
+  filename=None,
+  file=DESCRIPTOR,
+  values=[
+    _descriptor.EnumValueDescriptor(
+      name='COW_COPY', index=0, number=0,
+      serialized_options=None,
+      type=None),
+  ],
+  containing_type=None,
+  serialized_options=None,
+  serialized_start=1113,
+  serialized_end=1133,
+)
+_sym_db.RegisterEnumDescriptor(_COWMERGEOPERATION_TYPE)
+
 
 _EXTENT = _descriptor.Descriptor(
   name='Extent',
@@ -387,6 +405,52 @@
 )
 
 
+_COWMERGEOPERATION = _descriptor.Descriptor(
+  name='CowMergeOperation',
+  full_name='chromeos_update_engine.CowMergeOperation',
+  filename=None,
+  file=DESCRIPTOR,
+  containing_type=None,
+  fields=[
+    _descriptor.FieldDescriptor(
+      name='type', full_name='chromeos_update_engine.CowMergeOperation.type', index=0,
+      number=1, type=14, cpp_type=8, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='src_extent', full_name='chromeos_update_engine.CowMergeOperation.src_extent', index=1,
+      number=2, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='dst_extent', full_name='chromeos_update_engine.CowMergeOperation.dst_extent', index=2,
+      number=3, type=11, cpp_type=10, label=1,
+      has_default_value=False, default_value=None,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+  ],
+  extensions=[
+  ],
+  nested_types=[],
+  enum_types=[
+    _COWMERGEOPERATION_TYPE,
+  ],
+  serialized_options=None,
+  is_extendable=False,
+  syntax='proto2',
+  extension_ranges=[],
+  oneofs=[
+  ],
+  serialized_start=926,
+  serialized_end=1133,
+)
+
+
 _PARTITIONUPDATE = _descriptor.Descriptor(
   name='PartitionUpdate',
   full_name='chromeos_update_engine.PartitionUpdate',
@@ -513,6 +577,20 @@
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='merge_operations', full_name='chromeos_update_engine.PartitionUpdate.merge_operations', index=17,
+      number=18, type=11, cpp_type=10, label=3,
+      has_default_value=False, default_value=[],
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='estimate_cow_size', full_name='chromeos_update_engine.PartitionUpdate.estimate_cow_size', index=18,
+      number=19, type=4, cpp_type=4, label=1,
+      has_default_value=False, default_value=0,
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -525,8 +603,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=926,
-  serialized_end=1670,
+  serialized_start=1136,
+  serialized_end=1976,
 )
 
 
@@ -570,8 +648,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1672,
-  serialized_end=1748,
+  serialized_start=1978,
+  serialized_end=2054,
 )
 
 
@@ -608,8 +686,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1750,
-  serialized_end=1865,
+  serialized_start=2056,
+  serialized_end=2171,
 )
 
 
@@ -744,8 +822,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1868,
-  serialized_end=2733,
+  serialized_start=2174,
+  serialized_end=3039,
 )
 
 _SIGNATURES_SIGNATURE.containing_type = _SIGNATURES
@@ -754,6 +832,10 @@
 _INSTALLOPERATION.fields_by_name['src_extents'].message_type = _EXTENT
 _INSTALLOPERATION.fields_by_name['dst_extents'].message_type = _EXTENT
 _INSTALLOPERATION_TYPE.containing_type = _INSTALLOPERATION
+_COWMERGEOPERATION.fields_by_name['type'].enum_type = _COWMERGEOPERATION_TYPE
+_COWMERGEOPERATION.fields_by_name['src_extent'].message_type = _EXTENT
+_COWMERGEOPERATION.fields_by_name['dst_extent'].message_type = _EXTENT
+_COWMERGEOPERATION_TYPE.containing_type = _COWMERGEOPERATION
 _PARTITIONUPDATE.fields_by_name['new_partition_signature'].message_type = _SIGNATURES_SIGNATURE
 _PARTITIONUPDATE.fields_by_name['old_partition_info'].message_type = _PARTITIONINFO
 _PARTITIONUPDATE.fields_by_name['new_partition_info'].message_type = _PARTITIONINFO
@@ -762,6 +844,7 @@
 _PARTITIONUPDATE.fields_by_name['hash_tree_extent'].message_type = _EXTENT
 _PARTITIONUPDATE.fields_by_name['fec_data_extent'].message_type = _EXTENT
 _PARTITIONUPDATE.fields_by_name['fec_extent'].message_type = _EXTENT
+_PARTITIONUPDATE.fields_by_name['merge_operations'].message_type = _COWMERGEOPERATION
 _DYNAMICPARTITIONMETADATA.fields_by_name['groups'].message_type = _DYNAMICPARTITIONGROUP
 _DELTAARCHIVEMANIFEST.fields_by_name['install_operations'].message_type = _INSTALLOPERATION
 _DELTAARCHIVEMANIFEST.fields_by_name['kernel_install_operations'].message_type = _INSTALLOPERATION
@@ -778,6 +861,7 @@
 DESCRIPTOR.message_types_by_name['PartitionInfo'] = _PARTITIONINFO
 DESCRIPTOR.message_types_by_name['ImageInfo'] = _IMAGEINFO
 DESCRIPTOR.message_types_by_name['InstallOperation'] = _INSTALLOPERATION
+DESCRIPTOR.message_types_by_name['CowMergeOperation'] = _COWMERGEOPERATION
 DESCRIPTOR.message_types_by_name['PartitionUpdate'] = _PARTITIONUPDATE
 DESCRIPTOR.message_types_by_name['DynamicPartitionGroup'] = _DYNAMICPARTITIONGROUP
 DESCRIPTOR.message_types_by_name['DynamicPartitionMetadata'] = _DYNAMICPARTITIONMETADATA
@@ -827,6 +911,13 @@
   })
 _sym_db.RegisterMessage(InstallOperation)
 
+CowMergeOperation = _reflection.GeneratedProtocolMessageType('CowMergeOperation', (_message.Message,), {
+  'DESCRIPTOR' : _COWMERGEOPERATION,
+  '__module__' : 'update_metadata_pb2'
+  # @@protoc_insertion_point(class_scope:chromeos_update_engine.CowMergeOperation)
+  })
+_sym_db.RegisterMessage(CowMergeOperation)
+
 PartitionUpdate = _reflection.GeneratedProtocolMessageType('PartitionUpdate', (_message.Message,), {
   'DESCRIPTOR' : _PARTITIONUPDATE,
   '__module__' : 'update_metadata_pb2'
diff --git a/stable/Android.bp b/stable/Android.bp
index 337ae96..a415ac5 100644
--- a/stable/Android.bp
+++ b/stable/Android.bp
@@ -18,6 +18,13 @@
 // ========================================================
 aidl_interface {
     name: "libupdate_engine_stable",
+
+    // This header library is available to core and product modules.
+    // Right now, vendor_available is the only way to specify this.
+    // vendor modules should NOT use this library.
+    // TODO(b/150902910): change this to product_available.
+    vendor_available: true,
+
     srcs: [
         "android/os/IUpdateEngineStable.aidl",
         "android/os/IUpdateEngineStableCallback.aidl",
@@ -40,10 +47,10 @@
 
 // update_engine_stable_client (type: executable)
 // ========================================================
-// update_engine console client installed to APEXes
+// update_engine console client installed to APEXes.
 cc_binary {
     name: "update_engine_stable_client",
-
+    product_specific: true,
     header_libs: [
         "libupdate_engine_headers",
     ],
diff --git a/stable/update_engine_stable_client.cc b/stable/update_engine_stable_client.cc
index da203c4..17f66b6 100644
--- a/stable/update_engine_stable_client.cc
+++ b/stable/update_engine_stable_client.cc
@@ -32,7 +32,6 @@
 #include <android/binder_ibinder.h>
 #include <common/error_code.h>
 #include <gflags/gflags.h>
-#include <utils/StrongPointer.h>
 
 namespace chromeos_update_engine::internal {
 
diff --git a/update_metadata.proto b/update_metadata.proto
index 99bfa84..452b89d 100644
--- a/update_metadata.proto
+++ b/update_metadata.proto
@@ -314,6 +314,11 @@
   // skip writing the raw bytes for these extents. During snapshot merge, the
   // bytes will read from the source partitions instead.
   repeated CowMergeOperation merge_operations = 18;
+
+  // Estimated size for COW image. This is used by libsnapshot
+  // as a hint. If set to 0, libsnapshot should use alternative
+  // methods for estimating size.
+  optional uint64 estimate_cow_size = 19;
 }
 
 message DynamicPartitionGroup {