Merge "Adding Test for Parsing Flash Task" am: 0e25838886 am: 681b382764 am: dbba14a8b5

Original change: https://android-review.googlesource.com/c/platform/system/core/+/2516536

Change-Id: I7a4fd7eab014a8d6eda6ed951ffcd3957673670b
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/fs_mgr/libsnapshot/partition_cow_creator.cpp b/fs_mgr/libsnapshot/partition_cow_creator.cpp
index 7057223..5bc7e65 100644
--- a/fs_mgr/libsnapshot/partition_cow_creator.cpp
+++ b/fs_mgr/libsnapshot/partition_cow_creator.cpp
@@ -131,15 +131,28 @@
     return is_optimized;
 }
 
-void WriteExtent(DmSnapCowSizeCalculator* sc, const chromeos_update_engine::Extent& de,
+bool WriteExtent(DmSnapCowSizeCalculator* sc, const chromeos_update_engine::Extent& de,
                  unsigned int sectors_per_block) {
     const auto block_boundary = de.start_block() + de.num_blocks();
     for (auto b = de.start_block(); b < block_boundary; ++b) {
         for (unsigned int s = 0; s < sectors_per_block; ++s) {
-            const auto sector_id = b * sectors_per_block + s;
+            // sector_id = b * sectors_per_block + s;
+            uint64_t block_start_sector_id;
+            if (__builtin_mul_overflow(b, sectors_per_block, &block_start_sector_id)) {
+                LOG(ERROR) << "Integer overflow when calculating sector id (" << b << " * "
+                           << sectors_per_block << ")";
+                return false;
+            }
+            uint64_t sector_id;
+            if (__builtin_add_overflow(block_start_sector_id, s, &sector_id)) {
+                LOG(ERROR) << "Integer overflow when calculating sector id ("
+                           << block_start_sector_id << " + " << s << ")";
+                return false;
+            }
             sc->WriteSector(sector_id);
         }
     }
+    return true;
 }
 
 std::optional<uint64_t> PartitionCowCreator::GetCowSize() {
@@ -167,7 +180,7 @@
     // Allocate space for extra extents (if any). These extents are those that can be
     // used for error corrections or to store verity hash trees.
     for (const auto& de : extra_extents) {
-        WriteExtent(&sc, de, sectors_per_block);
+        if (!WriteExtent(&sc, de, sectors_per_block)) return std::nullopt;
     }
 
     if (update == nullptr) return sc.cow_size_bytes();
@@ -182,7 +195,7 @@
         }
 
         for (const auto& de : written_op->dst_extents()) {
-            WriteExtent(&sc, de, sectors_per_block);
+            if (!WriteExtent(&sc, de, sectors_per_block)) return std::nullopt;
         }
     }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index 1e03683..9261482 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -62,7 +62,6 @@
         "dm-snapshot-merge/snapuserd_worker.cpp",
         "dm-snapshot-merge/snapuserd_readahead.cpp",
         "snapuserd_buffer.cpp",
-        "user-space-merge/handler_manager.cpp",
         "user-space-merge/snapuserd_core.cpp",
         "user-space-merge/snapuserd_dm_user.cpp",
         "user-space-merge/snapuserd_merge.cpp",
