Merge "Revert "Delete KM1""
diff --git a/fs_mgr/fs_mgr.cpp b/fs_mgr/fs_mgr.cpp
index 002b302..07e1e6b 100644
--- a/fs_mgr/fs_mgr.cpp
+++ b/fs_mgr/fs_mgr.cpp
@@ -2119,6 +2119,9 @@
         PERROR << "Cannot open " << loop_device;
         return false;
     }
+    if (!LoopControl::SetAutoClearStatus(loop_fd.get())) {
+        PERROR << "Failed set LO_FLAGS_AUTOCLEAR for " << loop_device;
+    }
     if (!LoopControl::EnableDirectIo(loop_fd.get())) {
         return false;
     }
diff --git a/fs_mgr/libdm/include/libdm/loop_control.h b/fs_mgr/libdm/include/libdm/loop_control.h
index ad53c11..f519054 100644
--- a/fs_mgr/libdm/include/libdm/loop_control.h
+++ b/fs_mgr/libdm/include/libdm/loop_control.h
@@ -46,6 +46,9 @@
     // Enable Direct I/O on a loop device. This requires kernel 4.9+.
     static bool EnableDirectIo(int fd);
 
+    // Set LO_FLAGS_AUTOCLEAR on a loop device.
+    static bool SetAutoClearStatus(int fd);
+
     LoopControl(const LoopControl&) = delete;
     LoopControl& operator=(const LoopControl&) = delete;
     LoopControl& operator=(LoopControl&&) = default;
diff --git a/fs_mgr/libdm/loop_control.cpp b/fs_mgr/libdm/loop_control.cpp
index 2e40a18..32d5f38 100644
--- a/fs_mgr/libdm/loop_control.cpp
+++ b/fs_mgr/libdm/loop_control.cpp
@@ -133,6 +133,16 @@
     return true;
 }
 
+bool LoopControl::SetAutoClearStatus(int fd) {
+    struct loop_info64 info = {};
+
+    info.lo_flags |= LO_FLAGS_AUTOCLEAR;
+    if (ioctl(fd, LOOP_SET_STATUS64, &info)) {
+        return false;
+    }
+    return true;
+}
+
 LoopDevice::LoopDevice(android::base::borrowed_fd fd, const std::chrono::milliseconds& timeout_ms,
                        bool auto_close)
     : fd_(fd), owned_fd_(-1) {
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index 143f73c..63a9e68 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -145,6 +145,8 @@
     // Creates a clone of the current CowReader without the file handlers
     std::unique_ptr<CowReader> CloneCowReader();
 
+    void UpdateMergeOpsCompleted(int num_merge_ops) { header_.num_merge_ops += num_merge_ops; }
+
   private:
     bool ParseOps(std::optional<uint64_t> label);
     bool PrepMergeOps();
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h b/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
index 55f4ed7..a49b026 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
@@ -408,6 +408,7 @@
     FRIEND_TEST(SnapshotUpdateTest, FullUpdateFlow);
     FRIEND_TEST(SnapshotUpdateTest, MergeCannotRemoveCow);
     FRIEND_TEST(SnapshotUpdateTest, MergeInRecovery);
+    FRIEND_TEST(SnapshotUpdateTest, QueryStatusError);
     FRIEND_TEST(SnapshotUpdateTest, SnapshotStatusFileWithoutCow);
     FRIEND_TEST(SnapshotUpdateTest, SpaceSwapUpdate);
     friend class SnapshotTest;
diff --git a/fs_mgr/libsnapshot/include_test/libsnapshot/test_helpers.h b/fs_mgr/libsnapshot/include_test/libsnapshot/test_helpers.h
index 1f57bbc..c3b40dc 100644
--- a/fs_mgr/libsnapshot/include_test/libsnapshot/test_helpers.h
+++ b/fs_mgr/libsnapshot/include_test/libsnapshot/test_helpers.h
@@ -100,6 +100,9 @@
         return IDeviceInfo::OpenImageManager("ota/test");
     }
     android::dm::IDeviceMapper& GetDeviceMapper() override {
+        if (dm_) {
+            return *dm_;
+        }
         return android::dm::DeviceMapper::Instance();
     }
 
@@ -111,6 +114,8 @@
     }
     void set_recovery(bool value) { recovery_ = value; }
     void set_first_stage_init(bool value) { first_stage_init_ = value; }
+    void set_dm(android::dm::IDeviceMapper* dm) { dm_ = dm; }
+
     MergeStatus merge_status() const { return merge_status_; }
 
   private:
@@ -120,6 +125,45 @@
     bool recovery_ = false;
     bool first_stage_init_ = false;
     std::unordered_set<uint32_t> unbootable_slots_;
+    android::dm::IDeviceMapper* dm_ = nullptr;
+};
+
+class DeviceMapperWrapper : public android::dm::IDeviceMapper {
+    using DmDeviceState = android::dm::DmDeviceState;
+    using DmTable = android::dm::DmTable;
+
+  public:
+    DeviceMapperWrapper() : impl_(android::dm::DeviceMapper::Instance()) {}
+    explicit DeviceMapperWrapper(android::dm::IDeviceMapper& impl) : impl_(impl) {}
+
+    virtual bool CreateDevice(const std::string& name, const DmTable& table, std::string* path,
+                              const std::chrono::milliseconds& timeout_ms) override {
+        return impl_.CreateDevice(name, table, path, timeout_ms);
+    }
+    virtual DmDeviceState GetState(const std::string& name) const override {
+        return impl_.GetState(name);
+    }
+    virtual bool LoadTableAndActivate(const std::string& name, const DmTable& table) {
+        return impl_.LoadTableAndActivate(name, table);
+    }
+    virtual bool GetTableInfo(const std::string& name, std::vector<TargetInfo>* table) {
+        return impl_.GetTableInfo(name, table);
+    }
+    virtual bool GetTableStatus(const std::string& name, std::vector<TargetInfo>* table) {
+        return impl_.GetTableStatus(name, table);
+    }
+    virtual bool GetDmDevicePathByName(const std::string& name, std::string* path) {
+        return impl_.GetDmDevicePathByName(name, path);
+    }
+    virtual bool GetDeviceString(const std::string& name, std::string* dev) {
+        return impl_.GetDeviceString(name, dev);
+    }
+    virtual bool DeleteDeviceIfExists(const std::string& name) {
+        return impl_.DeleteDeviceIfExists(name);
+    }
+
+  private:
+    android::dm::IDeviceMapper& impl_;
 };
 
 class SnapshotTestPropertyFetcher : public android::fs_mgr::testing::MockPropertyFetcher {
diff --git a/fs_mgr/libsnapshot/snapshot_test.cpp b/fs_mgr/libsnapshot/snapshot_test.cpp
index 43c7fe2..d78ba0a 100644
--- a/fs_mgr/libsnapshot/snapshot_test.cpp
+++ b/fs_mgr/libsnapshot/snapshot_test.cpp
@@ -56,6 +56,7 @@
 using android::base::unique_fd;
 using android::dm::DeviceMapper;
 using android::dm::DmDeviceState;
+using android::dm::IDeviceMapper;
 using android::fiemap::FiemapStatus;
 using android::fiemap::IImageManager;
 using android::fs_mgr::BlockDeviceInfo;
@@ -911,6 +912,11 @@
             ASSERT_TRUE(hash.has_value());
             hashes_[name] = *hash;
         }
+
+        // OTA client blindly unmaps all partitions that are possibly mapped.
+        for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
+            ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
+        }
     }
     void TearDown() override {
         RETURN_IF_NON_VIRTUAL_AB();
@@ -925,6 +931,14 @@
         MountMetadata();
         for (const auto& suffix : {"_a", "_b"}) {
             test_device->set_slot_suffix(suffix);
+
+            // Cheat our way out of merge failed states.
+            if (sm->ProcessUpdateState() == UpdateState::MergeFailed) {
+                ASSERT_TRUE(AcquireLock());
+                ASSERT_TRUE(sm->WriteUpdateState(lock_.get(), UpdateState::None));
+                lock_ = {};
+            }
+
             EXPECT_TRUE(sm->CancelUpdate()) << suffix;
         }
         EXPECT_TRUE(UnmapAll());
@@ -1097,11 +1111,6 @@
 // Also test UnmapUpdateSnapshot unmaps everything.
 // Also test first stage mount and merge after this.
 TEST_F(SnapshotUpdateTest, FullUpdateFlow) {
-    // OTA client blindly unmaps all partitions that are possibly mapped.
-    for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
-        ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
-    }
-
     // Grow all partitions. Set |prd| large enough that |sys| and |vnd|'s COWs
     // fit in super, but not |prd|.
     constexpr uint64_t partition_size = 3788_KiB;
@@ -1189,11 +1198,6 @@
         GTEST_SKIP() << "Compression-only test";
     }
 
-    // OTA client blindly unmaps all partitions that are possibly mapped.
-    for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
-        ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
-    }
-
     // Execute the update.
     ASSERT_TRUE(sm->BeginUpdate());
     ASSERT_TRUE(sm->CreateUpdateSnapshots(manifest_));
@@ -1239,11 +1243,6 @@
         GTEST_SKIP() << "Skipping Virtual A/B Compression test";
     }
 
-    // OTA client blindly unmaps all partitions that are possibly mapped.
-    for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
-        ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
-    }
-
     auto old_sys_size = GetSize(sys_);
     auto old_prd_size = GetSize(prd_);
 
@@ -1630,11 +1629,6 @@
     ASSERT_NE(nullptr, metadata);
     ASSERT_TRUE(UpdatePartitionTable(*opener_, "super", *metadata.get(), 0));
 
-    // OTA client blindly unmaps all partitions that are possibly mapped.
-    for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
-        ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
-    }
-
     // Add operations for sys. The whole device is written.
     AddOperation(sys_);
 
@@ -2074,11 +2068,6 @@
 }
 
 TEST_F(SnapshotUpdateTest, AddPartition) {
-    // OTA client blindly unmaps all partitions that are possibly mapped.
-    for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
-        ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
-    }
-
     group_->add_partition_names("dlkm");
 
     auto dlkm = manifest_.add_partitions();
@@ -2249,6 +2238,60 @@
     ASSERT_TRUE(sm->BeginUpdate());
 }
 
