Merge "flattened apex: fix /apex/com.android.tethering/bin/for-system{,/clatd} mode" am: db01225c70 am: b986bd3a8d am: 7aea4bfe2e

Original change: https://android-review.googlesource.com/c/platform/system/core/+/2528043

Change-Id: Ibbdd24f012eff1673375c44ca7095f2c0e644fba
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/fs_mgr/libsnapshot/partition_cow_creator.cpp b/fs_mgr/libsnapshot/partition_cow_creator.cpp
index 7057223..5bc7e65 100644
--- a/fs_mgr/libsnapshot/partition_cow_creator.cpp
+++ b/fs_mgr/libsnapshot/partition_cow_creator.cpp
@@ -131,15 +131,28 @@
     return is_optimized;
 }
 
-void WriteExtent(DmSnapCowSizeCalculator* sc, const chromeos_update_engine::Extent& de,
+bool WriteExtent(DmSnapCowSizeCalculator* sc, const chromeos_update_engine::Extent& de,
                  unsigned int sectors_per_block) {
     const auto block_boundary = de.start_block() + de.num_blocks();
     for (auto b = de.start_block(); b < block_boundary; ++b) {
         for (unsigned int s = 0; s < sectors_per_block; ++s) {
-            const auto sector_id = b * sectors_per_block + s;
+            // sector_id = b * sectors_per_block + s;
+            uint64_t block_start_sector_id;
+            if (__builtin_mul_overflow(b, sectors_per_block, &block_start_sector_id)) {
+                LOG(ERROR) << "Integer overflow when calculating sector id (" << b << " * "
+                           << sectors_per_block << ")";
+                return false;
+            }
+            uint64_t sector_id;
+            if (__builtin_add_overflow(block_start_sector_id, s, &sector_id)) {
+                LOG(ERROR) << "Integer overflow when calculating sector id ("
+                           << block_start_sector_id << " + " << s << ")";
+                return false;
+            }
             sc->WriteSector(sector_id);
         }
     }
+    return true;
 }
 
 std::optional<uint64_t> PartitionCowCreator::GetCowSize() {
@@ -167,7 +180,7 @@
     // Allocate space for extra extents (if any). These extents are those that can be
     // used for error corrections or to store verity hash trees.
     for (const auto& de : extra_extents) {
-        WriteExtent(&sc, de, sectors_per_block);
+        if (!WriteExtent(&sc, de, sectors_per_block)) return std::nullopt;
     }
 
     if (update == nullptr) return sc.cow_size_bytes();
@@ -182,7 +195,7 @@
         }
 
         for (const auto& de : written_op->dst_extents()) {
-            WriteExtent(&sc, de, sectors_per_block);
+            if (!WriteExtent(&sc, de, sectors_per_block)) return std::nullopt;
         }
     }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index 7fcaac5..9261482 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -62,7 +62,6 @@
         "dm-snapshot-merge/snapuserd_worker.cpp",
         "dm-snapshot-merge/snapuserd_readahead.cpp",
         "snapuserd_buffer.cpp",
-        "user-space-merge/handler_manager.cpp",
         "user-space-merge/snapuserd_core.cpp",
         "user-space-merge/snapuserd_dm_user.cpp",
         "user-space-merge/snapuserd_merge.cpp",
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
index 0557214..bfe93eb 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
@@ -114,7 +114,7 @@
             return false;
         }
         auto handler = user_server_.AddHandler(parts[0], parts[1], parts[2], parts[3]);
-        if (!handler || !user_server_.StartHandler(parts[0])) {
+        if (!handler || !user_server_.StartHandler(handler)) {
             return false;
         }
     }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