@@ -164,7 +163,7 @@
 }
 
 cc_test {
-    name: "snapuserd_test_legacy",
+    name: "cow_snapuserd_test",
     defaults: [
         "fs_mgr_defaults",
         "libsnapshot_cow_defaults",
@@ -216,17 +215,16 @@
     ],
     static_libs: [
         "libbrotli",
-        "libcutils_sockets",
-        "libdm",
-        "libext4_utils",
-        "libfs_mgr",
-        "libgflags",
         "libgtest",
         "libsnapshot_cow",
         "libsnapshot_snapuserd",
-        "libsnapuserd",
-        "liburing",
+        "libcutils_sockets",
         "libz",
+        "libfs_mgr",
+        "libdm",
+        "libext4_utils",
+        "liburing",
+        "libgflags",
     ],
     include_dirs: ["bionic/libc/kernel"],
     header_libs: [
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
index 36dad33..ae20e1f 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
@@ -114,7 +114,7 @@
             return false;
         }
         auto handler = user_server_.AddHandler(parts[0], parts[1], parts[2], parts[3]);
-        if (!handler || !user_server_.StartHandler(parts[0])) {
+        if (!handler || !user_server_.StartHandler(handler)) {
             return false;
         }
     }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
deleted file mode 100644
index bdba5c0..0000000
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
+++ /dev/null
@@ -1,374 +0,0 @@
-// Copyright (C) 2023 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "handler_manager.h"
-
-#include <sys/eventfd.h>
-
-#include <android-base/logging.h>
-
-#include "snapuserd_core.h"
-
-namespace android {
-namespace snapshot {
-
-static constexpr uint8_t kMaxMergeThreads = 2;
-
-HandlerThread::HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd)
-    : snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {}
-
-void HandlerThread::FreeResources() {
-    // Each worker thread holds a reference to snapuserd.
-    // Clear them so that all the resources
-    // held by snapuserd is released
-    if (snapuserd_) {
-        snapuserd_->FreeResources();
-        snapuserd_ = nullptr;
-    }
-}
-
-SnapshotHandlerManager::SnapshotHandlerManager() {
-    monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
-    if (monitor_merge_event_fd_ == -1) {
-        PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
-    }
-}
-
-std::shared_ptr<HandlerThread> SnapshotHandlerManager::AddHandler(
-        const std::string& misc_name, const std::string& cow_device_path,
-        const std::string& backing_device, const std::string& base_path_merge,
-        int num_worker_threads, bool use_iouring, bool perform_verification) {
-    auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
-                                                       base_path_merge, num_worker_threads,
-                                                       use_iouring, perform_verification);
-    if (!snapuserd->InitCowDevice()) {
-        LOG(ERROR) << "Failed to initialize Snapuserd";
-        return nullptr;
-    }
-
-    if (!snapuserd->InitializeWorkers()) {
-        LOG(ERROR) << "Failed to initialize workers";
-        return nullptr;
-    }
-
-    auto handler = std::make_shared<HandlerThread>(snapuserd);
-    {
-        std::lock_guard<std::mutex> lock(lock_);
-        if (FindHandler(&lock, misc_name) != dm_users_.end()) {
-            LOG(ERROR) << "Handler already exists: " << misc_name;
-            return nullptr;
-        }
-        dm_users_.push_back(handler);
-    }
-    return handler;
-}
-
-bool SnapshotHandlerManager::StartHandler(const std::string& misc_name) {
-    std::lock_guard<std::mutex> lock(lock_);
-    auto iter = FindHandler(&lock, misc_name);
-    if (iter == dm_users_.end()) {
-        LOG(ERROR) << "Could not find handler: " << misc_name;
-        return false;
-    }
-    if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
-        LOG(ERROR) << "Tried to re-attach control device: " << misc_name;
-        return false;
-    }
-    if (!StartHandler(*iter)) {
-        return false;
-    }
-    return true;
-}
-
-bool SnapshotHandlerManager::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
-    if (handler->snapuserd()->IsAttached()) {
-        LOG(ERROR) << "Handler already attached";
-        return false;
-    }
-
-    handler->snapuserd()->AttachControlDevice();
-
-    handler->thread() = std::thread(std::bind(&SnapshotHandlerManager::RunThread, this, handler));
-    return true;
-}
-
-bool SnapshotHandlerManager::DeleteHandler(const std::string& misc_name) {
-    {
-        std::lock_guard<std::mutex> lock(lock_);
-        auto iter = FindHandler(&lock, misc_name);
-        if (iter == dm_users_.end()) {
-            // After merge is completed, we swap dm-user table with
-            // the underlying dm-linear base device. Hence, worker
-            // threads would have terminted and was removed from
-            // the list.
-            LOG(DEBUG) << "Could not find handler: " << misc_name;
-            return true;
-        }
-
-        if (!(*iter)->ThreadTerminated()) {
-            (*iter)->snapuserd()->NotifyIOTerminated();
-        }
-    }
-    if (!RemoveAndJoinHandler(misc_name)) {
-        return false;
-    }
-    return true;
-}
-
-void SnapshotHandlerManager::RunThread(std::shared_ptr<HandlerThread> handler) {
-    LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
-
-    if (!handler->snapuserd()->Start()) {
-        LOG(ERROR) << " Failed to launch all worker threads";
-    }
-
-    handler->snapuserd()->CloseFds();
-    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
-    handler->snapuserd()->UnmapBufferRegion();
-
-    auto misc_name = handler->misc_name();
-    LOG(INFO) << "Handler thread about to exit: " << misc_name;
-
-    {
-        std::lock_guard<std::mutex> lock(lock_);
-        if (merge_completed) {
-            num_partitions_merge_complete_ += 1;
-            active_merge_threads_ -= 1;
-            WakeupMonitorMergeThread();
-        }
-        handler->SetThreadTerminated();
-        auto iter = FindHandler(&lock, handler->misc_name());
-        if (iter == dm_users_.end()) {
-            // RemoveAndJoinHandler() already removed us from the list, and is
-            // now waiting on a join(), so just return. Additionally, release
-            // all the resources held by snapuserd object which are shared
-            // by worker threads. This should be done when the last reference
-            // of "handler" is released; but we will explicitly release here
-            // to make sure snapuserd object is freed as it is the biggest
-            // consumer of memory in the daemon.
-            handler->FreeResources();
-            LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
-            return;
-        }
-
-        LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;
-
-        if (handler->snapuserd()->IsAttached()) {
-            handler->thread().detach();
-        }
-
-        // Important: free resources within the lock. This ensures that if
-        // WaitForDelete() is called, the handler is either in the list, or
-        // it's not and its resources are guaranteed to be freed.
-        handler->FreeResources();
-        dm_users_.erase(iter);
-    }
-}
-
-bool SnapshotHandlerManager::InitiateMerge(const std::string& misc_name) {
-    std::lock_guard<std::mutex> lock(lock_);
-    auto iter = FindHandler(&lock, misc_name);
-    if (iter == dm_users_.end()) {
-        LOG(ERROR) << "Could not find handler: " << misc_name;
-        return false;
-    }
-
-    return StartMerge(&lock, *iter);
-}
-
-bool SnapshotHandlerManager::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
-                                        const std::shared_ptr<HandlerThread>& handler) {
-    CHECK(proof_of_lock);
-
-    if (!handler->snapuserd()->IsAttached()) {
-        LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
-        return false;
-    }
-
-    handler->snapuserd()->MonitorMerge();
-
-    if (!is_merge_monitor_started_) {
-        std::thread(&SnapshotHandlerManager::MonitorMerge, this).detach();
-        is_merge_monitor_started_ = true;
-    }
-
-    merge_handlers_.push(handler);
-    WakeupMonitorMergeThread();
-    return true;
-}
-
-void SnapshotHandlerManager::WakeupMonitorMergeThread() {
-    uint64_t notify = 1;
-    ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
-    if (rc < 0) {
-        PLOG(FATAL) << "failed to notify monitor merge thread";
-    }
-}
-
-void SnapshotHandlerManager::MonitorMerge() {
-    while (!stop_monitor_merge_thread_) {
-        uint64_t testVal;
-        ssize_t ret =
-                TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
-        if (ret == -1) {
-            PLOG(FATAL) << "Failed to read from eventfd";
-        } else if (ret == 0) {
-            LOG(FATAL) << "Hit EOF on eventfd";
-        }
-
-        LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
-        {
-            std::lock_guard<std::mutex> lock(lock_);
-            while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
-                auto handler = merge_handlers_.front();
-                merge_handlers_.pop();
-
-                if (!handler->snapuserd()) {
-                    LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
-                    continue;
-                }
-
-                LOG(INFO) << "Starting merge for partition: "
-                          << handler->snapuserd()->GetMiscName();
-                handler->snapuserd()->InitiateMerge();
-                active_merge_threads_ += 1;
-            }
-        }
-    }
-
-    LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
-}
-
-std::string SnapshotHandlerManager::GetMergeStatus(const std::string& misc_name) {
-    std::lock_guard<std::mutex> lock(lock_);
-    auto iter = FindHandler(&lock, misc_name);
-    if (iter == dm_users_.end()) {
-        LOG(ERROR) << "Could not find handler: " << misc_name;
-        return {};
-    }
-
-    return (*iter)->snapuserd()->GetMergeStatus();
-}
-
-double SnapshotHandlerManager::GetMergePercentage() {
-    std::lock_guard<std::mutex> lock(lock_);
-
-    double percentage = 0.0;
-    int n = 0;
-
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        auto& th = (*iter)->thread();
-        if (th.joinable()) {
-            // Merge percentage by individual partitions wherein merge is still
-            // in-progress
-            percentage += (*iter)->snapuserd()->GetMergePercentage();
-            n += 1;
-        }
-    }
-
-    // Calculate final merge including those partitions where merge was already
-    // completed - num_partitions_merge_complete_ will track them when each
-    // thread exists in RunThread.
-    int total_partitions = n + num_partitions_merge_complete_;
-
-    if (total_partitions) {
-        percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
-    }
-
-    LOG(DEBUG) << "Merge %: " << percentage
-               << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
-               << " total_partitions: " << total_partitions << " n: " << n;
-    return percentage;
-}
-
-bool SnapshotHandlerManager::GetVerificationStatus() {
-    std::lock_guard<std::mutex> lock(lock_);
-
-    bool status = true;
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        auto& th = (*iter)->thread();
-        if (th.joinable() && status) {
-            status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
-        } else {
-            // return immediately if there is a failure
-            return false;
-        }
-    }
-
-    return status;
-}
-
-bool SnapshotHandlerManager::RemoveAndJoinHandler(const std::string& misc_name) {
-    std::shared_ptr<HandlerThread> handler;
-    {
-        std::lock_guard<std::mutex> lock(lock_);
-
-        auto iter = FindHandler(&lock, misc_name);
-        if (iter == dm_users_.end()) {
-            // Client already deleted.
-            return true;
-        }
-        handler = std::move(*iter);
-        dm_users_.erase(iter);
-    }
-
-    auto& th = handler->thread();
-    if (th.joinable()) {
-        th.join();
-    }
-    return true;
-}
-
-void SnapshotHandlerManager::TerminateMergeThreads() {
-    std::lock_guard<std::mutex> guard(lock_);
-
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        if (!(*iter)->ThreadTerminated()) {
-            (*iter)->snapuserd()->NotifyIOTerminated();
-        }
-    }
-}
-
-void SnapshotHandlerManager::JoinAllThreads() {
-    // Acquire the thread list within the lock.
-    std::vector<std::shared_ptr<HandlerThread>> dm_users;
-    {
-        std::lock_guard<std::mutex> guard(lock_);
-        dm_users = std::move(dm_users_);
-    }
-
-    for (auto& client : dm_users) {
-        auto& th = client->thread();
-
-        if (th.joinable()) th.join();
-    }
-
-    stop_monitor_merge_thread_ = true;
-    WakeupMonitorMergeThread();
-}
-
-auto SnapshotHandlerManager::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
-                                         const std::string& misc_name) -> HandlerList::iterator {
-    CHECK(proof_of_lock);
-
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        if ((*iter)->misc_name() == misc_name) {
-            return iter;
-        }
-    }
-    return dm_users_.end();
-}
-
-}  // namespace snapshot
-}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
deleted file mode 100644
index b7ddac1..0000000
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
+++ /dev/null
@@ -1,131 +0,0 @@
-// Copyright (C) 2023 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#pragma once
-
-#include <memory>
-#include <queue>
-#include <string>
-#include <thread>
-#include <vector>
-
-#include <android-base/unique_fd.h>
-
-namespace android {
-namespace snapshot {
-
-class SnapshotHandler;
-
-class HandlerThread {
-  public:
-    explicit HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd);
-
-    void FreeResources();
-    const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
-    std::thread& thread() { return thread_; }
-
-    const std::string& misc_name() const { return misc_name_; }
-    bool ThreadTerminated() { return thread_terminated_; }
-    void SetThreadTerminated() { thread_terminated_ = true; }
-
-  private:
-    std::thread thread_;
-    std::shared_ptr<SnapshotHandler> snapuserd_;
-    std::string misc_name_;
-    bool thread_terminated_ = false;
-};
-
-class ISnapshotHandlerManager {
-  public:
-    virtual ~ISnapshotHandlerManager() {}
-
-    // Add a new snapshot handler but do not start serving requests yet.
-    virtual std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
-                                                      const std::string& cow_device_path,
-                                                      const std::string& backing_device,
-                                                      const std::string& base_path_merge,
-                                                      int num_worker_threads, bool use_iouring,
-                                                      bool perform_verification) = 0;
-
-    // Start serving requests on a snapshot handler.
-    virtual bool StartHandler(const std::string& misc_name) = 0;
-
-    // Stop serving requests on a snapshot handler and remove it.
-    virtual bool DeleteHandler(const std::string& misc_name) = 0;
-
-    // Begin merging blocks on the given snapshot handler.
-    virtual bool InitiateMerge(const std::string& misc_name) = 0;
-
-    // Return a string containing a status code indicating the merge status
-    // on the handler. Returns empty on error.
-    virtual std::string GetMergeStatus(const std::string& misc_name) = 0;
-
-    // Wait until all handlers have terminated.
-    virtual void JoinAllThreads() = 0;
-
-    // Stop any in-progress merge threads.
-    virtual void TerminateMergeThreads() = 0;
-
-    // Returns the merge progress across all merging snapshot handlers.
-    virtual double GetMergePercentage() = 0;
-
-    // Returns whether all snapshots have verified.
-    virtual bool GetVerificationStatus() = 0;
-};
-
-class SnapshotHandlerManager final : public ISnapshotHandlerManager {
-  public:
-    SnapshotHandlerManager();
-    std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
-                                              const std::string& cow_device_path,
-                                              const std::string& backing_device,
-                                              const std::string& base_path_merge,
-                                              int num_worker_threads, bool use_iouring,
-                                              bool perform_verification) override;
-    bool StartHandler(const std::string& misc_name) override;
-    bool DeleteHandler(const std::string& misc_name) override;
-    bool InitiateMerge(const std::string& misc_name) override;
-    std::string GetMergeStatus(const std::string& misc_name) override;
-    void JoinAllThreads() override;
-    void TerminateMergeThreads() override;
-    double GetMergePercentage() override;
-    bool GetVerificationStatus() override;
-
-  private:
-    bool StartHandler(const std::shared_ptr<HandlerThread>& handler);
-    void RunThread(std::shared_ptr<HandlerThread> handler);
-    bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
-                    const std::shared_ptr<HandlerThread>& handler);
-    void MonitorMerge();
-    void WakeupMonitorMergeThread();
-    bool RemoveAndJoinHandler(const std::string& misc_name);
-
-    // Find a HandlerThread within a lock.
-    using HandlerList = std::vector<std::shared_ptr<HandlerThread>>;
-    HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
-                                      const std::string& misc_name);
-
-    std::mutex lock_;
-    HandlerList dm_users_;
-
-    bool is_merge_monitor_started_ = false;
-    bool stop_monitor_merge_thread_ = false;
-    int active_merge_threads_ = 0;
-    int num_partitions_merge_complete_ = 0;
-    std::queue<std::shared_ptr<HandlerThread>> merge_handlers_;
-    android::base::unique_fd monitor_merge_event_fd_;
-};
-
-}  // namespace snapshot
-}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index 25ce0ae..2c201ff 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -433,6 +433,11 @@
     struct utsname uts;
     unsigned int major, minor;
 
