libsnapshot:snapuserd: Batch merge copy operation

Allow batch merge of copy operations during merge.
When metadata is read from COW device, assign
the chunk-id by validating there is no overlap
of copy operations. Furthermore, detect the blocks
which are contiguous and batch merge them.

No regression in merge time for full OTA (~35-40 seconds)

Merge time for incremental OTA of ~200M takes about 2 minutes
as compared to 15-20+ minutes without this change.

Add unit test to test ReadMetadata() functionality.

Multiple incremental OTA and full OTA test done on pixel.
adb reboot during merge and validate the merge resume operations.

Bug: 179629624
Test: incremental OTA and full OTA on pixel,
      cow_snapuserd_test
Signed-off-by: Akilesh Kailash <akailash@google.com>
Change-Id: I4cd84e4923e42afacc796b8cec01738b1bb1f420
diff --git a/fs_mgr/libsnapshot/Android.bp b/fs_mgr/libsnapshot/Android.bp
index d36a7f0..678adf8 100644
--- a/fs_mgr/libsnapshot/Android.bp
+++ b/fs_mgr/libsnapshot/Android.bp
@@ -549,6 +549,7 @@
     ],
     srcs: [
         "cow_snapuserd_test.cpp",
+        "snapuserd.cpp",
     ],
     cflags: [
         "-Wall",
diff --git a/fs_mgr/libsnapshot/cow_reader.cpp b/fs_mgr/libsnapshot/cow_reader.cpp
index c15a05b..75c05d1 100644
--- a/fs_mgr/libsnapshot/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/cow_reader.cpp
@@ -181,6 +181,7 @@
         ops_buffer->resize(current_op_num);
     }
 
+    LOG(DEBUG) << "COW file read complete. Total ops: " << ops_buffer->size();
     // To successfully parse a COW file, we need either:
     //  (1) a label to read up to, and for that label to be found, or
     //  (2) a valid footer.
@@ -298,10 +299,9 @@
     // are contiguous. These are monotonically increasing numbers.
     //
     // When both (1) and (2) are true, kernel will batch merge the operations.
-    // However, we do not want copy operations to be batch merged as
-    // a crash or system reboot during an overlapping copy can drive the device
-    // to a corrupted state. Hence, merging of copy operations should always be
-    // done as a individual 4k block. In the above case, since the
+    // In the above case, we have to ensure that the copy operations
+    // are merged first before replace operations are done. Hence,
+    // we will not change the order of copy operations. Since,
     // cow_op->new_block numbers are contiguous, we will ensure that the
     // cow block numbers assigned in ReadMetadata() for these respective copy
     // operations are not contiguous forcing kernel to issue merge for each
@@ -328,10 +328,8 @@
     //
     // Merge sequence will look like:
     //
-    // Merge-1 - Copy-op-1
-    // Merge-2 - Copy-op-2
-    // Merge-3 - Copy-op-3
-    // Merge-4 - Batch-merge {Replace-op-7, Replace-op-6, Zero-op-8,
+    // Merge-1 - Batch-merge { Copy-op-1, Copy-op-2, Copy-op-3 }
+    // Merge-2 - Batch-merge {Replace-op-7, Replace-op-6, Zero-op-8,
     //                        Replace-op-4, Zero-op-9, Replace-op-5 }
     //==============================================================
 
diff --git a/fs_mgr/libsnapshot/cow_snapuserd_test.cpp b/fs_mgr/libsnapshot/cow_snapuserd_test.cpp
index 7fa23db..045d9db 100644
--- a/fs_mgr/libsnapshot/cow_snapuserd_test.cpp
+++ b/fs_mgr/libsnapshot/cow_snapuserd_test.cpp
@@ -36,6 +36,8 @@
 #include <libsnapshot/snapuserd_client.h>
 #include <storage_literals/storage_literals.h>
 