deleted file mode 100644
index c5150c4..0000000
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
+++ /dev/null
@@ -1,371 +0,0 @@
-// Copyright (C) 2023 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "handler_manager.h"
-
-#include <sys/eventfd.h>
-
-#include <android-base/logging.h>
-
-#include "snapuserd_core.h"
-
-namespace android {
-namespace snapshot {
-
-static constexpr uint8_t kMaxMergeThreads = 2;
-
-void HandlerThread::FreeResources() {
-    // Each worker thread holds a reference to snapuserd.
-    // Clear them so that all the resources
-    // held by snapuserd is released
-    if (snapuserd_) {
-        snapuserd_->FreeResources();
-        snapuserd_ = nullptr;
-    }
-}
-
-SnapshotHandlerManager::SnapshotHandlerManager() {
-    monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
-    if (monitor_merge_event_fd_ == -1) {
-        PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
-    }
-}
-
-std::shared_ptr<HandlerThread> SnapshotHandlerManager::AddHandler(
-        const std::string& misc_name, const std::string& cow_device_path,
-        const std::string& backing_device, const std::string& base_path_merge,
-        int num_worker_threads, bool use_iouring, bool perform_verification) {
-    auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
-                                                       base_path_merge, num_worker_threads,
-                                                       use_iouring, perform_verification);
-    if (!snapuserd->InitCowDevice()) {
-        LOG(ERROR) << "Failed to initialize Snapuserd";
-        return nullptr;
-    }
-
-    if (!snapuserd->InitializeWorkers()) {
-        LOG(ERROR) << "Failed to initialize workers";
-        return nullptr;
-    }
-
-    auto handler = std::make_shared<HandlerThread>(snapuserd);
-    {
-        std::lock_guard<std::mutex> lock(lock_);
-        if (FindHandler(&lock, misc_name) != dm_users_.end()) {
-            LOG(ERROR) << "Handler already exists: " << misc_name;
-            return nullptr;
-        }
-        dm_users_.push_back(handler);
-    }
-    return handler;
-}
-
-bool SnapshotHandlerManager::StartHandler(const std::string& misc_name) {
-    std::lock_guard<std::mutex> lock(lock_);
-    auto iter = FindHandler(&lock, misc_name);
-    if (iter == dm_users_.end()) {
-        LOG(ERROR) << "Could not find handler: " << misc_name;
-        return false;
-    }
-    if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
-        LOG(ERROR) << "Tried to re-attach control device: " << misc_name;
-        return false;
-    }
-    if (!StartHandler(*iter)) {
-        return false;
-    }
-    return true;
-}
-
-bool SnapshotHandlerManager::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
-    if (handler->snapuserd()->IsAttached()) {
-        LOG(ERROR) << "Handler already attached";
-        return false;
-    }
-
-    handler->snapuserd()->AttachControlDevice();
-
-    handler->thread() = std::thread(std::bind(&SnapshotHandlerManager::RunThread, this, handler));
-    return true;
-}
-
-bool SnapshotHandlerManager::DeleteHandler(const std::string& misc_name) {
-    {
-        std::lock_guard<std::mutex> lock(lock_);
-        auto iter = FindHandler(&lock, misc_name);
-        if (iter == dm_users_.end()) {
-            // After merge is completed, we swap dm-user table with
-            // the underlying dm-linear base device. Hence, worker
-            // threads would have terminted and was removed from
-            // the list.
-            LOG(DEBUG) << "Could not find handler: " << misc_name;
-            return true;
-        }
-
-        if (!(*iter)->ThreadTerminated()) {
-            (*iter)->snapuserd()->NotifyIOTerminated();
-        }
-    }
-    if (!RemoveAndJoinHandler(misc_name)) {
-        return false;
-    }
-    return true;
-}
-
-void SnapshotHandlerManager::RunThread(std::shared_ptr<HandlerThread> handler) {
-    LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
-
-    if (!handler->snapuserd()->Start()) {
-        LOG(ERROR) << " Failed to launch all worker threads";
-    }
-
-    handler->snapuserd()->CloseFds();
-    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
-    handler->snapuserd()->UnmapBufferRegion();
-
-    auto misc_name = handler->misc_name();
-    LOG(INFO) << "Handler thread about to exit: " << misc_name;
-
-    {
-        std::lock_guard<std::mutex> lock(lock_);
-        if (merge_completed) {
-            num_partitions_merge_complete_ += 1;
-            active_merge_threads_ -= 1;
-            WakeupMonitorMergeThread();
-        }
-        handler->SetThreadTerminated();
-        auto iter = FindHandler(&lock, handler->misc_name());
-        if (iter == dm_users_.end()) {
-            // RemoveAndJoinHandler() already removed us from the list, and is
-            // now waiting on a join(), so just return. Additionally, release
-            // all the resources held by snapuserd object which are shared
-            // by worker threads. This should be done when the last reference
-            // of "handler" is released; but we will explicitly release here
-            // to make sure snapuserd object is freed as it is the biggest
-            // consumer of memory in the daemon.
-            handler->FreeResources();
-            LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
-            return;
-        }
-
-        LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;
-
-        if (handler->snapuserd()->IsAttached()) {
-            handler->thread().detach();
-        }
-
-        // Important: free resources within the lock. This ensures that if
-        // WaitForDelete() is called, the handler is either in the list, or
-        // it's not and its resources are guaranteed to be freed.
-        handler->FreeResources();
-        dm_users_.erase(iter);
-    }
-}
-
-bool SnapshotHandlerManager::InitiateMerge(const std::string& misc_name) {
-    std::lock_guard<std::mutex> lock(lock_);
-    auto iter = FindHandler(&lock, misc_name);
-    if (iter == dm_users_.end()) {
-        LOG(ERROR) << "Could not find handler: " << misc_name;
-        return false;
-    }
-
-    return StartMerge(&lock, *iter);
-}
-
-bool SnapshotHandlerManager::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
-                                        const std::shared_ptr<HandlerThread>& handler) {
-    CHECK(proof_of_lock);
-
-    if (!handler->snapuserd()->IsAttached()) {
-        LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
-        return false;
-    }
-
-    handler->snapuserd()->MonitorMerge();
-
-    if (!is_merge_monitor_started_) {
-        std::thread(&SnapshotHandlerManager::MonitorMerge, this).detach();
-        is_merge_monitor_started_ = true;
-    }
-
-    merge_handlers_.push(handler);
-    WakeupMonitorMergeThread();
-    return true;
-}
-
-void SnapshotHandlerManager::WakeupMonitorMergeThread() {
-    uint64_t notify = 1;
-    ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
-    if (rc < 0) {
-        PLOG(FATAL) << "failed to notify monitor merge thread";
-    }
-}
-
-void SnapshotHandlerManager::MonitorMerge() {
-    while (!stop_monitor_merge_thread_) {
-        uint64_t testVal;
-        ssize_t ret =
-                TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
-        if (ret == -1) {
-            PLOG(FATAL) << "Failed to read from eventfd";
-        } else if (ret == 0) {
-            LOG(FATAL) << "Hit EOF on eventfd";
-        }
-
-        LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
-        {
-            std::lock_guard<std::mutex> lock(lock_);
-            while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
-                auto handler = merge_handlers_.front();
-                merge_handlers_.pop();
-
-                if (!handler->snapuserd()) {
-                    LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
-                    continue;
-                }
-
-                LOG(INFO) << "Starting merge for partition: "
-                          << handler->snapuserd()->GetMiscName();
-                handler->snapuserd()->InitiateMerge();
-                active_merge_threads_ += 1;
-            }
-        }
-    }
-
-    LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
-}
-
-std::string SnapshotHandlerManager::GetMergeStatus(const std::string& misc_name) {
-    std::lock_guard<std::mutex> lock(lock_);
-    auto iter = FindHandler(&lock, misc_name);
-    if (iter == dm_users_.end()) {
-        LOG(ERROR) << "Could not find handler: " << misc_name;
-        return {};
-    }
-
-    return (*iter)->snapuserd()->GetMergeStatus();
-}
-
-double SnapshotHandlerManager::GetMergePercentage() {
-    std::lock_guard<std::mutex> lock(lock_);
-
-    double percentage = 0.0;
-    int n = 0;
-
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        auto& th = (*iter)->thread();
-        if (th.joinable()) {
-            // Merge percentage by individual partitions wherein merge is still
-            // in-progress
-            percentage += (*iter)->snapuserd()->GetMergePercentage();
-            n += 1;
-        }
-    }
-
-    // Calculate final merge including those partitions where merge was already
-    // completed - num_partitions_merge_complete_ will track them when each
-    // thread exists in RunThread.
-    int total_partitions = n + num_partitions_merge_complete_;
-
-    if (total_partitions) {
-        percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
-    }
-
-    LOG(DEBUG) << "Merge %: " << percentage
-               << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
-               << " total_partitions: " << total_partitions << " n: " << n;
-    return percentage;
-}
-
-bool SnapshotHandlerManager::GetVerificationStatus() {
-    std::lock_guard<std::mutex> lock(lock_);
-
-    bool status = true;
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        auto& th = (*iter)->thread();
-        if (th.joinable() && status) {
-            status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
-        } else {
-            // return immediately if there is a failure
-            return false;
-        }
-    }
-
-    return status;
-}
-
-bool SnapshotHandlerManager::RemoveAndJoinHandler(const std::string& misc_name) {
-    std::shared_ptr<HandlerThread> handler;
-    {
-        std::lock_guard<std::mutex> lock(lock_);
-
-        auto iter = FindHandler(&lock, misc_name);
-        if (iter == dm_users_.end()) {
-            // Client already deleted.
-            return true;
-        }
-        handler = std::move(*iter);
-        dm_users_.erase(iter);
-    }
-
-    auto& th = handler->thread();
-    if (th.joinable()) {
-        th.join();
-    }
-    return true;
-}
-
-void SnapshotHandlerManager::TerminateMergeThreads() {
-    std::lock_guard<std::mutex> guard(lock_);
-
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        if (!(*iter)->ThreadTerminated()) {
-            (*iter)->snapuserd()->NotifyIOTerminated();
-        }
-    }
-}
-
-void SnapshotHandlerManager::JoinAllThreads() {
-    // Acquire the thread list within the lock.
-    std::vector<std::shared_ptr<HandlerThread>> dm_users;
-    {
-        std::lock_guard<std::mutex> guard(lock_);
-        dm_users = std::move(dm_users_);
-    }
-
-    for (auto& client : dm_users) {
-        auto& th = client->thread();
-
-        if (th.joinable()) th.join();
-    }
-
-    stop_monitor_merge_thread_ = true;
-    WakeupMonitorMergeThread();
-}
-
-auto SnapshotHandlerManager::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
-                                         const std::string& misc_name) -> HandlerList::iterator {
-    CHECK(proof_of_lock);
-
-    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
-        if ((*iter)->misc_name() == misc_name) {
-            return iter;
-        }
-    }
-    return dm_users_.end();
-}
-
-}  // namespace snapshot
-}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
deleted file mode 100644
index b7ddac1..0000000
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
+++ /dev/null
@@ -1,131 +0,0 @@
-// Copyright (C) 2023 The Android Open Source Project
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//      http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#pragma once
-
-#include <memory>
-#include <queue>
-#include <string>
-#include <thread>
-#include <vector>
-
-#include <android-base/unique_fd.h>
-
-namespace android {
-namespace snapshot {
-
-class SnapshotHandler;
-
-class HandlerThread {
-  public:
-    explicit HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd);
-
-    void FreeResources();
-    const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
-    std::thread& thread() { return thread_; }
-
-    const std::string& misc_name() const { return misc_name_; }
-    bool ThreadTerminated() { return thread_terminated_; }
-    void SetThreadTerminated() { thread_terminated_ = true; }
-
-  private:
-    std::thread thread_;
-    std::shared_ptr<SnapshotHandler> snapuserd_;
-    std::string misc_name_;
-    bool thread_terminated_ = false;
-};
-
-class ISnapshotHandlerManager {
-  public:
-    virtual ~ISnapshotHandlerManager() {}
-
-    // Add a new snapshot handler but do not start serving requests yet.
-    virtual std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
-                                                      const std::string& cow_device_path,
-                                                      const std::string& backing_device,
-                                                      const std::string& base_path_merge,
-                                                      int num_worker_threads, bool use_iouring,
-                                                      bool perform_verification) = 0;
-
-    // Start serving requests on a snapshot handler.
-    virtual bool StartHandler(const std::string& misc_name) = 0;
-
-    // Stop serving requests on a snapshot handler and remove it.
-    virtual bool DeleteHandler(const std::string& misc_name) = 0;
-
-    // Begin merging blocks on the given snapshot handler.
-    virtual bool InitiateMerge(const std::string& misc_name) = 0;
-
-    // Return a string containing a status code indicating the merge status
-    // on the handler. Returns empty on error.
-    virtual std::string GetMergeStatus(const std::string& misc_name) = 0;
-
-    // Wait until all handlers have terminated.
-    virtual void JoinAllThreads() = 0;
-
-    // Stop any in-progress merge threads.
-    virtual void TerminateMergeThreads() = 0;
-
-    // Returns the merge progress across all merging snapshot handlers.
-    virtual double GetMergePercentage() = 0;
-
-    // Returns whether all snapshots have verified.
-    virtual bool GetVerificationStatus() = 0;
-};
-
-class SnapshotHandlerManager final : public ISnapshotHandlerManager {
-  public:
-    SnapshotHandlerManager();
-    std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
-                                              const std::string& cow_device_path,
-                                              const std::string& backing_device,
-                                              const std::string& base_path_merge,
-                                              int num_worker_threads, bool use_iouring,
-                                              bool perform_verification) override;
-    bool StartHandler(const std::string& misc_name) override;
-    bool DeleteHandler(const std::string& misc_name) override;
-    bool InitiateMerge(const std::string& misc_name) override;
-    std::string GetMergeStatus(const std::string& misc_name) override;
-    void JoinAllThreads() override;
-    void TerminateMergeThreads() override;
-    double GetMergePercentage() override;
-    bool GetVerificationStatus() override;
-
-  private:
-    bool StartHandler(const std::shared_ptr<HandlerThread>& handler);
-    void RunThread(std::shared_ptr<HandlerThread> handler);
-    bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
-                    const std::shared_ptr<HandlerThread>& handler);
-    void MonitorMerge();
-    void WakeupMonitorMergeThread();
-    bool RemoveAndJoinHandler(const std::string& misc_name);
-
-    // Find a HandlerThread within a lock.
-    using HandlerList = std::vector<std::shared_ptr<HandlerThread>>;
-    HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
-                                      const std::string& misc_name);
-
-    std::mutex lock_;
-    HandlerList dm_users_;
-
-    bool is_merge_monitor_started_ = false;
-    bool stop_monitor_merge_thread_ = false;
-    int active_merge_threads_ = 0;
-    int num_partitions_merge_complete_ = 0;
-    std::queue<std::shared_ptr<HandlerThread>> merge_handlers_;
-    android::base::unique_fd monitor_merge_event_fd_;
-};
-
-}  // namespace snapshot
-}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
index d87990a..b7ce210 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
@@ -45,9 +45,28 @@
 using android::base::borrowed_fd;
 using android::base::unique_fd;
 
