RESTRICT AUTOMERGE: snapuserd: Split the server into two classes.

This splits server into two classes: one that handles the IPC requests,
and one that managers snapshot handlers. This will allow embedding of
the snapshot handling code, without booting up a server. It'll also make
testing much easier.

The handler code has been further placed behind a virtual interface,
though this is likely unnecessary unless we start testing the server
logic itself (or attempt to unify the S and T+ versions of snapuserd).

Bug: 269361087
Test: ota
Change-Id: I3ca3ad9c8c1fb910a1c7bf9f44165e2f44c930c9
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index 9261482..7fcaac5 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -62,6 +62,7 @@
         "dm-snapshot-merge/snapuserd_worker.cpp",
         "dm-snapshot-merge/snapuserd_readahead.cpp",
         "snapuserd_buffer.cpp",
+        "user-space-merge/handler_manager.cpp",
         "user-space-merge/snapuserd_core.cpp",
         "user-space-merge/snapuserd_dm_user.cpp",
         "user-space-merge/snapuserd_merge.cpp",
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
index bfe93eb..0557214 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
@@ -114,7 +114,7 @@
             return false;
         }
         auto handler = user_server_.AddHandler(parts[0], parts[1], parts[2], parts[3]);
-        if (!handler || !user_server_.StartHandler(handler)) {
+        if (!handler || !user_server_.StartHandler(parts[0])) {
             return false;
         }
     }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
