diff --git a/fs_mgr/libsnapshot/cow_api_test.cpp b/fs_mgr/libsnapshot/cow_api_test.cpp
index b75b154..7f7e40a 100644
--- a/fs_mgr/libsnapshot/cow_api_test.cpp
+++ b/fs_mgr/libsnapshot/cow_api_test.cpp
@@ -981,6 +981,137 @@
     ASSERT_EQ(num_clusters, 1);
 }
 
+TEST_F(CowTest, BigSeqOp) {
+    CowOptions options;
+    CowWriter writer(options);
+    const int seq_len = std::numeric_limits<uint16_t>::max() / sizeof(uint32_t) + 1;
+    uint32_t sequence[seq_len];
+    for (int i = 0; i < seq_len; i++) {
+        sequence[i] = i + 1;
+    }
+
+    ASSERT_TRUE(writer.Initialize(cow_->fd));
+
+    ASSERT_TRUE(writer.AddSequenceData(seq_len, sequence));
+    ASSERT_TRUE(writer.AddZeroBlocks(1, seq_len));
+    ASSERT_TRUE(writer.Finalize());
+
+    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+    auto iter = reader.GetRevMergeOpIter();
+
+    for (int i = 0; i < seq_len; i++) {
+        ASSERT_TRUE(!iter->Done());
+        const auto& op = iter->Get();
+
+        ASSERT_EQ(op.new_block, seq_len - i);
+
+        iter->Next();
+    }
+    ASSERT_TRUE(iter->Done());
+}
+
+TEST_F(CowTest, RevMergeOpItrTest) {
+    CowOptions options;
+    options.cluster_ops = 5;
+    options.num_merge_ops = 1;
+    CowWriter writer(options);
+    uint32_t sequence[] = {2, 10, 6, 7, 3, 5};
+
+    ASSERT_TRUE(writer.Initialize(cow_->fd));
+
+    ASSERT_TRUE(writer.AddSequenceData(6, sequence));
+    ASSERT_TRUE(writer.AddCopy(6, 3));
+    ASSERT_TRUE(writer.AddZeroBlocks(12, 1));
+    ASSERT_TRUE(writer.AddZeroBlocks(8, 1));
+    ASSERT_TRUE(writer.AddZeroBlocks(11, 1));
+    ASSERT_TRUE(writer.AddCopy(3, 5));
+    ASSERT_TRUE(writer.AddCopy(2, 1));
+    ASSERT_TRUE(writer.AddZeroBlocks(4, 1));
+    ASSERT_TRUE(writer.AddZeroBlocks(9, 1));
+    ASSERT_TRUE(writer.AddCopy(5, 6));
+    ASSERT_TRUE(writer.AddZeroBlocks(1, 1));
+    ASSERT_TRUE(writer.AddCopy(10, 2));
+    ASSERT_TRUE(writer.AddCopy(7, 4));
+    ASSERT_TRUE(writer.Finalize());
+
+    // New block in cow order is 6, 12, 8, 11, 3, 2, 4, 9, 5, 1, 10, 7
+    // New block in merge order is 2, 10, 6, 7, 3, 5, 12, 11, 9, 8, 4, 1
+    // RevMergeOrder is 1, 4, 8, 9, 11, 12, 5, 3, 7, 6, 10, 2
+    // new block 2 is "already merged", so will be left out.
+
+    std::vector<uint64_t> revMergeOpSequence = {1, 4, 8, 9, 11, 12, 5, 3, 7, 6, 10};
+
+    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+    auto iter = reader.GetRevMergeOpIter();
+    auto expected_new_block = revMergeOpSequence.begin();
+
+    while (!iter->Done() && expected_new_block != revMergeOpSequence.end()) {
+        const auto& op = iter->Get();
+
+        ASSERT_EQ(op.new_block, *expected_new_block);
+
+        iter->Next();
+        expected_new_block++;
+    }
+    ASSERT_EQ(expected_new_block, revMergeOpSequence.end());
+    ASSERT_TRUE(iter->Done());
+}
+
+TEST_F(CowTest, LegacyRevMergeOpItrTest) {
+    CowOptions options;
+    options.cluster_ops = 5;
+    options.num_merge_ops = 1;
+    CowWriter writer(options);
+
+    ASSERT_TRUE(writer.Initialize(cow_->fd));
+
+    ASSERT_TRUE(writer.AddCopy(2, 1));
+    ASSERT_TRUE(writer.AddCopy(10, 2));
+    ASSERT_TRUE(writer.AddCopy(6, 3));
+    ASSERT_TRUE(writer.AddCopy(7, 4));
+    ASSERT_TRUE(writer.AddCopy(3, 5));
+    ASSERT_TRUE(writer.AddCopy(5, 6));
+    ASSERT_TRUE(writer.AddZeroBlocks(12, 1));
+    ASSERT_TRUE(writer.AddZeroBlocks(8, 1));
+    ASSERT_TRUE(writer.AddZeroBlocks(11, 1));
+    ASSERT_TRUE(writer.AddZeroBlocks(4, 1));
+    ASSERT_TRUE(writer.AddZeroBlocks(9, 1));
+    ASSERT_TRUE(writer.AddZeroBlocks(1, 1));
+
+    ASSERT_TRUE(writer.Finalize());
+
+    // New block in cow order is 2, 10, 6, 7, 3, 5, 12, 8, 11, 4, 9, 1
+    // New block in merge order is 2, 10, 6, 7, 3, 5, 12, 11, 9, 8, 4, 1
+    // RevMergeOrder is 1, 4, 8, 9, 11, 12, 5, 3, 7, 6, 10, 2
+    // new block 2 is "already merged", so will be left out.
+
+    std::vector<uint64_t> revMergeOpSequence = {1, 4, 8, 9, 11, 12, 5, 3, 7, 6, 10};
+
+    ASSERT_EQ(lseek(cow_->fd, 0, SEEK_SET), 0);
+
+    CowReader reader;
+    ASSERT_TRUE(reader.Parse(cow_->fd));
+    auto iter = reader.GetRevMergeOpIter();
+    auto expected_new_block = revMergeOpSequence.begin();
+
+    while (!iter->Done() && expected_new_block != revMergeOpSequence.end()) {
+        const auto& op = iter->Get();
+
+        ASSERT_EQ(op.new_block, *expected_new_block);
+
+        iter->Next();
+        expected_new_block++;
+    }
+    ASSERT_EQ(expected_new_block, revMergeOpSequence.end());
+    ASSERT_TRUE(iter->Done());
+}
+
 }  // namespace snapshot
 }  // namespace android
 
