Skip ab/6749736 in stage.

Merged-In: I5ddd6573d86a554f9f0eee041490e748affd2b4d
Change-Id: If4ed3cde37bb358325db8925911dfc328ee8a445
diff --git a/Android.bp b/Android.bp
index 1076c52..9187e67 100644
--- a/Android.bp
+++ b/Android.bp
@@ -178,6 +178,7 @@
         "payload_consumer/payload_constants.cc",
         "payload_consumer/payload_metadata.cc",
         "payload_consumer/payload_verifier.cc",
+        "payload_consumer/partition_writer.cc",
         "payload_consumer/postinstall_runner_action.cc",
         "payload_consumer/verity_writer_android.cc",
         "payload_consumer/xz_extent_writer.cc",
@@ -260,6 +261,7 @@
     ],
 
     static_libs: [
+        "gkiprops",
         "libpayload_consumer",
         "libupdate_engine_boot_control",
     ],
@@ -306,7 +308,6 @@
         "hardware_android.cc",
         "libcurl_http_fetcher.cc",
         "logging_android.cc",
-        "metrics_reporter_android.cc",
         "metrics_utils.cc",
         "network_selector_android.cc",
         "update_attempter_android.cc",
@@ -331,7 +332,7 @@
         "otacerts",
     ],
 
-    srcs: ["main.cc"],
+    srcs: ["main.cc", "metrics_reporter_android.cc"],
     init_rc: ["update_engine.rc"],
 }
 
@@ -383,6 +384,7 @@
         // We add the static versions of the shared libraries that are not installed to
         // recovery image due to size concerns. Need to include all the static library
         // dependencies of these static libraries.
+        "gkiprops",
         "libevent",
         "libmodpb64",
         "libgtest_prod",
@@ -470,6 +472,20 @@
 }
 
 cc_library_static {
+    name: "libpayload_extent_ranges",
+    defaults: [
+        "ue_defaults",
+    ],
+    host_supported: true,
+    srcs: [
+        "payload_generator/extent_ranges.cc",
+    ],
+    static_libs: [
+        "update_metadata-protos",
+    ],
+}
+
+cc_library_static {
     name: "libpayload_generator",
     defaults: [
         "ue_defaults",
@@ -492,6 +508,7 @@
         "payload_generator/extent_utils.cc",
         "payload_generator/full_update_generator.cc",
         "payload_generator/mapfile_filesystem.cc",
+        "payload_generator/merge_sequence_generator.cc",
         "payload_generator/payload_file.cc",
         "payload_generator/payload_generation_config_android.cc",
         "payload_generator/payload_generation_config.cc",
@@ -673,6 +690,7 @@
         "payload_consumer/certificate_parser_android_unittest.cc",
         "payload_consumer/delta_performer_integration_test.cc",
         "payload_consumer/delta_performer_unittest.cc",
+        "payload_consumer/partition_writer_unittest.cc",
         "payload_consumer/download_action_android_unittest.cc",
         "payload_consumer/extent_reader_unittest.cc",
         "payload_consumer/extent_writer_unittest.cc",
@@ -696,6 +714,7 @@
         "payload_generator/fake_filesystem.cc",
         "payload_generator/full_update_generator_unittest.cc",
         "payload_generator/mapfile_filesystem_unittest.cc",
+        "payload_generator/merge_sequence_generator_unittest.cc",
         "payload_generator/payload_file_unittest.cc",
         "payload_generator/payload_generation_config_android_unittest.cc",
         "payload_generator/payload_generation_config_unittest.cc",
@@ -706,17 +725,18 @@
         "testrunner.cc",
         "update_attempter_android_unittest.cc",
         "update_status_utils_unittest.cc",
+        "metrics_reporter_stub.cc",
     ],
 }
 
 // Brillo update payload generation script
 // ========================================================
