Merge changes I25fb5fce,I86cffff6,I031eb1a1,Ie40633c0,I08562b89

* changes:
  snapuserd: Sort REPLACE ops for batch merge
  snapuserd: API to query snapshot and merge status
  snapuserd: Wire up API's for Initiating and tracking Merge
  snapuserd: I/O requests which are not block aligned.
  snapuserd: Service I/O requests from dm-user
diff --git a/fs_mgr/libsnapshot/cow_reader.cpp b/fs_mgr/libsnapshot/cow_reader.cpp
index 5306b28..20030b9 100644
--- a/fs_mgr/libsnapshot/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/cow_reader.cpp
@@ -34,11 +34,12 @@
 namespace android {
 namespace snapshot {
 
-CowReader::CowReader()
+CowReader::CowReader(ReaderFlags reader_flag)
     : fd_(-1),
       header_(),
       fd_size_(0),
-      merge_op_blocks_(std::make_shared<std::vector<uint32_t>>()) {}
+      merge_op_blocks_(std::make_shared<std::vector<uint32_t>>()),
+      reader_flag_(reader_flag) {}
 
 static void SHA256(const void*, size_t, uint8_t[]) {
 #if 0
@@ -415,7 +416,7 @@
 //==============================================================
 bool CowReader::PrepMergeOps() {
     auto merge_op_blocks = std::make_shared<std::vector<uint32_t>>();
-    std::set<int, std::greater<int>> other_ops;
+    std::vector<int> other_ops;
     auto seq_ops_set = std::unordered_set<uint32_t>();
     auto block_map = std::make_shared<std::unordered_map<uint32_t, int>>();
     size_t num_seqs = 0;
@@ -446,7 +447,7 @@
         if (!has_seq_ops_ && IsOrderedOp(current_op)) {
             merge_op_blocks->emplace_back(current_op.new_block);
         } else if (seq_ops_set.count(current_op.new_block) == 0) {
-            other_ops.insert(current_op.new_block);
+            other_ops.push_back(current_op.new_block);
         }
         block_map->insert({current_op.new_block, i});
     }
@@ -462,6 +463,18 @@
     } else {
         num_ordered_ops_to_merge_ = 0;
     }
+
+    // Sort the vector in increasing order if merging in user-space as
+    // we can batch merge them when iterating from forward.
+    //
+    // dm-snapshot-merge requires decreasing order as we iterate the blocks
+    // in reverse order.
+    if (reader_flag_ == ReaderFlags::USERSPACE_MERGE) {
+        std::sort(other_ops.begin(), other_ops.end());
+    } else {
+        std::sort(other_ops.begin(), other_ops.end(), std::greater<int>());
+    }
+
     merge_op_blocks->reserve(merge_op_blocks->size() + other_ops.size());
     for (auto block : other_ops) {
         merge_op_blocks->emplace_back(block);
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
index c15682a..9f4ddbb 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_format.h
@@ -26,8 +26,8 @@
 
 static constexpr uint32_t kCowVersionManifest = 2;
 
-static constexpr uint32_t BLOCK_SZ = 4096;
-static constexpr uint32_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1);
+static constexpr size_t BLOCK_SZ = 4096;
+static constexpr size_t BLOCK_SHIFT = (__builtin_ffs(BLOCK_SZ) - 1);
 
 // This header appears as the first sequence of bytes in the COW. All fields
 // in the layout are little-endian encoded. The on-disk layout is:
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index 63a9e68..d5b4335 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -104,7 +104,12 @@
 
 class CowReader final : public ICowReader {
   public:
-    CowReader();
+    enum class ReaderFlags {
+        DEFAULT = 0,
+        USERSPACE_MERGE = 1,
+    };
+
+    CowReader(ReaderFlags reader_flag = ReaderFlags::DEFAULT);
     ~CowReader() { owned_fd_ = {}; }
 
     // Parse the COW, optionally, up to the given label. If no label is
@@ -166,6 +171,7 @@
     uint64_t num_ordered_ops_to_merge_;
     bool has_seq_ops_;
     std::shared_ptr<std::unordered_map<uint64_t, uint64_t>> data_loc_;
+    ReaderFlags reader_flag_;
 };
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index 837f33a..c9b0512 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -56,7 +56,7 @@
         "fs_mgr_defaults",
     ],
     srcs: [
-        "snapuserd_server.cpp",
+        "dm-snapshot-merge/snapuserd_server.cpp",
         "dm-snapshot-merge/snapuserd.cpp",
         "dm-snapshot-merge/snapuserd_worker.cpp",
         "dm-snapshot-merge/snapuserd_readahead.cpp",
@@ -67,6 +67,7 @@
 	"user-space-merge/snapuserd_merge.cpp",
 	"user-space-merge/snapuserd_readahead.cpp",
 	"user-space-merge/snapuserd_transitions.cpp",
+        "user-space-merge/snapuserd_server.cpp",
     ],
 
     cflags: [
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp
similarity index 99%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp
index 91b4190..9ddc963 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.cpp
@@ -31,6 +31,7 @@
 #include <android-base/scopeguard.h>
 #include <fs_mgr/file_wait.h>
 #include <snapuserd/snapuserd_client.h>
+
 #include "snapuserd_server.h"
 
 #define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h
similarity index 98%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h
index 14e5de6..3b6ff15 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_server.h
@@ -28,7 +28,7 @@
 #include <vector>
 
 #include <android-base/unique_fd.h>
-#include "dm-snapshot-merge/snapuserd.h"
+#include "snapuserd.h"
 
 namespace android {
 namespace snapshot {
diff --git a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h
index aeecf41..6ed55af 100644
--- a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h
+++ b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_client.h
@@ -79,6 +79,15 @@
 
     // Returns true if the snapuserd instance supports bridging a socket to second-stage init.
     bool SupportsSecondStageSocketHandoff();
+
+    // Returns true if the merge is started(or resumed from crash).
+    bool InitiateMerge(const std::string& misc_name);
+
+    // Returns Merge completion percentage
+    double GetMergePercent();
+
+    // Return the status of the snapshot
+    std::string QuerySnapshotStatus(const std::string& misc_name);
 };
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp
index 1ea05a3..e345269 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_client.cpp
@@ -231,5 +231,35 @@
     return true;
 }
 
+bool SnapuserdClient::InitiateMerge(const std::string& misc_name) {
+    std::string msg = "initiate_merge," + misc_name;
+    if (!Sendmsg(msg)) {
+        LOG(ERROR) << "Failed to send message " << msg << " to snapuserd";
+        return false;
+    }
+    std::string response = Receivemsg();
+    return response == "success";
+}
+
+double SnapuserdClient::GetMergePercent() {
+    std::string msg = "merge_percent";
+    if (!Sendmsg(msg)) {
+        LOG(ERROR) << "Failed to send message " << msg << " to snapuserd";
+        return false;
+    }
+    std::string response = Receivemsg();
+
+    return std::stod(response);
+}
+
+std::string SnapuserdClient::QuerySnapshotStatus(const std::string& misc_name) {
+    std::string msg = "getstatus," + misc_name;
+    if (!Sendmsg(msg)) {
+        LOG(ERROR) << "Failed to send message " << msg << " to snapuserd";
+        return "snapshot-merge-failed";
+    }
+    return Receivemsg();
+}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
index e05822e..912884f 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
@@ -21,8 +21,6 @@
 #include <gflags/gflags.h>
 #include <snapuserd/snapuserd_client.h>
 
-#include "snapuserd_server.h"
-
 DEFINE_string(socket, android::snapshot::kSnapuserdSocket, "Named socket or socket path.");
 DEFINE_bool(no_socket, false,
             "If true, no socket is used. Each additional argument is an INIT message.");
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h
index b660ba2..fbf57d9 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.h
@@ -19,7 +19,7 @@
 #include <string>
 #include <vector>
 
-#include "snapuserd_server.h"
+#include "dm-snapshot-merge/snapuserd_server.h"
 
 namespace android {
 namespace snapshot {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index a2538d2..57e47e7 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -59,6 +59,15 @@
     return reader_->CloneCowReader();
 }
 
+void SnapshotHandler::UpdateMergeCompletionPercentage() {
+    struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
+    merge_completion_percentage_ = (ch->num_merge_ops * 100.0) / reader_->get_num_total_data_ops();
+
+    SNAP_LOG(DEBUG) << "Merge-complete %: " << merge_completion_percentage_
+                    << " num_merge_ops: " << ch->num_merge_ops
+                    << " total-ops: " << reader_->get_num_total_data_ops();
+}
+
 bool SnapshotHandler::CommitMerge(int num_merge_ops) {
     struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
     ch->num_merge_ops += num_merge_ops;
@@ -95,6 +104,12 @@
         }
     }
 
+    // Update the merge completion - this is used by update engine
+    // to track the completion. No need to take a lock. It is ok
+    // even if there is a miss on reading a latest updated value.
+    // Subsequent polling will eventually converge to completion.
+    UpdateMergeCompletionPercentage();
+
     return true;
 }
 
@@ -124,7 +139,7 @@
 }
 
 bool SnapshotHandler::ReadMetadata() {
-    reader_ = std::make_unique<CowReader>();
+    reader_ = std::make_unique<CowReader>(CowReader::ReaderFlags::USERSPACE_MERGE);
     CowHeader header;
     CowOptions options;
 
@@ -152,16 +167,48 @@
         return false;
     }
 