new file mode 100644
index 0000000..c5150c4
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
@@ -0,0 +1,371 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "handler_manager.h"
+
+#include <sys/eventfd.h>
+
+#include <android-base/logging.h>
+
+#include "snapuserd_core.h"
+
+namespace android {
+namespace snapshot {
+
+static constexpr uint8_t kMaxMergeThreads = 2;
+
+void HandlerThread::FreeResources() {
+    // Each worker thread holds a reference to snapuserd.
+    // Clear them so that all the resources
+    // held by snapuserd is released
+    if (snapuserd_) {
+        snapuserd_->FreeResources();
+        snapuserd_ = nullptr;
+    }
+}
+
+SnapshotHandlerManager::SnapshotHandlerManager() {
+    monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
+    if (monitor_merge_event_fd_ == -1) {
+        PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
+    }
+}
+
+std::shared_ptr<HandlerThread> SnapshotHandlerManager::AddHandler(
+        const std::string& misc_name, const std::string& cow_device_path,
+        const std::string& backing_device, const std::string& base_path_merge,
+        int num_worker_threads, bool use_iouring, bool perform_verification) {
+    auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
+                                                       base_path_merge, num_worker_threads,
+                                                       use_iouring, perform_verification);
+    if (!snapuserd->InitCowDevice()) {
+        LOG(ERROR) << "Failed to initialize Snapuserd";
+        return nullptr;
+    }
+
+    if (!snapuserd->InitializeWorkers()) {
+        LOG(ERROR) << "Failed to initialize workers";
+        return nullptr;
+    }
+
+    auto handler = std::make_shared<HandlerThread>(snapuserd);
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        if (FindHandler(&lock, misc_name) != dm_users_.end()) {
+            LOG(ERROR) << "Handler already exists: " << misc_name;
+            return nullptr;
+        }
+        dm_users_.push_back(handler);
+    }
+    return handler;
+}
+
+bool SnapshotHandlerManager::StartHandler(const std::string& misc_name) {
+    std::lock_guard<std::mutex> lock(lock_);
+    auto iter = FindHandler(&lock, misc_name);
+    if (iter == dm_users_.end()) {
+        LOG(ERROR) << "Could not find handler: " << misc_name;
+        return false;
+    }
+    if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
+        LOG(ERROR) << "Tried to re-attach control device: " << misc_name;
+        return false;
+    }
+    if (!StartHandler(*iter)) {
+        return false;
+    }
+    return true;
+}
+
+bool SnapshotHandlerManager::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
+    if (handler->snapuserd()->IsAttached()) {
+        LOG(ERROR) << "Handler already attached";
+        return false;
+    }
+
+    handler->snapuserd()->AttachControlDevice();
+
+    handler->thread() = std::thread(std::bind(&SnapshotHandlerManager::RunThread, this, handler));
+    return true;
+}
+
+bool SnapshotHandlerManager::DeleteHandler(const std::string& misc_name) {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        auto iter = FindHandler(&lock, misc_name);
+        if (iter == dm_users_.end()) {
+            // After merge is completed, we swap dm-user table with
+            // the underlying dm-linear base device. Hence, worker
+            // threads would have terminted and was removed from
+            // the list.
+            LOG(DEBUG) << "Could not find handler: " << misc_name;
+            return true;
+        }
+
+        if (!(*iter)->ThreadTerminated()) {
+            (*iter)->snapuserd()->NotifyIOTerminated();
+        }
+    }
+    if (!RemoveAndJoinHandler(misc_name)) {
+        return false;
+    }
+    return true;
+}
+
+void SnapshotHandlerManager::RunThread(std::shared_ptr<HandlerThread> handler) {
+    LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
+
+    if (!handler->snapuserd()->Start()) {
+        LOG(ERROR) << " Failed to launch all worker threads";
+    }
+
+    handler->snapuserd()->CloseFds();
+    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
+    handler->snapuserd()->UnmapBufferRegion();
+
+    auto misc_name = handler->misc_name();
+    LOG(INFO) << "Handler thread about to exit: " << misc_name;
+
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        if (merge_completed) {
+            num_partitions_merge_complete_ += 1;
+            active_merge_threads_ -= 1;
+            WakeupMonitorMergeThread();
+        }
+        handler->SetThreadTerminated();
+        auto iter = FindHandler(&lock, handler->misc_name());
+        if (iter == dm_users_.end()) {
+            // RemoveAndJoinHandler() already removed us from the list, and is
+            // now waiting on a join(), so just return. Additionally, release
+            // all the resources held by snapuserd object which are shared
+            // by worker threads. This should be done when the last reference
+            // of "handler" is released; but we will explicitly release here
+            // to make sure snapuserd object is freed as it is the biggest
+            // consumer of memory in the daemon.
+            handler->FreeResources();
+            LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
+            return;
+        }
+
+        LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;
+
+        if (handler->snapuserd()->IsAttached()) {
+            handler->thread().detach();
+        }
+
+        // Important: free resources within the lock. This ensures that if
+        // WaitForDelete() is called, the handler is either in the list, or
+        // it's not and its resources are guaranteed to be freed.
+        handler->FreeResources();
+        dm_users_.erase(iter);
+    }
+}
+
+bool SnapshotHandlerManager::InitiateMerge(const std::string& misc_name) {
+    std::lock_guard<std::mutex> lock(lock_);
+    auto iter = FindHandler(&lock, misc_name);
+    if (iter == dm_users_.end()) {
+        LOG(ERROR) << "Could not find handler: " << misc_name;
+        return false;
+    }
+
+    return StartMerge(&lock, *iter);
+}
+
+bool SnapshotHandlerManager::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+                                        const std::shared_ptr<HandlerThread>& handler) {
+    CHECK(proof_of_lock);
+
+    if (!handler->snapuserd()->IsAttached()) {
+        LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
+        return false;
+    }
+
+    handler->snapuserd()->MonitorMerge();
+
+    if (!is_merge_monitor_started_) {
+        std::thread(&SnapshotHandlerManager::MonitorMerge, this).detach();
+        is_merge_monitor_started_ = true;
+    }
+
+    merge_handlers_.push(handler);
+    WakeupMonitorMergeThread();
+    return true;
+}
+
+void SnapshotHandlerManager::WakeupMonitorMergeThread() {
+    uint64_t notify = 1;
+    ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
+    if (rc < 0) {
+        PLOG(FATAL) << "failed to notify monitor merge thread";
+    }
+}
+
+void SnapshotHandlerManager::MonitorMerge() {
+    while (!stop_monitor_merge_thread_) {
+        uint64_t testVal;
+        ssize_t ret =
+                TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
+        if (ret == -1) {
+            PLOG(FATAL) << "Failed to read from eventfd";
+        } else if (ret == 0) {
+            LOG(FATAL) << "Hit EOF on eventfd";
+        }
+
+        LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
+        {
+            std::lock_guard<std::mutex> lock(lock_);
+            while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
+                auto handler = merge_handlers_.front();
+                merge_handlers_.pop();
+
+                if (!handler->snapuserd()) {
+                    LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
+                    continue;
+                }
+
+                LOG(INFO) << "Starting merge for partition: "
+                          << handler->snapuserd()->GetMiscName();
+                handler->snapuserd()->InitiateMerge();
+                active_merge_threads_ += 1;
+            }
+        }
+    }
+
+    LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
+}
+
+std::string SnapshotHandlerManager::GetMergeStatus(const std::string& misc_name) {
+    std::lock_guard<std::mutex> lock(lock_);
+    auto iter = FindHandler(&lock, misc_name);
+    if (iter == dm_users_.end()) {
+        LOG(ERROR) << "Could not find handler: " << misc_name;
+        return {};
+    }
+
+    return (*iter)->snapuserd()->GetMergeStatus();
+}
+
+double SnapshotHandlerManager::GetMergePercentage() {
+    std::lock_guard<std::mutex> lock(lock_);
+
+    double percentage = 0.0;
+    int n = 0;
+
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        auto& th = (*iter)->thread();
+        if (th.joinable()) {
+            // Merge percentage by individual partitions wherein merge is still
+            // in-progress
+            percentage += (*iter)->snapuserd()->GetMergePercentage();
+            n += 1;
+        }
+    }
+
+    // Calculate final merge including those partitions where merge was already
+    // completed - num_partitions_merge_complete_ will track them when each
+    // thread exists in RunThread.
+    int total_partitions = n + num_partitions_merge_complete_;
+
+    if (total_partitions) {
+        percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
+    }
+
+    LOG(DEBUG) << "Merge %: " << percentage
+               << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
+               << " total_partitions: " << total_partitions << " n: " << n;
+    return percentage;
+}
+
+bool SnapshotHandlerManager::GetVerificationStatus() {
+    std::lock_guard<std::mutex> lock(lock_);
+
+    bool status = true;
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        auto& th = (*iter)->thread();
+        if (th.joinable() && status) {
+            status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
+        } else {
+            // return immediately if there is a failure
+            return false;
+        }
+    }
+
+    return status;
+}
+
+bool SnapshotHandlerManager::RemoveAndJoinHandler(const std::string& misc_name) {
+    std::shared_ptr<HandlerThread> handler;
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+
+        auto iter = FindHandler(&lock, misc_name);
+        if (iter == dm_users_.end()) {
+            // Client already deleted.
+            return true;
+        }
+        handler = std::move(*iter);
+        dm_users_.erase(iter);
+    }
+
+    auto& th = handler->thread();
+    if (th.joinable()) {
+        th.join();
+    }
+    return true;
+}
+
+void SnapshotHandlerManager::TerminateMergeThreads() {
+    std::lock_guard<std::mutex> guard(lock_);
+
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        if (!(*iter)->ThreadTerminated()) {
+            (*iter)->snapuserd()->NotifyIOTerminated();
+        }
+    }
+}
+
+void SnapshotHandlerManager::JoinAllThreads() {
+    // Acquire the thread list within the lock.
+    std::vector<std::shared_ptr<HandlerThread>> dm_users;
+    {
+        std::lock_guard<std::mutex> guard(lock_);
+        dm_users = std::move(dm_users_);
+    }
+
+    for (auto& client : dm_users) {
+        auto& th = client->thread();
+
+        if (th.joinable()) th.join();
+    }
+
+    stop_monitor_merge_thread_ = true;
+    WakeupMonitorMergeThread();
+}
+
+auto SnapshotHandlerManager::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
+                                         const std::string& misc_name) -> HandlerList::iterator {
+    CHECK(proof_of_lock);
+
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        if ((*iter)->misc_name() == misc_name) {
+            return iter;
+        }
+    }
+    return dm_users_.end();
+}
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
new file mode 100644
index 0000000..b7ddac1
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
@@ -0,0 +1,131 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <memory>
+#include <queue>
+#include <string>
+#include <thread>
+#include <vector>
+
+#include <android-base/unique_fd.h>
+
+namespace android {
+namespace snapshot {
+
+class SnapshotHandler;
+
+class HandlerThread {
+  public:
+    explicit HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd);
+
+    void FreeResources();
+    const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
+    std::thread& thread() { return thread_; }
+
+    const std::string& misc_name() const { return misc_name_; }
+    bool ThreadTerminated() { return thread_terminated_; }
+    void SetThreadTerminated() { thread_terminated_ = true; }
+
+  private:
+    std::thread thread_;
+    std::shared_ptr<SnapshotHandler> snapuserd_;
+    std::string misc_name_;
+    bool thread_terminated_ = false;
+};
+
+class ISnapshotHandlerManager {
+  public:
+    virtual ~ISnapshotHandlerManager() {}
+
+    // Add a new snapshot handler but do not start serving requests yet.
+    virtual std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
+                                                      const std::string& cow_device_path,
+                                                      const std::string& backing_device,
+                                                      const std::string& base_path_merge,
+                                                      int num_worker_threads, bool use_iouring,
+                                                      bool perform_verification) = 0;
+
+    // Start serving requests on a snapshot handler.
+    virtual bool StartHandler(const std::string& misc_name) = 0;
+
+    // Stop serving requests on a snapshot handler and remove it.
+    virtual bool DeleteHandler(const std::string& misc_name) = 0;
+
+    // Begin merging blocks on the given snapshot handler.
+    virtual bool InitiateMerge(const std::string& misc_name) = 0;
+
+    // Return a string containing a status code indicating the merge status
+    // on the handler. Returns empty on error.
+    virtual std::string GetMergeStatus(const std::string& misc_name) = 0;
+
+    // Wait until all handlers have terminated.
+    virtual void JoinAllThreads() = 0;
+
+    // Stop any in-progress merge threads.
+    virtual void TerminateMergeThreads() = 0;
+
+    // Returns the merge progress across all merging snapshot handlers.
+    virtual double GetMergePercentage() = 0;
+
+    // Returns whether all snapshots have verified.
+    virtual bool GetVerificationStatus() = 0;
+};
+
+class SnapshotHandlerManager final : public ISnapshotHandlerManager {
+  public:
+    SnapshotHandlerManager();
+    std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
+                                              const std::string& cow_device_path,
+                                              const std::string& backing_device,
+                                              const std::string& base_path_merge,
+                                              int num_worker_threads, bool use_iouring,
+                                              bool perform_verification) override;
+    bool StartHandler(const std::string& misc_name) override;
+    bool DeleteHandler(const std::string& misc_name) override;
+    bool InitiateMerge(const std::string& misc_name) override;
+    std::string GetMergeStatus(const std::string& misc_name) override;
+    void JoinAllThreads() override;
+    void TerminateMergeThreads() override;
+    double GetMergePercentage() override;
+    bool GetVerificationStatus() override;
+
+  private:
+    bool StartHandler(const std::shared_ptr<HandlerThread>& handler);
+    void RunThread(std::shared_ptr<HandlerThread> handler);
+    bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+                    const std::shared_ptr<HandlerThread>& handler);
+    void MonitorMerge();
+    void WakeupMonitorMergeThread();
+    bool RemoveAndJoinHandler(const std::string& misc_name);
+
+    // Find a HandlerThread within a lock.
+    using HandlerList = std::vector<std::shared_ptr<HandlerThread>>;
+    HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
+                                      const std::string& misc_name);
+
+    std::mutex lock_;
+    HandlerList dm_users_;
+
+    bool is_merge_monitor_started_ = false;
+    bool stop_monitor_merge_thread_ = false;
+    int active_merge_threads_ = 0;
+    int num_partitions_merge_complete_ = 0;
+    std::queue<std::shared_ptr<HandlerThread>> merge_handlers_;
+    android::base::unique_fd monitor_merge_event_fd_;
+};
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
index b7ce210..cc8115f 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
@@ -62,11 +62,8 @@
 }
 
 UserSnapshotServer::UserSnapshotServer() {
-    monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
-    if (monitor_merge_event_fd_ == -1) {
-        PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
-    }
     terminating_ = false;
+    handlers_ = std::make_unique<SnapshotHandlerManager>();
 }
 
 UserSnapshotServer::~UserSnapshotServer() {
@@ -99,7 +96,7 @@
 
 void UserSnapshotServer::ShutdownThreads() {
     terminating_ = true;
-    JoinAllThreads();
+    handlers_->JoinAllThreads();
 }
 
 HandlerThread::HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd)
