diff --git a/payload_generator/cow_size_estimator.cc b/payload_generator/cow_size_estimator.cc
new file mode 100644
index 0000000..662a3ca
--- /dev/null
+++ b/payload_generator/cow_size_estimator.cc
@@ -0,0 +1,110 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/payload_generator/cow_size_estimator.h"
+
+#include <utility>
+#include <vector>
+
+#include <libsnapshot/cow_writer.h>
+
+#include "android-base/unique_fd.h"
+#include "update_engine/common/cow_operation_convert.h"
+#include "update_engine/payload_consumer/vabc_partition_writer.h"
+#include "update_engine/update_metadata.pb.h"
+
+namespace chromeos_update_engine {
+using android::snapshot::CowWriter;
+
+void PerformReplaceOp(const InstallOperation& op,
+                      CowWriter* writer,
+                      FileDescriptorPtr target_fd,
+                      size_t block_size) {
+  std::vector<unsigned char> buffer;
+  for (const auto& extent : op.dst_extents()) {
+    buffer.resize(extent.num_blocks() * block_size);
+    // No need to read from payload.bin then decompress, just read from target
+    // directly.
+    ssize_t bytes_read = 0;
+    auto success = utils::PReadAll(target_fd,
+                                   buffer.data(),
+                                   buffer.size(),
+                                   extent.start_block() * block_size,
+                                   &bytes_read);
+    CHECK(success);
+    CHECK_EQ(static_cast<size_t>(bytes_read), buffer.size());
+    writer->AddRawBlocks(extent.start_block(), buffer.data(), buffer.size());
+  }
+}
+
+void PerformZeroOp(const InstallOperation& op,
+                   CowWriter* writer,
+                   size_t block_size) {
+  for (const auto& extent : op.dst_extents()) {
+    writer->AddZeroBlocks(extent.start_block(), extent.num_blocks());
+  }
+}
+
+size_t EstimateCowSize(
+    FileDescriptorPtr source_fd,
+    FileDescriptorPtr target_fd,
+    const google::protobuf::RepeatedPtrField<InstallOperation>& operations,
+    const google::protobuf::RepeatedPtrField<CowMergeOperation>&
+        merge_operations,
+    size_t block_size) {
+  android::snapshot::CowWriter cow_writer{
+      {.block_size = static_cast<uint32_t>(block_size), .compression = "gz"}};
+  // CowWriter treats -1 as special value, will discard all the data but still
+  // reports Cow size. Good for estimation purposes
+  cow_writer.Initialize(android::base::borrowed_fd{-1});
+
+  const auto converted = ConvertToCowOperations(operations, merge_operations);
+  VABCPartitionWriter::WriteAllCowOps(
+      block_size, converted, &cow_writer, source_fd);
+  cow_writer.AddLabel(0);
+  for (const auto& op : operations) {
+    switch (op.type()) {
+      case InstallOperation::REPLACE:
+      case InstallOperation::REPLACE_BZ:
+      case InstallOperation::REPLACE_XZ:
+        PerformReplaceOp(op, &cow_writer, target_fd, block_size);
+        break;
+      case InstallOperation::ZERO:
+      case InstallOperation::DISCARD:
+        PerformZeroOp(op, &cow_writer, block_size);
+        break;
+      case InstallOperation::SOURCE_COPY:
+      case InstallOperation::MOVE:
+        // Already handeled by WriteAllCowOps,
+        break;
+      case InstallOperation::SOURCE_BSDIFF:
+      case InstallOperation::BROTLI_BSDIFF:
+      case InstallOperation::PUFFDIFF:
+      case InstallOperation::BSDIFF:
+        // We might do something special by adding CowBsdiff to CowWriter.
+        // For now proceed the same way as normal REPLACE operation.
+        PerformReplaceOp(op, &cow_writer, target_fd, block_size);
+        break;
+    }
+    // Arbitrary label number, we won't be resuming use these labels here.
+    // They are emitted just to keep size estimates accurate. As update_engine
+    // emits 1 label for every op.
+    cow_writer.AddLabel(2);
+  }
+  // TODO(zhangkelvin) Take FEC extents into account once VABC stabilizes
+  return cow_writer.GetCowSize();
+}
+}  // namespace chromeos_update_engine
diff --git a/payload_generator/cow_size_estimator.h b/payload_generator/cow_size_estimator.h
new file mode 100644
index 0000000..cba89b5
--- /dev/null
+++ b/payload_generator/cow_size_estimator.h
@@ -0,0 +1,36 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+#include <cstddef>
+
+#include <update_engine/update_metadata.pb.h>
+
+#include "update_engine/payload_consumer/file_descriptor.h"
+
+namespace chromeos_update_engine {
+// Given file descriptor to the source image, target image, and list of
+// operations, estimate the size of COW image if the operations are applied on
+// Virtual AB Compression enabled device. This is intended to be used by update
+// generators to put an estimate cow size in OTA payload. When installing an OTA
+// update, libsnapshot will take this estimate as a hint to allocate spaces.
+size_t EstimateCowSize(
+    FileDescriptorPtr source_fd,
+    FileDescriptorPtr target_fd,
+    const google::protobuf::RepeatedPtrField<InstallOperation>& operations,
+    const google::protobuf::RepeatedPtrField<CowMergeOperation>&
+        merge_operations,
+    size_t block_size);
+
+}  // namespace chromeos_update_engine
diff --git a/payload_generator/cow_size_estimator_stub.cc b/payload_generator/cow_size_estimator_stub.cc
new file mode 100644
index 0000000..9d94d63
--- /dev/null
+++ b/payload_generator/cow_size_estimator_stub.cc
@@ -0,0 +1,31 @@
+//
+// Copyright (C) 2020 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include "update_engine/payload_generator/cow_size_estimator.h"
+
+namespace chromeos_update_engine {
+
+size_t EstimateCowSize(
+    FileDescriptorPtr source_fd,
+    FileDescriptorPtr target_fd,
+    const google::protobuf::RepeatedPtrField<InstallOperation>& operations,
+    const google::protobuf::RepeatedPtrField<CowMergeOperation>&
+        merge_operations,
+    size_t block_size) {
+  return 0;
+}
+
+}  // namespace chromeos_update_engine
diff --git a/payload_generator/delta_diff_generator.cc b/payload_generator/delta_diff_generator.cc
index ff8b0da..47c92e0 100644
--- a/payload_generator/delta_diff_generator.cc
+++ b/payload_generator/delta_diff_generator.cc
@@ -33,14 +33,17 @@
 
 #include "update_engine/common/utils.h"
 #include "update_engine/payload_consumer/delta_performer.h"