+DaemonOps UserSnapshotServer::Resolveop(std::string& input) {
+    if (input == "init") return DaemonOps::INIT;
+    if (input == "start") return DaemonOps::START;
+    if (input == "stop") return DaemonOps::STOP;
+    if (input == "query") return DaemonOps::QUERY;
+    if (input == "delete") return DaemonOps::DELETE;
+    if (input == "detach") return DaemonOps::DETACH;
+    if (input == "supports") return DaemonOps::SUPPORTS;
+    if (input == "initiate_merge") return DaemonOps::INITIATE;
+    if (input == "merge_percent") return DaemonOps::PERCENTAGE;
+    if (input == "getstatus") return DaemonOps::GETSTATUS;
+    if (input == "update-verify") return DaemonOps::UPDATE_VERIFY;
+
+    return DaemonOps::INVALID;
+}
+
 UserSnapshotServer::UserSnapshotServer() {
+    monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
+    if (monitor_merge_event_fd_ == -1) {
+        PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
+    }
     terminating_ = false;
-    handlers_ = std::make_unique<SnapshotHandlerManager>();
 }
 
 UserSnapshotServer::~UserSnapshotServer() {
@@ -80,7 +99,7 @@
 
 void UserSnapshotServer::ShutdownThreads() {
     terminating_ = true;
-    handlers_->JoinAllThreads();
+    JoinAllThreads();
 }
 
 HandlerThread::HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd)
@@ -116,118 +135,226 @@
 
     std::vector<std::string> out;
     Parsemsg(str, delim, out);
+    DaemonOps op = Resolveop(out[0]);
 