+    if (android::base::GetBoolProperty("snapuserd.test.io_uring.force_disable", false)) {
+        SNAP_LOG(INFO) << "io_uring disabled for testing";
+        return false;
+    }
+
     if ((uname(&uts) != 0) || (sscanf(uts.release, "%u.%u", &major, &minor) != 2)) {
         SNAP_LOG(ERROR) << "Could not parse the kernel version from uname. "
                         << " io_uring not supported";
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
index c953f1a..b7ce210 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
@@ -45,9 +45,28 @@
 using android::base::borrowed_fd;
 using android::base::unique_fd;
 
+DaemonOps UserSnapshotServer::Resolveop(std::string& input) {
+    if (input == "init") return DaemonOps::INIT;
+    if (input == "start") return DaemonOps::START;
+    if (input == "stop") return DaemonOps::STOP;
+    if (input == "query") return DaemonOps::QUERY;
+    if (input == "delete") return DaemonOps::DELETE;
+    if (input == "detach") return DaemonOps::DETACH;
+    if (input == "supports") return DaemonOps::SUPPORTS;
+    if (input == "initiate_merge") return DaemonOps::INITIATE;
+    if (input == "merge_percent") return DaemonOps::PERCENTAGE;
+    if (input == "getstatus") return DaemonOps::GETSTATUS;
+    if (input == "update-verify") return DaemonOps::UPDATE_VERIFY;
+
+    return DaemonOps::INVALID;
+}
+
 UserSnapshotServer::UserSnapshotServer() {
+    monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
+    if (monitor_merge_event_fd_ == -1) {
+        PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
+    }
     terminating_ = false;
-    handlers_ = std::make_unique<SnapshotHandlerManager>();
 }
 
 UserSnapshotServer::~UserSnapshotServer() {
@@ -80,9 +99,12 @@
 
 void UserSnapshotServer::ShutdownThreads() {
     terminating_ = true;
-    handlers_->JoinAllThreads();
+    JoinAllThreads();
 }
 
+HandlerThread::HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd)
+    : snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {}
+
 bool UserSnapshotServer::Sendmsg(android::base::borrowed_fd fd, const std::string& msg) {
     ssize_t ret = TEMP_FAILURE_RETRY(send(fd.get(), msg.data(), msg.size(), MSG_NOSIGNAL));
     if (ret < 0) {
@@ -113,118 +135,226 @@
 
     std::vector<std::string> out;
     Parsemsg(str, delim, out);
+    DaemonOps op = Resolveop(out[0]);
 
-    const auto& cmd = out[0];
-    if (cmd == "init") {
-        // Message format:
-        // init,<misc_name>,<cow_device_path>,<backing_device>,<base_path_merge>
-        //
-        // Reads the metadata and send the number of sectors
-        if (out.size() != 5) {
-            LOG(ERROR) << "Malformed init message, " << out.size() << " parts";
-            return Sendmsg(fd, "fail");
-        }
+    switch (op) {
+        case DaemonOps::INIT: {
+            // Message format:
+            // init,<misc_name>,<cow_device_path>,<backing_device>,<base_path_merge>
+            //
+            // Reads the metadata and send the number of sectors
+            if (out.size() != 5) {
+                LOG(ERROR) << "Malformed init message, " << out.size() << " parts";
+                return Sendmsg(fd, "fail");
+            }
 
-        auto handler = AddHandler(out[1], out[2], out[3], out[4]);
-        if (!handler) {
-            return Sendmsg(fd, "fail");
-        }
+            auto handler = AddHandler(out[1], out[2], out[3], out[4]);
+            if (!handler) {
+                return Sendmsg(fd, "fail");
+            }
 
-        auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors());
-        return Sendmsg(fd, retval);
-    } else if (cmd == "start") {
-        // Message format:
-        // start,<misc_name>
-        //
-        // Start the new thread which binds to dm-user misc device
-        if (out.size() != 2) {
-            LOG(ERROR) << "Malformed start message, " << out.size() << " parts";
-            return Sendmsg(fd, "fail");
+            auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors());
+            return Sendmsg(fd, retval);
         }
+        case DaemonOps::START: {
+            // Message format:
+            // start,<misc_name>
+            //
+            // Start the new thread which binds to dm-user misc device
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed start message, " << out.size() << " parts";
+                return Sendmsg(fd, "fail");
+            }
 
-        if (!handlers_->StartHandler(out[1])) {
-            return Sendmsg(fd, "fail");
-        }
-        return Sendmsg(fd, "success");
-    } else if (cmd == "stop") {
-        // Message format: stop
-        //
-        // Stop all the threads gracefully and then shutdown the
-        // main thread
-        SetTerminating();
-        ShutdownThreads();
-        return true;
-    } else if (cmd == "query") {
-        // Message format: query
-        //
-        // As part of transition, Second stage daemon will be
-        // created before terminating the first stage daemon. Hence,
-        // for a brief period client may have to distiguish between
-        // first stage daemon and second stage daemon.
-        //
-        // Second stage daemon is marked as active and hence will
-        // be ready to receive control message.
-        return Sendmsg(fd, GetDaemonStatus());
-    } else if (cmd == "delete") {
-        // Message format:
-        // delete,<misc_name>
-        if (out.size() != 2) {
-            LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
-            return Sendmsg(fd, "fail");
-        }
-        if (!handlers_->DeleteHandler(out[1])) {
-            return Sendmsg(fd, "fail");
-        }
-        return Sendmsg(fd, "success");
-    } else if (cmd == "detach") {
-        handlers_->TerminateMergeThreads();
-        terminating_ = true;
-        return true;
-    } else if (cmd == "supports") {
-        if (out.size() != 2) {
-            LOG(ERROR) << "Malformed supports message, " << out.size() << " parts";
-            return Sendmsg(fd, "fail");
-        }
-        if (out[1] == "second_stage_socket_handoff") {
-            return Sendmsg(fd, "success");
-        }
-        return Sendmsg(fd, "fail");
-    } else if (cmd == "initiate_merge") {
-        if (out.size() != 2) {
-            LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts";
-            return Sendmsg(fd, "fail");
-        }
-        if (out[0] == "initiate_merge") {
-            if (!handlers_->InitiateMerge(out[1])) {
+            std::lock_guard<std::mutex> lock(lock_);
+            auto iter = FindHandler(&lock, out[1]);
+            if (iter == dm_users_.end()) {
+                LOG(ERROR) << "Could not find handler: " << out[1];
+                return Sendmsg(fd, "fail");
+            }
+            if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
+                LOG(ERROR) << "Tried to re-attach control device: " << out[1];
+                return Sendmsg(fd, "fail");
+            }
+            if (!StartHandler(*iter)) {
                 return Sendmsg(fd, "fail");
             }
             return Sendmsg(fd, "success");
         }
-        return Sendmsg(fd, "fail");
-    } else if (cmd == "merge_percent") {
-        double percentage = handlers_->GetMergePercentage();
-        return Sendmsg(fd, std::to_string(percentage));
-    } else if (cmd == "getstatus") {
-        // Message format:
-        // getstatus,<misc_name>
-        if (out.size() != 2) {
-            LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
-            return Sendmsg(fd, "snapshot-merge-failed");
+        case DaemonOps::STOP: {
+            // Message format: stop
+            //
+            // Stop all the threads gracefully and then shutdown the
+            // main thread
+            SetTerminating();
+            ShutdownThreads();
+            return true;
         }
-        auto status = handlers_->GetMergeStatus(out[1]);
-        if (status.empty()) {
-            return Sendmsg(fd, "snapshot-merge-failed");
+        case DaemonOps::QUERY: {
+            // Message format: query
+            //
+            // As part of transition, Second stage daemon will be
+            // created before terminating the first stage daemon. Hence,
+            // for a brief period client may have to distiguish between
+            // first stage daemon and second stage daemon.
+            //
+            // Second stage daemon is marked as active and hence will
+            // be ready to receive control message.
+            return Sendmsg(fd, GetDaemonStatus());
         }
-        return Sendmsg(fd, status);
-    } else if (cmd == "update-verify") {
-        if (!handlers_->GetVerificationStatus()) {
+        case DaemonOps::DELETE: {
+            // Message format:
+            // delete,<misc_name>
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
+                return Sendmsg(fd, "fail");
+            }
+            {
+                std::lock_guard<std::mutex> lock(lock_);
+                auto iter = FindHandler(&lock, out[1]);
+                if (iter == dm_users_.end()) {
+                    // After merge is completed, we swap dm-user table with
+                    // the underlying dm-linear base device. Hence, worker
+                    // threads would have terminted and was removed from
+                    // the list.
+                    LOG(DEBUG) << "Could not find handler: " << out[1];
+                    return Sendmsg(fd, "success");
+                }
+
+                if (!(*iter)->ThreadTerminated()) {
+                    (*iter)->snapuserd()->NotifyIOTerminated();
+                }
+            }
+            if (!RemoveAndJoinHandler(out[1])) {
+                return Sendmsg(fd, "fail");
+            }
+            return Sendmsg(fd, "success");
+        }
+        case DaemonOps::DETACH: {
+            std::lock_guard<std::mutex> lock(lock_);
+            TerminateMergeThreads(&lock);
+            terminating_ = true;
+            return true;
+        }
+        case DaemonOps::SUPPORTS: {
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed supports message, " << out.size() << " parts";
+                return Sendmsg(fd, "fail");
+            }
+            if (out[1] == "second_stage_socket_handoff") {
+                return Sendmsg(fd, "success");
+            }
             return Sendmsg(fd, "fail");
         }
-        return Sendmsg(fd, "success");
-    } else {
-        LOG(ERROR) << "Received unknown message type from client";
-        Sendmsg(fd, "fail");
-        return false;
+        case DaemonOps::INITIATE: {
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts";
+                return Sendmsg(fd, "fail");
+            }
+            if (out[0] == "initiate_merge") {
+                std::lock_guard<std::mutex> lock(lock_);
+                auto iter = FindHandler(&lock, out[1]);
+                if (iter == dm_users_.end()) {
+                    LOG(ERROR) << "Could not find handler: " << out[1];
+                    return Sendmsg(fd, "fail");
+                }
+
+                if (!StartMerge(&lock, *iter)) {
+                    return Sendmsg(fd, "fail");
+                }
+
+                return Sendmsg(fd, "success");
+            }
+            return Sendmsg(fd, "fail");
+        }
+        case DaemonOps::PERCENTAGE: {
+            std::lock_guard<std::mutex> lock(lock_);
+            double percentage = GetMergePercentage(&lock);
+
+            return Sendmsg(fd, std::to_string(percentage));
+        }
+        case DaemonOps::GETSTATUS: {
+            // Message format:
+            // getstatus,<misc_name>
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
+                return Sendmsg(fd, "snapshot-merge-failed");
+            }
+            {
+                std::lock_guard<std::mutex> lock(lock_);
+                auto iter = FindHandler(&lock, out[1]);
+                if (iter == dm_users_.end()) {
+                    LOG(ERROR) << "Could not find handler: " << out[1];
+                    return Sendmsg(fd, "snapshot-merge-failed");
+                }
+
+                std::string merge_status = GetMergeStatus(*iter);
+                return Sendmsg(fd, merge_status);
+            }
+        }
+        case DaemonOps::UPDATE_VERIFY: {
+            std::lock_guard<std::mutex> lock(lock_);
+            if (!UpdateVerification(&lock)) {
+                return Sendmsg(fd, "fail");
+            }
+
+            return Sendmsg(fd, "success");
+        }
+        default: {
+            LOG(ERROR) << "Received unknown message type from client";
+            Sendmsg(fd, "fail");
+            return false;
+        }
+    }
+}
+
+void UserSnapshotServer::RunThread(std::shared_ptr<HandlerThread> handler) {
+    LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
+
+    if (!handler->snapuserd()->Start()) {
+        LOG(ERROR) << " Failed to launch all worker threads";
+    }
+
+    handler->snapuserd()->CloseFds();
+    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
+    handler->snapuserd()->UnmapBufferRegion();
+
+    auto misc_name = handler->misc_name();
+    LOG(INFO) << "Handler thread about to exit: " << misc_name;
+
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        if (merge_completed) {
+            num_partitions_merge_complete_ += 1;
+            active_merge_threads_ -= 1;
+            WakeupMonitorMergeThread();
+        }
+        handler->SetThreadTerminated();
+        auto iter = FindHandler(&lock, handler->misc_name());
+        if (iter == dm_users_.end()) {
+            // RemoveAndJoinHandler() already removed us from the list, and is
+            // now waiting on a join(), so just return. Additionally, release
+            // all the resources held by snapuserd object which are shared
+            // by worker threads. This should be done when the last reference
+            // of "handler" is released; but we will explicitly release here
+            // to make sure snapuserd object is freed as it is the biggest
+            // consumer of memory in the daemon.
+            handler->FreeResources();
+            LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
+            return;
+        }
+
+        LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;
+
+        if (handler->snapuserd()->IsAttached()) {
+            handler->thread().detach();
+        }
+
+        // Important: free resources within the lock. This ensures that if
+        // WaitForDelete() is called, the handler is either in the list, or
+        // it's not and its resources are guaranteed to be freed.
+        handler->FreeResources();
+        dm_users_.erase(iter);
     }
 }
 