@@ -166,17 +163,7 @@
                 return Sendmsg(fd, "fail");
             }
 
-            std::lock_guard<std::mutex> lock(lock_);
-            auto iter = FindHandler(&lock, out[1]);
-            if (iter == dm_users_.end()) {
-                LOG(ERROR) << "Could not find handler: " << out[1];
-                return Sendmsg(fd, "fail");
-            }
-            if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
-                LOG(ERROR) << "Tried to re-attach control device: " << out[1];
-                return Sendmsg(fd, "fail");
-            }
-            if (!StartHandler(*iter)) {
+            if (!handlers_->StartHandler(out[1])) {
                 return Sendmsg(fd, "fail");
             }
             return Sendmsg(fd, "success");
@@ -209,30 +196,13 @@
                 LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
                 return Sendmsg(fd, "fail");
             }
-            {
-                std::lock_guard<std::mutex> lock(lock_);
-                auto iter = FindHandler(&lock, out[1]);
-                if (iter == dm_users_.end()) {
-                    // After merge is completed, we swap dm-user table with
-                    // the underlying dm-linear base device. Hence, worker
-                    // threads would have terminted and was removed from
-                    // the list.
-                    LOG(DEBUG) << "Could not find handler: " << out[1];
-                    return Sendmsg(fd, "success");
-                }
-
-                if (!(*iter)->ThreadTerminated()) {
-                    (*iter)->snapuserd()->NotifyIOTerminated();
-                }
-            }
-            if (!RemoveAndJoinHandler(out[1])) {
+            if (!handlers_->DeleteHandler(out[1])) {
                 return Sendmsg(fd, "fail");
             }
             return Sendmsg(fd, "success");
         }
         case DaemonOps::DETACH: {
-            std::lock_guard<std::mutex> lock(lock_);
-            TerminateMergeThreads(&lock);
+            handlers_->TerminateMergeThreads();
             terminating_ = true;
             return true;
         }