+#include "snapuserd.h"
+
 namespace android {
 namespace snapshot {
 
@@ -119,7 +121,6 @@
     void CreateDmUserDevice();
     void StartSnapuserdDaemon();
     void CreateSnapshotDevice();
-    unique_fd CreateTempFile(const std::string& name, size_t size);
 
     unique_ptr<LoopDevice> base_loop_;
     unique_ptr<TempDevice> dmuser_dev_;
@@ -140,7 +141,24 @@
     int total_base_size_;
 };
 
-unique_fd CowSnapuserdTest::CreateTempFile(const std::string& name, size_t size) {
+class CowSnapuserdMetadataTest final {
+  public:
+    void Setup();
+    void SetupPartialArea();
+    void ValidateMetadata();
+    void ValidatePartialFilledArea();
+
+  private:
+    void InitMetadata();
+    void CreateCowDevice();
+    void CreateCowPartialFilledArea();
+
+    std::unique_ptr<Snapuserd> snapuserd_;
+    std::unique_ptr<TemporaryFile> cow_system_;
+    size_t size_ = 1_MiB;
+};
+
+static unique_fd CreateTempFile(const std::string& name, size_t size) {
     unique_fd fd(syscall(__NR_memfd_create, name.c_str(), MFD_ALLOW_SEALING));
     if (fd < 0) {
         return {};
@@ -430,25 +448,299 @@
 }
 
 void CowSnapuserdTest::MergeInterrupt() {
+    // Interrupt merge at various intervals
     StartMerge();
-    std::this_thread::sleep_for(4s);
+    std::this_thread::sleep_for(250ms);
     SimulateDaemonRestart();
 
     StartMerge();
-    std::this_thread::sleep_for(3s);
+    std::this_thread::sleep_for(250ms);
     SimulateDaemonRestart();
 
     StartMerge();
-    std::this_thread::sleep_for(3s);
+    std::this_thread::sleep_for(150ms);
     SimulateDaemonRestart();
 
     StartMerge();
-    std::this_thread::sleep_for(1s);
+    std::this_thread::sleep_for(100ms);
+    SimulateDaemonRestart();
+
+    StartMerge();
+    std::this_thread::sleep_for(800ms);
+    SimulateDaemonRestart();
+
+    StartMerge();
+    std::this_thread::sleep_for(600ms);
     SimulateDaemonRestart();
 
     ASSERT_TRUE(Merge());
 }
 
+void CowSnapuserdMetadataTest::CreateCowPartialFilledArea() {
+    std::string path = android::base::GetExecutableDirectory();
+    cow_system_ = std::make_unique<TemporaryFile>(path);
+
+    CowOptions options;
+    options.compression = "gz";
+    CowWriter writer(options);
+
+    ASSERT_TRUE(writer.Initialize(cow_system_->fd));
+
+    // Area 0 is completely filled with 256 exceptions
+    for (int i = 0; i < 256; i++) {
+        ASSERT_TRUE(writer.AddCopy(i, 256 + i));
+    }
+
+    // Area 1 is partially filled with 2 copy ops and 10 zero ops
+    ASSERT_TRUE(writer.AddCopy(500, 1000));
+    ASSERT_TRUE(writer.AddCopy(501, 1001));
+
+    ASSERT_TRUE(writer.AddZeroBlocks(300, 10));
+
+    // Flush operations
+    ASSERT_TRUE(writer.Finalize());
+}
+
+void CowSnapuserdMetadataTest::ValidatePartialFilledArea() {
+    int area_sz = snapuserd_->GetMetadataAreaSize();
+
+    ASSERT_EQ(area_sz, 2);
+
+    size_t new_chunk = 263;
+    // Verify the partially filled area
+    void* buffer = snapuserd_->GetExceptionBuffer(1);
+    loff_t offset = 0;
+    struct disk_exception* de;
+    for (int i = 0; i < 12; i++) {
+        de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+        ASSERT_EQ(de->old_chunk, i);
+        ASSERT_EQ(de->new_chunk, new_chunk);
+        offset += sizeof(struct disk_exception);
+        new_chunk += 1;
+    }
+
+    de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+    ASSERT_EQ(de->old_chunk, 0);
+    ASSERT_EQ(de->new_chunk, 0);
+}
+
+void CowSnapuserdMetadataTest::SetupPartialArea() {
+    CreateCowPartialFilledArea();
+    InitMetadata();
+}
+
+void CowSnapuserdMetadataTest::CreateCowDevice() {
+    unique_fd rnd_fd;
+    loff_t offset = 0;
+
+    std::string path = android::base::GetExecutableDirectory();
+    cow_system_ = std::make_unique<TemporaryFile>(path);
+
+    rnd_fd.reset(open("/dev/random", O_RDONLY));
+    ASSERT_TRUE(rnd_fd > 0);
+
+    std::unique_ptr<uint8_t[]> random_buffer_1_ = std::make_unique<uint8_t[]>(size_);
+
+    // Fill random data
+    for (size_t j = 0; j < (size_ / 1_MiB); j++) {
+        ASSERT_EQ(ReadFullyAtOffset(rnd_fd, (char*)random_buffer_1_.get() + offset, 1_MiB, 0),
+                  true);
+
+        offset += 1_MiB;
+    }
+
+    CowOptions options;
+    options.compression = "gz";
+    CowWriter writer(options);
+
+    ASSERT_TRUE(writer.Initialize(cow_system_->fd));
+
+    size_t num_blocks = size_ / options.block_size;
+
+    // Overlapping region. This has to be split
+    // into two batch operations
+    ASSERT_TRUE(writer.AddCopy(23, 20));
+    ASSERT_TRUE(writer.AddCopy(22, 19));
+    ASSERT_TRUE(writer.AddCopy(21, 18));
+    ASSERT_TRUE(writer.AddCopy(20, 17));
+    ASSERT_TRUE(writer.AddCopy(19, 16));
+    ASSERT_TRUE(writer.AddCopy(18, 15));
+
+    // Contiguous region but blocks in ascending order
+    // Daemon has to ensure that these blocks are merged
+    // in a batch
+    ASSERT_TRUE(writer.AddCopy(50, 75));
+    ASSERT_TRUE(writer.AddCopy(51, 76));
+    ASSERT_TRUE(writer.AddCopy(52, 77));
+    ASSERT_TRUE(writer.AddCopy(53, 78));
+
+    // Dis-contiguous region
+    ASSERT_TRUE(writer.AddCopy(110, 130));
+    ASSERT_TRUE(writer.AddCopy(105, 125));
+    ASSERT_TRUE(writer.AddCopy(100, 120));
+
+    // Overlap
+    ASSERT_TRUE(writer.AddCopy(25, 30));
+    ASSERT_TRUE(writer.AddCopy(30, 31));
+
+    size_t source_blk = num_blocks;
+
+    ASSERT_TRUE(writer.AddRawBlocks(source_blk, random_buffer_1_.get(), size_));
+
+    size_t blk_zero_copy_start = source_blk + num_blocks;
+
+    ASSERT_TRUE(writer.AddZeroBlocks(blk_zero_copy_start, num_blocks));
+
+    // Flush operations
+    ASSERT_TRUE(writer.Finalize());
+}
+
+void CowSnapuserdMetadataTest::InitMetadata() {
+    snapuserd_ = std::make_unique<Snapuserd>("", cow_system_->path, "");
+    ASSERT_TRUE(snapuserd_->InitCowDevice());
+}
+
+void CowSnapuserdMetadataTest::Setup() {
+    CreateCowDevice();
+    InitMetadata();
+}
+
+void CowSnapuserdMetadataTest::ValidateMetadata() {
+    int area_sz = snapuserd_->GetMetadataAreaSize();
+    ASSERT_EQ(area_sz, 3);
+
+    size_t old_chunk;
+    size_t new_chunk;
+
+    for (int i = 0; i < area_sz; i++) {
+        void* buffer = snapuserd_->GetExceptionBuffer(i);
+        loff_t offset = 0;
+        if (i == 0) {
+            old_chunk = 256;
+            new_chunk = 2;
+        } else if (i == 1) {
+            old_chunk = 512;
+            new_chunk = 259;
+        }
+        for (int j = 0; j < 256; j++) {
+            struct disk_exception* de =
+                    reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+
+            if (i != 2) {
+                ASSERT_EQ(de->old_chunk, old_chunk);
+                ASSERT_EQ(de->new_chunk, new_chunk);
+                old_chunk += 1;
+                new_chunk += 1;
+            } else {
+                break;
+            }
+            offset += sizeof(struct disk_exception);
+        }
+
+        if (i == 2) {
+            // The first 5 copy operation is not batch merged
+            // as the sequence is discontiguous
+            struct disk_exception* de =
+                    reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 30);
+            ASSERT_EQ(de->new_chunk, 518);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 25);
+            ASSERT_EQ(de->new_chunk, 520);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 100);
+            ASSERT_EQ(de->new_chunk, 522);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 105);
+            ASSERT_EQ(de->new_chunk, 524);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 110);
+            ASSERT_EQ(de->new_chunk, 526);
+            offset += sizeof(struct disk_exception);
+
+            // The next 4 operations are batch merged as
+            // both old and new chunk are contiguous
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 50);
+            ASSERT_EQ(de->new_chunk, 528);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 51);
+            ASSERT_EQ(de->new_chunk, 529);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 52);
+            ASSERT_EQ(de->new_chunk, 530);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 53);
+            ASSERT_EQ(de->new_chunk, 531);
+            offset += sizeof(struct disk_exception);
+
+            // This is handling overlap operation with
+            // two batch merge operations.
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 18);
+            ASSERT_EQ(de->new_chunk, 533);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 19);
+            ASSERT_EQ(de->new_chunk, 534);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 20);
+            ASSERT_EQ(de->new_chunk, 535);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 21);
+            ASSERT_EQ(de->new_chunk, 537);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 22);
+            ASSERT_EQ(de->new_chunk, 538);
+            offset += sizeof(struct disk_exception);
+
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 23);
+            ASSERT_EQ(de->new_chunk, 539);
+            offset += sizeof(struct disk_exception);
+
+            // End of metadata
+            de = reinterpret_cast<struct disk_exception*>((char*)buffer + offset);
+            ASSERT_EQ(de->old_chunk, 0);
+            ASSERT_EQ(de->new_chunk, 0);
+            offset += sizeof(struct disk_exception);
+        }
+    }
+}
+
+TEST(Snapuserd_Test, Snapshot_Metadata) {
+    CowSnapuserdMetadataTest harness;
+    harness.Setup();
+    harness.ValidateMetadata();
+}
+
+TEST(Snapuserd_Test, Snapshot_Metadata_Overlap) {
+    CowSnapuserdMetadataTest harness;
+    harness.SetupPartialArea();
+    harness.ValidatePartialFilledArea();
+}
+
 TEST(Snapuserd_Test, Snapshot_Merge_Resume) {
     CowSnapuserdTest harness;
     ASSERT_TRUE(harness.Setup());
@@ -457,7 +749,7 @@
     harness.Shutdown();
 }
 