@@ -293,10 +423,28 @@
         }
     }
 
-    handlers_->JoinAllThreads();
+    JoinAllThreads();
     return true;
 }
 
+void UserSnapshotServer::JoinAllThreads() {
+    // Acquire the thread list within the lock.
+    std::vector<std::shared_ptr<HandlerThread>> dm_users;
+    {
+        std::lock_guard<std::mutex> guard(lock_);
+        dm_users = std::move(dm_users_);
+    }
+
+    for (auto& client : dm_users) {
+        auto& th = client->thread();
+
+        if (th.joinable()) th.join();
+    }
+
+    stop_monitor_merge_thread_ = true;
+    WakeupMonitorMergeThread();
+}
+
 void UserSnapshotServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
     struct pollfd p = {};
     p.fd = fd.get();
@@ -358,13 +506,185 @@
         perform_verification = false;
     }
 
-    return handlers_->AddHandler(misc_name, cow_device_path, backing_device, base_path_merge,
-                                 num_worker_threads, io_uring_enabled_, perform_verification);
+    auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
+                                                       base_path_merge, num_worker_threads,
+                                                       io_uring_enabled_, perform_verification);
+    if (!snapuserd->InitCowDevice()) {
+        LOG(ERROR) << "Failed to initialize Snapuserd";
+        return nullptr;
+    }
+
+    if (!snapuserd->InitializeWorkers()) {
+        LOG(ERROR) << "Failed to initialize workers";
+        return nullptr;
+    }
+
+    auto handler = std::make_shared<HandlerThread>(snapuserd);
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        if (FindHandler(&lock, misc_name) != dm_users_.end()) {
+            LOG(ERROR) << "Handler already exists: " << misc_name;
+            return nullptr;
+        }
+        dm_users_.push_back(handler);
+    }
+    return handler;
+}
+
+bool UserSnapshotServer::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
+    if (handler->snapuserd()->IsAttached()) {
+        LOG(ERROR) << "Handler already attached";
+        return false;
+    }
+
+    handler->snapuserd()->AttachControlDevice();
+
+    handler->thread() = std::thread(std::bind(&UserSnapshotServer::RunThread, this, handler));
+    return true;
+}
+
+bool UserSnapshotServer::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+                                    const std::shared_ptr<HandlerThread>& handler) {
+    CHECK(proof_of_lock);
+
+    if (!handler->snapuserd()->IsAttached()) {
+        LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
+        return false;
+    }
+
+    handler->snapuserd()->MonitorMerge();
+
+    if (!is_merge_monitor_started_.has_value()) {
+        std::thread(&UserSnapshotServer::MonitorMerge, this).detach();
+        is_merge_monitor_started_ = true;
+    }
+
+    merge_handlers_.push(handler);
+    WakeupMonitorMergeThread();
+    return true;
+}
+
+auto UserSnapshotServer::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
+                                     const std::string& misc_name) -> HandlerList::iterator {
+    CHECK(proof_of_lock);
+
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        if ((*iter)->misc_name() == misc_name) {
+            return iter;
+        }
+    }
+    return dm_users_.end();
+}
+
+void UserSnapshotServer::TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock) {
+    CHECK(proof_of_lock);
+
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        if (!(*iter)->ThreadTerminated()) {
+            (*iter)->snapuserd()->NotifyIOTerminated();
+        }
+    }
+}
+
+std::string UserSnapshotServer::GetMergeStatus(const std::shared_ptr<HandlerThread>& handler) {
+    return handler->snapuserd()->GetMergeStatus();
+}
+
+double UserSnapshotServer::GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock) {
+    CHECK(proof_of_lock);
+    double percentage = 0.0;
+    int n = 0;
+
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        auto& th = (*iter)->thread();
+        if (th.joinable()) {
+            // Merge percentage by individual partitions wherein merge is still
+            // in-progress
+            percentage += (*iter)->snapuserd()->GetMergePercentage();
+            n += 1;
+        }
+    }
+
+    // Calculate final merge including those partitions where merge was already
+    // completed - num_partitions_merge_complete_ will track them when each
+    // thread exists in RunThread.
+    int total_partitions = n + num_partitions_merge_complete_;
+
+    if (total_partitions) {
+        percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
+    }
+
+    LOG(DEBUG) << "Merge %: " << percentage
+               << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
+               << " total_partitions: " << total_partitions << " n: " << n;
+    return percentage;
+}
+
+bool UserSnapshotServer::RemoveAndJoinHandler(const std::string& misc_name) {
+    std::shared_ptr<HandlerThread> handler;
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+
+        auto iter = FindHandler(&lock, misc_name);
+        if (iter == dm_users_.end()) {
+            // Client already deleted.
+            return true;
+        }
+        handler = std::move(*iter);
+        dm_users_.erase(iter);
+    }
+
+    auto& th = handler->thread();
+    if (th.joinable()) {
+        th.join();
+    }
+    return true;
+}
+
+void UserSnapshotServer::WakeupMonitorMergeThread() {
+    uint64_t notify = 1;
+    ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
+    if (rc < 0) {
+        PLOG(FATAL) << "failed to notify monitor merge thread";
+    }
+}
+
+void UserSnapshotServer::MonitorMerge() {
+    while (!stop_monitor_merge_thread_) {
+        uint64_t testVal;
+        ssize_t ret =
+                TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
+        if (ret == -1) {
+            PLOG(FATAL) << "Failed to read from eventfd";
+        } else if (ret == 0) {
+            LOG(FATAL) << "Hit EOF on eventfd";
+        }
+
+        LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
+        {
+            std::lock_guard<std::mutex> lock(lock_);
+            while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
+                auto handler = merge_handlers_.front();
+                merge_handlers_.pop();
+
+                if (!handler->snapuserd()) {
+                    LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
+                    continue;
+                }
+
+                LOG(INFO) << "Starting merge for partition: "
+                          << handler->snapuserd()->GetMiscName();
+                handler->snapuserd()->InitiateMerge();
+                active_merge_threads_ += 1;
+            }
+        }
+    }
+
+    LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
 }
 
 bool UserSnapshotServer::WaitForSocket() {
-    auto scope_guard =
-            android::base::make_scope_guard([this]() -> void { handlers_->JoinAllThreads(); });
+    auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
 
     auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy;
 
@@ -461,8 +781,21 @@
     return true;
 }
 