@@ -252,24 +222,15 @@
                 return Sendmsg(fd, "fail");
             }
             if (out[0] == "initiate_merge") {
-                std::lock_guard<std::mutex> lock(lock_);
-                auto iter = FindHandler(&lock, out[1]);
-                if (iter == dm_users_.end()) {
-                    LOG(ERROR) << "Could not find handler: " << out[1];
+                if (!handlers_->InitiateMerge(out[1])) {
                     return Sendmsg(fd, "fail");
                 }
-
-                if (!StartMerge(&lock, *iter)) {
-                    return Sendmsg(fd, "fail");
-                }
-
                 return Sendmsg(fd, "success");
             }
             return Sendmsg(fd, "fail");
         }
         case DaemonOps::PERCENTAGE: {
-            std::lock_guard<std::mutex> lock(lock_);
-            double percentage = GetMergePercentage(&lock);
+            double percentage = handlers_->GetMergePercentage();
 
             return Sendmsg(fd, std::to_string(percentage));
         }
@@ -280,24 +241,16 @@
                 LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
                 return Sendmsg(fd, "snapshot-merge-failed");
             }
-            {
-                std::lock_guard<std::mutex> lock(lock_);
-                auto iter = FindHandler(&lock, out[1]);
-                if (iter == dm_users_.end()) {
-                    LOG(ERROR) << "Could not find handler: " << out[1];
-                    return Sendmsg(fd, "snapshot-merge-failed");
-                }
-
-                std::string merge_status = GetMergeStatus(*iter);
-                return Sendmsg(fd, merge_status);
+            auto status = handlers_->GetMergeStatus(out[1]);
+            if (status.empty()) {
+                return Sendmsg(fd, "snapshot-merge-failed");
             }
+            return Sendmsg(fd, status);
         }
         case DaemonOps::UPDATE_VERIFY: {
-            std::lock_guard<std::mutex> lock(lock_);
-            if (!UpdateVerification(&lock)) {
+            if (!handlers_->GetVerificationStatus()) {
                 return Sendmsg(fd, "fail");
             }
-
             return Sendmsg(fd, "success");
         }
         default: {
@@ -308,56 +261,6 @@
     }
 }
 
