Merge changes Ie9a781e4,I060788c9,Ie75e9440 into main

* changes:
  snapuserd: Remove dm-user specific code from ReadWorker.
  snapuserd: Add an IBlockServer abstraction around dm-user.
  snapuserd: Rename snapuserd_merge to merge_worker.
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index f63579b..40dcc2a 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -60,13 +60,14 @@
     local_include_dirs: ["include/"],
     srcs: [
         "dm-snapshot-merge/snapuserd.cpp",
-        "dm-snapshot-merge/snapuserd_worker.cpp",
         "dm-snapshot-merge/snapuserd_readahead.cpp",
+        "dm-snapshot-merge/snapuserd_worker.cpp",
+        "dm_user_block_server.cpp",
         "snapuserd_buffer.cpp",
         "user-space-merge/handler_manager.cpp",
+        "user-space-merge/merge_worker.cpp",
         "user-space-merge/read_worker.cpp",
         "user-space-merge/snapuserd_core.cpp",
-        "user-space-merge/snapuserd_merge.cpp",
         "user-space-merge/snapuserd_readahead.cpp",
         "user-space-merge/snapuserd_transitions.cpp",
         "user-space-merge/snapuserd_verify.cpp",
diff --git a/fs_mgr/libsnapshot/snapuserd/dm_user_block_server.cpp b/fs_mgr/libsnapshot/snapuserd/dm_user_block_server.cpp
new file mode 100644
index 0000000..ae62bc6
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/dm_user_block_server.cpp
@@ -0,0 +1,146 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <snapuserd/dm_user_block_server.h>
+
+#include <android-base/file.h>
+#include <android-base/logging.h>
+#include <snapuserd/snapuserd_kernel.h>
+#include "snapuserd_logging.h"
+
+namespace android {
+namespace snapshot {
+
+using android::base::unique_fd;
+
+DmUserBlockServer::DmUserBlockServer(const std::string& misc_name, unique_fd&& ctrl_fd,
+                                     Delegate* delegate, size_t buffer_size)
+    : misc_name_(misc_name), ctrl_fd_(std::move(ctrl_fd)), delegate_(delegate) {
+    buffer_.Initialize(sizeof(struct dm_user_header) + buffer_size);
+}
+
+bool DmUserBlockServer::ProcessRequests() {
+    struct dm_user_header* header = buffer_.GetHeaderPtr();
+    if (!android::base::ReadFully(ctrl_fd_, header, sizeof(*header))) {
+        if (errno != ENOTBLK) {
+            SNAP_PLOG(ERROR) << "Control-read failed";
+        }
+
+        SNAP_PLOG(DEBUG) << "ReadDmUserHeader failed....";
+        return false;
+    }
+
+    SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
+    SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
+    SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
+    SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
+    SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;
+
+    if (!ProcessRequest(header)) {
+        if (header->type != DM_USER_RESP_ERROR) {
+            SendError();
+        }
+        return false;
+    }
+    return true;
+}
+
+bool DmUserBlockServer::ProcessRequest(dm_user_header* header) {
+    // Use the same header buffer as the response header.
+    int request_type = header->type;
+    header->type = DM_USER_RESP_SUCCESS;
+    header_response_ = true;
+
+    // Reset the output buffer.
+    buffer_.ResetBufferOffset();
+
+    switch (request_type) {
+        case DM_USER_REQ_MAP_READ:
+            return delegate_->RequestSectors(header->sector, header->len);
+
+        case DM_USER_REQ_MAP_WRITE:
+            // We should not get any write request to dm-user as we mount all
+            // partitions as read-only.
+            SNAP_LOG(ERROR) << "Unexpected write request from dm-user";
+            return false;
+
+        default:
+            SNAP_LOG(ERROR) << "Unexpected request from dm-user: " << request_type;
+            return false;
+    }
+}
+
+void* DmUserBlockServer::GetResponseBuffer(size_t size, size_t to_write) {
+    return buffer_.AcquireBuffer(size, to_write);
+}
+
+bool DmUserBlockServer::SendBufferedIo() {
+    return WriteDmUserPayload(buffer_.GetPayloadBytesWritten());
+}
+
+void DmUserBlockServer::SendError() {
+    struct dm_user_header* header = buffer_.GetHeaderPtr();
+    header->type = DM_USER_RESP_ERROR;
+    // This is an issue with the dm-user interface. There
+    // is no way to propagate the I/O error back to dm-user
+    // if we have already communicated the header back. Header
+    // is responded once at the beginning; however I/O can
+    // be processed in chunks. If we encounter an I/O error
+    // somewhere in the middle of the processing, we can't communicate
+    // this back to dm-user.
+    //
+    // TODO: Fix the interface
+    CHECK(header_response_);
+
+    WriteDmUserPayload(0);
+}
+
+bool DmUserBlockServer::WriteDmUserPayload(size_t size) {
+    size_t payload_size = size;
+    void* buf = buffer_.GetPayloadBufPtr();
+    if (header_response_) {
+        payload_size += sizeof(struct dm_user_header);
+        buf = buffer_.GetBufPtr();
+    }
+
+    if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
+        SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
+        return false;
+    }
+
+    // After the first header is sent in response to a request, we cannot
+    // send any additional headers.
+    header_response_ = false;
+
+    // Reset the buffer for use by the next request.
+    buffer_.ResetBufferOffset();
+    return true;
+}
+
+DmUserBlockServerOpener::DmUserBlockServerOpener(const std::string& misc_name,
+                                                 const std::string& dm_user_path)
+    : misc_name_(misc_name), dm_user_path_(dm_user_path) {}
+
+std::unique_ptr<IBlockServer> DmUserBlockServerOpener::Open(IBlockServer::Delegate* delegate,
+                                                            size_t buffer_size) {
+    unique_fd fd(open(dm_user_path_.c_str(), O_RDWR | O_CLOEXEC));
+    if (fd < 0) {
+        SNAP_PLOG(ERROR) << "Could not open dm-user path: " << dm_user_path_;
+        return nullptr;
+    }
+    return std::make_unique<DmUserBlockServer>(misc_name_, std::move(fd), delegate, buffer_size);
+}
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/block_server.h b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/block_server.h
new file mode 100644
index 0000000..72b73fc
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/block_server.h
@@ -0,0 +1,87 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <memory>
+
+namespace android {
+namespace snapshot {
+
+// These interfaces model the block device driver component of snapuserd (eg,
+// dm-user).
+
+// An open connection to a userspace block device control
+class IBlockServer {
+  public:
+    class Delegate {
+      public:
+        virtual ~Delegate() {}
+
+        // Respond to a request for reading a contiguous run of sectors. This
+        // call should be followed by calls to GetResponseBuffer/CommitBuffer
+        // until the |size| is fulfilled.
+        //
+        // If false is returned, an error will be automatically reported unless
+        // SendError was called.
+        virtual bool RequestSectors(uint64_t sector, uint64_t size) = 0;
+    };
+
+    virtual ~IBlockServer() {}
+
+    // Process I/O requests. This can block the worker thread until either a
+    // request is available or the underlying connection has been destroyed.
+    //
+    // True indicates that one or more requests was processed. False indicates
+    // an unrecoverable condition and processing should stop.
+    virtual bool ProcessRequests() = 0;
+
+    // Return a buffer for fulfilling a RequestSectors request. This buffer
+    // is valid until calling SendBufferedIo. This cannot be called outside
+    // of RequestSectors().
+    //
+    // "to_write" must be <= "size". If it is < size, the excess bytes are
+    // available for writing, but will not be send via SendBufferedIo, and
+    // may be reallocated in the next call to GetResponseBuffer.
+    //
+    // All buffers returned are invalidated after SendBufferedIo or returning
+    // control from RequestSectors.
+    virtual void* GetResponseBuffer(size_t size, size_t to_write) = 0;
+
+    // Send all outstanding buffers to the driver, in order. This should
+    // be called at least once in response to RequestSectors. This returns
+    // ownership of any buffers returned by GetResponseBuffer.
+    //
+    // If false is returned, an error is automatically reported to the driver.
+    virtual bool SendBufferedIo() = 0;
+
+    void* GetResponseBuffer(size_t size) { return GetResponseBuffer(size, size); }
+};
+
+class IBlockServerOpener {
+  public:
+    virtual ~IBlockServerOpener() = default;
+
+    // Open a connection to the service. This is called on the daemon thread.
+    //
+    // buffer_size is the maximum amount of buffered I/O to use.
+    virtual std::unique_ptr<IBlockServer> Open(IBlockServer::Delegate* delegate,
+                                               size_t buffer_size) = 0;
+};
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/include/snapuserd/dm_user_block_server.h b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/dm_user_block_server.h
new file mode 100644
index 0000000..6aecf50
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/include/snapuserd/dm_user_block_server.h
@@ -0,0 +1,63 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <android-base/unique_fd.h>
+
+#include <string>
+
+#include <snapuserd/block_server.h>
+#include <snapuserd/snapuserd_buffer.h>
+
+namespace android {
+namespace snapshot {
+
+class DmUserBlockServer : public IBlockServer {
+  public:
+    DmUserBlockServer(const std::string& misc_name, android::base::unique_fd&& ctrl_fd,
+                      Delegate* delegate, size_t buffer_size);
+
+    bool ProcessRequests() override;
+    void* GetResponseBuffer(size_t size, size_t to_write) override;
+    bool SendBufferedIo() override;
+    void SendError();
+
+  private:
+    bool ProcessRequest(dm_user_header* header);
+    bool WriteDmUserPayload(size_t size);
+
+    std::string misc_name_;
+    android::base::unique_fd ctrl_fd_;
+    Delegate* delegate_;
+
+    // Per-request state.
+    BufferSink buffer_;
+    bool header_response_ = false;
+};
+
+class DmUserBlockServerOpener : public IBlockServerOpener {
+  public:
+    DmUserBlockServerOpener(const std::string& misc_name, const std::string& dm_user_path);
+
+    std::unique_ptr<IBlockServer> Open(IBlockServer::Delegate* delegate,
+                                       size_t buffer_size) override;
+
+  private:
+    std::string misc_name_;
+    std::string dm_user_path_;
+};
+
+}  // namespace snapshot
+}  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_logging.h b/fs_mgr/libsnapshot/snapuserd/snapuserd_logging.h
new file mode 100644
index 0000000..bc470ce
--- /dev/null
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_logging.h
@@ -0,0 +1,20 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <android-base/logging.h>
+
+#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
+#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
index 4105b4b..041e516 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
@@ -18,9 +18,9 @@
 
 #include <android-base/logging.h>
 
+#include "merge_worker.h"
 #include "read_worker.h"
 #include "snapuserd_core.h"
-#include "snapuserd_merge.h"
 
 namespace android {
 namespace snapshot {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
similarity index 99%
rename from fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
rename to fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
index 517148d..2305a1c 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "snapuserd_merge.h"
+#include "merge_worker.h"
 
 #include "snapuserd_core.h"
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.h
similarity index 100%
rename from fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.h
rename to fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.h
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
index 7d2e3a6..ce4be43 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
@@ -26,18 +26,18 @@
 using android::base::unique_fd;
 
 void ReadWorker::CloseFds() {
-    ctrl_fd_ = {};
+    block_server_ = {};
     backing_store_fd_ = {};
     Worker::CloseFds();
 }
 
 ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing_device,
-                       const std::string& control_device, const std::string& misc_name,
-                       const std::string& base_path_merge,
-                       std::shared_ptr<SnapshotHandler> snapuserd)
+                       const std::string& misc_name, const std::string& base_path_merge,
+                       std::shared_ptr<SnapshotHandler> snapuserd,
+                       std::shared_ptr<IBlockServerOpener> opener)
     : Worker(cow_device, misc_name, base_path_merge, snapuserd),
       backing_store_device_(backing_device),
-      control_device_(control_device) {}
+      block_server_opener_(opener) {}
 
 // Start the replace operation. This will read the
 // internal COW format and if the block is compressed,
@@ -197,9 +197,9 @@
         return false;
     }
 
-    ctrl_fd_.reset(open(control_device_.c_str(), O_RDWR));
-    if (ctrl_fd_ < 0) {
-        SNAP_PLOG(ERROR) << "Unable to open " << control_device_;
+    block_server_ = block_server_opener_->Open(this, PAYLOAD_BUFFER_SZ);
+    if (!block_server_) {
+        SNAP_PLOG(ERROR) << "Unable to open block server";
         return false;
     }
     return true;
@@ -214,7 +214,7 @@
 
     // Start serving IO
     while (true) {
-        if (!ProcessIORequest()) {
+        if (!block_server_->ProcessRequests()) {
             break;
         }
     }
@@ -225,29 +225,6 @@
     return true;
 }
 
-// Send the payload/data back to dm-user misc device.
-bool ReadWorker::WriteDmUserPayload(size_t size) {
-    size_t payload_size = size;
-    void* buf = bufsink_.GetPayloadBufPtr();
-    if (header_response_) {
-        payload_size += sizeof(struct dm_user_header);
-        buf = bufsink_.GetBufPtr();
-    }
-
-    if (!android::base::WriteFully(ctrl_fd_, buf, payload_size)) {
-        SNAP_PLOG(ERROR) << "Write to dm-user failed size: " << payload_size;
-        return false;
-    }
-
-    // After the first header is sent in response to a request, we cannot
-    // send any additional headers.
-    header_response_ = false;
-
-    // Reset the buffer for use by the next request.
-    bufsink_.ResetBufferOffset();
-    return true;
-}
-
 bool ReadWorker::ReadDataFromBaseDevice(sector_t sector, void* buffer, size_t read_size) {
     CHECK(read_size <= BLOCK_SZ);
 
@@ -281,7 +258,7 @@
                                        std::make_pair(sector, nullptr), SnapshotHandler::compare);
             bool not_found = (it == chunk_vec.end() || it->first != sector);
 
-            void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ, size);
+            void* buffer = block_server_->GetResponseBuffer(BLOCK_SZ, size);
             if (!buffer) {
                 SNAP_LOG(ERROR) << "AcquireBuffer failed in ReadAlignedSector";
                 return false;
@@ -334,7 +311,8 @@
     int num_sectors_skip = sector - it->first;
     size_t skip_size = num_sectors_skip << SECTOR_SHIFT;
     size_t write_size = std::min(size, BLOCK_SZ - skip_size);
-    auto buffer = reinterpret_cast<uint8_t*>(bufsink_.AcquireBuffer(BLOCK_SZ, write_size));
+    auto buffer =
+            reinterpret_cast<uint8_t*>(block_server_->GetResponseBuffer(BLOCK_SZ, write_size));
     if (!buffer) {
         SNAP_LOG(ERROR) << "ProcessCowOp failed to allocate buffer";
         return -1;
@@ -462,7 +440,7 @@
         CHECK(diff_size <= BLOCK_SZ);
 
         size_t read_size = std::min(remaining_size, diff_size);
-        void* buffer = bufsink_.AcquireBuffer(BLOCK_SZ, read_size);
+        void* buffer = block_server_->GetResponseBuffer(BLOCK_SZ, read_size);
         if (!buffer) {
             SNAP_LOG(ERROR) << "AcquireBuffer failed in ReadUnalignedSector";
             return false;
@@ -488,88 +466,17 @@
     return true;
 }
 
-void ReadWorker::RespondIOError() {
-    struct dm_user_header* header = bufsink_.GetHeaderPtr();
-    header->type = DM_USER_RESP_ERROR;
-    // This is an issue with the dm-user interface. There
-    // is no way to propagate the I/O error back to dm-user
-    // if we have already communicated the header back. Header
-    // is responded once at the beginning; however I/O can
-    // be processed in chunks. If we encounter an I/O error
-    // somewhere in the middle of the processing, we can't communicate
-    // this back to dm-user.
-    //
-    // TODO: Fix the interface
-    CHECK(header_response_);
-
-    WriteDmUserPayload(0);
-}
-
-bool ReadWorker::DmuserReadRequest() {
-    struct dm_user_header* header = bufsink_.GetHeaderPtr();
-
+bool ReadWorker::RequestSectors(uint64_t sector, uint64_t len) {
     // Unaligned I/O request
-    if (!IsBlockAligned(header->sector << SECTOR_SHIFT)) {
-        return ReadUnalignedSector(header->sector, header->len);
+    if (!IsBlockAligned(sector << SECTOR_SHIFT)) {
+        return ReadUnalignedSector(sector, len);
     }
 
-    return ReadAlignedSector(header->sector, header->len);
+    return ReadAlignedSector(sector, len);
 }
 
 bool ReadWorker::SendBufferedIo() {
-    return WriteDmUserPayload(bufsink_.GetPayloadBytesWritten());
-}
-
-bool ReadWorker::ProcessIORequest() {
-    // Read Header from dm-user misc device. This gives
-    // us the sector number for which IO is issued by dm-snapshot device
-    struct dm_user_header* header = bufsink_.GetHeaderPtr();
-    if (!android::base::ReadFully(ctrl_fd_, header, sizeof(*header))) {
-        if (errno != ENOTBLK) {
-            SNAP_PLOG(ERROR) << "Control-read failed";
-        }
-
-        SNAP_PLOG(DEBUG) << "ReadDmUserHeader failed....";
-        return false;
-    }
-
-    SNAP_LOG(DEBUG) << "Daemon: msg->seq: " << std::dec << header->seq;
-    SNAP_LOG(DEBUG) << "Daemon: msg->len: " << std::dec << header->len;
-    SNAP_LOG(DEBUG) << "Daemon: msg->sector: " << std::dec << header->sector;
-    SNAP_LOG(DEBUG) << "Daemon: msg->type: " << std::dec << header->type;
-    SNAP_LOG(DEBUG) << "Daemon: msg->flags: " << std::dec << header->flags;
-
-    // Use the same header buffer as the response header.
-    int request_type = header->type;
-    header->type = DM_USER_RESP_SUCCESS;
-    header_response_ = true;
-
-    // Reset the output buffer.
-    bufsink_.ResetBufferOffset();
-
-    bool ok;
-    switch (request_type) {
-        case DM_USER_REQ_MAP_READ:
-            ok = DmuserReadRequest();
-            break;
-
-        case DM_USER_REQ_MAP_WRITE:
-            // TODO: We should not get any write request
-            // to dm-user as we mount all partitions
-            // as read-only. Need to verify how are TRIM commands
-            // handled during mount.
-            ok = false;
-            break;
-
-        default:
-            ok = false;
-            break;
-    }
-
-    if (!ok && header->type != DM_USER_RESP_ERROR) {
-        RespondIOError();
-    }
-    return ok;
+    return block_server_->SendBufferedIo();
 }
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
index 8d6f663..a6a3eb8 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
@@ -17,28 +17,26 @@
 #include <utility>
 #include <vector>
 
+#include <snapuserd/block_server.h>
 #include "worker.h"
 
 namespace android {
 namespace snapshot {
 
-class ReadWorker : public Worker {
+class ReadWorker : public Worker, public IBlockServer::Delegate {
   public:
     ReadWorker(const std::string& cow_device, const std::string& backing_device,
-               const std::string& control_device, const std::string& misc_name,
-               const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
+               const std::string& misc_name, const std::string& base_path_merge,
+               std::shared_ptr<SnapshotHandler> snapuserd,
+               std::shared_ptr<IBlockServerOpener> opener);
 
     bool Run();
     bool Init() override;
     void CloseFds() override;
 
   private:
-    // Functions interacting with dm-user
-    bool ProcessIORequest();
-    bool WriteDmUserPayload(size_t size);
-    bool DmuserReadRequest();
+    bool RequestSectors(uint64_t sector, uint64_t size) override;
     bool SendBufferedIo();
-    void RespondIOError();
 
     bool ProcessCowOp(const CowOperation* cow_op, void* buffer);
     bool ProcessXorOp(const CowOperation* cow_op, void* buffer);
@@ -60,10 +58,9 @@
     std::string backing_store_device_;
     unique_fd backing_store_fd_;
 
-    std::string control_device_;
-    unique_fd ctrl_fd_;
+    std::shared_ptr<IBlockServerOpener> block_server_opener_;
+    std::unique_ptr<IBlockServer> block_server_;
 
-    bool header_response_ = false;
     std::basic_string<uint8_t> xor_buffer_;
 };
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index 12d095a..6d3d5c7 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -22,9 +22,10 @@
 #include <android-base/properties.h>
 #include <android-base/scopeguard.h>
 #include <android-base/strings.h>
+#include <snapuserd/dm_user_block_server.h>
 
+#include "merge_worker.h"
 #include "read_worker.h"
-#include "snapuserd_merge.h"
 
 namespace android {
 namespace snapshot {
@@ -48,9 +49,10 @@
 }
 
 bool SnapshotHandler::InitializeWorkers() {
+    auto opener = std::make_shared<DmUserBlockServerOpener>(misc_name_, control_device_);
     for (int i = 0; i < num_worker_threads_; i++) {
-        auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, control_device_,
-                                               misc_name_, base_path_merge_, GetSharedPtr());
+        auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, misc_name_,
+                                               base_path_merge_, GetSharedPtr(), opener);
         if (!wt->Init()) {
             SNAP_LOG(ERROR) << "Thread initialization failed";
             return false;