+#include "update_engine/payload_consumer/file_descriptor.h"
 #include "update_engine/payload_consumer/payload_constants.h"
 #include "update_engine/payload_generator/ab_generator.h"
 #include "update_engine/payload_generator/annotated_operation.h"
 #include "update_engine/payload_generator/blob_file_writer.h"
+#include "update_engine/payload_generator/cow_size_estimator.h"
 #include "update_engine/payload_generator/delta_diff_utils.h"
 #include "update_engine/payload_generator/full_update_generator.h"
 #include "update_engine/payload_generator/merge_sequence_generator.h"
 #include "update_engine/payload_generator/payload_file.h"
+#include "update_engine/update_metadata.pb.h"
 
 using std::string;
 using std::unique_ptr;
@@ -53,6 +56,18 @@
 const size_t kBlockSize = 4096;  // bytes
 
 class PartitionProcessor : public base::DelegateSimpleThread::Delegate {
+  bool IsDynamicPartition(const std::string& partition_name) {
+    for (const auto& group :
+         config_.target.dynamic_partition_metadata->groups()) {
+      const auto& names = group.partition_names();
+      if (std::find(names.begin(), names.end(), partition_name) !=
+          names.end()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
  public:
   explicit PartitionProcessor(
       const PayloadGenerationConfig& config,
@@ -61,6 +76,7 @@
       BlobFileWriter* file_writer,
       std::vector<AnnotatedOperation>* aops,
       std::vector<CowMergeOperation>* cow_merge_sequence,
+      size_t* cow_size,
       std::unique_ptr<chromeos_update_engine::OperationsGenerator> strategy)
       : config_(config),
         old_part_(old_part),
@@ -68,11 +84,13 @@
         file_writer_(file_writer),
         aops_(aops),
         cow_merge_sequence_(cow_merge_sequence),
+        cow_size_(cow_size),
         strategy_(std::move(strategy)) {}
   PartitionProcessor(PartitionProcessor&&) noexcept = default;
+
   void Run() override {
     LOG(INFO) << "Started an async task to process partition "
-              << old_part_.name;
+              << new_part_.name;
     bool success = strategy_->GenerateOperations(
         config_, old_part_, new_part_, file_writer_, aops_);
     if (!success) {
@@ -85,13 +103,38 @@
     bool snapshot_enabled =
         config_.target.dynamic_partition_metadata &&
         config_.target.dynamic_partition_metadata->snapshot_enabled();
-    if (old_part_.path.empty() || !snapshot_enabled) {
+    if (!snapshot_enabled || !IsDynamicPartition(new_part_.name)) {
       return;
     }
-    auto generator = MergeSequenceGenerator::Create(*aops_);
-    if (!generator || !generator->Generate(cow_merge_sequence_)) {
-      LOG(FATAL) << "Failed to generate merge sequence";
+    if (!old_part_.path.empty()) {
+      auto generator = MergeSequenceGenerator::Create(*aops_);
+      if (!generator || !generator->Generate(cow_merge_sequence_)) {
+        LOG(FATAL) << "Failed to generate merge sequence";
+      }
     }
+
+    LOG(INFO) << "Estimating COW size for partition: " << new_part_.name;
+    // Need the contents of source/target image bytes when doing
+    // dry run.
+    FileDescriptorPtr source_fd{new EintrSafeFileDescriptor()};
+    source_fd->Open(old_part_.path.c_str(), O_RDONLY);
+
+    auto target_fd = std::make_unique<EintrSafeFileDescriptor>();
+    target_fd->Open(new_part_.path.c_str(), O_RDONLY);
+
+    google::protobuf::RepeatedPtrField<InstallOperation> operations;
+
+    for (const AnnotatedOperation& aop : *aops_) {
+      *operations.Add() = aop.op;
+    }
+    *cow_size_ = EstimateCowSize(
+        source_fd,
+        std::move(target_fd),
+        operations,
+        {cow_merge_sequence_->begin(), cow_merge_sequence_->end()},
+        config_.block_size);
+    LOG(INFO) << "Estimated COW size for partition: " << new_part_.name << " "
+              << *cow_size_;
   }
 
  private:
@@ -101,6 +144,7 @@
   BlobFileWriter* file_writer_;
   std::vector<AnnotatedOperation>* aops_;
   std::vector<CowMergeOperation>* cow_merge_sequence_;
+  size_t* cow_size_;
   std::unique_ptr<chromeos_update_engine::OperationsGenerator> strategy_;
   DISALLOW_COPY_AND_ASSIGN(PartitionProcessor);
 };
@@ -130,8 +174,12 @@
     PartitionConfig empty_part("");
     std::vector<std::vector<AnnotatedOperation>> all_aops;
     all_aops.resize(config.target.partitions.size());
+
     std::vector<std::vector<CowMergeOperation>> all_merge_sequences;
     all_merge_sequences.resize(config.target.partitions.size());
+
+    std::vector<size_t> all_cow_sizes(config.target.partitions.size(), 0);
+
     std::vector<PartitionProcessor> partition_tasks{};
     auto thread_count = std::min<int>(diff_utils::GetMaxThreads(),
                                       config.target.partitions.size());
@@ -163,6 +211,7 @@
                                                    &blob_file,
                                                    &all_aops[i],
                                                    &all_merge_sequences[i],
+                                                   &all_cow_sizes[i],
                                                    std::move(strategy)));
     }
     thread_pool.Start();
@@ -179,7 +228,8 @@
           payload.AddPartition(old_part,
                                new_part,
                                std::move(all_aops[i]),
-                               std::move(all_merge_sequences[i])));
+                               std::move(all_merge_sequences[i]),
+                               all_cow_sizes[i]));
     }
   }
   data_file.CloseFd();