-void UserSnapshotServer::RunThread(std::shared_ptr<HandlerThread> handler) {
-    LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
-
-    if (!handler->snapuserd()->Start()) {
-        LOG(ERROR) << " Failed to launch all worker threads";
-    }
-
-    handler->snapuserd()->CloseFds();
-    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
-    handler->snapuserd()->UnmapBufferRegion();
-
-    auto misc_name = handler->misc_name();
-    LOG(INFO) << "Handler thread about to exit: " << misc_name;
-
-    {
-        std::lock_guard<std::mutex> lock(lock_);
-        if (merge_completed) {
-            num_partitions_merge_complete_ += 1;
-            active_merge_threads_ -= 1;
-            WakeupMonitorMergeThread();
-        }
-        handler->SetThreadTerminated();
-        auto iter = FindHandler(&lock, handler->misc_name());
-        if (iter == dm_users_.end()) {
-            // RemoveAndJoinHandler() already removed us from the list, and is
-            // now waiting on a join(), so just return. Additionally, release
-            // all the resources held by snapuserd object which are shared
-            // by worker threads. This should be done when the last reference
-            // of "handler" is released; but we will explicitly release here
-            // to make sure snapuserd object is freed as it is the biggest
-            // consumer of memory in the daemon.
-            handler->FreeResources();
-            LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
-            return;
-        }
-
-        LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;
-
-        if (handler->snapuserd()->IsAttached()) {
-            handler->thread().detach();
-        }
-
-        // Important: free resources within the lock. This ensures that if
-        // WaitForDelete() is called, the handler is either in the list, or
-        // it's not and its resources are guaranteed to be freed.
-        handler->FreeResources();
-        dm_users_.erase(iter);
-    }
-}
-
 bool UserSnapshotServer::Start(const std::string& socketname) {
     bool start_listening = true;
 
@@ -423,28 +326,10 @@
         }
     }
 