-    const auto& cmd = out[0];
-    if (cmd == "init") {
-        // Message format:
-        // init,<misc_name>,<cow_device_path>,<backing_device>,<base_path_merge>
-        //
-        // Reads the metadata and send the number of sectors
-        if (out.size() != 5) {
-            LOG(ERROR) << "Malformed init message, " << out.size() << " parts";
-            return Sendmsg(fd, "fail");
-        }
+    switch (op) {
+        case DaemonOps::INIT: {
+            // Message format:
+            // init,<misc_name>,<cow_device_path>,<backing_device>,<base_path_merge>
+            //
+            // Reads the metadata and send the number of sectors
+            if (out.size() != 5) {
+                LOG(ERROR) << "Malformed init message, " << out.size() << " parts";
+                return Sendmsg(fd, "fail");
+            }
 
-        auto handler = AddHandler(out[1], out[2], out[3], out[4]);
-        if (!handler) {
-            return Sendmsg(fd, "fail");
-        }
+            auto handler = AddHandler(out[1], out[2], out[3], out[4]);
+            if (!handler) {
+                return Sendmsg(fd, "fail");
+            }
 
-        auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors());
-        return Sendmsg(fd, retval);
-    } else if (cmd == "start") {
-        // Message format:
-        // start,<misc_name>
-        //
-        // Start the new thread which binds to dm-user misc device
-        if (out.size() != 2) {
-            LOG(ERROR) << "Malformed start message, " << out.size() << " parts";
-            return Sendmsg(fd, "fail");
+            auto retval = "success," + std::to_string(handler->snapuserd()->GetNumSectors());
+            return Sendmsg(fd, retval);
         }
+        case DaemonOps::START: {
+            // Message format:
+            // start,<misc_name>
+            //
+            // Start the new thread which binds to dm-user misc device
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed start message, " << out.size() << " parts";
+                return Sendmsg(fd, "fail");
+            }
 
-        if (!handlers_->StartHandler(out[1])) {
-            return Sendmsg(fd, "fail");
-        }
-        return Sendmsg(fd, "success");
-    } else if (cmd == "stop") {
-        // Message format: stop
-        //
-        // Stop all the threads gracefully and then shutdown the
-        // main thread
-        SetTerminating();
-        ShutdownThreads();
-        return true;
-    } else if (cmd == "query") {
-        // Message format: query
-        //
-        // As part of transition, Second stage daemon will be
-        // created before terminating the first stage daemon. Hence,
-        // for a brief period client may have to distiguish between
-        // first stage daemon and second stage daemon.
-        //
-        // Second stage daemon is marked as active and hence will
-        // be ready to receive control message.
-        return Sendmsg(fd, GetDaemonStatus());
-    } else if (cmd == "delete") {
-        // Message format:
-        // delete,<misc_name>
-        if (out.size() != 2) {
-            LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
-            return Sendmsg(fd, "fail");
-        }
-        if (!handlers_->DeleteHandler(out[1])) {
-            return Sendmsg(fd, "fail");
-        }
-        return Sendmsg(fd, "success");
-    } else if (cmd == "detach") {
-        handlers_->TerminateMergeThreads();
-        terminating_ = true;
-        return true;
-    } else if (cmd == "supports") {
-        if (out.size() != 2) {
-            LOG(ERROR) << "Malformed supports message, " << out.size() << " parts";
-            return Sendmsg(fd, "fail");
-        }
-        if (out[1] == "second_stage_socket_handoff") {
-            return Sendmsg(fd, "success");
-        }
-        return Sendmsg(fd, "fail");
-    } else if (cmd == "initiate_merge") {
-        if (out.size() != 2) {
-            LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts";
-            return Sendmsg(fd, "fail");
-        }
-        if (out[0] == "initiate_merge") {
-            if (!handlers_->InitiateMerge(out[1])) {
+            std::lock_guard<std::mutex> lock(lock_);
+            auto iter = FindHandler(&lock, out[1]);
+            if (iter == dm_users_.end()) {
+                LOG(ERROR) << "Could not find handler: " << out[1];
+                return Sendmsg(fd, "fail");
+            }
+            if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
+                LOG(ERROR) << "Tried to re-attach control device: " << out[1];
+                return Sendmsg(fd, "fail");
+            }
+            if (!StartHandler(*iter)) {
                 return Sendmsg(fd, "fail");
             }
             return Sendmsg(fd, "success");
         }
-        return Sendmsg(fd, "fail");
-    } else if (cmd == "merge_percent") {
-        double percentage = handlers_->GetMergePercentage();
-        return Sendmsg(fd, std::to_string(percentage));
-    } else if (cmd == "getstatus") {
-        // Message format:
-        // getstatus,<misc_name>
-        if (out.size() != 2) {
-            LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
-            return Sendmsg(fd, "snapshot-merge-failed");
+        case DaemonOps::STOP: {
+            // Message format: stop
+            //
+            // Stop all the threads gracefully and then shutdown the
+            // main thread
+            SetTerminating();
+            ShutdownThreads();
+            return true;
         }
-        auto status = handlers_->GetMergeStatus(out[1]);
-        if (status.empty()) {
-            return Sendmsg(fd, "snapshot-merge-failed");
+        case DaemonOps::QUERY: {
+            // Message format: query
+            //
+            // As part of transition, Second stage daemon will be
+            // created before terminating the first stage daemon. Hence,
+            // for a brief period client may have to distiguish between
+            // first stage daemon and second stage daemon.
+            //
+            // Second stage daemon is marked as active and hence will
+            // be ready to receive control message.
+            return Sendmsg(fd, GetDaemonStatus());
         }
-        return Sendmsg(fd, status);
-    } else if (cmd == "update-verify") {
-        if (!handlers_->GetVerificationStatus()) {
+        case DaemonOps::DELETE: {
+            // Message format:
+            // delete,<misc_name>
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
+                return Sendmsg(fd, "fail");
+            }
+            {
+                std::lock_guard<std::mutex> lock(lock_);
+                auto iter = FindHandler(&lock, out[1]);
+                if (iter == dm_users_.end()) {
+                    // After merge is completed, we swap dm-user table with
+                    // the underlying dm-linear base device. Hence, worker
+                    // threads would have terminted and was removed from
+                    // the list.
+                    LOG(DEBUG) << "Could not find handler: " << out[1];
+                    return Sendmsg(fd, "success");
+                }
+
+                if (!(*iter)->ThreadTerminated()) {
+                    (*iter)->snapuserd()->NotifyIOTerminated();
+                }
+            }
+            if (!RemoveAndJoinHandler(out[1])) {
+                return Sendmsg(fd, "fail");
+            }
+            return Sendmsg(fd, "success");
+        }
+        case DaemonOps::DETACH: {
+            std::lock_guard<std::mutex> lock(lock_);
+            TerminateMergeThreads(&lock);
+            terminating_ = true;
+            return true;
+        }
+        case DaemonOps::SUPPORTS: {
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed supports message, " << out.size() << " parts";
+                return Sendmsg(fd, "fail");
+            }
+            if (out[1] == "second_stage_socket_handoff") {
+                return Sendmsg(fd, "success");
+            }
             return Sendmsg(fd, "fail");
         }
-        return Sendmsg(fd, "success");
-    } else {
-        LOG(ERROR) << "Received unknown message type from client";
-        Sendmsg(fd, "fail");
-        return false;
+        case DaemonOps::INITIATE: {
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed initiate-merge message, " << out.size() << " parts";
+                return Sendmsg(fd, "fail");
+            }
+            if (out[0] == "initiate_merge") {
+                std::lock_guard<std::mutex> lock(lock_);
+                auto iter = FindHandler(&lock, out[1]);
+                if (iter == dm_users_.end()) {
+                    LOG(ERROR) << "Could not find handler: " << out[1];
+                    return Sendmsg(fd, "fail");
+                }
+
+                if (!StartMerge(&lock, *iter)) {
+                    return Sendmsg(fd, "fail");
+                }
+
+                return Sendmsg(fd, "success");
+            }
+            return Sendmsg(fd, "fail");
+        }
+        case DaemonOps::PERCENTAGE: {
+            std::lock_guard<std::mutex> lock(lock_);
+            double percentage = GetMergePercentage(&lock);
+
+            return Sendmsg(fd, std::to_string(percentage));
+        }
+        case DaemonOps::GETSTATUS: {
+            // Message format:
+            // getstatus,<misc_name>
+            if (out.size() != 2) {
+                LOG(ERROR) << "Malformed delete message, " << out.size() << " parts";
+                return Sendmsg(fd, "snapshot-merge-failed");
+            }
+            {
+                std::lock_guard<std::mutex> lock(lock_);
+                auto iter = FindHandler(&lock, out[1]);
+                if (iter == dm_users_.end()) {
+                    LOG(ERROR) << "Could not find handler: " << out[1];
+                    return Sendmsg(fd, "snapshot-merge-failed");
+                }
+
+                std::string merge_status = GetMergeStatus(*iter);
+                return Sendmsg(fd, merge_status);
+            }
+        }
+        case DaemonOps::UPDATE_VERIFY: {
+            std::lock_guard<std::mutex> lock(lock_);
+            if (!UpdateVerification(&lock)) {
+                return Sendmsg(fd, "fail");
+            }
+
+            return Sendmsg(fd, "success");
+        }
+        default: {
+            LOG(ERROR) << "Received unknown message type from client";
+            Sendmsg(fd, "fail");
+            return false;
+        }
+    }
+}
+
+void UserSnapshotServer::RunThread(std::shared_ptr<HandlerThread> handler) {
+    LOG(INFO) << "Entering thread for handler: " << handler->misc_name();
+
+    if (!handler->snapuserd()->Start()) {
+        LOG(ERROR) << " Failed to launch all worker threads";
+    }
+
+    handler->snapuserd()->CloseFds();
+    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
+    handler->snapuserd()->UnmapBufferRegion();
+
+    auto misc_name = handler->misc_name();
+    LOG(INFO) << "Handler thread about to exit: " << misc_name;
+
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        if (merge_completed) {
+            num_partitions_merge_complete_ += 1;
+            active_merge_threads_ -= 1;
+            WakeupMonitorMergeThread();
+        }
+        handler->SetThreadTerminated();
+        auto iter = FindHandler(&lock, handler->misc_name());
+        if (iter == dm_users_.end()) {
+            // RemoveAndJoinHandler() already removed us from the list, and is
+            // now waiting on a join(), so just return. Additionally, release
+            // all the resources held by snapuserd object which are shared
+            // by worker threads. This should be done when the last reference
+            // of "handler" is released; but we will explicitly release here
+            // to make sure snapuserd object is freed as it is the biggest
+            // consumer of memory in the daemon.
+            handler->FreeResources();
+            LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
+            return;
+        }
+
+        LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;
+
+        if (handler->snapuserd()->IsAttached()) {
+            handler->thread().detach();
+        }
+
+        // Important: free resources within the lock. This ensures that if
+        // WaitForDelete() is called, the handler is either in the list, or
+        // it's not and its resources are guaranteed to be freed.
+        handler->FreeResources();
+        dm_users_.erase(iter);
     }
 }
 