diff --git a/fs_mgr/libsnapshot/cow_reader.cpp b/fs_mgr/libsnapshot/cow_reader.cpp
index 63795f2..57d108f 100644
--- a/fs_mgr/libsnapshot/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/cow_reader.cpp
@@ -19,6 +19,9 @@
 
 #include <limits>
 #include <optional>
+#include <set>
+#include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include <android-base/file.h>
@@ -127,7 +130,10 @@
         return false;
     }
 
-    return ParseOps(label);
+    if (!ParseOps(label)) {
+        return false;
+    }
+    return PrepMergeOps();
 }
 
 bool CowReader::ParseOps(std::optional<uint64_t> label) {
@@ -253,7 +259,7 @@
             LOG(ERROR) << "ops checksum does not match";
             return false;
         }
-        SHA256(ops_buffer.get()->data(), footer_->op.ops_size, csum);
+        SHA256(ops_buffer->data(), footer_->op.ops_size, csum);
         if (memcmp(csum, footer_->data.ops_checksum, sizeof(csum)) != 0) {
             LOG(ERROR) << "ops checksum does not match";
             return false;
@@ -266,6 +272,161 @@
     return true;
 }
 
+//
+// This sets up the data needed for MergeOpIter. MergeOpIter presents
+// data in the order we intend to merge in.
+//
+// We merge all order sensitive ops up front, and sort the rest to allow for
+// batch merging. Order sensitive ops can either be presented in their proper
+// order in the cow, or be ordered by sequence ops (kCowSequenceOp), in which
+// case we want to merge those ops first, followed by any ops not specified by
+// new_block value by the sequence op, in sorted order.
+// We will re-arrange the vector in such a way that
+// kernel can batch merge. Ex:
+//
+// Existing COW format; All the copy operations
+// are at the beginning.
+// =======================================
+// Copy-op-1    - cow_op->new_block = 1
+// Copy-op-2    - cow_op->new_block = 2
+// Copy-op-3    - cow_op->new_block = 3
+// Replace-op-4 - cow_op->new_block = 6
+// Replace-op-5 - cow_op->new_block = 4
+// Replace-op-6 - cow_op->new_block = 8
+// Replace-op-7 - cow_op->new_block = 9
+// Zero-op-8    - cow_op->new_block = 7
+// Zero-op-9    - cow_op->new_block = 5
+// =======================================
+//
+// First find the operation which isn't a copy-op
+// and then sort all the operations in descending order
+// with the key being cow_op->new_block (source block)
+//
+// The data-structure will look like:
+//
+// =======================================
+// Copy-op-1    - cow_op->new_block = 1
+// Copy-op-2    - cow_op->new_block = 2
+// Copy-op-3    - cow_op->new_block = 3
+// Replace-op-7 - cow_op->new_block = 9
+// Replace-op-6 - cow_op->new_block = 8
+// Zero-op-8    - cow_op->new_block = 7
+// Replace-op-4 - cow_op->new_block = 6
+// Zero-op-9    - cow_op->new_block = 5
+// Replace-op-5 - cow_op->new_block = 4
+// =======================================
+//
+// Daemon will read the above data-structure in reverse-order
+// when reading metadata. Thus, kernel will get the metadata
+// in the following order:
+//
+// ========================================
+// Replace-op-5 - cow_op->new_block = 4
+// Zero-op-9    - cow_op->new_block = 5
+// Replace-op-4 - cow_op->new_block = 6
+// Zero-op-8    - cow_op->new_block = 7
+// Replace-op-6 - cow_op->new_block = 8
+// Replace-op-7 - cow_op->new_block = 9
+// Copy-op-3    - cow_op->new_block = 3
+// Copy-op-2    - cow_op->new_block = 2
+// Copy-op-1    - cow_op->new_block = 1
+// ===========================================
+//
+// When merging begins, kernel will start from the last
+// metadata which was read: In the above format, Copy-op-1
+// will be the first merge operation.
+//
+// Now, batching of the merge operations happens only when
+// 1: origin block numbers in the base device are contiguous
+// (cow_op->new_block) and,
+// 2: cow block numbers which are assigned by daemon in ReadMetadata()
+// are contiguous. These are monotonically increasing numbers.
+//
+// When both (1) and (2) are true, kernel will batch merge the operations.
+// In the above case, we have to ensure that the copy operations
+// are merged first before replace operations are done. Hence,
+// we will not change the order of copy operations. Since,
+// cow_op->new_block numbers are contiguous, we will ensure that the
+// cow block numbers assigned in ReadMetadata() for these respective copy
+// operations are not contiguous forcing kernel to issue merge for each
+// copy operations without batch merging.
+//
+// For all the other operations viz. Replace and Zero op, the cow block
+// numbers assigned by daemon will be contiguous allowing kernel to batch
+// merge.
+//
+// The final format after assiging COW block numbers by the daemon will
+// look something like:
+//
+// =========================================================
+// Replace-op-5 - cow_op->new_block = 4  cow-block-num = 2
+// Zero-op-9    - cow_op->new_block = 5  cow-block-num = 3
+// Replace-op-4 - cow_op->new_block = 6  cow-block-num = 4
+// Zero-op-8    - cow_op->new_block = 7  cow-block-num = 5
+// Replace-op-6 - cow_op->new_block = 8  cow-block-num = 6
+// Replace-op-7 - cow_op->new_block = 9  cow-block-num = 7
+// Copy-op-3    - cow_op->new_block = 3  cow-block-num = 9
+// Copy-op-2    - cow_op->new_block = 2  cow-block-num = 11
+// Copy-op-1    - cow_op->new_block = 1  cow-block-num = 13
+// ==========================================================
+//
+// Merge sequence will look like:
+//
+// Merge-1 - Batch-merge { Copy-op-1, Copy-op-2, Copy-op-3 }
+// Merge-2 - Batch-merge {Replace-op-7, Replace-op-6, Zero-op-8,
+//                        Replace-op-4, Zero-op-9, Replace-op-5 }
+//==============================================================
+bool CowReader::PrepMergeOps() {
+    auto merge_op_blocks = std::make_shared<std::vector<uint32_t>>();
+    std::set<int, std::greater<int>> other_ops;
+    auto seq_ops_set = std::unordered_set<uint32_t>();
+    auto block_map = std::make_shared<std::unordered_map<uint32_t, int>>();
+    int num_seqs = 0;
+    size_t read;
+
+    for (int i = 0; i < ops_->size(); i++) {
+        auto& current_op = ops_->data()[i];
+
+        if (current_op.type == kCowSequenceOp) {
+            size_t seq_len = current_op.data_length / sizeof(uint32_t);
+
+            merge_op_blocks->resize(merge_op_blocks->size() + seq_len);
+            if (!GetRawBytes(current_op.source, &merge_op_blocks->data()[num_seqs],
+                             current_op.data_length, &read)) {
+                PLOG(ERROR) << "Failed to read sequence op!";
+                return false;
+            }
+            for (int j = num_seqs; j < num_seqs + seq_len; j++) {
+                seq_ops_set.insert(merge_op_blocks->data()[j]);
+            }
+            num_seqs += seq_len;
+        }
+
+        if (IsMetadataOp(current_op)) {
+            continue;
+        }
+
+        if (!has_seq_ops_ && IsOrderedOp(current_op)) {
+            merge_op_blocks->emplace_back(current_op.new_block);
+        } else if (seq_ops_set.count(current_op.new_block) == 0) {
+            other_ops.insert(current_op.new_block);
+        }
+        block_map->insert({current_op.new_block, i});
+    }
+    merge_op_blocks->reserve(merge_op_blocks->size() + other_ops.size());
+    for (auto block : other_ops) {
+        merge_op_blocks->emplace_back(block);
+    }
+    merge_op_blocks_ = merge_op_blocks;
+    block_map_ = block_map;
+
+    if (header_.num_merge_ops > 0) {
+        merge_op_blocks_->erase(merge_op_blocks_->begin(),
+                                merge_op_blocks_->begin() + header_.num_merge_ops);
+    }
+    return true;
+}
+
 void CowReader::InitializeMerge() {
     uint64_t num_copy_ops = 0;
 
@@ -481,6 +642,47 @@
     return (*op_riter_);
 }
 