-TEST(Snapuserd_Test, Snapshot) {
+TEST(Snapuserd_Test, Snapshot_IO_TEST) {
     CowSnapuserdTest harness;
     ASSERT_TRUE(harness.Setup());
     harness.ReadSnapshotDeviceAndValidate();
@@ -465,7 +757,6 @@
     harness.ValidateMerge();
     harness.Shutdown();
 }
-
 }  // namespace snapshot
 }  // namespace android
 
diff --git a/fs_mgr/libsnapshot/cow_writer.cpp b/fs_mgr/libsnapshot/cow_writer.cpp
index c1a5f32..81edc79 100644
--- a/fs_mgr/libsnapshot/cow_writer.cpp
+++ b/fs_mgr/libsnapshot/cow_writer.cpp
@@ -491,7 +491,7 @@
     return true;
 }
 
-bool CowWriter::CommitMerge(int merged_ops, bool sync) {
+bool CowWriter::CommitMerge(int merged_ops) {
     CHECK(merge_in_progress_);
     header_.num_merge_ops += merged_ops;
 
@@ -506,11 +506,7 @@
         return false;
     }
 
-    // Sync only for merging of copy operations.
-    if (sync) {
-        return Sync();
-    }
-    return true;
+    return Sync();
 }
 
 bool CowWriter::Truncate(off_t length) {
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
index 22ddfa6..6ffd5d8 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_writer.h
@@ -101,7 +101,7 @@
     bool InitializeAppend(android::base::borrowed_fd fd, uint64_t label);
 
     void InitializeMerge(android::base::borrowed_fd fd, CowHeader* header);
-    bool CommitMerge(int merged_ops, bool sync);
+    bool CommitMerge(int merged_ops);
 
     bool Finalize() override;
 
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/snapuserd_kernel.h b/fs_mgr/libsnapshot/include/libsnapshot/snapuserd_kernel.h
index 7941e68..2b6c8ef 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/snapuserd_kernel.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/snapuserd_kernel.h
@@ -47,8 +47,8 @@
 static constexpr uint32_t CHUNK_SIZE = 8;
 static constexpr uint32_t CHUNK_SHIFT = (__builtin_ffs(CHUNK_SIZE) - 1);
 
-static constexpr uint32_t BLOCK_SIZE = 4096;
-static constexpr uint32_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SIZE) - 1);
+static constexpr uint32_t BLOCK_SZ = 4096;
+static constexpr uint32_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1);
 
 #define DIV_ROUND_UP(n, d) (((n) + (d)-1) / (d))
 