@@ -296,10 +423,28 @@
         }
     }
 
-    handlers_->JoinAllThreads();
+    JoinAllThreads();
     return true;
 }
 
+void UserSnapshotServer::JoinAllThreads() {
+    // Acquire the thread list within the lock.
+    std::vector<std::shared_ptr<HandlerThread>> dm_users;
+    {
+        std::lock_guard<std::mutex> guard(lock_);
+        dm_users = std::move(dm_users_);
+    }
+
+    for (auto& client : dm_users) {
+        auto& th = client->thread();
+
+        if (th.joinable()) th.join();
+    }
+
+    stop_monitor_merge_thread_ = true;
+    WakeupMonitorMergeThread();
+}
+
 void UserSnapshotServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
     struct pollfd p = {};
     p.fd = fd.get();
@@ -361,13 +506,185 @@
         perform_verification = false;
     }
 
-    return handlers_->AddHandler(misc_name, cow_device_path, backing_device, base_path_merge,
-                                 num_worker_threads, io_uring_enabled_, perform_verification);
+    auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
+                                                       base_path_merge, num_worker_threads,
+                                                       io_uring_enabled_, perform_verification);
+    if (!snapuserd->InitCowDevice()) {
+        LOG(ERROR) << "Failed to initialize Snapuserd";
+        return nullptr;
+    }
+
+    if (!snapuserd->InitializeWorkers()) {
+        LOG(ERROR) << "Failed to initialize workers";
+        return nullptr;
+    }
+
+    auto handler = std::make_shared<HandlerThread>(snapuserd);
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        if (FindHandler(&lock, misc_name) != dm_users_.end()) {
+            LOG(ERROR) << "Handler already exists: " << misc_name;
+            return nullptr;
+        }
+        dm_users_.push_back(handler);
+    }
+    return handler;
+}
+
+bool UserSnapshotServer::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
+    if (handler->snapuserd()->IsAttached()) {
+        LOG(ERROR) << "Handler already attached";
+        return false;
+    }
+
+    handler->snapuserd()->AttachControlDevice();
+
+    handler->thread() = std::thread(std::bind(&UserSnapshotServer::RunThread, this, handler));
+    return true;
+}
+
+bool UserSnapshotServer::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+                                    const std::shared_ptr<HandlerThread>& handler) {
+    CHECK(proof_of_lock);
+
+    if (!handler->snapuserd()->IsAttached()) {
+        LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
+        return false;
+    }
+
+    handler->snapuserd()->MonitorMerge();
+
+    if (!is_merge_monitor_started_.has_value()) {
+        std::thread(&UserSnapshotServer::MonitorMerge, this).detach();
+        is_merge_monitor_started_ = true;
+    }
+
+    merge_handlers_.push(handler);
+    WakeupMonitorMergeThread();
+    return true;
+}
+
+auto UserSnapshotServer::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
+                                     const std::string& misc_name) -> HandlerList::iterator {
+    CHECK(proof_of_lock);
+
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        if ((*iter)->misc_name() == misc_name) {
+            return iter;
+        }
+    }
+    return dm_users_.end();
+}
+
+void UserSnapshotServer::TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock) {
+    CHECK(proof_of_lock);
+
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        if (!(*iter)->ThreadTerminated()) {
+            (*iter)->snapuserd()->NotifyIOTerminated();
+        }
+    }
+}
+
+std::string UserSnapshotServer::GetMergeStatus(const std::shared_ptr<HandlerThread>& handler) {
+    return handler->snapuserd()->GetMergeStatus();
+}
+
+double UserSnapshotServer::GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock) {
+    CHECK(proof_of_lock);
+    double percentage = 0.0;
+    int n = 0;
+
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        auto& th = (*iter)->thread();
+        if (th.joinable()) {
+            // Merge percentage by individual partitions wherein merge is still
+            // in-progress
+            percentage += (*iter)->snapuserd()->GetMergePercentage();
+            n += 1;
+        }
+    }
+
+    // Calculate final merge including those partitions where merge was already
+    // completed - num_partitions_merge_complete_ will track them when each
+    // thread exists in RunThread.
+    int total_partitions = n + num_partitions_merge_complete_;
+
+    if (total_partitions) {
+        percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
+    }
+
+    LOG(DEBUG) << "Merge %: " << percentage
+               << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
+               << " total_partitions: " << total_partitions << " n: " << n;
+    return percentage;
+}
+
+bool UserSnapshotServer::RemoveAndJoinHandler(const std::string& misc_name) {
+    std::shared_ptr<HandlerThread> handler;
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+
+        auto iter = FindHandler(&lock, misc_name);
+        if (iter == dm_users_.end()) {
+            // Client already deleted.
+            return true;
+        }
+        handler = std::move(*iter);
+        dm_users_.erase(iter);
+    }
+
+    auto& th = handler->thread();
+    if (th.joinable()) {
+        th.join();
+    }
+    return true;
+}
+
+void UserSnapshotServer::WakeupMonitorMergeThread() {
+    uint64_t notify = 1;
+    ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
+    if (rc < 0) {
+        PLOG(FATAL) << "failed to notify monitor merge thread";
+    }
+}
+
+void UserSnapshotServer::MonitorMerge() {
+    while (!stop_monitor_merge_thread_) {
+        uint64_t testVal;
+        ssize_t ret =
+                TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
+        if (ret == -1) {
+            PLOG(FATAL) << "Failed to read from eventfd";
+        } else if (ret == 0) {
+            LOG(FATAL) << "Hit EOF on eventfd";
+        }
+
+        LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
+        {
+            std::lock_guard<std::mutex> lock(lock_);
+            while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
+                auto handler = merge_handlers_.front();
+                merge_handlers_.pop();
+
+                if (!handler->snapuserd()) {
+                    LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
+                    continue;
+                }
+
+                LOG(INFO) << "Starting merge for partition: "
+                          << handler->snapuserd()->GetMiscName();
+                handler->snapuserd()->InitiateMerge();
+                active_merge_threads_ += 1;
+            }
+        }
+    }
+
+    LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
 }
 
 bool UserSnapshotServer::WaitForSocket() {
-    auto scope_guard =
-            android::base::make_scope_guard([this]() -> void { handlers_->JoinAllThreads(); });
+    auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
 
     auto socket_path = ANDROID_SOCKET_DIR "/"s + kSnapuserdSocketProxy;
 
@@ -464,8 +781,21 @@
     return true;
 }
 
-bool UserSnapshotServer::StartHandler(const std::string& misc_name) {
-    return handlers_->StartHandler(misc_name);
+bool UserSnapshotServer::UpdateVerification(std::lock_guard<std::mutex>* proof_of_lock) {
+    CHECK(proof_of_lock);
+
+    bool status = true;
+    for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
+        auto& th = (*iter)->thread();
+        if (th.joinable() && status) {
+            status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
+        } else {
+            // return immediately if there is a failure
+            return false;
+        }
+    }
+
+    return status;
 }
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
index 988c01a..12c3903 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
@@ -31,7 +31,6 @@
 #include <vector>
 
 #include <android-base/unique_fd.h>
