Merge changes I1625d1a6,I2db9cfa2,I59c31318,Ic0ed1a8d,I612374bb into main

* changes:
  snapuserd: Move Process ops out of Worker.
  snapuserd: Move more fields out of Worker.
  snapuserd: Split more methods out of Worker.
  snapuserd: Create a ReadWorker class.
  snapuserd: Create a MergeWorker class.
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index e5b561b..a9b96e2 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -64,12 +64,13 @@
         "dm-snapshot-merge/snapuserd_readahead.cpp",
         "snapuserd_buffer.cpp",
         "user-space-merge/handler_manager.cpp",
+        "user-space-merge/read_worker.cpp",
         "user-space-merge/snapuserd_core.cpp",
-        "user-space-merge/snapuserd_dm_user.cpp",
         "user-space-merge/snapuserd_merge.cpp",
         "user-space-merge/snapuserd_readahead.cpp",
         "user-space-merge/snapuserd_transitions.cpp",
         "user-space-merge/snapuserd_verify.cpp",
+        "user-space-merge/worker.cpp",
     ],
     static_libs: [
         "libbase",
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
index bdba5c0..4105b4b 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
@@ -18,7 +18,9 @@
 
 #include <android-base/logging.h>
 
+#include "read_worker.h"
 #include "snapuserd_core.h"
+#include "snapuserd_merge.h"
 
 namespace android {
 namespace snapshot {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
similarity index 88%
rename from fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
rename to fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
index 2b9d14e..dd2996b 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_dm_user.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
@@ -14,6 +14,8 @@
  * limitations under the License.
  */
 
+#include "read_worker.h"
+
 #include "snapuserd_core.h"
 
 namespace android {
@@ -23,59 +25,24 @@
 using namespace android::dm;
 using android::base::unique_fd;
 
-Worker::Worker(const std::string& cow_device, const std::string& backing_device,
-               const std::string& control_device, const std::string& misc_name,
-               const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd) {
-    cow_device_ = cow_device;
-    backing_store_device_ = backing_device;
-    control_device_ = control_device;
-    misc_name_ = misc_name;
-    base_path_merge_ = base_path_merge;
-    snapuserd_ = snapuserd;
+void ReadWorker::CloseFds() {  // Read-path fds first, then the base class's.
+    ctrl_fd_.reset();
+    backing_store_fd_.reset();
+    Worker::CloseFds();
 }
 
-bool Worker::InitializeFds() {
-    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
-    if (backing_store_fd_ < 0) {
-        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
-        return false;
-    }
-
-    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
-    if (cow_fd_ < 0) {
-        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
-        return false;
-    }
-
-    ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
-    if (ctrl_fd_ < 0) {
-        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
-        return false;
-    }
-
-    // Base device used by merge thread
-    base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR));
-    if (base_path_merge_fd_ < 0) {
-        SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_;
-        return false;
-    }
-
-    return true;
-}
-
-bool Worker::InitReader() {
-    reader_ = snapuserd_->CloneReaderForWorker();
-
-    if (!reader_->InitForMerge(std::move(cow_fd_))) {
-        return false;
-    }
-    return true;
-}
+ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing_device,
+                       const std::string& control_device, const std::string& misc_name,
+                       const std::string& base_path_merge,
+                       std::shared_ptr<SnapshotHandler> snapuserd)
+    : Worker{cow_device, misc_name, base_path_merge, snapuserd},
+      backing_store_device_{backing_device},
+      control_device_{control_device} {}  // Paths only; Init() opens both devices.
 
 // Start the replace operation. This will read the
 // internal COW format and if the block is compressed,
 // it will be de-compressed.
-bool Worker::ProcessReplaceOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessReplaceOp(const CowOperation* cow_op) {
     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
     if (!buffer) {
         SNAP_LOG(ERROR) << "ProcessReplaceOp failed to allocate buffer";
@@ -88,7 +55,7 @@
     return true;
 }
 
-bool Worker::ReadFromSourceDevice(const CowOperation* cow_op) {
+bool ReadWorker::ReadFromSourceDevice(const CowOperation* cow_op) {
     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
     if (buffer == nullptr) {
         SNAP_LOG(ERROR) << "ReadFromBaseDevice: Failed to get payload buffer";
@@ -118,7 +85,7 @@
 
 // Start the copy operation. This will read the backing
 // block device which is represented by cow_op->source.
-bool Worker::ProcessCopyOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessCopyOp(const CowOperation* cow_op) {
     if (!ReadFromSourceDevice(cow_op)) {
         return false;
     }
@@ -126,7 +93,7 @@
     return true;
 }
 
-bool Worker::ProcessXorOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessXorOp(const CowOperation* cow_op) {
     if (!ReadFromSourceDevice(cow_op)) {
         return false;
     }
@@ -153,7 +120,7 @@
     return true;
 }
 
-bool Worker::ProcessZeroOp() {
+bool ReadWorker::ProcessZeroOp() {
     // Zero out the entire block
     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
     if (buffer == nullptr) {
@@ -165,7 +132,7 @@
     return true;
 }
 
-bool Worker::ProcessOrderedOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessOrderedOp(const CowOperation* cow_op) {
     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
     if (buffer == nullptr) {
         SNAP_LOG(ERROR) << "ProcessOrderedOp: Failed to get payload buffer";
@@ -218,7 +185,7 @@
     return false;
 }
 
-bool Worker::ProcessCowOp(const CowOperation* cow_op) {
+bool ReadWorker::ProcessCowOp(const CowOperation* cow_op) {
     if (cow_op == nullptr) {
         SNAP_LOG(ERROR) << "ProcessCowOp: Invalid cow_op";
         return false;
@@ -246,31 +213,28 @@
     return false;
 }
 
-void Worker::InitializeBufsink() {
-    // Allocate the buffer which is used to communicate between
-    // daemon and dm-user. The buffer comprises of header and a fixed payload.
-    // If the dm-user requests a big IO, the IO will be broken into chunks
-    // of PAYLOAD_BUFFER_SZ.
-    size_t buf_size = sizeof(struct dm_user_header) + PAYLOAD_BUFFER_SZ;
-    bufsink_.Initialize(buf_size);
-}
+bool ReadWorker::Init() {  // Base-class setup, then the fds only a read worker needs.
+    if (!Worker::Init()) {  // bufsink, COW/base fds and the COW reader
+        return false;
+    }
 
-bool Worker::Init() {
-    InitializeBufsink();
+    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));  // read-only
+    if (backing_store_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
+        return false;
+    }
+
+    ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));  // read-write
+    if (ctrl_fd_ < 0) {
+        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
+        return false;
+    }
+
     xorsink_.Initialize(&bufsink_, BLOCK_SZ);
-
-    if (!InitializeFds()) {
-        return false;
-    }
-
-    if (!InitReader()) {
-        return false;
-    }
-
     return true;
 }
 
-bool Worker::RunThread() {
+bool ReadWorker::Run() {
     SNAP_LOG(INFO) << "Processing snapshot I/O requests....";
 
     if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
@@ -291,7 +255,7 @@
 }
 
 // Send the payload/data back to dm-user misc device.
-bool Worker::WriteDmUserPayload(size_t size) {
+bool ReadWorker::WriteDmUserPayload(size_t size) {
     size_t payload_size = size;
     void* buf = bufsink_.GetPayloadBufPtr();
     if (header_response_) {
@@ -310,7 +274,7 @@
     return true;
 }
 
-bool Worker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
+bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, size_t read_size) {
     CHECK(read_size <= BLOCK_SZ);
 
     void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
@@ -329,7 +293,7 @@
     return true;
 }
 
-bool Worker::ReadAlignedSector(sector_t sector, size_t sz) {
+bool ReadWorker::ReadAlignedSector(sector_t sector, size_t sz) {
     size_t remaining_size = sz;
     std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
     int ret = 0;
@@ -389,7 +353,7 @@
     return true;
 }
 
-int Worker::ReadUnalignedSector(
+int ReadWorker::ReadUnalignedSector(
         sector_t sector, size_t size,
         std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it) {
     size_t skip_sector_size = 0;
@@ -424,7 +388,7 @@
     return std::min(size, (BLOCK_SZ - skip_sector_size));
 }
 
-bool Worker::ReadUnalignedSector(sector_t sector, size_t size) {
+bool ReadWorker::ReadUnalignedSector(sector_t sector, size_t size) {
     bufsink_.ResetBufferOffset();
     std::vector<std::pair<sector_t, const CowOperation*>>& chunk_vec = snapuserd_->GetChunkVec();
 
@@ -563,7 +527,7 @@
     return true;
 }
 
-void Worker::RespondIOError() {
+void ReadWorker::RespondIOError() {
     struct dm_user_header* header = bufsink_.GetHeaderPtr();
     header->type = DM_USER_RESP_ERROR;
     // This is an issue with the dm-user interface. There
@@ -580,7 +544,7 @@
     WriteDmUserPayload(0);
 }
 
-bool Worker::DmuserReadRequest() {
+bool ReadWorker::DmuserReadRequest() {
     struct dm_user_header* header = bufsink_.GetHeaderPtr();
 
     // Unaligned I/O request
@@ -591,7 +555,7 @@
     return ReadAlignedSector(header->sector, header->len);
 }
 
-bool Worker::ProcessIORequest() {
+bool ReadWorker::ProcessIORequest() {
     // Read Header from dm-user misc device. This gives
     // us the sector number for which IO is issued by dm-snapshot device
     struct dm_user_header* header = bufsink_.GetHeaderPtr();
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
new file mode 100644
index 0000000..c3a4c34
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
@@ -0,0 +1,70 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <utility>
+#include <vector>
+
+#include "worker.h"
+
+namespace android {
+namespace snapshot {
+
+class ReadWorker : public Worker {  // Worker that serves I/O requests read from dm-user.
+  public:
+    ReadWorker(const std::string& cow_device, const std::string& backing_device,
+               const std::string& control_device, const std::string& misc_name,
+               const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
+
+    bool Run();                // Thread body: services snapshot I/O requests.
+    bool Init() override;      // Worker::Init(), then opens the backing-store and control fds.
+    void CloseFds() override;  // Resets the control and backing-store fds, then the base's.
+
+  private:
+    // Functions interacting with dm-user
+    bool ProcessIORequest();               // Reads the request header from the dm-user device.
+    bool WriteDmUserPayload(size_t size);  // Sends the payload back to the dm-user device.
+    bool DmuserReadRequest();              // Routes to the aligned or unaligned read path.
+    void RespondIOError();                 // Marks the header DM_USER_RESP_ERROR and replies.
+
+    bool ProcessCowOp(const CowOperation* cow_op);      // Fails on a null cow_op.
+    bool ProcessXorOp(const CowOperation* cow_op);      // Reads the source device first.
+    bool ProcessOrderedOp(const CowOperation* cow_op);
+    bool ProcessCopyOp(const CowOperation* cow_op);     // Reads the block from the source device.
+    bool ProcessReplaceOp(const CowOperation* cow_op);  // Reads (and decompresses) COW data.
+    bool ProcessZeroOp();                               // Zero-fills one block.
+
+    bool ReadAlignedSector(sector_t sector, size_t sz);
+    bool ReadUnalignedSector(sector_t sector, size_t size);
+    int ReadUnalignedSector(sector_t sector, size_t size,
+                            std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
+    bool ReadFromSourceDevice(const CowOperation* cow_op);
+    bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);  // read_size <= BLOCK_SZ
+
+    static constexpr bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
+    static constexpr sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
+
+    std::string backing_store_device_;
+    unique_fd backing_store_fd_;  // Opened read-only in Init().
+
+    std::string control_device_;
+    unique_fd ctrl_fd_;  // Opened read-write in Init().
+
+    XorSink xorsink_;               // Bound to bufsink_ in Init().
+    bool header_response_ = false;  // Consulted by WriteDmUserPayload().
+};
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index 8e1212b..e52d752 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -23,6 +23,9 @@
 #include <android-base/scopeguard.h>
 #include <android-base/strings.h>
 
+#include "read_worker.h"
+#include "snapuserd_merge.h"
+
 namespace android {
 namespace snapshot {
 
@@ -46,9 +49,8 @@
 
 bool SnapshotHandler::InitializeWorkers() {
     for (int i = 0; i < num_worker_threads_; i++) {
-        std::unique_ptr<Worker> wt =
-                std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
-                                         misc_name_, base_path_merge_, GetSharedPtr());
+        auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, control_device_,
+                                               misc_name_, base_path_merge_, GetSharedPtr());
         if (!wt->Init()) {
             SNAP_LOG(ERROR) << "Thread initialization failed";
             return false;
@@ -57,8 +59,8 @@
         worker_threads_.push_back(std::move(wt));
     }
 
-    merge_thread_ = std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
-                                             misc_name_, base_path_merge_, GetSharedPtr());
+    merge_thread_ = std::make_unique<MergeWorker>(cow_device_, misc_name_, base_path_merge_,
+                                                  GetSharedPtr());
 
     read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
                                                      GetSharedPtr());
@@ -312,11 +314,11 @@
     // Launch worker threads
     for (int i = 0; i < worker_threads_.size(); i++) {
         threads.emplace_back(
-                std::async(std::launch::async, &Worker::RunThread, worker_threads_[i].get()));
+                std::async(std::launch::async, &ReadWorker::Run, worker_threads_[i].get()));
     }
 
     std::future<bool> merge_thread =
-            std::async(std::launch::async, &Worker::RunMergeThread, merge_thread_.get());
+            std::async(std::launch::async, &MergeWorker::Run, merge_thread_.get());
 
     // Now that the worker threads are up, scan the partitions.
     if (perform_verification_) {
@@ -452,5 +454,11 @@
     return update_verify_->CheckPartitionVerification();
 }
 
+void SnapshotHandler::FreeResources() {  // Out of line: needs the complete worker types.
+    worker_threads_.clear();
+    read_ahead_thread_.reset();
+    merge_thread_.reset();
+}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index c984a61..cdc38c0 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -75,7 +75,8 @@
     READ_AHEAD_FAILURE,
 };
 
-class SnapshotHandler;
+class MergeWorker;
+class ReadWorker;
 
 enum class MERGE_GROUP_STATE {
     GROUP_MERGE_PENDING,
@@ -98,102 +99,6 @@
         : merge_state_(state), num_ios_in_progress(n_ios) {}
 };
 
-class Worker {
-  public:
-    Worker(const std::string& cow_device, const std::string& backing_device,
-           const std::string& control_device, const std::string& misc_name,
-           const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
-    bool RunThread();
-    bool RunMergeThread();
-    bool Init();
-
-  private:
-    // Initialization
-    void InitializeBufsink();
-    bool InitializeFds();
-    bool InitReader();
-    void CloseFds() {
-        ctrl_fd_ = {};
-        backing_store_fd_ = {};
-        base_path_merge_fd_ = {};
-    }
-
-    // Functions interacting with dm-user
-    bool WriteDmUserPayload(size_t size);
-    bool DmuserReadRequest();
-
-    // IO Path
-    bool ProcessIORequest();
-    bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }
-
-    bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
-    bool ReadFromSourceDevice(const CowOperation* cow_op);
-
-    bool ReadAlignedSector(sector_t sector, size_t sz);
-    bool ReadUnalignedSector(sector_t sector, size_t size);
-    int ReadUnalignedSector(sector_t sector, size_t size,
-                            std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
-    void RespondIOError();
-
-    // Processing COW operations
-    bool ProcessCowOp(const CowOperation* cow_op);
-    bool ProcessReplaceOp(const CowOperation* cow_op);
-    bool ProcessZeroOp();
-
-    // Handles Copy and Xor
-    bool ProcessCopyOp(const CowOperation* cow_op);
-    bool ProcessXorOp(const CowOperation* cow_op);
-    bool ProcessOrderedOp(const CowOperation* cow_op);
-
-    // Merge related ops
-    bool Merge();
-    bool AsyncMerge();
-    bool SyncMerge();
-    bool MergeOrderedOps();
-    bool MergeOrderedOpsAsync();
-    bool MergeReplaceZeroOps();
-    int PrepareMerge(uint64_t* source_offset, int* pending_ops,
-                     std::vector<const CowOperation*>* replace_zero_vec = nullptr);
-
-    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
-    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
-
-    bool InitializeIouring();
-    void FinalizeIouring();
-
-    std::unique_ptr<CowReader> reader_;
-    BufferSink bufsink_;
-    XorSink xorsink_;
-
-    std::string cow_device_;
-    std::string backing_store_device_;
-    std::string control_device_;
-    std::string misc_name_;
-    std::string base_path_merge_;
-
-    unique_fd cow_fd_;
-    unique_fd backing_store_fd_;
-    unique_fd base_path_merge_fd_;
-    unique_fd ctrl_fd_;
-    bool header_response_ = false;
-
-    std::unique_ptr<ICowOpIter> cowop_iter_;
-    size_t ra_block_index_ = 0;
-    uint64_t blocks_merged_in_group_ = 0;
-    bool merge_async_ = false;
-    // Queue depth of 8 seems optimal. We don't want
-    // to have a huge depth as it may put more memory pressure
-    // on the kernel worker threads given that we use
-    // IOSQE_ASYNC flag - ASYNC flags can potentially
-    // result in EINTR; Since we don't restart
-    // syscalls and fallback to synchronous I/O, we
-    // don't want huge queue depth
-    int queue_depth_ = 8;
-    std::unique_ptr<struct io_uring> ring_;
-
-    std::shared_ptr<SnapshotHandler> snapuserd_;
-};
-
 class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
   public:
     SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
@@ -212,11 +117,7 @@
     bool CommitMerge(int num_merge_ops);
 
     void CloseFds() { cow_fd_ = {}; }
-    void FreeResources() {
-        worker_threads_.clear();
-        read_ahead_thread_ = nullptr;
-        merge_thread_ = nullptr;
-    }
+    void FreeResources();
 
     bool InitializeWorkers();
     std::unique_ptr<CowReader> CloneReaderForWorker();
@@ -315,7 +216,7 @@
     void* mapped_addr_;
     size_t total_mapped_addr_length_;
 
-    std::vector<std::unique_ptr<Worker>> worker_threads_;
+    std::vector<std::unique_ptr<ReadWorker>> worker_threads_;
     // Read-ahead related
     bool populate_data_from_cow_ = false;
     bool ra_thread_ = false;
@@ -330,7 +231,7 @@
     // Merge Block state
     std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;
 
-    std::unique_ptr<Worker> merge_thread_;
+    std::unique_ptr<MergeWorker> merge_thread_;
     double merge_completion_percentage_;
 
     bool merge_initiated_ = false;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
index ce95b76..563f6ad 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "snapuserd_merge.h"
 
 #include "snapuserd_core.h"
 
@@ -23,8 +24,13 @@
 using namespace android::dm;
 using android::base::unique_fd;
 
-int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
-                         std::vector<const CowOperation*>* replace_zero_vec) {
+MergeWorker::MergeWorker(const std::string& cow_device, const std::string& misc_name,
+                         const std::string& base_path_merge,
+                         std::shared_ptr<SnapshotHandler> snapuserd)
+    : Worker{cow_device, misc_name, base_path_merge, snapuserd} {}  // nothing merge-specific
+
+int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
+                              std::vector<const CowOperation*>* replace_zero_vec) {
     int num_ops = *pending_ops;
     int nr_consecutive = 0;
     bool checkOrderedOp = (replace_zero_vec == nullptr);
@@ -70,7 +76,7 @@
     return nr_consecutive;
 }
 
-bool Worker::MergeReplaceZeroOps() {
+bool MergeWorker::MergeReplaceZeroOps() {
     // Flush after merging 2MB. Since all ops are independent and there is no
     // dependency between COW ops, we will flush the data and the number
     // of ops merged in COW block device. If there is a crash, we will
@@ -99,17 +105,20 @@
 
         for (size_t i = 0; i < replace_zero_vec.size(); i++) {
             const CowOperation* cow_op = replace_zero_vec[i];
+
+            void* buffer = bufsink_.GetPayloadBuffer(BLOCK_SZ);
+            if (!buffer) {
+                SNAP_LOG(ERROR) << "Failed to acquire buffer in merge";
+                return false;
+            }
             if (cow_op->type == kCowReplaceOp) {
-                if (!ProcessReplaceOp(cow_op)) {
-                    SNAP_LOG(ERROR) << "Merge - ReplaceOp failed for block: " << cow_op->new_block;
+                if (!reader_->ReadData(cow_op, buffer, BLOCK_SZ)) {
+                    SNAP_LOG(ERROR) << "Failed to read COW in merge";
                     return false;
                 }
             } else {
                 CHECK(cow_op->type == kCowZeroOp);
-                if (!ProcessZeroOp()) {
-                    SNAP_LOG(ERROR) << "Merge ZeroOp failed.";
-                    return false;
-                }
+                memset(buffer, 0, BLOCK_SZ);
             }
 
             bufsink_.UpdateBufferOffset(BLOCK_SZ);
@@ -149,7 +158,7 @@
 
         if (snapuserd_->IsIOTerminated()) {
             SNAP_LOG(ERROR)
-                    << "MergeReplaceZeroOps: Worker threads terminated - shutting down merge";
+                    << "MergeReplaceZeroOps: MergeWorker threads terminated - shutting down merge";
             return false;
         }
     }
@@ -173,7 +182,7 @@
     return true;
 }
 
-bool Worker::MergeOrderedOpsAsync() {
+bool MergeWorker::MergeOrderedOpsAsync() {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
@@ -354,7 +363,7 @@
     return true;
 }
 
-bool Worker::MergeOrderedOps() {
+bool MergeWorker::MergeOrderedOps() {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
@@ -439,7 +448,7 @@
     return true;
 }
 
-bool Worker::AsyncMerge() {
+bool MergeWorker::AsyncMerge() {
     if (!MergeOrderedOpsAsync()) {
         SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
         // Reset the iter so that we retry the merge
@@ -455,7 +464,7 @@
     return true;
 }
 
-bool Worker::SyncMerge() {
+bool MergeWorker::SyncMerge() {
     if (!MergeOrderedOps()) {
         SNAP_LOG(ERROR) << "Merge failed for ordered ops";
         return false;
@@ -465,7 +474,7 @@
     return true;
 }
 
-bool Worker::Merge() {
+bool MergeWorker::Merge() {
     cowop_iter_ = reader_->GetOpIter(true);
 
     bool retry = false;
@@ -511,7 +520,7 @@
     return true;
 }
 
-bool Worker::InitializeIouring() {
+bool MergeWorker::InitializeIouring() {
     if (!snapuserd_->IsIouringSupported()) {
         return false;
     }
@@ -530,13 +539,13 @@
     return true;
 }
 
-void Worker::FinalizeIouring() {
+void MergeWorker::FinalizeIouring() {  // Exits the io_uring queue only if merge_async_ is set.
     if (merge_async_) {
         io_uring_queue_exit(ring_.get());
     }
 }
 
-bool Worker::RunMergeThread() {
+bool MergeWorker::Run() {
     SNAP_LOG(DEBUG) << "Waiting for merge begin...";
     if (!snapuserd_->WaitForMergeBegin()) {
         SNAP_LOG(ERROR) << "Merge terminated early...";
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.h
new file mode 100644
index 0000000..f35147f
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.h
@@ -0,0 +1,58 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#include "worker.h"
+
+#include <liburing.h>
+
+namespace android {
+namespace snapshot {
+
+class MergeWorker : public Worker {  // Worker whose Run() is launched as the merge thread.
+  public:
+    MergeWorker(const std::string& cow_device, const std::string& misc_name,
+                const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
+    bool Run();  // Thread body; first waits for the handler to signal merge begin.
+
+  private:
+    int PrepareMerge(uint64_t* source_offset, int* pending_ops,
+                     std::vector<const CowOperation*>* replace_zero_vec = nullptr);
+    bool MergeReplaceZeroOps();   // Merges the replace and zero ops.
+    bool MergeOrderedOps();
+    bool MergeOrderedOpsAsync();
+    bool Merge();                 // Obtains cowop_iter_ from reader_ before merging.
+    bool AsyncMerge();            // Runs MergeOrderedOpsAsync().
+    bool SyncMerge();             // Runs MergeOrderedOps(); false if that fails.
+    bool InitializeIouring();     // Returns false when io_uring is not supported.
+    void FinalizeIouring();       // Exits the io_uring queue when merge_async_ is set.
+
+  private:
+    std::unique_ptr<ICowOpIter> cowop_iter_;  // Set by Merge() via reader_->GetOpIter(true).
+    std::unique_ptr<struct io_uring> ring_;   // Passed to io_uring_queue_exit() on teardown.
+    size_t ra_block_index_ = 0;
+    uint64_t blocks_merged_in_group_ = 0;
+    bool merge_async_ = false;  // Checked by FinalizeIouring().
+    // Queue depth of 8 seems optimal. We don't want
+    // to have a huge depth as it may put more memory pressure
+    // on the kernel worker threads given that we use
+    // IOSQE_ASYNC flag - ASYNC flags can potentially
+    // result in EINTR; Since we don't restart
+    // syscalls and fallback to synchronous I/O, we
+    // don't want huge queue depth
+    int queue_depth_ = 8;
+};
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.cpp
new file mode 100644
index 0000000..aa15630
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.cpp
@@ -0,0 +1,80 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "worker.h"
+
+#include "snapuserd_core.h"
+
+namespace android {
+namespace snapshot {
+
+Worker::Worker(const std::string& cow_device, const std::string& misc_name,
+               const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd)
+    // Initializers follow the member declaration order in worker.h.
+    : misc_name_(misc_name),
+      snapuserd_(std::move(snapuserd)),  // by-value sink: move, don't copy again
+      cow_device_(cow_device),
+      base_path_merge_(base_path_merge) {}
+
+void Worker::InitializeBufsink() {
+    // bufsink_ is the buffer shared between the daemon and dm-user: one
+    // dm_user_header followed by a fixed-size payload area. An I/O larger
+    // than PAYLOAD_BUFFER_SZ is split into chunks of that size.
+    constexpr size_t kHeaderSize = sizeof(struct dm_user_header);
+    const size_t total_size = kHeaderSize + PAYLOAD_BUFFER_SZ;
+    bufsink_.Initialize(total_size);
+}
+
+// Common worker setup: size the dm-user buffer, open the descriptors, then
+// attach a COW reader. Returns false as soon as a fallible step fails.
+bool Worker::Init() {
+    // InitializeBufsink() returns nothing to check, so it runs unconditionally.
+    InitializeBufsink();
+
+    // && short-circuits, so InitReader() still only runs after the
+    // descriptors were opened, exactly as with explicit early returns:
+    // it consumes the COW fd that InitializeFds() opens.
+    const bool fds_ready = InitializeFds();
+    const bool ready = fds_ready && InitReader();
+    return ready;
+}
+
+// Obtains this worker's COW reader from the handler and initializes it.
+bool Worker::InitReader() {
+    reader_ = snapuserd_->CloneReaderForWorker();
+
+    // cow_fd_ is handed over with std::move; presumably the reader takes
+    // ownership of the descriptor (confirm against CowReader::InitForMerge).
+    return reader_->InitForMerge(std::move(cow_fd_));
+}
+
+bool Worker::InitializeFds() {
+    // COW device, opened read-write; InitReader() consumes this fd later.
+    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
+    if (cow_fd_.get() < 0) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
+        return false;
+    }
+
+    // Base device, also read-write; this is the fd the merge thread uses.
+    base_path_merge_fd_.reset(open(base_path_merge_.c_str(), O_RDWR));
+    const bool base_opened = base_path_merge_fd_.get() >= 0;
+    if (!base_opened) {
+        SNAP_PLOG(ERROR) << "Open Failed: " << base_path_merge_;
+    }
+    return base_opened;
+}
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.h
new file mode 100644
index 0000000..813b159
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/worker.h
@@ -0,0 +1,65 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <stddef.h>
+
+#include <memory>
+#include <string>
+
+#include <android-base/unique_fd.h>
+#include <libsnapshot/cow_reader.h>
+#include <snapuserd/snapuserd_buffer.h>
+#include <snapuserd/snapuserd_kernel.h>
+
+namespace android {
+namespace snapshot {
+
+using android::base::unique_fd;
+
+class SnapshotHandler;
+
+class Worker {  // State and setup shared by ReadWorker and MergeWorker.
+  public:
+    Worker(const std::string& cow_device, const std::string& misc_name,
+           const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
+    virtual ~Worker() = default;
+
+    virtual bool Init();  // Bufsink, then fds, then the COW reader; false on failure.
+
+  protected:
+    // Initialization
+    void InitializeBufsink();  // Sizes bufsink_: dm_user_header + PAYLOAD_BUFFER_SZ.
+    bool InitializeFds();      // Opens cow_fd_ and base_path_merge_fd_ with O_RDWR.
+    bool InitReader();         // Gets reader_ from the handler and passes it cow_fd_.
+    virtual void CloseFds() { base_path_merge_fd_ = {}; }  // ReadWorker extends this.
+
+    std::unique_ptr<CowReader> reader_;  // Created by InitReader().
+    BufferSink bufsink_;
+
+    std::string misc_name_;  // Needed for SNAP_LOG.
+
+    unique_fd base_path_merge_fd_;
+
+    std::shared_ptr<SnapshotHandler> snapuserd_;  // Handler passed in at construction.
+
+  private:
+    std::string cow_device_;
+    std::string base_path_merge_;
+    unique_fd cow_fd_;  // Moved into reader_->InitForMerge() by InitReader().
+};
+
+}  // namespace snapshot
+}  // namespace android