-bool UserSnapshotServer::StartHandler(const std::string& misc_name) {
-    return handlers_->StartHandler(misc_name);
+bool UserSnapshotServer::UpdateVerification(std::lock_guard<std::mutex>* proof_of_lock) {
+    CHECK(proof_of_lock);
+
+    bool status = true;
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        auto& th = (*iter)->thread();
+        if (th.joinable() && status) {
+            status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
+        } else {
+            // return immediately if there is a failure
+            return false;
+        }
+    }
+
+    return status;
 }
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
index 988c01a..12c3903 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
@@ -31,7 +31,6 @@
 #include <vector>
 
 #include <android-base/unique_fd.h>
-#include "handler_manager.h"
 #include "snapuserd_core.h"
 
 namespace android {
@@ -40,6 +39,48 @@
 static constexpr uint32_t kMaxPacketSize = 512;
 static constexpr uint8_t kMaxMergeThreads = 2;
 
+enum class DaemonOps {
+    INIT,
+    START,
+    QUERY,
+    STOP,
+    DELETE,
+    DETACH,
+    SUPPORTS,
+    INITIATE,
+    PERCENTAGE,
+    GETSTATUS,
+    UPDATE_VERIFY,
+    INVALID,
+};
+
+class HandlerThread {
+  public:
+    explicit HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd);
+
+    void FreeResources() {
+        // Each worker thread holds a reference to snapuserd.
+        // Clear them so that all the resources
+        // held by snapuserd is released
+        if (snapuserd_) {
+            snapuserd_->FreeResources();
+            snapuserd_ = nullptr;
+        }
+    }
+    const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
+    std::thread& thread() { return thread_; }
+
+    const std::string& misc_name() const { return misc_name_; }
+    bool ThreadTerminated() { return thread_terminated_; }
+    void SetThreadTerminated() { thread_terminated_ = true; }
+
+  private:
+    std::thread thread_;
+    std::shared_ptr<SnapshotHandler> snapuserd_;
+    std::string misc_name_;
+    bool thread_terminated_ = false;
+};
+
 class UserSnapshotServer {
   private:
     android::base::unique_fd sockfd_;
@@ -47,12 +88,21 @@
     volatile bool received_socket_signal_ = false;
     std::vector<struct pollfd> watched_fds_;
     bool is_socket_present_ = false;
+    int num_partitions_merge_complete_ = 0;
+    int active_merge_threads_ = 0;
+    bool stop_monitor_merge_thread_ = false;
     bool is_server_running_ = false;
     bool io_uring_enabled_ = false;
-    std::unique_ptr<ISnapshotHandlerManager> handlers_;
+    std::optional<bool> is_merge_monitor_started_;
+
+    android::base::unique_fd monitor_merge_event_fd_;
 
     std::mutex lock_;
 
+    using HandlerList = std::vector<std::shared_ptr<HandlerThread>>;
+    HandlerList dm_users_;
+    std::queue<std::shared_ptr<HandlerThread>> merge_handlers_;
+
     void AddWatchedFd(android::base::borrowed_fd fd, int events);
     void AcceptClient();
     bool HandleClient(android::base::borrowed_fd fd, int revents);
@@ -61,14 +111,28 @@
     bool Receivemsg(android::base::borrowed_fd fd, const std::string& str);
 
     void ShutdownThreads();
+    bool RemoveAndJoinHandler(const std::string& control_device);
+    DaemonOps Resolveop(std::string& input);
     std::string GetDaemonStatus();
     void Parsemsg(std::string const& msg, const char delim, std::vector<std::string>& out);
 
     bool IsTerminating() { return terminating_; }
 
+    void RunThread(std::shared_ptr<HandlerThread> handler);
+    void MonitorMerge();
+
     void JoinAllThreads();
     bool StartWithSocket(bool start_listening);
 
+    // Find a HandlerThread within a lock.
+    HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
+                                      const std::string& misc_name);
+
+    double GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock);
+    void TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock);
+
+    bool UpdateVerification(std::lock_guard<std::mutex>* proof_of_lock);
+
   public:
     UserSnapshotServer();
     ~UserSnapshotServer();