-#include "handler_manager.h"
 #include "snapuserd_core.h"
 
 namespace android {
@@ -40,6 +39,48 @@
 static constexpr uint32_t kMaxPacketSize = 512;
 static constexpr uint8_t kMaxMergeThreads = 2;
 
+enum class DaemonOps {
+    INIT,
+    START,
+    QUERY,
+    STOP,
+    DELETE,
+    DETACH,
+    SUPPORTS,
+    INITIATE,
+    PERCENTAGE,
+    GETSTATUS,
+    UPDATE_VERIFY,
+    INVALID,
+};
+
+class HandlerThread {
+  public:
+    explicit HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd);
+
+    void FreeResources() {
+        // Each worker thread holds a reference to snapuserd.
+        // Clear them so that all the resources
+        // held by snapuserd is released
+        if (snapuserd_) {
+            snapuserd_->FreeResources();
+            snapuserd_ = nullptr;
+        }
+    }
+    const std::shared_ptr<SnapshotHandler>& snapuserd() const { return snapuserd_; }
+    std::thread& thread() { return thread_; }
+
+    const std::string& misc_name() const { return misc_name_; }
+    bool ThreadTerminated() { return thread_terminated_; }
+    void SetThreadTerminated() { thread_terminated_ = true; }
+
+  private:
+    std::thread thread_;
+    std::shared_ptr<SnapshotHandler> snapuserd_;
+    std::string misc_name_;
+    bool thread_terminated_ = false;
+};
+
 class UserSnapshotServer {
   private:
     android::base::unique_fd sockfd_;
@@ -47,12 +88,21 @@
     volatile bool received_socket_signal_ = false;
     std::vector<struct pollfd> watched_fds_;
     bool is_socket_present_ = false;
+    int num_partitions_merge_complete_ = 0;
+    int active_merge_threads_ = 0;
+    bool stop_monitor_merge_thread_ = false;
     bool is_server_running_ = false;
     bool io_uring_enabled_ = false;
-    std::unique_ptr<ISnapshotHandlerManager> handlers_;
+    std::optional<bool> is_merge_monitor_started_;
+
+    android::base::unique_fd monitor_merge_event_fd_;
 
     std::mutex lock_;
 
+    using HandlerList = std::vector<std::shared_ptr<HandlerThread>>;
+    HandlerList dm_users_;
+    std::queue<std::shared_ptr<HandlerThread>> merge_handlers_;
+
     void AddWatchedFd(android::base::borrowed_fd fd, int events);
     void AcceptClient();
     bool HandleClient(android::base::borrowed_fd fd, int revents);
@@ -61,14 +111,28 @@
     bool Receivemsg(android::base::borrowed_fd fd, const std::string& str);
 
     void ShutdownThreads();
+    bool RemoveAndJoinHandler(const std::string& control_device);
+    DaemonOps Resolveop(std::string& input);
     std::string GetDaemonStatus();
     void Parsemsg(std::string const& msg, const char delim, std::vector<std::string>& out);
 
     bool IsTerminating() { return terminating_; }
 
+    void RunThread(std::shared_ptr<HandlerThread> handler);
+    void MonitorMerge();
+
     void JoinAllThreads();
     bool StartWithSocket(bool start_listening);
 
+    // Find a HandlerThread within a lock.
+    HandlerList::iterator FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
+                                      const std::string& misc_name);
+
+    double GetMergePercentage(std::lock_guard<std::mutex>* proof_of_lock);
+    void TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock);
+
+    bool UpdateVerification(std::lock_guard<std::mutex>* proof_of_lock);
+
   public:
     UserSnapshotServer();
     ~UserSnapshotServer();
@@ -83,8 +147,12 @@
                                               const std::string& cow_device_path,
                                               const std::string& backing_device,
                                               const std::string& base_path_merge);
-    bool StartHandler(const std::string& misc_name);
+    bool StartHandler(const std::shared_ptr<HandlerThread>& handler);
+    bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+                    const std::shared_ptr<HandlerThread>& handler);
+    std::string GetMergeStatus(const std::shared_ptr<HandlerThread>& handler);
 
+    void WakeupMonitorMergeThread();
     void SetTerminating() { terminating_ = true; }
     void ReceivedSocketSignal() { received_socket_signal_ = true; }
     void SetServerRunning() { is_server_running_ = true; }
diff --git a/init/property_service.cpp b/init/property_service.cpp
index 8da6982..4242912 100644
--- a/init/property_service.cpp
+++ b/init/property_service.cpp
@@ -57,12 +57,12 @@
 #include <android-base/result.h>
 #include <android-base/stringprintf.h>
 #include <android-base/strings.h>
+#include <private/android_filesystem_config.h>
 #include <property_info_parser/property_info_parser.h>
 #include <property_info_serializer/property_info_serializer.h>
 #include <selinux/android.h>
 #include <selinux/label.h>
 #include <selinux/selinux.h>
-
 #include "debug_ramdisk.h"
 #include "epoll.h"
 #include "init.h"
@@ -111,12 +111,12 @@
 
 static bool persistent_properties_loaded = false;
 
-static int property_set_fd = -1;
 static int from_init_socket = -1;
 static int init_socket = -1;
 static bool accept_messages = false;
 static std::mutex accept_messages_lock;
 static std::thread property_service_thread;
+static std::thread property_service_for_system_thread;
 
 static std::unique_ptr<PersistWriteThread> persist_write_thread;
 
@@ -394,31 +394,37 @@
         return {PROP_ERROR_INVALID_VALUE};
     }
 