-    JoinAllThreads();
+    handlers_->JoinAllThreads();
     return true;
 }
 
-void UserSnapshotServer::JoinAllThreads() {
-    // Acquire the thread list within the lock.
-    std::vector<std::shared_ptr<HandlerThread>> dm_users;
-    {
-        std::lock_guard<std::mutex> guard(lock_);
-        dm_users = std::move(dm_users_);
-    }
-
-    for (auto& client : dm_users) {
-        auto& th = client->thread();
-
-        if (th.joinable()) th.join();
-    }
-
-    stop_monitor_merge_thread_ = true;
-    WakeupMonitorMergeThread();
-}
-
 void UserSnapshotServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
     struct pollfd p = {};
     p.fd = fd.get();
@@ -506,185 +391,13 @@
         perform_verification = false;
     }
 
-    auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
-                                                       base_path_merge, num_worker_threads,
-                                                       io_uring_enabled_, perform_verification);
-    if (!snapuserd->InitCowDevice()) {
-        LOG(ERROR) << "Failed to initialize Snapuserd";
-        return nullptr;
-    }
-
-    if (!snapuserd->InitializeWorkers()) {
-        LOG(ERROR) << "Failed to initialize workers";
-        return nullptr;
-    }
-
-    auto handler = std::make_shared<HandlerThread>(snapuserd);
-    {
-        std::lock_guard<std::mutex> lock(lock_);
-        if (FindHandler(&lock, misc_name) != dm_users_.end()) {
-            LOG(ERROR) << "Handler already exists: " << misc_name;
-            return nullptr;
-        }
-        dm_users_.push_back(handler);
-    }
-    return handler;
-}
-
-bool UserSnapshotServer::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
-    if (handler->snapuserd()->IsAttached()) {
-        LOG(ERROR) << "Handler already attached";
-        return false;
-    }
-
-    handler->snapuserd()->AttachControlDevice();
-
-    handler->thread() = std::thread(std::bind(&UserSnapshotServer::RunThread, this, handler));
-    return true;
-}
-
-bool UserSnapshotServer::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
-                                    const std::shared_ptr<HandlerThread>& handler) {
-    CHECK(proof_of_lock);
-
-    if (!handler->snapuserd()->IsAttached()) {
-        LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
-        return false;
-    }
-
-    handler->snapuserd()->MonitorMerge();
-
-    if (!is_merge_monitor_started_.has_value()) {
-        std::thread(&UserSnapshotServer::MonitorMerge, this).detach();
-        is_merge_monitor_started_ = true;
-    }
-
-    merge_handlers_.push(handler);
-    WakeupMonitorMergeThread();
-    return true;
-}
-
-auto UserSnapshotServer::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
-                                     const std::string& misc_name) -> HandlerList::iterator {
-    CHECK(proof_of_lock);
-
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        if ((*iter)->misc_name() == misc_name) {
-            return iter;
-        }
-    }
-    return dm_users_.end();
-}
-
-void UserSnapshotServer::TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock) {
-    CHECK(proof_of_lock);
-
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        if (!(*iter)->ThreadTerminated()) {
-            (*iter)->snapuserd()->NotifyIOTerminated();
-        }
-    }
-}
-
-std::string UserSnapshotServer::GetMergeStatus(const std::shared_ptr<HandlerThread>& handler) {
-    return handler->snapuserd()->GetMergeStatus();
-}
-
-double UserSnapshotServer::GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock) {
-    CHECK(proof_of_lock);
-    double percentage = 0.0;
-    int n = 0;
-
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        auto& th = (*iter)->thread();
-        if (th.joinable()) {
-            // Merge percentage by individual partitions wherein merge is still
-            // in-progress
-            percentage += (*iter)->snapuserd()->GetMergePercentage();
-            n += 1;
-        }
-    }
-
-    // Calculate final merge including those partitions where merge was already
-    // completed - num_partitions_merge_complete_ will track them when each
-    // thread exists in RunThread.
-    int total_partitions = n + num_partitions_merge_complete_;
-
-    if (total_partitions) {
-        percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
-    }
-
-    LOG(DEBUG) << "Merge %: " << percentage
-               << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
-               << " total_partitions: " << total_partitions << " n: " << n;
-    return percentage;
-}
-
-bool UserSnapshotServer::RemoveAndJoinHandler(const std::string& misc_name) {
-    std::shared_ptr<HandlerThread> handler;
-    {
-        std::lock_guard<std::mutex> lock(lock_);
-
-        auto iter = FindHandler(&lock, misc_name);
-        if (iter == dm_users_.end()) {
-            // Client already deleted.
-            return true;
-        }
-        handler = std::move(*iter);
-        dm_users_.erase(iter);
-    }
-
-    auto& th = handler->thread();
-    if (th.joinable()) {
-        th.join();
-    }
-    return true;
-}
-
-void UserSnapshotServer::WakeupMonitorMergeThread() {
-    uint64_t notify = 1;
-    ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
-    if (rc < 0) {
-        PLOG(FATAL) << "failed to notify monitor merge thread";
-    }
-}
-
-void UserSnapshotServer::MonitorMerge() {
-    while (!stop_monitor_merge_thread_) {
-        uint64_t testVal;
-        ssize_t ret =
-                TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
-        if (ret == -1) {
-            PLOG(FATAL) << "Failed to read from eventfd";
-        } else if (ret == 0) {
-            LOG(FATAL) << "Hit EOF on eventfd";
-        }
-
-        LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
-        {
-            std::lock_guard<std::mutex> lock(lock_);
-            while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
-                auto handler = merge_handlers_.front();
-                merge_handlers_.pop();
-
-                if (!handler->snapuserd()) {
-                    LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
-                    continue;
-                }
-
-                LOG(INFO) << "Starting merge for partition: "
-                          << handler->snapuserd()->GetMiscName();
-                handler->snapuserd()->InitiateMerge();
-                active_merge_threads_ += 1;
-            }
-        }
-    }
-
-    LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
+    return handlers_->AddHandler(misc_name, cow_device_path, backing_device, base_path_merge,
+                                 num_worker_threads, io_uring_enabled_, perform_verification);
 }
 
 bool UserSnapshotServer::WaitForSocket() {
-    auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
+    auto scope_guard =
+            android::base::make_scope_guard([this]() -> void { handlers_->JoinAllThreads(); });
 
     auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy;
 
@@ -781,21 +494,8 @@
     return true;
 }
 