+TEST_F(SnapshotUpdateTest, QueryStatusError) {
+    // Grow all partitions. Set |prd| large enough that |sys| and |vnd|'s COWs
+    // fit in super, but not |prd|.
+    constexpr uint64_t partition_size = 3788_KiB;
+    SetSize(sys_, partition_size);
+
+    AddOperationForPartitions({sys_});
+
+    // Execute the update.
+    ASSERT_TRUE(sm->BeginUpdate());
+    ASSERT_TRUE(sm->CreateUpdateSnapshots(manifest_));
+    ASSERT_TRUE(WriteSnapshotAndHash("sys_b"));
+    ASSERT_TRUE(sm->FinishedSnapshotWrites(false));
+    ASSERT_TRUE(UnmapAll());
+
+    class DmStatusFailure final : public DeviceMapperWrapper {
+      public:
+        bool GetTableStatus(const std::string& name, std::vector<TargetInfo>* table) override {
+            if (!DeviceMapperWrapper::GetTableStatus(name, table)) {
+                return false;
+            }
+            if (name == "sys_b" && !table->empty()) {
+                auto& info = table->at(0);
+                if (DeviceMapper::GetTargetType(info.spec) == "snapshot-merge") {
+                    info.data = "Merge failed";
+                }
+            }
+            return true;
+        }
+    };
+    DmStatusFailure wrapper;
+
+    // After reboot, init does first stage mount.
+    auto info = new TestDeviceInfo(fake_super, "_b");
+    info->set_dm(&wrapper);
+
+    auto init = NewManagerForFirstStageMount(info);
+    ASSERT_NE(init, nullptr);
+
+    ASSERT_TRUE(init->NeedSnapshotsInFirstStageMount());
+    ASSERT_TRUE(init->CreateLogicalAndSnapshotPartitions("super", snapshot_timeout_));
+
+    // Initiate the merge and wait for it to be completed.
+    ASSERT_TRUE(init->InitiateMerge());
+    ASSERT_EQ(UpdateState::MergeFailed, init->ProcessUpdateState());
+
+    // Simulate a reboot that tries the merge again, with the non-failing dm.
+    ASSERT_TRUE(UnmapAll());
+    init = NewManagerForFirstStageMount("_b");
+    ASSERT_NE(init, nullptr);
+    ASSERT_TRUE(init->CreateLogicalAndSnapshotPartitions("super", snapshot_timeout_));
+    ASSERT_EQ(UpdateState::MergeCompleted, init->ProcessUpdateState());
+}
+
 class FlashAfterUpdateTest : public SnapshotUpdateTest,
                              public WithParamInterface<std::tuple<uint32_t, bool>> {
   public:
@@ -2265,11 +2308,6 @@
 };
 
 TEST_P(FlashAfterUpdateTest, FlashSlotAfterUpdate) {
-    // OTA client blindly unmaps all partitions that are possibly mapped.
-    for (const auto& name : {"sys_b", "vnd_b", "prd_b"}) {
-        ASSERT_TRUE(sm->UnmapUpdateSnapshot(name));
-    }
-
     // Execute the update.
     ASSERT_TRUE(sm->BeginUpdate());
     ASSERT_TRUE(sm->CreateUpdateSnapshots(manifest_));
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index 6bd5323..837f33a 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -57,10 +57,16 @@
     ],
     srcs: [
         "snapuserd_server.cpp",
-        "snapuserd.cpp",
+        "dm-snapshot-merge/snapuserd.cpp",
+        "dm-snapshot-merge/snapuserd_worker.cpp",
+        "dm-snapshot-merge/snapuserd_readahead.cpp",
         "snapuserd_daemon.cpp",
-        "snapuserd_worker.cpp",
-        "snapuserd_readahead.cpp",
+	"snapuserd_buffer.cpp",
+	"user-space-merge/snapuserd_core.cpp",
+	"user-space-merge/snapuserd_dm_user.cpp",
+	"user-space-merge/snapuserd_merge.cpp",
+	"user-space-merge/snapuserd_readahead.cpp",
+	"user-space-merge/snapuserd_transitions.cpp",
     ],
 
     cflags: [
@@ -101,9 +107,10 @@
         "fs_mgr_defaults",
     ],
     srcs: [
-        "cow_snapuserd_test.cpp",
-        "snapuserd.cpp",
-        "snapuserd_worker.cpp",
+        "dm-snapshot-merge/cow_snapuserd_test.cpp",
+        "dm-snapshot-merge/snapuserd.cpp",
+        "dm-snapshot-merge/snapuserd_worker.cpp",
+	"snapuserd_buffer.cpp",
     ],
     cflags: [
         "-Wall",
diff --git a/fs_mgr/libsnapshot/snapuserd/cow_snapuserd_test.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/cow_snapuserd_test.cpp
similarity index 99%
rename from fs_mgr/libsnapshot/snapuserd/cow_snapuserd_test.cpp
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/cow_snapuserd_test.cpp
index bff0a50..b86a802 100644
--- a/fs_mgr/libsnapshot/snapuserd/cow_snapuserd_test.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/cow_snapuserd_test.cpp
@@ -33,6 +33,7 @@
 #include <libdm/dm.h>
 #include <libdm/loop_control.h>
 #include <libsnapshot/cow_writer.h>
+#include <snapuserd/snapuserd_buffer.h>
 #include <snapuserd/snapuserd_client.h>
 #include <storage_literals/storage_literals.h>
 
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd.cpp
similarity index 100%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd.cpp
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd.cpp
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd.h b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd.h
similarity index 91%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd.h
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd.h
index 6388a83..47b9b22 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd.h
+++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd.h
@@ -42,6 +42,7 @@
 #include <libdm/dm.h>
 #include <libsnapshot/cow_reader.h>
 #include <libsnapshot/cow_writer.h>
+#include <snapuserd/snapuserd_buffer.h>
 #include <snapuserd/snapuserd_kernel.h>
 
 namespace android {
@@ -89,39 +90,6 @@
     READ_AHEAD_FAILURE,
 };
 
-class BufferSink : public IByteSink {
-  public:
-    void Initialize(size_t size);
-    void* GetBufPtr() { return buffer_.get(); }
-    void Clear() { memset(GetBufPtr(), 0, buffer_size_); }
-    void* GetPayloadBuffer(size_t size);
-    void* GetBuffer(size_t requested, size_t* actual) override;
-    void UpdateBufferOffset(size_t size) { buffer_offset_ += size; }
-    struct dm_user_header* GetHeaderPtr();
-    bool ReturnData(void*, size_t) override { return true; }
-    void ResetBufferOffset() { buffer_offset_ = 0; }
-    void* GetPayloadBufPtr();
-
-  private:
-    std::unique_ptr<uint8_t[]> buffer_;
-    loff_t buffer_offset_;
-    size_t buffer_size_;
-};
-
-class XorSink : public IByteSink {
-  public:
-    void Initialize(BufferSink* sink, size_t size);
-    void Reset();
-    void* GetBuffer(size_t requested, size_t* actual) override;
-    bool ReturnData(void* buffer, size_t len) override;
-
-  private:
-    BufferSink* bufsink_;
-    std::unique_ptr<uint8_t[]> buffer_;
-    size_t buffer_size_;
-    size_t returned_;
-};
-
 class Snapuserd;
 
 class ReadAheadThread {
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_readahead.cpp
similarity index 100%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd_readahead.cpp
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_readahead.cpp
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_worker.cpp b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_worker.cpp
similarity index 93%
rename from fs_mgr/libsnapshot/snapuserd/snapuserd_worker.cpp
rename to fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_worker.cpp
index 5d184ad..0e9f0f1 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_worker.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/dm-snapshot-merge/snapuserd_worker.cpp
@@ -32,78 +32,6 @@
 #define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
 #define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
 
-void BufferSink::Initialize(size_t size) {
-    buffer_size_ = size;
-    buffer_offset_ = 0;
-    buffer_ = std::make_unique<uint8_t[]>(size);
-}
-
-void* BufferSink::GetPayloadBuffer(size_t size) {
-    if ((buffer_size_ - buffer_offset_) < size) return nullptr;
-
-    char* buffer = reinterpret_cast<char*>(GetBufPtr());
-    struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
-    return (char*)msg->payload.buf + buffer_offset_;
-}
-
-void* BufferSink::GetBuffer(size_t requested, size_t* actual) {
-    void* buf = GetPayloadBuffer(requested);
-    if (!buf) {
-        *actual = 0;
-        return nullptr;
-    }
-    *actual = requested;
-    return buf;
-}
-
-struct dm_user_header* BufferSink::GetHeaderPtr() {
-    if (!(sizeof(struct dm_user_header) <= buffer_size_)) {
-        return nullptr;
-    }
-    char* buf = reinterpret_cast<char*>(GetBufPtr());
-    struct dm_user_header* header = (struct dm_user_header*)(&(buf[0]));
-    return header;
-}
-
-void* BufferSink::GetPayloadBufPtr() {
-    char* buffer = reinterpret_cast<char*>(GetBufPtr());
-    struct dm_user_message* msg = reinterpret_cast<struct dm_user_message*>(&(buffer[0]));
-    return msg->payload.buf;
-}
-
-void XorSink::Initialize(BufferSink* sink, size_t size) {
-    bufsink_ = sink;
-    buffer_size_ = size;
-    returned_ = 0;
-    buffer_ = std::make_unique<uint8_t[]>(size);
-}
-
-void XorSink::Reset() {
-    returned_ = 0;
-}
-
-void* XorSink::GetBuffer(size_t requested, size_t* actual) {
-    if (requested > buffer_size_) {
-        *actual = buffer_size_;
-    } else {
-        *actual = requested;
-    }
-    return buffer_.get();
-}
-
-bool XorSink::ReturnData(void* buffer, size_t len) {
-    uint8_t* xor_data = reinterpret_cast<uint8_t*>(buffer);
-    uint8_t* buff = reinterpret_cast<uint8_t*>(bufsink_->GetPayloadBuffer(len + returned_));
-    if (buff == nullptr) {
-        return false;
-    }
-    for (size_t i = 0; i < len; i++) {
-        buff[returned_ + i] ^= xor_data[i];
-    }
-    returned_ += len;
-    return true;
-}
-
 WorkerThread::WorkerThread(const std::string& cow_device, const std::string& backing_device,
                            const std::string& control_device, const std::string& misc_name,
                            std::shared_ptr<Snapuserd> snapuserd) {
diff --git a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_buffer.h b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_buffer.h
new file mode 100644
index 0000000..2e4cac6
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/snapuserd_buffer.h
@@ -0,0 +1,62 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <linux/types.h>
+#include <stdint.h>
+#include <stdlib.h>
+
+#include <iostream>
+
+#include <libsnapshot/cow_reader.h>
+
+namespace android {
+namespace snapshot {
+
+class BufferSink : public IByteSink {
+  public:
+    void Initialize(size_t size);
+    void* GetBufPtr() { return buffer_.get(); }
+    void Clear() { memset(GetBufPtr(), 0, buffer_size_); }
+    void* GetPayloadBuffer(size_t size);
+    void* GetBuffer(size_t requested, size_t* actual) override;
+    void UpdateBufferOffset(size_t size) { buffer_offset_ += size; }
+    struct dm_user_header* GetHeaderPtr();
+    bool ReturnData(void*, size_t) override { return true; }
+    void ResetBufferOffset() { buffer_offset_ = 0; }
+    void* GetPayloadBufPtr();
+
+  private:
+    std::unique_ptr<uint8_t[]> buffer_;
+    loff_t buffer_offset_;
+    size_t buffer_size_;
+};
+
+class XorSink : public IByteSink {
+  public:
+    void Initialize(BufferSink* sink, size_t size);
+    void Reset();
+    void* GetBuffer(size_t requested, size_t* actual) override;
+    bool ReturnData(void* buffer, size_t len) override;
+
+  private:
+    BufferSink* bufsink_;
+    std::unique_ptr<uint8_t[]> buffer_;
+    size_t buffer_size_;
+    size_t returned_;
+};
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_buffer.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_buffer.cpp
new file mode 100644
index 0000000..ab763ab
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_buffer.cpp
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <snapuserd/snapuserd_buffer.h>
+#include <snapuserd/snapuserd_kernel.h>
+
+namespace android {
+namespace snapshot {
+
+void BufferSink::Initialize(size_t size) {
+    buffer_size_ = size;
+    buffer_offset_ = 0;
+    buffer_ = std::make_unique<uint8_t[]>(size);
+}
+
+void* BufferSink::GetPayloadBuffer(size_t size) {
+    if ((buffer_size_ - buffer_offset_) < size) return nullptr;
+
+    char* buffer = reinterpret_cast<char*>(GetBufPtr());
+    struct dm_user_message* msg = (struct dm_user_message*)(&(buffer[0]));
+    return (char*)msg->payload.buf + buffer_offset_;
+}
+
+void* BufferSink::GetBuffer(size_t requested, size_t* actual) {
+    void* buf = GetPayloadBuffer(requested);
+    if (!buf) {
+        *actual = 0;
+        return nullptr;
+    }
+    *actual = requested;
+    return buf;
+}
+
+struct dm_user_header* BufferSink::GetHeaderPtr() {
+    if (!(sizeof(struct dm_user_header) <= buffer_size_)) {
+        return nullptr;
+    }
+    char* buf = reinterpret_cast<char*>(GetBufPtr());
+    struct dm_user_header* header = (struct dm_user_header*)(&(buf[0]));
+    return header;
+}
+
+void* BufferSink::GetPayloadBufPtr() {
+    char* buffer = reinterpret_cast<char*>(GetBufPtr());
+    struct dm_user_message* msg = reinterpret_cast<struct dm_user_message*>(&(buffer[0]));
+    return msg->payload.buf;
+}
+
+void XorSink::Initialize(BufferSink* sink, size_t size) {
+    bufsink_ = sink;
+    buffer_size_ = size;
+    returned_ = 0;
+    buffer_ = std::make_unique<uint8_t[]>(size);
+}
+
+void XorSink::Reset() {
+    returned_ = 0;
+}
+
+void* XorSink::GetBuffer(size_t requested, size_t* actual) {
+    if (requested > buffer_size_) {
+        *actual = buffer_size_;
+    } else {
+        *actual = requested;
+    }
+    return buffer_.get();
+}
+
+bool XorSink::ReturnData(void* buffer, size_t len) {
+    uint8_t* xor_data = reinterpret_cast<uint8_t*>(buffer);
+    uint8_t* buff = reinterpret_cast<uint8_t*>(bufsink_->GetPayloadBuffer(len + returned_));
+    if (buff == nullptr) {
+        return false;
+    }
+    for (size_t i = 0; i < len; i++) {
+        buff[returned_ + i] ^= xor_data[i];
+    }
+    returned_ += len;
+    return true;
+}
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
index 2f87557..91b4190 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_server.cpp
@@ -31,7 +31,6 @@
 #include <android-base/scopeguard.h>
 #include <fs_mgr/file_wait.h>
 #include <snapuserd/snapuserd_client.h>
-#include "snapuserd.h"
 #include "snapuserd_server.h"
 
 #define _REALLY_INCLUDE_SYS__SYSTEM_PROPERTIES_H_
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
index 3b6ff15..14e5de6 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_server.h
@@ -28,7 +28,7 @@
 #include <vector>
 
 #include <android-base/unique_fd.h>
-#include "snapuserd.h"
+#include "dm-snapshot-merge/snapuserd.h"
 
 namespace android {
 namespace snapshot {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
new file mode 100644
index 0000000..a2538d2
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -0,0 +1,464 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snapuserd_core.h"
+
+#include <android-base/strings.h>
+
+namespace android {
+namespace snapshot {
+
+using namespace android;
+using namespace android::dm;
+using android::base::unique_fd;
+
+SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,
+                                 std::string backing_device, std::string base_path_merge) {
+    misc_name_ = std::move(misc_name);
+    cow_device_ = std::move(cow_device);
+    backing_store_device_ = std::move(backing_device);
+    control_device_ = "/dev/dm-user/" + misc_name_;
+    base_path_merge_ = std::move(base_path_merge);
+}
+
+bool SnapshotHandler::InitializeWorkers() {
+    for (int i = 0; i < NUM_THREADS_PER_PARTITION; i++) {
+        std::unique_ptr<Worker> wt =
+                std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
+                                         misc_name_, base_path_merge_, GetSharedPtr());
+        if (!wt->Init()) {
+            SNAP_LOG(ERROR) << "Thread initialization failed";
+            return false;
+        }
+
+        worker_threads_.push_back(std::move(wt));
+    }
+
+    merge_thread_ = std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
+                                             misc_name_, base_path_merge_, GetSharedPtr());
+
+    read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
+                                                     GetSharedPtr());
+    return true;
+}
+
+std::unique_ptr<CowReader> SnapshotHandler::CloneReaderForWorker() {
+    return reader_->CloneCowReader();
+}
+
+bool SnapshotHandler::CommitMerge(int num_merge_ops) {
+    struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
+    ch->num_merge_ops += num_merge_ops;
+
+    if (scratch_space_) {
+        if (ra_thread_) {
+            struct BufferState* ra_state = GetBufferState();
+            ra_state->read_ahead_state = kCowReadAheadInProgress;
+        }
+
+        int ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
+        if (ret < 0) {
+            SNAP_PLOG(ERROR) << "msync header failed: " << ret;
+            return false;
+        }
+    } else {
+        reader_->UpdateMergeOpsCompleted(num_merge_ops);
+        CowHeader header;
+        reader_->GetHeader(&header);
+
+        if (lseek(cow_fd_.get(), 0, SEEK_SET) < 0) {
+            SNAP_PLOG(ERROR) << "lseek failed";
+            return false;
+        }
+
+        if (!android::base::WriteFully(cow_fd_, &header, sizeof(CowHeader))) {
+            SNAP_PLOG(ERROR) << "Write to header failed";
+            return false;
+        }
+
+        if (fsync(cow_fd_.get()) < 0) {
+            SNAP_PLOG(ERROR) << "fsync failed";
+            return false;
+        }
+    }
+
+    return true;
+}
+
+void SnapshotHandler::PrepareReadAhead() {
+    struct BufferState* ra_state = GetBufferState();
+    // Check if the data has to be re-constructed from COW device
+    if (ra_state->read_ahead_state == kCowReadAheadDone) {
+        populate_data_from_cow_ = true;
+    } else {
+        populate_data_from_cow_ = false;
+    }
+
+    NotifyRAForMergeReady();
+}
+
+void SnapshotHandler::CheckMergeCompletionStatus() {
+    if (!merge_initiated_) {
+        SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: "
+                       << reader_->get_num_total_data_ops();
+        return;
+    }
+
+    struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
+
+    SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << ch->num_merge_ops
+                   << " Total-data-ops: " << reader_->get_num_total_data_ops();
+}
+
+bool SnapshotHandler::ReadMetadata() {
+    reader_ = std::make_unique<CowReader>();
+    CowHeader header;
+    CowOptions options;
+
+    SNAP_LOG(DEBUG) << "ReadMetadata: Parsing cow file";
+
+    if (!reader_->Parse(cow_fd_)) {
+        SNAP_LOG(ERROR) << "Failed to parse";
+        return false;
+    }
+
+    if (!reader_->GetHeader(&header)) {
+        SNAP_LOG(ERROR) << "Failed to get header";
+        return false;
+    }
+
+    if (!(header.block_size == BLOCK_SZ)) {
+        SNAP_LOG(ERROR) << "Invalid header block size found: " << header.block_size;
+        return false;
+    }
+
+    SNAP_LOG(INFO) << "Merge-ops: " << header.num_merge_ops;
+
+    if (!MmapMetadata()) {
+        SNAP_LOG(ERROR) << "mmap failed";
+        return false;
+    }
+
+    // Initialize the iterator for reading metadata
+    std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
+
+    while (!cowop_iter->Done()) {
+        const CowOperation* cow_op = &cowop_iter->Get();
+
+        chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op));
+
+        if (!ra_thread_ && IsOrderedOp(*cow_op)) {
+            ra_thread_ = true;
+        }
+        cowop_iter->Next();
+    }
+
+    chunk_vec_.shrink_to_fit();
+
+    // Sort the vector based on sectors as we need this during un-aligned access
+    std::sort(chunk_vec_.begin(), chunk_vec_.end(), compare);
+
+    PrepareReadAhead();
+
+    return true;
+}
+
+bool SnapshotHandler::MmapMetadata() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+
+    total_mapped_addr_length_ = header.header_size + BUFFER_REGION_DEFAULT_SIZE;
+
+    if (header.major_version >= 2 && header.buffer_size > 0) {
+        scratch_space_ = true;
+    }
+
+    if (scratch_space_) {
+        mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE, MAP_SHARED,
+                            cow_fd_.get(), 0);
+    } else {
+        mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE,
+                            MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+        struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
+        ch->num_merge_ops = header.num_merge_ops;
+    }
+
+    if (mapped_addr_ == MAP_FAILED) {
+        SNAP_LOG(ERROR) << "mmap metadata failed";
+        return false;
+    }
+
+    return true;
+}
+
+void SnapshotHandler::UnmapBufferRegion() {
+    int ret = munmap(mapped_addr_, total_mapped_addr_length_);
+    if (ret < 0) {
+        SNAP_PLOG(ERROR) << "munmap failed";
+    }
+}
+
+bool SnapshotHandler::InitCowDevice() {
+    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
+    if (cow_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
+        return false;
+    }
+
+    unique_fd fd(TEMP_FAILURE_RETRY(open(base_path_merge_.c_str(), O_RDONLY | O_CLOEXEC)));
+    if (fd < 0) {
+        SNAP_LOG(ERROR) << "Cannot open block device";
+        return false;
+    }
+
+    uint64_t dev_sz = get_block_device_size(fd.get());
+    if (!dev_sz) {
+        SNAP_LOG(ERROR) << "Failed to find block device size: " << base_path_merge_;
+        return false;
+    }
+
+    num_sectors_ = dev_sz >> SECTOR_SHIFT;
+
+    return ReadMetadata();
+}
+
+void SnapshotHandler::ReadBlocksToCache(const std::string& dm_block_device,
+                                        const std::string& partition_name, off_t offset,
+                                        size_t size) {
+    android::base::unique_fd fd(TEMP_FAILURE_RETRY(open(dm_block_device.c_str(), O_RDONLY)));
+    if (fd.get() == -1) {
+        SNAP_PLOG(ERROR) << "Error reading " << dm_block_device
+                         << " partition-name: " << partition_name;
+        return;
+    }
+
+    size_t remain = size;
+    off_t file_offset = offset;
+    // We pick 4M I/O size based on the fact that the current
+    // update_verifier has a similar I/O size.
+    size_t read_sz = 1024 * BLOCK_SZ;
+    std::vector<uint8_t> buf(read_sz);
+
+    while (remain > 0) {
+        size_t to_read = std::min(remain, read_sz);
+
+        if (!android::base::ReadFullyAtOffset(fd.get(), buf.data(), to_read, file_offset)) {
+            SNAP_PLOG(ERROR) << "Failed to read block from block device: " << dm_block_device
+                             << " at offset: " << file_offset
+                             << " partition-name: " << partition_name << " total-size: " << size
+                             << " remain_size: " << remain;
+            return;
+        }
+
+        file_offset += to_read;
+        remain -= to_read;
+    }
+
+    SNAP_LOG(INFO) << "Finished reading block-device: " << dm_block_device
+                   << " partition: " << partition_name << " size: " << size
+                   << " offset: " << offset;
+}
+
+void SnapshotHandler::ReadBlocks(const std::string partition_name,
+                                 const std::string& dm_block_device) {
+    SNAP_LOG(DEBUG) << "Reading partition: " << partition_name
+                    << " Block-Device: " << dm_block_device;
+
+    uint64_t dev_sz = 0;
+
+    unique_fd fd(TEMP_FAILURE_RETRY(open(dm_block_device.c_str(), O_RDONLY | O_CLOEXEC)));
+    if (fd < 0) {
+        SNAP_LOG(ERROR) << "Cannot open block device";
+        return;
+    }
+
+    dev_sz = get_block_device_size(fd.get());
+    if (!dev_sz) {
+        SNAP_PLOG(ERROR) << "Could not determine block device size: " << dm_block_device;
+        return;
+    }
+
+    int num_threads = 2;
+    size_t num_blocks = dev_sz >> BLOCK_SHIFT;
+    size_t num_blocks_per_thread = num_blocks / num_threads;
+    size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
+    off_t offset = 0;
+
+    for (int i = 0; i < num_threads; i++) {
+        std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this, dm_block_device,
+                   partition_name, offset, read_sz_per_thread);
+
+        offset += read_sz_per_thread;
+    }
+}
+
+/*
+ * Entry point to launch threads
+ */
+bool SnapshotHandler::Start() {
+    std::vector<std::future<bool>> threads;
+    std::future<bool> ra_thread_status;
+
+    if (ra_thread_) {
+        ra_thread_status =
+                std::async(std::launch::async, &ReadAhead::RunThread, read_ahead_thread_.get());
+
+        SNAP_LOG(INFO) << "Read-ahead thread started...";
+    }
+
+    // Launch worker threads
+    for (int i = 0; i < worker_threads_.size(); i++) {
+        threads.emplace_back(
+                std::async(std::launch::async, &Worker::RunThread, worker_threads_[i].get()));
+    }
+
+    bool second_stage_init = true;
+
+    // We don't want to read the blocks during first stage init.
+    if (android::base::EndsWith(misc_name_, "-init") || is_socket_present_) {
+        second_stage_init = false;
+    }
+
+    if (second_stage_init) {
+        SNAP_LOG(INFO) << "Reading blocks to cache....";
+        auto& dm = DeviceMapper::Instance();
+        auto dm_block_devices = dm.FindDmPartitions();
+        if (dm_block_devices.empty()) {
+            SNAP_LOG(ERROR) << "No dm-enabled block device is found.";
+        } else {
+            auto parts = android::base::Split(misc_name_, "-");
+            std::string partition_name = parts[0];
+
+            const char* suffix_b = "_b";
+            const char* suffix_a = "_a";
+
+            partition_name.erase(partition_name.find_last_not_of(suffix_b) + 1);
+            partition_name.erase(partition_name.find_last_not_of(suffix_a) + 1);
+
+            if (dm_block_devices.find(partition_name) == dm_block_devices.end()) {
+                SNAP_LOG(ERROR) << "Failed to find dm block device for " << partition_name;
+            } else {
+                ReadBlocks(partition_name, dm_block_devices.at(partition_name));
+            }
+        }
+    } else {
+        SNAP_LOG(INFO) << "Not reading block device into cache";
+    }
+
+    std::future<bool> merge_thread =
+            std::async(std::launch::async, &Worker::RunMergeThread, merge_thread_.get());
+
+    bool ret = true;
+    for (auto& t : threads) {
+        ret = t.get() && ret;
+    }
+
+    // Worker threads are terminated by this point - this can only happen:
+    //
+    // 1: If dm-user device is destroyed
+    // 2: We had an I/O failure when reading root partitions
+    //
+    // In case (1), this would be a graceful shutdown. In this case, merge
+    // thread and RA thread should have already terminated by this point. We will be
+    // destroying the dm-user device only _after_ merge is completed.
+    //
+    // In case (2), if merge thread had started, then it will be
+    // continuing to merge; however, since we had an I/O failure and the
+    // I/O on root partitions are no longer served, we will terminate the
+    // merge
+
+    NotifyIOTerminated();
+
+    bool read_ahead_retval = false;
+
+    SNAP_LOG(INFO) << "Snapshot I/O terminated. Waiting for merge thread....";
+    bool merge_thread_status = merge_thread.get();
+
+    if (ra_thread_) {
+        read_ahead_retval = ra_thread_status.get();
+    }
+
+    SNAP_LOG(INFO) << "Worker threads terminated with ret: " << ret
+                   << " Merge-thread with ret: " << merge_thread_status
+                   << " RA-thread with ret: " << read_ahead_retval;
+    return ret;
+}
+
+uint64_t SnapshotHandler::GetBufferMetadataOffset() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+
+    return (header.header_size + sizeof(BufferState));
+}
+
+/*
+ * Metadata for read-ahead is 16 bytes. For a 2 MB region, we will
+ * end up with 8k (2 PAGE) worth of metadata. Thus, a 2MB buffer
+ * region is split into:
+ *
+ * 1: 8k metadata
+ * 2: Scratch space
+ *
+ */
+size_t SnapshotHandler::GetBufferMetadataSize() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+    size_t buffer_size = header.buffer_size;
+
+    // If there is no scratch space, then just use the
+    // anonymous memory
+    if (buffer_size == 0) {
+        buffer_size = BUFFER_REGION_DEFAULT_SIZE;
+    }
+
+    return ((buffer_size * sizeof(struct ScratchMetadata)) / BLOCK_SZ);
+}
+
+size_t SnapshotHandler::GetBufferDataOffset() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+
+    return (header.header_size + GetBufferMetadataSize());
+}
+
+/*
+ * (2MB - 8K = 2088960 bytes) will be the buffer region to hold the data.
+ */
+size_t SnapshotHandler::GetBufferDataSize() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+    size_t buffer_size = header.buffer_size;
+
+    // If there is no scratch space, then just use the
+    // anonymous memory
+    if (buffer_size == 0) {
+        buffer_size = BUFFER_REGION_DEFAULT_SIZE;
+    }
+
+    return (buffer_size - GetBufferMetadataSize());
+}
+
+struct BufferState* SnapshotHandler::GetBufferState() {
+    CowHeader header;
+    reader_->GetHeader(&header);
+
+    struct BufferState* ra_state =
+            reinterpret_cast<struct BufferState*>((char*)mapped_addr_ + header.header_size);
+    return ra_state;
+}
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
new file mode 100644
index 0000000..c171eda
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -0,0 +1,296 @@
+// Copyright (C) 2021 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <linux/types.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <sys/mman.h>
+
+#include <condition_variable>
+#include <cstring>
+#include <future>
+#include <iostream>
+#include <limits>
+#include <mutex>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include <android-base/file.h>
+#include <android-base/logging.h>
+#include <android-base/stringprintf.h>
+#include <android-base/unique_fd.h>
+#include <ext4_utils/ext4_utils.h>
+#include <libdm/dm.h>
+#include <libsnapshot/cow_reader.h>
+#include <libsnapshot/cow_writer.h>
+#include <snapuserd/snapuserd_buffer.h>
+#include <snapuserd/snapuserd_kernel.h>
+
+namespace android {
+namespace snapshot {
+
+using android::base::unique_fd;
+using namespace std::chrono_literals;
+
+static constexpr size_t PAYLOAD_SIZE = (1UL << 20);
+static_assert(PAYLOAD_SIZE >= BLOCK_SZ);
+
+static constexpr int NUM_THREADS_PER_PARTITION = 1;
+
+#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
+#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
+
+enum class MERGE_IO_TRANSITION {
+    MERGE_READY,
+    MERGE_BEGIN,
+    MERGE_FAILED,
+    MERGE_COMPLETE,
+    IO_TERMINATED,
+    READ_AHEAD_FAILURE,
+};
+
+class SnapshotHandler;
+
+class ReadAhead {
+  public:
+    ReadAhead(const std::string& cow_device, const std::string& backing_device,
+              const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd);
+    bool RunThread();
+
+  private:
+    void InitializeRAIter();
+    bool RAIterDone();
+    void RAIterNext();
+    const CowOperation* GetRAOpIter();
+
+    void InitializeBuffer();
+    bool InitReader();
+    bool InitializeFds();
+
+    void CloseFds() { backing_store_fd_ = {}; }
+
+    bool ReadAheadIOStart();
+    int PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
+                             std::vector<uint64_t>& blocks,
+                             std::vector<const CowOperation*>& xor_op_vec);
+    bool ReconstructDataFromCow();
+    void CheckOverlap(const CowOperation* cow_op);
+
+    void* read_ahead_buffer_;
+    void* metadata_buffer_;
+
+    std::unique_ptr<ICowOpIter> cowop_iter_;
+
+    std::string cow_device_;
+    std::string backing_store_device_;
+    std::string misc_name_;
+
+    unique_fd cow_fd_;
+    unique_fd backing_store_fd_;
+
+    std::shared_ptr<SnapshotHandler> snapuserd_;
+    std::unique_ptr<CowReader> reader_;
+
+    std::unordered_set<uint64_t> dest_blocks_;
+    std::unordered_set<uint64_t> source_blocks_;
+    bool overlap_;
+    BufferSink bufsink_;
+};
+
+class Worker {
+  public:
+    Worker(const std::string& cow_device, const std::string& backing_device,
+           const std::string& control_device, const std::string& misc_name,
+           const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
+    bool RunThread();
+    bool RunMergeThread();
+    bool Init();
+
+  private:
+    // Initialization
+    void InitializeBufsink();
+    bool InitializeFds();
+    bool InitReader();
+    void CloseFds() {
+        ctrl_fd_ = {};
+        backing_store_fd_ = {};
+        base_path_merge_fd_ = {};
+    }
+
+    // IO Path
+    bool ProcessIORequest();
+
+    // Processing COW operations
+    bool ProcessReplaceOp(const CowOperation* cow_op);
+    bool ProcessZeroOp();
+
+    // Handles Copy and Xor
+    bool ProcessCopyOp(const CowOperation* cow_op);
+    bool ProcessXorOp(const CowOperation* cow_op);
+
+    // Merge related ops
+    bool Merge();
+    bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
+    bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
+    int PrepareMerge(uint64_t* source_offset, int* pending_ops,
+                     const std::unique_ptr<ICowOpIter>& cowop_iter,
+                     std::vector<const CowOperation*>* replace_zero_vec = nullptr);
+
+    std::unique_ptr<CowReader> reader_;
+    BufferSink bufsink_;
+    XorSink xorsink_;
+
+    std::string cow_device_;
+    std::string backing_store_device_;
+    std::string control_device_;
+    std::string misc_name_;
+    std::string base_path_merge_;
+
+    unique_fd cow_fd_;
+    unique_fd backing_store_fd_;
+    unique_fd base_path_merge_fd_;
+    unique_fd ctrl_fd_;
+
+    std::shared_ptr<SnapshotHandler> snapuserd_;
+};
+
+class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
+  public:
+    SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
+                    std::string base_path_merge);
+    bool InitCowDevice();
+    bool Start();
+
+    const std::string& GetControlDevicePath() { return control_device_; }
+    const std::string& GetMiscName() { return misc_name_; }
+    const uint64_t& GetNumSectors() { return num_sectors_; }
+    const bool& IsAttached() const { return attached_; }
+    void AttachControlDevice() { attached_ = true; }
+
+    void CheckMergeCompletionStatus();
+    bool CommitMerge(int num_merge_ops);
+
+    void CloseFds() { cow_fd_ = {}; }
+    void FreeResources() {
+        worker_threads_.clear();
+        read_ahead_thread_ = nullptr;
+        merge_thread_ = nullptr;
+    }
+
+    bool InitializeWorkers();
+    std::unique_ptr<CowReader> CloneReaderForWorker();
+    std::shared_ptr<SnapshotHandler> GetSharedPtr() { return shared_from_this(); }
+
+    std::vector<std::pair<sector_t, const CowOperation*>>& GetChunkVec() { return chunk_vec_; }
+
+    static bool compare(std::pair<sector_t, const CowOperation*> p1,
+                        std::pair<sector_t, const CowOperation*> p2) {
+        return p1.first < p2.first;
+    }
+
+    void UnmapBufferRegion();
+    bool MmapMetadata();
+
+    // Read-ahead related functions
+    void* GetMappedAddr() { return mapped_addr_; }
+    void PrepareReadAhead();
+
+    // State transitions for merge
+    void InitiateMerge();
+    void WaitForMergeComplete();
+    bool WaitForMergeBegin();
+    void NotifyRAForMergeReady();
+    bool WaitForMergeReady();
+    void MergeFailed();
+    bool IsIOTerminated();
+    void MergeCompleted();
+    void NotifyIOTerminated();
+    bool ReadAheadIOCompleted(bool sync);
+    void ReadAheadIOFailed();
+
+    bool ShouldReconstructDataFromCow() { return populate_data_from_cow_; }
+    void FinishReconstructDataFromCow() { populate_data_from_cow_ = false; }
+
+    // RA related functions
+    uint64_t GetBufferMetadataOffset();
+    size_t GetBufferMetadataSize();
+    size_t GetBufferDataOffset();
+    size_t GetBufferDataSize();
+
+    // Total number of blocks to be merged in a given read-ahead buffer region
+    void SetMergedBlockCountForNextCommit(int x) { total_ra_blocks_merged_ = x; }
+    int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; }
+    void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
+    bool MergeInitiated() { return merge_initiated_; }
+
+  private:
+    bool ReadMetadata();
+    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
+    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
+    struct BufferState* GetBufferState();
+
+    void ReadBlocks(const std::string partition_name, const std::string& dm_block_device);
+    void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name,
+                           off_t offset, size_t size);
+
+    // COW device
+    std::string cow_device_;
+    // Source device
+    std::string backing_store_device_;
+    // dm-user control device
+    std::string control_device_;
+    std::string misc_name_;
+    // Base device for merging
+    std::string base_path_merge_;
+
+    unique_fd cow_fd_;
+
+    // Number of sectors required when initializing dm-user
+    uint64_t num_sectors_;
+
+    std::unique_ptr<CowReader> reader_;
+
+    // chunk_vec stores the pseudo mapping of sector
+    // to COW operations.
+    std::vector<std::pair<sector_t, const CowOperation*>> chunk_vec_;
+
+    std::mutex lock_;
+    std::condition_variable cv;
+
+    void* mapped_addr_;
+    size_t total_mapped_addr_length_;
+
+    std::vector<std::unique_ptr<Worker>> worker_threads_;
+    // Read-ahead related
+    bool populate_data_from_cow_ = false;
+    bool ra_thread_ = false;
+    int total_ra_blocks_merged_ = 0;
+    MERGE_IO_TRANSITION io_state_;
+    std::unique_ptr<ReadAhead> read_ahead_thread_;
+
+    std::unique_ptr<Worker> merge_thread_;
+
+    bool merge_initiated_ = false;
+    bool attached_ = false;
+    bool is_socket_present_;
+    bool scratch_space_ = false;
+};
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
new file mode 100644
index 0000000..18c7f2c
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
@@ -0,0 +1,152 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snapuserd_core.h"
+
+namespace android {
+namespace snapshot {
+
+using namespace android;
+using namespace android::dm;
+using android::base::unique_fd;
+
+Worker::Worker(const std::string& cow_device, const std::string& backing_device,
+               const std::string& control_device, const std::string& misc_name,
+               const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd) {
+    cow_device_ = cow_device;
+    backing_store_device_ = backing_device;
+    control_device_ = control_device;
+    misc_name_ = misc_name;
+    base_path_merge_ = base_path_merge;
+    snapuserd_ = snapuserd;
+}
+
+bool Worker::InitializeFds() {
+    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
+    if (backing_store_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
+        return false;
+    }
+
+    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
+    if (cow_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
+        return false;
+    }
+
+    ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
+    if (ctrl_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
+        return false;
+    }
+
+    // Base device used by merge thread
+    base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR));
+    if (base_path_merge_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_;
+        return false;
+    }
+
+    return true;
+}
+
+bool Worker::InitReader() {
+    reader_ = snapuserd_->CloneReaderForWorker();
+
+    if (!reader_->InitForMerge(std::move(cow_fd_))) {
+        return false;
+    }
+    return true;
+}
+
+// Start the replace operation. This will read the
+// internal COW format and if the block is compressed,
+// it will be de-compressed.
+bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
+    if (!reader_->ReadData(*cow_op, &bufsink_)) {
+        SNAP_LOG(ERROR) << "ProcessReplaceOp failed for block " << cow_op->new_block;
+        return false;
+    }
+
+    return true;
+}
+
+bool Worker::ProcessZeroOp() {
+    // Zero out the entire block
+    void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+    if (buffer == nullptr) {
+        SNAP_LOG(ERROR) << "ProcessZeroOp: Failed to get payload buffer";
+        return false;
+    }
+
+    memset(buffer, 0, BLOCK_SZ);
+    return true;
+}
+
+bool Worker::ProcessCopyOp(const CowOperation*) {
+    return true;
+}
+
+bool Worker::ProcessXorOp(const CowOperation*) {
+    return true;
+}
+
+void Worker::InitializeBufsink() {
+    // Allocate the buffer which is used to communicate between
+    // daemon and dm-user. The buffer comprises of header and a fixed payload.
+    // If the dm-user requests a big IO, the IO will be broken into chunks
+    // of PAYLOAD_SIZE.
+    size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_SIZE;
+    bufsink_.Initialize(buf_size);
+}
+
+bool Worker::Init() {
+    InitializeBufsink();
+    xorsink_.Initialize(&bufsink_, BLOCK_SZ);
+
+    if (!InitializeFds()) {
+        return false;
+    }
+
+    if (!InitReader()) {
+        return false;
+    }
+
+    return true;
+}
+
+bool Worker::RunThread() {
+    SNAP_LOG(DEBUG) << "Processing snapshot I/O requests...";
+    // Start serving IO
+    while (true) {
+        if (!ProcessIORequest()) {
+            break;
+        }
+    }
+
+    CloseFds();
+    reader_->CloseCowFd();
+
+    return true;
+}
+
+bool Worker::ProcessIORequest() {
+    // No communication with dm-user yet
+    return true;
+}
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
new file mode 100644
index 0000000..696ede7
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -0,0 +1,300 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snapuserd_core.h"
+
+namespace android {
+namespace snapshot {
+
+using namespace android;
+using namespace android::dm;
+using android::base::unique_fd;
+
+int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
+                         const std::unique_ptr<ICowOpIter>& cowop_iter,
+                         std::vector<const CowOperation*>* replace_zero_vec) {
+    int num_ops = *pending_ops;
+    int nr_consecutive = 0;
+    bool checkOrderedOp = (replace_zero_vec == nullptr);
+
+    do {
+        if (!cowop_iter->Done() && num_ops) {
+            const CowOperation* cow_op = &cowop_iter->Get();
+            if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
+                break;
+            }
+
+            *source_offset = cow_op->new_block * BLOCK_SZ;
+            if (!checkOrderedOp) {
+                replace_zero_vec->push_back(cow_op);
+            }
+
+            cowop_iter->Next();
+            num_ops -= 1;
+            nr_consecutive = 1;
+
+            while (!cowop_iter->Done() && num_ops) {
+                const CowOperation* op = &cowop_iter->Get();
+                if (checkOrderedOp && !IsOrderedOp(*op)) {
+                    break;
+                }
+
+                // Check for consecutive blocks
+                uint64_t next_offset = op->new_block * BLOCK_SZ;
+                if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
+                    break;
+                }
+
+                if (!checkOrderedOp) {
+                    replace_zero_vec->push_back(op);
+                }
+
+                nr_consecutive += 1;
+                num_ops -= 1;
+                cowop_iter->Next();
+            }
+        }
+    } while (0);
+
+    return nr_consecutive;
+}
+
+bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+    // Flush every 2048 ops. Since all ops are independent and there is no
+    // dependency between COW ops, we will flush the data and the number
+    // of ops merged in COW file for every 2048 ops. If there is a crash,
+    // we will end up replaying some of the COW ops which were already merged.
+    // That is ok.
+    //
+    // Why 2048 ops ? We can probably increase this to bigger value but just
+    // need to ensure that merge makes forward progress if there are
+    // crashes repeatedly which is highly unlikely.
+    int total_ops_merged_per_commit = (PAYLOAD_SIZE / BLOCK_SZ) * 8;
+    int num_ops_merged = 0;
+
+    while (!cowop_iter->Done()) {
+        int num_ops = PAYLOAD_SIZE / BLOCK_SZ;
+        std::vector<const CowOperation*> replace_zero_vec;
+        uint64_t source_offset;
+
+        int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter, &replace_zero_vec);
+        if (linear_blocks == 0) {
+            // Merge complete
+            CHECK(cowop_iter->Done());
+            break;
+        }
+
+        for (size_t i = 0; i < replace_zero_vec.size(); i++) {
+            const CowOperation* cow_op = replace_zero_vec[i];
+            if (cow_op->type == kCowReplaceOp) {
+                if (!ProcessReplaceOp(cow_op)) {
+                    SNAP_LOG(ERROR) << "Merge - ReplaceOp failed for block: " << cow_op->new_block;
+                    return false;
+                }
+            } else {
+                CHECK(cow_op->type == kCowZeroOp);
+                if (!ProcessZeroOp()) {
+                    SNAP_LOG(ERROR) << "Merge ZeroOp failed.";
+                    return false;
+                }
+            }
+
+            bufsink_.UpdateBufferOffset(BLOCK_SZ);
+        }
+
+        size_t io_size = linear_blocks * BLOCK_SZ;
+
+        // Merge - Write the contents back to base device
+        int ret = pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(), io_size,
+                         source_offset);
+        if (ret < 0 || ret != io_size) {
+            SNAP_LOG(ERROR)
+                    << "Merge: ReplaceZeroOps: Failed to write to backing device while merging "
+                    << " at offset: " << source_offset << " io_size: " << io_size;
+            return false;
+        }
+
+        num_ops_merged += linear_blocks;
+
+        if (num_ops_merged == total_ops_merged_per_commit) {
+            // Flush the data
+            if (fsync(base_path_merge_fd_.get()) < 0) {
+                SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
+                return false;
+            }
+
+            // Track the merge completion
+            if (!snapuserd_->CommitMerge(num_ops_merged)) {
+                SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
+                return false;
+            }
+
+            num_ops_merged = 0;
+        }
+
+        bufsink_.ResetBufferOffset();
+
+        if (snapuserd_->IsIOTerminated()) {
+            SNAP_LOG(ERROR)
+                    << "MergeReplaceZeroOps: Worker threads terminated - shutting down merge";
+            return false;
+        }
+    }
+
+    // Any left over ops not flushed yet.
+    if (num_ops_merged) {
+        // Flush the data
+        if (fsync(base_path_merge_fd_.get()) < 0) {
+            SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
+            return false;
+        }
+
+        if (!snapuserd_->CommitMerge(num_ops_merged)) {
+            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
+            return false;
+        }
+
+        num_ops_merged = 0;
+    }
+
+    return true;
+}
+
+bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+    void* mapped_addr = snapuserd_->GetMappedAddr();
+    void* read_ahead_buffer =
+            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
+
+    SNAP_LOG(INFO) << "MergeOrderedOps started....";
+
+    while (!cowop_iter->Done()) {
+        const CowOperation* cow_op = &cowop_iter->Get();
+        if (!IsOrderedOp(*cow_op)) {
+            break;
+        }
+
+        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
+        // Wait for RA thread to notify that the merge window
+        // is ready for merging.
+        if (!snapuserd_->WaitForMergeBegin()) {
+            return false;
+        }
+
+        loff_t offset = 0;
+        int num_ops = snapuserd_->GetTotalBlocksToMerge();
+        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
+        while (num_ops) {
+            uint64_t source_offset;
+
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+            if (linear_blocks == 0) {
+                break;
+            }
+
+            size_t io_size = (linear_blocks * BLOCK_SZ);
+            // Write to the base device. Data is already in the RA buffer. Note
+            // that XOR ops is already handled by the RA thread. We just write
+            // the contents out.
+            int ret = pwrite(base_path_merge_fd_.get(), (char*)read_ahead_buffer + offset, io_size,
+                             source_offset);
+            if (ret < 0 || ret != io_size) {
+                SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
+                                << " at offset: " << source_offset << " io_size: " << io_size;
+                return false;
+            }
+
+            offset += io_size;
+            num_ops -= linear_blocks;
+        }
+
+        // Verify all ops are merged
+        CHECK(num_ops == 0);
+
+        // Flush the data
+        if (fsync(base_path_merge_fd_.get()) < 0) {
+            SNAP_LOG(ERROR) << " Failed to fsync merged data";
+            return false;
+        }
+
+        // Merge is done and data is on disk. Update the COW Header about
+        // the merge completion
+        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
+            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
+            return false;
+        }
+
+        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
+
+        // Notify RA thread that the merge thread is ready to merge the next
+        // window
+        snapuserd_->NotifyRAForMergeReady();
+    }
+
+    return true;
+}
+
+bool Worker::Merge() {
+    std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
+
+    // Start with Copy and Xor ops
+    if (!MergeOrderedOps(cowop_iter)) {
+        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
+        snapuserd_->MergeFailed();
+        return false;
+    }
+
+    SNAP_LOG(INFO) << "MergeOrderedOps completed...";
+
+    // Replace and Zero ops
+    if (!MergeReplaceZeroOps(cowop_iter)) {
+        SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
+        snapuserd_->MergeFailed();
+        return false;
+    }
+
+    snapuserd_->MergeCompleted();
+
+    return true;
+}
+
+bool Worker::RunMergeThread() {
+    SNAP_LOG(DEBUG) << "Waiting for merge begin...";
+    if (!snapuserd_->WaitForMergeBegin()) {
+        SNAP_LOG(ERROR) << "Merge terminated early...";
+        return true;
+    }
+
+    SNAP_LOG(INFO) << "Merge starting..";
+
+    if (!Init()) {
+        SNAP_LOG(ERROR) << "Merge thread initialization failed...";
+        return false;
+    }
+
+    if (!Merge()) {
+        return false;
+    }
+
+    CloseFds();
+    reader_->CloseCowFd();
+
+    SNAP_LOG(INFO) << "Merge finish";
+
+    return true;
+}
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
new file mode 100644
index 0000000..319755b
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -0,0 +1,424 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snapuserd_core.h"
+
+namespace android {
+namespace snapshot {
+
+using namespace android;
+using namespace android::dm;
+using android::base::unique_fd;
+
+ReadAhead::ReadAhead(const std::string& cow_device, const std::string& backing_device,
+                     const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd) {
+    cow_device_ = cow_device;
+    backing_store_device_ = backing_device;
+    misc_name_ = misc_name;
+    snapuserd_ = snapuserd;
+}
+
+void ReadAhead::CheckOverlap(const CowOperation* cow_op) {
+    uint64_t source_block = cow_op->source;
+    uint64_t source_offset = 0;
+    if (cow_op->type == kCowXorOp) {
+        source_block /= BLOCK_SZ;
+        source_offset = cow_op->source % BLOCK_SZ;
+    }
+    if (dest_blocks_.count(cow_op->new_block) || source_blocks_.count(source_block) ||
+        (source_offset > 0 && source_blocks_.count(source_block + 1))) {
+        overlap_ = true;
+    }
+
+    dest_blocks_.insert(source_block);
+    if (source_offset > 0) {
+        dest_blocks_.insert(source_block + 1);
+    }
+    source_blocks_.insert(cow_op->new_block);
+}
+
+int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
+                                    std::vector<uint64_t>& blocks,
+                                    std::vector<const CowOperation*>& xor_op_vec) {
+    int num_ops = *pending_ops;
+    int nr_consecutive = 0;
+
+    bool is_ops_present = (!RAIterDone() && num_ops);
+
+    if (!is_ops_present) {
+        return nr_consecutive;
+    }
+
+    // Get the first block with offset
+    const CowOperation* cow_op = GetRAOpIter();
+    *source_offset = cow_op->source;
+
+    if (cow_op->type == kCowCopyOp) {
+        *source_offset *= BLOCK_SZ;
+    } else if (cow_op->type == kCowXorOp) {
+        xor_op_vec.push_back(cow_op);
+    }
+
+    RAIterNext();
+    num_ops -= 1;
+    nr_consecutive = 1;
+    blocks.push_back(cow_op->new_block);
+
+    if (!overlap_) {
+        CheckOverlap(cow_op);
+    }
+
+    /*
+     * Find number of consecutive blocks
+     */
+    while (!RAIterDone() && num_ops) {
+        const CowOperation* op = GetRAOpIter();
+        uint64_t next_offset = op->source;
+
+        if (cow_op->type == kCowCopyOp) {
+            next_offset *= BLOCK_SZ;
+        }
+
+        // Check for consecutive blocks
+        if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
+            break;
+        }
+
+        if (op->type == kCowXorOp) {
+            xor_op_vec.push_back(op);
+        }
+
+        nr_consecutive += 1;
+        num_ops -= 1;
+        blocks.push_back(op->new_block);
+        RAIterNext();
+
+        if (!overlap_) {
+            CheckOverlap(op);
+        }
+    }
+
+    return nr_consecutive;
+}
+
+bool ReadAhead::ReconstructDataFromCow() {
+    std::unordered_map<uint64_t, void*> read_ahead_buffer_map;
+    loff_t metadata_offset = 0;
+    loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
+    int num_ops = 0;
+    int total_blocks_merged = 0;
+
+    while (true) {
+        struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
+                (char*)metadata_buffer_ + metadata_offset);
+
+        // Done reading metadata
+        if (bm->new_block == 0 && bm->file_offset == 0) {
+            break;
+        }
+
+        loff_t buffer_offset = bm->file_offset - start_data_offset;
+        void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + buffer_offset);
+        read_ahead_buffer_map[bm->new_block] = bufptr;
+        num_ops += 1;
+        total_blocks_merged += 1;
+
+        metadata_offset += sizeof(struct ScratchMetadata);
+    }
+
+    // We are done re-constructing the mapping; however, we need to make sure
+    // all the COW operations to-be merged are present in the re-constructed
+    // mapping.
+    while (!RAIterDone()) {
+        const CowOperation* op = GetRAOpIter();
+        if (read_ahead_buffer_map.find(op->new_block) != read_ahead_buffer_map.end()) {
+            num_ops -= 1;
+            RAIterNext();
+            continue;
+        }
+
+        // Verify that we have covered all the ops which were re-constructed
+        // from COW device - These are the ops which are being
+        // re-constructed after crash.
+        if (!(num_ops == 0)) {
+            SNAP_LOG(ERROR) << "ReconstructDataFromCow failed. Not all ops recoverd "
+                            << " Pending ops: " << num_ops;
+            snapuserd_->ReadAheadIOFailed();
+            return false;
+        }
+
+        break;
+    }
+
+    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);
+
+    snapuserd_->FinishReconstructDataFromCow();
+
+    if (!snapuserd_->ReadAheadIOCompleted(true)) {
+        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
+        snapuserd_->ReadAheadIOFailed();
+        return false;
+    }
+
+    SNAP_LOG(INFO) << "ReconstructDataFromCow success";
+    return true;
+}
+
+bool ReadAhead::ReadAheadIOStart() {
+    // Check if the data has to be constructed from the COW file.
+    // This will be true only once during boot up after a crash
+    // during merge.
+    if (snapuserd_->ShouldReconstructDataFromCow()) {
+        return ReconstructDataFromCow();
+    }
+
+    std::vector<uint64_t> blocks;
+
+    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
+    loff_t buffer_offset = 0;
+    int total_blocks_merged = 0;
+    overlap_ = false;
+    dest_blocks_.clear();
+    source_blocks_.clear();
+    std::vector<const CowOperation*> xor_op_vec;
+
+    auto ra_temp_buffer = std::make_unique<uint8_t[]>(snapuserd_->GetBufferDataSize());
+
+    // Number of ops to be merged in this window. This is a fixed size
+    // except for the last window wherein the number of ops can be less
+    // than the size of the RA window.
+    while (num_ops) {
+        uint64_t source_offset;
+
+        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks, xor_op_vec);
+        if (linear_blocks == 0) {
+            // No more blocks to read
+            SNAP_LOG(DEBUG) << " Read-ahead completed....";
+            break;
+        }
+
+        size_t io_size = (linear_blocks * BLOCK_SZ);
+
+        // Read from the base device consecutive set of blocks in one shot
+        if (!android::base::ReadFullyAtOffset(backing_store_fd_,
+                                              (char*)ra_temp_buffer.get() + buffer_offset, io_size,
+                                              source_offset)) {
+            SNAP_PLOG(ERROR) << "Ordered-op failed. Read from backing store: "
+                             << backing_store_device_ << "at block :" << source_offset / BLOCK_SZ
+                             << " offset :" << source_offset % BLOCK_SZ
+                             << " buffer_offset : " << buffer_offset << " io_size : " << io_size
+                             << " buf-addr : " << read_ahead_buffer_;
+
+            snapuserd_->ReadAheadIOFailed();
+            return false;
+        }
+
+        buffer_offset += io_size;
+        total_blocks_merged += linear_blocks;
+        num_ops -= linear_blocks;
+    }
+
+    // Done with merging ordered ops
+    if (RAIterDone() && total_blocks_merged == 0) {
+        return true;
+    }
+
+    loff_t metadata_offset = 0;
+
+    auto ra_temp_meta_buffer = std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
+
+    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
+            (char*)ra_temp_meta_buffer.get() + metadata_offset);
+
+    bm->new_block = 0;
+    bm->file_offset = 0;
+
+    loff_t file_offset = snapuserd_->GetBufferDataOffset();
+
+    loff_t offset = 0;
+    CHECK(blocks.size() == total_blocks_merged);
+
+    size_t xor_index = 0;
+    for (size_t block_index = 0; block_index < blocks.size(); block_index++) {
+        void* bufptr = static_cast<void*>((char*)ra_temp_buffer.get() + offset);
+        uint64_t new_block = blocks[block_index];
+
+        if (xor_index < xor_op_vec.size()) {
+            const CowOperation* xor_op = xor_op_vec[xor_index];
+
+            // Check if this block is an XOR op
+            if (xor_op->new_block == new_block) {
+                // Read the xor'ed data from COW
+                if (!reader_->ReadData(*xor_op, &bufsink_)) {
+                    SNAP_LOG(ERROR)
+                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
+                    snapuserd_->ReadAheadIOFailed();
+                    return false;
+                }
+
+                // Pointer to the data read from base device
+                uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
+                // Get the xor'ed data read from COW device
+                uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink_.GetPayloadBufPtr());
+
+                // Retrieve the original data
+                for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
+                    buffer[byte_offset] ^= xor_data[byte_offset];
+                }
+
+                // Move to next XOR op
+                xor_index += 1;
+            }
+        }
+
+        offset += BLOCK_SZ;
+        // Track the metadata blocks which are stored in scratch space
+        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer.get() +
+                                                       metadata_offset);
+
+        bm->new_block = new_block;
+        bm->file_offset = file_offset;
+
+        metadata_offset += sizeof(struct ScratchMetadata);
+        file_offset += BLOCK_SZ;
+    }
+
+    // Verify if all the xor blocks were scanned to retrieve the original data
+    CHECK(xor_index == xor_op_vec.size());
+
+    // This is important - explicitly set the contents to zero. This is used
+    // when re-constructing the data after crash. This indicates end of
+    // reading metadata contents when re-constructing the data
+    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer.get() +
+                                                   metadata_offset);
+    bm->new_block = 0;
+    bm->file_offset = 0;
+
+    // Wait for the merge to finish for the previous RA window. We shouldn't
+    // be touching the scratch space until merge is complete of previous RA
+    // window. If there is a crash during this time frame, merge should resume
+    // based on the contents of the scratch space.
+    if (!snapuserd_->WaitForMergeReady()) {
+        return false;
+    }
+
+    // Copy the data to scratch space
+    memcpy(metadata_buffer_, ra_temp_meta_buffer.get(), snapuserd_->GetBufferMetadataSize());
+    memcpy(read_ahead_buffer_, ra_temp_buffer.get(), total_blocks_merged * BLOCK_SZ);
+
+    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);
+
+    // Flush the data only if we have a overlapping blocks in the region
+    // Notify the Merge thread to resume merging this window
+    if (!snapuserd_->ReadAheadIOCompleted(overlap_)) {
+        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
+        snapuserd_->ReadAheadIOFailed();
+        return false;
+    }
+
+    return true;
+}
+
+bool ReadAhead::RunThread() {
+    if (!InitializeFds()) {
+        return false;
+    }
+
+    InitializeBuffer();
+
+    if (!InitReader()) {
+        return false;
+    }
+
+    InitializeRAIter();
+
+    while (!RAIterDone()) {
+        if (!ReadAheadIOStart()) {
+            break;
+        }
+    }
+
+    CloseFds();
+    reader_->CloseCowFd();
+    SNAP_LOG(INFO) << " ReadAhead thread terminating....";
+    return true;
+}
+
+// Initialization
+bool ReadAhead::InitializeFds() {
+    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
+    if (backing_store_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
+        return false;
+    }
+
+    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
+    if (cow_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
+        return false;
+    }
+
+    return true;
+}
+
+bool ReadAhead::InitReader() {
+    reader_ = snapuserd_->CloneReaderForWorker();
+
+    if (!reader_->InitForMerge(std::move(cow_fd_))) {
+        return false;
+    }
+    return true;
+}
+
+void ReadAhead::InitializeRAIter() {
+    cowop_iter_ = reader_->GetMergeOpIter();
+}
+
+bool ReadAhead::RAIterDone() {
+    if (cowop_iter_->Done()) {
+        return true;
+    }
+
+    const CowOperation* cow_op = GetRAOpIter();
+
+    if (!IsOrderedOp(*cow_op)) {
+        return true;
+    }
+
+    return false;
+}
+
+void ReadAhead::RAIterNext() {
+    cowop_iter_->Next();
+}
+
+const CowOperation* ReadAhead::GetRAOpIter() {
+    const CowOperation* cow_op = &cowop_iter_->Get();
+    return cow_op;
+}
+
+void ReadAhead::InitializeBuffer() {
+    void* mapped_addr = snapuserd_->GetMappedAddr();
+    // Map the scratch space region into memory
+    metadata_buffer_ =
+            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferMetadataOffset());
+    read_ahead_buffer_ = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
+    // For xor ops
+    bufsink_.Initialize(PAYLOAD_SIZE);
+}
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
new file mode 100644
index 0000000..97418bd
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
@@ -0,0 +1,363 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "snapuserd_core.h"
+
+/*
+ * Readahead is used to optimize the merge of COPY and XOR Ops.
+ *
+ * We create a scratch space of 2MB to store the read-ahead data in the COW
+ * device.
+ *
+ *      +-----------------------+
+ *      |     Header (fixed)    |
+ *      +-----------------------+
+ *      |    Scratch space      |  <-- 2MB
+ *      +-----------------------+
+ *
+ *      Scratch space is as follows:
+ *
+ *      +-----------------------+
+ *      |       Metadata        | <- 4k page
+ *      +-----------------------+
+ *      |       Metadata        | <- 4k page
+ *      +-----------------------+
+ *      |                       |
+ *      |    Read-ahead data    |
+ *      |                       |
+ *      +-----------------------+
+ *
+ *
+ * * ===================================================================
+ *
+ * Example:
+ *
+ * We have 6 copy operations to be executed in OTA. Update-engine
+ * will write to COW file as follows:
+ *
+ * Op-1: 20 -> 23
+ * Op-2: 19 -> 22
+ * Op-3: 18 -> 21
+ * Op-4: 17 -> 20
+ * Op-5: 16 -> 19
+ * Op-6: 15 -> 18
+ *
+ * Read-ahead thread will read all the 6 source blocks and store the data in the
+ * scratch space. Metadata will contain the destination block numbers. Thus,
+ * scratch space will look something like this:
+ *
+ * +--------------+
+ * | Block   23   |
+ * | offset - 1   |
+ * +--------------+
+ * | Block   22   |
+ * | offset - 2   |
+ * +--------------+
+ * | Block   21   |
+ * | offset - 3   |
+ * +--------------+
+ *    ...
+ *    ...
+ * +--------------+
+ * | Data-Block 20| <-- offset - 1
+ * +--------------+
+ * | Data-Block 19| <-- offset - 2
+ * +--------------+
+ * | Data-Block 18| <-- offset - 3
+ * +--------------+
+ *     ...
+ *     ...
+ *
+ * ====================================================================
+ *
+ *
+ *  Read-ahead thread will process the COW Ops in fixed set. Consider
+ *  the following example:
+ *
+ *  +--------------------------+
+ *  |op-1|op-2|op-3|....|op-510|
+ *  +--------------------------+
+ *
+ *  <------ One RA Block ------>
+ *
+ *  RA thread will read 510 ordered COW ops at a time and will store
+ *  the data in the scratch space.
+ *
+ *  RA thread and Merge thread will go lock-step wherein RA thread
+ *  will make sure that 510 COW operation data are read upfront
+ *  and is in memory. Thus, when merge thread will pick up the data
+ *  directly from memory and write it back to base device.
+ *
+ *
+ *  +--------------------------+------------------------------------+
+ *  |op-1|op-2|op-3|....|op-510|op-511|op-512|op-513........|op-1020|
+ *  +--------------------------+------------------------------------+
+ *
+ *  <------Merge 510 Blocks----><-Prepare 510 blocks for merge by RA->
+ *           ^                                  ^
+ *           |                                  |
+ *      Merge thread                        RA thread
+ *
+ * Both Merge and RA thread will strive to work in parallel.
+ *
+ * ===========================================================================
+ *
+ * State transitions and communication between RA thread and Merge thread:
+ *
+ *  Merge Thread                                      RA Thread
+ *  ----------------------------------------------------------------------------
+ *
+ *          |                                         |
+ *    WAIT for RA Block N                     READ one RA Block (N)
+ *        for merge                                   |
+ *          |                                         |
+ *          |                                         |
+ *          <--------------MERGE BEGIN--------READ Block N done(copy to scratch)
+ *          |                                         |
+ *          |                                         |
+ *    Merge Begin Block N                     READ one RA BLock (N+1)
+ *          |                                         |
+ *          |                                         |
+ *          |                                  READ done. Wait for merge complete
+ *          |                                         |
+ *          |                                        WAIT
+ *          |                                         |
+ *    Merge done Block N                              |
+ *          ----------------MERGE READY-------------->|
+ *    WAIT for RA Block N+1                     Copy RA Block (N+1)
+ *        for merge                              to scratch space
+ *          |                                         |
+ *          <---------------MERGE BEGIN---------BLOCK N+1 Done
+ *          |                                         |
+ *          |                                         |
+ *    Merge Begin Block N+1                   READ one RA BLock (N+2)
+ *          |                                         |
+ *          |                                         |
+ *          |                                  READ done. Wait for merge complete
+ *          |                                         |
+ *          |                                        WAIT
+ *          |                                         |
+ *    Merge done Block N+1                            |
+ *          ----------------MERGE READY-------------->|
+ *    WAIT for RA Block N+2                     Copy RA Block (N+2)
+ *        for merge                              to scratch space
+ *          |                                         |
+ *          <---------------MERGE BEGIN---------BLOCK N+2 Done
+ */
+
+namespace android {
+namespace snapshot {
+
+using namespace android;
+using namespace android::dm;
+using android::base::unique_fd;
+
+// This is invoked once primarily by update-engine to initiate
+// the merge
+void SnapshotHandler::InitiateMerge() {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        merge_initiated_ = true;
+
+        // If there are only REPLACE ops to be merged, then we need
+        // to explicitly set the state to MERGE_BEGIN as there
+        // is no read-ahead thread
+        if (!ra_thread_) {
+            io_state_ = MERGE_IO_TRANSITION::MERGE_BEGIN;
+        }
+    }
+    cv.notify_all();
+}
+
+// Invoked by Merge thread - Waits on RA thread to resume merging. Will
+// be waken up RA thread.
+bool SnapshotHandler::WaitForMergeBegin() {
+    {
+        std::unique_lock<std::mutex> lock(lock_);
+        while (!MergeInitiated()) {
+            cv.wait(lock);
+
+            if (io_state_ == MERGE_IO_TRANSITION::READ_AHEAD_FAILURE ||
+                io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED) {
+                return false;
+            }
+        }
+
+        while (!(io_state_ == MERGE_IO_TRANSITION::MERGE_BEGIN ||
+                 io_state_ == MERGE_IO_TRANSITION::READ_AHEAD_FAILURE ||
+                 io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED)) {
+            cv.wait(lock);
+        }
+
+        if (io_state_ == MERGE_IO_TRANSITION::READ_AHEAD_FAILURE ||
+            io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED) {
+            return false;
+        }
+
+        return true;
+    }
+}
+
+// Invoked by RA thread - Flushes the RA block to scratch space if necessary
+// and then notifies the merge thread to resume merging
+bool SnapshotHandler::ReadAheadIOCompleted(bool sync) {
+    if (sync) {
+        // Flush the entire buffer region
+        int ret = msync(mapped_addr_, total_mapped_addr_length_, MS_SYNC);
+        if (ret < 0) {
+            PLOG(ERROR) << "msync failed after ReadAheadIOCompleted: " << ret;
+            return false;
+        }
+
+        // Metadata and data are synced. Now, update the state.
+        // We need to update the state after flushing data; if there is a crash
+        // when read-ahead IO is in progress, the state of data in the COW file
+        // is unknown. kCowReadAheadDone acts as a checkpoint wherein the data
+        // in the scratch space is good and during next reboot, read-ahead thread
+        // can safely re-construct the data.
+        struct BufferState* ra_state = GetBufferState();
+        ra_state->read_ahead_state = kCowReadAheadDone;
+
+        ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
+        if (ret < 0) {
+            PLOG(ERROR) << "msync failed to flush Readahead completion state...";
+            return false;
+        }
+    }
+
+    // Notify the merge thread to resume merging
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        if (io_state_ != MERGE_IO_TRANSITION::IO_TERMINATED &&
+            io_state_ != MERGE_IO_TRANSITION::MERGE_FAILED) {
+            io_state_ = MERGE_IO_TRANSITION::MERGE_BEGIN;
+        }
+    }
+
+    cv.notify_all();
+    return true;
+}
+
+// Invoked by RA thread - Waits for merge thread to finish merging
+// RA Block N - RA thread would be ready will with Block N+1 but
+// will wait to merge thread to finish Block N. Once Block N
+// is merged, RA thread will be woken up by Merge thread and will
+// flush the data of Block N+1 to scratch space
+bool SnapshotHandler::WaitForMergeReady() {
+    {
+        std::unique_lock<std::mutex> lock(lock_);
+        while (!(io_state_ == MERGE_IO_TRANSITION::MERGE_READY ||
+                 io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED ||
+                 io_state_ == MERGE_IO_TRANSITION::MERGE_COMPLETE ||
+                 io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED)) {
+            cv.wait(lock);
+        }
+
+        // Check if merge failed
+        if (io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED ||
+            io_state_ == MERGE_IO_TRANSITION::MERGE_COMPLETE ||
+            io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED) {
+            return false;
+        }
+        return true;
+    }
+}
+
+// Invoked by Merge thread - Notify RA thread about Merge completion
+// for Block N and wake up
+void SnapshotHandler::NotifyRAForMergeReady() {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        if (io_state_ != MERGE_IO_TRANSITION::IO_TERMINATED &&
+            io_state_ != MERGE_IO_TRANSITION::READ_AHEAD_FAILURE) {
+            io_state_ = MERGE_IO_TRANSITION::MERGE_READY;
+        }
+    }
+
+    cv.notify_all();
+}
+
+// The following transitions are mostly in the failure paths
+void SnapshotHandler::MergeFailed() {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        io_state_ = MERGE_IO_TRANSITION::MERGE_FAILED;
+    }
+
+    cv.notify_all();
+}
+
+void SnapshotHandler::MergeCompleted() {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        io_state_ = MERGE_IO_TRANSITION::MERGE_COMPLETE;
+    }
+
+    cv.notify_all();
+}
+
+// This is invoked by worker threads.
+//
+// Worker threads are terminated either by two scenarios:
+//
+// 1: If dm-user device is destroyed
+// 2: We had an I/O failure when reading root partitions
+//
+// In case (1), this would be a graceful shutdown. In this case, merge
+// thread and RA thread should have _already_ terminated by this point. We will be
+// destroying the dm-user device only _after_ merge is completed.
+//
+// In case (2), if merge thread had started, then it will be
+// continuing to merge; however, since we had an I/O failure and the
+// I/O on root partitions are no longer served, we will terminate the
+// merge.
+//
+// This functions is about handling case (2)
+void SnapshotHandler::NotifyIOTerminated() {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        io_state_ = MERGE_IO_TRANSITION::IO_TERMINATED;
+    }
+
+    cv.notify_all();
+}
+
+bool SnapshotHandler::IsIOTerminated() {
+    std::lock_guard<std::mutex> lock(lock_);
+    return (io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED);
+}
+
+// Invoked by RA thread
+void SnapshotHandler::ReadAheadIOFailed() {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        io_state_ = MERGE_IO_TRANSITION::READ_AHEAD_FAILURE;
+    }
+
+    cv.notify_all();
+}
+
+void SnapshotHandler::WaitForMergeComplete() {
+    std::unique_lock<std::mutex> lock(lock_);
+    while (!(io_state_ == MERGE_IO_TRANSITION::MERGE_COMPLETE ||
+             io_state_ == MERGE_IO_TRANSITION::MERGE_FAILED ||
+             io_state_ == MERGE_IO_TRANSITION::IO_TERMINATED)) {
+        cv.wait(lock);
+    }
+}
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/init/Android.bp b/init/Android.bp
index 9b02c38..66427dc 100644
--- a/init/Android.bp
+++ b/init/Android.bp
@@ -448,6 +448,7 @@
 
     srcs: [
         "devices_test.cpp",
+        "epoll_test.cpp",
         "firmware_handler_test.cpp",
         "init_test.cpp",
         "keychords_test.cpp",
diff --git a/init/epoll.cpp b/init/epoll.cpp
index 17d63fa..74d8aac 100644
--- a/init/epoll.cpp
+++ b/init/epoll.cpp
@@ -38,11 +38,12 @@
     return {};
 }
 