+    UpdateMergeCompletionPercentage();
+
     // Initialize the iterator for reading metadata
     std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
 
+    int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
+    int ra_index = 0;
+
+    size_t copy_ops = 0, replace_ops = 0, zero_ops = 0, xor_ops = 0;
+
     while (!cowop_iter->Done()) {
         const CowOperation* cow_op = &cowop_iter->Get();
 
+        if (cow_op->type == kCowCopyOp) {
+            copy_ops += 1;
+        } else if (cow_op->type == kCowReplaceOp) {
+            replace_ops += 1;
+        } else if (cow_op->type == kCowZeroOp) {
+            zero_ops += 1;
+        } else if (cow_op->type == kCowXorOp) {
+            xor_ops += 1;
+        }
+
         chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op));
 
-        if (!ra_thread_ && IsOrderedOp(*cow_op)) {
+        if (IsOrderedOp(*cow_op)) {
             ra_thread_ = true;
+            block_to_ra_index_[cow_op->new_block] = ra_index;
+            num_ra_ops_per_iter -= 1;
+
+            if ((ra_index + 1) - merge_blk_state_.size() == 1) {
+                std::unique_ptr<MergeGroupState> blk_state = std::make_unique<MergeGroupState>(
+                        MERGE_GROUP_STATE::GROUP_MERGE_PENDING, 0);
+
+                merge_blk_state_.push_back(std::move(blk_state));
+            }
+
+            // Move to next RA block
+            if (num_ra_ops_per_iter == 0) {
+                num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
+                ra_index += 1;
+            }
         }
         cowop_iter->Next();
     }
@@ -173,6 +220,12 @@
 
     PrepareReadAhead();
 
+    SNAP_LOG(INFO) << "Merged-ops: " << header.num_merge_ops
+                   << " Total-data-ops: " << reader_->get_num_total_data_ops()
+                   << " Unmerged-ops: " << chunk_vec_.size() << " Copy-ops: " << copy_ops
+                   << " Zero-ops: " << zero_ops << " Replace-ops: " << replace_ops
+                   << " Xor-ops: " << xor_ops;
+
     return true;
 }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index c171eda..13b56fa 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -67,6 +67,27 @@
 
 class SnapshotHandler;
 
+enum class MERGE_GROUP_STATE {
+    GROUP_MERGE_PENDING,
+    GROUP_MERGE_RA_READY,
+    GROUP_MERGE_IN_PROGRESS,
+    GROUP_MERGE_COMPLETED,
+    GROUP_MERGE_FAILED,
+    GROUP_INVALID,
+};
+
+struct MergeGroupState {
+    MERGE_GROUP_STATE merge_state_;
+    // Ref count I/O when group state
+    // is in "GROUP_MERGE_PENDING"
+    size_t num_ios_in_progress;
+    std::mutex m_lock;
+    std::condition_variable m_cv;
+
+    MergeGroupState(MERGE_GROUP_STATE state, size_t n_ios)
+        : merge_state_(state), num_ios_in_progress(n_ios) {}
+};
+
 class ReadAhead {
   public:
     ReadAhead(const std::string& cow_device, const std::string& backing_device,
@@ -133,16 +154,33 @@
         base_path_merge_fd_ = {};
     }
 
+    // Functions interacting with dm-user
+    bool ReadDmUserHeader();
+    bool WriteDmUserPayload(size_t size, bool header_response);
+    bool DmuserReadRequest();
+
     // IO Path
     bool ProcessIORequest();
+    bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
+
+    bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
+    bool ReadFromSourceDevice(const CowOperation* cow_op);
+
+    bool ReadAlignedSector(sector_t sector, size_t sz, bool header_response);
+    bool ReadUnalignedSector(sector_t sector, size_t size);
+    int ReadUnalignedSector(sector_t sector, size_t size,
+                            std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
+    bool RespondIOError(bool header_response);
 
     // Processing COW operations
+    bool ProcessCowOp(const CowOperation* cow_op);
     bool ProcessReplaceOp(const CowOperation* cow_op);
     bool ProcessZeroOp();
 
     // Handles Copy and Xor
     bool ProcessCopyOp(const CowOperation* cow_op);
     bool ProcessXorOp(const CowOperation* cow_op);
+    bool ProcessOrderedOp(const CowOperation* cow_op);
 
     // Merge related ops
     bool Merge();
@@ -152,6 +190,9 @@
                      const std::unique_ptr<ICowOpIter>& cowop_iter,
                      std::vector<const CowOperation*>* replace_zero_vec = nullptr);
 
+    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
+    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
+
     std::unique_ptr<CowReader> reader_;
     BufferSink bufsink_;
     XorSink xorsink_;
@@ -210,6 +251,7 @@
     // Read-ahead related functions
     void* GetMappedAddr() { return mapped_addr_; }
     void PrepareReadAhead();
+    std::unordered_map<uint64_t, void*>& GetReadAheadMap() { return read_ahead_buffer_map_; }
 
     // State transitions for merge
     void InitiateMerge();
@@ -226,6 +268,8 @@
 
     bool ShouldReconstructDataFromCow() { return populate_data_from_cow_; }
     void FinishReconstructDataFromCow() { populate_data_from_cow_ = false; }
+    // Return the snapshot status
+    std::string GetMergeStatus();
 
     // RA related functions
     uint64_t GetBufferMetadataOffset();
@@ -238,12 +282,23 @@
     int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; }
     void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
     bool MergeInitiated() { return merge_initiated_; }
+    double GetMergePercentage() { return merge_completion_percentage_; }
+
+    // Merge Block State Transitions
+    void SetMergeCompleted(size_t block_index);
+    void SetMergeInProgress(size_t block_index);
+    void SetMergeFailed(size_t block_index);
+    void NotifyIOCompletion(uint64_t new_block);
+    bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
+    MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);
 
   private:
     bool ReadMetadata();
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
     chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
+    bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
     struct BufferState* GetBufferState();
+    void UpdateMergeCompletionPercentage();
 
     void ReadBlocks(const std::string partition_name, const std::string& dm_block_device);
     void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name,
@@ -261,7 +316,6 @@
 
     unique_fd cow_fd_;
 
-    // Number of sectors required when initializing dm-user
     uint64_t num_sectors_;
 
     std::unique_ptr<CowReader> reader_;
@@ -283,8 +337,16 @@
     int total_ra_blocks_merged_ = 0;
     MERGE_IO_TRANSITION io_state_;
     std::unique_ptr<ReadAhead> read_ahead_thread_;
+    std::unordered_map<uint64_t, void*> read_ahead_buffer_map_;
+
+    // user-space-merging
+    std::unordered_map<uint64_t, int> block_to_ra_index_;
+
+    // Merge Block state
+    std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;
 
     std::unique_ptr<Worker> merge_thread_;
+    double merge_completion_percentage_;
 
     bool merge_initiated_ = false;
     bool attached_ = false;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
index 18c7f2c..bfbacf9 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
@@ -84,6 +84,56 @@
     return true;
 }
 
+bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
+    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+    if (buffer == nullptr) {
+        SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
+        return false;
+    }
+    SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
+                    << " Source: " << cow_op->source;
+    uint64_t offset = cow_op->source;
+    if (cow_op->type == kCowCopyOp) {
+        offset *= BLOCK_SZ;
+    }
+    if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) {
+        std::string op;
+        if (cow_op->type == kCowCopyOp)
+            op = "Copy-op";
+        else {
+            op = "Xor-op";
+        }
+        SNAP_PLOG(ERROR) << op << " failed. Read from backing store: " << backing_store_device_
+                         << "at block :" << offset / BLOCK_SZ << " offset:" << offset % BLOCK_SZ;
+        return false;
+    }
+
+    return true;
+}
+
+// Start the copy operation. This will read the backing
+// block device which is represented by cow_op->source.
+bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
+    if (!ReadFromSourceDevice(cow_op)) {
+        return false;
+    }
+
+    return true;
+}
+
+bool Worker::ProcessXorOp(const CowOperation* cow_op) {
+    if (!ReadFromSourceDevice(cow_op)) {
+        return false;
+    }
+    xorsink_.Reset();
+    if (!reader_->ReadData(*cow_op, &xorsink_)) {
+        SNAP_LOG(ERROR) << "ProcessXorOp failed for block " << cow_op->new_block;
+        return false;
+    }
+
+    return true;
+}
+
 bool Worker::ProcessZeroOp() {
     // Zero out the entire block
     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
@@ -96,12 +146,85 @@
     return true;
 }
 
-bool Worker::ProcessCopyOp(const CowOperation*) {
-    return true;
+bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
+    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+    if (buffer == nullptr) {
+        SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
+        return false;
+    }
+
+    MERGE_GROUP_STATE state = snapuserd_->ProcessMergingBlock(cow_op->new_block, buffer);
+
+    switch (state) {
+        case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: {
+            // Merge is completed for this COW op; just read directly from
+            // the base device
+            SNAP_LOG(DEBUG) << "Merge-completed: Reading from base device sector: "
+                            << (cow_op->new_block >> SECTOR_SHIFT)
+                            << " Block-number: " << cow_op->new_block;
+            if (!ReadDataFromBaseDevice(ChunkToSector(cow_op->new_block), BLOCK_SZ)) {
+                SNAP_LOG(ERROR) << "ReadDataFromBaseDevice at sector: "
+                                << (cow_op->new_block >> SECTOR_SHIFT) << " after merge-complete.";
+                return false;
+            }
+            return true;
+        }
+        case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
+            bool ret;
+            if (cow_op->type == kCowCopyOp) {
+                ret = ProcessCopyOp(cow_op);
+            } else {
+                ret = ProcessXorOp(cow_op);
+            }
+
+            // I/O is complete - decrement the refcount irrespective of the return
+            // status
+            snapuserd_->NotifyIOCompletion(cow_op->new_block);
+            return ret;
+        }
+        // We already have the data in the buffer retrieved from RA thread.
+        // Nothing to process further.
+        case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: {
+            [[fallthrough]];
+        }
+        case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: {
+            return true;
+        }
+        default: {
+            // All other states, fail the I/O viz (GROUP_MERGE_FAILED and GROUP_INVALID)
+            return false;
+        }
+    }
+
+    return false;
 }
 