@@ -83,8 +147,12 @@
                                               const std::string& cow_device_path,
                                               const std::string& backing_device,
                                               const std::string& base_path_merge);
-    bool StartHandler(const std::string& misc_name);
+    bool StartHandler(const std::shared_ptr<HandlerThread>& handler);
+    bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+                    const std::shared_ptr<HandlerThread>& handler);
+    std::string GetMergeStatus(const std::shared_ptr<HandlerThread>& handler);
 
+    void WakeupMonitorMergeThread();
     void SetTerminating() { terminating_ = true; }
     void ReceivedSocketSignal() { received_socket_signal_ = true; }
     void SetServerRunning() { is_server_running_ = true; }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
index 57f9e7a..1421403 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
@@ -37,9 +37,9 @@
 #include <libdm/dm.h>
 #include <libdm/loop_control.h>
 #include <libsnapshot/cow_writer.h>
+#include <snapuserd/snapuserd_client.h>
 #include <storage_literals/storage_literals.h>
 
-#include "handler_manager.h"
 #include "snapuserd_core.h"
 
 DEFINE_string(force_config, "", "Force testing mode with iouring disabled");
@@ -54,6 +54,8 @@
 using namespace android::dm;
 using namespace std;
 
+static constexpr char kSnapuserdSocketTest[] = "snapuserdTest";
+
 class Tempdevice {
   public:
     Tempdevice(const std::string& name, const DmTable& table)
@@ -66,15 +68,15 @@
     }
     ~Tempdevice() {
         if (valid_) {
-            dm_.DeleteDeviceIfExists(name_);
+            dm_.DeleteDevice(name_);
         }
     }
     bool Destroy() {
         if (!valid_) {
-            return true;
+            return false;
         }
         valid_ = false;
-        return dm_.DeleteDeviceIfExists(name_);
+        return dm_.DeleteDevice(name_);
     }
     const std::string& path() const { return path_; }
     const std::string& name() const { return name_; }
@@ -136,6 +138,7 @@
     void SetDeviceControlName();
     void InitDaemon();
     void CreateDmUserDevice();
+    void StartSnapuserdDaemon();
 
     unique_ptr<LoopDevice> base_loop_;
     unique_ptr<Tempdevice> dmuser_dev_;
@@ -145,9 +148,9 @@
 
     unique_fd base_fd_;
     std::unique_ptr<TemporaryFile> cow_system_;
+    std::unique_ptr<SnapuserdClient> client_;
     std::unique_ptr<uint8_t[]> orig_buffer_;
     std::unique_ptr<uint8_t[]> merged_buffer_;
-    SnapshotHandlerManager handlers_;
     bool setup_ok_ = false;
     bool merge_ok_ = false;
     size_t size_ = 100_MiB;