-Result<void> Epoll::RegisterHandler(int fd, std::function<void()> handler, uint32_t events) {
+Result<void> Epoll::RegisterHandler(int fd, Handler handler, uint32_t events) {
     if (!events) {
         return Error() << "Must specify events";
     }
-    auto [it, inserted] = epoll_handlers_.emplace(fd, std::move(handler));
+    auto sp = std::make_shared<decltype(handler)>(std::move(handler));
+    auto [it, inserted] = epoll_handlers_.emplace(fd, std::move(sp));
     if (!inserted) {
         return Error() << "Cannot specify two epoll handlers for a given FD";
     }
@@ -69,7 +70,7 @@
     return {};
 }
 
-Result<std::vector<std::function<void()>*>> Epoll::Wait(
+Result<std::vector<std::shared_ptr<Epoll::Handler>>> Epoll::Wait(
         std::optional<std::chrono::milliseconds> timeout) {
     int timeout_ms = -1;
     if (timeout && timeout->count() < INT_MAX) {
@@ -81,9 +82,10 @@
     if (num_events == -1) {
         return ErrnoError() << "epoll_wait failed";
     }
-    std::vector<std::function<void()>*> pending_functions;
+    std::vector<std::shared_ptr<Handler>> pending_functions;
     for (int i = 0; i < num_events; ++i) {
-        pending_functions.emplace_back(reinterpret_cast<std::function<void()>*>(ev[i].data.ptr));
+        auto sp = *reinterpret_cast<std::shared_ptr<Handler>*>(ev[i].data.ptr);
+        pending_functions.emplace_back(std::move(sp));
     }
 
     return pending_functions;
diff --git a/init/epoll.h b/init/epoll.h
index c32a661..0df5289 100644
--- a/init/epoll.h
+++ b/init/epoll.h
@@ -22,6 +22,7 @@
 #include <chrono>
 #include <functional>
 #include <map>
+#include <memory>
 #include <optional>
 #include <vector>
 
@@ -36,15 +37,17 @@
   public:
     Epoll();
 
+    typedef std::function<void()> Handler;
+
     Result<void> Open();
-    Result<void> RegisterHandler(int fd, std::function<void()> handler, uint32_t events = EPOLLIN);
+    Result<void> RegisterHandler(int fd, Handler handler, uint32_t events = EPOLLIN);
     Result<void> UnregisterHandler(int fd);
-    Result<std::vector<std::function<void()>*>> Wait(
+    Result<std::vector<std::shared_ptr<Handler>>> Wait(
             std::optional<std::chrono::milliseconds> timeout);
 
   private:
     android::base::unique_fd epoll_fd_;
-    std::map<int, std::function<void()>> epoll_handlers_;
+    std::map<int, std::shared_ptr<Handler>> epoll_handlers_;
 };
 
 }  // namespace init
diff --git a/init/epoll_test.cpp b/init/epoll_test.cpp
new file mode 100644
index 0000000..9236cd5
--- /dev/null
+++ b/init/epoll_test.cpp
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "epoll.h"
+
+#include <sys/unistd.h>
+
+#include <unordered_set>
+
+#include <android-base/file.h>
+#include <gtest/gtest.h>
+
+namespace android {
+namespace init {
+
+std::unordered_set<void*> sValidObjects;
+
+class CatchDtor final {
+  public:
+    CatchDtor() { sValidObjects.emplace(this); }
+    CatchDtor(const CatchDtor&) { sValidObjects.emplace(this); }
+    ~CatchDtor() {
+        auto iter = sValidObjects.find(this);
+        if (iter != sValidObjects.end()) {
+            sValidObjects.erase(iter);
+        }
+    }
+};
+
+TEST(epoll, UnregisterHandler) {
+    Epoll epoll;
+    ASSERT_RESULT_OK(epoll.Open());
+
+    int fds[2];
+    ASSERT_EQ(pipe(fds), 0);
+
+    CatchDtor catch_dtor;
+    bool handler_invoked;
+    auto handler = [&, catch_dtor]() -> void {
+        auto result = epoll.UnregisterHandler(fds[0]);
+        ASSERT_EQ(result.ok(), !handler_invoked);
+        handler_invoked = true;
+        ASSERT_NE(sValidObjects.find((void*)&catch_dtor), sValidObjects.end());
+    };
+
+    epoll.RegisterHandler(fds[0], std::move(handler));
+
+    uint8_t byte = 0xee;
+    ASSERT_TRUE(android::base::WriteFully(fds[1], &byte, sizeof(byte)));
+
+    auto results = epoll.Wait({});
+    ASSERT_RESULT_OK(results);
+    ASSERT_EQ(results->size(), size_t(1));
+
+    for (const auto& function : *results) {
+        (*function)();
+        (*function)();
+    }
+    ASSERT_TRUE(handler_invoked);
+}
+
+}  // namespace init
+}  // namespace android
diff --git a/libprocessgroup/profiles/task_profiles_28.json b/libprocessgroup/profiles/task_profiles_28.json
index 9f83785..56053e0 100644
--- a/libprocessgroup/profiles/task_profiles_28.json
+++ b/libprocessgroup/profiles/task_profiles_28.json
@@ -40,6 +40,19 @@
       ]
     },
     {
+      "Name": "ServicePerformance",
+      "Actions": [
+        {
+          "Name": "JoinCgroup",
+          "Params":
+          {
+            "Controller": "schedtune",
+            "Path": "background"
+          }
+        }
+      ]
+    },
+    {
       "Name": "HighPerformance",
       "Actions": [
         {
diff --git a/libprocessgroup/profiles/task_profiles_29.json b/libprocessgroup/profiles/task_profiles_29.json
index 9f83785..52279b8 100644
--- a/libprocessgroup/profiles/task_profiles_29.json
+++ b/libprocessgroup/profiles/task_profiles_29.json
@@ -53,6 +53,19 @@
       ]
     },
     {
+      "Name": "ServicePerformance",
+      "Actions": [
+        {
+          "Name": "JoinCgroup",
+          "Params":
+          {
+            "Controller": "schedtune",
+            "Path": "background"
+          }
+        }
+      ]
+    },
+    {
       "Name": "MaxPerformance",
       "Actions": [
         {
diff --git a/libprocessgroup/profiles/task_profiles_30.json b/libprocessgroup/profiles/task_profiles_30.json
index 9f83785..56053e0 100644
--- a/libprocessgroup/profiles/task_profiles_30.json
+++ b/libprocessgroup/profiles/task_profiles_30.json
@@ -40,6 +40,19 @@
       ]
     },
     {
+      "Name": "ServicePerformance",
+      "Actions": [
+        {
+          "Name": "JoinCgroup",
+          "Params":
+          {
+            "Controller": "schedtune",
+            "Path": "background"
+          }
+        }
+      ]
+    },
+    {
       "Name": "HighPerformance",
       "Actions": [
         {
diff --git a/libutils/String16.cpp b/libutils/String16.cpp
index c42cada..68642d8 100644
--- a/libutils/String16.cpp
+++ b/libutils/String16.cpp
@@ -199,99 +199,59 @@
     return NO_MEMORY;
 }
 
-status_t String16::append(const String16& other)
-{
-    const size_t myLen = size();
-    const size_t otherLen = other.size();
-    if (myLen == 0) {
-        setTo(other);
-        return OK;
-    } else if (otherLen == 0) {
-        return OK;
-    }
-
-    if (myLen >= SIZE_MAX / sizeof(char16_t) - otherLen) {
-        android_errorWriteLog(0x534e4554, "73826242");
-        abort();
-    }
-
-    SharedBuffer* buf =
-            static_cast<SharedBuffer*>(editResize((myLen + otherLen + 1) * sizeof(char16_t)));
-    if (buf) {
-        char16_t* str = (char16_t*)buf->data();
-        memcpy(str+myLen, other, (otherLen+1)*sizeof(char16_t));
-        mString = str;
-        return OK;
-    }
-    return NO_MEMORY;
+status_t String16::append(const String16& other) {
+    return append(other.string(), other.size());
 }
 
-status_t String16::append(const char16_t* chrs, size_t otherLen)
-{
+status_t String16::append(const char16_t* chrs, size_t otherLen) {
     const size_t myLen = size();
-    if (myLen == 0) {
-        setTo(chrs, otherLen);
-        return OK;
-    } else if (otherLen == 0) {
-        return OK;
-    }
 
-    if (myLen >= SIZE_MAX / sizeof(char16_t) - otherLen) {
-        android_errorWriteLog(0x534e4554, "73826242");
-        abort();
-    }
+    if (myLen == 0) return setTo(chrs, otherLen);
 
-    SharedBuffer* buf =
-            static_cast<SharedBuffer*>(editResize((myLen + otherLen + 1) * sizeof(char16_t)));
-    if (buf) {
-        char16_t* str = (char16_t*)buf->data();
-        memcpy(str+myLen, chrs, otherLen*sizeof(char16_t));
-        str[myLen+otherLen] = 0;
-        mString = str;
-        return OK;
-    }
-    return NO_MEMORY;
+    if (otherLen == 0) return OK;
+
+    size_t size = myLen;
+    if (__builtin_add_overflow(size, otherLen, &size) ||
+        __builtin_add_overflow(size, 1, &size) ||
+        __builtin_mul_overflow(size, sizeof(char16_t), &size)) return NO_MEMORY;
+
+    SharedBuffer* buf = static_cast<SharedBuffer*>(editResize(size));
+    if (!buf) return NO_MEMORY;
+
+    char16_t* str = static_cast<char16_t*>(buf->data());
+    memcpy(str + myLen, chrs, otherLen * sizeof(char16_t));
+    str[myLen + otherLen] = 0;
+    mString = str;
+    return OK;
 }
 
-status_t String16::insert(size_t pos, const char16_t* chrs)
-{
+status_t String16::insert(size_t pos, const char16_t* chrs) {
     return insert(pos, chrs, strlen16(chrs));
 }
 
-status_t String16::insert(size_t pos, const char16_t* chrs, size_t len)
-{
+status_t String16::insert(size_t pos, const char16_t* chrs, size_t otherLen) {
     const size_t myLen = size();
-    if (myLen == 0) {
-        return setTo(chrs, len);
-        return OK;
-    } else if (len == 0) {
-        return OK;
-    }
+
+    if (myLen == 0) return setTo(chrs, otherLen);
+
+    if (otherLen == 0) return OK;
 
     if (pos > myLen) pos = myLen;
 
-    #if 0
-    printf("Insert in to %s: pos=%d, len=%d, myLen=%d, chrs=%s\n",
-           String8(*this).string(), pos,
-           len, myLen, String8(chrs, len).string());
-    #endif
+    size_t size = myLen;
+    if (__builtin_add_overflow(size, otherLen, &size) ||
+        __builtin_add_overflow(size, 1, &size) ||
+        __builtin_mul_overflow(size, sizeof(char16_t), &size)) return NO_MEMORY;
 
-    SharedBuffer* buf =
-            static_cast<SharedBuffer*>(editResize((myLen + len + 1) * sizeof(char16_t)));
-    if (buf) {
-        char16_t* str = (char16_t*)buf->data();
-        if (pos < myLen) {
-            memmove(str+pos+len, str+pos, (myLen-pos)*sizeof(char16_t));
-        }
-        memcpy(str+pos, chrs, len*sizeof(char16_t));
-        str[myLen+len] = 0;
-        mString = str;
-        #if 0
-        printf("Result (%d chrs): %s\n", size(), String8(*this).string());
-        #endif
-        return OK;
-    }
-    return NO_MEMORY;
+    SharedBuffer* buf = static_cast<SharedBuffer*>(editResize(size));
+    if (!buf) return NO_MEMORY;
+
+    char16_t* str = static_cast<char16_t*>(buf->data());
+    if (pos < myLen) memmove(str + pos + otherLen, str + pos, (myLen - pos) * sizeof(char16_t));
+    memcpy(str + pos, chrs, otherLen * sizeof(char16_t));
+    str[myLen + otherLen] = 0;
+    mString = str;
+    return OK;
 }
 
 ssize_t String16::findFirst(char16_t c) const
diff --git a/libutils/String16_test.cpp b/libutils/String16_test.cpp
index 7d7230e..c6e6f74 100644
--- a/libutils/String16_test.cpp
+++ b/libutils/String16_test.cpp
@@ -19,7 +19,7 @@
 
 #include <gtest/gtest.h>
 
-namespace android {
+using namespace android;
 
 ::testing::AssertionResult Char16_tStringEquals(const char16_t* a, const char16_t* b) {
     if (strcmp16(a, b) != 0) {
@@ -224,4 +224,36 @@
     EXPECT_STR16EQ(another, u"abcdef");
 }
 
-}  // namespace android
+TEST(String16Test, append) {
+    String16 s;
+    EXPECT_EQ(OK, s.append(String16(u"foo")));
+    EXPECT_STR16EQ(u"foo", s);
+    EXPECT_EQ(OK, s.append(String16(u"bar")));
+    EXPECT_STR16EQ(u"foobar", s);
+    EXPECT_EQ(OK, s.append(u"baz", 0));
+    EXPECT_STR16EQ(u"foobar", s);
+    EXPECT_EQ(NO_MEMORY, s.append(u"baz", SIZE_MAX));
+    EXPECT_STR16EQ(u"foobar", s);
+}
+
+TEST(String16Test, insert) {
+    String16 s;
+
+    // Inserting into the empty string inserts at the start.
+    EXPECT_EQ(OK, s.insert(123, u"foo"));
+    EXPECT_STR16EQ(u"foo", s);
+
+    // Inserting zero characters at any position is okay, but won't expand the string.
+    EXPECT_EQ(OK, s.insert(123, u"foo", 0));
+    EXPECT_STR16EQ(u"foo", s);
+
+    // Inserting past the end of a non-empty string appends.
+    EXPECT_EQ(OK, s.insert(123, u"bar"));
+    EXPECT_STR16EQ(u"foobar", s);
+
+    EXPECT_EQ(OK, s.insert(3, u"!"));
+    EXPECT_STR16EQ(u"foo!bar", s);
+
+    EXPECT_EQ(NO_MEMORY, s.insert(3, u"", SIZE_MAX));
+    EXPECT_STR16EQ(u"foo!bar", s);
+}
diff --git a/libutils/String8.cpp b/libutils/String8.cpp
index 8511da9..419b2de 100644
--- a/libutils/String8.cpp
+++ b/libutils/String8.cpp
@@ -313,8 +313,8 @@
 
     if (n > 0) {
         size_t oldLength = length();
-        if ((size_t)n > SIZE_MAX - 1 ||
-            oldLength > SIZE_MAX - (size_t)n - 1) {
+        if (n > std::numeric_limits<size_t>::max() - 1 ||
+            oldLength > std::numeric_limits<size_t>::max() - n - 1) {
             return NO_MEMORY;
         }
         char* buf = lockBuffer(oldLength + n);
@@ -327,21 +327,23 @@
     return result;
 }
 
-status_t String8::real_append(const char* other, size_t otherLen)
-{
+status_t String8::real_append(const char* other, size_t otherLen) {
     const size_t myLen = bytes();
 
-    SharedBuffer* buf = SharedBuffer::bufferFromData(mString)
-        ->editResize(myLen+otherLen+1);
-    if (buf) {
-        char* str = (char*)buf->data();
-        mString = str;
-        str += myLen;
-        memcpy(str, other, otherLen);
-        str[otherLen] = '\0';
-        return OK;
+    SharedBuffer* buf;
+    size_t newLen;
+    if (__builtin_add_overflow(myLen, otherLen, &newLen) ||
+        __builtin_add_overflow(newLen, 1, &newLen) ||
+        (buf = SharedBuffer::bufferFromData(mString)->editResize(newLen)) == nullptr) {
+        return NO_MEMORY;
     }
-    return NO_MEMORY;
+
+    char* str = (char*)buf->data();
+    mString = str;
+    str += myLen;
+    memcpy(str, other, otherLen);
+    str[otherLen] = '\0';
+    return OK;
 }
 
 char* String8::lockBuffer(size_t size)
diff --git a/libutils/String8_test.cpp b/libutils/String8_test.cpp
index 9efcc6f..1356cd0 100644
--- a/libutils/String8_test.cpp
+++ b/libutils/String8_test.cpp
@@ -15,13 +15,14 @@
  */
 
 #define LOG_TAG "String8_test"
+
 #include <utils/Log.h>
 #include <utils/String8.h>
 #include <utils/String16.h>
 
 #include <gtest/gtest.h>
 
-namespace android {
+using namespace android;
 
 class String8Test : public testing::Test {
 protected:
@@ -101,4 +102,15 @@
     String8 valid = String8(String16(tmp));
     EXPECT_STREQ(valid, "abcdef");
 }
+
+TEST_F(String8Test, append) {
+    String8 s;
+    EXPECT_EQ(OK, s.append("foo"));
+    EXPECT_STREQ("foo", s);
+    EXPECT_EQ(OK, s.append("bar"));
+    EXPECT_STREQ("foobar", s);
+    EXPECT_EQ(OK, s.append("baz", 0));
+    EXPECT_STREQ("foobar", s);
+    EXPECT_EQ(NO_MEMORY, s.append("baz", SIZE_MAX));
+    EXPECT_STREQ("foobar", s);
 }
diff --git a/libutils/Threads.cpp b/libutils/Threads.cpp
index 540dcf4..6e293c7 100644
--- a/libutils/Threads.cpp
+++ b/libutils/Threads.cpp
@@ -86,8 +86,10 @@
 
         // A new thread will be in its parent's sched group by default,
         // so we just need to handle the background case.
+        // currently set to system_background group which is different
+        // from background group for app.
         if (prio >= ANDROID_PRIORITY_BACKGROUND) {
-            SetTaskProfiles(0, {"SCHED_SP_BACKGROUND"}, true);
+            SetTaskProfiles(0, {"SCHED_SP_SYSTEM"}, true);
         }
 
         if (name) {
@@ -313,7 +315,7 @@
     }
 
     if (pri >= ANDROID_PRIORITY_BACKGROUND) {
-        rc = SetTaskProfiles(tid, {"SCHED_SP_BACKGROUND"}, true) ? 0 : -1;
+        rc = SetTaskProfiles(tid, {"SCHED_SP_SYSTEM"}, true) ? 0 : -1;
     } else if (curr_pri >= ANDROID_PRIORITY_BACKGROUND) {
         SchedPolicy policy = SP_FOREGROUND;
         // Change to the sched policy group of the process.