-bool Worker::ProcessXorOp(const CowOperation*) {
-    return true;
+bool Worker::ProcessCowOp(const CowOperation* cow_op) {
+    if (cow_op == nullptr) {
+        SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
+        return false;
+    }
+
+    switch (cow_op->type) {
+        case kCowReplaceOp: {
+            return ProcessReplaceOp(cow_op);
+        }
+
+        case kCowZeroOp: {
+            return ProcessZeroOp();
+        }
+
+        case kCowCopyOp:
+            [[fallthrough]];
+        case kCowXorOp: {
+            return ProcessOrderedOp(cow_op);
+        }
+
+        default: {
+            SNAP_LOG(ERROR) << "Unknown operation-type found: " << cow_op->type;
+        }
+    }
+    return false;
 }
 
 void Worker::InitializeBufsink() {
@@ -129,7 +252,7 @@
 }
 
 bool Worker::RunThread() {
-    SNAP_LOG(DEBUG) << "Processing snapshot I/O requests...";
+    SNAP_LOG(INFO) << "Processing snapshot I/O requests....";
     // Start serving IO
     while (true) {
         if (!ProcessIORequest()) {
@@ -143,8 +266,378 @@
     return true;
 }
 
+// Read Header from dm-user misc device. This gives
+// us the sector number for which IO is issued by dm-snapshot device
+bool Worker::ReadDmUserHeader() {
+    if (!android::base::ReadFully(ctrl_fd_, bufsink_.GetBufPtr(), sizeof(struct dm_user_header))) {
+        if (errno != ENOTBLK) {
+            SNAP_PLOG(ERROR) << "Control-read failed";
+        }
+
+        SNAP_PLOG(DEBUG) << "ReadDmUserHeader failed....";
+        return false;
+    }
+
+    return true;
+}
+
+// Send the payload/data back to dm-user misc device.
+bool Worker::WriteDmUserPayload(size_t size, bool header_response) {
+    size_t payload_size = size;
+    void* buf = bufsink_.GetPayloadBufPtr();
+    if (header_response) {
+        payload_size += sizeof(struct dm_user_header);
+        buf = bufsink_.GetBufPtr();
+    }
+
+    if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
+        SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
+        return false;
+    }
+
+    return true;
+}
+
+bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
+    CHECK(read_size <= BLOCK_SZ);
+
+    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+    if (buffer == nullptr) {
+        SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
+        return false;
+    }
+
+    loff_t offset = sector << SECTOR_SHIFT;
+    if (!android::base::ReadFullyAtOffset(base_path_merge_fd_, buffer, read_size, offset)) {
+        SNAP_PLOG(ERROR) << "ReadDataFromBaseDevice failed. fd: " << base_path_merge_fd_
+                         << "at sector :" << sector << " size: " << read_size;
+        return false;
+    }
+
+    return true;
+}
+
+bool Worker::ReadAlignedSector(sector_t sector, size_t sz, bool header_response) {
+    struct dm_user_header* header = bufsink_.GetHeaderPtr();
+    size_t remaining_size = sz;
+    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
+    bool io_error = false;
+    int ret = 0;
+
+    do {
+        // Process 1MB payload at a time
+        size_t read_size = std::min(PAYLOAD_SIZE, remaining_size);
+
+        header->type = DM_USER_RESP_SUCCESS;
+        size_t total_bytes_read = 0;
+        io_error = false;
+        bufsink_.ResetBufferOffset();
+
+        while (read_size) {
+            // We need to check every 4k block to verify if it is
+            // present in the mapping.
+            size_t size = std::min(BLOCK_SZ, read_size);
+
+            auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(),
+                                       std::make_pair(sector, nullptr), SnapshotHandler::compare);
+            bool not_found = (it == chunk_vec.end() || it->first != sector);
+
+            if (not_found) {
+                // Block not found in map - which means this block was not
+                // changed as per the OTA. Just route the I/O to the base
+                // device.
+                if (!ReadDataFromBaseDevice(sector, size)) {
+                    SNAP_LOG(ERROR) << "ReadDataFromBaseDevice failed";
+                    header->type = DM_USER_RESP_ERROR;
+                }
+
+                ret = size;
+            } else {
+                // We found the sector in mapping. Check the type of COW OP and
+                // process it.
+                if (!ProcessCowOp(it->second)) {
+                    SNAP_LOG(ERROR) << "ProcessCowOp failed";
+                    header->type = DM_USER_RESP_ERROR;
+                }
+
+                ret = BLOCK_SZ;
+            }
+
+            // Just return the header if it is an error
+            if (header->type == DM_USER_RESP_ERROR) {
+                if (!RespondIOError(header_response)) {
+                    return false;
+                }
+
+                io_error = true;
+                break;
+            }
+
+            read_size -= ret;
+            total_bytes_read += ret;
+            sector += (ret >> SECTOR_SHIFT);
+            bufsink_.UpdateBufferOffset(ret);
+        }
+
+        if (!io_error) {
+            if (!WriteDmUserPayload(total_bytes_read, header_response)) {
+                return false;
+            }
+
+            SNAP_LOG(DEBUG) << "WriteDmUserPayload success total_bytes_read: " << total_bytes_read
+                            << " header-response: " << header_response
+                            << " remaining_size: " << remaining_size;
+            header_response = false;
+            remaining_size -= total_bytes_read;
+        }
+    } while (remaining_size > 0 && !io_error);
+
+    return true;
+}
+
+int Worker::ReadUnalignedSector(
+        sector_t sector, size_t size,
+        std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
+    size_t skip_sector_size = 0;
+
+    SNAP_LOG(DEBUG) << "ReadUnalignedSector: sector " << sector << " size: " << size
+                    << " Aligned sector: " << it->first;
+
+    if (!ProcessCowOp(it->second)) {
+        SNAP_LOG(ERROR) << "ReadUnalignedSector: " << sector << " failed of size: " << size
+                        << " Aligned sector: " << it->first;
+        return -1;
+    }
+
+    int num_sectors_skip = sector - it->first;
+
+    if (num_sectors_skip > 0) {
+        skip_sector_size = num_sectors_skip << SECTOR_SHIFT;
+        char* buffer = reinterpret_cast<char*>(bufsink_.GetBufPtr());
+        struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
+
+        if (skip_sector_size == BLOCK_SZ) {
+            SNAP_LOG(ERROR) << "Invalid un-aligned IO request at sector: " << sector
+                            << " Base-sector: " << it->first;
+            return -1;
+        }
+
+        memmove(msg->payload.buf, (char*)msg->payload.buf + skip_sector_size,
+                (BLOCK_SZ - skip_sector_size));
+    }
+
+    bufsink_.ResetBufferOffset();
+    return std::min(size, (BLOCK_SZ - skip_sector_size));
+}
+
+bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
+    struct dm_user_header* header = bufsink_.GetHeaderPtr();
+    header->type = DM_USER_RESP_SUCCESS;
+    bufsink_.ResetBufferOffset();
+    std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
+
+    auto it = std::lower_bound(chunk_vec.begin(), chunk_vec.end(), std::make_pair(sector, nullptr),
+                               SnapshotHandler::compare);
+
+    // |-------|-------|-------|
+    // 0       1       2       3
+    //
+    // Block 0 - op 1
+    // Block 1 - op 2
+    // Block 2 - op 3
+    //
+    // chunk_vec will have block 0, 1, 2 which maps to relavant COW ops.
+    //
+    // Each block is 4k bytes. Thus, the last block will span 8 sectors
+    // ranging till block 3 (However, block 3 won't be in chunk_vec as
+    // it doesn't have any mapping to COW ops. Now, if we get an I/O request for a sector
+    // spanning between block 2 and block 3, we need to step back
+    // and get hold of the last element.
+    //
+    // Additionally, we need to make sure that the requested sector is
+    // indeed within the range of the final sector. It is perfectly valid
+    // to get an I/O request for block 3 and beyond which are not mapped
+    // to any COW ops. In that case, we just need to read from the base
+    // device.
+    bool merge_complete = false;
+    bool header_response = true;
+    if (it == chunk_vec.end()) {
+        if (chunk_vec.size() > 0) {
+            // I/O request beyond the last mapped sector
+            it = std::prev(chunk_vec.end());
+        } else {
+            // This can happen when a partition merge is complete but snapshot
+            // state in /metadata is not yet deleted; during this window if the
+            // device is rebooted, subsequent attempt will mount the snapshot.
+            // However, since the merge was completed we wouldn't have any
+            // mapping to COW ops thus chunk_vec will be empty. In that case,
+            // mark this as merge_complete and route the I/O to the base device.
+            merge_complete = true;
+        }
+    } else if (it->first != sector) {
+        if (it != chunk_vec.begin()) {
+            --it;
+        }
+    } else {
+        return ReadAlignedSector(sector, size, header_response);
+    }
+
+    loff_t requested_offset = sector << SECTOR_SHIFT;
+
+    loff_t final_offset = 0;
+    if (!merge_complete) {
+        final_offset = it->first << SECTOR_SHIFT;
+    }
+
+    // Since a COW op span 4k block size, we need to make sure that the requested
+    // offset is within the 4k region. Consider the following case:
+    //
+    // |-------|-------|-------|
+    // 0       1       2       3
+    //
+    // Block 0 - op 1
+    // Block 1 - op 2
+    //
+    // We have an I/O request for a sector between block 2 and block 3. However,
+    // we have mapping to COW ops only for block 0 and block 1. Thus, the
+    // requested offset in this case is beyond the last mapped COW op size (which
+    // is block 1 in this case).
+
+    size_t total_bytes_read = 0;
+    size_t remaining_size = size;
+    int ret = 0;
+    if (!merge_complete && (requested_offset >= final_offset) &&
+        (requested_offset - final_offset) < BLOCK_SZ) {
+        // Read the partial un-aligned data
+        ret = ReadUnalignedSector(sector, remaining_size, it);
+        if (ret < 0) {
+            SNAP_LOG(ERROR) << "ReadUnalignedSector failed for sector: " << sector
+                            << " size: " << size << " it->sector: " << it->first;
+            return RespondIOError(header_response);
+        }
+
+        remaining_size -= ret;
+        total_bytes_read += ret;
+        sector += (ret >> SECTOR_SHIFT);
+
+        // Send the data back
+        if (!WriteDmUserPayload(total_bytes_read, header_response)) {
+            return false;
+        }
+
+        header_response = false;
+        // If we still have pending data to be processed, this will be aligned I/O
+        if (remaining_size) {
+            return ReadAlignedSector(sector, remaining_size, header_response);
+        }
+    } else {
+        // This is all about handling I/O request to be routed to base device
+        // as the I/O is not mapped to any of the COW ops.
+        loff_t aligned_offset = requested_offset;
+        // Align to nearest 4k
+        aligned_offset += BLOCK_SZ - 1;
+        aligned_offset &= ~(BLOCK_SZ - 1);
+        // Find the diff of the aligned offset
+        size_t diff_size = aligned_offset - requested_offset;
+        CHECK(diff_size <= BLOCK_SZ);
+        if (remaining_size < diff_size) {
+            if (!ReadDataFromBaseDevice(sector, remaining_size)) {
+                return RespondIOError(header_response);
+            }
+            total_bytes_read += remaining_size;
+
+            if (!WriteDmUserPayload(total_bytes_read, header_response)) {
+                return false;
+            }
+        } else {
+            if (!ReadDataFromBaseDevice(sector, diff_size)) {
+                return RespondIOError(header_response);
+            }
+
+            total_bytes_read += diff_size;
+
+            if (!WriteDmUserPayload(total_bytes_read, header_response)) {
+                return false;
+            }
+
+            remaining_size -= diff_size;
+            size_t num_sectors_read = (diff_size >> SECTOR_SHIFT);
+            sector += num_sectors_read;
+            CHECK(IsBlockAligned(sector << SECTOR_SHIFT));
+            header_response = false;
+
+            // If we still have pending data to be processed, this will be aligned I/O
+            return ReadAlignedSector(sector, remaining_size, header_response);
+        }
+    }
+
+    return true;
+}
+
+bool Worker::RespondIOError(bool header_response) {
+    struct dm_user_header* header = bufsink_.GetHeaderPtr();
+    header->type = DM_USER_RESP_ERROR;
+    // This is an issue with the dm-user interface. There
+    // is no way to propagate the I/O error back to dm-user
+    // if we have already communicated the header back. Header
+    // is responded once at the beginning; however I/O can
+    // be processed in chunks. If we encounter an I/O error
+    // somewhere in the middle of the processing, we can't communicate
+    // this back to dm-user.
+    //
+    // TODO: Fix the interface
+    CHECK(header_response);
+
+    if (!WriteDmUserPayload(0, header_response)) {
+        return false;
+    }
+
+    // There is no need to process further as we have already seen
+    // an I/O error
+    return true;
+}
+
+bool Worker::DmuserReadRequest() {
+    struct dm_user_header* header = bufsink_.GetHeaderPtr();
+
+    // Unaligned I/O request
+    if (!IsBlockAligned(header->sector << SECTOR_SHIFT)) {
+        return ReadUnalignedSector(header->sector, header->len);
+    }
+
+    return ReadAlignedSector(header->sector, header->len, true);
+}
+
 bool Worker::ProcessIORequest() {
-    // No communication with dm-user yet
+    struct dm_user_header* header = bufsink_.GetHeaderPtr();
+
+    if (!ReadDmUserHeader()) {
+        return false;
+    }
+
+    SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
+    SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
+    SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
+    SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
+    SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;
+
+    switch (header->type) {
+        case DM_USER_REQ_MAP_READ: {
+            if (!DmuserReadRequest()) {
+                return false;
+            }
+            break;
+        }
+
+        case DM_USER_REQ_MAP_WRITE: {
+            // TODO: We should not get any write request
+            // to dm-user as we mount all partitions
+            // as read-only. Need to verify how are TRIM commands
+            // handled during mount.
+            return false;
+        }
+    }
+
     return true;
 }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
index 696ede7..47fc7db 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -52,7 +52,6 @@
                     break;
                 }
 