diff --git a/fs_mgr/libsnapshot/snapshot.cpp b/fs_mgr/libsnapshot/snapshot.cpp
index 3485474..eb3a501 100644
--- a/fs_mgr/libsnapshot/snapshot.cpp
+++ b/fs_mgr/libsnapshot/snapshot.cpp
@@ -1439,6 +1439,7 @@
         // Wait for ueventd to acknowledge and create the control device node.
         std::string control_device = "/dev/dm-user/" + misc_name;
         if (!WaitForDevice(control_device, 10s)) {
+            LOG(ERROR) << "dm-user control device no found:  " << misc_name;
             continue;
         }
 
diff --git a/fs_mgr/libsnapshot/snapuserd.cpp b/fs_mgr/libsnapshot/snapuserd.cpp
index 057acad..0a02bd0 100644
--- a/fs_mgr/libsnapshot/snapuserd.cpp
+++ b/fs_mgr/libsnapshot/snapuserd.cpp
@@ -17,6 +17,8 @@
 #include "snapuserd.h"
 
 #include <csignal>
+#include <optional>
+#include <set>
 
 #include <libsnapshot/snapuserd_client.h>
 
@@ -32,7 +34,7 @@
 
 static constexpr size_t PAYLOAD_SIZE = (1UL << 20);
 
-static_assert(PAYLOAD_SIZE >= BLOCK_SIZE);
+static_assert(PAYLOAD_SIZE >= BLOCK_SZ);
 
 void BufferSink::Initialize(size_t size) {
     buffer_size_ = size;
@@ -78,10 +80,10 @@
 // request will always be 4k. After constructing
 // the header, zero out the remaining block.
 void Snapuserd::ConstructKernelCowHeader() {
-    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SIZE);
+    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
     CHECK(buffer != nullptr);
 
-    memset(buffer, 0, BLOCK_SIZE);
+    memset(buffer, 0, BLOCK_SZ);
 
     struct disk_header* dh = reinterpret_cast<struct disk_header*>(buffer);
 
@@ -106,13 +108,13 @@
 // Start the copy operation. This will read the backing
 // block device which is represented by cow_op->source.
 bool Snapuserd::ProcessCopyOp(const CowOperation* cow_op) {
-    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SIZE);
+    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
     CHECK(buffer != nullptr);
 
     // Issue a single 4K IO. However, this can be optimized
     // if the successive blocks are contiguous.
-    if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SIZE,
-                                          cow_op->source * BLOCK_SIZE)) {
+    if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ,
+                                          cow_op->source * BLOCK_SZ)) {
         SNAP_PLOG(ERROR) << "Copy-op failed. Read from backing store: " << backing_store_device_
                          << "at block :" << cow_op->source;
         return false;
@@ -123,10 +125,10 @@
 
 bool Snapuserd::ProcessZeroOp() {
     // Zero out the entire block
-    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SIZE);
+    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
     CHECK(buffer != nullptr);
 
-    memset(buffer, 0, BLOCK_SIZE);
+    memset(buffer, 0, BLOCK_SZ);
     return true;
 }
 