-cc_prebuilt_binary {
+sh_binary {
     name: "brillo_update_payload",
     device_supported: false,
     host_supported: true,
 
-    srcs: ["scripts/brillo_update_payload"],
+    src: "scripts/brillo_update_payload",
     required: [
         "delta_generator",
         "shflags",
@@ -733,8 +753,25 @@
 // update_engine header library
 cc_library_headers {
     name: "libupdate_engine_headers",
+
+    // This header library is available to core and product modules.
+    // Right now, vendor_available is the only way to specify this.
+    // vendor modules should NOT use this library.
+    // TODO(b/150902910): change this to product_available.
+    vendor_available: true,
+
     export_include_dirs: ["."],
     apex_available: [
         "com.android.gki.*",
+        "//apex_available:platform",
     ],
+    host_supported: true,
+    recovery_available: true,
+    ramdisk_available: true,
+
+    target: {
+        darwin: {
+            enabled: false,
+        },
+    }
 }
diff --git a/BUILD.gn b/BUILD.gn
index e438af4..b7de9fc 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -133,6 +133,7 @@
     "common/clock.cc",
     "common/constants.cc",
     "common/cpu_limiter.cc",
+    "common/dynamic_partition_control_stub.cc",
     "common/error_code_utils.cc",
     "common/hash_calculator.cc",
     "common/http_common.cc",
@@ -147,6 +148,7 @@
     "common/utils.cc",
     "payload_consumer/bzip_extent_writer.cc",
     "payload_consumer/cached_file_descriptor.cc",
+    "payload_consumer/certificate_parser_stub.cc",
     "payload_consumer/delta_performer.cc",
     "payload_consumer/download_action.cc",
     "payload_consumer/extent_reader.cc",
@@ -157,6 +159,7 @@
     "payload_consumer/filesystem_verifier_action.cc",
     "payload_consumer/install_plan.cc",
     "payload_consumer/mount_history.cc",
+    "payload_consumer/partition_update_generator_stub.cc",
     "payload_consumer/payload_constants.cc",
     "payload_consumer/payload_metadata.cc",
     "payload_consumer/payload_verifier.cc",
@@ -200,6 +203,7 @@
     "hardware_chromeos.cc",
     "image_properties_chromeos.cc",
     "libcurl_http_fetcher.cc",
+    "logging.cc",
     "metrics_reporter_omaha.cc",
     "metrics_utils.cc",
     "omaha_request_action.cc",
@@ -332,7 +336,7 @@
     "payload_generator/annotated_operation.cc",
     "payload_generator/blob_file_writer.cc",
     "payload_generator/block_mapping.cc",
-    "payload_generator/boot_img_filesystem.cc",
+    "payload_generator/boot_img_filesystem_stub.cc",
     "payload_generator/bzip.cc",
     "payload_generator/deflate_utils.cc",
     "payload_generator/delta_diff_generator.cc",
@@ -342,6 +346,7 @@
     "payload_generator/extent_utils.cc",
     "payload_generator/full_update_generator.cc",
     "payload_generator/mapfile_filesystem.cc",
+    "payload_generator/merge_sequence_generator.cc",
     "payload_generator/payload_file.cc",
     "payload_generator/payload_generation_config.cc",
     "payload_generator/payload_generation_config_chromeos.cc",
@@ -493,7 +498,6 @@
       "payload_generator/ab_generator_unittest.cc",
       "payload_generator/blob_file_writer_unittest.cc",
       "payload_generator/block_mapping_unittest.cc",
-      "payload_generator/boot_img_filesystem_unittest.cc",
       "payload_generator/deflate_utils_unittest.cc",
       "payload_generator/delta_diff_utils_unittest.cc",
       "payload_generator/ext2_filesystem_unittest.cc",
@@ -501,6 +505,7 @@
       "payload_generator/extent_utils_unittest.cc",
       "payload_generator/full_update_generator_unittest.cc",
       "payload_generator/mapfile_filesystem_unittest.cc",
+      "payload_generator/merge_sequence_generator_unittest.cc",
       "payload_generator/payload_file_unittest.cc",
       "payload_generator/payload_generation_config_unittest.cc",
       "payload_generator/payload_properties_unittest.cc",
diff --git a/Doxyfile b/Doxyfile
new file mode 100644
index 0000000..db31f86
--- /dev/null
+++ b/Doxyfile
@@ -0,0 +1,9 @@
+CLANG_DATABASE_PATH=../../
+HAVE_DOT=YES
+CALL_GRAPH=YES
+CALLER_GRAPH=YES
+GENERATE_HTML=YES
+GENERATE_LATEX=NO
+INPUT=.
+RECURSIVE=YES
+
diff --git a/cleanup_previous_update_action.cc b/cleanup_previous_update_action.cc
index 1a2476f..89ed6f8 100644
--- a/cleanup_previous_update_action.cc
+++ b/cleanup_previous_update_action.cc
@@ -67,30 +67,28 @@
       last_percentage_(0),
       merge_stats_(nullptr) {}
 
+CleanupPreviousUpdateAction::~CleanupPreviousUpdateAction() {
+  StopActionInternal();
+}
+
 void CleanupPreviousUpdateAction::PerformAction() {
-  ResumeAction();
+  StartActionInternal();
 }
 
 void CleanupPreviousUpdateAction::TerminateProcessing() {
-  SuspendAction();
+  StopActionInternal();
 }
 
 void CleanupPreviousUpdateAction::ResumeAction() {
-  CHECK(prefs_);
-  CHECK(boot_control_);
-
-  LOG(INFO) << "Starting/resuming CleanupPreviousUpdateAction";
-  running_ = true;
   StartActionInternal();
 }
 
 void CleanupPreviousUpdateAction::SuspendAction() {
-  LOG(INFO) << "Stopping/suspending CleanupPreviousUpdateAction";
-  running_ = false;
+  StopActionInternal();
 }
 
 void CleanupPreviousUpdateAction::ActionCompleted(ErrorCode error_code) {
-  running_ = false;
+  StopActionInternal();
   ReportMergeStats();
   metadata_device_ = nullptr;
 }
@@ -103,7 +101,52 @@
   return "CleanupPreviousUpdateAction";
 }
 
+// This function is called at the beginning of all delayed functions. By
+// resetting |scheduled_task_|, the delayed function acknowledges that the task
+// has already been executed, therefore there's no need to cancel it in the
+// future. This prevents StopActionInternal() from resetting task IDs in an
+// unexpected way because task IDs could be reused.
+void CleanupPreviousUpdateAction::AcknowledgeTaskExecuted() {
+  if (scheduled_task_ != MessageLoop::kTaskIdNull) {
+    LOG(INFO) << "Executing task " << scheduled_task_;
+  }
+  scheduled_task_ = MessageLoop::kTaskIdNull;
+}
+
+// Check that scheduled_task_ is a valid task ID. Otherwise, terminate the
+// action.
+void CleanupPreviousUpdateAction::CheckTaskScheduled(std::string_view name) {
+  if (scheduled_task_ == MessageLoop::kTaskIdNull) {
+    LOG(ERROR) << "Unable to schedule " << name;
+    processor_->ActionComplete(this, ErrorCode::kError);
+  } else {
+    LOG(INFO) << "CleanupPreviousUpdateAction scheduled task ID "
+              << scheduled_task_ << " for " << name;
+  }
+}
+
+void CleanupPreviousUpdateAction::StopActionInternal() {
+  LOG(INFO) << "Stopping/suspending/completing CleanupPreviousUpdateAction";
+  running_ = false;
+
+  if (scheduled_task_ != MessageLoop::kTaskIdNull) {
+    if (MessageLoop::current()->CancelTask(scheduled_task_)) {
+      LOG(INFO) << "CleanupPreviousUpdateAction cancelled pending task ID "
+                << scheduled_task_;
+    } else {
+      LOG(ERROR) << "CleanupPreviousUpdateAction unable to cancel task ID "
+                 << scheduled_task_;
+    }
+  }
+  scheduled_task_ = MessageLoop::kTaskIdNull;
+}
+
 void CleanupPreviousUpdateAction::StartActionInternal() {
+  CHECK(prefs_);
+  CHECK(boot_control_);
+
+  LOG(INFO) << "Starting/resuming CleanupPreviousUpdateAction";
+  running_ = true;
   // Do nothing on non-VAB device.
   if (!boot_control_->GetDynamicPartitionControl()
            ->GetVirtualAbFeatureFlag()
@@ -120,14 +163,16 @@
 
 void CleanupPreviousUpdateAction::ScheduleWaitBootCompleted() {
   TEST_AND_RETURN(running_);
-  MessageLoop::current()->PostDelayedTask(
+  scheduled_task_ = MessageLoop::current()->PostDelayedTask(
       FROM_HERE,
       base::Bind(&CleanupPreviousUpdateAction::WaitBootCompletedOrSchedule,
                  base::Unretained(this)),
       kCheckBootCompletedInterval);
+  CheckTaskScheduled("WaitBootCompleted");
 }
 
 void CleanupPreviousUpdateAction::WaitBootCompletedOrSchedule() {
+  AcknowledgeTaskExecuted();
   TEST_AND_RETURN(running_);
   if (!kIsRecovery &&
       !android::base::GetBoolProperty(kBootCompletedProp, false)) {
@@ -142,15 +187,17 @@
 
 void CleanupPreviousUpdateAction::ScheduleWaitMarkBootSuccessful() {
   TEST_AND_RETURN(running_);
-  MessageLoop::current()->PostDelayedTask(
+  scheduled_task_ = MessageLoop::current()->PostDelayedTask(
       FROM_HERE,
       base::Bind(
           &CleanupPreviousUpdateAction::CheckSlotMarkedSuccessfulOrSchedule,
           base::Unretained(this)),
       kCheckSlotMarkedSuccessfulInterval);
+  CheckTaskScheduled("WaitMarkBootSuccessful");
 }
 
 void CleanupPreviousUpdateAction::CheckSlotMarkedSuccessfulOrSchedule() {
+  AcknowledgeTaskExecuted();
   TEST_AND_RETURN(running_);
   if (!kIsRecovery &&
       !boot_control_->IsSlotMarkedSuccessful(boot_control_->GetCurrentSlot())) {
@@ -212,14 +259,16 @@
 
 void CleanupPreviousUpdateAction::ScheduleWaitForMerge() {
   TEST_AND_RETURN(running_);
-  MessageLoop::current()->PostDelayedTask(
+  scheduled_task_ = MessageLoop::current()->PostDelayedTask(
       FROM_HERE,
       base::Bind(&CleanupPreviousUpdateAction::WaitForMergeOrSchedule,
                  base::Unretained(this)),
       kWaitForMergeInterval);
+  CheckTaskScheduled("WaitForMerge");
 }
 
 void CleanupPreviousUpdateAction::WaitForMergeOrSchedule() {
+  AcknowledgeTaskExecuted();
   TEST_AND_RETURN(running_);
   auto state = snapshot_->ProcessUpdateState(
       std::bind(&CleanupPreviousUpdateAction::OnMergePercentageUpdate, this),
diff --git a/cleanup_previous_update_action.h b/cleanup_previous_update_action.h
index 6f6ce07..fe65e60 100644
--- a/cleanup_previous_update_action.h
+++ b/cleanup_previous_update_action.h
@@ -20,6 +20,7 @@
 #include <chrono>  // NOLINT(build/c++11) -- for merge times
 #include <memory>
 #include <string>
+#include <string_view>
 
 #include <brillo/message_loops/message_loop.h>
 #include <libsnapshot/snapshot.h>
@@ -51,6 +52,7 @@
       BootControlInterface* boot_control,
       android::snapshot::ISnapshotManager* snapshot,
       CleanupPreviousUpdateActionDelegateInterface* delegate);
+  ~CleanupPreviousUpdateAction();
 
   void PerformAction() override;
   void SuspendAction() override;
@@ -74,7 +76,13 @@
   bool cancel_failed_{false};
   unsigned int last_percentage_{0};
   android::snapshot::ISnapshotMergeStats* merge_stats_;
+  brillo::MessageLoop::TaskId scheduled_task_{brillo::MessageLoop::kTaskIdNull};
 
+  // Helpers for task management.
+  void AcknowledgeTaskExecuted();
+  void CheckTaskScheduled(std::string_view name);
+
+  void StopActionInternal();
   void StartActionInternal();
   void ScheduleWaitBootCompleted();
   void WaitBootCompletedOrSchedule();
diff --git a/common/dynamic_partition_control_interface.h b/common/dynamic_partition_control_interface.h
index 7c2d0b0..22f6db8 100644
--- a/common/dynamic_partition_control_interface.h
+++ b/common/dynamic_partition_control_interface.h
@@ -56,6 +56,8 @@
 
   // Return the feature flags of Virtual A/B on this device.
   virtual FeatureFlag GetVirtualAbFeatureFlag() = 0;
+  // Return the feature flags of Virtual A/B Compression on this device.
+  virtual FeatureFlag GetVirtualAbCompressionFeatureFlag() = 0;
 
   // Attempt to optimize |operation|.
   // If successful, |optimized| contains an operation with extents that
diff --git a/common/dynamic_partition_control_stub.cc b/common/dynamic_partition_control_stub.cc
index 5a8ca43..c63a8ff 100644
--- a/common/dynamic_partition_control_stub.cc
+++ b/common/dynamic_partition_control_stub.cc
@@ -33,6 +33,10 @@
   return FeatureFlag(FeatureFlag::Value::NONE);
 }
 
+FeatureFlag DynamicPartitionControlStub::GetVirtualAbCompressionFeatureFlag() {
+  return FeatureFlag(FeatureFlag::Value::NONE);
+}
+
 bool DynamicPartitionControlStub::OptimizeOperation(
     const std::string& partition_name,
     const InstallOperation& operation,
diff --git a/common/dynamic_partition_control_stub.h b/common/dynamic_partition_control_stub.h
index 94dba1b..8bff474 100644
--- a/common/dynamic_partition_control_stub.h
+++ b/common/dynamic_partition_control_stub.h
@@ -31,6 +31,7 @@
  public:
   FeatureFlag GetDynamicPartitionsFeatureFlag() override;
   FeatureFlag GetVirtualAbFeatureFlag() override;
+  FeatureFlag GetVirtualAbCompressionFeatureFlag() override;
   bool OptimizeOperation(const std::string& partition_name,
                          const InstallOperation& operation,
                          InstallOperation* optimized) override;
diff --git a/common/fake_boot_control.h b/common/fake_boot_control.h
index adbacd6..5d8823a 100644
--- a/common/fake_boot_control.h
+++ b/common/fake_boot_control.h
@@ -57,6 +57,9 @@
     if (part_it == devices_[slot].end())
       return false;
     *device = part_it->second;
+    if (is_dynamic != nullptr) {
+      *is_dynamic = false;
+    }
     return true;
   }
 
diff --git a/common/fake_hardware.h b/common/fake_hardware.h
index 2a8e81d..82382ff 100644
--- a/common/fake_hardware.h
+++ b/common/fake_hardware.h
@@ -19,10 +19,13 @@
 
 #include <map>
 #include <string>
+#include <utility>
 
 #include <base/time/time.h>
 
+#include "update_engine/common/error_code.h"
 #include "update_engine/common/hardware_interface.h"
+#include "update_engine/common/utils.h"
 
 namespace chromeos_update_engine {
 
@@ -199,7 +202,7 @@
     build_timestamp_ = build_timestamp;
   }
 
-  void SetWarmReset(bool warm_reset) { warm_reset_ = warm_reset; }
+  void SetWarmReset(bool warm_reset) override { warm_reset_ = warm_reset; }
 
   // Getters to verify state.
   int GetMaxKernelKeyRollforward() const { return kernel_max_rollforward_; }
@@ -207,6 +210,19 @@
   bool GetIsRollbackPowerwashScheduled() const {
     return powerwash_scheduled_ && save_rollback_data_;
   }
+  std::string GetVersionForLogging(
+      const std::string& partition_name) const override {
+    return partition_timestamps_[partition_name];
+  }
+  void SetVersion(const std::string& partition_name, std::string timestamp) {
+    partition_timestamps_[partition_name] = std::move(timestamp);
+  }
+  ErrorCode IsPartitionUpdateValid(
+      const std::string& partition_name,
+      const std::string& new_version) const override {
+    const auto old_version = GetVersionForLogging(partition_name);
+    return utils::IsTimestampNewer(old_version, new_version);
+  }
 
  private:
   bool is_official_build_{true};
@@ -230,6 +246,7 @@
   int64_t build_timestamp_{0};
   bool first_active_omaha_ping_sent_{false};
   bool warm_reset_{false};
+  mutable std::map<std::string, std::string> partition_timestamps_;
 
   DISALLOW_COPY_AND_ASSIGN(FakeHardware);
 };
diff --git a/common/hardware_interface.h b/common/hardware_interface.h
index 4f0305f..b37b007 100644
--- a/common/hardware_interface.h
+++ b/common/hardware_interface.h
@@ -25,6 +25,8 @@
 #include <base/files/file_path.h>
 #include <base/time/time.h>
 
+#include "update_engine/common/error_code.h"
+
 namespace chromeos_update_engine {
 
 // The hardware interface allows access to the crossystem exposed properties,
@@ -142,6 +144,26 @@
   // If |warm_reset| is true, sets the warm reset to indicate a warm reset is
   // needed on the next reboot. Otherwise, clears the flag.
   virtual void SetWarmReset(bool warm_reset) = 0;
+
+  // Return the version/timestamp for partition `partition_name`.
+  // Don't make any assumption about the formatting of returned string.
+  // Only used for logging/debugging purposes.
+  virtual std::string GetVersionForLogging(
+      const std::string& partition_name) const = 0;
+
+  // Check whether `new_version` is "newer" than the
+  // version number of partition `partition_name`. The notion of
+  // "newer" is defined by this function. Caller should not make
+  // any assumption about the underlying logic.
+  // Return:
+  // - kSuccess if update is valid.
+  // - kPayloadTimestampError if downgrade is detected
+  // - kDownloadManifestParseError if |new_version| has an incorrect format
+  // - Other error values if the source of error is known, or kError for
+  //   a generic error on the device.
+  virtual ErrorCode IsPartitionUpdateValid(
+      const std::string& partition_name,
+      const std::string& new_version) const = 0;
 };
 
 }  // namespace chromeos_update_engine
diff --git a/common/http_fetcher_unittest.cc b/common/http_fetcher_unittest.cc
index 589579e..9338087 100644
--- a/common/http_fetcher_unittest.cc
+++ b/common/http_fetcher_unittest.cc
@@ -37,7 +37,11 @@
 #include <brillo/message_loops/base_message_loop.h>
 #include <brillo/message_loops/message_loop.h>
 #include <brillo/message_loops/message_loop_utils.h>
+#ifdef __CHROMEOS__
+#include <brillo/process/process.h>
+#else
 #include <brillo/process.h>
+#endif  // __CHROMEOS__
 #include <brillo/streams/file_stream.h>
 #include <brillo/streams/stream.h>
 #include <gtest/gtest.h>
diff --git a/common/subprocess.cc b/common/subprocess.cc
index 3e197fb..023017b 100644
--- a/common/subprocess.cc
+++ b/common/subprocess.cc
@@ -32,7 +32,6 @@
 #include <base/stl_util.h>
 #include <base/strings/string_util.h>
 #include <base/strings/stringprintf.h>
-#include <brillo/process.h>
 #include <brillo/secure_blob.h>
 
 #include "update_engine/common/utils.h"
diff --git a/common/subprocess.h b/common/subprocess.h
index 432d4cb..179a5c5 100644
--- a/common/subprocess.h
+++ b/common/subprocess.h
@@ -30,8 +30,13 @@
 #include <base/macros.h>
 #include <brillo/asynchronous_signal_handler_interface.h>
 #include <brillo/message_loops/message_loop.h>
+#ifdef __CHROMEOS__
+#include <brillo/process/process.h>
+#include <brillo/process/process_reaper.h>
+#else
 #include <brillo/process.h>
 #include <brillo/process_reaper.h>
+#endif  // __CHROMEOS__
 #include <gtest/gtest_prod.h>  // for FRIEND_TEST
 
 // The Subprocess class is a singleton. It's used to spawn off a subprocess
diff --git a/common/subprocess_unittest.cc b/common/subprocess_unittest.cc
index 74fee61..b4d068f 100644
--- a/common/subprocess_unittest.cc
+++ b/common/subprocess_unittest.cc
@@ -75,7 +75,6 @@
   brillo::AsynchronousSignalHandler async_signal_handler_;
   Subprocess subprocess_;
   unique_ptr<base::FileDescriptorWatcher::Controller> watcher_;
-
 };
 
 namespace {
diff --git a/common/utils.cc b/common/utils.cc
index 3e3d830..5d76f3f 100644
--- a/common/utils.cc
+++ b/common/utils.cc
@@ -820,7 +820,7 @@
   return base_code;
 }
 
-string StringVectorToString(const vector<string> &vec_str) {
+string StringVectorToString(const vector<string>& vec_str) {
   string str = "[";
   for (vector<string>::const_iterator i = vec_str.begin(); i != vec_str.end();
        ++i) {
@@ -849,7 +849,7 @@
                             encoded_hash.c_str());
 }
 
-bool ConvertToOmahaInstallDate(Time time, int *out_num_days) {
+bool ConvertToOmahaInstallDate(Time time, int* out_num_days) {
   time_t unix_time = time.ToTimeT();
   // Output of: date +"%s" --date="Jan 1, 2007 0:00 PST".
   const time_t kOmahaEpoch = 1167638400;
@@ -982,6 +982,36 @@
   return base::NumberToString(base::StringPieceHash()(str_to_convert));
 }
 
+static bool ParseTimestamp(const std::string& str, int64_t* out) {
+  if (!base::StringToInt64(str, out)) {
+    LOG(WARNING) << "Invalid timestamp: " << str;
+    return false;
+  }
+  return true;
+}
+
+ErrorCode IsTimestampNewer(const std::string& old_version,
+                           const std::string& new_version) {
+  if (old_version.empty() || new_version.empty()) {
+    LOG(WARNING)
+        << "One of old/new timestamp is empty, permit update anyway. Old: "
+        << old_version << " New: " << new_version;
+    return ErrorCode::kSuccess;
+  }
+  int64_t old_ver = 0;
+  if (!ParseTimestamp(old_version, &old_ver)) {
+    return ErrorCode::kError;
+  }
+  int64_t new_ver = 0;
+  if (!ParseTimestamp(new_version, &new_ver)) {
+    return ErrorCode::kDownloadManifestParseError;
+  }
+  if (old_ver > new_ver) {
+    return ErrorCode::kPayloadTimestampError;
+  }
+  return ErrorCode::kSuccess;
+}
+
 }  // namespace utils
 
 }  // namespace chromeos_update_engine
diff --git a/common/utils.h b/common/utils.h
index 23ac03d..0a1dc0c 100644
--- a/common/utils.h
+++ b/common/utils.h
@@ -323,6 +323,17 @@
 // with |Excluder| as the exclusion name.
 std::string GetExclusionName(const std::string& str_to_convert);
 
+// Parse `old_version` and `new_version` as integer timestamps and
+// return kSuccess if `new_version` is not older than `old_version`.
+// Return kSuccess if either one is empty.
+// Return kError if |old_version| is not empty and not an integer.
+// Return kDownloadManifestParseError if |new_version| is not empty and not an
+// integer.
+// Return kPayloadTimestampError if both are integers but |new_version| <
+// |old_version|.
+ErrorCode IsTimestampNewer(const std::string& old_version,
+                           const std::string& new_version);
+
 }  // namespace utils
 
 // Utility class to close a file descriptor
diff --git a/common/utils_unittest.cc b/common/utils_unittest.cc
index ebcc548..d73b3da 100644
--- a/common/utils_unittest.cc
+++ b/common/utils_unittest.cc
@@ -481,4 +481,14 @@
   IGNORE_EINTR(close(fd));
 }
 
+TEST(UtilsTest, ValidatePerPartitionTimestamp) {
+  ASSERT_EQ(ErrorCode::kPayloadTimestampError,
+            utils::IsTimestampNewer("10", "5"));
+  ASSERT_EQ(ErrorCode::kSuccess, utils::IsTimestampNewer("10", "11"));
+  ASSERT_EQ(ErrorCode::kDownloadManifestParseError,
+            utils::IsTimestampNewer("10", "lol"));
+  ASSERT_EQ(ErrorCode::kError, utils::IsTimestampNewer("lol", "ZZZ"));
+  ASSERT_EQ(ErrorCode::kSuccess, utils::IsTimestampNewer("10", ""));
+}
+
 }  // namespace chromeos_update_engine
diff --git a/dynamic_partition_control_android.cc b/dynamic_partition_control_android.cc
index aa0f393..c9888ab 100644
--- a/dynamic_partition_control_android.cc
+++ b/dynamic_partition_control_android.cc
@@ -71,6 +71,14 @@
     "ro.boot.dynamic_partitions_retrofit";
 constexpr char kVirtualAbEnabled[] = "ro.virtual_ab.enabled";
 constexpr char kVirtualAbRetrofit[] = "ro.virtual_ab.retrofit";
+constexpr char kVirtualAbCompressionEnabled[] =
+    "ro.virtual_ab.compression.enabled";
+
+// Currently, android doesn't have a retrofit prop for VAB Compression. However,
+// struct FeatureFlag forces us to determine if a feature is 'retrofit'. So this
+// is here just to simplify code. Replace it with real retrofit prop name once
+// there is one.
+constexpr char kVirtualAbCompressionRetrofit[] = "";
 constexpr char kPostinstallFstabPrefix[] = "ro.postinstall.fstab.prefix";
 // Map timeout for dynamic partitions.
 constexpr std::chrono::milliseconds kMapTimeout{1000};
@@ -90,7 +98,9 @@
 
 static FeatureFlag GetFeatureFlag(const char* enable_prop,
                                   const char* retrofit_prop) {
-  bool retrofit = GetBoolProperty(retrofit_prop, false);
+  // Default retrofit to false if retrofit_prop is empty.
+  bool retrofit = retrofit_prop && retrofit_prop[0] != '\0' &&
+                  GetBoolProperty(retrofit_prop, false);
   bool enabled = GetBoolProperty(enable_prop, false);
   if (retrofit && !enabled) {
     LOG(ERROR) << retrofit_prop << " is true but " << enable_prop
@@ -109,7 +119,9 @@
 DynamicPartitionControlAndroid::DynamicPartitionControlAndroid()
     : dynamic_partitions_(
           GetFeatureFlag(kUseDynamicPartitions, kRetrfoitDynamicPartitions)),
-      virtual_ab_(GetFeatureFlag(kVirtualAbEnabled, kVirtualAbRetrofit)) {
+      virtual_ab_(GetFeatureFlag(kVirtualAbEnabled, kVirtualAbRetrofit)),
+      virtual_ab_compression_(GetFeatureFlag(kVirtualAbCompressionEnabled,
+                                             kVirtualAbCompressionRetrofit)) {
   if (GetVirtualAbFeatureFlag().IsEnabled()) {
     snapshot_ = SnapshotManager::New();
   } else {
@@ -126,6 +138,11 @@
   return virtual_ab_;
 }
 
+FeatureFlag
+DynamicPartitionControlAndroid::GetVirtualAbCompressionFeatureFlag() {
+  return virtual_ab_compression_;
+}
+
 bool DynamicPartitionControlAndroid::OptimizeOperation(
     const std::string& partition_name,
     const InstallOperation& operation,
@@ -838,6 +855,11 @@
     MetadataBuilder* builder,
     uint32_t target_slot,
     const DeltaArchiveManifest& manifest) {
+  // Check preconditions.
+  CHECK(!GetVirtualAbFeatureFlag().IsEnabled() || IsRecovery())
+      << "UpdatePartitionMetadata is called on a Virtual A/B device "
+         "but source partitions is not deleted. This is not allowed.";
+
   // If applying downgrade from Virtual A/B to non-Virtual A/B, the left-over
   // COW group needs to be deleted to ensure there are enough space to create
   // target partitions.
@@ -853,7 +875,12 @@
 
   std::string expr;
   uint64_t allocatable_space = builder->AllocatableSpace();
-  if (!GetDynamicPartitionsFeatureFlag().IsRetrofit()) {
+  // On device retrofitting dynamic partitions, allocatable_space = super.
+  // On device launching dynamic partitions w/o VAB,
+  //   allocatable_space = super / 2.
+  // On device launching dynamic partitions with VAB, allocatable_space = super.
+  if (!GetDynamicPartitionsFeatureFlag().IsRetrofit() &&
+      !GetVirtualAbFeatureFlag().IsEnabled()) {
     allocatable_space /= 2;
     expr = "half of ";
   }
diff --git a/dynamic_partition_control_android.h b/dynamic_partition_control_android.h
index 9ee85db..f3805f0 100644
--- a/dynamic_partition_control_android.h
+++ b/dynamic_partition_control_android.h
@@ -36,6 +36,7 @@
   ~DynamicPartitionControlAndroid();
   FeatureFlag GetDynamicPartitionsFeatureFlag() override;
   FeatureFlag GetVirtualAbFeatureFlag() override;
+  FeatureFlag GetVirtualAbCompressionFeatureFlag() override;
   bool OptimizeOperation(const std::string& partition_name,
                          const InstallOperation& operation,
                          InstallOperation* optimized) override;
@@ -203,8 +204,11 @@
                             bool force_writable,
                             std::string* path);
 
-  // Update |builder| according to |partition_metadata|, assuming the device
-  // does not have Virtual A/B.
+  // Update |builder| according to |partition_metadata|.
+  // - In Android mode, this is only called when the device
+  //   does not have Virtual A/B.
+  // - When sideloading, this may be called as a fallback path if CoW cannot
+  //   be created.
   bool UpdatePartitionMetadata(android::fs_mgr::MetadataBuilder* builder,
                                uint32_t target_slot,
                                const DeltaArchiveManifest& manifest);
@@ -274,6 +278,7 @@
   std::set<std::string> mapped_devices_;
   const FeatureFlag dynamic_partitions_;
   const FeatureFlag virtual_ab_;
+  const FeatureFlag virtual_ab_compression_;
   std::unique_ptr<android::snapshot::ISnapshotManager> snapshot_;
   std::unique_ptr<android::snapshot::AutoDevice> metadata_device_;
   bool target_supports_snapshot_ = false;
diff --git a/dynamic_partition_control_android_unittest.cc b/dynamic_partition_control_android_unittest.cc
index 4154b36..223e177 100644
--- a/dynamic_partition_control_android_unittest.cc
+++ b/dynamic_partition_control_android_unittest.cc
@@ -113,21 +113,24 @@
   // |slot|.
   void SetMetadata(uint32_t slot,
                    const PartitionSuffixSizes& sizes,
-                   uint32_t partition_attr = 0) {
+                   uint32_t partition_attr = 0,
+                   uint64_t super_size = kDefaultSuperSize) {
     EXPECT_CALL(dynamicControl(),
                 LoadMetadataBuilder(GetSuperDevice(slot), slot))
         .Times(AnyNumber())
-        .WillRepeatedly(Invoke([sizes, partition_attr](auto, auto) {
+        .WillRepeatedly(Invoke([=](auto, auto) {
           return NewFakeMetadata(PartitionSuffixSizesToManifest(sizes),
-                                 partition_attr);
+                                 partition_attr,
+                                 super_size);
         }));
 
     EXPECT_CALL(dynamicControl(),
                 LoadMetadataBuilder(GetSuperDevice(slot), slot, _))
         .Times(AnyNumber())
-        .WillRepeatedly(Invoke([sizes, partition_attr](auto, auto, auto) {
+        .WillRepeatedly(Invoke([=](auto, auto, auto) {
           return NewFakeMetadata(PartitionSuffixSizesToManifest(sizes),
-                                 partition_attr);
+                                 partition_attr,
+                                 super_size);
         }));
   }
 
@@ -1006,8 +1009,11 @@
         return dynamicControl().RealPrepareDynamicPartitionsForUpdate(
             source_slot, target_slot, manifest, delete_source);
       }));
+  // Make super only large enough to hold a single slot's worth of partitions.
+  uint64_t super_size = kDefaultGroupSize + 1_MiB;
   // Expectation on PrepareDynamicPartitionsForUpdate
-  SetMetadata(source(), {{S("system"), 2_GiB}, {S("vendor"), 1_GiB}});
+  SetMetadata(
+      source(), {{S("system"), 2_GiB}, {S("vendor"), 1_GiB}}, 0, super_size);
   ExpectUnmap({T("system"), T("vendor")});
   // Expect that the source partitions aren't present in target super metadata.
   ExpectStoreMetadata({{T("system"), 3_GiB}, {T("vendor"), 1_GiB}});
diff --git a/dynamic_partition_test_utils.h b/dynamic_partition_test_utils.h
index 70a176b..d701dce 100644
--- a/dynamic_partition_test_utils.h
+++ b/dynamic_partition_test_utils.h
@@ -175,9 +175,11 @@
 }
 
 inline std::unique_ptr<MetadataBuilder> NewFakeMetadata(
-    const DeltaArchiveManifest& manifest, uint32_t partition_attr = 0) {
+    const DeltaArchiveManifest& manifest,
+    uint32_t partition_attr = 0,
+    uint64_t super_size = kDefaultSuperSize) {
   auto builder =
-      MetadataBuilder::New(kDefaultSuperSize, kFakeMetadataSize, kMaxNumSlots);
+      MetadataBuilder::New(super_size, kFakeMetadataSize, kMaxNumSlots);
   for (const auto& group : manifest.dynamic_partition_metadata().groups()) {
     EXPECT_TRUE(builder->AddGroup(group.name(), group.size()));
     for (const auto& partition_name : group.partition_names()) {
diff --git a/hardware_android.cc b/hardware_android.cc
index 0bf05e4..a659bf6 100644
--- a/hardware_android.cc
+++ b/hardware_android.cc
@@ -19,13 +19,19 @@
 #include <sys/types.h>
 
 #include <memory>
+#include <string>
+#include <string_view>
 
+#include <android/sysprop/GkiProperties.sysprop.h>
+#include <android-base/parseint.h>
 #include <android-base/properties.h>
 #include <base/files/file_util.h>
 #include <bootloader_message/bootloader_message.h>
 
+#include "update_engine/common/error_code_utils.h"
 #include "update_engine/common/hardware.h"
 #include "update_engine/common/platform_constants.h"
+#include "update_engine/common/utils.h"
 
 using android::base::GetBoolProperty;
 using android::base::GetIntProperty;
@@ -46,6 +52,24 @@
 const char kPropBootRevision[] = "ro.boot.revision";
 const char kPropBuildDateUTC[] = "ro.build.date.utc";
 
+string GetPartitionBuildDate(const string& partition_name) {
+  return android::base::GetProperty("ro." + partition_name + ".build.date.utc",
+                                    "");
+}
+
+ErrorCode IsTimestampNewerLogged(const std::string& partition_name,
+                                 const std::string& old_version,
+                                 const std::string& new_version) {
+  auto error_code = utils::IsTimestampNewer(old_version, new_version);
+  if (error_code != ErrorCode::kSuccess) {
+    LOG(WARNING) << "Timestamp check failed with "
+                 << utils::ErrorCodeToString(error_code) << ": "
+                 << partition_name << " Partition timestamp: " << old_version
+                 << " Update timestamp: " << new_version;
+  }
+  return error_code;
+}
+
 }  // namespace
 
 namespace hardware {
@@ -223,4 +247,45 @@
   }
 }
 
+string HardwareAndroid::GetVersionForLogging(
+    const string& partition_name) const {
+  if (partition_name == "boot") {
+    // The boot partition's build date is ro.bootimage.build.date.utc.
+    return GetPartitionBuildDate("bootimage");
+  }
+  return GetPartitionBuildDate(partition_name);
+}
+
+ErrorCode HardwareAndroid::IsPartitionUpdateValid(
+    const string& partition_name, const string& new_version) const {
+  if (partition_name == "boot") {
+    const auto old_version = GetPartitionBuildDate("bootimage");
+    auto error_code =
+        IsTimestampNewerLogged(partition_name, old_version, new_version);
+    if (error_code == ErrorCode::kPayloadTimestampError) {
+      bool prevent_downgrade =
+          android::sysprop::GkiProperties::prevent_downgrade_version().value_or(
+              false);
+      if (!prevent_downgrade) {
+        LOG(WARNING) << "Downgrade of boot image is detected, but permitting "
+                        "update because device does not prevent boot image "
+                        "downgrade";
+        // If prevent_downgrade_version sysprop is unset or false, permit
+        // downgrade in boot image version.
+        // Even though error_code is overridden here, always call
+        // IsTimestampNewerLogged to produce log messages.
+        error_code = ErrorCode::kSuccess;
+      }
+    }
+    return error_code;
+  }
+
+  const auto old_version = GetPartitionBuildDate(partition_name);
+  // TODO(zhangkelvin) For some partitions, a missing current timestamp should
+  // be an error, e.g. system, vendor, product etc.
+  auto error_code =
+      IsTimestampNewerLogged(partition_name, old_version, new_version);
+  return error_code;
+}
+
 }  // namespace chromeos_update_engine
diff --git a/hardware_android.h b/hardware_android.h
index e0368f9..d8fbbbe 100644
--- a/hardware_android.h
+++ b/hardware_android.h
@@ -18,17 +18,19 @@
 #define UPDATE_ENGINE_HARDWARE_ANDROID_H_
 
 #include <string>
+#include <string_view>
 
 #include <base/macros.h>
 #include <base/time/time.h>
 
+#include "update_engine/common/error_code.h"
 #include "update_engine/common/hardware.h"
 #include "update_engine/common/hardware_interface.h"
 
 namespace chromeos_update_engine {
 
 // Implements the real interface with the hardware in the Android platform.
-class HardwareAndroid final : public HardwareInterface {
+class HardwareAndroid : public HardwareInterface {
  public:
   HardwareAndroid() = default;
   ~HardwareAndroid() override = default;
@@ -58,6 +60,11 @@
   bool GetFirstActiveOmahaPingSent() const override;
   bool SetFirstActiveOmahaPingSent() override;
   void SetWarmReset(bool warm_reset) override;
+  [[nodiscard]] std::string GetVersionForLogging(
+      const std::string& partition_name) const override;
+  [[nodiscard]] ErrorCode IsPartitionUpdateValid(
+      const std::string& partition_name,
+      const std::string& new_version) const override;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(HardwareAndroid);
diff --git a/hardware_chromeos.cc b/hardware_chromeos.cc
index 2a83830..807e086 100644
--- a/hardware_chromeos.cc
+++ b/hardware_chromeos.cc
@@ -384,4 +384,16 @@
 
 void HardwareChromeOS::SetWarmReset(bool warm_reset) {}
 
+std::string HardwareChromeOS::GetVersionForLogging(
+    const std::string& partition_name) const {
+  // TODO(zhangkelvin) Implement per-partition timestamp for Chrome OS.
+  return "";
+}
+
+ErrorCode HardwareChromeOS::IsPartitionUpdateValid(
+    const std::string& partition_name, const std::string& new_version) const {
+  // TODO(zhangkelvin) Implement per-partition timestamp for Chrome OS.
+  return ErrorCode::kSuccess;
+}
+
 }  // namespace chromeos_update_engine
diff --git a/hardware_chromeos.h b/hardware_chromeos.h
index e14ae9a..bbfe273 100644
--- a/hardware_chromeos.h
+++ b/hardware_chromeos.h
@@ -25,6 +25,7 @@
 #include <base/time/time.h>
 #include <debugd/dbus-proxies.h>
 
+#include "update_engine/common/error_code.h"
 #include "update_engine/common/hardware_interface.h"
 
 namespace chromeos_update_engine {
@@ -63,6 +64,11 @@
   bool GetFirstActiveOmahaPingSent() const override;
   bool SetFirstActiveOmahaPingSent() override;
   void SetWarmReset(bool warm_reset) override;
+  std::string GetVersionForLogging(
+      const std::string& partition_name) const override;
+  ErrorCode IsPartitionUpdateValid(
+      const std::string& partition_name,
+      const std::string& new_version) const override;
 
  private:
   friend class HardwareChromeOSTest;
diff --git a/libcurl_http_fetcher.cc b/libcurl_http_fetcher.cc
index f8aed7c..bce0920 100644
--- a/libcurl_http_fetcher.cc
+++ b/libcurl_http_fetcher.cc
@@ -33,7 +33,6 @@
 #include <base/strings/stringprintf.h>
 #include <base/threading/thread_task_runner_handle.h>
 
-
 #ifdef __ANDROID__
 #include <cutils/qtaguid.h>
 #include <private/android_filesystem_config.h>
@@ -464,12 +463,12 @@
     // possible to watch file descriptors. Just poll it later. This usually
     // happens if brillo::FakeMessageLoop is used.
     if (!base::ThreadTaskRunnerHandle::IsSet()) {
-        MessageLoop::current()->PostDelayedTask(
-            FROM_HERE,
-            base::Bind(&LibcurlHttpFetcher::CurlPerformOnce,
-                       base::Unretained(this)),
-            TimeDelta::FromSeconds(1));
-        return;
+      MessageLoop::current()->PostDelayedTask(
+          FROM_HERE,
+          base::Bind(&LibcurlHttpFetcher::CurlPerformOnce,
+                     base::Unretained(this)),
+          TimeDelta::FromSeconds(1));
+      return;
     }
 #endif
     SetupMessageLoopSources();
diff --git a/payload_consumer/certificate_parser_stub.cc b/payload_consumer/certificate_parser_stub.cc
index 95fd6e8..a365ab8 100644
--- a/payload_consumer/certificate_parser_stub.cc
+++ b/payload_consumer/certificate_parser_stub.cc
@@ -14,7 +14,7 @@
 // limitations under the License.
 //
 
-#include <payload_consumer/certificate_parser_stub.h>
+#include "update_engine/payload_consumer/certificate_parser_stub.h"
 
 namespace chromeos_update_engine {
 bool CertificateParserStub::ReadPublicKeysFromCertificates(
diff --git a/payload_consumer/certificate_parser_stub.h b/payload_consumer/certificate_parser_stub.h
index f4f8825..a51c2c6 100644
--- a/payload_consumer/certificate_parser_stub.h
+++ b/payload_consumer/certificate_parser_stub.h
@@ -23,7 +23,7 @@
 
 #include <base/macros.h>
 
-#include "payload_consumer/certificate_parser_interface.h"
+#include "update_engine/payload_consumer/certificate_parser_interface.h"
 
 namespace chromeos_update_engine {
 class CertificateParserStub : public CertificateParserInterface {
diff --git a/payload_consumer/delta_performer.cc b/payload_consumer/delta_performer.cc
index 19d1297..87fc4cf 100644
--- a/payload_consumer/delta_performer.cc
+++ b/payload_consumer/delta_performer.cc
@@ -41,6 +41,8 @@
 #include <puffin/puffpatch.h>
 
 #include "update_engine/common/constants.h"
+#include "update_engine/common/error_code.h"
+#include "update_engine/common/error_code_utils.h"
 #include "update_engine/common/hardware_interface.h"
 #include "update_engine/common/prefs_interface.h"
 #include "update_engine/common/subprocess.h"
@@ -52,6 +54,7 @@
 #include "update_engine/payload_consumer/extent_reader.h"
 #include "update_engine/payload_consumer/extent_writer.h"
 #include "update_engine/payload_consumer/partition_update_generator_interface.h"
+#include "update_engine/payload_consumer/partition_writer.h"
 #if USE_FEC
 #include "update_engine/payload_consumer/fec_file_descriptor.h"
 #endif  // USE_FEC
@@ -77,65 +80,6 @@
 const int kUpdateStateOperationInvalid = -1;
 const int kMaxResumedUpdateFailures = 10;
 
-const uint64_t kCacheSize = 1024 * 1024;  // 1MB
-
-// Opens path for read/write. On success returns an open FileDescriptor
-// and sets *err to 0. On failure, sets *err to errno and returns nullptr.
-FileDescriptorPtr OpenFile(const char* path,
-                           int mode,
-                           bool cache_writes,
-                           int* err) {
-  // Try to mark the block device read-only based on the mode. Ignore any
-  // failure since this won't work when passing regular files.
-  bool read_only = (mode & O_ACCMODE) == O_RDONLY;
-  utils::SetBlockDeviceReadOnly(path, read_only);
-
-  FileDescriptorPtr fd(new EintrSafeFileDescriptor());
-  if (cache_writes && !read_only) {
-    fd = FileDescriptorPtr(new CachedFileDescriptor(fd, kCacheSize));
-    LOG(INFO) << "Caching writes.";
-  }
-  if (!fd->Open(path, mode, 000)) {
-    *err = errno;
-    PLOG(ERROR) << "Unable to open file " << path;
-    return nullptr;
-  }
-  *err = 0;
-  return fd;
-}
-
-// Discard the tail of the block device referenced by |fd|, from the offset
-// |data_size| until the end of the block device. Returns whether the data was
-// discarded.
-bool DiscardPartitionTail(const FileDescriptorPtr& fd, uint64_t data_size) {
-  uint64_t part_size = fd->BlockDevSize();
-  if (!part_size || part_size <= data_size)
-    return false;
-
-  struct blkioctl_request {
-    int number;
-    const char* name;
-  };
-  const vector<blkioctl_request> blkioctl_requests = {
-      {BLKDISCARD, "BLKDISCARD"},
-      {BLKSECDISCARD, "BLKSECDISCARD"},
-#ifdef BLKZEROOUT
-      {BLKZEROOUT, "BLKZEROOUT"},
-#endif
-  };
-  for (const auto& req : blkioctl_requests) {
-    int error = 0;
-    if (fd->BlkIoctl(req.number, data_size, part_size - data_size, &error) &&
-        error == 0) {
-      return true;
-    }
-    LOG(WARNING) << "Error discarding the last "
-                 << (part_size - data_size) / 1024 << " KiB using ioctl("
-                 << req.name << ")";
-  }
-  return false;
-}
-
 }  // namespace
 
 // Computes the ratio of |part| and |total|, scaled to |norm|, using integer
@@ -280,33 +224,12 @@
 }
 
 int DeltaPerformer::CloseCurrentPartition() {
-  int err = 0;
-  if (source_fd_ && !source_fd_->Close()) {
-    err = errno;
-    PLOG(ERROR) << "Error closing source partition";
-    if (!err)
-      err = 1;
+  if (!partition_writer_) {
+    return 0;
   }
-  source_fd_.reset();
-  if (source_ecc_fd_ && !source_ecc_fd_->Close()) {
-    err = errno;
-    PLOG(ERROR) << "Error closing ECC source partition";
-    if (!err)
-      err = 1;
-  }
-  source_ecc_fd_.reset();
-  source_ecc_open_failure_ = false;
-  source_path_.clear();
-
-  if (target_fd_ && !target_fd_->Close()) {
-    err = errno;
-    PLOG(ERROR) << "Error closing target partition";
-    if (!err)
-      err = 1;
-  }
-  target_fd_.reset();
-  target_path_.clear();
-  return -err;
+  int err = partition_writer_->Close();
+  partition_writer_ = nullptr;
+  return err;
 }
 
 bool DeltaPerformer::OpenCurrentPartition() {
@@ -318,92 +241,18 @@
       install_plan_->partitions.size() - partitions_.size();
   const InstallPlan::Partition& install_part =
       install_plan_->partitions[num_previous_partitions + current_partition_];
+  partition_writer_ = std::make_unique<PartitionWriter>(
+      partition,
+      install_part,
+      boot_control_->GetDynamicPartitionControl(),
+      block_size_,
+      interactive_);
+
   // Open source fds if we have a delta payload, or for partitions in the
   // partial update.
   bool source_may_exist = manifest_.partial_update() ||
                           payload_->type == InstallPayloadType::kDelta;
-  // We shouldn't open the source partition in certain cases, e.g. some dynamic
-  // partitions in delta payload, partitions included in the full payload for
-  // partial updates. Use the source size as the indicator.
-  if (source_may_exist && install_part.source_size > 0) {
-    source_path_ = install_part.source_path;
-    int err;
-    source_fd_ = OpenFile(source_path_.c_str(), O_RDONLY, false, &err);
-    if (!source_fd_) {
-      LOG(ERROR) << "Unable to open source partition "
-                 << partition.partition_name() << " on slot "
-                 << BootControlInterface::SlotName(install_plan_->source_slot)
-                 << ", file " << source_path_;
-      return false;
-    }
-  }
-
-  target_path_ = install_part.target_path;
-  int err;
-
-  int flags = O_RDWR;
-  if (!interactive_)
-    flags |= O_DSYNC;
-
-  LOG(INFO) << "Opening " << target_path_ << " partition with"
-            << (interactive_ ? "out" : "") << " O_DSYNC";
-
-  target_fd_ = OpenFile(target_path_.c_str(), flags, true, &err);
-  if (!target_fd_) {
-    LOG(ERROR) << "Unable to open target partition "
-               << partition.partition_name() << " on slot "
-               << BootControlInterface::SlotName(install_plan_->target_slot)
-               << ", file " << target_path_;
-    return false;
-  }
-
-  LOG(INFO) << "Applying " << partition.operations().size()
-            << " operations to partition \"" << partition.partition_name()
-            << "\"";
-
-  // Discard the end of the partition, but ignore failures.
-  DiscardPartitionTail(target_fd_, install_part.target_size);
-
-  return true;
-}
-
-bool DeltaPerformer::OpenCurrentECCPartition() {
-  if (source_ecc_fd_)
-    return true;
-
-  if (source_ecc_open_failure_)
-    return false;
-
-  if (current_partition_ >= partitions_.size())
-    return false;
-
-  // No support for ECC for full payloads.
-  if (payload_->type == InstallPayloadType::kFull)
-    return false;
-
-#if USE_FEC
-  const PartitionUpdate& partition = partitions_[current_partition_];
-  size_t num_previous_partitions =
-      install_plan_->partitions.size() - partitions_.size();
-  const InstallPlan::Partition& install_part =
-      install_plan_->partitions[num_previous_partitions + current_partition_];
-  string path = install_part.source_path;
-  FileDescriptorPtr fd(new FecFileDescriptor());
-  if (!fd->Open(path.c_str(), O_RDONLY, 0)) {
-    PLOG(ERROR) << "Unable to open ECC source partition "
-                << partition.partition_name() << " on slot "
-                << BootControlInterface::SlotName(install_plan_->source_slot)
-                << ", file " << path;
-    source_ecc_open_failure_ = true;
-    return false;
-  }
-  source_ecc_fd_ = fd;
-#else
-  // No support for ECC compiled.
-  source_ecc_open_failure_ = true;
-#endif  // USE_FEC
-
-  return !source_ecc_open_failure_;
+  return partition_writer_->Init(install_plan_, source_may_exist);
 }
 
 namespace {
@@ -731,10 +580,6 @@
     if (!HandleOpResult(op_result, InstallOperationTypeName(op.type()), error))
       return false;
 
-    if (!target_fd_->Flush()) {
-      return false;
-    }
-
     next_operation_num_++;
     UpdateOverallProgress(false, "Completed ");
     CheckpointUpdateProgress(false);
@@ -1001,22 +846,10 @@
 
   // Since we delete data off the beginning of the buffer as we use it,
   // the data we need should be exactly at the beginning of the buffer.
-  TEST_AND_RETURN_FALSE(buffer_offset_ == operation.data_offset());
   TEST_AND_RETURN_FALSE(buffer_.size() >= operation.data_length());
 
-  // Setup the ExtentWriter stack based on the operation type.
-  std::unique_ptr<ExtentWriter> writer = std::make_unique<DirectExtentWriter>();
-
-  if (operation.type() == InstallOperation::REPLACE_BZ) {
-    writer.reset(new BzipExtentWriter(std::move(writer)));
-  } else if (operation.type() == InstallOperation::REPLACE_XZ) {
-    writer.reset(new XzExtentWriter(std::move(writer)));
-  }
-
-  TEST_AND_RETURN_FALSE(
-      writer->Init(target_fd_, operation.dst_extents(), block_size_));
-  TEST_AND_RETURN_FALSE(writer->Write(buffer_.data(), operation.data_length()));
-
+  TEST_AND_RETURN_FALSE(partition_writer_->PerformReplaceOperation(
+      operation, buffer_.data(), buffer_.size()));
   // Update buffer
   DiscardBuffer(true, buffer_.size());
   return true;
@@ -1031,41 +864,13 @@
   TEST_AND_RETURN_FALSE(!operation.has_data_offset());
   TEST_AND_RETURN_FALSE(!operation.has_data_length());
 
-#ifdef BLKZEROOUT
-  bool attempt_ioctl = true;
-  int request =
-      (operation.type() == InstallOperation::ZERO ? BLKZEROOUT : BLKDISCARD);
-#else   // !defined(BLKZEROOUT)
-  bool attempt_ioctl = false;
-  int request = 0;
-#endif  // !defined(BLKZEROOUT)
-
-  brillo::Blob zeros;
-  for (const Extent& extent : operation.dst_extents()) {
-    const uint64_t start = extent.start_block() * block_size_;
-    const uint64_t length = extent.num_blocks() * block_size_;
-    if (attempt_ioctl) {
-      int result = 0;
-      if (target_fd_->BlkIoctl(request, start, length, &result) && result == 0)
-        continue;
-      attempt_ioctl = false;
-    }
-    // In case of failure, we fall back to writing 0 to the selected region.
-    zeros.resize(16 * block_size_);
-    for (uint64_t offset = 0; offset < length; offset += zeros.size()) {
-      uint64_t chunk_length =
-          min(length - offset, static_cast<uint64_t>(zeros.size()));
-      TEST_AND_RETURN_FALSE(utils::PWriteAll(
-          target_fd_, zeros.data(), chunk_length, start + offset));
-    }
-  }
-  return true;
+  return partition_writer_->PerformZeroOrDiscardOperation(operation);
 }
 
-bool DeltaPerformer::ValidateSourceHash(const brillo::Blob& calculated_hash,
-                                        const InstallOperation& operation,
-                                        const FileDescriptorPtr source_fd,
-                                        ErrorCode* error) {
+bool PartitionWriter::ValidateSourceHash(const brillo::Blob& calculated_hash,
+                                         const InstallOperation& operation,
+                                         const FileDescriptorPtr source_fd,
+                                         ErrorCode* error) {
   brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
                                     operation.src_sha256_hash().end());
   if (calculated_hash != expected_source_hash) {
@@ -1106,169 +911,7 @@
     TEST_AND_RETURN_FALSE(operation.src_length() % block_size_ == 0);
   if (operation.has_dst_length())
     TEST_AND_RETURN_FALSE(operation.dst_length() % block_size_ == 0);
-
-  TEST_AND_RETURN_FALSE(source_fd_ != nullptr);
-
-  // The device may optimize the SOURCE_COPY operation.
-  // Being this a device-specific optimization let DynamicPartitionController
-  // decide it the operation should be skipped.
-  const PartitionUpdate& partition = partitions_[current_partition_];
-  const auto& partition_control = boot_control_->GetDynamicPartitionControl();
-
-  InstallOperation buf;
-  bool should_optimize = partition_control->OptimizeOperation(
-      partition.partition_name(), operation, &buf);
-  const InstallOperation& optimized = should_optimize ? buf : operation;
-
-  if (operation.has_src_sha256_hash()) {
-    bool read_ok;
-    brillo::Blob source_hash;
-    brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
-                                      operation.src_sha256_hash().end());
-
-    // We fall back to use the error corrected device if the hash of the raw
-    // device doesn't match or there was an error reading the source partition.
-    // Note that this code will also fall back if writing the target partition
-    // fails.
-    if (should_optimize) {
-      // Hash operation.src_extents(), then copy optimized.src_extents to
-      // optimized.dst_extents.
-      read_ok =
-          fd_utils::ReadAndHashExtents(
-              source_fd_, operation.src_extents(), block_size_, &source_hash) &&
-          fd_utils::CopyAndHashExtents(source_fd_,
-                                       optimized.src_extents(),
-                                       target_fd_,
-                                       optimized.dst_extents(),
-                                       block_size_,
-                                       nullptr /* skip hashing */);
-    } else {
-      read_ok = fd_utils::CopyAndHashExtents(source_fd_,
-                                             operation.src_extents(),
-                                             target_fd_,
-                                             operation.dst_extents(),
-                                             block_size_,
-                                             &source_hash);
-    }
-    if (read_ok && expected_source_hash == source_hash)
-      return true;
-    LOG(WARNING) << "Source hash from RAW device mismatched, attempting to "
-                    "correct using ECC";
-    if (!OpenCurrentECCPartition()) {
-      // The following function call will return false since the source hash
-      // mismatches, but we still want to call it so it prints the appropriate
-      // log message.
-      return ValidateSourceHash(source_hash, operation, source_fd_, error);
-    }
-
-    LOG(WARNING) << "Source hash from RAW device mismatched: found "
-                 << base::HexEncode(source_hash.data(), source_hash.size())
-                 << ", expected "
-                 << base::HexEncode(expected_source_hash.data(),
-                                    expected_source_hash.size());
-    if (should_optimize) {
-      TEST_AND_RETURN_FALSE(fd_utils::ReadAndHashExtents(
-          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash));
-      TEST_AND_RETURN_FALSE(
-          fd_utils::CopyAndHashExtents(source_ecc_fd_,
-                                       optimized.src_extents(),
-                                       target_fd_,
-                                       optimized.dst_extents(),
-                                       block_size_,
-                                       nullptr /* skip hashing */));
-    } else {
-      TEST_AND_RETURN_FALSE(
-          fd_utils::CopyAndHashExtents(source_ecc_fd_,
-                                       operation.src_extents(),
-                                       target_fd_,
-                                       operation.dst_extents(),
-                                       block_size_,
-                                       &source_hash));
-    }
-    TEST_AND_RETURN_FALSE(
-        ValidateSourceHash(source_hash, operation, source_ecc_fd_, error));
-    // At this point reading from the the error corrected device worked, but
-    // reading from the raw device failed, so this is considered a recovered
-    // failure.
-    source_ecc_recovered_failures_++;
-  } else {
-    // When the operation doesn't include a source hash, we attempt the error
-    // corrected device first since we can't verify the block in the raw device
-    // at this point, but we fall back to the raw device since the error
-    // corrected device can be shorter or not available.
-
-    if (OpenCurrentECCPartition() &&
-        fd_utils::CopyAndHashExtents(source_ecc_fd_,
-                                     optimized.src_extents(),
-                                     target_fd_,
-                                     optimized.dst_extents(),
-                                     block_size_,
-                                     nullptr)) {
-      return true;
-    }
-    TEST_AND_RETURN_FALSE(fd_utils::CopyAndHashExtents(source_fd_,
-                                                       optimized.src_extents(),
-                                                       target_fd_,
-                                                       optimized.dst_extents(),
-                                                       block_size_,
-                                                       nullptr));
-  }
-  return true;
-}
-
-FileDescriptorPtr DeltaPerformer::ChooseSourceFD(
-    const InstallOperation& operation, ErrorCode* error) {
-  if (source_fd_ == nullptr) {
-    LOG(ERROR) << "ChooseSourceFD fail: source_fd_ == nullptr";
-    return nullptr;
-  }
-
-  if (!operation.has_src_sha256_hash()) {
-    // When the operation doesn't include a source hash, we attempt the error
-    // corrected device first since we can't verify the block in the raw device
-    // at this point, but we first need to make sure all extents are readable
-    // since the error corrected device can be shorter or not available.
-    if (OpenCurrentECCPartition() &&
-        fd_utils::ReadAndHashExtents(
-            source_ecc_fd_, operation.src_extents(), block_size_, nullptr)) {
-      return source_ecc_fd_;
-    }
-    return source_fd_;
-  }
-
-  brillo::Blob source_hash;
-  brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
-                                    operation.src_sha256_hash().end());
-  if (fd_utils::ReadAndHashExtents(
-          source_fd_, operation.src_extents(), block_size_, &source_hash) &&
-      source_hash == expected_source_hash) {
-    return source_fd_;
-  }
-  // We fall back to use the error corrected device if the hash of the raw
-  // device doesn't match or there was an error reading the source partition.
-  if (!OpenCurrentECCPartition()) {
-    // The following function call will return false since the source hash
-    // mismatches, but we still want to call it so it prints the appropriate
-    // log message.
-    ValidateSourceHash(source_hash, operation, source_fd_, error);
-    return nullptr;
-  }
-  LOG(WARNING) << "Source hash from RAW device mismatched: found "
-               << base::HexEncode(source_hash.data(), source_hash.size())
-               << ", expected "
-               << base::HexEncode(expected_source_hash.data(),
-                                  expected_source_hash.size());
-
-  if (fd_utils::ReadAndHashExtents(
-          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash) &&
-      ValidateSourceHash(source_hash, operation, source_ecc_fd_, error)) {
-    // At this point reading from the the error corrected device worked, but
-    // reading from the raw device failed, so this is considered a recovered
-    // failure.
-    source_ecc_recovered_failures_++;
-    return source_ecc_fd_;
-  }
-  return nullptr;
+  return partition_writer_->PerformSourceCopyOperation(operation, error);
 }
 
 bool DeltaPerformer::ExtentsToBsdiffPositionsString(
@@ -1293,69 +936,6 @@
   return true;
 }
 
-namespace {
-
-class BsdiffExtentFile : public bsdiff::FileInterface {
- public:
-  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader, size_t size)
-      : BsdiffExtentFile(std::move(reader), nullptr, size) {}
-  BsdiffExtentFile(std::unique_ptr<ExtentWriter> writer, size_t size)
-      : BsdiffExtentFile(nullptr, std::move(writer), size) {}
-
-  ~BsdiffExtentFile() override = default;
-
-  bool Read(void* buf, size_t count, size_t* bytes_read) override {
-    TEST_AND_RETURN_FALSE(reader_->Read(buf, count));
-    *bytes_read = count;
-    offset_ += count;
-    return true;
-  }
-
-  bool Write(const void* buf, size_t count, size_t* bytes_written) override {
-    TEST_AND_RETURN_FALSE(writer_->Write(buf, count));
-    *bytes_written = count;
-    offset_ += count;
-    return true;
-  }
-
-  bool Seek(off_t pos) override {
-    if (reader_ != nullptr) {
-      TEST_AND_RETURN_FALSE(reader_->Seek(pos));
-      offset_ = pos;
-    } else {
-      // For writes technically there should be no change of position, or it
-      // should be equivalent of current offset.
-      TEST_AND_RETURN_FALSE(offset_ == static_cast<uint64_t>(pos));
-    }
-    return true;
-  }
-
-  bool Close() override { return true; }
-
-  bool GetSize(uint64_t* size) override {
-    *size = size_;
-    return true;
-  }
-
- private:
-  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader,
-                   std::unique_ptr<ExtentWriter> writer,
-                   size_t size)
-      : reader_(std::move(reader)),
-        writer_(std::move(writer)),
-        size_(size),
-        offset_(0) {}
-
-  std::unique_ptr<ExtentReader> reader_;
-  std::unique_ptr<ExtentWriter> writer_;
-  uint64_t size_;
-  uint64_t offset_;
-
-  DISALLOW_COPY_AND_ASSIGN(BsdiffExtentFile);
-};
-
-}  // namespace
-
 bool DeltaPerformer::PerformSourceBsdiffOperation(
     const InstallOperation& operation, ErrorCode* error) {
   // Since we delete data off the beginning of the buffer as we use it,
@@ -1367,136 +947,20 @@
   if (operation.has_dst_length())
     TEST_AND_RETURN_FALSE(operation.dst_length() % block_size_ == 0);
 
-  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
-  TEST_AND_RETURN_FALSE(source_fd != nullptr);
-
-  auto reader = std::make_unique<DirectExtentReader>();
-  TEST_AND_RETURN_FALSE(
-      reader->Init(source_fd, operation.src_extents(), block_size_));
-  auto src_file = std::make_unique<BsdiffExtentFile>(
-      std::move(reader),
-      utils::BlocksInExtents(operation.src_extents()) * block_size_);
-
-  auto writer = std::make_unique<DirectExtentWriter>();
-  TEST_AND_RETURN_FALSE(
-      writer->Init(target_fd_, operation.dst_extents(), block_size_));
-  auto dst_file = std::make_unique<BsdiffExtentFile>(
-      std::move(writer),
-      utils::BlocksInExtents(operation.dst_extents()) * block_size_);
-
-  TEST_AND_RETURN_FALSE(bsdiff::bspatch(std::move(src_file),
-                                        std::move(dst_file),
-                                        buffer_.data(),
-                                        buffer_.size()) == 0);
+  TEST_AND_RETURN_FALSE(partition_writer_->PerformSourceBsdiffOperation(
+      operation, error, buffer_.data(), buffer_.size()));
   DiscardBuffer(true, buffer_.size());
   return true;
 }
 
-namespace {
-
-// A class to be passed to |puffpatch| for reading from |source_fd_| and writing
-// into |target_fd_|.
-class PuffinExtentStream : public puffin::StreamInterface {
- public:
-  // Constructor for creating a stream for reading from an |ExtentReader|.
-  PuffinExtentStream(std::unique_ptr<ExtentReader> reader, uint64_t size)
-      : PuffinExtentStream(std::move(reader), nullptr, size) {}
-
-  // Constructor for creating a stream for writing to an |ExtentWriter|.
-  PuffinExtentStream(std::unique_ptr<ExtentWriter> writer, uint64_t size)
-      : PuffinExtentStream(nullptr, std::move(writer), size) {}
-
-  ~PuffinExtentStream() override = default;
-
-  bool GetSize(uint64_t* size) const override {
-    *size = size_;
-    return true;
-  }
-
-  bool GetOffset(uint64_t* offset) const override {
-    *offset = offset_;
-    return true;
-  }
-
-  bool Seek(uint64_t offset) override {
-    if (is_read_) {
-      TEST_AND_RETURN_FALSE(reader_->Seek(offset));
-      offset_ = offset;
-    } else {
-      // For writes technically there should be no change of position, or it
-      // should equivalent of current offset.
-      TEST_AND_RETURN_FALSE(offset_ == offset);
-    }
-    return true;
-  }
-
-  bool Read(void* buffer, size_t count) override {
-    TEST_AND_RETURN_FALSE(is_read_);
-    TEST_AND_RETURN_FALSE(reader_->Read(buffer, count));
-    offset_ += count;
-    return true;
-  }
-
-  bool Write(const void* buffer, size_t count) override {
-    TEST_AND_RETURN_FALSE(!is_read_);
-    TEST_AND_RETURN_FALSE(writer_->Write(buffer, count));
-    offset_ += count;
-    return true;
-  }
-
-  bool Close() override { return true; }
-
- private:
-  PuffinExtentStream(std::unique_ptr<ExtentReader> reader,
-                     std::unique_ptr<ExtentWriter> writer,
-                     uint64_t size)
-      : reader_(std::move(reader)),
-        writer_(std::move(writer)),
-        size_(size),
-        offset_(0),
-        is_read_(reader_ ? true : false) {}
-
-  std::unique_ptr<ExtentReader> reader_;
-  std::unique_ptr<ExtentWriter> writer_;
-  uint64_t size_;
-  uint64_t offset_;
-  bool is_read_;
-
-  DISALLOW_COPY_AND_ASSIGN(PuffinExtentStream);
-};
-
-}  // namespace
-
 bool DeltaPerformer::PerformPuffDiffOperation(const InstallOperation& operation,
                                               ErrorCode* error) {
   // Since we delete data off the beginning of the buffer as we use it,
   // the data we need should be exactly at the beginning of the buffer.
   TEST_AND_RETURN_FALSE(buffer_offset_ == operation.data_offset());
   TEST_AND_RETURN_FALSE(buffer_.size() >= operation.data_length());
-
-  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
-  TEST_AND_RETURN_FALSE(source_fd != nullptr);
-
-  auto reader = std::make_unique<DirectExtentReader>();
-  TEST_AND_RETURN_FALSE(
-      reader->Init(source_fd, operation.src_extents(), block_size_));
-  puffin::UniqueStreamPtr src_stream(new PuffinExtentStream(
-      std::move(reader),
-      utils::BlocksInExtents(operation.src_extents()) * block_size_));
-
-  auto writer = std::make_unique<DirectExtentWriter>();
-  TEST_AND_RETURN_FALSE(
-      writer->Init(target_fd_, operation.dst_extents(), block_size_));
-  puffin::UniqueStreamPtr dst_stream(new PuffinExtentStream(
-      std::move(writer),
-      utils::BlocksInExtents(operation.dst_extents()) * block_size_));
-
-  const size_t kMaxCacheSize = 5 * 1024 * 1024;  // Total 5MB cache.
-  TEST_AND_RETURN_FALSE(puffin::PuffPatch(std::move(src_stream),
-                                          std::move(dst_stream),
-                                          buffer_.data(),
-                                          buffer_.size(),
-                                          kMaxCacheSize));
+  TEST_AND_RETURN_FALSE(partition_writer_->PerformPuffDiffOperation(
+      operation, error, buffer_.data(), buffer_.size()));
   DiscardBuffer(true, buffer_.size());
   return true;
 }
@@ -1509,11 +973,11 @@
       buffer_.begin(), buffer_.begin() + manifest_.signatures_size());
 
   // Save the signature blob because if the update is interrupted after the
-  // download phase we don't go through this path anymore. Some alternatives to
-  // consider:
+  // download phase we don't go through this path anymore. Some alternatives
+  // to consider:
   //
-  // 1. On resume, re-download the signature blob from the server and re-verify
-  // it.
+  // 1. On resume, re-download the signature blob from the server and
+  // re-verify it.
   //
   // 2. Verify the signature as soon as it's received and don't checkpoint the
   // blob and the signed sha-256 context.
@@ -1536,8 +1000,8 @@
     return utils::ReadFile(public_key_path_, out_public_key);
   }
 
-  // If this is an official build then we are not allowed to use public key from
-  // Omaha response.
+  // If this is an official build then we are not allowed to use public key
+  // from Omaha response.
   if (!hardware_->IsOfficialBuild() && !install_plan_->public_key_rsa.empty()) {
     LOG(INFO) << "Verifying using public key from Omaha response.";
     return brillo::data_encoding::Base64Decode(install_plan_->public_key_rsa,
@@ -1628,17 +1092,19 @@
     LOG(ERROR) << "Manifest contains deprecated fields.";
     return ErrorCode::kPayloadMismatchedType;
   }
-
-  if (manifest_.max_timestamp() < hardware_->GetBuildTimestamp()) {
-    LOG(ERROR) << "The current OS build timestamp ("
-               << hardware_->GetBuildTimestamp()
-               << ") is newer than the maximum timestamp in the manifest ("
-               << manifest_.max_timestamp() << ")";
-    if (!hardware_->AllowDowngrade()) {
-      return ErrorCode::kPayloadTimestampError;
+  ErrorCode error_code = CheckTimestampError();
+  if (error_code != ErrorCode::kSuccess) {
+    if (error_code == ErrorCode::kPayloadTimestampError) {
+      if (!hardware_->AllowDowngrade()) {
+        return ErrorCode::kPayloadTimestampError;
+      }
+      LOG(INFO) << "The current OS build allows downgrade, continuing to apply"
+                   " the payload with an older timestamp.";
+    } else {
+      LOG(ERROR) << "Timestamp check returned "
+                 << utils::ErrorCodeToString(error_code);
+      return error_code;
     }
-    LOG(INFO) << "The current OS build allows downgrade, continuing to apply"
-                 " the payload with an older timestamp.";
   }
 
   // TODO(crbug.com/37661) we should be adding more and more manifest checks,
@@ -1647,14 +1113,98 @@
   return ErrorCode::kSuccess;
 }
 
+ErrorCode DeltaPerformer::CheckTimestampError() const {
+  bool is_partial_update =
+      manifest_.has_partial_update() && manifest_.partial_update();
+  const auto& partitions = manifest_.partitions();
+
+  // Checks the version field of a single PartitionUpdate. Returns kSuccess
+  // when the version is acceptable (or absent and |allow_empty_version| is
+  // set), kPayloadTimestampError when hardware_ reports a downgrade (in which
+  // case |*downgrade_detected| is also set), or the error to propagate.
+  // Callers keep scanning on the first two and bail out on anything else.
+  auto&& timestamp_valid = [this](const PartitionUpdate& partition,
+                                  bool allow_empty_version,
+                                  bool* downgrade_detected) -> ErrorCode {
+    if (!partition.has_version()) {
+      if (allow_empty_version) {
+        return ErrorCode::kSuccess;
+      }
+      LOG(ERROR)
+          << "PartitionUpdate " << partition.partition_name()
+          << " does not have a version field. Not allowed in partial updates.";
+      return ErrorCode::kDownloadManifestParseError;
+    }
+
+    auto error_code = hardware_->IsPartitionUpdateValid(
+        partition.partition_name(), partition.version());
+    switch (error_code) {
+      case ErrorCode::kSuccess:
+        break;
+      case ErrorCode::kPayloadTimestampError:
+        *downgrade_detected = true;
+        LOG(WARNING) << "PartitionUpdate " << partition.partition_name()
+                     << " has an older version than partition on device.";
+        break;
+      default:
+        LOG(ERROR) << "IsPartitionUpdateValid(" << partition.partition_name()
+                   << ") returned " << utils::ErrorCodeToString(error_code);
+        break;
+    }
+    return error_code;
+  };
+
+  bool downgrade_detected = false;
+
+  if (is_partial_update) {
+    // For partial updates, every partition MUST carry a version field;
+    // max_timestamp may be empty and is not consulted on this path.
+    for (const auto& partition : partitions) {
+      auto error_code = timestamp_valid(
+          partition, false /* allow_empty_version */, &downgrade_detected);
+      if (error_code != ErrorCode::kSuccess &&
+          error_code != ErrorCode::kPayloadTimestampError) {
+        return error_code;
+      }
+    }
+    if (downgrade_detected) {
+      return ErrorCode::kPayloadTimestampError;
+    }
+    return ErrorCode::kSuccess;
+  }
+
+  // For non-partial updates, check max_timestamp first.
+  if (manifest_.max_timestamp() < hardware_->GetBuildTimestamp()) {
+    LOG(ERROR) << "The current OS build timestamp ("
+               << hardware_->GetBuildTimestamp()
+               << ") is newer than the maximum timestamp in the manifest ("
+               << manifest_.max_timestamp() << ")";
+    return ErrorCode::kPayloadTimestampError;
+  }
+  // Otherwise... partitions can have empty timestamps.
+  for (const auto& partition : partitions) {
+    auto error_code = timestamp_valid(
+        partition, true /* allow_empty_version */, &downgrade_detected);
+    if (error_code != ErrorCode::kSuccess &&
+        error_code != ErrorCode::kPayloadTimestampError) {
+      return error_code;
+    }
+  }
+  if (downgrade_detected) {
+    return ErrorCode::kPayloadTimestampError;
+  }
+  return ErrorCode::kSuccess;
+}
+
 ErrorCode DeltaPerformer::ValidateOperationHash(
     const InstallOperation& operation) {
   if (!operation.data_sha256_hash().size()) {
     if (!operation.data_length()) {
-      // Operations that do not have any data blob won't have any operation hash
-      // either. So, these operations are always considered validated since the
-      // metadata that contains all the non-data-blob portions of the operation
-      // has already been validated. This is true for both HTTP and HTTPS cases.
+      // Operations that do not have any data blob won't have any operation
+      // hash either. So, these operations are always considered validated
+      // since the metadata that contains all the non-data-blob portions of
+      // the operation has already been validated. This is true for both HTTP
+      // and HTTPS cases.
       return ErrorCode::kSuccess;
     }
 
@@ -1731,6 +1281,16 @@
     return ErrorCode::kPayloadSizeMismatchError;
   }
 
+  auto [payload_verifier, perform_verification] = CreatePayloadVerifier();
+  if (!perform_verification) {
+    LOG(WARNING) << "Not verifying signed delta payload -- missing public key.";
+    return ErrorCode::kSuccess;
+  }
+  if (!payload_verifier) {
+    LOG(ERROR) << "Failed to create the payload verifier.";
+    return ErrorCode::kDownloadPayloadPubKeyVerificationError;
+  }
+
   // Verifies the payload hash.
   TEST_AND_RETURN_VAL(ErrorCode::kDownloadPayloadVerificationError,
                       !payload_hash_calculator_.raw_hash().empty());
@@ -1744,15 +1304,6 @@
   TEST_AND_RETURN_VAL(ErrorCode::kDownloadPayloadPubKeyVerificationError,
                       hash_data.size() == kSHA256Size);
 
-  auto [payload_verifier, perform_verification] = CreatePayloadVerifier();
-  if (!perform_verification) {
-    LOG(WARNING) << "Not verifying signed delta payload -- missing public key.";
-    return ErrorCode::kSuccess;
-  }
-  if (!payload_verifier) {
-    LOG(ERROR) << "Failed to create the payload verifier.";
-    return ErrorCode::kDownloadPayloadPubKeyVerificationError;
-  }
   if (!payload_verifier->VerifySignature(signatures_message_data_, hash_data)) {
     // The autoupdate_CatchBadSignatures test checks for this string
     // in log-files. Keep in sync.
@@ -1792,8 +1343,8 @@
     return false;
 
   int64_t resumed_update_failures;
-  // Note that storing this value is optional, but if it is there it should not
-  // be more than the limit.
+  // Note that storing this value is optional, but if it is there it should
+  // not be more than the limit.
   if (prefs->GetInt64(kPrefsResumedUpdateFailures, &resumed_update_failures) &&
       resumed_update_failures > kMaxResumedUpdateFailures)
     return false;
diff --git a/payload_consumer/delta_performer.h b/payload_consumer/delta_performer.h
index 2d1768d..d44f6c2 100644
--- a/payload_consumer/delta_performer.h
+++ b/payload_consumer/delta_performer.h
@@ -35,6 +35,7 @@
 #include "update_engine/payload_consumer/file_descriptor.h"
 #include "update_engine/payload_consumer/file_writer.h"
 #include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_consumer/partition_writer.h"
 #include "update_engine/payload_consumer/payload_metadata.h"
 #include "update_engine/payload_consumer/payload_verifier.h"
 #include "update_engine/update_metadata.pb.h"
@@ -48,7 +49,6 @@
 
 // This class performs the actions in a delta update synchronously. The delta
 // update itself should be passed in in chunks as it is received.
-
 class DeltaPerformer : public FileWriter {
  public:
   // Defines the granularity of progress logging in terms of how many "completed
@@ -102,10 +102,6 @@
   // work. Returns whether the required file descriptors were successfully open.
   bool OpenCurrentPartition();
 
-  // Attempt to open the error-corrected device for the current partition.
-  // Returns whether the operation succeeded.
-  bool OpenCurrentECCPartition();
-
   // Closes the current partition file descriptors if open. Returns 0 on success
   // or -errno on error.
   int CloseCurrentPartition();
@@ -178,14 +174,6 @@
   // it returns that value, otherwise it returns the default value.
   uint32_t GetMinorVersion() const;
 
-  // Compare |calculated_hash| with source hash in |operation|, return false and
-  // dump hash and set |error| if don't match.
-  // |source_fd| is the file descriptor of the source partition.
-  static bool ValidateSourceHash(const brillo::Blob& calculated_hash,
-                                 const InstallOperation& operation,
-                                 const FileDescriptorPtr source_fd,
-                                 ErrorCode* error);
-
   // Initialize partitions and allocate required space for an update with the
   // given |manifest|. |update_check_response_hash| is used to check if the
   // previous call to this function corresponds to the same payload.
@@ -209,7 +197,6 @@
   friend class DeltaPerformerIntegrationTest;
   FRIEND_TEST(DeltaPerformerTest, BrilloMetadataSignatureSizeTest);
   FRIEND_TEST(DeltaPerformerTest, BrilloParsePayloadMetadataTest);
-  FRIEND_TEST(DeltaPerformerTest, ChooseSourceFDTest);
   FRIEND_TEST(DeltaPerformerTest, UsePublicKeyFromResponse);
 
   // Parse and move the update instructions of all partitions into our local
@@ -263,13 +250,6 @@
   bool PerformPuffDiffOperation(const InstallOperation& operation,
                                 ErrorCode* error);
 
-  // For a given operation, choose the source fd to be used (raw device or error
-  // correction device) based on the source operation hash.
-  // Returns nullptr if the source hash mismatch cannot be corrected, and set
-  // the |error| accordingly.
-  FileDescriptorPtr ChooseSourceFD(const InstallOperation& operation,
-                                   ErrorCode* error);
-
   // Extracts the payload signature message from the current |buffer_| if the
   // offset matches the one specified by the manifest. Returns whether the
   // signature was extracted.
@@ -310,6 +290,15 @@
   // Also see comment for the static PreparePartitionsForUpdate().
   bool PreparePartitionsForUpdate(uint64_t* required_size);
 
+  // Check if current manifest contains timestamp errors.
+  // Return:
+  // - kSuccess if update is valid.
+  // - kPayloadTimestampError if downgrade is detected
+  // - kDownloadManifestParseError if a partition version is missing or invalid
+  // - Other error values if the source of error is known, or kError for
+  //   a generic error on the device.
+  ErrorCode CheckTimestampError() const;
+
   // Update Engine preference store.
   PrefsInterface* prefs_;
 
@@ -327,34 +316,6 @@
   // Pointer to the current payload in install_plan_.payloads.
   InstallPlan::Payload* payload_{nullptr};
 
-  // File descriptor of the source partition. Only set while updating a
-  // partition when using a delta payload.
-  FileDescriptorPtr source_fd_{nullptr};
-
-  // File descriptor of the error corrected source partition. Only set while
-  // updating partition using a delta payload for a partition where error
-  // correction is available. The size of the error corrected device is smaller
-  // than the underlying raw device, since it doesn't include the error
-  // correction blocks.
-  FileDescriptorPtr source_ecc_fd_{nullptr};
-
-  // The total number of operations that failed source hash verification but
-  // passed after falling back to the error-corrected |source_ecc_fd_| device.
-  uint64_t source_ecc_recovered_failures_{0};
-
-  // Whether opening the current partition as an error-corrected device failed.
-  // Used to avoid re-opening the same source partition if it is not actually
-  // error corrected.
-  bool source_ecc_open_failure_{false};
-
-  // File descriptor of the target partition. Only set while performing the
-  // operations of a given partition.
-  FileDescriptorPtr target_fd_{nullptr};
-
-  // Paths the |source_fd_| and |target_fd_| refer to.
-  std::string source_path_;
-  std::string target_path_;
-
   PayloadMetadata payload_metadata_;
 
   // Parsed manifest. Set after enough bytes to parse the manifest were
@@ -444,6 +405,8 @@
       base::TimeDelta::FromSeconds(kCheckpointFrequencySeconds)};
   base::TimeTicks update_checkpoint_time_;
 
+  std::unique_ptr<PartitionWriter> partition_writer_;
+
   DISALLOW_COPY_AND_ASSIGN(DeltaPerformer);
 };
 
diff --git a/payload_consumer/delta_performer_integration_test.cc b/payload_consumer/delta_performer_integration_test.cc
index acbecad..f2aeb03 100644
--- a/payload_consumer/delta_performer_integration_test.cc
+++ b/payload_consumer/delta_performer_integration_test.cc
@@ -36,9 +36,12 @@
 #include "update_engine/common/constants.h"
 #include "update_engine/common/fake_boot_control.h"
 #include "update_engine/common/fake_hardware.h"
+#include "update_engine/common/fake_prefs.h"
 #include "update_engine/common/mock_prefs.h"
 #include "update_engine/common/test_utils.h"
 #include "update_engine/common/utils.h"
+#include "update_engine/hardware_android.h"
+#include "update_engine/payload_consumer/install_plan.h"
 #include "update_engine/payload_consumer/mock_download_action.h"
 #include "update_engine/payload_consumer/payload_constants.h"
 #include "update_engine/payload_consumer/payload_metadata.h"
@@ -125,7 +128,41 @@
 
 }  // namespace
 
-class DeltaPerformerIntegrationTest : public ::testing::Test {};
+class DeltaPerformerIntegrationTest : public ::testing::Test {
+ public:
+  void RunManifestValidation(const DeltaArchiveManifest& manifest,
+                             uint64_t major_version,
+                             ErrorCode expected) {
+    FakePrefs prefs;
+    InstallPlan::Payload payload;
+    InstallPlan install_plan;
+    DeltaPerformer performer{&prefs,
+                             nullptr,
+                             &fake_hardware_,
+                             nullptr,
+                             &install_plan,
+                             &payload,
+                             false /* interactive*/};
+    // Delta performer will treat manifest as kDelta payload
+    // if it's a partial update.
+    payload.type = manifest.partial_update() ? InstallPayloadType::kDelta
+                                             : InstallPayloadType::kFull;
+
+    // The Manifest we are validating.
+    performer.manifest_.CopyFrom(manifest);
+    performer.major_payload_version_ = major_version;
+
+    EXPECT_EQ(expected, performer.ValidateManifest());
+  }
+  void AddPartition(DeltaArchiveManifest* manifest,
+                    std::string name,
+                    int timestamp) {
+    auto& partition = *manifest->add_partitions();
+    partition.set_version(std::to_string(timestamp));
+    partition.set_partition_name(name);
+  }
+  FakeHardware fake_hardware_;
+};
 
 static void CompareFilesByBlock(const string& a_file,
                                 const string& b_file,
@@ -995,13 +1032,13 @@
   delete performer;
 }
 
-TEST(DeltaPerformerIntegrationTest, RunAsRootSmallImageTest) {
+TEST_F(DeltaPerformerIntegrationTest, RunAsRootSmallImageTest) {
   DoSmallImageTest(
       false, false, -1, kSignatureGenerator, false, kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest,
-     RunAsRootSmallImageSignaturePlaceholderTest) {
+TEST_F(DeltaPerformerIntegrationTest,
+       RunAsRootSmallImageSignaturePlaceholderTest) {
   DoSmallImageTest(false,
                    false,
                    -1,
@@ -1010,8 +1047,8 @@
                    kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest,
-     RunAsRootSmallImageSignaturePlaceholderMismatchTest) {
+TEST_F(DeltaPerformerIntegrationTest,
+       RunAsRootSmallImageSignaturePlaceholderMismatchTest) {
   DeltaState state;
   GenerateDeltaFile(false,
                     false,
@@ -1021,7 +1058,7 @@
                     kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest, RunAsRootSmallImageChunksTest) {
+TEST_F(DeltaPerformerIntegrationTest, RunAsRootSmallImageChunksTest) {
   DoSmallImageTest(false,
                    false,
                    kBlockSize,
@@ -1030,27 +1067,28 @@
                    kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest, RunAsRootFullKernelSmallImageTest) {
+TEST_F(DeltaPerformerIntegrationTest, RunAsRootFullKernelSmallImageTest) {
   DoSmallImageTest(
       true, false, -1, kSignatureGenerator, false, kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest, RunAsRootFullSmallImageTest) {
+TEST_F(DeltaPerformerIntegrationTest, RunAsRootFullSmallImageTest) {
   DoSmallImageTest(
       true, true, -1, kSignatureGenerator, true, kFullPayloadMinorVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest, RunAsRootSmallImageSignNoneTest) {
+TEST_F(DeltaPerformerIntegrationTest, RunAsRootSmallImageSignNoneTest) {
   DoSmallImageTest(
       false, false, -1, kSignatureNone, false, kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest, RunAsRootSmallImageSignGeneratedTest) {
+TEST_F(DeltaPerformerIntegrationTest, RunAsRootSmallImageSignGeneratedTest) {
   DoSmallImageTest(
       false, false, -1, kSignatureGenerated, true, kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest, RunAsRootSmallImageSignGeneratedShellTest) {
+TEST_F(DeltaPerformerIntegrationTest,
+       RunAsRootSmallImageSignGeneratedShellTest) {
   DoSmallImageTest(false,
                    false,
                    -1,
@@ -1059,8 +1097,8 @@
                    kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest,
-     RunAsRootSmallImageSignGeneratedShellECKeyTest) {
+TEST_F(DeltaPerformerIntegrationTest,
+       RunAsRootSmallImageSignGeneratedShellECKeyTest) {
   DoSmallImageTest(false,
                    false,
                    -1,
@@ -1069,8 +1107,8 @@
                    kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest,
-     RunAsRootSmallImageSignGeneratedShellBadKeyTest) {
+TEST_F(DeltaPerformerIntegrationTest,
+       RunAsRootSmallImageSignGeneratedShellBadKeyTest) {
   DoSmallImageTest(false,
                    false,
                    -1,
@@ -1079,8 +1117,8 @@
                    kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest,
-     RunAsRootSmallImageSignGeneratedShellRotateCl1Test) {
+TEST_F(DeltaPerformerIntegrationTest,
+       RunAsRootSmallImageSignGeneratedShellRotateCl1Test) {
   DoSmallImageTest(false,
                    false,
                    -1,
@@ -1089,8 +1127,8 @@
                    kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest,
-     RunAsRootSmallImageSignGeneratedShellRotateCl2Test) {
+TEST_F(DeltaPerformerIntegrationTest,
+       RunAsRootSmallImageSignGeneratedShellRotateCl2Test) {
   DoSmallImageTest(false,
                    false,
                    -1,
@@ -1099,14 +1137,137 @@
                    kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest, RunAsRootSmallImageSourceOpsTest) {
+TEST_F(DeltaPerformerIntegrationTest, RunAsRootSmallImageSourceOpsTest) {
   DoSmallImageTest(
       false, false, -1, kSignatureGenerator, false, kSourceMinorPayloadVersion);
 }
 
-TEST(DeltaPerformerIntegrationTest,
-     RunAsRootMandatoryOperationHashMismatchTest) {
+TEST_F(DeltaPerformerIntegrationTest,
+       RunAsRootMandatoryOperationHashMismatchTest) {
   DoOperationHashMismatchTest(kInvalidOperationData, true);
 }
 
+TEST_F(DeltaPerformerIntegrationTest, ValidatePerPartitionTimestampSuccess) {
+  // The Manifest we are validating.
+  DeltaArchiveManifest manifest;
+
+  fake_hardware_.SetVersion("system", "5");
+  fake_hardware_.SetVersion("product", "99");
+  fake_hardware_.SetBuildTimestamp(1);
+
+  manifest.set_minor_version(kFullPayloadMinorVersion);
+  manifest.set_max_timestamp(2);
+  AddPartition(&manifest, "system", 10);
+  AddPartition(&manifest, "product", 100);
+
+  RunManifestValidation(
+      manifest, kMaxSupportedMajorPayloadVersion, ErrorCode::kSuccess);
+}
+
+TEST_F(DeltaPerformerIntegrationTest, ValidatePerPartitionTimestampFailure) {
+  // The Manifest we are validating.
+  DeltaArchiveManifest manifest;
+
+  fake_hardware_.SetVersion("system", "5");
+  fake_hardware_.SetVersion("product", "99");
+  fake_hardware_.SetBuildTimestamp(1);
+
+  manifest.set_minor_version(kFullPayloadMinorVersion);
+  manifest.set_max_timestamp(2);
+  AddPartition(&manifest, "system", 10);
+  AddPartition(&manifest, "product", 98);
+
+  RunManifestValidation(manifest,
+                        kMaxSupportedMajorPayloadVersion,
+                        ErrorCode::kPayloadTimestampError);
+}
+
+TEST_F(DeltaPerformerIntegrationTest,
+       ValidatePerPartitionTimestampMissingTimestamp) {
+  // The Manifest we are validating.
+  DeltaArchiveManifest manifest;
+
+  fake_hardware_.SetVersion("system", "5");
+  fake_hardware_.SetVersion("product", "99");
+  fake_hardware_.SetBuildTimestamp(1);
+
+  manifest.set_minor_version(kFullPayloadMinorVersion);
+  manifest.set_max_timestamp(2);
+  AddPartition(&manifest, "system", 10);
+  {
+    auto& partition = *manifest.add_partitions();
+    // For complete updates, missing timestamp should not trigger
+    // timestamp error.
+    partition.set_partition_name("product");
+  }
+
+  RunManifestValidation(
+      manifest, kMaxSupportedMajorPayloadVersion, ErrorCode::kSuccess);
+}
+
+TEST_F(DeltaPerformerIntegrationTest,
+       ValidatePerPartitionTimestampPartialUpdatePass) {
+  fake_hardware_.SetVersion("system", "5");
+  fake_hardware_.SetVersion("product", "99");
+
+  DeltaArchiveManifest manifest;
+  manifest.set_minor_version(kPartialUpdateMinorPayloadVersion);
+  manifest.set_partial_update(true);
+  AddPartition(&manifest, "product", 100);
+  RunManifestValidation(
+      manifest, kMaxSupportedMajorPayloadVersion, ErrorCode::kSuccess);
+}
+
+TEST_F(DeltaPerformerIntegrationTest,
+       ValidatePerPartitionTimestampPartialUpdateDowngrade) {
+  fake_hardware_.SetVersion("system", "5");
+  fake_hardware_.SetVersion("product", "99");
+
+  DeltaArchiveManifest manifest;
+  manifest.set_minor_version(kPartialUpdateMinorPayloadVersion);
+  manifest.set_partial_update(true);
+  AddPartition(&manifest, "product", 98);
+  RunManifestValidation(manifest,
+                        kMaxSupportedMajorPayloadVersion,
+                        ErrorCode::kPayloadTimestampError);
+}
+
+TEST_F(DeltaPerformerIntegrationTest,
+       ValidatePerPartitionTimestampPartialUpdateMissingVersion) {
+  fake_hardware_.SetVersion("system", "5");
+  fake_hardware_.SetVersion("product", "99");
+
+  DeltaArchiveManifest manifest;
+  manifest.set_minor_version(kPartialUpdateMinorPayloadVersion);
+  manifest.set_partial_update(true);
+  {
+    auto& partition = *manifest.add_partitions();
+    // For partial updates, missing timestamp should trigger an error
+    partition.set_partition_name("product");
+    // has_version() == false.
+  }
+  RunManifestValidation(manifest,
+                        kMaxSupportedMajorPayloadVersion,
+                        ErrorCode::kDownloadManifestParseError);
+}
+
+TEST_F(DeltaPerformerIntegrationTest,
+       ValidatePerPartitionTimestampPartialUpdateEmptyVersion) {
+  fake_hardware_.SetVersion("system", "5");
+  fake_hardware_.SetVersion("product", "99");
+
+  DeltaArchiveManifest manifest;
+  manifest.set_minor_version(kPartialUpdateMinorPayloadVersion);
+  manifest.set_partial_update(true);
+  {
+    auto& partition = *manifest.add_partitions();
+    // For partial updates, invalid timestamp should trigger an error
+    partition.set_partition_name("product");
+    partition.set_version("something");
+  }
+  RunManifestValidation(manifest,
+                        kMaxSupportedMajorPayloadVersion,
+                        ErrorCode::kDownloadManifestParseError);
+}
+
 }  // namespace chromeos_update_engine
diff --git a/payload_consumer/delta_performer_unittest.cc b/payload_consumer/delta_performer_unittest.cc
index 44107cd..a5eb538 100644
--- a/payload_consumer/delta_performer_unittest.cc
+++ b/payload_consumer/delta_performer_unittest.cc
@@ -36,9 +36,11 @@
 #include <gtest/gtest.h>
 
 #include "update_engine/common/constants.h"
+#include "update_engine/common/error_code.h"
 #include "update_engine/common/fake_boot_control.h"
 #include "update_engine/common/fake_hardware.h"
 #include "update_engine/common/fake_prefs.h"
+#include "update_engine/common/hardware_interface.h"
 #include "update_engine/common/test_utils.h"
 #include "update_engine/common/utils.h"
 #include "update_engine/payload_consumer/fake_file_descriptor.h"
@@ -226,13 +228,13 @@
     new_part.path = "/dev/zero";
     new_part.size = 1234;
 
-    payload.AddPartition(*old_part, new_part, aops);
+    payload.AddPartition(*old_part, new_part, aops, {});
 
     // We include a kernel partition without operations.
     old_part->name = kPartitionNameKernel;
     new_part.name = kPartitionNameKernel;
     new_part.size = 0;
-    payload.AddPartition(*old_part, new_part, {});
+    payload.AddPartition(*old_part, new_part, {}, {});
 
     test_utils::ScopedTempFile payload_file("Payload-XXXXXX");
     string private_key =
@@ -416,22 +418,7 @@
     EXPECT_EQ(payload_.metadata_size, performer_.metadata_size_);
   }
 
-  // Helper function to pretend that the ECC file descriptor was already opened.
-  // Returns a pointer to the created file descriptor.
-  FakeFileDescriptor* SetFakeECCFile(size_t size) {
-    EXPECT_FALSE(performer_.source_ecc_fd_) << "source_ecc_fd_ already open.";
-    FakeFileDescriptor* ret = new FakeFileDescriptor();
-    fake_ecc_fd_.reset(ret);
-    // Call open to simulate it was already opened.
-    ret->Open("", 0);
-    ret->SetFileSize(size);
-    performer_.source_ecc_fd_ = fake_ecc_fd_;
-    return ret;
-  }
 
-  uint64_t GetSourceEccRecoveredFailures() const {
-    return performer_.source_ecc_recovered_failures_;
-  }
 
   FakePrefs prefs_;
   InstallPlan install_plan_;
@@ -658,94 +645,8 @@
   EXPECT_EQ(actual_data, ApplyPayload(payload_data, source.path(), false));
 }
 
-// Test that the error-corrected file descriptor is used to read the partition
-// since the source partition doesn't match the operation hash.
-TEST_F(DeltaPerformerTest, ErrorCorrectionSourceCopyFallbackTest) {
-  constexpr size_t kCopyOperationSize = 4 * 4096;
-  test_utils::ScopedTempFile source("Source-XXXXXX");
-  // Write invalid data to the source image, which doesn't match the expected
-  // hash.
-  brillo::Blob invalid_data(kCopyOperationSize, 0x55);
-  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), invalid_data));
 
-  // Setup the fec file descriptor as the fake stream, which matches
-  // |expected_data|.
-  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize);
-  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
 
-  PartitionConfig old_part(kPartitionNameRoot);
-  old_part.path = source.path();
-  old_part.size = invalid_data.size();
-
-  brillo::Blob payload_data =
-      GenerateSourceCopyPayload(expected_data, true, &old_part);
-  EXPECT_EQ(expected_data, ApplyPayload(payload_data, source.path(), true));
-  // Verify that the fake_fec was actually used.
-  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
-  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
-}
-
-// Test that the error-corrected file descriptor is used to read a partition
-// when no hash is available for SOURCE_COPY but it falls back to the normal
-// file descriptor when the size of the error corrected one is too small.
-TEST_F(DeltaPerformerTest, ErrorCorrectionSourceCopyWhenNoHashFallbackTest) {
-  constexpr size_t kCopyOperationSize = 4 * 4096;
-  test_utils::ScopedTempFile source("Source-XXXXXX");
-  // Setup the source path with the right expected data.
-  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
-  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), expected_data));
-
-  // Setup the fec file descriptor as the fake stream, with smaller data than
-  // the expected.
-  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize / 2);
-
-  PartitionConfig old_part(kPartitionNameRoot);
-  old_part.path = source.path();
-  old_part.size = expected_data.size();
-
-  // The payload operation doesn't include an operation hash.
-  brillo::Blob payload_data =
-      GenerateSourceCopyPayload(expected_data, false, &old_part);
-  EXPECT_EQ(expected_data, ApplyPayload(payload_data, source.path(), true));
-  // Verify that the fake_fec was attempted to be used. Since the file
-  // descriptor is shorter it can actually do more than one read to realize it
-  // reached the EOF.
-  EXPECT_LE(1U, fake_fec->GetReadOps().size());
-  // This fallback doesn't count as an error-corrected operation since the
-  // operation hash was not available.
-  EXPECT_EQ(0U, GetSourceEccRecoveredFailures());
-}
-
-TEST_F(DeltaPerformerTest, ChooseSourceFDTest) {
-  constexpr size_t kSourceSize = 4 * 4096;
-  test_utils::ScopedTempFile source("Source-XXXXXX");
-  // Write invalid data to the source image, which doesn't match the expected
-  // hash.
-  brillo::Blob invalid_data(kSourceSize, 0x55);
-  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), invalid_data));
-
-  performer_.source_fd_ = std::make_shared<EintrSafeFileDescriptor>();
-  performer_.source_fd_->Open(source.path().c_str(), O_RDONLY);
-  performer_.block_size_ = 4096;
-
-  // Setup the fec file descriptor as the fake stream, which matches
-  // |expected_data|.
-  FakeFileDescriptor* fake_fec = SetFakeECCFile(kSourceSize);
-  brillo::Blob expected_data = FakeFileDescriptorData(kSourceSize);
-
-  InstallOperation op;
-  *(op.add_src_extents()) = ExtentForRange(0, kSourceSize / 4096);
-  brillo::Blob src_hash;
-  EXPECT_TRUE(HashCalculator::RawHashOfData(expected_data, &src_hash));
-  op.set_src_sha256_hash(src_hash.data(), src_hash.size());
-
-  ErrorCode error = ErrorCode::kSuccess;
-  EXPECT_EQ(performer_.source_ecc_fd_, performer_.ChooseSourceFD(op, &error));
-  EXPECT_EQ(ErrorCode::kSuccess, error);
-  // Verify that the fake_fec was actually used.
-  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
-  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
-}
 
 TEST_F(DeltaPerformerTest, ExtentsToByteStringTest) {
   uint64_t test[] = {1, 1, 4, 2, 0, 1};
@@ -899,6 +800,24 @@
                         ErrorCode::kPayloadTimestampError);
 }
 
+TEST_F(DeltaPerformerTest, ValidatePerPartitionTimestampSuccess) {
+  // The Manifest we are validating.
+  DeltaArchiveManifest manifest;
+
+  manifest.set_minor_version(kFullPayloadMinorVersion);
+  manifest.set_max_timestamp(2);
+  fake_hardware_.SetBuildTimestamp(1);
+  auto& partition = *manifest.add_partitions();
+  partition.set_version("10");
+  partition.set_partition_name("system");
+  fake_hardware_.SetVersion("system", "5");
+
+  RunManifestValidation(manifest,
+                        kMaxSupportedMajorPayloadVersion,
+                        InstallPayloadType::kFull,
+                        ErrorCode::kSuccess);
+}
+
 TEST_F(DeltaPerformerTest, BrilloMetadataSignatureSizeTest) {
   unsigned int seed = time(nullptr);
   EXPECT_TRUE(performer_.Write(kDeltaMagic, sizeof(kDeltaMagic)));
diff --git a/payload_consumer/fec_file_descriptor.cc b/payload_consumer/fec_file_descriptor.cc
index de22cf3..3fee196 100644
--- a/payload_consumer/fec_file_descriptor.cc
+++ b/payload_consumer/fec_file_descriptor.cc
@@ -16,6 +16,8 @@
 
 #include "update_engine/payload_consumer/fec_file_descriptor.h"
 
+#include <base/logging.h>
+
 namespace chromeos_update_engine {
 
 bool FecFileDescriptor::Open(const char* path, int flags) {
diff --git a/payload_consumer/file_descriptor.h b/payload_consumer/file_descriptor.h
index 55f76c6..fb07ff0 100644
--- a/payload_consumer/file_descriptor.h
+++ b/payload_consumer/file_descriptor.h
@@ -21,7 +21,7 @@
 #include <sys/types.h>
 #include <memory>
 
-#include <base/logging.h>
+#include <base/macros.h>
 
 // Abstraction for managing opening, reading, writing and closing of file
 // descriptors. This includes an abstract class and one standard implementation
diff --git a/payload_consumer/partition_update_generator_android.cc b/payload_consumer/partition_update_generator_android.cc
index 5768dd6..d5d5313 100644
--- a/payload_consumer/partition_update_generator_android.cc
+++ b/payload_consumer/partition_update_generator_android.cc
@@ -18,30 +18,23 @@
 
 #include <filesystem>
 #include <memory>
-#include <set>
-#include <string_view>
 #include <utility>
 
+#include <android-base/properties.h>
 #include <android-base/strings.h>
 #include <base/logging.h>
+#include <base/strings/string_split.h>
 
+#include "update_engine/common/boot_control_interface.h"
 #include "update_engine/common/hash_calculator.h"
 #include "update_engine/common/utils.h"
 
-namespace {
-// TODO(xunchang) use definition in fs_mgr, e.g. fs_mgr_get_slot_suffix
-const char* SUFFIX_A = "_a";
-const char* SUFFIX_B = "_b";
-}  // namespace
-
 namespace chromeos_update_engine {
 
 PartitionUpdateGeneratorAndroid::PartitionUpdateGeneratorAndroid(
     BootControlInterface* boot_control,
-    std::string device_dir,
     size_t block_size)
     : boot_control_(boot_control),
-      block_device_dir_(std::move(device_dir)),
       block_size_(block_size) {}
 
 bool PartitionUpdateGeneratorAndroid::
@@ -50,22 +43,57 @@
         BootControlInterface::Slot target_slot,
         const std::set<std::string>& partitions_in_payload,
         std::vector<PartitionUpdate>* update_list) {
-  auto ab_partitions = GetStaticAbPartitionsOnDevice();
-  if (!ab_partitions.has_value()) {
+  auto ab_partitions = GetAbPartitionsOnDevice();
+  if (ab_partitions.empty()) {
     LOG(ERROR) << "Failed to load static a/b partitions";
     return false;
   }
 
   std::vector<PartitionUpdate> partition_updates;
-  for (const auto& partition_name : ab_partitions.value()) {
+  for (const auto& partition_name : ab_partitions) {
     if (partitions_in_payload.find(partition_name) !=
         partitions_in_payload.end()) {
       LOG(INFO) << partition_name << " has included in payload";
       continue;
     }
+    bool is_source_dynamic = false;
+    std::string source_device;
 
-    auto partition_update =
-        CreatePartitionUpdate(partition_name, source_slot, target_slot);
+    TEST_AND_RETURN_FALSE(
+        boot_control_->GetPartitionDevice(partition_name,
+                                          source_slot,
+                                          true, /* not_in_payload */
+                                          &source_device,
+                                          &is_source_dynamic));
+    bool is_target_dynamic = false;
+    std::string target_device;
+    TEST_AND_RETURN_FALSE(boot_control_->GetPartitionDevice(
+        partition_name, target_slot, true, &target_device, &is_target_dynamic));
+
+    if (is_source_dynamic || is_target_dynamic) {
+      if (is_source_dynamic != is_target_dynamic) {
+        LOG(ERROR) << "Partition " << partition_name << " is expected to be a"
+                   << " static partition. source slot is "
+                   << (is_source_dynamic ? "" : "not")
+                   << " dynamic, and target slot " << target_slot << " is "
+                   << (is_target_dynamic ? "" : "not") << " dynamic.";
+        return false;
+      } else {
+        continue;
+      }
+    }
+
+    auto source_size = utils::FileSize(source_device);
+    auto target_size = utils::FileSize(target_device);
+    if (source_size == -1 || target_size == -1 || source_size != target_size ||
+        source_size % block_size_ != 0) {
+      LOG(ERROR) << "Invalid partition size. source size " << source_size
+                 << ", target size " << target_size;
+      return false;
+    }
+
+    auto partition_update = CreatePartitionUpdate(
+        partition_name, source_device, target_device, source_size);
     if (!partition_update.has_value()) {
       LOG(ERROR) << "Failed to create partition update for " << partition_name;
       return false;
@@ -76,98 +104,14 @@
   return true;
 }
 
-std::optional<std::set<std::string>>
-PartitionUpdateGeneratorAndroid::GetStaticAbPartitionsOnDevice() {
-  if (std::error_code error_code;
-      !std::filesystem::exists(block_device_dir_, error_code) || error_code) {
-    LOG(ERROR) << "Failed to find " << block_device_dir_ << " "
-               << error_code.message();
-    return std::nullopt;
-  }
-
-  std::error_code error_code;
-  auto it = std::filesystem::directory_iterator(block_device_dir_, error_code);
-  if (error_code) {
-    LOG(ERROR) << "Failed to iterate " << block_device_dir_ << " "
-               << error_code.message();
-    return std::nullopt;
-  }
-
-  std::set<std::string> partitions_with_suffix;
-  for (const auto& entry : it) {
-    auto partition_name = entry.path().filename().string();
-    if (android::base::EndsWith(partition_name, SUFFIX_A) ||
-        android::base::EndsWith(partition_name, SUFFIX_B)) {
-      partitions_with_suffix.insert(partition_name);
-    }
-  }
-
-  // Second iteration to add the partition name without suffixes.
-  std::set<std::string> ab_partitions;
-  for (std::string_view name : partitions_with_suffix) {
-    if (!android::base::ConsumeSuffix(&name, SUFFIX_A)) {
-      continue;
-    }
-
-    // Add to the output list if the partition exist for both slot a and b.
-    auto base_name = std::string(name);
-    if (partitions_with_suffix.find(base_name + SUFFIX_B) !=
-        partitions_with_suffix.end()) {
-      ab_partitions.insert(base_name);
-    } else {
-      LOG(WARNING) << "Failed to find the b partition for " << base_name;
-    }
-  }
-
-  return ab_partitions;
-}
-
-std::optional<PartitionUpdate>
-PartitionUpdateGeneratorAndroid::CreatePartitionUpdate(
-    const std::string& partition_name,
-    BootControlInterface::Slot source_slot,
-    BootControlInterface::Slot target_slot) {
-  bool is_source_dynamic = false;
-  std::string source_device;
-  if (!boot_control_->GetPartitionDevice(partition_name,
-                                         source_slot,
-                                         true, /* not_in_payload */
-                                         &source_device,
-                                         &is_source_dynamic)) {
-    LOG(ERROR) << "Failed to load source " << partition_name;
-    return std::nullopt;
-  }
-  bool is_target_dynamic = false;
-  std::string target_device;
-  if (!boot_control_->GetPartitionDevice(partition_name,
-                                         target_slot,
-                                         true,
-                                         &target_device,
-                                         &is_target_dynamic)) {
-    LOG(ERROR) << "Failed to load target " << partition_name;
-    return std::nullopt;
-  }
-
-  if (is_source_dynamic || is_target_dynamic) {
-    LOG(ERROR) << "Partition " << partition_name << " is expected to be a"
-               << " static partition. source slot is "
-               << (is_source_dynamic ? "" : "not")
-               << " dynamic, and target slot " << target_slot << " is "
-               << (is_target_dynamic ? "" : "not") << " dynamic.";
-    return std::nullopt;
-  }
-
-  auto source_size = utils::FileSize(source_device);
-  auto target_size = utils::FileSize(target_device);
-  if (source_size == -1 || target_size == -1 || source_size != target_size ||
-      source_size % block_size_ != 0) {
-    LOG(ERROR) << "Invalid partition size. source size " << source_size
-               << ", target size " << target_size;
-    return std::nullopt;
-  }
-
-  return CreatePartitionUpdate(
-      partition_name, source_device, target_device, source_size);
+std::vector<std::string>
+PartitionUpdateGeneratorAndroid::GetAbPartitionsOnDevice() const {
+  auto partition_list_str =
+      android::base::GetProperty("ro.product.ab_ota_partitions", "");
+  return base::SplitString(partition_list_str,
+                           ",",
+                           base::TRIM_WHITESPACE,
+                           base::SPLIT_WANT_NONEMPTY);
 }
 
 std::optional<PartitionUpdate>
@@ -183,6 +127,8 @@
 
   auto raw_hash = CalculateHashForPartition(source_device, partition_size);
   if (!raw_hash.has_value()) {
+    LOG(ERROR) << "Failed to calculate hash for partition " << source_device
+               << " size: " << partition_size;
     return {};
   }
   old_partition_info->set_hash(raw_hash->data(), raw_hash->size());
@@ -225,16 +171,9 @@
 std::unique_ptr<PartitionUpdateGeneratorInterface> Create(
     BootControlInterface* boot_control, size_t block_size) {
   CHECK(boot_control);
-  auto dynamic_control = boot_control->GetDynamicPartitionControl();
-  CHECK(dynamic_control);
-  std::string dir_path;
-  if (!dynamic_control->GetDeviceDir(&dir_path)) {
-    return nullptr;
-  }
 
   return std::unique_ptr<PartitionUpdateGeneratorInterface>(
-      new PartitionUpdateGeneratorAndroid(
-          boot_control, std::move(dir_path), block_size));
+      new PartitionUpdateGeneratorAndroid(boot_control, block_size));
 }
 }  // namespace partition_update_generator
 
diff --git a/payload_consumer/partition_update_generator_android.h b/payload_consumer/partition_update_generator_android.h
index 97b7d83..0330c99 100644
--- a/payload_consumer/partition_update_generator_android.h
+++ b/payload_consumer/partition_update_generator_android.h
@@ -29,11 +29,11 @@
 #include "update_engine/payload_consumer/partition_update_generator_interface.h"
 
 namespace chromeos_update_engine {
+
 class PartitionUpdateGeneratorAndroid
     : public PartitionUpdateGeneratorInterface {
  public:
   PartitionUpdateGeneratorAndroid(BootControlInterface* boot_control,
-                                  std::string device_dir,
                                   size_t block_size);
 
   bool GenerateOperationsForPartitionsNotInPayload(
@@ -41,15 +41,13 @@
       BootControlInterface::Slot target_slot,
       const std::set<std::string>& partitions_in_payload,
       std::vector<PartitionUpdate>* update_list) override;
+  virtual std::vector<std::string> GetAbPartitionsOnDevice() const;
 
  private:
   friend class PartitionUpdateGeneratorAndroidTest;
   FRIEND_TEST(PartitionUpdateGeneratorAndroidTest, GetStaticPartitions);
   FRIEND_TEST(PartitionUpdateGeneratorAndroidTest, CreatePartitionUpdate);
 
-  // Gets the name of the static a/b partitions on the device.
-  std::optional<std::set<std::string>> GetStaticAbPartitionsOnDevice();
-
   // Creates a PartitionUpdate object for a given partition to update from
   // source to target. Returns std::nullopt on failure.
   std::optional<PartitionUpdate> CreatePartitionUpdate(
@@ -58,17 +56,10 @@
       const std::string& target_device,
       int64_t partition_size);
 
-  std::optional<PartitionUpdate> CreatePartitionUpdate(
-      const std::string& partition_name,
-      BootControlInterface::Slot source_slot,
-      BootControlInterface::Slot target_slot);
-
   std::optional<brillo::Blob> CalculateHashForPartition(
       const std::string& block_device, int64_t partition_size);
 
   BootControlInterface* boot_control_;
-  // Path to look for a/b partitions
-  std::string block_device_dir_;
   size_t block_size_;
 };
 
diff --git a/payload_consumer/partition_update_generator_android_unittest.cc b/payload_consumer/partition_update_generator_android_unittest.cc
index c3be9db..86d025e 100644
--- a/payload_consumer/partition_update_generator_android_unittest.cc
+++ b/payload_consumer/partition_update_generator_android_unittest.cc
@@ -19,12 +19,14 @@
 #include <map>
 #include <memory>
 #include <set>
+#include <utility>
 #include <vector>
 
 #include <android-base/strings.h>
 #include <brillo/secure_blob.h>
 #include <gtest/gtest.h>
 
+#include "update_engine/common/boot_control_interface.h"
 #include "update_engine/common/fake_boot_control.h"
 #include "update_engine/common/hash_calculator.h"
 #include "update_engine/common/test_utils.h"
@@ -32,40 +34,53 @@
 
 namespace chromeos_update_engine {
 
+class FakePartitionUpdateGenerator : public PartitionUpdateGeneratorAndroid {
+ public:
+  using PartitionUpdateGeneratorAndroid::PartitionUpdateGeneratorAndroid;
+  std::vector<std::string> GetAbPartitionsOnDevice() const override {
+    return ab_partitions_;  // Canned list; no device lookup is performed.
+  }
+  std::vector<std::string> ab_partitions_;  // Assigned directly by the tests.
+};
+
 class PartitionUpdateGeneratorAndroidTest : public ::testing::Test {
  protected:
   void SetUp() override {
     ASSERT_TRUE(device_dir_.CreateUniqueTempDir());
     boot_control_ = std::make_unique<FakeBootControl>();
-    boot_control_->SetNumSlots(2);
-    auto generator =
-        partition_update_generator::Create(boot_control_.get(), 4096);
-    generator_.reset(
-        static_cast<PartitionUpdateGeneratorAndroid*>(generator.release()));
     ASSERT_TRUE(boot_control_);
+    boot_control_->SetNumSlots(2);
+    generator_ = std::make_unique<FakePartitionUpdateGenerator>(
+        boot_control_.get(), 4096);
     ASSERT_TRUE(generator_);
-    generator_->block_device_dir_ = device_dir_.GetPath().value();
   }
 
-  std::unique_ptr<PartitionUpdateGeneratorAndroid> generator_;
+  std::unique_ptr<FakePartitionUpdateGenerator> generator_;
   std::unique_ptr<FakeBootControl> boot_control_;
 
   base::ScopedTempDir device_dir_;
+  std::map<std::string, std::string> device_map_;
 
   void SetUpBlockDevice(const std::map<std::string, std::string>& contents) {
+    std::set<std::string> partition_base_names;
     for (const auto& [name, content] : contents) {
-      auto path = generator_->block_device_dir_ + "/" + name;
+      auto path = device_dir_.GetPath().value() + "/" + name;
       ASSERT_TRUE(
           utils::WriteFile(path.c_str(), content.data(), content.size()));
 
       if (android::base::EndsWith(name, "_a")) {
-        boot_control_->SetPartitionDevice(
-            name.substr(0, name.size() - 2), 0, path);
+        auto prefix = name.substr(0, name.size() - 2);
+        boot_control_->SetPartitionDevice(prefix, 0, path);
+        partition_base_names.emplace(prefix);
       } else if (android::base::EndsWith(name, "_b")) {
-        boot_control_->SetPartitionDevice(
-            name.substr(0, name.size() - 2), 1, path);
+        auto prefix = name.substr(0, name.size() - 2);
+        boot_control_->SetPartitionDevice(prefix, 1, path);
+        partition_base_names.emplace(prefix);
       }
+      device_map_[name] = std::move(path);
     }
+    generator_->ab_partitions_ = {partition_base_names.begin(),
+                                  partition_base_names.end()};
   }
 
   void CheckPartitionUpdate(const std::string& name,
@@ -95,25 +110,6 @@
   }
 };
 
-TEST_F(PartitionUpdateGeneratorAndroidTest, GetStaticPartitions) {
-  std::map<std::string, std::string> contents = {
-      {"system_a", ""},
-      {"system_b", ""},
-      {"vendor_a", ""},
-      {"vendor_b", ""},
-      {"persist", ""},
-      {"vbmeta_a", ""},
-      {"vbmeta_b", ""},
-      {"boot_a", ""},
-      {"boot_b", ""},
-  };
-
-  SetUpBlockDevice(contents);
-  auto partitions = generator_->GetStaticAbPartitionsOnDevice();
-  ASSERT_EQ(std::set<std::string>({"system", "vendor", "vbmeta", "boot"}),
-            partitions);
-}
-
 TEST_F(PartitionUpdateGeneratorAndroidTest, CreatePartitionUpdate) {
   auto system_contents = std::string(4096 * 2, '1');
   auto boot_contents = std::string(4096 * 5, 'b');
@@ -125,13 +121,14 @@
   };
   SetUpBlockDevice(contents);
 
-  auto system_partition_update =
-      generator_->CreatePartitionUpdate("system", 0, 1);
+  auto system_partition_update = generator_->CreatePartitionUpdate(
+      "system", device_map_["system_a"], device_map_["system_b"], 4096 * 2);
   ASSERT_TRUE(system_partition_update.has_value());
   CheckPartitionUpdate(
       "system", system_contents, system_partition_update.value());
 
-  auto boot_partition_update = generator_->CreatePartitionUpdate("boot", 0, 1);
+  auto boot_partition_update = generator_->CreatePartitionUpdate(
+      "boot", device_map_["boot_a"], device_map_["boot_b"], 4096 * 5);
   ASSERT_TRUE(boot_partition_update.has_value());
   CheckPartitionUpdate("boot", boot_contents, boot_partition_update.value());
 }
diff --git a/payload_consumer/partition_update_generator_stub.cc b/payload_consumer/partition_update_generator_stub.cc
index e2b64ec..cfbd5e1 100644
--- a/payload_consumer/partition_update_generator_stub.cc
+++ b/payload_consumer/partition_update_generator_stub.cc
@@ -30,7 +30,7 @@
 
 namespace partition_update_generator {
 std::unique_ptr<PartitionUpdateGeneratorInterface> Create(
-    BootControlInterface* boot_control) {
+    BootControlInterface* boot_control, size_t block_size) {
   return std::make_unique<PartitionUpdateGeneratorStub>();
 }
 }  // namespace partition_update_generator
diff --git a/payload_consumer/partition_writer.cc b/payload_consumer/partition_writer.cc
new file mode 100644
index 0000000..d47ebee
--- /dev/null
+++ b/payload_consumer/partition_writer.cc
@@ -0,0 +1,644 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+#include <update_engine/payload_consumer/partition_writer.h>
+
+#include <fcntl.h>
+#include <linux/fs.h>
+
+#include <algorithm>
+#include <initializer_list>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include <base/strings/string_number_conversions.h>
+#include <bsdiff/bspatch.h>
+#include <puffin/puffpatch.h>
+#include <bsdiff/file_interface.h>
+#include <puffin/stream.h>
+
+#include "update_engine/common/terminator.h"
+#include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/bzip_extent_writer.h"
+#include "update_engine/payload_consumer/cached_file_descriptor.h"
+#include "update_engine/payload_consumer/extent_reader.h"
+#include "update_engine/payload_consumer/extent_writer.h"
+#include "update_engine/payload_consumer/fec_file_descriptor.h"
+#include "update_engine/payload_consumer/file_descriptor_utils.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_consumer/mount_history.h"
+#include "update_engine/payload_consumer/payload_constants.h"
+#include "update_engine/payload_consumer/xz_extent_writer.h"
+
+namespace chromeos_update_engine {
+
+namespace {
+constexpr uint64_t kCacheSize = 1024 * 1024;  // 1MB
+
+// Discards the tail of the block device referenced by |fd|, i.e. the bytes
+// from offset |data_size| to the end of the device. Tries each listed ioctl in
+// turn and returns true as soon as one succeeds; returns false if the device
+// size is 0, if there is no tail to discard, or if every ioctl fails.
+bool DiscardPartitionTail(const FileDescriptorPtr& fd, uint64_t data_size) {
+  uint64_t part_size = fd->BlockDevSize();
+  if (!part_size || part_size <= data_size)
+    return false;
+
+  struct blkioctl_request {
+    int number;
+    const char* name;
+  };
+  const std::initializer_list<blkioctl_request> blkioctl_requests = {
+      {BLKDISCARD, "BLKDISCARD"},
+      {BLKSECDISCARD, "BLKSECDISCARD"},
+#ifdef BLKZEROOUT
+      {BLKZEROOUT, "BLKZEROOUT"},
+#endif
+  };
+  for (const auto& req : blkioctl_requests) {
+    int error = 0;
+    if (fd->BlkIoctl(req.number, data_size, part_size - data_size, &error) &&
+        error == 0) {
+      return true;
+    }
+    LOG(WARNING) << "Error discarding the last "
+                 << (part_size - data_size) / 1024 << " KiB using ioctl("
+                 << req.name << ")";
+  }
+  return false;
+}
+
+}  // namespace
+
+// Opens |path| with |mode|, caching writes when |cache_writes| and writable.
+// Returns the fd and sets *err to 0; on failure returns nullptr, *err = errno.
+FileDescriptorPtr OpenFile(const char* path,
+                           int mode,
+                           bool cache_writes,
+                           int* err) {
+  // Try to mark the block device read-only based on the mode. Ignore any
+  // failure since this won't work when passing regular files.
+  bool read_only = (mode & O_ACCMODE) == O_RDONLY;
+  utils::SetBlockDeviceReadOnly(path, read_only);
+
+  FileDescriptorPtr fd(new EintrSafeFileDescriptor());
+  if (cache_writes && !read_only) {
+    fd = FileDescriptorPtr(new CachedFileDescriptor(fd, kCacheSize));
+    LOG(INFO) << "Caching writes.";
+  }
+  if (!fd->Open(path, mode, 000)) {
+    *err = errno;
+    PLOG(ERROR) << "Unable to open file " << path;
+    return nullptr;
+  }
+  *err = 0;
+  return fd;
+}
+
+class BsdiffExtentFile : public bsdiff::FileInterface {
+ public:
+  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader, size_t size)
+      : BsdiffExtentFile(std::move(reader), nullptr, size) {}
+  BsdiffExtentFile(std::unique_ptr<ExtentWriter> writer, size_t size)
+      : BsdiffExtentFile(nullptr, std::move(writer), size) {}
+
+  ~BsdiffExtentFile() override = default;
+
+  bool Read(void* buf, size_t count, size_t* bytes_read) override {
+    TEST_AND_RETURN_FALSE(reader_ != nullptr && reader_->Read(buf, count));
+    *bytes_read = count;
+    offset_ += count;
+    return true;
+  }
+
+  bool Write(const void* buf, size_t count, size_t* bytes_written) override {
+    TEST_AND_RETURN_FALSE(writer_ != nullptr && writer_->Write(buf, count));
+    *bytes_written = count;
+    offset_ += count;
+    return true;
+  }
+
+  bool Seek(off_t pos) override {
+    if (reader_ != nullptr) {
+      TEST_AND_RETURN_FALSE(reader_->Seek(pos));
+      offset_ = pos;
+    } else {
+      // A write-mode file cannot reposition; only a seek to the current
+      // offset (a no-op) is accepted.
+      TEST_AND_RETURN_FALSE(offset_ == static_cast<uint64_t>(pos));
+    }
+    return true;
+  }
+
+  bool Close() override { return true; }
+
+  bool GetSize(uint64_t* size) override {
+    *size = size_;
+    return true;
+  }
+
+ private:
+  BsdiffExtentFile(std::unique_ptr<ExtentReader> reader,
+                   std::unique_ptr<ExtentWriter> writer,
+                   size_t size)
+      : reader_(std::move(reader)),
+        writer_(std::move(writer)),
+        size_(size),
+        offset_(0) {}
+
+  std::unique_ptr<ExtentReader> reader_;  // Null for write-mode files.
+  std::unique_ptr<ExtentWriter> writer_;  // Null for read-mode files.
+  uint64_t size_;  // Reported by GetSize().
+  uint64_t offset_;  // Current read/write position.
+
+  DISALLOW_COPY_AND_ASSIGN(BsdiffExtentFile);
+};
+// A class to be passed to |puffpatch| for reading from |source_fd_| and writing
+// into |target_fd_|.
+class PuffinExtentStream : public puffin::StreamInterface {
+ public:
+  // Constructor for creating a stream for reading from an |ExtentReader|.
+  PuffinExtentStream(std::unique_ptr<ExtentReader> reader, uint64_t size)
+      : PuffinExtentStream(std::move(reader), nullptr, size) {}
+
+  // Constructor for creating a stream for writing to an |ExtentWriter|.
+  PuffinExtentStream(std::unique_ptr<ExtentWriter> writer, uint64_t size)
+      : PuffinExtentStream(nullptr, std::move(writer), size) {}
+
+  ~PuffinExtentStream() override = default;
+
+  bool GetSize(uint64_t* size) const override {
+    *size = size_;
+    return true;
+  }
+
+  bool GetOffset(uint64_t* offset) const override {
+    *offset = offset_;
+    return true;
+  }
+
+  bool Seek(uint64_t offset) override {
+    if (is_read_) {
+      TEST_AND_RETURN_FALSE(reader_->Seek(offset));
+      offset_ = offset;
+    } else {
+      // For writes technically there should be no change of position, or it
+      // should be equivalent to the current offset.
+      TEST_AND_RETURN_FALSE(offset_ == offset);
+    }
+    return true;
+  }
+
+  bool Read(void* buffer, size_t count) override {
+    TEST_AND_RETURN_FALSE(is_read_);
+    TEST_AND_RETURN_FALSE(reader_->Read(buffer, count));
+    offset_ += count;
+    return true;
+  }
+
+  bool Write(const void* buffer, size_t count) override {
+    TEST_AND_RETURN_FALSE(!is_read_);
+    TEST_AND_RETURN_FALSE(writer_->Write(buffer, count));
+    offset_ += count;
+    return true;
+  }
+
+  bool Close() override { return true; }
+
+ private:
+  PuffinExtentStream(std::unique_ptr<ExtentReader> reader,
+                     std::unique_ptr<ExtentWriter> writer,
+                     uint64_t size)
+      : reader_(std::move(reader)),
+        writer_(std::move(writer)),
+        size_(size),
+        offset_(0),
+        is_read_(reader_ ? true : false) {}
+
+  std::unique_ptr<ExtentReader> reader_;  // Null for write streams.
+  std::unique_ptr<ExtentWriter> writer_;  // Null for read streams.
+  uint64_t size_;  // Reported by GetSize().
+  uint64_t offset_;  // Reported by GetOffset().
+  bool is_read_;  // True when constructed with a non-null reader.
+
+  DISALLOW_COPY_AND_ASSIGN(PuffinExtentStream);
+};
+
+PartitionWriter::PartitionWriter(
+    const PartitionUpdate& partition_update,
+    const InstallPlan::Partition& install_part,
+    DynamicPartitionControlInterface* dynamic_control,
+    size_t block_size,
+    bool is_interactive)
+    : partition_update_(partition_update),  // Reference member; not owned.
+      install_part_(install_part),          // Reference member; not owned.
+      dynamic_control_(dynamic_control),
+      interactive_(is_interactive),
+      block_size_(block_size) {}
+
+PartitionWriter::~PartitionWriter() {
+  Close();  // Close() logs its own failures; the status is dropped here.
+}
+
+bool PartitionWriter::Init(const InstallPlan* install_plan,
+                           bool source_may_exist) {
+  const PartitionUpdate& partition = partition_update_;
+  uint32_t source_slot = install_plan->source_slot;
+  uint32_t target_slot = install_plan->target_slot;
+
+  // We shouldn't open the source partition in certain cases, e.g. some dynamic
+  // partitions in a delta payload, or partitions included in the full payload
+  // for partial updates. A non-zero source size is used as the indicator.
+  if (source_may_exist && install_part_.source_size > 0) {
+    source_path_ = install_part_.source_path;
+    int err;  // Passed to OpenFile(); not inspected here.
+    source_fd_ = OpenFile(source_path_.c_str(), O_RDONLY, false, &err);
+    if (!source_fd_) {
+      LOG(ERROR) << "Unable to open source partition "
+                 << partition.partition_name() << " on slot "
+                 << BootControlInterface::SlotName(source_slot) << ", file "
+                 << source_path_;
+      return false;
+    }
+  }
+
+  target_path_ = install_part_.target_path;
+  int err;  // Passed to OpenFile(); not inspected here.
+  // Only non-interactive updates add O_DSYNC to the target open flags.
+  int flags = O_RDWR;
+  if (!interactive_)
+    flags |= O_DSYNC;
+
+  LOG(INFO) << "Opening " << target_path_ << " partition with"
+            << (interactive_ ? "out" : "") << " O_DSYNC";
+
+  target_fd_ = OpenFile(target_path_.c_str(), flags, true, &err);
+  if (!target_fd_) {
+    LOG(ERROR) << "Unable to open target partition "
+               << partition.partition_name() << " on slot "
+               << BootControlInterface::SlotName(target_slot) << ", file "
+               << target_path_;
+    return false;
+  }
+
+  LOG(INFO) << "Applying " << partition.operations().size()
+            << " operations to partition \"" << partition.partition_name()
+            << "\"";
+
+  // Discard the end of the partition, but ignore failures.
+  DiscardPartitionTail(target_fd_, install_part_.target_size);
+
+  return true;
+}
+
+bool PartitionWriter::PerformReplaceOperation(const InstallOperation& operation,
+                                              const void* data,
+                                              size_t count) {
+  // Writes |data| through the (de)compressing writer stack to the dst extents.
+  // Reject a buffer shorter than the data_length() bytes consumed below.
+  TEST_AND_RETURN_FALSE(count >= operation.data_length());
+  std::unique_ptr<ExtentWriter> writer = std::make_unique<DirectExtentWriter>();
+  if (operation.type() == InstallOperation::REPLACE_BZ) {
+    writer = std::make_unique<BzipExtentWriter>(std::move(writer));
+  } else if (operation.type() == InstallOperation::REPLACE_XZ) {
+    writer = std::make_unique<XzExtentWriter>(std::move(writer));
+  }
+
+  TEST_AND_RETURN_FALSE(
+      writer->Init(target_fd_, operation.dst_extents(), block_size_));
+  TEST_AND_RETURN_FALSE(writer->Write(data, operation.data_length()));
+  return target_fd_->Flush();
+}
+
+bool PartitionWriter::PerformZeroOrDiscardOperation(
+    const InstallOperation& operation) {
+#ifdef BLKZEROOUT
+  bool attempt_ioctl = true;
+  int request =
+      (operation.type() == InstallOperation::ZERO ? BLKZEROOUT : BLKDISCARD);
+#else   // !defined(BLKZEROOUT)
+  bool attempt_ioctl = false;
+  int request = 0;
+#endif  // !defined(BLKZEROOUT)
+
+  brillo::Blob zeros;
+  for (const Extent& extent : operation.dst_extents()) {
+    const uint64_t start = extent.start_block() * block_size_;
+    const uint64_t length = extent.num_blocks() * block_size_;
+    if (attempt_ioctl) {
+      int result = 0;
+      if (target_fd_->BlkIoctl(request, start, length, &result) && result == 0)
+        continue;
+      attempt_ioctl = false;  // Not retried for the remaining extents.
+    }
+    // In case of failure, we fall back to writing 0 to the selected region.
+    zeros.resize(16 * block_size_);  // 16-block scratch buffer.
+    for (uint64_t offset = 0; offset < length; offset += zeros.size()) {
+      uint64_t chunk_length =
+          std::min(length - offset, static_cast<uint64_t>(zeros.size()));
+      TEST_AND_RETURN_FALSE(utils::PWriteAll(
+          target_fd_, zeros.data(), chunk_length, start + offset));
+    }
+  }
+  return target_fd_->Flush();
+}
+
+bool PartitionWriter::PerformSourceCopyOperation(
+    const InstallOperation& operation, ErrorCode* error) {
+  TEST_AND_RETURN_FALSE(source_fd_ != nullptr);
+
+  // The device may optimize the SOURCE_COPY operation.
+  // Since this is a device-specific optimization, let the dynamic partition
+  // control decide whether (and how) the operation is optimized.
+  const PartitionUpdate& partition = partition_update_;
+  const auto& partition_control = dynamic_control_;
+
+  InstallOperation buf;
+  bool should_optimize = partition_control->OptimizeOperation(
+      partition.partition_name(), operation, &buf);
+  const InstallOperation& optimized = should_optimize ? buf : operation;
+
+  if (operation.has_src_sha256_hash()) {
+    bool read_ok;
+    brillo::Blob source_hash;
+    brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
+                                      operation.src_sha256_hash().end());
+
+    // We fall back to use the error corrected device if the hash of the raw
+    // device doesn't match or there was an error reading the source partition.
+    // Note that this code will also fall back if writing the target partition
+    // fails.
+    if (should_optimize) {
+      // Hash operation.src_extents(), then copy optimized.src_extents to
+      // optimized.dst_extents.
+      read_ok =
+          fd_utils::ReadAndHashExtents(
+              source_fd_, operation.src_extents(), block_size_, &source_hash) &&
+          fd_utils::CopyAndHashExtents(source_fd_,
+                                       optimized.src_extents(),
+                                       target_fd_,
+                                       optimized.dst_extents(),
+                                       block_size_,
+                                       nullptr /* skip hashing */);
+    } else {
+      read_ok = fd_utils::CopyAndHashExtents(source_fd_,
+                                             operation.src_extents(),
+                                             target_fd_,
+                                             operation.dst_extents(),
+                                             block_size_,
+                                             &source_hash);
+    }
+    if (read_ok && expected_source_hash == source_hash)
+      return true;  // NOTE(review): this path skips target_fd_->Flush().
+    LOG(WARNING) << "Source hash from RAW device mismatched, attempting to "
+                    "correct using ECC";
+    if (!OpenCurrentECCPartition()) {
+      // The following function call will return false since the source hash
+      // mismatches, but we still want to call it so it prints the appropriate
+      // log message.
+      return ValidateSourceHash(source_hash, operation, source_fd_, error);
+    }
+
+    LOG(WARNING) << "Source hash from RAW device mismatched: found "
+                 << base::HexEncode(source_hash.data(), source_hash.size())
+                 << ", expected "
+                 << base::HexEncode(expected_source_hash.data(),
+                                    expected_source_hash.size());
+    if (should_optimize) {
+      TEST_AND_RETURN_FALSE(fd_utils::ReadAndHashExtents(
+          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash));
+      TEST_AND_RETURN_FALSE(
+          fd_utils::CopyAndHashExtents(source_ecc_fd_,
+                                       optimized.src_extents(),
+                                       target_fd_,
+                                       optimized.dst_extents(),
+                                       block_size_,
+                                       nullptr /* skip hashing */));
+    } else {
+      TEST_AND_RETURN_FALSE(
+          fd_utils::CopyAndHashExtents(source_ecc_fd_,
+                                       operation.src_extents(),
+                                       target_fd_,
+                                       operation.dst_extents(),
+                                       block_size_,
+                                       &source_hash));
+    }
+    TEST_AND_RETURN_FALSE(
+        ValidateSourceHash(source_hash, operation, source_ecc_fd_, error));
+    // At this point reading from the error corrected device worked, but
+    // reading from the raw device failed, so this is considered a recovered
+    // failure.
+    source_ecc_recovered_failures_++;
+  } else {
+    // When the operation doesn't include a source hash, we attempt the error
+    // corrected device first since we can't verify the block in the raw device
+    // at this point, but we fall back to the raw device since the error
+    // corrected device can be shorter or not available.
+
+    if (OpenCurrentECCPartition() &&
+        fd_utils::CopyAndHashExtents(source_ecc_fd_,
+                                     optimized.src_extents(),
+                                     target_fd_,
+                                     optimized.dst_extents(),
+                                     block_size_,
+                                     nullptr)) {
+      return true;  // NOTE(review): this path skips target_fd_->Flush().
+    }
+    TEST_AND_RETURN_FALSE(fd_utils::CopyAndHashExtents(source_fd_,
+                                                       optimized.src_extents(),
+                                                       target_fd_,
+                                                       optimized.dst_extents(),
+                                                       block_size_,
+                                                       nullptr));
+  }
+  return target_fd_->Flush();
+}
+bool PartitionWriter::PerformSourceBsdiffOperation(
+    const InstallOperation& operation,
+    ErrorCode* error,
+    const void* data,
+    size_t count) {
+  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
+  TEST_AND_RETURN_FALSE(source_fd != nullptr);
+  // Wrap the source extents of the chosen descriptor for bspatch.
+  auto reader = std::make_unique<DirectExtentReader>();
+  TEST_AND_RETURN_FALSE(
+      reader->Init(source_fd, operation.src_extents(), block_size_));
+  auto src_file = std::make_unique<BsdiffExtentFile>(
+      std::move(reader),
+      utils::BlocksInExtents(operation.src_extents()) * block_size_);
+  // Wrap the destination extents of the target partition for bspatch.
+  auto writer = std::make_unique<DirectExtentWriter>();
+  TEST_AND_RETURN_FALSE(
+      writer->Init(target_fd_, operation.dst_extents(), block_size_));
+  auto dst_file = std::make_unique<BsdiffExtentFile>(
+      std::move(writer),
+      utils::BlocksInExtents(operation.dst_extents()) * block_size_);
+
+  TEST_AND_RETURN_FALSE(bsdiff::bspatch(std::move(src_file),
+                                        std::move(dst_file),
+                                        reinterpret_cast<const uint8_t*>(data),
+                                        count) == 0);
+  return target_fd_->Flush();
+}
+
+bool PartitionWriter::PerformPuffDiffOperation(
+    const InstallOperation& operation,
+    ErrorCode* error,
+    const void* data,
+    size_t count) {
+  FileDescriptorPtr source_fd = ChooseSourceFD(operation, error);
+  TEST_AND_RETURN_FALSE(source_fd != nullptr);
+  // Wrap the source extents of the chosen descriptor as a puffin stream.
+  auto reader = std::make_unique<DirectExtentReader>();
+  TEST_AND_RETURN_FALSE(
+      reader->Init(source_fd, operation.src_extents(), block_size_));
+  puffin::UniqueStreamPtr src_stream(new PuffinExtentStream(
+      std::move(reader),
+      utils::BlocksInExtents(operation.src_extents()) * block_size_));
+  // Wrap the destination extents of the target partition as a puffin stream.
+  auto writer = std::make_unique<DirectExtentWriter>();
+  TEST_AND_RETURN_FALSE(
+      writer->Init(target_fd_, operation.dst_extents(), block_size_));
+  puffin::UniqueStreamPtr dst_stream(new PuffinExtentStream(
+      std::move(writer),
+      utils::BlocksInExtents(operation.dst_extents()) * block_size_));
+
+  constexpr size_t kMaxCacheSize = 5 * 1024 * 1024;  // Total 5MB cache.
+  TEST_AND_RETURN_FALSE(
+      puffin::PuffPatch(std::move(src_stream),
+                        std::move(dst_stream),
+                        reinterpret_cast<const uint8_t*>(data),
+                        count,
+                        kMaxCacheSize));
+  return target_fd_->Flush();
+}
+
+FileDescriptorPtr PartitionWriter::ChooseSourceFD(
+    const InstallOperation& operation, ErrorCode* error) {
+  if (source_fd_ == nullptr) {
+    LOG(ERROR) << "ChooseSourceFD fail: source_fd_ == nullptr";
+    return nullptr;
+  }
+
+  if (!operation.has_src_sha256_hash()) {
+    // When the operation doesn't include a source hash, we attempt the error
+    // corrected device first since we can't verify the block in the raw device
+    // at this point, but we first need to make sure all extents are readable
+    // since the error corrected device can be shorter or not available.
+    if (OpenCurrentECCPartition() &&
+        fd_utils::ReadAndHashExtents(
+            source_ecc_fd_, operation.src_extents(), block_size_, nullptr)) {
+      return source_ecc_fd_;
+    }
+    return source_fd_;
+  }
+  // A source hash is present: check the raw device against it first.
+  brillo::Blob source_hash;
+  brillo::Blob expected_source_hash(operation.src_sha256_hash().begin(),
+                                    operation.src_sha256_hash().end());
+  if (fd_utils::ReadAndHashExtents(
+          source_fd_, operation.src_extents(), block_size_, &source_hash) &&
+      source_hash == expected_source_hash) {
+    return source_fd_;
+  }
+  // We fall back to use the error corrected device if the hash of the raw
+  // device doesn't match or there was an error reading the source partition.
+  if (!OpenCurrentECCPartition()) {
+    // The following function call will return false since the source hash
+    // mismatches; its result is ignored here and it is called only so that it
+    // prints the appropriate log message.
+    ValidateSourceHash(source_hash, operation, source_fd_, error);
+    return nullptr;
+  }
+  LOG(WARNING) << "Source hash from RAW device mismatched: found "
+               << base::HexEncode(source_hash.data(), source_hash.size())
+               << ", expected "
+               << base::HexEncode(expected_source_hash.data(),
+                                  expected_source_hash.size());
+  // Hash the same extents again, this time via the error corrected device.
+  if (fd_utils::ReadAndHashExtents(
+          source_ecc_fd_, operation.src_extents(), block_size_, &source_hash) &&
+      ValidateSourceHash(source_hash, operation, source_ecc_fd_, error)) {
+    // At this point reading from the error corrected device worked, but
+    // reading from the raw device failed, so this is considered a recovered
+    // failure.
+    source_ecc_recovered_failures_++;
+    return source_ecc_fd_;
+  }
+  return nullptr;
+}
+
+bool PartitionWriter::OpenCurrentECCPartition() {
+  // No support for ECC for full payloads.
+  // Full payloads should not have any operation that requires ECC partitions.
+  if (source_ecc_fd_)
+    return true;
+  // A failed open is remembered so that it is not attempted again.
+  if (source_ecc_open_failure_)
+    return false;
+
+#if USE_FEC
+  const PartitionUpdate& partition = partition_update_;
+  const InstallPlan::Partition& install_part = install_part_;
+  std::string path = install_part.source_path;
+  FileDescriptorPtr fd(new FecFileDescriptor());
+  if (!fd->Open(path.c_str(), O_RDONLY, 0)) {
+    PLOG(ERROR) << "Unable to open ECC source partition "
+                << partition.partition_name() << ", file " << path;
+    source_ecc_open_failure_ = true;
+    return false;
+  }
+  source_ecc_fd_ = fd;
+#else
+  // No support for ECC compiled.
+  source_ecc_open_failure_ = true;
+#endif  // USE_FEC
+
+  return !source_ecc_open_failure_;  // Always false without USE_FEC.
+}
+
+int PartitionWriter::Close() {
+  // Closes and releases the source, target and ECC descriptors, in that
+  // order. Every descriptor is released even when an earlier close failed.
+  // Returns 0 on success, otherwise the negated code of the last failure.
+
+  int last_error = 0;
+  const auto close_and_release = [&last_error](FileDescriptorPtr* fd,
+                                               const char* message) {
+    if (*fd && !(*fd)->Close()) {
+      last_error = errno;
+      PLOG(ERROR) << message;
+      // Keep the result non-zero even if errno was left at 0.
+      if (last_error == 0)
+        last_error = 1;
+    }
+    fd->reset();
+  };
+
+  close_and_release(&source_fd_, "Error closing source partition");
+  source_path_.clear();
+
+  close_and_release(&target_fd_, "Error closing target partition");
+  target_path_.clear();
+
+  close_and_release(&source_ecc_fd_, "Error closing ECC source partition");
+  // Allow a later attempt to open the ECC device again.
+  source_ecc_open_failure_ = false;
+
+  return -last_error;
+}
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/partition_writer.h b/payload_consumer/partition_writer.h
new file mode 100644
index 0000000..624a411
--- /dev/null
+++ b/payload_consumer/partition_writer.h
@@ -0,0 +1,113 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef UPDATE_ENGINE_PARTITION_WRITER_H_
+#define UPDATE_ENGINE_PARTITION_WRITER_H_
+
+#include <cstdint>
+#include <string>
+
+#include <brillo/secure_blob.h>
+#include <gtest/gtest_prod.h>
+
+#include "update_engine/common/dynamic_partition_control_interface.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/update_metadata.pb.h"
+namespace chromeos_update_engine {
+class PartitionWriter {
+ public:
+  PartitionWriter(const PartitionUpdate& partition_update,
+                  const InstallPlan::Partition& install_part,
+                  DynamicPartitionControlInterface* dynamic_control,
+                  size_t block_size,
+                  bool is_interactive);
+  ~PartitionWriter();  // Invokes Close().
+  static bool ValidateSourceHash(const brillo::Blob& calculated_hash,
+                                 const InstallOperation& operation,
+                                 const FileDescriptorPtr source_fd,
+                                 ErrorCode* error);
+
+  // Performs the necessary initialization work (opening the partitions)
+  // before an InstallOperation can be applied to this partition.
+  [[nodiscard]] bool Init(const InstallPlan* install_plan,
+                          bool source_may_exist);
+
+  int Close();  // Returns 0, or the negated error of the last failed close.
+
+  // These perform a specific type of operation and return true on success.
+  // |error| will be set on a source hash mismatch; otherwise |error| might not
+  // be set even if the operation fails.
+  [[nodiscard]] bool PerformReplaceOperation(const InstallOperation& operation,
+                                             const void* data,
+                                             size_t count);
+  [[nodiscard]] bool PerformZeroOrDiscardOperation(
+      const InstallOperation& operation);
+
+  [[nodiscard]] bool PerformSourceCopyOperation(
+      const InstallOperation& operation, ErrorCode* error);
+  [[nodiscard]] bool PerformSourceBsdiffOperation(
+      const InstallOperation& operation,
+      ErrorCode* error,
+      const void* data,
+      size_t count);
+  [[nodiscard]] bool PerformPuffDiffOperation(const InstallOperation& operation,
+                                              ErrorCode* error,
+                                              const void* data,
+                                              size_t count);
+
+ private:
+  friend class PartitionWriterTest;
+  FRIEND_TEST(PartitionWriterTest, ChooseSourceFDTest);
+  // Opens |source_ecc_fd_| on first use; returns whether it is available.
+  bool OpenCurrentECCPartition();
+  // For a given operation, choose the source fd to be used (raw device or error
+  // correction device) based on the source operation hash.
+  // Returns nullptr if the source hash mismatch cannot be corrected, and sets
+  // the |error| accordingly.
+  FileDescriptorPtr ChooseSourceFD(const InstallOperation& operation,
+                                   ErrorCode* error);
+
+  const PartitionUpdate& partition_update_;     // Reference; not owned.
+  const InstallPlan::Partition& install_part_;  // Reference; not owned.
+  DynamicPartitionControlInterface* dynamic_control_;
+  // Path to source partition
+  std::string source_path_;
+  // Path to target partition
+  std::string target_path_;
+  FileDescriptorPtr source_fd_;
+  FileDescriptorPtr target_fd_;
+  const bool interactive_;
+  const size_t block_size_;
+  // File descriptor of the error corrected source partition. Only set while
+  // updating partition using a delta payload for a partition where error
+  // correction is available. The size of the error corrected device is smaller
+  // than the underlying raw device, since it doesn't include the error
+  // correction blocks.
+  FileDescriptorPtr source_ecc_fd_{nullptr};
+
+  // The total number of operations that failed source hash verification but
+  // passed after falling back to the error-corrected |source_ecc_fd_| device.
+  uint64_t source_ecc_recovered_failures_{0};
+
+  // Whether opening the current partition as an error-corrected device failed.
+  // Used to avoid re-opening the same source partition if it is not actually
+  // error corrected.
+  bool source_ecc_open_failure_{false};
+};
+}  // namespace chromeos_update_engine
+
+#endif
diff --git a/payload_consumer/partition_writer_unittest.cc b/payload_consumer/partition_writer_unittest.cc
new file mode 100644
index 0000000..c1ff4f4
--- /dev/null
+++ b/payload_consumer/partition_writer_unittest.cc
@@ -0,0 +1,203 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <memory>
+#include <vector>
+
+#include <brillo/secure_blob.h>
+#include <gtest/gtest.h>
+
+#include "update_engine/common/dynamic_partition_control_stub.h"
+#include "update_engine/common/error_code.h"
+#include "update_engine/common/fake_prefs.h"
+#include "update_engine/common/hash_calculator.h"
+#include "update_engine/common/test_utils.h"
+#include "update_engine/common/utils.h"
+#include "update_engine/payload_consumer/delta_performer.h"
+#include "update_engine/payload_consumer/extent_reader.h"
+#include "update_engine/payload_consumer/extent_writer.h"
+#include "update_engine/payload_consumer/fake_file_descriptor.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
+#include "update_engine/payload_consumer/install_plan.h"
+#include "update_engine/payload_generator/annotated_operation.h"
+#include "update_engine/payload_generator/delta_diff_generator.h"
+#include "update_engine/payload_generator/extent_ranges.h"
+#include "update_engine/payload_generator/payload_file.h"
+#include "update_engine/payload_generator/payload_generation_config.h"
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+
+class PartitionWriterTest : public testing::Test {
+ public:
+  // Helper function to pretend that the ECC file descriptor was already opened.
+  // Returns a pointer to the created file descriptor.
+  FakeFileDescriptor* SetFakeECCFile(size_t size) {
+    EXPECT_FALSE(writer_.source_ecc_fd_) << "source_ecc_fd_ already open.";
+    FakeFileDescriptor* ret = new FakeFileDescriptor();
+    fake_ecc_fd_.reset(ret);
+    // Call open to simulate it was already opened.
+    EXPECT_TRUE(ret->Open("", 0));
+    ret->SetFileSize(size);
+    writer_.source_ecc_fd_ = fake_ecc_fd_;
+    return ret;
+  }
+
+  uint64_t GetSourceEccRecoveredFailures() const {
+    return writer_.source_ecc_recovered_failures_;
+  }
+  // Builds a SOURCE_COPY op over blocks [0, n); |old_part| is not used.
+  AnnotatedOperation GenerateSourceCopyOp(const brillo::Blob& copied_data,
+                                          bool add_hash,
+                                          PartitionConfig* old_part = nullptr) {
+    PayloadGenerationConfig config;
+    const uint64_t kDefaultBlockSize = config.block_size;
+    EXPECT_EQ(0U, copied_data.size() % kDefaultBlockSize);
+    uint64_t num_blocks = copied_data.size() / kDefaultBlockSize;
+    AnnotatedOperation aop;
+    *(aop.op.add_src_extents()) = ExtentForRange(0, num_blocks);
+    *(aop.op.add_dst_extents()) = ExtentForRange(0, num_blocks);
+    aop.op.set_type(InstallOperation::SOURCE_COPY);
+    brillo::Blob src_hash;
+    EXPECT_TRUE(HashCalculator::RawHashOfData(copied_data, &src_hash));
+    if (add_hash)
+      aop.op.set_src_sha256_hash(src_hash.data(), src_hash.size());
+
+    return aop;
+  }
+  // Writes |blob_data| as the source, applies |op|, returns the target data.
+  brillo::Blob PerformSourceCopyOp(const InstallOperation& op,
+                                   const brillo::Blob& blob_data) {
+    test_utils::ScopedTempFile source_partition("Blob-XXXXXX");
+    DirectExtentWriter extent_writer;
+    FileDescriptorPtr fd(new EintrSafeFileDescriptor());
+    EXPECT_TRUE(fd->Open(source_partition.path().c_str(), O_RDWR));
+    EXPECT_TRUE(extent_writer.Init(fd, op.src_extents(), kBlockSize));
+    EXPECT_TRUE(extent_writer.Write(blob_data.data(), blob_data.size()));
+
+    test_utils::ScopedTempFile target_partition("Blob-XXXXXX");
+
+    install_part_.source_path = source_partition.path();
+    install_part_.target_path = target_partition.path();
+    install_part_.source_size = blob_data.size();
+    install_part_.target_size = blob_data.size();
+
+    ErrorCode error = ErrorCode::kSuccess;
+    EXPECT_TRUE(writer_.Init(&install_plan_, true));
+    EXPECT_TRUE(writer_.PerformSourceCopyOperation(op, &error));
+
+    brillo::Blob output_data;
+    EXPECT_TRUE(utils::ReadFile(target_partition.path(), &output_data));
+    return output_data;
+  }
+
+  FakePrefs prefs_{};
+  InstallPlan install_plan_{};
+  InstallPlan::Payload payload_{};
+  DynamicPartitionControlStub dynamic_control_{};
+  FileDescriptorPtr fake_ecc_fd_{};
+  DeltaArchiveManifest manifest_{};
+  PartitionUpdate partition_update_{};
+  InstallPlan::Partition install_part_{};
+  PartitionWriter writer_{
+      partition_update_, install_part_, &dynamic_control_, kBlockSize, false};
+};
+// Test that the error-corrected file descriptor is used to read a partition
+// when no hash is available for SOURCE_COPY but it falls back to the normal
+// file descriptor when the size of the error corrected one is too small.
+TEST_F(PartitionWriterTest, ErrorCorrectionSourceCopyWhenNoHashFallbackTest) {
+  constexpr size_t kCopyOperationSize = 4 * 4096;
+  test_utils::ScopedTempFile source("Source-XXXXXX");
+  // Setup the source path with the right expected data.
+  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
+  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), expected_data));
+
+  // Setup the fec file descriptor as the fake stream, with smaller data than
+  // the expected.
+  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize / 2);
+
+  PartitionConfig old_part(kPartitionNameRoot);
+  old_part.path = source.path();
+  old_part.size = expected_data.size();
+
+  // No operation hash in this op (|old_part| is ignored by the helper).
+  auto source_copy_op = GenerateSourceCopyOp(expected_data, false, &old_part);
+
+  auto output_data = PerformSourceCopyOp(source_copy_op.op, expected_data);
+  ASSERT_EQ(output_data, expected_data);
+
+  // Verify that the fake_fec was attempted to be used. Since the file
+  // descriptor is shorter it can actually do more than one read to realize it
+  // reached the EOF.
+  EXPECT_LE(1U, fake_fec->GetReadOps().size());
+  // This fallback doesn't count as an error-corrected operation since the
+  // operation hash was not available.
+  EXPECT_EQ(0U, GetSourceEccRecoveredFailures());
+}
+
+// Test that the error-corrected file descriptor is used to read the partition
+// since the source partition doesn't match the operation hash.
+TEST_F(PartitionWriterTest, ErrorCorrectionSourceCopyFallbackTest) {
+  constexpr size_t kCopyOperationSize = 4 * 4096;
+  // Invalid source data, which doesn't match the expected hash. It is written
+  // to the source partition by PerformSourceCopyOp() below.
+  brillo::Blob invalid_data(kCopyOperationSize, 0x55);
+
+  // Setup the fec file descriptor as the fake stream, which matches
+  // |expected_data|.
+  FakeFileDescriptor* fake_fec = SetFakeECCFile(kCopyOperationSize);
+  brillo::Blob expected_data = FakeFileDescriptorData(kCopyOperationSize);
+
+  auto source_copy_op = GenerateSourceCopyOp(expected_data, true);
+  auto output_data = PerformSourceCopyOp(source_copy_op.op, invalid_data);
+  ASSERT_EQ(output_data, expected_data);
+
+  // Verify that the fake_fec was actually used.
+  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
+  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
+}
+
+TEST_F(PartitionWriterTest, ChooseSourceFDTest) {
+  constexpr size_t kSourceSize = 4 * 4096;
+  test_utils::ScopedTempFile source("Source-XXXXXX");
+  // Write invalid data to the source image, which doesn't match the expected
+  // hash.
+  brillo::Blob invalid_data(kSourceSize, 0x55);
+  EXPECT_TRUE(test_utils::WriteFileVector(source.path(), invalid_data));
+
+  writer_.source_fd_ = std::make_shared<EintrSafeFileDescriptor>();
+  ASSERT_TRUE(writer_.source_fd_->Open(source.path().c_str(), O_RDONLY));
+
+  // Setup the fec file descriptor as the fake stream, which matches
+  // |expected_data|.
+  FakeFileDescriptor* fake_fec = SetFakeECCFile(kSourceSize);
+  brillo::Blob expected_data = FakeFileDescriptorData(kSourceSize);
+
+  InstallOperation op;
+  *(op.add_src_extents()) = ExtentForRange(0, kSourceSize / 4096);
+  brillo::Blob src_hash;
+  EXPECT_TRUE(HashCalculator::RawHashOfData(expected_data, &src_hash));
+  op.set_src_sha256_hash(src_hash.data(), src_hash.size());
+
+  ErrorCode error = ErrorCode::kSuccess;
+  EXPECT_EQ(writer_.source_ecc_fd_, writer_.ChooseSourceFD(op, &error));
+  EXPECT_EQ(ErrorCode::kSuccess, error);
+  // Verify that the fake_fec was actually used.
+  EXPECT_EQ(1U, fake_fec->GetReadOps().size());
+  EXPECT_EQ(1U, GetSourceEccRecoveredFailures());
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_consumer/postinstall_runner_action.cc b/payload_consumer/postinstall_runner_action.cc
index 94d0392..e8fa81b 100644
--- a/payload_consumer/postinstall_runner_action.cc
+++ b/payload_consumer/postinstall_runner_action.cc
@@ -224,7 +224,6 @@
       progress_fd_,
       base::BindRepeating(&PostinstallRunnerAction::OnProgressFdReady,
                           base::Unretained(this)));
-
 }
 
 void PostinstallRunnerAction::OnProgressFdReady() {
diff --git a/payload_generator/blob_file_writer.cc b/payload_generator/blob_file_writer.cc
index 7cdeb35..a1afe87 100644
--- a/payload_generator/blob_file_writer.cc
+++ b/payload_generator/blob_file_writer.cc
@@ -38,9 +38,9 @@
   return result;
 }
 
-void BlobFileWriter::SetTotalBlobs(size_t total_blobs) {
-  total_blobs_ = total_blobs;
-  stored_blobs_ = 0;
+void BlobFileWriter::IncTotalBlobs(size_t increment) {
+  base::AutoLock auto_lock(blob_mutex_);
+  total_blobs_ += increment;
 }
 
 }  // namespace chromeos_update_engine
diff --git a/payload_generator/blob_file_writer.h b/payload_generator/blob_file_writer.h
index 48553be..bdd4c08 100644
--- a/payload_generator/blob_file_writer.h
+++ b/payload_generator/blob_file_writer.h
@@ -35,10 +35,8 @@
   // was stored, or -1 in case of failure.
   off_t StoreBlob(const brillo::Blob& blob);
 
-  // The number of |total_blobs| is the number of blobs that will be stored but
-  // is only used for logging purposes. If not set or set to 0, logging will be
-  // skipped. This function will also reset the number of stored blobs to 0.
-  void SetTotalBlobs(size_t total_blobs);
+  // Increase |total_blobs| by |increment|. Thread safe.
+  void IncTotalBlobs(size_t increment);
 
  private:
   size_t total_blobs_{0};
diff --git a/payload_generator/delta_diff_generator.cc b/payload_generator/delta_diff_generator.cc
index aa49252..c2b35ee 100644
--- a/payload_generator/delta_diff_generator.cc
+++ b/payload_generator/delta_diff_generator.cc
@@ -39,6 +39,7 @@
 #include "update_engine/payload_generator/blob_file_writer.h"
 #include "update_engine/payload_generator/delta_diff_utils.h"
 #include "update_engine/payload_generator/full_update_generator.h"
+#include "update_engine/payload_generator/merge_sequence_generator.h"
 #include "update_engine/payload_generator/payload_file.h"
 
 using std::string;
@@ -59,12 +60,14 @@
       const PartitionConfig& new_part,
       BlobFileWriter* file_writer,
       std::vector<AnnotatedOperation>* aops,
+      std::vector<CowMergeOperation>* cow_merge_sequence,
       std::unique_ptr<chromeos_update_engine::OperationsGenerator> strategy)
       : config_(config),
         old_part_(old_part),
         new_part_(new_part),
         file_writer_(file_writer),
         aops_(aops),
+        cow_merge_sequence_(cow_merge_sequence),
         strategy_(std::move(strategy)) {}
   PartitionProcessor(PartitionProcessor&&) noexcept = default;
   void Run() override {
@@ -78,6 +81,17 @@
       LOG(FATAL) << "GenerateOperations(" << old_part_.name << ", "
                  << new_part_.name << ") failed";
     }
+
+    bool snapshot_enabled =
+        config_.target.dynamic_partition_metadata &&
+        config_.target.dynamic_partition_metadata->snapshot_enabled();
+    if (old_part_.path.empty() || !snapshot_enabled) {
+      return;
+    }
+    auto generator = MergeSequenceGenerator::Create(*aops_);
+    if (!generator || !generator->Generate(cow_merge_sequence_)) {
+      LOG(FATAL) << "Failed to generate merge sequence";
+    }
   }
 
  private:
@@ -86,6 +100,7 @@
   const PartitionConfig& new_part_;
   BlobFileWriter* file_writer_;
   std::vector<AnnotatedOperation>* aops_;
+  std::vector<CowMergeOperation>* cow_merge_sequence_;
   std::unique_ptr<chromeos_update_engine::OperationsGenerator> strategy_;
   DISALLOW_COPY_AND_ASSIGN(PartitionProcessor);
 };
@@ -123,6 +138,8 @@
     PartitionConfig empty_part("");
     std::vector<std::vector<AnnotatedOperation>> all_aops;
     all_aops.resize(config.target.partitions.size());
+    std::vector<std::vector<CowMergeOperation>> all_merge_sequences;
+    all_merge_sequences.resize(config.target.partitions.size());
     std::vector<PartitionProcessor> partition_tasks{};
     auto thread_count = std::min<int>(diff_utils::GetMaxThreads(),
                                       config.target.partitions.size());
@@ -153,6 +170,7 @@
                                                    new_part,
                                                    &blob_file,
                                                    &all_aops[i],
+                                                   &all_merge_sequences[i],
                                                    std::move(strategy)));
     }
     thread_pool.Start();
@@ -166,7 +184,10 @@
           config.is_delta ? config.source.partitions[i] : empty_part;
       const PartitionConfig& new_part = config.target.partitions[i];
       TEST_AND_RETURN_FALSE(
-          payload.AddPartition(old_part, new_part, std::move(all_aops[i])));
+          payload.AddPartition(old_part,
+                               new_part,
+                               std::move(all_aops[i]),
+                               std::move(all_merge_sequences[i])));
     }
   }
 
diff --git a/payload_generator/extent_ranges.cc b/payload_generator/extent_ranges.cc
index 4600efe..2098639 100644
--- a/payload_generator/extent_ranges.cc
+++ b/payload_generator/extent_ranges.cc
@@ -202,6 +202,15 @@
   }
 }
 
+bool ExtentRanges::OverlapsWithExtent(const Extent& extent) const {
+  for (const auto& entry : extent_set_) {
+    if (ExtentsOverlap(entry, extent)) {
+      return true;
+    }
+  }
+  return false;
+}
+
 bool ExtentRanges::ContainsBlock(uint64_t block) const {
   auto lower = extent_set_.lower_bound(ExtentForRange(block, 1));
   // The block could be on the extent before the one in |lower|.
diff --git a/payload_generator/extent_ranges.h b/payload_generator/extent_ranges.h
index 62ffff4..68aa27f 100644
--- a/payload_generator/extent_ranges.h
+++ b/payload_generator/extent_ranges.h
@@ -63,6 +63,9 @@
   void AddRanges(const ExtentRanges& ranges);
   void SubtractRanges(const ExtentRanges& ranges);
 
+  // Returns true if the input extent overlaps with the current ExtentRanges.
+  bool OverlapsWithExtent(const Extent& extent) const;
+
   // Returns whether the block |block| is in this ExtentRange.
   bool ContainsBlock(uint64_t block) const;
 
diff --git a/payload_generator/full_update_generator.cc b/payload_generator/full_update_generator.cc
index 94a43ab..4a5f63a 100644
--- a/payload_generator/full_update_generator.cc
+++ b/payload_generator/full_update_generator.cc
@@ -153,7 +153,7 @@
   aops->resize(num_chunks);
   vector<ChunkProcessor> chunk_processors;
   chunk_processors.reserve(num_chunks);
-  blob_file->SetTotalBlobs(num_chunks);
+  blob_file->IncTotalBlobs(num_chunks);
 
   for (size_t i = 0; i < num_chunks; ++i) {
     size_t start_block = i * chunk_blocks;
@@ -187,9 +187,6 @@
     thread_pool.AddWork(&processor);
   thread_pool.JoinAll();
 
-  // All the work done, disable logging.
-  blob_file->SetTotalBlobs(0);
-
   // All the operations must have a type set at this point. Otherwise, a
   // ChunkProcessor failed to complete.
   for (const AnnotatedOperation& aop : *aops) {
diff --git a/payload_generator/generate_delta_main.cc b/payload_generator/generate_delta_main.cc
index 18cff4b..dd41a29 100644
--- a/payload_generator/generate_delta_main.cc
+++ b/payload_generator/generate_delta_main.cc
@@ -14,6 +14,7 @@
 // limitations under the License.
 //
 
+#include <map>
 #include <string>
 #include <vector>
 
@@ -22,6 +23,7 @@
 #include <base/logging.h>
 #include <base/strings/string_number_conversions.h>
 #include <base/strings/string_split.h>
+#include <base/strings/string_util.h>
 #include <brillo/flag_helper.h>
 #include <brillo/key_value_store.h>
 #include <brillo/message_loops/base_message_loop.h>
@@ -47,6 +49,7 @@
 // and an output file as arguments and the path to an output file and
 // generates a delta that can be sent to Chrome OS clients.
 
+using std::map;
 using std::string;
 using std::vector;
 
@@ -294,6 +297,39 @@
   return true;
 }
 
+template <typename Key, typename Val>
+string ToString(const map<Key, Val>& map) {
+  vector<string> result;
+  result.reserve(map.size());
+  for (const auto& it : map) {
+    result.emplace_back(it.first + ": " + it.second);
+  }
+  return "{" + base::JoinString(result, ",") + "}";
+}
+
+// Applies "name:timestamp" pairs from |partition_timestamps| to the matching
+// target partitions in |config|. Returns false if any pair is left unused.
+bool ParsePerPartitionTimestamps(const string& partition_timestamps,
+                                 PayloadGenerationConfig* config) {
+  base::StringPairs pairs;
+  CHECK(base::SplitStringIntoKeyValuePairs(
+      partition_timestamps, ':', ',', &pairs))
+      << "--partition_timestamps accepts comma "
+         "separated pairs. e.g. system:1234,vendor:5678";
+  map<string, string> partition_timestamps_map{
+      std::move_iterator(pairs.begin()), std::move_iterator(pairs.end())};
+  for (auto&& partition : config->target.partitions) {
+    auto&& it = partition_timestamps_map.find(partition.name);
+    if (it != partition_timestamps_map.end()) {
+      partition.version = std::move(it->second);
+      partition_timestamps_map.erase(it);
+    }
+  }
+  LOG_IF(ERROR, !partition_timestamps_map.empty())
+      << "Unused timestamps: " << ToString(partition_timestamps_map);
+  return partition_timestamps_map.empty();
+}
+
 int Main(int argc, char** argv) {
   DEFINE_string(old_image, "", "Path to the old rootfs");
   DEFINE_string(new_image, "", "Path to the new rootfs");
@@ -384,6 +420,11 @@
                0,
                "The maximum timestamp of the OS allowed to apply this "
                "payload.");
+  DEFINE_string(
+      partition_timestamps,
+      "",
+      "The per-partition maximum timestamps which the OS allowed to apply this "
+      "payload. Passed in comma separated pairs, e.x. system:1234,vendor:5678");
 
   DEFINE_string(old_channel,
                 "",
@@ -709,6 +750,10 @@
   }
 
   payload_config.max_timestamp = FLAGS_max_timestamp;
+  if (!FLAGS_partition_timestamps.empty()) {
+    CHECK(ParsePerPartitionTimestamps(FLAGS_partition_timestamps,
+                                      &payload_config));
+  }
 
   if (payload_config.is_delta &&
       payload_config.version.minor >= kVerityMinorPayloadVersion)
diff --git a/payload_generator/merge_sequence_generator.cc b/payload_generator/merge_sequence_generator.cc
new file mode 100644
index 0000000..eaffeac
--- /dev/null
+++ b/payload_generator/merge_sequence_generator.cc
@@ -0,0 +1,269 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/payload_generator/merge_sequence_generator.h"
+
+#include <algorithm>
+
+#include "update_engine/payload_generator/extent_utils.h"
+
+namespace chromeos_update_engine {
+
+CowMergeOperation CreateCowMergeOperation(const Extent& src_extent,
+                                          const Extent& dst_extent) {
+  CowMergeOperation ret;
+  ret.set_type(CowMergeOperation::COW_COPY);
+  *ret.mutable_src_extent() = src_extent;
+  *ret.mutable_dst_extent() = dst_extent;
+  return ret;
+}
+
+std::ostream& operator<<(std::ostream& os,
+                         const CowMergeOperation& merge_operation) {
+  os << "CowMergeOperation src extent: "
+     << ExtentsToString({merge_operation.src_extent()})
+     << ", dst extent: " << ExtentsToString({merge_operation.dst_extent()});
+  return os;
+}
+
+// The OTA generation guarantees that all blocks in the dst extent will be
+// written only once. So we can use it to order the CowMergeOperation.
+bool operator<(const CowMergeOperation& op1, const CowMergeOperation& op2) {
+  return op1.dst_extent().start_block() < op2.dst_extent().start_block();
+}
+
+bool operator==(const CowMergeOperation& op1, const CowMergeOperation& op2) {
+  return op1.type() == op2.type() && op1.src_extent() == op2.src_extent() &&
+         op1.dst_extent() == op2.dst_extent();
+}
+
+// Builds a generator from the SOURCE_COPY operations in |aops|, one merge
+// operation per source extent, ordered by dst block. Returns nullptr on error.
+std::unique_ptr<MergeSequenceGenerator> MergeSequenceGenerator::Create(
+    const std::vector<AnnotatedOperation>& aops) {
+  std::vector<CowMergeOperation> sequence;
+  for (const auto& aop : aops) {
+    // Only handle SOURCE_COPY now for the cow size optimization.
+    if (aop.op.type() != InstallOperation::SOURCE_COPY) {
+      continue;
+    }
+    if (aop.op.dst_extents().size() != 1) {
+      std::vector<Extent> out_extents;
+      ExtentsToVector(aop.op.dst_extents(), &out_extents);
+      LOG(ERROR) << "The dst extents for source_copy expects to be contiguous,"
+                 << " dst extents: " << ExtentsToString(out_extents);
+      return nullptr;
+    }
+    // Split the source extents.
+    size_t used_blocks = 0;
+    for (const auto& src_extent : aop.op.src_extents()) {
+      // The dst_extent in the merge sequence will be a subset of
+      // InstallOperation's dst_extent. This will simplify the OTA -> COW
+      // conversion when we install the payload.
+      Extent dst_extent =
+          ExtentForRange(aop.op.dst_extents(0).start_block() + used_blocks,
+                         src_extent.num_blocks());
+      sequence.emplace_back(CreateCowMergeOperation(src_extent, dst_extent));
+      used_blocks += src_extent.num_blocks();
+    }
+    if (used_blocks != aop.op.dst_extents(0).num_blocks()) {
+      LOG(ERROR) << "Number of blocks in src extents doesn't equal to the"
+                 << " ones in the dst extents, src blocks " << used_blocks
+                 << ", dst blocks " << aop.op.dst_extents(0).num_blocks();
+      return nullptr;
+    }
+  }
+  // FindDependency() binary-searches |operations_| by dst extent.
+  std::sort(sequence.begin(), sequence.end());
+  return std::unique_ptr<MergeSequenceGenerator>(
+      new MergeSequenceGenerator(std::move(sequence)));
+}
+
+bool MergeSequenceGenerator::FindDependency(
+    std::map<CowMergeOperation, std::set<CowMergeOperation>>* result) const {
+  CHECK(result);
+  LOG(INFO) << "Finding dependencies";
+
+  // Since the OTA operation may reuse some source blocks, use the binary
+  // search on sorted dst extents to find overlaps.
+  std::map<CowMergeOperation, std::set<CowMergeOperation>> merge_after;
+  for (const auto& op : operations_) {
+    // lower bound (inclusive): dst extent's end block >= src extent's start
+    // block.
+    const auto lower_it = std::lower_bound(
+        operations_.begin(),
+        operations_.end(),
+        op,
+        [](const CowMergeOperation& it, const CowMergeOperation& op) {
+          auto dst_end_block =
+              it.dst_extent().start_block() + it.dst_extent().num_blocks() - 1;
+          return dst_end_block < op.src_extent().start_block();
+        });
+    // upper bound: dst extent's start block > src extent's end block
+    const auto upper_it = std::upper_bound(
+        lower_it,
+        operations_.end(),
+        op,
+        [](const CowMergeOperation& op, const CowMergeOperation& it) {
+          auto src_end_block =
+              op.src_extent().start_block() + op.src_extent().num_blocks() - 1;
+          return src_end_block < it.dst_extent().start_block();
+        });
+
+    // TODO(xunchang) skip inserting the empty set to merge_after.
+    if (lower_it == upper_it) {
+      merge_after.insert({op, {}});
+    } else {
+      std::set<CowMergeOperation> operations(lower_it, upper_it);
+      auto it = operations.find(op);
+      if (it != operations.end()) {
+        LOG(INFO) << "Self overlapping " << op;
+        operations.erase(it);
+      }
+      auto ret = merge_after.emplace(op, std::move(operations));
+      // Check the insertion indeed happens.
+      CHECK(ret.second);
+    }
+  }
+
+  *result = std::move(merge_after);
+  return true;
+}
+
+// Topologically sorts |operations_| so that no operation reads blocks written
+// by an earlier one; operations dropped to break cycles are left out.
+bool MergeSequenceGenerator::Generate(
+    std::vector<CowMergeOperation>* sequence) const {
+  sequence->clear();
+  std::map<CowMergeOperation, std::set<CowMergeOperation>> merge_after;
+  if (!FindDependency(&merge_after)) {
+    LOG(ERROR) << "Failed to find dependencies";
+    return false;
+  }
+
+  LOG(INFO) << "Generating sequence";
+
+  // Use the non-DFS version of the topology sort. So we can control the
+  // operations to discard to break cycles; thus yielding a deterministic
+  // sequence.
+  std::map<CowMergeOperation, int> incoming_edges;
+  for (const auto& it : merge_after) {
+    for (const auto& blocked : it.second) {
+      // Value is default initialized to 0.
+      incoming_edges[blocked] += 1;
+    }
+  }
+
+  std::set<CowMergeOperation> free_operations;
+  for (const auto& op : operations_) {
+    if (incoming_edges.find(op) == incoming_edges.end()) {
+      free_operations.insert(op);
+    }
+  }
+
+  std::vector<CowMergeOperation> merge_sequence;
+  std::set<CowMergeOperation> convert_to_raw;
+  while (!incoming_edges.empty()) {
+    if (!free_operations.empty()) {
+      merge_sequence.insert(
+          merge_sequence.end(), free_operations.begin(), free_operations.end());
+    } else {
+      auto to_convert = incoming_edges.begin()->first;
+      free_operations.insert(to_convert);
+      convert_to_raw.insert(to_convert);
+      LOG(INFO) << "Converting operation to raw " << to_convert;
+    }
+
+    std::set<CowMergeOperation> next_free_operations;
+    for (const auto& op : free_operations) {
+      incoming_edges.erase(op);
+      // Now that this particular operation is merged, other operations blocked
+      // by this one may be free. Decrement the count of blocking operations,
+      // and set up the free operations for the next iteration.
+      for (const auto& blocked : merge_after[op]) {
+        auto it = incoming_edges.find(blocked);
+        if (it == incoming_edges.end()) {
+          continue;
+        }
+        auto blocking_transfer_count = &it->second;
+        if (*blocking_transfer_count <= 0) {
+          LOG(ERROR) << "Unexpected count in merge after map "
+                     << *blocking_transfer_count;
+          return false;
+        }
+        // One blocker has merged. Once the count reaches zero nothing blocks
+        // this operation; add it to the merge sequence in the next iteration.
+        *blocking_transfer_count -= 1;
+        if (*blocking_transfer_count == 0) {
+          next_free_operations.insert(blocked);
+        }
+      }
+    }
+
+    LOG(INFO) << "Remaining transfers " << incoming_edges.size()
+              << ", free transfers " << free_operations.size()
+              << ", merge_sequence size " << merge_sequence.size();
+    free_operations = std::move(next_free_operations);
+  }
+
+  if (!free_operations.empty()) {
+    merge_sequence.insert(
+        merge_sequence.end(), free_operations.begin(), free_operations.end());
+  }
+
+  CHECK_EQ(operations_.size(), merge_sequence.size() + convert_to_raw.size());
+
+  size_t blocks_in_sequence = 0;
+  for (const CowMergeOperation& transfer : merge_sequence) {
+    blocks_in_sequence += transfer.dst_extent().num_blocks();
+  }
+
+  size_t blocks_in_raw = 0;
+  for (const CowMergeOperation& transfer : convert_to_raw) {
+    blocks_in_raw += transfer.dst_extent().num_blocks();
+  }
+
+  LOG(INFO) << "Blocks in merge sequence " << blocks_in_sequence
+            << ", blocks in raw " << blocks_in_raw;
+  if (!ValidateSequence(merge_sequence)) {
+    return false;
+  }
+
+  *sequence = std::move(merge_sequence);
+  return true;
+}
+
+bool MergeSequenceGenerator::ValidateSequence(
+    const std::vector<CowMergeOperation>& sequence) {
+  LOG(INFO) << "Validating merge sequence";
+  ExtentRanges visited;
+  for (const auto& op : sequence) {
+    if (visited.OverlapsWithExtent(op.src_extent())) {
+      LOG(ERROR) << "Transfer violates the merge sequence " << op
+                 << "Visited extent ranges: ";
+      visited.Dump();
+      return false;
+    }
+
+    CHECK(!visited.OverlapsWithExtent(op.dst_extent()))
+        << "dst extent should write only once.";
+    visited.AddExtent(op.dst_extent());
+  }
+
+  return true;
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_generator/merge_sequence_generator.h b/payload_generator/merge_sequence_generator.h
new file mode 100644
index 0000000..bc0158e
--- /dev/null
+++ b/payload_generator/merge_sequence_generator.h
@@ -0,0 +1,74 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#ifndef UPDATE_ENGINE_PAYLOAD_GENERATOR_MERGE_SEQUENCE_GENERATOR_H_
+#define UPDATE_ENGINE_PAYLOAD_GENERATOR_MERGE_SEQUENCE_GENERATOR_H_
+
+#include <map>
+#include <memory>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "update_engine/payload_generator/annotated_operation.h"
+#include "update_engine/payload_generator/extent_ranges.h"
+#include "update_engine/payload_generator/extent_utils.h"
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+// Constructs CowMergeOperation from src & dst extents
+CowMergeOperation CreateCowMergeOperation(const Extent& src_extent,
+                                          const Extent& dst_extent);
+
+// Comparator for CowMergeOperation.
+bool operator<(const CowMergeOperation& op1, const CowMergeOperation& op2);
+bool operator==(const CowMergeOperation& op1, const CowMergeOperation& op2);
+
+std::ostream& operator<<(std::ostream& os,
+                         const CowMergeOperation& merge_operation);
+
+// This class takes a list of CowMergeOperations; and sorts them so that no
+// read after write will happen by following the sequence. When there is a
+// cycle, we will omit some operations in the list. Therefore, the result
+// sequence may not contain all blocks in the input list.
+class MergeSequenceGenerator {
+ public:
+  // Creates an object from a list of OTA InstallOperations. Returns nullptr on
+  // failure.
+  static std::unique_ptr<MergeSequenceGenerator> Create(
+      const std::vector<AnnotatedOperation>& aops);
+  // Checks that no read after write happens in the given sequence.
+  static bool ValidateSequence(const std::vector<CowMergeOperation>& sequence);
+
+  // Generates a merge sequence from |operations_|, puts the result in
+  // |sequence|. Returns false on failure.
+  bool Generate(std::vector<CowMergeOperation>* sequence) const;
+
+ private:
+  friend class MergeSequenceGeneratorTest;
+  explicit MergeSequenceGenerator(std::vector<CowMergeOperation> transfers)
+      : operations_(std::move(transfers)) {}
+
+  // For a given merge operation, finds all the operations that should merge
+  // after myself. Put the result in |merge_after|.
+  bool FindDependency(std::map<CowMergeOperation, std::set<CowMergeOperation>>*
+                          merge_after) const;
+  // The list of CowMergeOperations to sort.
+  std::vector<CowMergeOperation> operations_;
+};
+
+}  // namespace chromeos_update_engine
+#endif
diff --git a/payload_generator/merge_sequence_generator_unittest.cc b/payload_generator/merge_sequence_generator_unittest.cc
new file mode 100644
index 0000000..567ede1
--- /dev/null
+++ b/payload_generator/merge_sequence_generator_unittest.cc
@@ -0,0 +1,196 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <algorithm>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "update_engine/payload_consumer/payload_constants.h"
+#include "update_engine/payload_generator/extent_utils.h"
+#include "update_engine/payload_generator/merge_sequence_generator.h"
+
+namespace chromeos_update_engine {
+class MergeSequenceGeneratorTest : public ::testing::Test {
+ protected:
+  void VerifyTransfers(MergeSequenceGenerator* generator,
+                       const std::vector<CowMergeOperation>& expected) {
+    ASSERT_EQ(expected, generator->operations_);
+  }
+
+  void FindDependency(
+      std::vector<CowMergeOperation> transfers,
+      std::map<CowMergeOperation, std::set<CowMergeOperation>>* result) {
+    std::sort(transfers.begin(), transfers.end());
+    MergeSequenceGenerator generator(std::move(transfers));
+    ASSERT_TRUE(generator.FindDependency(result));
+  }
+
+  void GenerateSequence(std::vector<CowMergeOperation> transfers,
+                        const std::vector<CowMergeOperation>& expected) {
+    std::sort(transfers.begin(), transfers.end());
+    MergeSequenceGenerator generator(std::move(transfers));
+    std::vector<CowMergeOperation> sequence;
+    ASSERT_TRUE(generator.Generate(&sequence));
+    ASSERT_EQ(expected, sequence);
+  }
+};
+
+TEST_F(MergeSequenceGeneratorTest, Create) {
+  std::vector<AnnotatedOperation> aops{{"file1", {}}, {"file2", {}}};
+  aops[0].op.set_type(InstallOperation::SOURCE_COPY);
+  *aops[0].op.add_src_extents() = ExtentForRange(10, 10);
+  *aops[0].op.add_dst_extents() = ExtentForRange(30, 10);
+
+  aops[1].op.set_type(InstallOperation::SOURCE_COPY);
+  *aops[1].op.add_src_extents() = ExtentForRange(20, 10);
+  *aops[1].op.add_dst_extents() = ExtentForRange(40, 10);
+
+  auto generator = MergeSequenceGenerator::Create(aops);
+  ASSERT_TRUE(generator);
+  std::vector<CowMergeOperation> expected = {
+      CreateCowMergeOperation(ExtentForRange(10, 10), ExtentForRange(30, 10)),
+      CreateCowMergeOperation(ExtentForRange(20, 10), ExtentForRange(40, 10))};
+  VerifyTransfers(generator.get(), expected);
+
+  *aops[1].op.add_src_extents() = ExtentForRange(30, 5);
+  *aops[1].op.add_dst_extents() = ExtentForRange(50, 5);
+  generator = MergeSequenceGenerator::Create(aops);
+  ASSERT_FALSE(generator);
+}
+
+TEST_F(MergeSequenceGeneratorTest, Create_SplitSource) {
+  InstallOperation op;
+  op.set_type(InstallOperation::SOURCE_COPY);
+  *(op.add_src_extents()) = ExtentForRange(2, 3);
+  *(op.add_src_extents()) = ExtentForRange(6, 1);
+  *(op.add_src_extents()) = ExtentForRange(8, 4);
+  *(op.add_dst_extents()) = ExtentForRange(10, 8);
+
+  AnnotatedOperation aop{"file1", op};
+  auto generator = MergeSequenceGenerator::Create({aop});
+  ASSERT_TRUE(generator);
+  std::vector<CowMergeOperation> expected = {
+      CreateCowMergeOperation(ExtentForRange(2, 3), ExtentForRange(10, 3)),
+      CreateCowMergeOperation(ExtentForRange(6, 1), ExtentForRange(13, 1)),
+      CreateCowMergeOperation(ExtentForRange(8, 4), ExtentForRange(14, 4))};
+  VerifyTransfers(generator.get(), expected);
+}
+
+TEST_F(MergeSequenceGeneratorTest, FindDependency) {
+  std::vector<CowMergeOperation> transfers = {
+      CreateCowMergeOperation(ExtentForRange(10, 10), ExtentForRange(15, 10)),
+      CreateCowMergeOperation(ExtentForRange(40, 10), ExtentForRange(50, 10)),
+  };
+
+  std::map<CowMergeOperation, std::set<CowMergeOperation>> merge_after;
+  FindDependency(transfers, &merge_after);
+  ASSERT_EQ(std::set<CowMergeOperation>(), merge_after.at(transfers[0]));
+  ASSERT_EQ(std::set<CowMergeOperation>(), merge_after.at(transfers[1]));
+
+  transfers = {
+      CreateCowMergeOperation(ExtentForRange(10, 10), ExtentForRange(25, 10)),
+      CreateCowMergeOperation(ExtentForRange(24, 5), ExtentForRange(35, 5)),
+      CreateCowMergeOperation(ExtentForRange(30, 10), ExtentForRange(15, 10)),
+  };
+
+  FindDependency(transfers, &merge_after);
+  ASSERT_EQ(std::set<CowMergeOperation>({transfers[2]}),
+            merge_after.at(transfers[0]));
+  ASSERT_EQ(std::set<CowMergeOperation>({transfers[0], transfers[2]}),
+            merge_after.at(transfers[1]));
+  ASSERT_EQ(std::set<CowMergeOperation>({transfers[0], transfers[1]}),
+            merge_after.at(transfers[2]));
+}
+
+TEST_F(MergeSequenceGeneratorTest, FindDependency_ReusedSourceBlocks) {
+  std::vector<CowMergeOperation> transfers = {
+      CreateCowMergeOperation(ExtentForRange(5, 10), ExtentForRange(15, 10)),
+      CreateCowMergeOperation(ExtentForRange(6, 5), ExtentForRange(30, 5)),
+      CreateCowMergeOperation(ExtentForRange(50, 5), ExtentForRange(5, 5)),
+  };
+
+  std::map<CowMergeOperation, std::set<CowMergeOperation>> merge_after;
+  FindDependency(transfers, &merge_after);
+  ASSERT_EQ(std::set<CowMergeOperation>({transfers[2]}),
+            merge_after.at(transfers[0]));
+  ASSERT_EQ(std::set<CowMergeOperation>({transfers[2]}),
+            merge_after.at(transfers[1]));
+}
+
+TEST_F(MergeSequenceGeneratorTest, ValidateSequence) {
+  std::vector<CowMergeOperation> transfers = {
+      CreateCowMergeOperation(ExtentForRange(10, 10), ExtentForRange(15, 10)),
+      CreateCowMergeOperation(ExtentForRange(30, 10), ExtentForRange(40, 10)),
+  };
+
+  // Self overlapping
+  ASSERT_TRUE(MergeSequenceGenerator::ValidateSequence(transfers));
+
+  transfers = {
+      CreateCowMergeOperation(ExtentForRange(30, 10), ExtentForRange(20, 10)),
+      CreateCowMergeOperation(ExtentForRange(15, 10), ExtentForRange(10, 10)),
+  };
+  ASSERT_FALSE(MergeSequenceGenerator::ValidateSequence(transfers));
+}
+
+TEST_F(MergeSequenceGeneratorTest, GenerateSequenceNoCycles) {
+  std::vector<CowMergeOperation> transfers = {
+      CreateCowMergeOperation(ExtentForRange(10, 10), ExtentForRange(15, 10)),
+      // file3 should merge before file2
+      CreateCowMergeOperation(ExtentForRange(40, 5), ExtentForRange(25, 5)),
+      CreateCowMergeOperation(ExtentForRange(25, 10), ExtentForRange(30, 10)),
+  };
+
+  std::vector<CowMergeOperation> expected{
+      transfers[0], transfers[2], transfers[1]};
+  GenerateSequence(transfers, expected);
+}
+
+TEST_F(MergeSequenceGeneratorTest, GenerateSequenceWithCycles) {
+  std::vector<CowMergeOperation> transfers = {
+      CreateCowMergeOperation(ExtentForRange(25, 10), ExtentForRange(30, 10)),
+      CreateCowMergeOperation(ExtentForRange(30, 10), ExtentForRange(40, 10)),
+      CreateCowMergeOperation(ExtentForRange(40, 10), ExtentForRange(25, 10)),
+      CreateCowMergeOperation(ExtentForRange(10, 10), ExtentForRange(15, 10)),
+  };
+
+  // file 1,2,3 form a cycle. And file3, whose dst ext has smallest offset, will
+  // be converted to raw blocks
+  std::vector<CowMergeOperation> expected{
+      transfers[3], transfers[1], transfers[0]};
+  GenerateSequence(transfers, expected);
+}
+
+TEST_F(MergeSequenceGeneratorTest, GenerateSequenceMultipleCycles) {
+  std::vector<CowMergeOperation> transfers = {
+      // cycle 1
+      CreateCowMergeOperation(ExtentForRange(10, 10), ExtentForRange(25, 10)),
+      CreateCowMergeOperation(ExtentForRange(24, 5), ExtentForRange(35, 5)),
+      CreateCowMergeOperation(ExtentForRange(30, 10), ExtentForRange(15, 10)),
+      // cycle 2
+      CreateCowMergeOperation(ExtentForRange(55, 10), ExtentForRange(60, 10)),
+      CreateCowMergeOperation(ExtentForRange(60, 10), ExtentForRange(70, 10)),
+      CreateCowMergeOperation(ExtentForRange(70, 10), ExtentForRange(55, 10)),
+  };
+
+  // file 3, 6 will be converted to raw.
+  std::vector<CowMergeOperation> expected{
+      transfers[1], transfers[0], transfers[4], transfers[3]};
+  GenerateSequence(transfers, expected);
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_generator/payload_file.cc b/payload_generator/payload_file.cc
index c1594c7..49dff4e 100644
--- a/payload_generator/payload_file.cc
+++ b/payload_generator/payload_file.cc
@@ -86,12 +86,15 @@
 
 bool PayloadFile::AddPartition(const PartitionConfig& old_conf,
                                const PartitionConfig& new_conf,
-                               vector<AnnotatedOperation> aops) {
+                               vector<AnnotatedOperation> aops,
+                               vector<CowMergeOperation> merge_sequence) {
   Partition part;
   part.name = new_conf.name;
   part.aops = std::move(aops);
+  part.cow_merge_sequence = std::move(merge_sequence);
   part.postinstall = new_conf.postinstall;
   part.verity = new_conf.verity;
+  part.version = new_conf.version;
   // Initialize the PartitionInfo objects if present.
   if (!old_conf.path.empty())
     TEST_AND_RETURN_FALSE(
@@ -132,6 +135,9 @@
   for (const auto& part : part_vec_) {
     PartitionUpdate* partition = manifest_.add_partitions();
     partition->set_partition_name(part.name);
+    if (!part.version.empty()) {
+      partition->set_version(part.version);
+    }
     if (part.postinstall.run) {
       partition->set_run_postinstall(true);
       if (!part.postinstall.path.empty())
@@ -159,6 +165,10 @@
     for (const AnnotatedOperation& aop : part.aops) {
       *partition->add_operations() = aop.op;
     }
+    for (const auto& merge_op : part.cow_merge_sequence) {
+      *partition->add_merge_operations() = merge_op;
+    }
+
     if (part.old_info.has_size() || part.old_info.has_hash())
       *(partition->mutable_old_partition_info()) = part.old_info;
     if (part.new_info.has_size() || part.new_info.has_hash())
diff --git a/payload_generator/payload_file.h b/payload_generator/payload_file.h
index d1f8196..8b17956 100644
--- a/payload_generator/payload_file.h
+++ b/payload_generator/payload_file.h
@@ -43,7 +43,8 @@
   // reference a blob stored in the file provided to WritePayload().
   bool AddPartition(const PartitionConfig& old_conf,
                     const PartitionConfig& new_conf,
-                    std::vector<AnnotatedOperation> aops);
+                    std::vector<AnnotatedOperation> aops,
+                    std::vector<CowMergeOperation> merge_sequence);
 
   // Write the payload to the |payload_file| file. The operations reference
   // blobs in the |data_blobs_path| file and the blobs will be reordered in the
@@ -90,12 +91,15 @@
 
     // The operations to be performed to this partition.
     std::vector<AnnotatedOperation> aops;
+    std::vector<CowMergeOperation> cow_merge_sequence;
 
     PartitionInfo old_info;
     PartitionInfo new_info;
 
     PostInstallConfig postinstall;
     VerityConfig verity;
+    // Per-partition version, usually a number representing a timestamp.
+    std::string version;
   };
 
   std::vector<Partition> part_vec_;
diff --git a/payload_generator/payload_generation_config.h b/payload_generator/payload_generation_config.h
index 9abb97f..ec63043 100644
--- a/payload_generator/payload_generation_config.h
+++ b/payload_generator/payload_generation_config.h
@@ -119,6 +119,9 @@
 
   // Enables the on device fec data computation by default.
   bool disable_fec_computation = false;
+
+  // Per-partition version, usually a number representing a timestamp.
+  std::string version;
 };
 
 // The ImageConfig struct describes a pair of binaries kernel and rootfs and the
diff --git a/payload_generator/payload_properties_unittest.cc b/payload_generator/payload_properties_unittest.cc
index db3902c..e0072fc 100644
--- a/payload_generator/payload_properties_unittest.cc
+++ b/payload_generator/payload_properties_unittest.cc
@@ -98,7 +98,7 @@
     EXPECT_TRUE(strategy->GenerateOperations(
         config, old_part, new_part, &blob_file_writer, &aops));
 
-    payload.AddPartition(old_part, new_part, aops);
+    payload.AddPartition(old_part, new_part, aops, {});
 
     uint64_t metadata_size;
     EXPECT_TRUE(payload.WritePayload(
diff --git a/scripts/brillo_update_payload b/scripts/brillo_update_payload
index 9bae74e..3bc87bd 100755
--- a/scripts/brillo_update_payload
+++ b/scripts/brillo_update_payload
@@ -186,6 +186,10 @@
     "Optional: The maximum unix timestamp of the OS allowed to apply this \
 payload, should be set to a number higher than the build timestamp of the \
 system running on the device, 0 if not specified."
+  DEFINE_string partition_timestamps "" \
+    "Optional: Per-partition maximum unix timestamp of the OS allowed to \
+apply this payload. Should be comma separated key:value pairs, e.g. \
+system:1234,vendor:456"
   DEFINE_string disable_fec_computation "" \
     "Optional: Disables the on device fec data computation for incremental \
 update. This feature is enabled by default."
@@ -696,6 +700,10 @@
     GENERATOR_ARGS+=( --max_timestamp="${FLAGS_max_timestamp}" )
   fi
 
+  if [[ -n "${FLAGS_partition_timestamps}" ]]; then
+    GENERATOR_ARGS+=( --partition_timestamps="${FLAGS_partition_timestamps}" )
+  fi
+
   if [[ -n "${POSTINSTALL_CONFIG_FILE}" ]]; then
     GENERATOR_ARGS+=(
       --new_postinstall_config_file="${POSTINSTALL_CONFIG_FILE}"
diff --git a/scripts/payload_info.py b/scripts/payload_info.py
index 965bb76..7625ee8 100755
--- a/scripts/payload_info.py
+++ b/scripts/payload_info.py
@@ -74,7 +74,9 @@
     for partition in manifest.partitions:
       DisplayValue('  Number of "%s" ops' % partition.partition_name,
                    len(partition.operations))
-
+    for partition in manifest.partitions:
+      DisplayValue("Timestamp for " +
+                   partition.partition_name, partition.version)
     DisplayValue('Block size', manifest.block_size)
     DisplayValue('Minor version', manifest.minor_version)
 
diff --git a/scripts/update_device.py b/scripts/update_device.py
index 1cd4b6a..756d443 100755
--- a/scripts/update_device.py
+++ b/scripts/update_device.py
@@ -17,6 +17,7 @@
 
 """Send an A/B update to an Android device over adb."""
 
+from __future__ import print_function
 from __future__ import absolute_import
 
 import argparse
@@ -305,6 +306,7 @@
     logging.info('Server Terminated')
 
   def StopServer(self):
+    self._httpd.shutdown()
     self._httpd.socket.close()
 
 
@@ -318,13 +320,13 @@
   """Return the command to run to start the update in the Android device."""
   ota = AndroidOTAPackage(ota_filename, secondary)
   headers = ota.properties
-  headers += 'USER_AGENT=Dalvik (something, something)\n'
-  headers += 'NETWORK_ID=0\n'
-  headers += extra_headers
+  headers += b'USER_AGENT=Dalvik (something, something)\n'
+  headers += b'NETWORK_ID=0\n'
+  headers += extra_headers.encode()
 
   return ['update_engine_client', '--update', '--follow',
           '--payload=%s' % payload_url, '--offset=%d' % ota.offset,
-          '--size=%d' % ota.size, '--headers="%s"' % headers]
+          '--size=%d' % ota.size, '--headers="%s"' % headers.decode()]
 
 
 def OmahaUpdateCommand(omaha_url):
diff --git a/scripts/update_payload/payload.py b/scripts/update_payload/payload.py
index ea5ed30..78b8e2c 100644
--- a/scripts/update_payload/payload.py
+++ b/scripts/update_payload/payload.py
@@ -20,7 +20,9 @@
 from __future__ import print_function
 
 import hashlib
+import io
 import struct
+import zipfile
 
 from update_payload import applier
 from update_payload import checker
@@ -119,6 +121,10 @@
       payload_file: update payload file object open for reading
       payload_file_offset: the offset of the actual payload
     """
+    if zipfile.is_zipfile(payload_file):
+      with zipfile.ZipFile(payload_file) as zfp:
+        with zfp.open("payload.bin") as payload_fp:
+          payload_file = io.BytesIO(payload_fp.read())
     self.payload_file = payload_file
     self.payload_file_offset = payload_file_offset
     self.manifest_hasher = None
diff --git a/scripts/update_payload/update_metadata_pb2.py b/scripts/update_payload/update_metadata_pb2.py
index d41c1da..841cd22 100644
--- a/scripts/update_payload/update_metadata_pb2.py
+++ b/scripts/update_payload/update_metadata_pb2.py
@@ -2,8 +2,6 @@
 # Generated by the protocol buffer compiler.  DO NOT EDIT!
 # source: update_metadata.proto
 
-import sys
-_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
 from google.protobuf import descriptor as _descriptor
 from google.protobuf import message as _message
 from google.protobuf import reflection as _reflection
@@ -19,8 +17,8 @@
   name='update_metadata.proto',
   package='chromeos_update_engine',
   syntax='proto2',
-  serialized_options=_b('H\003'),
-  serialized_pb=_b('\n\x15update_metadata.proto\x12\x16\x63hromeos_update_engine\"1\n\x06\x45xtent\x12\x13\n\x0bstart_block\x18\x01 \x01(\x04\x12\x12\n\nnum_blocks\x18\x02 \x01(\x04\"\x9f\x01\n\nSignatures\x12@\n\nsignatures\x18\x01 \x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x1aO\n\tSignature\x12\x13\n\x07version\x18\x01 \x01(\rB\x02\x18\x01\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\x1f\n\x17unpadded_signature_size\x18\x03 \x01(\x07\"+\n\rPartitionInfo\x12\x0c\n\x04size\x18\x01 \x01(\x04\x12\x0c\n\x04hash\x18\x02 \x01(\x0c\"w\n\tImageInfo\x12\r\n\x05\x62oard\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\x12\x0f\n\x07\x63hannel\x18\x03 \x01(\t\x12\x0f\n\x07version\x18\x04 \x01(\t\x12\x15\n\rbuild_channel\x18\x05 \x01(\t\x12\x15\n\rbuild_version\x18\x06 \x01(\t\"\xee\x03\n\x10InstallOperation\x12;\n\x04type\x18\x01 \x02(\x0e\x32-.chromeos_update_engine.InstallOperation.Type\x12\x13\n\x0b\x64\x61ta_offset\x18\x02 \x01(\x04\x12\x13\n\x0b\x64\x61ta_length\x18\x03 \x01(\x04\x12\x33\n\x0bsrc_extents\x18\x04 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\nsrc_length\x18\x05 \x01(\x04\x12\x33\n\x0b\x64st_extents\x18\x06 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\ndst_length\x18\x07 \x01(\x04\x12\x18\n\x10\x64\x61ta_sha256_hash\x18\x08 \x01(\x0c\x12\x17\n\x0fsrc_sha256_hash\x18\t \x01(\x0c\"\xad\x01\n\x04Type\x12\x0b\n\x07REPLACE\x10\x00\x12\x0e\n\nREPLACE_BZ\x10\x01\x12\x0c\n\x04MOVE\x10\x02\x1a\x02\x08\x01\x12\x0e\n\x06\x42SDIFF\x10\x03\x1a\x02\x08\x01\x12\x0f\n\x0bSOURCE_COPY\x10\x04\x12\x11\n\rSOURCE_BSDIFF\x10\x05\x12\x0e\n\nREPLACE_XZ\x10\x08\x12\x08\n\x04ZERO\x10\x06\x12\x0b\n\x07\x44ISCARD\x10\x07\x12\x11\n\rBROTLI_BSDIFF\x10\n\x12\x0c\n\x08PUFFDIFF\x10\t\"\xd7\x05\n\x0fPartitionUpdate\x12\x16\n\x0epartition_name\x18\x01 \x02(\t\x12\x17\n\x0frun_postinstall\x18\x02 \x01(\x08\x12\x18\n\x10postinstall_path\x18\x03 \x01(\t\x12\x17\n\x0f\x66ilesystem_type\x18\x04 \x01(\t\x12M\n\x17new_partition_signature\x18\x05 
\x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x12\x41\n\x12old_partition_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12\x41\n\x12new_partition_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12<\n\noperations\x18\x08 \x03(\x0b\x32(.chromeos_update_engine.InstallOperation\x12\x1c\n\x14postinstall_optional\x18\t \x01(\x08\x12=\n\x15hash_tree_data_extent\x18\n \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x38\n\x10hash_tree_extent\x18\x0b \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x1b\n\x13hash_tree_algorithm\x18\x0c \x01(\t\x12\x16\n\x0ehash_tree_salt\x18\r \x01(\x0c\x12\x37\n\x0f\x66\x65\x63_data_extent\x18\x0e \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x32\n\nfec_extent\x18\x0f \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x14\n\tfec_roots\x18\x10 \x01(\r:\x01\x32\"L\n\x15\x44ynamicPartitionGroup\x12\x0c\n\x04name\x18\x01 \x02(\t\x12\x0c\n\x04size\x18\x02 \x01(\x04\x12\x17\n\x0fpartition_names\x18\x03 \x03(\t\"s\n\x18\x44ynamicPartitionMetadata\x12=\n\x06groups\x18\x01 \x03(\x0b\x32-.chromeos_update_engine.DynamicPartitionGroup\x12\x18\n\x10snapshot_enabled\x18\x02 \x01(\x08\"\xe1\x06\n\x14\x44\x65ltaArchiveManifest\x12H\n\x12install_operations\x18\x01 \x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12O\n\x19kernel_install_operations\x18\x02 \x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12\x18\n\nblock_size\x18\x03 \x01(\r:\x04\x34\x30\x39\x36\x12\x19\n\x11signatures_offset\x18\x04 \x01(\x04\x12\x17\n\x0fsignatures_size\x18\x05 \x01(\x04\x12\x42\n\x0fold_kernel_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_kernel_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fold_rootfs_info\x18\x08 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_rootfs_info\x18\t 
\x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x39\n\x0eold_image_info\x18\n \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x39\n\x0enew_image_info\x18\x0b \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x18\n\rminor_version\x18\x0c \x01(\r:\x01\x30\x12;\n\npartitions\x18\r \x03(\x0b\x32\'.chromeos_update_engine.PartitionUpdate\x12\x15\n\rmax_timestamp\x18\x0e \x01(\x03\x12T\n\x1a\x64ynamic_partition_metadata\x18\x0f \x01(\x0b\x32\x30.chromeos_update_engine.DynamicPartitionMetadata\x12\x16\n\x0epartial_update\x18\x10 \x01(\x08\x42\x02H\x03')
+  serialized_options=b'H\003',
+  serialized_pb=b'\n\x15update_metadata.proto\x12\x16\x63hromeos_update_engine\"1\n\x06\x45xtent\x12\x13\n\x0bstart_block\x18\x01 \x01(\x04\x12\x12\n\nnum_blocks\x18\x02 \x01(\x04\"\x9f\x01\n\nSignatures\x12@\n\nsignatures\x18\x01 \x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x1aO\n\tSignature\x12\x13\n\x07version\x18\x01 \x01(\rB\x02\x18\x01\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\x1f\n\x17unpadded_signature_size\x18\x03 \x01(\x07\"+\n\rPartitionInfo\x12\x0c\n\x04size\x18\x01 \x01(\x04\x12\x0c\n\x04hash\x18\x02 \x01(\x0c\"w\n\tImageInfo\x12\r\n\x05\x62oard\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\x12\x0f\n\x07\x63hannel\x18\x03 \x01(\t\x12\x0f\n\x07version\x18\x04 \x01(\t\x12\x15\n\rbuild_channel\x18\x05 \x01(\t\x12\x15\n\rbuild_version\x18\x06 \x01(\t\"\xee\x03\n\x10InstallOperation\x12;\n\x04type\x18\x01 \x02(\x0e\x32-.chromeos_update_engine.InstallOperation.Type\x12\x13\n\x0b\x64\x61ta_offset\x18\x02 \x01(\x04\x12\x13\n\x0b\x64\x61ta_length\x18\x03 \x01(\x04\x12\x33\n\x0bsrc_extents\x18\x04 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\nsrc_length\x18\x05 \x01(\x04\x12\x33\n\x0b\x64st_extents\x18\x06 \x03(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x12\n\ndst_length\x18\x07 \x01(\x04\x12\x18\n\x10\x64\x61ta_sha256_hash\x18\x08 \x01(\x0c\x12\x17\n\x0fsrc_sha256_hash\x18\t \x01(\x0c\"\xad\x01\n\x04Type\x12\x0b\n\x07REPLACE\x10\x00\x12\x0e\n\nREPLACE_BZ\x10\x01\x12\x0c\n\x04MOVE\x10\x02\x1a\x02\x08\x01\x12\x0e\n\x06\x42SDIFF\x10\x03\x1a\x02\x08\x01\x12\x0f\n\x0bSOURCE_COPY\x10\x04\x12\x11\n\rSOURCE_BSDIFF\x10\x05\x12\x0e\n\nREPLACE_XZ\x10\x08\x12\x08\n\x04ZERO\x10\x06\x12\x0b\n\x07\x44ISCARD\x10\x07\x12\x11\n\rBROTLI_BSDIFF\x10\n\x12\x0c\n\x08PUFFDIFF\x10\t\"\xe8\x05\n\x0fPartitionUpdate\x12\x16\n\x0epartition_name\x18\x01 \x02(\t\x12\x17\n\x0frun_postinstall\x18\x02 \x01(\x08\x12\x18\n\x10postinstall_path\x18\x03 \x01(\t\x12\x17\n\x0f\x66ilesystem_type\x18\x04 \x01(\t\x12M\n\x17new_partition_signature\x18\x05 
\x03(\x0b\x32,.chromeos_update_engine.Signatures.Signature\x12\x41\n\x12old_partition_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12\x41\n\x12new_partition_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfo\x12<\n\noperations\x18\x08 \x03(\x0b\x32(.chromeos_update_engine.InstallOperation\x12\x1c\n\x14postinstall_optional\x18\t \x01(\x08\x12=\n\x15hash_tree_data_extent\x18\n \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x38\n\x10hash_tree_extent\x18\x0b \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x1b\n\x13hash_tree_algorithm\x18\x0c \x01(\t\x12\x16\n\x0ehash_tree_salt\x18\r \x01(\x0c\x12\x37\n\x0f\x66\x65\x63_data_extent\x18\x0e \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x32\n\nfec_extent\x18\x0f \x01(\x0b\x32\x1e.chromeos_update_engine.Extent\x12\x14\n\tfec_roots\x18\x10 \x01(\r:\x01\x32\x12\x0f\n\x07version\x18\x11 \x01(\t\"L\n\x15\x44ynamicPartitionGroup\x12\x0c\n\x04name\x18\x01 \x02(\t\x12\x0c\n\x04size\x18\x02 \x01(\x04\x12\x17\n\x0fpartition_names\x18\x03 \x03(\t\"s\n\x18\x44ynamicPartitionMetadata\x12=\n\x06groups\x18\x01 \x03(\x0b\x32-.chromeos_update_engine.DynamicPartitionGroup\x12\x18\n\x10snapshot_enabled\x18\x02 \x01(\x08\"\xe1\x06\n\x14\x44\x65ltaArchiveManifest\x12H\n\x12install_operations\x18\x01 \x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12O\n\x19kernel_install_operations\x18\x02 \x03(\x0b\x32(.chromeos_update_engine.InstallOperationB\x02\x18\x01\x12\x18\n\nblock_size\x18\x03 \x01(\r:\x04\x34\x30\x39\x36\x12\x19\n\x11signatures_offset\x18\x04 \x01(\x04\x12\x17\n\x0fsignatures_size\x18\x05 \x01(\x04\x12\x42\n\x0fold_kernel_info\x18\x06 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_kernel_info\x18\x07 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fold_rootfs_info\x18\x08 \x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x42\n\x0fnew_rootfs_info\x18\t 
\x01(\x0b\x32%.chromeos_update_engine.PartitionInfoB\x02\x18\x01\x12\x39\n\x0eold_image_info\x18\n \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x39\n\x0enew_image_info\x18\x0b \x01(\x0b\x32!.chromeos_update_engine.ImageInfo\x12\x18\n\rminor_version\x18\x0c \x01(\r:\x01\x30\x12;\n\npartitions\x18\r \x03(\x0b\x32\'.chromeos_update_engine.PartitionUpdate\x12\x15\n\rmax_timestamp\x18\x0e \x01(\x03\x12T\n\x1a\x64ynamic_partition_metadata\x18\x0f \x01(\x0b\x32\x30.chromeos_update_engine.DynamicPartitionMetadata\x12\x16\n\x0epartial_update\x18\x10 \x01(\x08\x42\x02H\x03'
 )
 
 
@@ -41,11 +39,11 @@
       type=None),
     _descriptor.EnumValueDescriptor(
       name='MOVE', index=2, number=2,
-      serialized_options=_b('\010\001'),
+      serialized_options=b'\010\001',
       type=None),
     _descriptor.EnumValueDescriptor(
       name='BSDIFF', index=3, number=3,
-      serialized_options=_b('\010\001'),
+      serialized_options=b'\010\001',
       type=None),
     _descriptor.EnumValueDescriptor(
       name='SOURCE_COPY', index=4, number=4,
@@ -135,11 +133,11 @@
       has_default_value=False, default_value=0,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=_b('\030\001'), file=DESCRIPTOR),
+      serialized_options=b'\030\001', file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='data', full_name='chromeos_update_engine.Signatures.Signature.data', index=1,
       number=2, type=12, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b(""),
+      has_default_value=False, default_value=b"",
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -214,7 +212,7 @@
     _descriptor.FieldDescriptor(
       name='hash', full_name='chromeos_update_engine.PartitionInfo.hash', index=1,
       number=2, type=12, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b(""),
+      has_default_value=False, default_value=b"",
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -245,42 +243,42 @@
     _descriptor.FieldDescriptor(
       name='board', full_name='chromeos_update_engine.ImageInfo.board', index=0,
       number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='key', full_name='chromeos_update_engine.ImageInfo.key', index=1,
       number=2, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='channel', full_name='chromeos_update_engine.ImageInfo.channel', index=2,
       number=3, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='version', full_name='chromeos_update_engine.ImageInfo.version', index=3,
       number=4, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='build_channel', full_name='chromeos_update_engine.ImageInfo.build_channel', index=4,
       number=5, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='build_version', full_name='chromeos_update_engine.ImageInfo.build_version', index=5,
       number=6, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -360,14 +358,14 @@
     _descriptor.FieldDescriptor(
       name='data_sha256_hash', full_name='chromeos_update_engine.InstallOperation.data_sha256_hash', index=7,
       number=8, type=12, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b(""),
+      has_default_value=False, default_value=b"",
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='src_sha256_hash', full_name='chromeos_update_engine.InstallOperation.src_sha256_hash', index=8,
       number=9, type=12, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b(""),
+      has_default_value=False, default_value=b"",
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -399,7 +397,7 @@
     _descriptor.FieldDescriptor(
       name='partition_name', full_name='chromeos_update_engine.PartitionUpdate.partition_name', index=0,
       number=1, type=9, cpp_type=9, label=2,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -413,14 +411,14 @@
     _descriptor.FieldDescriptor(
       name='postinstall_path', full_name='chromeos_update_engine.PartitionUpdate.postinstall_path', index=2,
       number=3, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='filesystem_type', full_name='chromeos_update_engine.PartitionUpdate.filesystem_type', index=3,
       number=4, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -476,14 +474,14 @@
     _descriptor.FieldDescriptor(
       name='hash_tree_algorithm', full_name='chromeos_update_engine.PartitionUpdate.hash_tree_algorithm', index=11,
       number=12, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='hash_tree_salt', full_name='chromeos_update_engine.PartitionUpdate.hash_tree_salt', index=12,
       number=13, type=12, cpp_type=9, label=1,
-      has_default_value=False, default_value=_b(""),
+      has_default_value=False, default_value=b"",
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -508,6 +506,13 @@
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
+    _descriptor.FieldDescriptor(
+      name='version', full_name='chromeos_update_engine.PartitionUpdate.version', index=16,
+      number=17, type=9, cpp_type=9, label=1,
+      has_default_value=False, default_value=b"".decode('utf-8'),
+      message_type=None, enum_type=None, containing_type=None,
+      is_extension=False, extension_scope=None,
+      serialized_options=None, file=DESCRIPTOR),
   ],
   extensions=[
   ],
@@ -521,7 +526,7 @@
   oneofs=[
   ],
   serialized_start=926,
-  serialized_end=1653,
+  serialized_end=1670,
 )
 
 
@@ -535,7 +540,7 @@
     _descriptor.FieldDescriptor(
       name='name', full_name='chromeos_update_engine.DynamicPartitionGroup.name', index=0,
       number=1, type=9, cpp_type=9, label=2,
-      has_default_value=False, default_value=_b("").decode('utf-8'),
+      has_default_value=False, default_value=b"".decode('utf-8'),
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
       serialized_options=None, file=DESCRIPTOR),
@@ -565,8 +570,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1655,
-  serialized_end=1731,
+  serialized_start=1672,
+  serialized_end=1748,
 )
 
 
@@ -603,8 +608,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1733,
-  serialized_end=1848,
+  serialized_start=1750,
+  serialized_end=1865,
 )
 
 
@@ -621,14 +626,14 @@
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=_b('\030\001'), file=DESCRIPTOR),
+      serialized_options=b'\030\001', file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='kernel_install_operations', full_name='chromeos_update_engine.DeltaArchiveManifest.kernel_install_operations', index=1,
       number=2, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=_b('\030\001'), file=DESCRIPTOR),
+      serialized_options=b'\030\001', file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='block_size', full_name='chromeos_update_engine.DeltaArchiveManifest.block_size', index=2,
       number=3, type=13, cpp_type=3, label=1,
@@ -656,28 +661,28 @@
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=_b('\030\001'), file=DESCRIPTOR),
+      serialized_options=b'\030\001', file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='new_kernel_info', full_name='chromeos_update_engine.DeltaArchiveManifest.new_kernel_info', index=6,
       number=7, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=_b('\030\001'), file=DESCRIPTOR),
+      serialized_options=b'\030\001', file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='old_rootfs_info', full_name='chromeos_update_engine.DeltaArchiveManifest.old_rootfs_info', index=7,
       number=8, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=_b('\030\001'), file=DESCRIPTOR),
+      serialized_options=b'\030\001', file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='new_rootfs_info', full_name='chromeos_update_engine.DeltaArchiveManifest.new_rootfs_info', index=8,
       number=9, type=11, cpp_type=10, label=1,
       has_default_value=False, default_value=None,
       message_type=None, enum_type=None, containing_type=None,
       is_extension=False, extension_scope=None,
-      serialized_options=_b('\030\001'), file=DESCRIPTOR),
+      serialized_options=b'\030\001', file=DESCRIPTOR),
     _descriptor.FieldDescriptor(
       name='old_image_info', full_name='chromeos_update_engine.DeltaArchiveManifest.old_image_info', index=9,
       number=10, type=11, cpp_type=10, label=1,
@@ -739,8 +744,8 @@
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=1851,
-  serialized_end=2716,
+  serialized_start=1868,
+  serialized_end=2733,
 )
 
 _SIGNATURES_SIGNATURE.containing_type = _SIGNATURES
diff --git a/stable/Android.bp b/stable/Android.bp
index 337ae96..a415ac5 100644
--- a/stable/Android.bp
+++ b/stable/Android.bp
@@ -18,6 +18,13 @@
 // ========================================================
 aidl_interface {
     name: "libupdate_engine_stable",
+
+    // This header library is available to core and product modules.
+    // Right now, vendor_available is the only way to specify this.
+    // vendor modules should NOT use this library.
+    // TODO(b/150902910): change this to product_available.
+    vendor_available: true,
+
     srcs: [
         "android/os/IUpdateEngineStable.aidl",
         "android/os/IUpdateEngineStableCallback.aidl",
@@ -40,10 +47,10 @@
 
 // update_engine_stable_client (type: executable)
 // ========================================================
-// update_engine console client installed to APEXes
+// update_engine console client installed to APEXes.
 cc_binary {
     name: "update_engine_stable_client",
-
+    product_specific: true,
     header_libs: [
         "libupdate_engine_headers",
     ],
diff --git a/stable/update_engine_stable_client.cc b/stable/update_engine_stable_client.cc
index da203c4..17f66b6 100644
--- a/stable/update_engine_stable_client.cc
+++ b/stable/update_engine_stable_client.cc
@@ -32,7 +32,6 @@
 #include <android/binder_ibinder.h>
 #include <common/error_code.h>
 #include <gflags/gflags.h>
-#include <utils/StrongPointer.h>
 
 namespace chromeos_update_engine::internal {
 
diff --git a/test_http_server.cc b/test_http_server.cc
index 4fc89e5..1c3a2e0 100644
--- a/test_http_server.cc
+++ b/test_http_server.cc
@@ -189,7 +189,8 @@
   ret = WriteString(fd,
                     string("HTTP/1.1 ") + Itoa(return_code) + " " +
                         GetHttpResponseDescription(return_code) +
-                        EOL "Content-Type: application/octet-stream" EOL);
+                        EOL "Content-Type: application/octet-stream" EOL
+                        "Connection: close" EOL);
   if (ret < 0)
     return -1;
   written += ret;
@@ -406,7 +407,9 @@
   if ((ret = WriteString(fd, "HTTP/1.1 " + Itoa(code) + " " + status + EOL)) <
       0)
     return;
+  WriteString(fd, "Connection: close" EOL);
   WriteString(fd, "Location: " + url + EOL);
+
 }
 
 // Generate a page not found error response with actual text payload. Return
diff --git a/update_attempter.cc b/update_attempter.cc
index f37973e..c4fe348 100644
--- a/update_attempter.cc
+++ b/update_attempter.cc
@@ -1376,6 +1376,7 @@
         case UpdateStatus::REPORTING_ERROR_EVENT:
         case UpdateStatus::ATTEMPTING_ROLLBACK:
         case UpdateStatus::DISABLED:
+        case UpdateStatus::CLEANUP_PREVIOUS_UPDATE:
           MarkDeltaUpdateFailure();
           break;
       }
diff --git a/update_attempter_android.cc b/update_attempter_android.cc
index 7fc13e1..3578d95 100644
--- a/update_attempter_android.cc
+++ b/update_attempter_android.cc
@@ -507,7 +507,7 @@
         return LogAndSetError(
             error, FROM_HERE, "Failed to hash " + partition_path);
       }
-      if (!DeltaPerformer::ValidateSourceHash(
+      if (!PartitionWriter::ValidateSourceHash(
               source_hash, operation, fd, &errorcode)) {
         return false;
       }
diff --git a/update_manager/boxed_value.cc b/update_manager/boxed_value.cc
index 4dff9ef..b031dfc 100644
--- a/update_manager/boxed_value.cc
+++ b/update_manager/boxed_value.cc
@@ -177,6 +177,8 @@
       return "Reporting Error Event";
     case Stage::kAttemptingRollback:
       return "Attempting Rollback";
+    case Stage::kCleanupPreviousUpdate:
+      return "Cleanup Previous Update";
   }
   NOTREACHED();
   return "Unknown";
diff --git a/update_manager/real_updater_provider.cc b/update_manager/real_updater_provider.cc
index 134db69..1f9af0d 100644
--- a/update_manager/real_updater_provider.cc
+++ b/update_manager/real_updater_provider.cc
@@ -169,6 +169,8 @@
      Stage::kReportingErrorEvent},
     {update_engine::kUpdateStatusAttemptingRollback,
      Stage::kAttemptingRollback},
+    {update_engine::kUpdateStatusCleanupPreviousUpdate,
+     Stage::kCleanupPreviousUpdate},
 };
 
 const Stage* StageVariable::GetValue(TimeDelta /* timeout */, string* errmsg) {
diff --git a/update_manager/updater_provider.h b/update_manager/updater_provider.h
index cb62623..81ffb41 100644
--- a/update_manager/updater_provider.h
+++ b/update_manager/updater_provider.h
@@ -36,6 +36,7 @@
   kUpdatedNeedReboot,
   kReportingErrorEvent,
   kAttemptingRollback,
+  kCleanupPreviousUpdate,
 };
 
 enum class UpdateRequestStatus {
diff --git a/update_metadata.proto b/update_metadata.proto
index e6a067e..373ee5e 100644
--- a/update_metadata.proto
+++ b/update_metadata.proto
@@ -225,6 +225,22 @@
   optional bytes src_sha256_hash = 9;
 }
 
+// Hints to VAB snapshot to skip writing some blocks if these blocks are
+// identical to the ones on the source image. The src & dst extents for each
+// CowMergeOperation should be contiguous, and they're a subset of an OTA
+// InstallOperation.
+// During merge time, we need to follow the pre-computed sequence to avoid
+// read after write, similar to the inplace update schema.
+message CowMergeOperation {
+  enum Type {
+    COW_COPY = 0;  // identical blocks
+  }
+  optional Type type = 1;
+
+  optional Extent src_extent = 2;
+  optional Extent dst_extent = 3;
+}
+
 // Describes the update to apply to a single partition.
 message PartitionUpdate {
   // A platform-specific name to identify the partition set being updated. For
@@ -288,6 +304,16 @@
 
   // The number of FEC roots.
   optional uint32 fec_roots = 16 [default = 2];
+
+  // Per-partition version used for downgrade detection, added
+  // as an effort to support partial updates. For most partitions,
+  // this is the build timestamp.
+  optional string version = 17;
+
+  // A sorted list of CowMergeOperation. When writing cow, we can choose to
+  // skip writing the raw bytes for these extents. During snapshot merge, the
+  // bytes will be read from the source partitions instead.
+  repeated CowMergeOperation merge_operations = 18;
 }
 
 message DynamicPartitionGroup {