-                // Check for consecutive blocks
                 uint64_t next_offset = op->new_block * BLOCK_SZ;
                 if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
                     break;
@@ -177,6 +176,7 @@
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
+    size_t block_index = 0;
 
     SNAP_LOG(INFO) << "MergeOrderedOps started....";
 
@@ -190,9 +190,12 @@
         // Wait for RA thread to notify that the merge window
         // is ready for merging.
         if (!snapuserd_->WaitForMergeBegin()) {
+            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
+        snapuserd_->SetMergeInProgress(block_index);
+
         loff_t offset = 0;
         int num_ops = snapuserd_->GetTotalBlocksToMerge();
         SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
@@ -213,6 +216,7 @@
             if (ret < 0 || ret != io_size) {
                 SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
                                 << " at offset: " << source_offset << " io_size: " << io_size;
+                snapuserd_->SetMergeFailed(block_index);
                 return false;
             }
 
@@ -226,6 +230,7 @@
         // Flush the data
         if (fsync(base_path_merge_fd_.get()) < 0) {
             SNAP_LOG(ERROR) << " Failed to fsync merged data";
+            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
@@ -233,14 +238,20 @@
         // the merge completion
         if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
             SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
+            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
         SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
+        // Mark the block as merge complete
+        snapuserd_->SetMergeCompleted(block_index);
 
         // Notify RA thread that the merge thread is ready to merge the next
         // window
         snapuserd_->NotifyRAForMergeReady();
+
+        // Get the next block
+        block_index += 1;
     }
 
     return true;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index 319755b..0bcf26e 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -115,7 +115,7 @@
 }
 
 bool ReadAhead::ReconstructDataFromCow() {
-    std::unordered_map<uint64_t, void*> read_ahead_buffer_map;
+    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
     loff_t metadata_offset = 0;
     loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
     int num_ops = 0;
@@ -319,6 +319,18 @@
     memcpy(metadata_buffer_, ra_temp_meta_buffer.get(), snapuserd_->GetBufferMetadataSize());
     memcpy(read_ahead_buffer_, ra_temp_buffer.get(), total_blocks_merged * BLOCK_SZ);
 
+    offset = 0;
+    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
+    read_ahead_buffer_map.clear();
+
+    for (size_t block_index = 0; block_index < blocks.size(); block_index++) {
+        void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + offset);
+        uint64_t new_block = blocks[block_index];
+
+        read_ahead_buffer_map[new_block] = bufptr;
+        offset += BLOCK_SZ;
+    }
+
     snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);
 
     // Flush the data only if we have a overlapping blocks in the region
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
similarity index 66%
copy from fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
copy to fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
index 91b4190..a4fd5a0 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
@@ -44,26 +44,29 @@
 using android::base::borrowed_fd;
 using android::base::unique_fd;
 