-    prop_info* pi = (prop_info*)__system_property_find(name.c_str());
-    if (pi != nullptr) {
-        // ro.* properties are actually "write-once".
-        if (StartsWith(name, "ro.")) {
-            *error = "Read-only property was already set";
-            return {PROP_ERROR_READ_ONLY_PROPERTY};
-        }
-
-        __system_property_update(pi, value.c_str(), valuelen);
+    if (name == "sys.powerctl") {
+        // No action here - NotifyPropertyChange will trigger the appropriate action, and since this
+        // can come to the second thread, we mustn't call out to the __system_property_* functions
+        // which support multiple readers but only one mutator.
     } else {
-        int rc = __system_property_add(name.c_str(), name.size(), value.c_str(), valuelen);
-        if (rc < 0) {
-            *error = "__system_property_add failed";
-            return {PROP_ERROR_SET_FAILED};
-        }
-    }
+        prop_info* pi = (prop_info*)__system_property_find(name.c_str());
+        if (pi != nullptr) {
+            // ro.* properties are actually "write-once".
+            if (StartsWith(name, "ro.")) {
+                *error = "Read-only property was already set";
+                return {PROP_ERROR_READ_ONLY_PROPERTY};
+            }
 
-    // Don't write properties to disk until after we have read all default
-    // properties to prevent them from being overwritten by default values.
-    if (socket && persistent_properties_loaded && StartsWith(name, "persist.")) {
-        if (persist_write_thread) {
-            persist_write_thread->Write(name, value, std::move(*socket));
-            return {};
+            __system_property_update(pi, value.c_str(), valuelen);
+        } else {
+            int rc = __system_property_add(name.c_str(), name.size(), value.c_str(), valuelen);
+            if (rc < 0) {
+                *error = "__system_property_add failed";
+                return {PROP_ERROR_SET_FAILED};
+            }
         }
-        WritePersistentProperty(name, value);
+
+        // Don't write properties to disk until after we have read all default
+        // properties to prevent them from being overwritten by default values.
+        if (socket && persistent_properties_loaded && StartsWith(name, "persist.")) {
+            if (persist_write_thread) {
+                persist_write_thread->Write(name, value, std::move(*socket));
+                return {};
+            }
+            WritePersistentProperty(name, value);
+        }
     }
 
     NotifyPropertyChange(name, value);
@@ -579,10 +585,10 @@
     return *ret;
 }
 
-static void handle_property_set_fd() {
+static void handle_property_set_fd(int fd) {
     static constexpr uint32_t kDefaultSocketTimeout = 2000; /* ms */
 
-    int s = accept4(property_set_fd, nullptr, nullptr, SOCK_CLOEXEC);
+    int s = accept4(fd, nullptr, nullptr, SOCK_CLOEXEC);
     if (s == -1) {
         return;
     }
@@ -1419,19 +1425,21 @@
     }
 }
 
-static void PropertyServiceThread() {
+static void PropertyServiceThread(int fd, bool listen_init) {
     Epoll epoll;
     if (auto result = epoll.Open(); !result.ok()) {
         LOG(FATAL) << result.error();
     }
 
-    if (auto result = epoll.RegisterHandler(property_set_fd, handle_property_set_fd);
+    if (auto result = epoll.RegisterHandler(fd, std::bind(handle_property_set_fd, fd));
         !result.ok()) {
         LOG(FATAL) << result.error();
     }
 
-    if (auto result = epoll.RegisterHandler(init_socket, HandleInitSocket); !result.ok()) {
-        LOG(FATAL) << result.error();
+    if (listen_init) {
+        if (auto result = epoll.RegisterHandler(init_socket, HandleInitSocket); !result.ok()) {
+            LOG(FATAL) << result.error();
+        }
     }
 
     while (true) {
@@ -1482,6 +1490,23 @@
     cv_.notify_all();
 }
 
+void StartThread(const char* name, int mode, int gid, std::thread& t, bool listen_init) {
+    int fd = -1;
+    if (auto result = CreateSocket(name, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK,
+                                   /*passcred=*/false, /*should_listen=*/false, mode, /*uid=*/0,
+                                   /*gid=*/gid, /*socketcon=*/{});
+        result.ok()) {
+        fd = *result;
+    } else {
+        LOG(FATAL) << "start_property_service socket creation failed: " << result.error();
+    }
+
+    listen(fd, 8);
+
+    auto new_thread = std::thread(PropertyServiceThread, fd, listen_init);
+    t.swap(new_thread);
+}
+
 void StartPropertyService(int* epoll_socket) {
     InitPropertySet("ro.property_service.version", "2");
 
@@ -1493,19 +1518,9 @@
     init_socket = sockets[1];
     StartSendingMessages();
 
-    if (auto result = CreateSocket(PROP_SERVICE_NAME, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK,
-                                   /*passcred=*/false, /*should_listen=*/false, 0666, /*uid=*/0,
-                                   /*gid=*/0, /*socketcon=*/{});
-        result.ok()) {
-        property_set_fd = *result;
-    } else {
-        LOG(FATAL) << "start_property_service socket creation failed: " << result.error();
-    }
-
-    listen(property_set_fd, 8);
-
-    auto new_thread = std::thread{PropertyServiceThread};
-    property_service_thread.swap(new_thread);
+    StartThread(PROP_SERVICE_FOR_SYSTEM_NAME, 0660, AID_SYSTEM, property_service_for_system_thread,
+                true);
+    StartThread(PROP_SERVICE_NAME, 0666, 0, property_service_thread, false);
 
     auto async_persist_writes =
             android::base::GetBoolProperty("ro.property_service.async_persist_writes", false);
diff --git a/libprocessgroup/include/processgroup/processgroup.h b/libprocessgroup/include/processgroup/processgroup.h
index 9b2d775..8fa9fd5 100644
--- a/libprocessgroup/include/processgroup/processgroup.h
+++ b/libprocessgroup/include/processgroup/processgroup.h
@@ -36,6 +36,7 @@
 
 bool SetTaskProfiles(int tid, const std::vector<std::string>& profiles, bool use_fd_cache = false);
 bool SetProcessProfiles(uid_t uid, pid_t pid, const std::vector<std::string>& profiles);
+bool SetUserProfiles(uid_t uid, const std::vector<std::string>& profiles);
 
 __END_DECLS
 
diff --git a/libprocessgroup/processgroup.cpp b/libprocessgroup/processgroup.cpp
index 38eb92f..ae9914d 100644
--- a/libprocessgroup/processgroup.cpp
+++ b/libprocessgroup/processgroup.cpp
@@ -200,6 +200,11 @@
     return SetProcessProfiles(uid, pid, std::span<const std::string_view>(profiles_));
 }
 
+bool SetUserProfiles(uid_t uid, const std::vector<std::string>& profiles) {
+    return TaskProfiles::GetInstance().SetUserProfiles(uid, std::span<const std::string>(profiles),
+                                                       false);
+}
+
 static std::string ConvertUidToPath(const char* cgroup, uid_t uid) {
     return StringPrintf("%s/uid_%d", cgroup, uid);
 }
diff --git a/libprocessgroup/profiles/cgroups.json b/libprocessgroup/profiles/cgroups.json
index d013ec8..3e4393d 100644
--- a/libprocessgroup/profiles/cgroups.json
+++ b/libprocessgroup/profiles/cgroups.json
@@ -1,6 +1,13 @@
 {
   "Cgroups": [
     {
+      "Controller": "blkio",
+      "Path": "/dev/blkio",
+      "Mode": "0775",
+      "UID": "system",
+      "GID": "system"
+    },
+    {
       "Controller": "cpu",
       "Path": "/dev/cpuctl",
       "Mode": "0755",
@@ -32,12 +39,6 @@
       {
         "Controller": "freezer",
         "Path": "."
-      },
-      {
-        "Controller": "io",
-        "Path": ".",
-        "NeedsActivation": true,
-        "Optional": true
       }
     ]
   }
diff --git a/libprocessgroup/profiles/task_profiles.json b/libprocessgroup/profiles/task_profiles.json
index 12f7b44..e44d3bf 100644
--- a/libprocessgroup/profiles/task_profiles.json
+++ b/libprocessgroup/profiles/task_profiles.json
@@ -457,6 +457,14 @@
       "Name": "LowIoPriority",
       "Actions": [
         {
+          "Name": "JoinCgroup",
+          "Params":
+          {
+            "Controller": "blkio",
+            "Path": "background"
+          }
+        },
+        {
           "Name": "SetAttribute",
           "Params":
           {
@@ -489,6 +497,14 @@
       "Name": "NormalIoPriority",
       "Actions": [
         {
+          "Name": "JoinCgroup",
+          "Params":
+          {
+            "Controller": "blkio",
+            "Path": ""
+          }
+        },
+        {
           "Name": "SetAttribute",
           "Params":
           {
@@ -521,6 +537,14 @@
       "Name": "HighIoPriority",
       "Actions": [
         {
+          "Name": "JoinCgroup",
+          "Params":
+          {
+            "Controller": "blkio",
+            "Path": ""
+          }
+        },
+        {
           "Name": "SetAttribute",
           "Params":
           {
@@ -553,6 +577,14 @@
       "Name": "MaxIoPriority",
       "Actions": [
         {
+          "Name": "JoinCgroup",
+          "Params":
+          {
+            "Controller": "blkio",
+            "Path": ""
+          }
+        },
+        {
           "Name": "SetAttribute",
           "Params":
           {
diff --git a/libprocessgroup/task_profiles.cpp b/libprocessgroup/task_profiles.cpp
index 4db7372..1731828 100644
--- a/libprocessgroup/task_profiles.cpp
+++ b/libprocessgroup/task_profiles.cpp
@@ -139,6 +139,17 @@
     return true;
 }
 
+bool ProfileAttribute::GetPathForUID(uid_t uid, std::string* path) const {
+    if (path == nullptr) {
+        return true;
+    }
+
+    const std::string& file_name =
+            controller()->version() == 2 && !file_v2_name_.empty() ? file_v2_name_ : file_name_;
+    *path = StringPrintf("%s/uid_%d/%s", controller()->path(), uid, file_name.c_str());
+    return true;
+}
+
 bool SetClampsAction::ExecuteForProcess(uid_t, pid_t) const {
     // TODO: add support when kernel supports util_clamp
     LOG(WARNING) << "SetClampsAction::ExecuteForProcess is not supported";
@@ -225,6 +236,29 @@
     return true;
 }
 
+bool SetAttributeAction::ExecuteForUID(uid_t uid) const {
+    std::string path;
+
+    if (!attribute_->GetPathForUID(uid, &path)) {
+        LOG(ERROR) << "Failed to find cgroup for uid " << uid;
+        return false;
+    }
+
+    if (!WriteStringToFile(value_, path)) {
+        if (access(path.c_str(), F_OK) < 0) {
+            if (optional_) {
+                return true;
+            } else {
+                LOG(ERROR) << "No such cgroup attribute: " << path;
+                return false;
+            }
+        }
+        PLOG(ERROR) << "Failed to write '" << value_ << "' to " << path;
+        return false;
+    }
+    return true;
+}
+
 SetCgroupAction::SetCgroupAction(const CgroupController& c, const std::string& p)
     : controller_(c), path_(p) {
     FdCacheHelper::Init(controller_.GetTasksFilePath(path_), fd_[ProfileAction::RCT_TASK]);
@@ -552,6 +586,16 @@
     return true;
 }
 
+bool TaskProfile::ExecuteForUID(uid_t uid) const {
+    for (const auto& element : elements_) {
+        if (!element->ExecuteForUID(uid)) {
+            LOG(VERBOSE) << "Applying profile action " << element->Name() << " failed";
+            return false;
+        }
+    }
+    return true;
+}
+
 void TaskProfile::EnableResourceCaching(ProfileAction::ResourceCacheType cache_type) {
     if (res_cached_) {
         return;
@@ -805,6 +849,24 @@
 }
 
 template <typename T>
+bool TaskProfiles::SetUserProfiles(uid_t uid, std::span<const T> profiles, bool use_fd_cache) {
+    for (const auto& name : profiles) {
+        TaskProfile* profile = GetProfile(name);
+        if (profile != nullptr) {
+            if (use_fd_cache) {
+                profile->EnableResourceCaching(ProfileAction::RCT_PROCESS);
+            }
+            if (!profile->ExecuteForUID(uid)) {
+                PLOG(WARNING) << "Failed to apply " << name << " process profile";
+            }
+        } else {
+            PLOG(WARNING) << "Failed to find " << name << "process profile";
+        }
+    }
+    return true;
+}
+
+template <typename T>
 bool TaskProfiles::SetProcessProfiles(uid_t uid, pid_t pid, std::span<const T> profiles,
                                       bool use_fd_cache) {
     bool success = true;
@@ -857,3 +919,5 @@
                                             bool use_fd_cache);
 template bool TaskProfiles::SetTaskProfiles(int tid, std::span<const std::string_view> profiles,
                                             bool use_fd_cache);
+template bool TaskProfiles::SetUserProfiles(uid_t uid, std::span<const std::string> profiles,
+                                            bool use_fd_cache);
diff --git a/libprocessgroup/task_profiles.h b/libprocessgroup/task_profiles.h
index 85b3f91..a8ecb87 100644
--- a/libprocessgroup/task_profiles.h
+++ b/libprocessgroup/task_profiles.h
@@ -36,6 +36,7 @@
     virtual const CgroupController* controller() const = 0;
     virtual const std::string& file_name() const = 0;
     virtual bool GetPathForTask(int tid, std::string* path) const = 0;
+    virtual bool GetPathForUID(uid_t uid, std::string* path) const = 0;
 };
 
 class ProfileAttribute : public IProfileAttribute {
@@ -53,6 +54,7 @@
     void Reset(const CgroupController& controller, const std::string& file_name) override;
 
     bool GetPathForTask(int tid, std::string* path) const override;
+    bool GetPathForUID(uid_t uid, std::string* path) const override;
 
   private:
     CgroupController controller_;
@@ -72,6 +74,7 @@
     // Default implementations will fail
     virtual bool ExecuteForProcess(uid_t, pid_t) const { return false; };
     virtual bool ExecuteForTask(int) const { return false; };
+    virtual bool ExecuteForUID(uid_t) const { return false; };
 
     virtual void EnableResourceCaching(ResourceCacheType) {}
     virtual void DropResourceCaching(ResourceCacheType) {}
@@ -116,6 +119,7 @@
     const char* Name() const override { return "SetAttribute"; }
     bool ExecuteForProcess(uid_t uid, pid_t pid) const override;
     bool ExecuteForTask(int tid) const override;
+    bool ExecuteForUID(uid_t uid) const override;
 
   private:
     const IProfileAttribute* attribute_;
@@ -179,6 +183,7 @@
 
     bool ExecuteForProcess(uid_t uid, pid_t pid) const;
     bool ExecuteForTask(int tid) const;
+    bool ExecuteForUID(uid_t uid) const;
     void EnableResourceCaching(ProfileAction::ResourceCacheType cache_type);
     void DropResourceCaching(ProfileAction::ResourceCacheType cache_type);
 
@@ -216,6 +221,8 @@
     bool SetProcessProfiles(uid_t uid, pid_t pid, std::span<const T> profiles, bool use_fd_cache);
     template <typename T>
     bool SetTaskProfiles(int tid, std::span<const T> profiles, bool use_fd_cache);
+    template <typename T>
+    bool SetUserProfiles(uid_t uid, std::span<const T> profiles, bool use_fd_cache);
 
   private:
     TaskProfiles();
diff --git a/libprocessgroup/task_profiles_test.cpp b/libprocessgroup/task_profiles_test.cpp
index 09ac44c..aa74f9d 100644
--- a/libprocessgroup/task_profiles_test.cpp
+++ b/libprocessgroup/task_profiles_test.cpp
@@ -121,6 +121,10 @@
         return true;
     };
 
+    bool GetPathForUID(uid_t, std::string*) const override {
+        return false;
+    }
+
   private:
     const std::string file_name_;
 };
diff --git a/rootdir/init.rc b/rootdir/init.rc
index b165778..68191bb 100644
--- a/rootdir/init.rc
+++ b/rootdir/init.rc
@@ -221,6 +221,26 @@
     write /dev/stune/nnapi-hal/schedtune.boost 1
     write /dev/stune/nnapi-hal/schedtune.prefer_idle 1
 
+    # Create blkio group and apply initial settings.
+    # This feature needs kernel to support it, and the
+    # device's init.rc must actually set the correct values.
+    mkdir /dev/blkio/background
+    chown system system /dev/blkio
+    chown system system /dev/blkio/background
+    chown system system /dev/blkio/tasks
+    chown system system /dev/blkio/background/tasks
+    chown system system /dev/blkio/cgroup.procs
+    chown system system /dev/blkio/background/cgroup.procs
+    chmod 0664 /dev/blkio/tasks
+    chmod 0664 /dev/blkio/background/tasks
+    chmod 0664 /dev/blkio/cgroup.procs
+    chmod 0664 /dev/blkio/background/cgroup.procs
+    write /dev/blkio/blkio.weight 1000
+    write /dev/blkio/background/blkio.weight 200
+    write /dev/blkio/background/blkio.bfq.weight 10
+    write /dev/blkio/blkio.group_idle 0
+    write /dev/blkio/background/blkio.group_idle 0
+
     restorecon_recursive /mnt
 
     mount configfs none /config nodev noexec nosuid