+class CowRevMergeOpIter final : public ICowOpIter {
+  public:
+    explicit CowRevMergeOpIter(std::shared_ptr<std::vector<CowOperation>> ops,
+                               std::shared_ptr<std::vector<uint32_t>> merge_op_blocks,
+                               std::shared_ptr<std::unordered_map<uint32_t, int>> map);
+
+    bool Done() override;
+    const CowOperation& Get() override;
+    void Next() override;
+
+  private:
+    std::shared_ptr<std::vector<CowOperation>> ops_;
+    std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_;
+    std::shared_ptr<std::unordered_map<uint32_t, int>> map_;
+    std::vector<uint32_t>::reverse_iterator block_riter_;
+};
+
+CowRevMergeOpIter::CowRevMergeOpIter(std::shared_ptr<std::vector<CowOperation>> ops,
+                                     std::shared_ptr<std::vector<uint32_t>> merge_op_blocks,
+                                     std::shared_ptr<std::unordered_map<uint32_t, int>> map) {
+    ops_ = ops;
+    merge_op_blocks_ = merge_op_blocks;
+    map_ = map;
+
+    block_riter_ = merge_op_blocks->rbegin();
+}
+
+bool CowRevMergeOpIter::Done() {
+    return block_riter_ == merge_op_blocks_->rend();
+}
+
+void CowRevMergeOpIter::Next() {
+    CHECK(!Done());
+    block_riter_++;
+}
+
+const CowOperation& CowRevMergeOpIter::Get() {
+    CHECK(!Done());
+    return ops_->data()[map_->at(*block_riter_)];
+}
+
 std::unique_ptr<ICowOpIter> CowReader::GetOpIter() {
     return std::make_unique<CowOpIter>(ops_);
 }