-DaemonOperations SnapuserdServer::Resolveop(std::string& input) {
-    if (input == "init") return DaemonOperations::INIT;
-    if (input == "start") return DaemonOperations::START;
-    if (input == "stop") return DaemonOperations::STOP;
-    if (input == "query") return DaemonOperations::QUERY;
-    if (input == "delete") return DaemonOperations::DELETE;
-    if (input == "detach") return DaemonOperations::DETACH;
-    if (input == "supports") return DaemonOperations::SUPPORTS;
+DaemonOps SnapuserServer::Resolveop(std::string& input) {
+    if (input == "init") return DaemonOps::INIT;
+    if (input == "start") return DaemonOps::START;
+    if (input == "stop") return DaemonOps::STOP;
+    if (input == "query") return DaemonOps::QUERY;
+    if (input == "delete") return DaemonOps::DELETE;
+    if (input == "detach") return DaemonOps::DETACH;
+    if (input == "supports") return DaemonOps::SUPPORTS;
+    if (input == "initiate_merge") return DaemonOps::INITIATE;
+    if (input == "merge_percent") return DaemonOps::PERCENTAGE;
+    if (input == "getstatus") return DaemonOps::GETSTATUS;
 
-    return DaemonOperations::INVALID;
+    return DaemonOps::INVALID;
 }
 
-SnapuserdServer::~SnapuserdServer() {
+SnapuserServer::~SnapuserServer() {
     // Close any client sockets that were added via AcceptClient().
     for (size_t i = 1; i < watched_fds_.size(); i++) {
         close(watched_fds_[i].fd);
     }
 }
 
-std::string SnapuserdServer::GetDaemonStatus() {
+std::string SnapuserServer::GetDaemonStatus() {
     std::string msg = "";
 
     if (IsTerminating())
@@ -74,8 +77,8 @@
     return msg;
 }
 
-void SnapuserdServer::Parsemsg(std::string const& msg, const char delim,
-                               std::vector<std::string>& out) {
+void SnapuserServer::Parsemsg(std::string const& msg, const char delim,
+                              std::vector<std::string>& out) {
     std::stringstream ss(msg);
     std::string s;
 
@@ -84,15 +87,15 @@
     }
 }
 
-void SnapuserdServer::ShutdownThreads() {
-    StopThreads();
+void SnapuserServer::ShutdownThreads() {
+    terminating_ = true;
     JoinAllThreads();
 }
 
-DmUserHandler::DmUserHandler(std::shared_ptr<Snapuserd> snapuserd)
+DmUserHandler::DmUserHandler(std::shared_ptr<SnapshotHandler> snapuserd)
     : snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {}
 
-bool SnapuserdServer::Sendmsg(android::base::borrowed_fd fd, const std::string& msg) {
+bool SnapuserServer::Sendmsg(android::base::borrowed_fd fd, const std::string& msg) {
     ssize_t ret = TEMP_FAILURE_RETRY(send(fd.get(), msg.data(), msg.size(), MSG_NOSIGNAL));
     if (ret < 0) {
         PLOG(ERROR) << "Snapuserd:server: send() failed";
@@ -106,7 +109,7 @@
     return true;
 }
 
-bool SnapuserdServer::Recv(android::base::borrowed_fd fd, std::string* data) {
+bool SnapuserServer::Recv(android::base::borrowed_fd fd, std::string* data) {
     char msg[MAX_PACKET_SIZE];
     ssize_t rv = TEMP_FAILURE_RETRY(recv(fd.get(), msg, sizeof(msg), 0));
     if (rv < 0) {
@@ -117,25 +120,25 @@
     return true;
 }
 
-bool SnapuserdServer::Receivemsg(android::base::borrowed_fd fd, const std::string& str) {
+bool SnapuserServer::Receivemsg(android::base::borrowed_fd fd, const std::string& str) {
     const char delim = ',';
 
     std::vector<std::string> out;
     Parsemsg(str, delim, out);
-    DaemonOperations op = Resolveop(out[0]);
+    DaemonOps op = Resolveop(out[0]);
 
     switch (op) {
-        case DaemonOperations::INIT: {
+        case DaemonOps::INIT: {
             // Message format:
-            // init,<misc_name>,<cow_device_path>,<backing_device>
+            // init,<misc_name>,<cow_device_path>,<backing_device>,<base_path_merge>
             //
             // Reads the metadata and send the number of sectors
-            if (out.size() != 4) {
+            if (out.size() != 5) {
                 LOG(ERROR) << "Malformed init message, " << out.size() << " parts";
                 return Sendmsg(fd, "fail");
             }
 
-            auto handler = AddHandler(out[1], out[2], out[3]);
+            auto handler = AddHandler(out[1], out[2], out[3], out[4]);
             if (!handler) {
                 return Sendmsg(fd, "fail");
             }
@@ -143,7 +146,7 @@
             auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors());
             return Sendmsg(fd, retval);
         }
-        case DaemonOperations::START: {
+        case DaemonOps::START: {
             // Message format:
             // start,<misc_name>
             //
@@ -168,7 +171,7 @@
             }
             return Sendmsg(fd, "success");
         }
-        case DaemonOperations::STOP: {
+        case DaemonOps::STOP: {
             // Message format: stop
             //
             // Stop all the threads gracefully and then shutdown the
@@ -177,7 +180,7 @@
             ShutdownThreads();
             return true;
         }
-        case DaemonOperations::QUERY: {
+        case DaemonOps::QUERY: {
             // Message format: query
             //
             // As part of transition, Second stage daemon will be
@@ -189,23 +192,41 @@
             // be ready to receive control message.
             return Sendmsg(fd, GetDaemonStatus());
         }
-        case DaemonOperations::DELETE: {
+        case DaemonOps::DELETE: {
             // Message format:
             // delete,<misc_name>
             if (out.size() != 2) {
                 LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
                 return Sendmsg(fd, "fail");
             }
+            {
+                std::lock_guard<std::mutex> lock(lock_);
+                auto iter = FindHandler(&lock, out[1]);
+                if (iter == dm_users_.end()) {
+                    // After merge is completed, we swap dm-user table with
+                    // the underlying dm-linear base device. Hence, worker
+                    // threads would have terminted and was removed from
+                    // the list.
+                    LOG(DEBUG) << "Could not find handler: " << out[1];
+                    return Sendmsg(fd, "success");
+                }
+
+                if (!(*iter)->ThreadTerminated()) {
+                    (*iter)->snapuserd()->NotifyIOTerminated();
+                }
+            }
             if (!RemoveAndJoinHandler(out[1])) {
                 return Sendmsg(fd, "fail");
             }
             return Sendmsg(fd, "success");
         }
-        case DaemonOperations::DETACH: {
+        case DaemonOps::DETACH: {
+            std::lock_guard<std::mutex> lock(lock_);
+            TerminateMergeThreads(&lock);
             terminating_ = true;
             return true;
         }
-        case DaemonOperations::SUPPORTS: {
+        case DaemonOps::SUPPORTS: {
             if (out.size() != 2) {
                 LOG(ERROR) << "Malformed supports message, " << out.size() << " parts";
                 return Sendmsg(fd, "fail");
@@ -215,6 +236,52 @@
             }
             return Sendmsg(fd, "fail");
         }
+        case DaemonOps::INITIATE: {
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts";
+                return Sendmsg(fd, "fail");
+            }
+            if (out[0] == "initiate_merge") {
+                std::lock_guard<std::mutex> lock(lock_);
+                auto iter = FindHandler(&lock, out[1]);
+                if (iter == dm_users_.end()) {
+                    LOG(ERROR) << "Could not find handler: " << out[1];
+                    return Sendmsg(fd, "fail");
+                }
+
+                if (!StartMerge(*iter)) {
+                    return Sendmsg(fd, "fail");
+                }
+
+                return Sendmsg(fd, "success");
+            }
+            return Sendmsg(fd, "fail");
+        }
+        case DaemonOps::PERCENTAGE: {
+            std::lock_guard<std::mutex> lock(lock_);
+            double percentage = GetMergePercentage(&lock);
+
+            return Sendmsg(fd, std::to_string(percentage));
+        }
+        case DaemonOps::GETSTATUS: {
+            // Message format:
+            // getstatus,<misc_name>
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
+                return Sendmsg(fd, "snapshot-merge-failed");
+            }
+            {
+                std::lock_guard<std::mutex> lock(lock_);
+                auto iter = FindHandler(&lock, out[1]);
+                if (iter == dm_users_.end()) {
+                    LOG(ERROR) << "Could not find handler: " << out[1];
+                    return Sendmsg(fd, "snapshot-merge-failed");
+                }
+
+                std::string merge_status = GetMergeStatus(*iter);
+                return Sendmsg(fd, merge_status);
+            }
+        }
         default: {
             LOG(ERROR) << "Received unknown message type from client";
             Sendmsg(fd, "fail");
@@ -223,7 +290,7 @@
     }
 }
 
-void SnapuserdServer::RunThread(std::shared_ptr<DmUserHandler> handler) {
+void SnapuserServer::RunThread(std::shared_ptr<DmUserHandler> handler) {
     LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
 
     handler->snapuserd()->SetSocketPresent(is_socket_present_);
@@ -240,6 +307,8 @@
 
     {
         std::lock_guard<std::mutex> lock(lock_);
+        num_partitions_merge_complete_ += 1;
+        handler->SetThreadTerminated();
         auto iter = FindHandler(&lock, handler->misc_name());
         if (iter == dm_users_.end()) {
             // RemoveAndJoinHandler() already removed us from the list, and is
@@ -264,10 +333,11 @@
         // WaitForDelete() is called, the handler is either in the list, or
         // it's not and its resources are guaranteed to be freed.
         handler->FreeResources();
+        dm_users_.erase(iter);
     }
 }
 
-bool SnapuserdServer::Start(const std::string& socketname) {
+bool SnapuserServer::Start(const std::string& socketname) {
     bool start_listening = true;
 
     sockfd_.reset(android_get_control_socket(socketname.c_str()));
@@ -283,7 +353,7 @@
     return StartWithSocket(start_listening);
 }
 
-bool SnapuserdServer::StartWithSocket(bool start_listening) {
+bool SnapuserServer::StartWithSocket(bool start_listening) {
     if (start_listening && listen(sockfd_.get(), 4) < 0) {
         PLOG(ERROR) << "listen socket failed";
         return false;
@@ -304,7 +374,7 @@
     return true;
 }
 
-bool SnapuserdServer::Run() {
+bool SnapuserServer::Run() {
     LOG(INFO) << "Now listening on snapuserd socket";
 
     while (!IsTerminating()) {
@@ -336,7 +406,7 @@
     return true;
 }
 
-void SnapuserdServer::JoinAllThreads() {
+void SnapuserServer::JoinAllThreads() {
     // Acquire the thread list within the lock.
     std::vector<std::shared_ptr<DmUserHandler>> dm_users;
     {
@@ -351,14 +421,14 @@
     }
 }
 
-void SnapuserdServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
+void SnapuserServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
     struct pollfd p = {};
     p.fd = fd.get();
     p.events = events;
     watched_fds_.emplace_back(std::move(p));
 }
 
-void SnapuserdServer::AcceptClient() {
+void SnapuserServer::AcceptClient() {
     int fd = TEMP_FAILURE_RETRY(accept4(sockfd_.get(), nullptr, nullptr, SOCK_CLOEXEC));
     if (fd < 0) {
         PLOG(ERROR) << "accept4 failed";
@@ -368,7 +438,7 @@
     AddWatchedFd(fd, POLLIN);
 }
 
-bool SnapuserdServer::HandleClient(android::base::borrowed_fd fd, int revents) {
+bool SnapuserServer::HandleClient(android::base::borrowed_fd fd, int revents) {
     if (revents & POLLHUP) {
         LOG(DEBUG) << "Snapuserd client disconnected";
         return false;
@@ -385,16 +455,18 @@
     return true;
 }
 
-void SnapuserdServer::Interrupt() {
+void SnapuserServer::Interrupt() {
     // Force close the socket so poll() fails.
     sockfd_ = {};
     SetTerminating();
 }
 
-std::shared_ptr<DmUserHandler> SnapuserdServer::AddHandler(const std::string& misc_name,
-                                                           const std::string& cow_device_path,
-                                                           const std::string& backing_device) {
-    auto snapuserd = std::make_shared<Snapuserd>(misc_name, cow_device_path, backing_device);
+std::shared_ptr<DmUserHandler> SnapuserServer::AddHandler(const std::string& misc_name,
+                                                          const std::string& cow_device_path,
+                                                          const std::string& backing_device,
+                                                          const std::string& base_path_merge) {
+    auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
+                                                       base_path_merge);
     if (!snapuserd->InitCowDevice()) {
         LOG(ERROR) << "Failed to initialize Snapuserd";
         return nullptr;
@@ -417,7 +489,7 @@
     return handler;
 }
 
-bool SnapuserdServer::StartHandler(const std::shared_ptr<DmUserHandler>& handler) {
+bool SnapuserServer::StartHandler(const std::shared_ptr<DmUserHandler>& handler) {
     if (handler->snapuserd()->IsAttached()) {
         LOG(ERROR) << "Handler already attached";
         return false;
@@ -425,12 +497,22 @@
 
     handler->snapuserd()->AttachControlDevice();
 
-    handler->thread() = std::thread(std::bind(&SnapuserdServer::RunThread, this, handler));
+    handler->thread() = std::thread(std::bind(&SnapuserServer::RunThread, this, handler));
     return true;
 }
 
-auto SnapuserdServer::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
-                                  const std::string& misc_name) -> HandlerList::iterator {
+bool SnapuserServer::StartMerge(const std::shared_ptr<DmUserHandler>& handler) {
+    if (!handler->snapuserd()->IsAttached()) {
+        LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
+        return false;
+    }
+
+    handler->snapuserd()->InitiateMerge();
+    return true;
+}
+
+auto SnapuserServer::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
+                                 const std::string& misc_name) -> HandlerList::iterator {
     CHECK(proof_of_lock);
 
     for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
@@ -441,7 +523,51 @@
     return dm_users_.end();
 }
 
-bool SnapuserdServer::RemoveAndJoinHandler(const std::string& misc_name) {
+void SnapuserServer::TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock) {
+    CHECK(proof_of_lock);
+
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        if (!(*iter)->ThreadTerminated()) {
+            (*iter)->snapuserd()->NotifyIOTerminated();
+        }
+    }
+}
+
+std::string SnapuserServer::GetMergeStatus(const std::shared_ptr<DmUserHandler>& handler) {
+    return handler->snapuserd()->GetMergeStatus();
+}
+
+double SnapuserServer::GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock) {
+    CHECK(proof_of_lock);
+    double percentage = 0.0;
+    int n = 0;
+
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        auto& th = (*iter)->thread();
+        if (th.joinable()) {
+            // Merge percentage by individual partitions wherein merge is still
+            // in-progress
+            percentage += (*iter)->snapuserd()->GetMergePercentage();
+            n += 1;
+        }
+    }
+
+    // Calculate final merge including those partitions where merge was already
+    // completed - num_partitions_merge_complete_ will track them when each
+    // thread exists in RunThread.
+    int total_partitions = n + num_partitions_merge_complete_;
+
+    if (total_partitions) {
+        percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
+    }
+
+    LOG(DEBUG) << "Merge %: " << percentage
+               << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
+               << " total_partitions: " << total_partitions << " n: " << n;
+    return percentage;
+}
+
+bool SnapuserServer::RemoveAndJoinHandler(const std::string& misc_name) {
     std::shared_ptr<DmUserHandler> handler;
     {
         std::lock_guard<std::mutex> lock(lock_);
@@ -462,7 +588,7 @@
     return true;
 }
 
-bool SnapuserdServer::WaitForSocket() {
+bool SnapuserServer::WaitForSocket() {
     auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
 
     auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy;
@@ -516,7 +642,7 @@
     return Run();
 }
 
-bool SnapuserdServer::RunForSocketHandoff() {
+bool SnapuserServer::RunForSocketHandoff() {
     unique_fd proxy_fd(android_get_control_socket(kSnapuserdSocketProxy));
     if (proxy_fd < 0) {
         PLOG(FATAL) << "Proxy could not get android control socket " << kSnapuserdSocketProxy;
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
similarity index 76%
copy from fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
copy to fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
index 14e5de6..e93621c 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
@@ -28,14 +28,14 @@
 #include <vector>
 
 #include <android-base/unique_fd.h>
-#include "dm-snapshot-merge/snapuserd.h"
+#include "snapuserd_core.h"
 
 namespace android {
 namespace snapshot {
 
 static constexpr uint32_t MAX_PACKET_SIZE = 512;
 
-enum class DaemonOperations {
+enum class DaemonOps {
     INIT,
     START,
     QUERY,
@@ -43,12 +43,15 @@
     DELETE,
     DETACH,
     SUPPORTS,
+    INITIATE,
+    PERCENTAGE,
+    GETSTATUS,
     INVALID,
 };
 
 class DmUserHandler {
   public:
-    explicit DmUserHandler(std::shared_ptr<Snapuserd> snapuserd);
+    explicit DmUserHandler(std::shared_ptr<SnapshotHandler> snapuserd);
 
     void FreeResources() {
         // Each worker thread holds a reference to snapuserd.
@@ -59,44 +62,28 @@
             snapuserd_ = nullptr;
         }
     }
-    const std::shared_ptr<Snapuserd>& snapuserd() const { return snapuserd_; }
+    const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
     std::thread& thread() { return thread_; }
 
     const std::string& misc_name() const { return misc_name_; }
+    bool ThreadTerminated() { return thread_terminated_; }
+    void SetThreadTerminated() { thread_terminated_ = true; }
 
   private:
     std::thread thread_;
-    std::shared_ptr<Snapuserd> snapuserd_;
+    std::shared_ptr<SnapshotHandler> snapuserd_;
     std::string misc_name_;
+    bool thread_terminated_ = false;
 };
 
-class Stoppable {
-    std::promise<void> exitSignal_;
-    std::future<void> futureObj_;
-
-  public:
-    Stoppable() : futureObj_(exitSignal_.get_future()) {}
-
-    virtual ~Stoppable() {}
-
-    bool StopRequested() {
-        // checks if value in future object is available
-        if (futureObj_.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) {
-            return false;
-        }
-        return true;
-    }
-    // Request the thread to stop by setting value in promise object
-    void StopThreads() { exitSignal_.set_value(); }
-};
-
-class SnapuserdServer : public Stoppable {
+class SnapuserServer {
   private:
     android::base::unique_fd sockfd_;
     bool terminating_;
     volatile bool received_socket_signal_ = false;
     std::vector<struct pollfd> watched_fds_;
     bool is_socket_present_ = false;
+    int num_partitions_merge_complete_ = 0;
 
     std::mutex lock_;
 
@@ -112,7 +99,7 @@
 
     void ShutdownThreads();
     bool RemoveAndJoinHandler(const std::string& control_device);
-    DaemonOperations Resolveop(std::string& input);
+    DaemonOps Resolveop(std::string& input);
     std::string GetDaemonStatus();
     void Parsemsg(std::string const& msg, const char delim, std::vector<std::string>& out);
 
@@ -126,9 +113,12 @@
     HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
                                       const std::string& misc_name);
 
+    double GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock);
+    void TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock);
+
   public:
-    SnapuserdServer() { terminating_ = false; }
-    ~SnapuserdServer();
+    SnapuserServer() { terminating_ = false; }
+    ~SnapuserServer();
 
     bool Start(const std::string& socketname);
     bool Run();
@@ -138,8 +128,11 @@
 
     std::shared_ptr<DmUserHandler> AddHandler(const std::string& misc_name,
                                               const std::string& cow_device_path,
-                                              const std::string& backing_device);
+                                              const std::string& backing_device,
+                                              const std::string& base_path_merge);
     bool StartHandler(const std::shared_ptr<DmUserHandler>& handler);
+    bool StartMerge(const std::shared_ptr<DmUserHandler>& handler);
+    std::string GetMergeStatus(const std::shared_ptr<DmUserHandler>& handler);
 
     void SetTerminating() { terminating_ = true; }
     void ReceivedSocketSignal() { received_socket_signal_ = true; }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
index 97418bd..6c91fde 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
@@ -359,5 +359,289 @@
     }
 }
 
+std::string SnapshotHandler::GetMergeStatus() {
+    bool merge_not_initiated = false;
+    bool merge_failed = false;
+
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        if (!MergeInitiated()) {
+            merge_not_initiated = true;
+        }
+
+        if (io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED) {
+            merge_failed = true;
+        }
+    }
+
+    struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
+    bool merge_complete = (ch->num_merge_ops == reader_->get_num_total_data_ops());
+
+    if (merge_not_initiated) {
+        // Merge was not initiated yet; however, we have merge completion
+        // recorded in the COW Header. This can happen if the device was
+        // rebooted during merge. During next reboot, libsnapshot will
+        // query the status and if the merge is completed, then snapshot-status
+        // file will be deleted
+        if (merge_complete) {
+            return "snapshot-merge-complete";
+        }
+
+        // Return the state as "snapshot". If the device was rebooted during
+        // merge, we will return the status as "snapshot". This is ok, as
+        // libsnapshot will explicitly resume the merge. This is slightly
+        // different from kernel snapshot wherein once the snapshot was switched
+        // to merge target, during next boot, we immediately switch to merge
+        // target. We don't do that here because, during first stage init, we
+        // don't want to initiate the merge. The problem is that we have daemon
+        // transition between first and second stage init. If the merge was
+        // started, then we will have to quiesce the merge before switching
+        // the dm tables. Instead, we just wait until second stage daemon is up
+        // before resuming the merge.
+        return "snapshot";
+    }
+
+    if (merge_failed) {
+        return "snapshot-merge-failed";
+    }
+
+    // Merge complete
+    if (merge_complete) {
+        return "snapshot-merge-complete";
+    }
+
+    // Merge is in-progress
+    return "snapshot-merge";
+}
+
+//========== End of Read-ahead state transition functions ====================
+
+/*
+ * Root partitions are mounted off dm-user and the I/O's are served
+ * by snapuserd worker threads.
+ *
+ * When there is an I/O request to be served by worker threads, we check
+ * if the corresponding sector is "changed" due to OTA by doing a lookup.
+ * If the lookup succeeds then the sector has been changed and that can
+ * either fall into 4 COW operations viz: COPY, XOR, REPLACE and ZERO.
+ *
+ * For the case of REPLACE and ZERO ops, there is not much of a concern
+ * as there is no dependency between blocks. Hence all the I/O request
+ * mapped to these two COW operations will be served by reading the COW device.
+ *
+ * However, COPY and XOR ops are tricky. Since the merge operations are
+ * in-progress, we cannot just go and read from the source device. We need
+ * to be in sync with the state of the merge thread before serving the I/O.
+ *
+ * Given that we know merge thread processes a set of COW ops called as RA
+ * Blocks - These set of COW ops are fixed size wherein each Block comprises
+ * of 510 COW ops.
+ *
+ *  +--------------------------+
+ *  |op-1|op-2|op-3|....|op-510|
+ *  +--------------------------+
+ *
+ *  <------ Merge Group Block N ------>
+ *
+ * Thus, a Merge Group Block N, will fall into one of these states and will
+ * transition the states in the following order:
+ *
+ * 1: GROUP_MERGE_PENDING
+ * 2: GROUP_MERGE_RA_READY
+ * 2: GROUP_MERGE_IN_PROGRESS
+ * 3: GROUP_MERGE_COMPLETED
+ * 4: GROUP_MERGE_FAILED
+ *
+ * Let's say that we have the I/O request from dm-user whose sector gets mapped
+ * to a COPY operation with op-10 in the above "Merge Group Block N".
+ *
+ * 1: If the Group is in "GROUP_MERGE_PENDING" state:
+ *
+ *    Just read the data from source block based on COW op->source field. Note,
+ *    that we will take a ref count on "Block N". This ref count will prevent
+ *    merge thread to begin merging if there are any pending I/Os. Once the I/O
+ *    is completed, ref count on "Group N" is decremented. Merge thread will
+ *    resume merging "Group N" if there are no pending I/Os.
+ *
+ * 2: If the Group is in "GROUP_MERGE_IN_PROGRESS" or "GROUP_MERGE_RA_READY" state:
+ *
+ *    When the merge thread is ready to process a "Group", it will first move
+ *    the state to GROUP_MERGE_PENDING -> GROUP_MERGE_RA_READY. From this point
+ *    onwards, I/O will be served from Read-ahead buffer. However, merge thread
+ *    cannot start merging this "Group" immediately. If there were any in-flight
+ *    I/O requests, merge thread should wait and allow those I/O's to drain.
+ *    Once all the in-flight I/O's are completed, merge thread will move the
+ *    state from "GROUP_MERGE_RA_READY" -> "GROUP_MERGE_IN_PROGRESS". I/O will
+ *    be continued to serve from Read-ahead buffer during the entire duration
+ *    of the merge.
+ *
+ *    See SetMergeInProgress().
+ *
+ * 3: If the Group is in "GROUP_MERGE_COMPLETED" state:
+ *
+ *    This is straightforward. We just read the data directly from "Base"
+ *    device. We should not be reading the COW op->source field.
+ *
+ * 4: If the Block is in "GROUP_MERGE_FAILED" state:
+ *
+ *    Terminate the I/O with an I/O error as we don't know which "op" in the
+ *    "Group" failed.
+ *
+ *    Transition ensures that the I/O from root partitions are never made to
+ *    wait and are processed immediately. Thus the state transition for any
+ *    "Group" is:
+ *
+ *    GROUP_MERGE_PENDING
+ *          |
+ *          |
+ *          v
+ *    GROUP_MERGE_RA_READY
+ *          |
+ *          |
+ *          v
+ *    GROUP_MERGE_IN_PROGRESS
+ *          |
+ *          |----------------------------(on failure)
+ *          |                           |
+ *          v                           v
+ *    GROUP_MERGE_COMPLETED           GROUP_MERGE_FAILED
+ *
+ */
+
+// Invoked by Merge thread
+void SnapshotHandler::SetMergeCompleted(size_t ra_index) {
+    MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+    {
+        std::lock_guard<std::mutex> lock(blk_state->m_lock);
+
+        CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS);
+        CHECK(blk_state->num_ios_in_progress == 0);
+
+        // Merge is complete - All I/O henceforth should be read directly
+        // from base device
+        blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED;
+    }
+}
+
+// Invoked by Merge thread. This is called just before the beginning
+// of merging a given Block of 510 ops. If there are any in-flight I/O's
+// from dm-user then wait for them to complete.
+void SnapshotHandler::SetMergeInProgress(size_t ra_index) {
+    MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+    {
+        std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+        CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
+
+        // First set the state to RA_READY so that in-flight I/O will drain
+        // and any new I/O will start reading from RA buffer
+        blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_RA_READY;
+
+        // Wait if there are any in-flight I/O's - we cannot merge at this point
+        while (!(blk_state->num_ios_in_progress == 0)) {
+            blk_state->m_cv.wait(lock);
+        }
+
+        blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS;
+    }
+}
+
+// Invoked by Merge thread on failure
+void SnapshotHandler::SetMergeFailed(size_t ra_index) {
+    MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+    {
+        std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+        blk_state->merge_state_ = MERGE_GROUP_STATE::GROUP_MERGE_FAILED;
+    }
+}
+
+// Invoked by worker threads when I/O is complete on a "MERGE_PENDING"
+// Block. If there are no more in-flight I/Os, wake up merge thread
+// to resume merging.
+void SnapshotHandler::NotifyIOCompletion(uint64_t new_block) {
+    auto it = block_to_ra_index_.find(new_block);
+    CHECK(it != block_to_ra_index_.end()) << " invalid block: " << new_block;
+
+    bool pending_ios = true;
+
+    int ra_index = it->second;
+    MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+    {
+        std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+        CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
+        blk_state->num_ios_in_progress -= 1;
+        if (blk_state->num_ios_in_progress == 0) {
+            pending_ios = false;
+        }
+    }
+
+    // Give a chance to merge-thread to resume merge
+    // as there are no pending I/O.
+    if (!pending_ios) {
+        blk_state->m_cv.notify_all();
+    }
+}
+
+bool SnapshotHandler::GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block,
+                                  void* buffer) {
+    if (!lock->owns_lock()) {
+        SNAP_LOG(ERROR) << "GetRABuffer - Lock not held";
+        return false;
+    }
+    std::unordered_map<uint64_t, void*>::iterator it = read_ahead_buffer_map_.find(block);
+
+    if (it == read_ahead_buffer_map_.end()) {
+        SNAP_LOG(ERROR) << "Block: " << block << " not found in RA buffer";
+        return false;
+    }
+
+    memcpy(buffer, it->second, BLOCK_SZ);
+    return true;
+}
+
+// Invoked by worker threads in the I/O path. This is called when a sector
+// is mapped to a COPY/XOR COW op.
+MERGE_GROUP_STATE SnapshotHandler::ProcessMergingBlock(uint64_t new_block, void* buffer) {
+    auto it = block_to_ra_index_.find(new_block);
+    if (it == block_to_ra_index_.end()) {
+        return MERGE_GROUP_STATE::GROUP_INVALID;
+    }
+
+    int ra_index = it->second;
+    MergeGroupState* blk_state = merge_blk_state_[ra_index].get();
+    {
+        std::unique_lock<std::mutex> lock(blk_state->m_lock);
+
+        MERGE_GROUP_STATE state = blk_state->merge_state_;
+        switch (state) {
+            case MERGE_GROUP_STATE::GROUP_MERGE_PENDING: {
+                blk_state->num_ios_in_progress += 1;  // ref count
+                [[fallthrough]];
+            }
+            case MERGE_GROUP_STATE::GROUP_MERGE_COMPLETED: {
+                [[fallthrough]];
+            }
+            case MERGE_GROUP_STATE::GROUP_MERGE_FAILED: {
+                return state;
+            }
+            // Fetch the data from RA buffer.
+            case MERGE_GROUP_STATE::GROUP_MERGE_RA_READY: {
+                [[fallthrough]];
+            }
+            case MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS: {
+                if (!GetRABuffer(&lock, new_block, buffer)) {
+                    return MERGE_GROUP_STATE::GROUP_INVALID;
+                }
+                return state;
+            }
+            default: {
+                return MERGE_GROUP_STATE::GROUP_INVALID;
+            }
+        }
+    }
+}
+
 }  // namespace snapshot
 }  // namespace android