diff --git a/payload_generator/payload_file.cc b/payload_generator/payload_file.cc
index 74423d1..33c0749 100644
--- a/payload_generator/payload_file.cc
+++ b/payload_generator/payload_file.cc
@@ -80,8 +80,10 @@
 bool PayloadFile::AddPartition(const PartitionConfig& old_conf,
                                const PartitionConfig& new_conf,
                                vector<AnnotatedOperation> aops,
-                               vector<CowMergeOperation> merge_sequence) {
+                               vector<CowMergeOperation> merge_sequence,
+                               size_t cow_size) {
   Partition part;
+  part.cow_size = cow_size;
   part.name = new_conf.name;
   part.aops = std::move(aops);
   part.cow_merge_sequence = std::move(merge_sequence);
@@ -129,6 +131,9 @@
     if (!part.version.empty()) {
       partition->set_version(part.version);
     }
+    if (part.cow_size > 0) {
+      partition->set_estimate_cow_size(part.cow_size);
+    }
     if (part.postinstall.run) {
       partition->set_run_postinstall(true);
       if (!part.postinstall.path.empty())
diff --git a/payload_generator/payload_file.h b/payload_generator/payload_file.h
index 8b17956..3a45793 100644
--- a/payload_generator/payload_file.h
+++ b/payload_generator/payload_file.h
@@ -44,7 +44,8 @@
   bool AddPartition(const PartitionConfig& old_conf,
                     const PartitionConfig& new_conf,
                     std::vector<AnnotatedOperation> aops,
-                    std::vector<CowMergeOperation> merge_sequence);
+                    std::vector<CowMergeOperation> merge_sequence,
+                    size_t cow_size);
 
   // Write the payload to the |payload_file| file. The operations reference
   // blobs in the |data_blobs_path| file and the blobs will be reordered in the
@@ -100,6 +101,7 @@
     VerityConfig verity;
     // Per partition timestamp.
     std::string version;
+    size_t cow_size;
   };
 
   std::vector<Partition> part_vec_;
diff --git a/payload_generator/payload_properties_unittest.cc b/payload_generator/payload_properties_unittest.cc
index ed936ff..0ff364f 100644
--- a/payload_generator/payload_properties_unittest.cc
+++ b/payload_generator/payload_properties_unittest.cc
@@ -88,7 +88,7 @@
     EXPECT_TRUE(strategy->GenerateOperations(
         config, old_part, new_part, &blob_file_writer, &aops));
 
-    payload.AddPartition(old_part, new_part, aops, {});
+    payload.AddPartition(old_part, new_part, aops, {}, 0);
 
     uint64_t metadata_size;
     EXPECT_TRUE(payload.WritePayload(