@@ -489,6 +691,10 @@
     return std::make_unique<CowOpReverseIter>(ops_);
 }
 
+std::unique_ptr<ICowOpIter> CowReader::GetRevMergeOpIter() {
+    return std::make_unique<CowRevMergeOpIter>(ops_, merge_op_blocks_, block_map_);
+}
+
 bool CowReader::GetRawBytes(uint64_t offset, void* buffer, size_t len, size_t* read) {
     // Validate the offset, taking care to acknowledge possible overflow of offset+len.
     if (offset < header_.header_size || offset >= fd_size_ - sizeof(CowFooter) || len >= fd_size_ ||
diff --git a/fs_mgr/libsnapshot/cow_writer.cpp b/fs_mgr/libsnapshot/cow_writer.cpp
index df2bc96..51505bf 100644
--- a/fs_mgr/libsnapshot/cow_writer.cpp
+++ b/fs_mgr/libsnapshot/cow_writer.cpp
@@ -102,7 +102,7 @@
     header_.footer_size = sizeof(CowFooter);
     header_.op_size = sizeof(CowOperation);
     header_.block_size = options_.block_size;
-    header_.num_merge_ops = 0;
+    header_.num_merge_ops = options_.num_merge_ops;
     header_.cluster_ops = options_.cluster_ops;
     header_.buffer_size = 0;
     footer_ = {};
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index ed7890e..cdfe2f1 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -19,6 +19,7 @@
 #include <functional>
 #include <memory>
 #include <optional>
+#include <unordered_map>
 
 #include <android-base/unique_fd.h>
 #include <libsnapshot/cow_format.h>
@@ -77,6 +78,9 @@
     // Return an reverse iterator for retrieving CowOperation entries.
     virtual std::unique_ptr<ICowOpIter> GetRevOpIter() = 0;
 
+    // Return an iterator for retrieving CowOperation entries in merge order
+    virtual std::unique_ptr<ICowOpIter> GetRevMergeOpIter() = 0;
+
     // Get decoded bytes from the data section, handling any decompression.
     // All retrieved data is passed to the sink.
     virtual bool ReadData(const CowOperation& op, IByteSink* sink) = 0;
@@ -120,6 +124,7 @@
     // value of these will never be null.
     std::unique_ptr<ICowOpIter> GetOpIter() override;
     std::unique_ptr<ICowOpIter> GetRevOpIter() override;
+    std::unique_ptr<ICowOpIter> GetRevMergeOpIter() override;
 
     bool ReadData(const CowOperation& op, IByteSink* sink) override;
 
@@ -138,6 +143,7 @@
 
   private:
     bool ParseOps(std::optional<uint64_t> label);
+    bool PrepMergeOps();
     uint64_t FindNumCopyops();
 
     android::base::unique_fd owned_fd_;
@@ -147,6 +153,8 @@
     uint64_t fd_size_;
     std::optional<uint64_t> last_label_;
     std::shared_ptr<std::vector<CowOperation>> ops_;
+    std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_;
+    std::shared_ptr<std::unordered_map<uint32_t, int>> block_map_;
     uint64_t total_data_ops_;
     uint64_t copy_ops_;
     bool has_seq_ops_;
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index 03ffb46..4a807fb 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -38,6 +38,9 @@
     uint32_t cluster_ops = 200;
 
     bool scratch_space = true;
+
+    // Preset the number of merged ops. Only useful for testing.
+    uint64_t num_merge_ops = 0;
 };
 
 // Interface for writing to a snapuserd COW. All operations are ordered; merges