@@ -173,11 +175,11 @@
         struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
 
         memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
-                (BLOCK_SIZE - skip_sector_size));
+                (BLOCK_SZ - skip_sector_size));
     }
 
     bufsink_.ResetBufferOffset();
-    return std::min(size, (BLOCK_SIZE - skip_sector_size));
+    return std::min(size, (BLOCK_SZ - skip_sector_size));
 }
 
 /*
@@ -234,7 +236,7 @@
         return ReadUnalignedSector(sector, size, it);
     }
 
-    int num_ops = DIV_ROUND_UP(size, BLOCK_SIZE);
+    int num_ops = DIV_ROUND_UP(size, BLOCK_SZ);
     while (num_ops) {
         if (!ProcessCowOp(it->second)) {
             return -1;
@@ -242,7 +244,7 @@
         num_ops -= 1;
         it++;
         // Update the buffer offset
-        bufsink_.UpdateBufferOffset(BLOCK_SIZE);
+        bufsink_.UpdateBufferOffset(BLOCK_SZ);
 
         SNAP_LOG(DEBUG) << "ReadData at sector: " << sector << " size: " << size;
     }
@@ -344,7 +346,7 @@
 }
 
 int Snapuserd::GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
-                                    int unmerged_exceptions, bool* copy_op) {
+                                    int unmerged_exceptions) {
     int merged_ops_cur_iter = 0;
 
     // Find the operations which are merged in this cycle.
@@ -362,10 +364,8 @@
             offset += sizeof(struct disk_exception);
             const CowOperation* cow_op = chunk_map_[ChunkToSector(cow_de->new_chunk)];
             CHECK(cow_op != nullptr);
+
             CHECK(cow_op->new_block == cow_de->old_chunk);
-            if (cow_op->type == kCowCopyOp) {
-                *copy_op = true;
-            }
             // zero out to indicate that operation is merged.
             cow_de->old_chunk = 0;
             cow_de->new_chunk = 0;
@@ -389,10 +389,6 @@
             return -1;
         }
     }
-
-    if (*copy_op) {
-        CHECK(merged_ops_cur_iter == 1);
-    }
     return merged_ops_cur_iter;
 }
 
@@ -414,42 +410,15 @@
     int unmerged_exceptions = 0;
     loff_t offset = GetMergeStartOffset(buffer, vec_[divresult.quot].get(), &unmerged_exceptions);
 
-    bool copy_op = false;
-    // Check if the merged operation is a copy operation. If so, then we need
-    // to explicitly sync the metadata before initiating the next merge.
-    // For ex: Consider a following sequence of copy operations in the COW file:
-    //
-    // Op-1: Copy 2 -> 3
-    // Op-2: Copy 1 -> 2
-    // Op-3: Copy 5 -> 10
-    //
-    // Op-1 and Op-2 are overlapping copy operations. The merge sequence will
-    // look like:
-    //
-    // Merge op-1: Copy 2 -> 3
-    // Merge op-2: Copy 1 -> 2
-    // Merge op-3: Copy 5 -> 10
-    //
-    // Now, let's say we have a crash _after_ Merge op-2; Block 2 contents would
-    // have been over-written by Block-1 after merge op-2. During next reboot,
-    // kernel will request the metadata for all the un-merged blocks. If we had
-    // not sync the metadata after Merge-op 1 and Merge op-2, snapuser daemon
-    // will think that these merge operations are still pending and hence will
-    // inform the kernel that Op-1 and Op-2 are un-merged blocks. When kernel
-    // resumes back the merging process, it will attempt to redo the Merge op-1
-    // once again. However, block 2 contents are wrong as it has the contents
-    // of block 1 from previous merge cycle. Although, merge will silently succeed,
-    // this will lead to silent data corruption.
-    //
-    int merged_ops_cur_iter = GetNumberOfMergedOps(buffer, vec_[divresult.quot].get(), offset,
-                                                   unmerged_exceptions, &copy_op);
+    int merged_ops_cur_iter =
+            GetNumberOfMergedOps(buffer, vec_[divresult.quot].get(), offset, unmerged_exceptions);
 
     // There should be at least one operation merged in this cycle
     CHECK(merged_ops_cur_iter > 0);
 
     header.num_merge_ops += merged_ops_cur_iter;
     reader_->UpdateMergeProgress(merged_ops_cur_iter);
-    if (!writer_->CommitMerge(merged_ops_cur_iter, copy_op)) {
+    if (!writer_->CommitMerge(merged_ops_cur_iter)) {
         SNAP_LOG(ERROR) << "CommitMerge failed... merged_ops_cur_iter: " << merged_ops_cur_iter;
         return false;
     }
@@ -506,12 +475,13 @@
  *      during merge; specifically when the merge operation has dependency.
  *      These dependencies can only happen during copy operations.
  *
- *      To avoid this problem, we make sure that no two copy-operations
- *      do not have contiguous chunk IDs. Additionally, we make sure
- *      that each copy operation is merged individually.
+ *      To avoid this problem, we make sure overlap copy operations
+ *      are not batch merged.
  * 6: Use a monotonically increasing chunk number to assign the
  *    new_chunk
- * 7: Each chunk-id represents either a: Metadata page or b: Data page
+ * 7: Each chunk-id represents either
+ *        a: Metadata page or
+ *        b: Data page
  * 8: Chunk-id representing a data page is stored in a map.
  * 9: Chunk-id representing a metadata page is converted into a vector
  *    index. We store this in vector as kernel requests metadata during
@@ -531,8 +501,8 @@
     reader_ = std::make_unique<CowReader>();
     CowHeader header;
     CowOptions options;
-    bool prev_copy_op = false;
     bool metadata_found = false;
+    int replace_ops = 0, zero_ops = 0, copy_ops = 0;
 
     SNAP_LOG(DEBUG) << "ReadMetadata: Parsing cow file";
 
@@ -546,10 +516,10 @@
         return false;
     }
 
-    CHECK(header.block_size == BLOCK_SIZE);
+    CHECK(header.block_size == BLOCK_SZ);
 
-    SNAP_LOG(DEBUG) << "Merge-ops: " << header.num_merge_ops;
     reader_->InitializeMerge();
+    SNAP_LOG(DEBUG) << "Merge-ops: " << header.num_merge_ops;
 
     writer_ = std::make_unique<CowWriter>(options);
     writer_->InitializeMerge(cow_fd_.get(), &header);
@@ -584,17 +554,26 @@
         }
 
         metadata_found = true;
-        if ((cow_op->type == kCowCopyOp || prev_copy_op)) {
+        // This loop will handle all the replace and zero ops.
+        // We will handle the copy ops later as it requires special
+        // handling of assigning chunk-id's. Furthermore, we make
+        // sure that replace/zero and copy ops are not batch merged; hence,
+        // the bump in the chunk_id before break of this loop
+        if (cow_op->type == kCowCopyOp) {
             data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
+            break;
         }
 
-        prev_copy_op = (cow_op->type == kCowCopyOp);
+        if (cow_op->type == kCowReplaceOp) {
+            replace_ops++;
+        } else if (cow_op->type == kCowZeroOp) {
+            zero_ops++;
+        }
 
         // Construct the disk-exception
         de->old_chunk = cow_op->new_block;
         de->new_chunk = data_chunk_id;
 
-        SNAP_LOG(DEBUG) << "Old-chunk: " << de->old_chunk << "New-chunk: " << de->new_chunk;
 
         // Store operation pointer.
         chunk_map_[ChunkToSector(data_chunk_id)] = cow_op;
@@ -602,6 +581,9 @@
         offset += sizeof(struct disk_exception);
         cowop_riter_->Next();
 
+        SNAP_LOG(DEBUG) << num_ops << ":"
+                        << " Old-chunk: " << de->old_chunk << " New-chunk: " << de->new_chunk;
+
         if (num_ops == exceptions_per_area_) {
             // Store it in vector at the right index. This maps the chunk-id to
             // vector index.
@@ -616,13 +598,213 @@
 
             if (cowop_riter_->Done()) {
                 vec_.push_back(std::move(de_ptr));
-                SNAP_LOG(DEBUG) << "ReadMetadata() completed; Number of Areas: " << vec_.size();
             }
         }
 
         data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
     }
 
+    std::optional<chunk_t> prev_id = {};
+    std::map<uint64_t, const CowOperation*> map;
+    std::set<uint64_t> dest_blocks;
+    size_t pending_copy_ops = exceptions_per_area_ - num_ops;
+    SNAP_LOG(INFO) << " Processing copy-ops at Area: " << vec_.size()
+                   << " Number of replace/zero ops completed in this area: " << num_ops
+                   << " Pending copy ops for this area: " << pending_copy_ops;
+    while (!cowop_riter_->Done()) {
+        do {
+            const CowOperation* cow_op = &cowop_riter_->Get();
+            if (IsMetadataOp(*cow_op)) {
+                cowop_riter_->Next();
+                continue;
+            }
+
+            // We have two cases specific cases:
+            //
+            // =====================================================
+            // Case 1: Overlapping copy regions
+            //
+            // Ex:
+            //
+            // Source -> Destination
+            //
+            // 1: 15 -> 18
+            // 2: 16 -> 19
+            // 3: 17 -> 20
+            // 4: 18 -> 21
+            // 5: 19 -> 22
+            // 6: 20 -> 23
+            //
+            // We have 6 copy operations to be executed in OTA and there is a overlap. Update-engine
+            // will write to COW file as follows:
+            //
+            // Op-1: 20 -> 23
+            // Op-2: 19 -> 22
+            // Op-3: 18 -> 21
+            // Op-4: 17 -> 20
+            // Op-5: 16 -> 19
+            // Op-6: 15 -> 18
+            //
+            // Note that the blocks numbers are contiguous. Hence, all 6 copy
+            // operations can potentially be batch merged. However, that will be
+            // problematic if we have a crash as block 20, 19, 18 would have
+            // been overwritten and hence subsequent recovery may end up with
+            // a silent data corruption when op-1, op-2 and op-3 are
+            // re-executed.
+            //
+            // We will split these 6 operations into two batches viz:
+            //
+            // Batch-1:
+            // ===================
+            // Op-1: 20 -> 23
+            // Op-2: 19 -> 22
+            // Op-3: 18 -> 21
+            // ===================
+            //
+            // Batch-2:
+            // ==================
+            // Op-4: 17 -> 20
+            // Op-5: 16 -> 19
+            // Op-6: 15 -> 18
+            // ==================
+            //
+            // Now, merge sequence will look like:
+            //
+            // 1: Merge Batch-1 { op-1, op-2, op-3 }
+            // 2: Update Metadata in COW File that op-1, op-2, op-3 merge is
+            // done.
+            // 3: Merge Batch-2
+            // 4: Update Metadata in COW File that op-4, op-5, op-6 merge is
+            // done.
+            //
+            // Note, that the order of block operations are still the same.
+            // However, we have two batch merge operations. Any crash between
+            // either of this sequence should be safe as each of these
+            // batches are self-contained.
+            //
+            //===========================================================
+            //
+            // Case 2:
+            //
+            // Let's say we have three copy operations written to COW file
+            // in the following order:
+            //
+            // op-1: 15 -> 18
+            // op-2: 16 -> 19
+            // op-3: 17 -> 20
+            //
+            // As aforementioned, kernel will initiate merge in reverse order.
+            // Hence, we will read these ops in reverse order so that all these
+            // ops are exectued in the same order as requested. Thus, we will
+            // read the metadata in reverse order and for the kernel it will
+            // look like:
+            //
+            // op-3: 17 -> 20
+            // op-2: 16 -> 19
+            // op-1: 15 -> 18   <-- Merge starts here in the kernel
+            //
+            // Now, this is problematic as kernel cannot batch merge them.
+            //
+            // Merge sequence will look like:
+            //
+            // Merge-1: op-1: 15 -> 18
+            // Merge-2: op-2: 16 -> 19
+            // Merge-3: op-3: 17 -> 20
+            //
+            // We have three merge operations.
+            //
+            // Even though the blocks are contiguous, kernel can batch merge
+            // them if the blocks are in descending order. Update engine
+            // addresses this issue partially for overlapping operations as
+            // we see that op-1 to op-3 and op-4 to op-6 operatiosn are in
+            // descending order. However, if the copy operations are not
+            // overlapping, update engine cannot write these blocks
+            // in descending order. Hence, we will try to address it.
+            // Thus, we will send these blocks to the kernel and it will
+            // look like:
+            //
+            // op-3: 15 -> 18
+            // op-2: 16 -> 19
+            // op-1: 17 -> 20  <-- Merge starts here in the kernel
+            //
+            // Now with this change, we can batch merge all these three
+            // operations. Merge sequence will look like:
+            //
+            // Merge-1: {op-1: 17 -> 20, op-2: 16 -> 19, op-3: 15 -> 18}
+            //
+            // Note that we have changed the ordering of merge; However, this
+            // is ok as each of these copy operations are independent and there
+            // is no overlap.
+            //
+            //===================================================================
+            if (prev_id.has_value()) {
+                chunk_t diff = (cow_op->new_block > prev_id.value())
+                                       ? (cow_op->new_block - prev_id.value())
+                                       : (prev_id.value() - cow_op->new_block);
+                if (diff != 1) {
+                    break;
+                }
+                if (dest_blocks.count(cow_op->new_block) || map.count(cow_op->source) > 0) {
+                    break;
+                }
+            }
+            metadata_found = true;
+            pending_copy_ops -= 1;
+            map[cow_op->new_block] = cow_op;
+            dest_blocks.insert(cow_op->source);
+            prev_id = cow_op->new_block;
+            cowop_riter_->Next();
+        } while (!cowop_riter_->Done() && pending_copy_ops);
+
+        data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
+        SNAP_LOG(DEBUG) << "Batch Merge copy-ops of size: " << map.size()
+                        << " Area: " << vec_.size() << " Area offset: " << offset
+                        << " Pending-copy-ops in this area: " << pending_copy_ops;
+
+        for (auto it = map.begin(); it != map.end(); it++) {
+            struct disk_exception* de =
+                    reinterpret_cast<struct disk_exception*>((char*)de_ptr.get() + offset);
+            de->old_chunk = it->first;
+            de->new_chunk = data_chunk_id;
+
+            // Store operation pointer.
+            chunk_map_[ChunkToSector(data_chunk_id)] = it->second;
+            offset += sizeof(struct disk_exception);
+            num_ops += 1;
+            copy_ops++;
+
+            SNAP_LOG(DEBUG) << num_ops << ":"
+                            << " Copy-op: "
+                            << " Old-chunk: " << de->old_chunk << " New-chunk: " << de->new_chunk;
+
+            if (num_ops == exceptions_per_area_) {
+                // Store it in vector at the right index. This maps the chunk-id to
+                // vector index.
+                vec_.push_back(std::move(de_ptr));
+                num_ops = 0;
+                offset = 0;
+
+                // Create buffer for next area
+                de_ptr = std::make_unique<uint8_t[]>(exceptions_per_area_ *
+                                                     sizeof(struct disk_exception));
+                memset(de_ptr.get(), 0, (exceptions_per_area_ * sizeof(struct disk_exception)));
+
+                if (cowop_riter_->Done()) {
+                    vec_.push_back(std::move(de_ptr));
+                    SNAP_LOG(DEBUG) << "ReadMetadata() completed; Number of Areas: " << vec_.size();
+                }
+
+                CHECK(pending_copy_ops == 0);
+                pending_copy_ops = exceptions_per_area_;
+            }
+
+            data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
+        }
+        map.clear();
+        dest_blocks.clear();
+        prev_id.reset();
+    }
+
     // Partially filled area or there is no metadata
     // If there is no metadata, fill with zero so that kernel
     // is aware that merge is completed.
@@ -632,8 +814,11 @@
                         << "Areas : " << vec_.size();
     }
 
-    SNAP_LOG(DEBUG) << "ReadMetadata() completed. Final_chunk_id: " << data_chunk_id
-                    << "Num Sector: " << ChunkToSector(data_chunk_id);
+    SNAP_LOG(INFO) << "ReadMetadata completed. Final-chunk-id: " << data_chunk_id
+                   << " Num Sector: " << ChunkToSector(data_chunk_id)
+                   << " Replace-ops: " << replace_ops << " Zero-ops: " << zero_ops
+                   << " Copy-ops: " << copy_ops << " Areas: " << vec_.size()
+                   << " Num-ops-merged: " << header.num_merge_ops;
 
     // Total number of sectors required for creating dm-user device
     num_sectors_ = ChunkToSector(data_chunk_id);
@@ -742,7 +927,7 @@
 
     size_t remaining_size = header->len;
     size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);
-    CHECK(read_size == BLOCK_SIZE);
+    CHECK(read_size == BLOCK_SZ);
 
     CHECK(header->sector > 0);
     chunk_t chunk = SectorToChunk(header->sector);
@@ -793,11 +978,11 @@
         // will always be a single 4k.
         if (header->sector == 0) {
             CHECK(metadata_read_done_ == true);
-            CHECK(read_size == BLOCK_SIZE);
+            CHECK(read_size == BLOCK_SZ);
             ConstructKernelCowHeader();
             SNAP_LOG(DEBUG) << "Kernel header constructed";
         } else {
-            if (!offset && (read_size == BLOCK_SIZE) &&
+            if (!offset && (read_size == BLOCK_SZ) &&
                 chunk_map_.find(header->sector) == chunk_map_.end()) {
                 if (!ReadDiskExceptions(chunk, read_size)) {
                     SNAP_LOG(ERROR) << "ReadDiskExceptions failed for chunk id: " << chunk
diff --git a/fs_mgr/libsnapshot/snapuserd.h b/fs_mgr/libsnapshot/snapuserd.h
index 4587881..dba3186 100644
--- a/fs_mgr/libsnapshot/snapuserd.h
+++ b/fs_mgr/libsnapshot/snapuserd.h
@@ -75,6 +75,8 @@
         cow_fd_ = {};
         backing_store_fd_ = {};
     }
+    size_t GetMetadataAreaSize() { return vec_.size(); }
+    void* GetExceptionBuffer(size_t i) { return vec_[i].get(); }
 
   private:
     bool DmuserReadRequest();
@@ -101,11 +103,11 @@
     loff_t GetMergeStartOffset(void* merged_buffer, void* unmerged_buffer,
                                int* unmerged_exceptions);
     int GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
-                             int unmerged_exceptions, bool* copy_op);
+                             int unmerged_exceptions);
     bool ProcessMergeComplete(chunk_t chunk, void* buffer);
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
     chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
-    bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SIZE - 1)) == 0); }
+    bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
 
     std::string cow_device_;
     std::string backing_store_device_;