@@ -177,9 +180,9 @@
     ASSERT_TRUE(dmuser_dev_->Destroy());
 
     auto misc_device = "/dev/dm-user/" + system_device_ctrl_name_;
-    ASSERT_TRUE(handlers_.DeleteHandler(system_device_ctrl_name_));
+    ASSERT_TRUE(client_->WaitForDeviceDelete(system_device_ctrl_name_));
     ASSERT_TRUE(android::fs_mgr::WaitForFileDeleted(misc_device, 10s));
-    handlers_.TerminateMergeThreads();
+    ASSERT_TRUE(client_->DetachSnapuserd());
 }
 
 bool SnapuserdTest::SetupDefault() {
@@ -214,6 +217,8 @@
 bool SnapuserdTest::SetupDaemon() {
     SetDeviceControlName();
 
+    StartSnapuserdDaemon();
+
     CreateDmUserDevice();
     InitCowDevice();
     InitDaemon();
@@ -223,6 +228,20 @@
     return setup_ok_;
 }
 
+void SnapuserdTest::StartSnapuserdDaemon() {
+    pid_t pid = fork();
+    ASSERT_GE(pid, 0);
+    if (pid == 0) {
+        std::string arg0 = "/system/bin/snapuserd";
+        std::string arg1 = "-socket="s + kSnapuserdSocketTest;
+        char* const argv[] = {arg0.data(), arg1.data(), nullptr};
+        ASSERT_GE(execv(arg0.c_str(), argv), 0);
+    } else {
+        client_ = SnapuserdClient::Connect(kSnapuserdSocketTest, 10s);
+        ASSERT_NE(client_, nullptr);
+    }
+}
+
 void SnapuserdTest::CreateBaseDevice() {
     unique_fd rnd_fd;
 
@@ -587,17 +606,9 @@
 }
 
 void SnapuserdTest::InitCowDevice() {
-    bool use_iouring = true;
-    if (FLAGS_force_config == "iouring_disabled") {
-        use_iouring = false;
-    }
-
-    auto handler =
-            handlers_.AddHandler(system_device_ctrl_name_, cow_system_->path, base_loop_->device(),
-                                 base_loop_->device(), 1, use_iouring, false);
-    ASSERT_NE(handler, nullptr);
-    ASSERT_NE(handler->snapuserd(), nullptr);
-    ASSERT_NE(handler->snapuserd()->GetNumSectors(), 0);
+    uint64_t num_sectors = client_->InitDmUserCow(system_device_ctrl_name_, cow_system_->path,
+                                                  base_loop_->device(), base_loop_->device());
+    ASSERT_NE(num_sectors, 0);
 }
 
 void SnapuserdTest::SetDeviceControlName() {
@@ -635,12 +646,13 @@
 }
 
 void SnapuserdTest::InitDaemon() {
-    ASSERT_TRUE(handlers_.StartHandler(system_device_ctrl_name_));
+    bool ok = client_->AttachDmUser(system_device_ctrl_name_);
+    ASSERT_TRUE(ok);
 }
 
 void SnapuserdTest::CheckMergeCompletion() {
     while (true) {
-        double percentage = handlers_.GetMergePercentage();
+        double percentage = client_->GetMergePercent();
         if ((int)percentage == 100) {
             break;
         }
@@ -655,6 +667,8 @@
 
     SetDeviceControlName();
 
+    StartSnapuserdDaemon();
+
     CreateDmUserDevice();
     InitCowDevice();
     InitDaemon();
@@ -670,7 +684,8 @@
 }
 
 void SnapuserdTest::StartMerge() {
-    ASSERT_TRUE(handlers_.InitiateMerge(system_device_ctrl_name_));
+    bool ok = client_->InitiateMerge(system_device_ctrl_name_);
+    ASSERT_TRUE(ok);
 }
 
 void SnapuserdTest::ValidateMerge() {
@@ -684,6 +699,7 @@
     Shutdown();
     std::this_thread::sleep_for(500ms);
     SetDeviceControlName();
+    StartSnapuserdDaemon();
     CreateDmUserDevice();
     InitCowDevice();
     InitDaemon();
@@ -843,5 +859,20 @@
 
     gflags::ParseCommandLineFlags(&argc, &argv, false);
 
-    return RUN_ALL_TESTS();
+    android::base::SetProperty("ctl.stop", "snapuserd");
+
+    if (FLAGS_force_config == "iouring_disabled") {
+        if (!android::base::SetProperty("snapuserd.test.io_uring.force_disable", "1")) {
+            return testing::AssertionFailure()
+                   << "Failed to disable property: snapuserd.test.io_uring.disabled";
+        }
+    }
+
+    int ret = RUN_ALL_TESTS();
+
+    if (FLAGS_force_config == "iouring_disabled") {
+        android::base::SetProperty("snapuserd.test.io_uring.force_disable", "0");
+    }
+
+    return ret;
 }
diff --git a/init/property_service.cpp b/init/property_service.cpp
index 8da6982..4242912 100644
--- a/init/property_service.cpp
+++ b/init/property_service.cpp
@@ -57,12 +57,12 @@
 #include <android-base/result.h>
 #include <android-base/stringprintf.h>
 #include <android-base/strings.h>
+#include <private/android_filesystem_config.h>
 #include <property_info_parser/property_info_parser.h>
 #include <property_info_serializer/property_info_serializer.h>
 #include <selinux/android.h>
 #include <selinux/label.h>
 #include <selinux/selinux.h>
-
 #include "debug_ramdisk.h"
 #include "epoll.h"
 #include "init.h"
@@ -111,12 +111,12 @@
 
 static bool persistent_properties_loaded = false;
 
-static int property_set_fd = -1;
 static int from_init_socket = -1;
 static int init_socket = -1;
 static bool accept_messages = false;
 static std::mutex accept_messages_lock;
 static std::thread property_service_thread;
+static std::thread property_service_for_system_thread;
 
 static std::unique_ptr<PersistWriteThread> persist_write_thread;
 
@@ -394,31 +394,37 @@
         return {PROP_ERROR_INVALID_VALUE};
     }
 
-    prop_info* pi = (prop_info*)__system_property_find(name.c_str());
-    if (pi != nullptr) {
-        // ro.* properties are actually "write-once".
-        if (StartsWith(name, "ro.")) {
-            *error = "Read-only property was already set";
-            return {PROP_ERROR_READ_ONLY_PROPERTY};
-        }
-
-        __system_property_update(pi, value.c_str(), valuelen);
+    if (name == "sys.powerctl") {
+        // No action here - NotifyPropertyChange will trigger the appropriate action, and since this
+        // can come to the second thread, we mustn't call out to the __system_property_* functions
+        // which support multiple readers but only one mutator.
     } else {
-        int rc = __system_property_add(name.c_str(), name.size(), value.c_str(), valuelen);
-        if (rc < 0) {
-            *error = "__system_property_add failed";
-            return {PROP_ERROR_SET_FAILED};
-        }
-    }
+        prop_info* pi = (prop_info*)__system_property_find(name.c_str());
+        if (pi != nullptr) {
+            // ro.* properties are actually "write-once".
+            if (StartsWith(name, "ro.")) {
+                *error = "Read-only property was already set";
+                return {PROP_ERROR_READ_ONLY_PROPERTY};
+            }
 
-    // Don't write properties to disk until after we have read all default
-    // properties to prevent them from being overwritten by default values.
-    if (socket && persistent_properties_loaded && StartsWith(name, "persist.")) {
-        if (persist_write_thread) {
-            persist_write_thread->Write(name, value, std::move(*socket));
-            return {};
+            __system_property_update(pi, value.c_str(), valuelen);
+        } else {
+            int rc = __system_property_add(name.c_str(), name.size(), value.c_str(), valuelen);
+            if (rc < 0) {
+                *error = "__system_property_add failed";
+                return {PROP_ERROR_SET_FAILED};
+            }
         }
-        WritePersistentProperty(name, value);
+
+        // Don't write properties to disk until after we have read all default
+        // properties to prevent them from being overwritten by default values.
+        if (socket && persistent_properties_loaded && StartsWith(name, "persist.")) {
+            if (persist_write_thread) {
+                persist_write_thread->Write(name, value, std::move(*socket));
+                return {};
+            }
+            WritePersistentProperty(name, value);
+        }
     }
 
     NotifyPropertyChange(name, value);
@@ -579,10 +585,10 @@
     return *ret;
 }
 
-static void handle_property_set_fd() {
+static void handle_property_set_fd(int fd) {
     static constexpr uint32_t kDefaultSocketTimeout = 2000; /* ms */
 
-    int s = accept4(property_set_fd, nullptr, nullptr, SOCK_CLOEXEC);
+    int s = accept4(fd, nullptr, nullptr, SOCK_CLOEXEC);
     if (s == -1) {
         return;
     }
@@ -1419,19 +1425,21 @@
     }
 }
 
-static void PropertyServiceThread() {
+static void PropertyServiceThread(int fd, bool listen_init) {
     Epoll epoll;
     if (auto result = epoll.Open(); !result.ok()) {
         LOG(FATAL) << result.error();
     }
 
-    if (auto result = epoll.RegisterHandler(property_set_fd, handle_property_set_fd);
+    if (auto result = epoll.RegisterHandler(fd, std::bind(handle_property_set_fd, fd));
         !result.ok()) {
         LOG(FATAL) << result.error();
     }
 
-    if (auto result = epoll.RegisterHandler(init_socket, HandleInitSocket); !result.ok()) {
-        LOG(FATAL) << result.error();
+    if (listen_init) {
+        if (auto result = epoll.RegisterHandler(init_socket, HandleInitSocket); !result.ok()) {
+            LOG(FATAL) << result.error();
+        }
     }
 
     while (true) {
@@ -1482,6 +1490,23 @@
     cv_.notify_all();
 }
 
+void StartThread(const char* name, int mode, int gid, std::thread& t, bool listen_init) {
+    int fd = -1;
+    if (auto result = CreateSocket(name, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK,
+                                   /*passcred=*/false, /*should_listen=*/false, mode, /*uid=*/0,
+                                   /*gid=*/gid, /*socketcon=*/{});
+        result.ok()) {
+        fd = *result;
+    } else {
+        LOG(FATAL) << "start_property_service socket creation failed: " << result.error();
+    }
+
+    listen(fd, 8);
+
+    auto new_thread = std::thread(PropertyServiceThread, fd, listen_init);
+    t.swap(new_thread);
+}
+
 void StartPropertyService(int* epoll_socket) {
     InitPropertySet("ro.property_service.version", "2");
 
@@ -1493,19 +1518,9 @@
     init_socket = sockets[1];
     StartSendingMessages();
 
-    if (auto result = CreateSocket(PROP_SERVICE_NAME, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK,
-                                   /*passcred=*/false, /*should_listen=*/false, 0666, /*uid=*/0,
-                                   /*gid=*/0, /*socketcon=*/{});
-        result.ok()) {
-        property_set_fd = *result;
-    } else {
-        LOG(FATAL) << "start_property_service socket creation failed: " << result.error();
-    }
-
-    listen(property_set_fd, 8);
-
-    auto new_thread = std::thread{PropertyServiceThread};
-    property_service_thread.swap(new_thread);
+    StartThread(PROP_SERVICE_FOR_SYSTEM_NAME, 0660, AID_SYSTEM, property_service_for_system_thread,
+                true);
+    StartThread(PROP_SERVICE_NAME, 0666, 0, property_service_thread, false);
 
     auto async_persist_writes =
             android::base::GetBoolProperty("ro.property_service.async_persist_writes", false);
diff --git a/libprocessgroup/profiles/cgroups.json b/libprocessgroup/profiles/cgroups.json
index d013ec8..3e4393d 100644
--- a/libprocessgroup/profiles/cgroups.json
+++ b/libprocessgroup/profiles/cgroups.json
@@ -1,6 +1,13 @@
 {
   "Cgroups": [
     {
+      "Controller": "blkio",
+      "Path": "/dev/blkio",
+      "Mode": "0775",
+      "UID": "system",
+      "GID": "system"
+    },
+    {
       "Controller": "cpu",
       "Path": "/dev/cpuctl",
       "Mode": "0755",
@@ -32,12 +39,6 @@
       {
         "Controller": "freezer",
         "Path": "."
-      },
-      {
-        "Controller": "io",
-        "Path": ".",
-        "NeedsActivation": true,
-        "Optional": true
       }
     ]
   }
diff --git a/libprocessgroup/profiles/task_profiles.json b/libprocessgroup/profiles/task_profiles.json
index 12f7b44..e44d3bf 100644
--- a/libprocessgroup/profiles/task_profiles.json
+++ b/libprocessgroup/profiles/task_profiles.json
@@ -457,6 +457,14 @@
       "Name": "LowIoPriority",
       "Actions": [
         {
+          "Name": "JoinCgroup",
+          "Params":
+          {
+            "Controller": "blkio",
+            "Path": "background"
+          }
+        },
+        {
           "Name": "SetAttribute",
           "Params":
           {
@@ -489,6 +497,14 @@
       "Name": "NormalIoPriority",
       "Actions": [
         {
+          "Name": "JoinCgroup",
+          "Params":
+          {
+            "Controller": "blkio",
+            "Path": ""
+          }
+        },
+        {
           "Name": "SetAttribute",
           "Params":
           {
@@ -521,6 +537,14 @@
       "Name": "HighIoPriority",
       "Actions": [
         {
+          "Name": "JoinCgroup",
+          "Params":
+          {
+            "Controller": "blkio",
+            "Path": ""
+          }
+        },
+        {
           "Name": "SetAttribute",
           "Params":
           {
@@ -553,6 +577,14 @@
       "Name": "MaxIoPriority",
       "Actions": [
         {
+          "Name": "JoinCgroup",
+          "Params":
+          {
+            "Controller": "blkio",
+            "Path": ""
+          }
+        },
+        {
           "Name": "SetAttribute",
           "Params":
           {
diff --git a/rootdir/init.rc b/rootdir/init.rc
index d755b50..7326783 100644
--- a/rootdir/init.rc
+++ b/rootdir/init.rc
@@ -221,6 +221,26 @@
     write /dev/stune/nnapi-hal/schedtune.boost 1
     write /dev/stune/nnapi-hal/schedtune.prefer_idle 1
 
+    # Create blkio group and apply initial settings.
+    # This feature needs kernel to support it, and the
+    # device's init.rc must actually set the correct values.
+    mkdir /dev/blkio/background
+    chown system system /dev/blkio
+    chown system system /dev/blkio/background
+    chown system system /dev/blkio/tasks
+    chown system system /dev/blkio/background/tasks
+    chown system system /dev/blkio/cgroup.procs
+    chown system system /dev/blkio/background/cgroup.procs
+    chmod 0664 /dev/blkio/tasks
+    chmod 0664 /dev/blkio/background/tasks
+    chmod 0664 /dev/blkio/cgroup.procs
+    chmod 0664 /dev/blkio/background/cgroup.procs
+    write /dev/blkio/blkio.weight 1000
+    write /dev/blkio/background/blkio.weight 200
+    write /dev/blkio/background/blkio.bfq.weight 10
+    write /dev/blkio/blkio.group_idle 0
+    write /dev/blkio/background/blkio.group_idle 0
+
     restorecon_recursive /mnt
 
     mount configfs none /config nodev noexec nosuid