-bool UserSnapshotServer::UpdateVerification(std::lock_guard<std::mutex>* proof_of_lock) {
-    CHECK(proof_of_lock);
-
-    bool status = true;
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        auto& th = (*iter)->thread();
-        if (th.joinable() && status) {
-            status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
-        } else {
-            // return immediately if there is a failure
-            return false;
-        }
-    }
-
-    return status;
+bool UserSnapshotServer::StartHandler(const std::string& misc_name) {
+    return handlers_->StartHandler(misc_name);
 }
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
index 12c3903..ae51903 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
@@ -31,6 +31,7 @@
 #include <vector>
 
 #include <android-base/unique_fd.h>
+#include "handler_manager.h"
 #include "snapuserd_core.h"
 
 namespace android {
@@ -54,33 +55,6 @@
     INVALID,
 };
 
-class HandlerThread {
-  public:
-    explicit HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd);
-
-    void FreeResources() {
-        // Each worker thread holds a reference to snapuserd.
-        // Clear them so that all the resources
-        // held by snapuserd is released
-        if (snapuserd_) {
-            snapuserd_->FreeResources();
-            snapuserd_ = nullptr;
-        }
-    }
-    const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
-    std::thread& thread() { return thread_; }
-
-    const std::string& misc_name() const { return misc_name_; }
-    bool ThreadTerminated() { return thread_terminated_; }
-    void SetThreadTerminated() { thread_terminated_ = true; }
-
-  private:
-    std::thread thread_;
-    std::shared_ptr<SnapshotHandler> snapuserd_;
-    std::string misc_name_;
-    bool thread_terminated_ = false;
-};
-
 class UserSnapshotServer {
   private:
     android::base::unique_fd sockfd_;
@@ -88,21 +62,12 @@
     volatile bool received_socket_signal_ = false;
     std::vector<struct pollfd> watched_fds_;
     bool is_socket_present_ = false;
-    int num_partitions_merge_complete_ = 0;
-    int active_merge_threads_ = 0;
-    bool stop_monitor_merge_thread_ = false;
     bool is_server_running_ = false;
     bool io_uring_enabled_ = false;
-    std::optional<bool> is_merge_monitor_started_;
-
-    android::base::unique_fd monitor_merge_event_fd_;
+    std::unique_ptr<ISnapshotHandlerManager> handlers_;
 
     std::mutex lock_;
 
-    using HandlerList = std::vector<std::shared_ptr<HandlerThread>>;
-    HandlerList dm_users_;
-    std::queue<std::shared_ptr<HandlerThread>> merge_handlers_;
-
     void AddWatchedFd(android::base::borrowed_fd fd, int events);
     void AcceptClient();
     bool HandleClient(android::base::borrowed_fd fd, int revents);
@@ -111,28 +76,15 @@
     bool Receivemsg(android::base::borrowed_fd fd, const std::string& str);
 
     void ShutdownThreads();
-    bool RemoveAndJoinHandler(const std::string& control_device);
     DaemonOps Resolveop(std::string& input);
     std::string GetDaemonStatus();
     void Parsemsg(std::string const& msg, const char delim, std::vector<std::string>& out);
 
     bool IsTerminating() { return terminating_; }
 
-    void RunThread(std::shared_ptr<HandlerThread> handler);
-    void MonitorMerge();
-
     void JoinAllThreads();
     bool StartWithSocket(bool start_listening);
 
-    // Find a HandlerThread within a lock.
-    HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
-                                      const std::string& misc_name);
-
-    double GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock);
-    void TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock);
-
-    bool UpdateVerification(std::lock_guard<std::mutex>* proof_of_lock);
-
   public:
     UserSnapshotServer();
     ~UserSnapshotServer();
@@ -147,12 +99,8 @@
                                               const std::string& cow_device_path,
                                               const std::string& backing_device,
                                               const std::string& base_path_merge);
-    bool StartHandler(const std::shared_ptr<HandlerThread>& handler);
-    bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
-                    const std::shared_ptr<HandlerThread>& handler);
-    std::string GetMergeStatus(const std::shared_ptr<HandlerThread>& handler);
+    bool StartHandler(const std::string& misc_name);
 
-    void WakeupMonitorMergeThread();
     void SetTerminating() { terminating_ = true; }
     void ReceivedSocketSignal() { received_socket_signal_ = true; }
     void SetServerRunning() { is_server_running_ = true; }