diff --git a/debuggerd/libdebuggerd/gwp_asan.cpp b/debuggerd/libdebuggerd/gwp_asan.cpp
index 3ee309f..ed2b974 100644
--- a/debuggerd/libdebuggerd/gwp_asan.cpp
+++ b/debuggerd/libdebuggerd/gwp_asan.cpp
@@ -147,7 +147,7 @@
   for (size_t i = 0; i != num_frames; ++i) {
     unwindstack::FrameData frame_data = unwinder->BuildFrameFromPcOnly(frames[i]);
     BacktraceFrame* f = heap_object->add_allocation_backtrace();
-    fill_in_backtrace_frame(f, frame_data, unwinder->GetMaps());
+    fill_in_backtrace_frame(f, frame_data);
   }
 
   heap_object->set_deallocation_tid(__gwp_asan_get_deallocation_thread_id(responsible_allocation_));
@@ -156,7 +156,7 @@
   for (size_t i = 0; i != num_frames; ++i) {
     unwindstack::FrameData frame_data = unwinder->BuildFrameFromPcOnly(frames[i]);
     BacktraceFrame* f = heap_object->add_deallocation_backtrace();
-    fill_in_backtrace_frame(f, frame_data, unwinder->GetMaps());
+    fill_in_backtrace_frame(f, frame_data);
   }
 
   set_human_readable_cause(cause, crash_address_);
diff --git a/debuggerd/libdebuggerd/include/libdebuggerd/tombstone.h b/debuggerd/libdebuggerd/include/libdebuggerd/tombstone.h
index 2331f1e..7bf1688 100644
--- a/debuggerd/libdebuggerd/include/libdebuggerd/tombstone.h
+++ b/debuggerd/libdebuggerd/include/libdebuggerd/tombstone.h
@@ -37,7 +37,6 @@
 
 namespace unwindstack {
 struct FrameData;
-class Maps;
 class Unwinder;
 }
 
@@ -68,8 +67,7 @@
     const Tombstone& tombstone,
     std::function<void(const std::string& line, bool should_log)> callback);
 
-void fill_in_backtrace_frame(BacktraceFrame* f, const unwindstack::FrameData& frame,
-                             unwindstack::Maps* maps);
+void fill_in_backtrace_frame(BacktraceFrame* f, const unwindstack::FrameData& frame);
 void set_human_readable_cause(Cause* cause, uint64_t fault_addr);
 
 #endif  // _DEBUGGERD_TOMBSTONE_H
diff --git a/debuggerd/libdebuggerd/scudo.cpp b/debuggerd/libdebuggerd/scudo.cpp
index a89f385..a2933f2 100644
--- a/debuggerd/libdebuggerd/scudo.cpp
+++ b/debuggerd/libdebuggerd/scudo.cpp
@@ -108,7 +108,7 @@
   for (size_t i = 0; i < arraysize(report->allocation_trace) && report->allocation_trace[i]; ++i) {
     unwindstack::FrameData frame_data = unwinder->BuildFrameFromPcOnly(report->allocation_trace[i]);
     BacktraceFrame* f = heap_object->add_allocation_backtrace();
-    fill_in_backtrace_frame(f, frame_data, unwinder->GetMaps());
+    fill_in_backtrace_frame(f, frame_data);
   }
 
   heap_object->set_deallocation_tid(report->deallocation_tid);
@@ -117,7 +117,7 @@
     unwindstack::FrameData frame_data =
         unwinder->BuildFrameFromPcOnly(report->deallocation_trace[i]);
     BacktraceFrame* f = heap_object->add_deallocation_backtrace();
-    fill_in_backtrace_frame(f, frame_data, unwinder->GetMaps());
+    fill_in_backtrace_frame(f, frame_data);
   }
 
   set_human_readable_cause(cause, untagged_fault_addr_);
diff --git a/debuggerd/libdebuggerd/tombstone_proto.cpp b/debuggerd/libdebuggerd/tombstone_proto.cpp
index b1c4ef3..714d6b2 100644
--- a/debuggerd/libdebuggerd/tombstone_proto.cpp
+++ b/debuggerd/libdebuggerd/tombstone_proto.cpp
@@ -312,8 +312,7 @@
   }
 }
 
-void fill_in_backtrace_frame(BacktraceFrame* f, const unwindstack::FrameData& frame,
-                             unwindstack::Maps* maps) {
+void fill_in_backtrace_frame(BacktraceFrame* f, const unwindstack::FrameData& frame) {
   f->set_rel_pc(frame.rel_pc);
   f->set_pc(frame.pc);
   f->set_sp(frame.sp);
@@ -331,21 +330,20 @@
 
   f->set_function_offset(frame.function_offset);
 
-  if (frame.map_start == frame.map_end) {
+  if (frame.map_info == nullptr) {
     // No valid map associated with this frame.
     f->set_file_name("<unknown>");
-  } else if (!frame.map_name.empty()) {
-    f->set_file_name(frame.map_name);
+    return;
+  }
+
+  if (!frame.map_info->name().empty()) {
+    f->set_file_name(frame.map_info->GetFullName());
   } else {
-    f->set_file_name(StringPrintf("<anonymous:%" PRIx64 ">", frame.map_start));
+    f->set_file_name(StringPrintf("<anonymous:%" PRIx64 ">", frame.map_info->start()));
   }
+  f->set_file_map_offset(frame.map_info->elf_start_offset());
 
-  f->set_file_map_offset(frame.map_elf_start_offset);
-
-  auto map_info = maps->Find(frame.map_start);
-  if (map_info.get() != nullptr) {
-    f->set_build_id(map_info->GetPrintableBuildID());
-  }
+  f->set_build_id(frame.map_info->GetPrintableBuildID());
 }
 
 static void dump_thread(Tombstone* tombstone, unwindstack::Unwinder* unwinder,
@@ -434,7 +432,7 @@
     unwinder->SetDisplayBuildID(true);
     for (const auto& frame : unwinder->frames()) {
       BacktraceFrame* f = thread.add_current_backtrace();
-      fill_in_backtrace_frame(f, frame, maps);
+      fill_in_backtrace_frame(f, frame);
     }
   }
 
diff --git a/fs_mgr/libsnapshot/snapshot.cpp b/fs_mgr/libsnapshot/snapshot.cpp
index 18a9d22..f3de2b4 100644
--- a/fs_mgr/libsnapshot/snapshot.cpp
+++ b/fs_mgr/libsnapshot/snapshot.cpp
@@ -1467,6 +1467,14 @@
     }
 
     RemoveAllUpdateState(lock);
+
+    if (UpdateUsesUserSnapshots(lock) && !device()->IsTestDevice()) {
+        if (snapuserd_client_) {
+            snapuserd_client_->DetachSnapuserd();
+            snapuserd_client_->CloseConnection();
+            snapuserd_client_ = nullptr;
+        }
+    }
 }
 
 void SnapshotManager::AcknowledgeMergeFailure(MergeFailureCode failure_code) {
@@ -3200,7 +3208,7 @@
 
             // Terminate stale daemon if any
             std::unique_ptr<SnapuserdClient> snapuserd_client =
-                    SnapuserdClient::Connect(kSnapuserdSocket, 10s);
+                    SnapuserdClient::Connect(kSnapuserdSocket, 5s);
             if (snapuserd_client) {
                 snapuserd_client->DetachSnapuserd();
                 snapuserd_client->CloseConnection();
diff --git a/fs_mgr/libsnapshot/snapshot_test.cpp b/fs_mgr/libsnapshot/snapshot_test.cpp
index 11cebe1..d76558b 100644
--- a/fs_mgr/libsnapshot/snapshot_test.cpp
+++ b/fs_mgr/libsnapshot/snapshot_test.cpp
@@ -54,6 +54,8 @@
 #include <libsnapshot/mock_snapshot.h>
 
 DEFINE_string(force_config, "", "Force testing mode (dmsnap, vab, vabc) ignoring device config.");
+DEFINE_string(force_iouring_disable, "",
+              "Force testing mode (iouring_disabled) - disable io_uring");
 
 namespace android {
 namespace snapshot {
@@ -2769,10 +2771,22 @@
         }
     }
 
+    if (FLAGS_force_iouring_disable == "iouring_disabled") {
+        if (!android::base::SetProperty("snapuserd.test.io_uring.force_disable", "1")) {
+            return testing::AssertionFailure()
+                   << "Failed to disable property: snapuserd.test.io_uring.disabled";
+        }
+    }
+
     int ret = RUN_ALL_TESTS();
 
     if (FLAGS_force_config == "dmsnap") {
         android::base::SetProperty("snapuserd.test.dm.snapshots", "0");
     }
+
+    if (FLAGS_force_iouring_disable == "iouring_disabled") {
+        android::base::SetProperty("snapuserd.test.io_uring.force_disable", "0");
+    }
+
     return ret;
 }
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index 84bcb94..bc2bceb 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -86,7 +86,9 @@
         "libsnapshot_cow",
         "libz",
         "libext4_utils",
+        "liburing",
     ],
+    include_dirs: ["bionic/libc/kernel"],
 }
 
 cc_binary {
@@ -182,7 +184,10 @@
         "libfs_mgr",
         "libdm",
         "libext4_utils",
+        "liburing",
+        "libgflags",
     ],
+    include_dirs: ["bionic/libc/kernel"],
     header_libs: [
         "libstorage_literals_headers",
         "libfiemap_headers",
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index 95d95cd..5109d82 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -16,6 +16,10 @@
 
 #include "snapuserd_core.h"
 
+#include <sys/utsname.h>
+
+#include <android-base/properties.h>
+#include <android-base/scopeguard.h>
 #include <android-base/strings.h>
 
 namespace android {
@@ -288,6 +292,136 @@
     return ReadMetadata();
 }
 
+void SnapshotHandler::FinalizeIouring() {
+    io_uring_queue_exit(ring_.get());
+}
+
+bool SnapshotHandler::InitializeIouring(int io_depth) {
+    ring_ = std::make_unique<struct io_uring>();
+
+    int ret = io_uring_queue_init(io_depth, ring_.get(), 0);
+    if (ret) {
+        LOG(ERROR) << "io_uring_queue_init failed with ret: " << ret;
+        return false;
+    }
+
+    LOG(INFO) << "io_uring_queue_init success with io_depth: " << io_depth;
+    return true;
+}
+
+bool SnapshotHandler::ReadBlocksAsync(const std::string& dm_block_device,
+                                      const std::string& partition_name, size_t size) {
+    // 64k block size with io_depth of 64 is optimal
+    // for a single thread. We just need a single thread
+    // to read all the blocks from all dynamic partitions.
+    size_t io_depth = 64;
+    size_t bs = (64 * 1024);
+
+    if (!InitializeIouring(io_depth)) {
+        return false;
+    }
+
+    LOG(INFO) << "ReadBlockAsync start "
+              << " Block-device: " << dm_block_device << " Partition-name: " << partition_name
+              << " Size: " << size;
+
+    auto scope_guard = android::base::make_scope_guard([this]() -> void { FinalizeIouring(); });
+
+    std::vector<std::unique_ptr<struct iovec>> vecs;
+    using AlignedBuf = std::unique_ptr<void, decltype(free)*>;
+    std::vector<AlignedBuf> alignedBufVector;
+
+    /*
+     * TODO: We need aligned memory for DIRECT-IO. However, if we do
+     * a DIRECT-IO and verify the blocks then we need to inform
+     * update-verifier that block verification has been done and
+     * there is no need to repeat the same. We are not there yet
+     * as we need to see if there are any boot time improvements doing
+     * a DIRECT-IO.
+     *
+     * Also, we could you the same function post merge for block verification;
+     * again, we can do a DIRECT-IO instead of thrashing page-cache and
+     * hurting other applications.
+     *
+     * For now, we will just create aligned buffers but rely on buffered
+     * I/O until we have perf numbers to justify DIRECT-IO.
+     */
+    for (int i = 0; i < io_depth; i++) {
+        auto iovec = std::make_unique<struct iovec>();
+        vecs.push_back(std::move(iovec));
+
+        struct iovec* iovec_ptr = vecs[i].get();
+
+        if (posix_memalign(&iovec_ptr->iov_base, BLOCK_SZ, bs)) {
+            LOG(ERROR) << "posix_memalign failed";
+            return false;
+        }
+
+        iovec_ptr->iov_len = bs;
+        alignedBufVector.push_back(
+                std::unique_ptr<void, decltype(free)*>(iovec_ptr->iov_base, free));
+    }
+
+    android::base::unique_fd fd(TEMP_FAILURE_RETRY(open(dm_block_device.c_str(), O_RDONLY)));
+    if (fd.get() == -1) {
+        SNAP_PLOG(ERROR) << "File open failed - block-device " << dm_block_device
+                         << " partition-name: " << partition_name;
+        return false;
+    }
+
+    loff_t offset = 0;
+    size_t remain = size;
+    size_t read_sz = io_depth * bs;
+
+    while (remain > 0) {
+        size_t to_read = std::min(remain, read_sz);
+        size_t queue_size = to_read / bs;
+
+        for (int i = 0; i < queue_size; i++) {
+            struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
+            if (!sqe) {
+                SNAP_LOG(ERROR) << "io_uring_get_sqe() failed";
+                return false;
+            }
+
+            struct iovec* iovec_ptr = vecs[i].get();
+
+            io_uring_prep_read(sqe, fd.get(), iovec_ptr->iov_base, iovec_ptr->iov_len, offset);
+            sqe->flags |= IOSQE_ASYNC;
+            offset += bs;
+        }
+
+        int ret = io_uring_submit(ring_.get());
+        if (ret != queue_size) {
+            SNAP_LOG(ERROR) << "submit got: " << ret << " wanted: " << queue_size;
+            return false;
+        }
+
+        for (int i = 0; i < queue_size; i++) {
+            struct io_uring_cqe* cqe;
+
+            int ret = io_uring_wait_cqe(ring_.get(), &cqe);
+            if (ret) {
+                SNAP_PLOG(ERROR) << "wait_cqe failed" << ret;
+                return false;
+            }
+
+            if (cqe->res < 0) {
+                SNAP_LOG(ERROR) << "io failed with res: " << cqe->res;
+                return false;
+            }
+            io_uring_cqe_seen(ring_.get(), cqe);
+        }
+
+        remain -= to_read;
+    }
+
+    LOG(INFO) << "ReadBlockAsync complete: "
+              << " Block-device: " << dm_block_device << " Partition-name: " << partition_name
+              << " Size: " << size;
+    return true;
+}
+
 void SnapshotHandler::ReadBlocksToCache(const std::string& dm_block_device,
                                         const std::string& partition_name, off_t offset,
                                         size_t size) {
@@ -344,17 +478,22 @@
         return;
     }
 
-    int num_threads = 2;
-    size_t num_blocks = dev_sz >> BLOCK_SHIFT;
-    size_t num_blocks_per_thread = num_blocks / num_threads;
-    size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
-    off_t offset = 0;
+    if (IsIouringSupported()) {
+        std::async(std::launch::async, &SnapshotHandler::ReadBlocksAsync, this, dm_block_device,
+                   partition_name, dev_sz);
+    } else {
+        int num_threads = 2;
+        size_t num_blocks = dev_sz >> BLOCK_SHIFT;
+        size_t num_blocks_per_thread = num_blocks / num_threads;
+        size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
+        off_t offset = 0;
 
-    for (int i = 0; i < num_threads; i++) {
-        std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this, dm_block_device,
-                   partition_name, offset, read_sz_per_thread);
+        for (int i = 0; i < num_threads; i++) {
+            std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this,
+                       dm_block_device, partition_name, offset, read_sz_per_thread);
 
-        offset += read_sz_per_thread;
+            offset += read_sz_per_thread;
+        }
     }
 }
 
@@ -513,5 +652,33 @@
     return ra_state;
 }
 
+bool SnapshotHandler::IsIouringSupported() {
+    struct utsname uts;
+    unsigned int major, minor;
+
+    if (android::base::GetBoolProperty("snapuserd.test.io_uring.force_disable", false)) {
+        SNAP_LOG(INFO) << "io_uring disabled for testing";
+        return false;
+    }
+
+    if ((uname(&uts) != 0) || (sscanf(uts.release, "%u.%u", &major, &minor) != 2)) {
+        SNAP_LOG(ERROR) << "Could not parse the kernel version from uname. "
+                        << " io_uring not supported";
+        return false;
+    }
+
+    // We will only support kernels from 5.6 onwards as IOSQE_ASYNC flag and
+    // IO_URING_OP_READ/WRITE opcodes were introduced only on 5.6 kernel
+    if (major >= 5) {
+        if (major == 5 && minor < 6) {
+            return false;
+        }
+    } else {
+        return false;
+    }
+
+    return android::base::GetBoolProperty("ro.virtual_ab.io_uring.enabled", false);
+}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index 1953316..b0f2d65 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -39,6 +39,7 @@
 #include <libdm/dm.h>
 #include <libsnapshot/cow_reader.h>
 #include <libsnapshot/cow_writer.h>
+#include <liburing.h>
 #include <snapuserd/snapuserd_buffer.h>
 #include <snapuserd/snapuserd_kernel.h>
 
@@ -113,6 +114,19 @@
     bool ReconstructDataFromCow();
     void CheckOverlap(const CowOperation* cow_op);
 
+    bool ReadAheadAsyncIO();
+    bool ReapIoCompletions(int pending_ios_to_complete);
+    bool ReadXorData(size_t block_index, size_t xor_op_index,
+                     std::vector<const CowOperation*>& xor_op_vec);
+    void ProcessXorData(size_t& block_xor_index, size_t& xor_index,
+                        std::vector<const CowOperation*>& xor_op_vec, void* buffer,
+                        loff_t& buffer_offset);
+    void UpdateScratchMetadata();
+
+    bool ReadAheadSyncIO();
+    bool InitializeIouring();
+    void FinalizeIouring();
+
     void* read_ahead_buffer_;
     void* metadata_buffer_;
 
@@ -131,7 +145,19 @@
     std::unordered_set<uint64_t> dest_blocks_;
     std::unordered_set<uint64_t> source_blocks_;
     bool overlap_;
+    std::vector<uint64_t> blocks_;
+    int total_blocks_merged_ = 0;
+    std::unique_ptr<uint8_t[]> ra_temp_buffer_;
+    std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_;
     BufferSink bufsink_;
+
+    bool read_ahead_async_ = false;
+    // Queue depth of 32 seems optimal. We don't want
+    // to have a huge depth as it may put more memory pressure
+    // on the kernel worker threads given that we use
+    // IOSQE_ASYNC flag.
+    int queue_depth_ = 32;
+    std::unique_ptr<struct io_uring> ring_;
 };
 
 class Worker {
@@ -185,6 +211,7 @@
     // Merge related ops
     bool Merge();
     bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
+    bool MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter);
     bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
     int PrepareMerge(uint64_t* source_offset, int* pending_ops,
                      const std::unique_ptr<ICowOpIter>& cowop_iter,
@@ -193,6 +220,9 @@
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
     chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
 
+    bool InitializeIouring();
+    void FinalizeIouring();
+
     std::unique_ptr<CowReader> reader_;
     BufferSink bufsink_;
     XorSink xorsink_;
@@ -208,6 +238,14 @@
     unique_fd base_path_merge_fd_;
     unique_fd ctrl_fd_;
 
+    bool merge_async_ = false;
+    // Queue depth of 32 seems optimal. We don't want
+    // to have a huge depth as it may put more memory pressure
+    // on the kernel worker threads given that we use
+    // IOSQE_ASYNC flag.
+    int queue_depth_ = 32;
+    std::unique_ptr<struct io_uring> ring_;
+
     std::shared_ptr<SnapshotHandler> snapuserd_;
 };
 
@@ -292,6 +330,8 @@
     bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
     MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);
 
+    bool IsIouringSupported();
+
   private:
     bool ReadMetadata();
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
@@ -304,6 +344,11 @@
     void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name,
                            off_t offset, size_t size);
 
+    bool InitializeIouring(int io_depth);
+    void FinalizeIouring();
+    bool ReadBlocksAsync(const std::string& dm_block_device, const std::string& partition_name,
+                         size_t size);
+
     // COW device
     std::string cow_device_;
     // Source device
@@ -352,6 +397,8 @@
     bool attached_ = false;
     bool is_socket_present_;
     bool scratch_space_ = false;
+
+    std::unique_ptr<struct io_uring> ring_;
 };
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
index fa055b7..d4d4efe 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -72,16 +72,16 @@
 }
 
 bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
-    // Flush every 2048 ops. Since all ops are independent and there is no
+    // Flush every 8192 ops. Since all ops are independent and there is no
     // dependency between COW ops, we will flush the data and the number
-    // of ops merged in COW file for every 2048 ops. If there is a crash,
+    // of ops merged in COW file for every 8192 ops. If there is a crash,
     // we will end up replaying some of the COW ops which were already merged.
     // That is ok.
     //
-    // Why 2048 ops ? We can probably increase this to bigger value but just
-    // need to ensure that merge makes forward progress if there are
-    // crashes repeatedly which is highly unlikely.
-    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 8;
+    // Why 8192 ops ? Increasing this may improve merge time 3-4 seconds but
+    // we need to make sure that we checkpoint; 8k ops seems optimal. In-case
+    // if there is a crash merge should always make forward progress.
+    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32;
     int num_ops_merged = 0;
 
     while (!cowop_iter->Done()) {
@@ -128,7 +128,7 @@
 
         num_ops_merged += linear_blocks;
 
-        if (num_ops_merged == total_ops_merged_per_commit) {
+        if (num_ops_merged >= total_ops_merged_per_commit) {
             // Flush the data
             if (fsync(base_path_merge_fd_.get()) < 0) {
                 SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
@@ -172,6 +172,173 @@
     return true;
 }
 
+bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+    void* mapped_addr = snapuserd_->GetMappedAddr();
+    void* read_ahead_buffer =
+            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
+    size_t block_index = 0;
+
+    SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";
+
+    while (!cowop_iter->Done()) {
+        const CowOperation* cow_op = &cowop_iter->Get();
+        if (!IsOrderedOp(*cow_op)) {
+            break;
+        }
+
+        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
+        // Wait for RA thread to notify that the merge window
+        // is ready for merging.
+        if (!snapuserd_->WaitForMergeBegin()) {
+            snapuserd_->SetMergeFailed(block_index);
+            return false;
+        }
+
+        snapuserd_->SetMergeInProgress(block_index);
+
+        loff_t offset = 0;
+        int num_ops = snapuserd_->GetTotalBlocksToMerge();
+
+        int pending_sqe = queue_depth_;
+        int pending_ios_to_submit = 0;
+        bool flush_required = false;
+
+        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
+        while (num_ops) {
+            uint64_t source_offset;
+
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+
+            if (linear_blocks != 0) {
+                size_t io_size = (linear_blocks * BLOCK_SZ);
+
+                // Get an SQE entry from the ring and populate the I/O variables
+                struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
+                if (!sqe) {
+                    SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
+                    snapuserd_->SetMergeFailed(block_index);
+                    return false;
+                }
+
+                io_uring_prep_write(sqe, base_path_merge_fd_.get(),
+                                    (char*)read_ahead_buffer + offset, io_size, source_offset);
+
+                offset += io_size;
+                num_ops -= linear_blocks;
+
+                pending_sqe -= 1;
+                pending_ios_to_submit += 1;
+                sqe->flags |= IOSQE_ASYNC;
+            }
+
+            // Ring is full or no more COW ops to be merged in this batch
+            if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
+                // If this is a last set of COW ops to be merged in this batch, we need
+                // to sync the merged data. We will try to grab an SQE entry
+                // and set the FSYNC command; additionally, make sure that
+                // the fsync is done after all the I/O operations queued
+                // in the ring is completed by setting IOSQE_IO_DRAIN.
+                //
+                // If there is no space in the ring, we will flush it later
+                // by explicitly calling fsync() system call.
+                if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
+                    if (pending_sqe != 0) {
+                        struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
+                        if (!sqe) {
+                            // very unlikely but let's continue and not fail the
+                            // merge - we will flush it later
+                            SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
+                            flush_required = true;
+                        } else {
+                            io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0);
+                            // Drain the queue before fsync
+                            io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);
+                            pending_sqe -= 1;
+                            flush_required = false;
+                            pending_ios_to_submit += 1;
+                            sqe->flags |= IOSQE_ASYNC;
+                        }
+                    } else {
+                        flush_required = true;
+                    }
+                }
+
+                // Submit the IO for all the COW ops in a single syscall
+                int ret = io_uring_submit(ring_.get());
+                if (ret != pending_ios_to_submit) {
+                    SNAP_PLOG(ERROR)
+                            << "io_uring_submit failed for read-ahead: "
+                            << " io submit: " << ret << " expected: " << pending_ios_to_submit;
+                    snapuserd_->SetMergeFailed(block_index);
+                    return false;
+                }
+
+                int pending_ios_to_complete = pending_ios_to_submit;
+                pending_ios_to_submit = 0;
+
+                // Reap I/O completions
+                while (pending_ios_to_complete) {
+                    struct io_uring_cqe* cqe;
+
+                    ret = io_uring_wait_cqe(ring_.get(), &cqe);
+                    if (ret) {
+                        SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
+                        snapuserd_->SetMergeFailed(block_index);
+                        return false;
+                    }
+
+                    if (cqe->res < 0) {
+                        SNAP_LOG(ERROR)
+                                << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
+                        snapuserd_->SetMergeFailed(block_index);
+                        return false;
+                    }
+
+                    io_uring_cqe_seen(ring_.get(), cqe);
+                    pending_ios_to_complete -= 1;
+                }
+
+                pending_sqe = queue_depth_;
+            }
+
+            if (linear_blocks == 0) {
+                break;
+            }
+        }
+
+        // Verify all ops are merged
+        CHECK(num_ops == 0);
+
+        // Flush the data
+        if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
+            SNAP_LOG(ERROR) << " Failed to fsync merged data";
+            snapuserd_->SetMergeFailed(block_index);
+            return false;
+        }
+
+        // Merge is done and data is on disk. Update the COW Header about
+        // the merge completion
+        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
+            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
+            snapuserd_->SetMergeFailed(block_index);
+            return false;
+        }
+
+        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
+        // Mark the block as merge complete
+        snapuserd_->SetMergeCompleted(block_index);
+
+        // Notify RA thread that the merge thread is ready to merge the next
+        // window
+        snapuserd_->NotifyRAForMergeReady();
+
+        // Get the next block
+        block_index += 1;
+    }
+
+    return true;
+}
+
 bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
@@ -260,15 +427,23 @@
 bool Worker::Merge() {
     std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
 
-    // Start with Copy and Xor ops
-    if (!MergeOrderedOps(cowop_iter)) {
-        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
-        snapuserd_->MergeFailed();
-        return false;
+    if (merge_async_) {
+        if (!MergeOrderedOpsAsync(cowop_iter)) {
+            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
+            snapuserd_->MergeFailed();
+            return false;
+        }
+        SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed.....";
+    } else {
+        // Start with Copy and Xor ops
+        if (!MergeOrderedOps(cowop_iter)) {
+            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
+            snapuserd_->MergeFailed();
+            return false;
+        }
+        SNAP_LOG(INFO) << "MergeOrderedOps completed.....";
     }
 
-    SNAP_LOG(INFO) << "MergeOrderedOps completed...";
-
     // Replace and Zero ops
     if (!MergeReplaceZeroOps(cowop_iter)) {
         SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
@@ -281,6 +456,31 @@
     return true;
 }
 
+bool Worker::InitializeIouring() {
+    if (!snapuserd_->IsIouringSupported()) {
+        return false;
+    }
+
+    ring_ = std::make_unique<struct io_uring>();
+
+    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
+    if (ret) {
+        LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret;
+        return false;
+    }
+
+    merge_async_ = true;
+
+    LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_;
+    return true;
+}
+
+void Worker::FinalizeIouring() {
+    if (merge_async_) {
+        io_uring_queue_exit(ring_.get());
+    }
+}
+
 bool Worker::RunMergeThread() {
     SNAP_LOG(DEBUG) << "Waiting for merge begin...";
     if (!snapuserd_->WaitForMergeBegin()) {
@@ -296,10 +496,13 @@
         return false;
     }
 
+    InitializeIouring();
+
     if (!Merge()) {
         return false;
     }
 
+    FinalizeIouring();
     CloseFds();
     reader_->CloseCowFd();
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index 9e8ccfb..26c5f19 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -183,25 +183,311 @@
     return true;
 }
 
-bool ReadAhead::ReadAheadIOStart() {
-    // Check if the data has to be constructed from the COW file.
-    // This will be true only once during boot up after a crash
-    // during merge.
-    if (snapuserd_->ShouldReconstructDataFromCow()) {
-        return ReconstructDataFromCow();
-    }
+/*
+ * With io_uring, the data flow is slightly different.
+ *
+ * The data flow is as follows:
+ *
+ * 1: Queue the I/O requests to be read from backing source device.
+ * This is done by retrieving the SQE entry from ring and populating
+ * the SQE entry. Note that the I/O is not submitted yet.
+ *
+ * 2: Once the ring is full (aka queue_depth), we will submit all
+ * the queued I/O request with a single system call. This essentially
+ * cuts down "queue_depth" number of system calls to a single system call.
+ *
+ * 3: Once the I/O is submitted, user-space thread will now work
+ * on processing the XOR Operations. This happens in parallel when
+ * I/O requests are submitted to the kernel. This is ok because, for XOR
+ * operations, we first need to retrieve the compressed data form COW block
+ * device. Thus, we have offloaded the backing source I/O to the kernel
+ * and user-space is parallely working on fetching the data for XOR operations.
+ *
+ * 4: After the XOR operations are read from COW device, poll the completion
+ * queue for all the I/O submitted. If the I/O's were already completed,
+ * then user-space thread will just read the CQE requests from the ring
+ * without doing any system call. If none of the I/O were completed yet,
+ * user-space thread will do a system call and wait for I/O completions.
+ *
+ * Flow diagram:
+ *                                                    SQ-RING
+ *  SQE1 <----------- Fetch SQE1 Entry ---------- |SQE1||SQE2|SQE3|
+ *
+ *  SQE1  ------------ Populate SQE1 Entry ------> |SQE1-X||SQE2|SQE3|
+ *
+ *  SQE2 <----------- Fetch SQE2 Entry ---------- |SQE1-X||SQE2|SQE3|
+ *
+ *  SQE2  ------------ Populate SQE2 Entry ------> |SQE1-X||SQE2-X|SQE3|
+ *
+ *  SQE3 <----------- Fetch SQE3 Entry ---------- |SQE1-X||SQE2-X|SQE3|
+ *
+ *  SQE3  ------------ Populate SQE3 Entry ------> |SQE1-X||SQE2-X|SQE3-X|
+ *
+ *  Submit-IO ---------------------------------> |SQE1-X||SQE2-X|SQE3-X|
+ *     |                                                  |
+ *     |                                        Process I/O entries in kernel
+ *     |                                                  |
+ *  Retrieve XOR                                          |
+ *  data from COW                                         |
+ *     |                                                  |
+ *     |                                                  |
+ *  Fetch CQ completions
+ *     |                                              CQ-RING
+ *                                               |CQE1-X||CQE2-X|CQE3-X|
+ *                                                        |
+ *   CQE1 <------------Fetch CQE1 Entry          |CQE1||CQE2-X|CQE3-X|
+ *   CQE2 <------------Fetch CQE2 Entry          |CQE1||CQE2-|CQE3-X|
+ *   CQE3 <------------Fetch CQE3 Entry          |CQE1||CQE2-|CQE3-|
+ *    |
+ *    |
+ *  Continue Next set of operations in the RING
+ */
 
-    std::vector<uint64_t> blocks;
-
+bool ReadAhead::ReadAheadAsyncIO() {
     int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
     loff_t buffer_offset = 0;
-    int total_blocks_merged = 0;
+    total_blocks_merged_ = 0;
     overlap_ = false;
     dest_blocks_.clear();
     source_blocks_.clear();
+    blocks_.clear();
     std::vector<const CowOperation*> xor_op_vec;
 
-    auto ra_temp_buffer = std::make_unique<uint8_t[]>(snapuserd_->GetBufferDataSize());
+    int pending_sqe = queue_depth_;
+    int pending_ios_to_submit = 0;
+
+    size_t xor_op_index = 0;
+    size_t block_index = 0;
+
+    loff_t offset = 0;
+
+    bufsink_.ResetBufferOffset();
+
+    // Number of ops to be merged in this window. This is a fixed size
+    // except for the last window wherein the number of ops can be less
+    // than the size of the RA window.
+    while (num_ops) {
+        uint64_t source_offset;
+        struct io_uring_sqe* sqe;
+
+        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);
+
+        if (linear_blocks != 0) {
+            size_t io_size = (linear_blocks * BLOCK_SZ);
+
+            // Get an SQE entry from the ring and populate the I/O variables
+            sqe = io_uring_get_sqe(ring_.get());
+            if (!sqe) {
+                SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during read-ahead";
+                snapuserd_->ReadAheadIOFailed();
+                return false;
+            }
+
+            io_uring_prep_read(sqe, backing_store_fd_.get(),
+                               (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
+                               source_offset);
+
+            buffer_offset += io_size;
+            num_ops -= linear_blocks;
+            total_blocks_merged_ += linear_blocks;
+
+            pending_sqe -= 1;
+            pending_ios_to_submit += 1;
+            sqe->flags |= IOSQE_ASYNC;
+        }
+
+        // pending_sqe == 0 : Ring is full
+        //
+        // num_ops == 0 : All the COW ops in this batch are processed - Submit
+        // pending I/O requests in the ring
+        //
+        // linear_blocks == 0 : All the COW ops processing is done. Submit
+        // pending I/O requests in the ring
+        if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
+            // Submit the IO for all the COW ops in a single syscall
+            int ret = io_uring_submit(ring_.get());
+            if (ret != pending_ios_to_submit) {
+                SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: "
+                                 << " io submit: " << ret << " expected: " << pending_ios_to_submit;
+                snapuserd_->ReadAheadIOFailed();
+                return false;
+            }
+
+            int pending_ios_to_complete = pending_ios_to_submit;
+            pending_ios_to_submit = 0;
+
+            bool xor_processing_required = (xor_op_vec.size() > 0);
+
+            // Read XOR data from COW file in parallel when I/O's are in-flight
+            if (xor_processing_required && !ReadXorData(block_index, xor_op_index, xor_op_vec)) {
+                SNAP_LOG(ERROR) << "ReadXorData failed";
+                snapuserd_->ReadAheadIOFailed();
+                return false;
+            }
+
+            // Fetch I/O completions
+            if (!ReapIoCompletions(pending_ios_to_complete)) {
+                SNAP_LOG(ERROR) << "ReapIoCompletions failed";
+                snapuserd_->ReadAheadIOFailed();
+                return false;
+            }
+
+            // Retrieve XOR'ed data
+            if (xor_processing_required) {
+                ProcessXorData(block_index, xor_op_index, xor_op_vec, ra_temp_buffer_.get(),
+                               offset);
+            }
+
+            // All the I/O in the ring is processed.
+            pending_sqe = queue_depth_;
+        }
+
+        if (linear_blocks == 0) {
+            break;
+        }
+    }
+
+    // Done with merging ordered ops
+    if (RAIterDone() && total_blocks_merged_ == 0) {
+        return true;
+    }
+
+    CHECK(blocks_.size() == total_blocks_merged_);
+
+    UpdateScratchMetadata();
+
+    return true;
+}
+
+void ReadAhead::UpdateScratchMetadata() {
+    loff_t metadata_offset = 0;
+
+    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
+            (char*)ra_temp_meta_buffer_.get() + metadata_offset);
+
+    bm->new_block = 0;
+    bm->file_offset = 0;
+
+    loff_t file_offset = snapuserd_->GetBufferDataOffset();
+
+    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
+        uint64_t new_block = blocks_[block_index];
+        // Track the metadata blocks which are stored in scratch space
+        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
+                                                       metadata_offset);
+
+        bm->new_block = new_block;
+        bm->file_offset = file_offset;
+
+        metadata_offset += sizeof(struct ScratchMetadata);
+        file_offset += BLOCK_SZ;
+    }
+
+    // This is important - explicitly set the contents to zero. This is used
+    // when re-constructing the data after crash. This indicates end of
+    // reading metadata contents when re-constructing the data
+    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
+                                                   metadata_offset);
+    bm->new_block = 0;
+    bm->file_offset = 0;
+}
+
+bool ReadAhead::ReapIoCompletions(int pending_ios_to_complete) {
+    // Reap I/O completions
+    while (pending_ios_to_complete) {
+        struct io_uring_cqe* cqe;
+
+        int ret = io_uring_wait_cqe(ring_.get(), &cqe);
+        if (ret) {
+            SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
+            return false;
+        }
+
+        if (cqe->res < 0) {
+            SNAP_LOG(ERROR) << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
+            return false;
+        }
+
+        io_uring_cqe_seen(ring_.get(), cqe);
+        pending_ios_to_complete -= 1;
+    }
+
+    return true;
+}
+
+void ReadAhead::ProcessXorData(size_t& block_xor_index, size_t& xor_index,
+                               std::vector<const CowOperation*>& xor_op_vec, void* buffer,
+                               loff_t& buffer_offset) {
+    loff_t xor_buf_offset = 0;
+
+    while (block_xor_index < blocks_.size()) {
+        void* bufptr = static_cast<void*>((char*)buffer + buffer_offset);
+        uint64_t new_block = blocks_[block_xor_index];
+
+        if (xor_index < xor_op_vec.size()) {
+            const CowOperation* xor_op = xor_op_vec[xor_index];
+
+            // Check if this block is an XOR op
+            if (xor_op->new_block == new_block) {
+                // Pointer to the data read from base device
+                uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
+                // Get the xor'ed data read from COW device
+                uint8_t* xor_data = reinterpret_cast<uint8_t*>((char*)bufsink_.GetPayloadBufPtr() +
+                                                               xor_buf_offset);
+
+                for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
+                    buffer[byte_offset] ^= xor_data[byte_offset];
+                }
+
+                // Move to next XOR op
+                xor_index += 1;
+                xor_buf_offset += BLOCK_SZ;
+            }
+        }
+
+        buffer_offset += BLOCK_SZ;
+        block_xor_index += 1;
+    }
+
+    bufsink_.ResetBufferOffset();
+}
+
+bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index,
+                            std::vector<const CowOperation*>& xor_op_vec) {
+    // Process the XOR ops in parallel - We will be reading data
+    // from COW file for XOR ops processing.
+    while (block_index < blocks_.size()) {
+        uint64_t new_block = blocks_[block_index];
+
+        if (xor_op_index < xor_op_vec.size()) {
+            const CowOperation* xor_op = xor_op_vec[xor_op_index];
+            if (xor_op->new_block == new_block) {
+                if (!reader_->ReadData(*xor_op, &bufsink_)) {
+                    SNAP_LOG(ERROR)
+                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
+                    return false;
+                }
+
+                xor_op_index += 1;
+                bufsink_.UpdateBufferOffset(BLOCK_SZ);
+            }
+        }
+        block_index += 1;
+    }
+    return true;
+}
+
+bool ReadAhead::ReadAheadSyncIO() {
+    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
+    loff_t buffer_offset = 0;
+    total_blocks_merged_ = 0;
+    overlap_ = false;
+    dest_blocks_.clear();
+    source_blocks_.clear();
+    blocks_.clear();
+    std::vector<const CowOperation*> xor_op_vec;
+
+    bufsink_.ResetBufferOffset();
 
     // Number of ops to be merged in this window. This is a fixed size
     // except for the last window wherein the number of ops can be less
@@ -209,7 +495,7 @@
     while (num_ops) {
         uint64_t source_offset;
 
-        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks, xor_op_vec);
+        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);
         if (linear_blocks == 0) {
             // No more blocks to read
             SNAP_LOG(DEBUG) << " Read-ahead completed....";
@@ -220,7 +506,7 @@
 
         // Read from the base device consecutive set of blocks in one shot
         if (!android::base::ReadFullyAtOffset(backing_store_fd_,
-                                              (char*)ra_temp_buffer.get() + buffer_offset, io_size,
+                                              (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
                                               source_offset)) {
             SNAP_PLOG(ERROR) << "Ordered-op failed. Read from backing store: "
                              << backing_store_device_ << "at block :" << source_offset / BLOCK_SZ
@@ -233,21 +519,19 @@
         }
 
         buffer_offset += io_size;
-        total_blocks_merged += linear_blocks;
+        total_blocks_merged_ += linear_blocks;
         num_ops -= linear_blocks;
     }
 
     // Done with merging ordered ops
-    if (RAIterDone() && total_blocks_merged == 0) {
+    if (RAIterDone() && total_blocks_merged_ == 0) {
         return true;
     }
 
     loff_t metadata_offset = 0;
 
-    auto ra_temp_meta_buffer = std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
-
     struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
-            (char*)ra_temp_meta_buffer.get() + metadata_offset);
+            (char*)ra_temp_meta_buffer_.get() + metadata_offset);
 
     bm->new_block = 0;
     bm->file_offset = 0;
@@ -255,12 +539,15 @@
     loff_t file_offset = snapuserd_->GetBufferDataOffset();
 
     loff_t offset = 0;
-    CHECK(blocks.size() == total_blocks_merged);
+    CHECK(blocks_.size() == total_blocks_merged_);
 
     size_t xor_index = 0;
-    for (size_t block_index = 0; block_index < blocks.size(); block_index++) {
-        void* bufptr = static_cast<void*>((char*)ra_temp_buffer.get() + offset);
-        uint64_t new_block = blocks[block_index];
+    BufferSink bufsink;
+    bufsink.Initialize(BLOCK_SZ * 2);
+
+    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
+        void* bufptr = static_cast<void*>((char*)ra_temp_buffer_.get() + offset);
+        uint64_t new_block = blocks_[block_index];
 
         if (xor_index < xor_op_vec.size()) {
             const CowOperation* xor_op = xor_op_vec[xor_index];
@@ -268,17 +555,16 @@
             // Check if this block is an XOR op
             if (xor_op->new_block == new_block) {
                 // Read the xor'ed data from COW
-                if (!reader_->ReadData(*xor_op, &bufsink_)) {
+                if (!reader_->ReadData(*xor_op, &bufsink)) {
                     SNAP_LOG(ERROR)
                             << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
                     snapuserd_->ReadAheadIOFailed();
                     return false;
                 }
-
                 // Pointer to the data read from base device
                 uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
                 // Get the xor'ed data read from COW device
-                uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink_.GetPayloadBufPtr());
+                uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink.GetPayloadBufPtr());
 
                 // Retrieve the original data
                 for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
@@ -292,7 +578,7 @@
 
         offset += BLOCK_SZ;
         // Track the metadata blocks which are stored in scratch space
-        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer.get() +
+        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                        metadata_offset);
 
         bm->new_block = new_block;
@@ -308,11 +594,34 @@
     // This is important - explicitly set the contents to zero. This is used
     // when re-constructing the data after crash. This indicates end of
     // reading metadata contents when re-constructing the data
-    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer.get() +
+    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                    metadata_offset);
     bm->new_block = 0;
     bm->file_offset = 0;
 
+    return true;
+}
+
+bool ReadAhead::ReadAheadIOStart() {
+    // Check if the data has to be constructed from the COW file.
+    // This will be true only once during boot up after a crash
+    // during merge.
+    if (snapuserd_->ShouldReconstructDataFromCow()) {
+        return ReconstructDataFromCow();
+    }
+
+    if (read_ahead_async_) {
+        if (!ReadAheadAsyncIO()) {
+            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - io_uring processing failure.";
+            return false;
+        }
+    } else {
+        if (!ReadAheadSyncIO()) {
+            SNAP_LOG(ERROR) << "ReadAheadSyncIO failed";
+            return false;
+        }
+    }
+
     // Wait for the merge to finish for the previous RA window. We shouldn't
     // be touching the scratch space until merge is complete of previous RA
     // window. If there is a crash during this time frame, merge should resume
@@ -322,22 +631,22 @@
     }
 
     // Copy the data to scratch space
-    memcpy(metadata_buffer_, ra_temp_meta_buffer.get(), snapuserd_->GetBufferMetadataSize());
-    memcpy(read_ahead_buffer_, ra_temp_buffer.get(), total_blocks_merged * BLOCK_SZ);
+    memcpy(metadata_buffer_, ra_temp_meta_buffer_.get(), snapuserd_->GetBufferMetadataSize());
+    memcpy(read_ahead_buffer_, ra_temp_buffer_.get(), total_blocks_merged_ * BLOCK_SZ);
 
-    offset = 0;
+    loff_t offset = 0;
     std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
     read_ahead_buffer_map.clear();
 
-    for (size_t block_index = 0; block_index < blocks.size(); block_index++) {
+    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
         void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + offset);
-        uint64_t new_block = blocks[block_index];
+        uint64_t new_block = blocks_[block_index];
 
         read_ahead_buffer_map[new_block] = bufptr;
         offset += BLOCK_SZ;
     }
 
-    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);
+    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);
 
     // Flush the data only if we have a overlapping blocks in the region
     // Notify the Merge thread to resume merging this window
@@ -350,6 +659,33 @@
     return true;
 }
 
+bool ReadAhead::InitializeIouring() {
+    if (!snapuserd_->IsIouringSupported()) {
+        return false;
+    }
+
+    ring_ = std::make_unique<struct io_uring>();
+
+    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
+    if (ret) {
+        SNAP_LOG(ERROR) << "io_uring_queue_init failed with ret: " << ret;
+        return false;
+    }
+
+    // For xor ops processing
+    bufsink_.Initialize(PAYLOAD_BUFFER_SZ * 2);
+    read_ahead_async_ = true;
+
+    SNAP_LOG(INFO) << "Read-ahead: io_uring initialized with queue depth: " << queue_depth_;
+    return true;
+}
+
+void ReadAhead::FinalizeIouring() {
+    if (read_ahead_async_) {
+        io_uring_queue_exit(ring_.get());
+    }
+}
+
 bool ReadAhead::RunThread() {
     if (!InitializeFds()) {
         return false;
@@ -363,14 +699,18 @@
 
     InitializeRAIter();
 
+    InitializeIouring();
+
     while (!RAIterDone()) {
         if (!ReadAheadIOStart()) {
             break;
         }
     }
 
+    FinalizeIouring();
     CloseFds();
     reader_->CloseCowFd();
+
     SNAP_LOG(INFO) << " ReadAhead thread terminating....";
     return true;
 }
@@ -434,8 +774,9 @@
     metadata_buffer_ =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferMetadataOffset());
     read_ahead_buffer_ = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
-    // For xor ops
-    bufsink_.Initialize(PAYLOAD_BUFFER_SZ);
+
+    ra_temp_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferDataSize());
+    ra_temp_meta_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
 }
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
index 1c3e04b..d670f1e 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
@@ -12,6 +12,9 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <android-base/strings.h>
+#include <gflags/gflags.h>
+
 #include <fcntl.h>
 #include <linux/fs.h>
 #include <linux/memfd.h>
@@ -27,6 +30,7 @@
 #include <string_view>
 
 #include <android-base/file.h>
+#include <android-base/properties.h>
 #include <android-base/unique_fd.h>
 #include <fs_mgr/file_wait.h>
 #include <gtest/gtest.h>
@@ -38,6 +42,8 @@
 
 #include "snapuserd_core.h"
 
+DEFINE_string(force_config, "", "Force testing mode with iouring disabled");
+
 namespace android {
 namespace snapshot {
 
@@ -857,5 +863,23 @@
 
 int main(int argc, char** argv) {
     ::testing::InitGoogleTest(&argc, argv);
-    return RUN_ALL_TESTS();
+
+    gflags::ParseCommandLineFlags(&argc, &argv, false);
+
+    android::base::SetProperty("ctl.stop", "snapuserd");
+
+    if (FLAGS_force_config == "iouring_disabled") {
+        if (!android::base::SetProperty("snapuserd.test.io_uring.force_disable", "1")) {
+            return testing::AssertionFailure()
+                   << "Failed to disable property: snapuserd.test.io_uring.disabled";
+        }
+    }
+
+    int ret = RUN_ALL_TESTS();
+
+    if (FLAGS_force_config == "iouring_disabled") {
+        android::base::SetProperty("snapuserd.test.io_uring.force_disable", "0");
+    }
+
+    return ret;
 }
diff --git a/libutils/Unicode_test.cpp b/libutils/Unicode_test.cpp
index b92eef8..8b994d9 100644
--- a/libutils/Unicode_test.cpp
+++ b/libutils/Unicode_test.cpp
@@ -100,7 +100,7 @@
         0xF0, 0x90, 0x80, 0x80, // U+10000, 2 UTF-16 character
     };
 
-    char16_t output[1 + 1 + 1 + 2 + 1]; // Room for NULL
+    char16_t output[1 + 1 + 1 + 2 + 1];  // Room for null
 
     utf8_to_utf16(str, sizeof(str), output, sizeof(output) / sizeof(output[0]));
 
@@ -114,8 +114,7 @@
             << "should be first half of surrogate U+10000";
     EXPECT_EQ(0xDC00, output[4])
             << "should be second half of surrogate U+10000";
-    EXPECT_EQ(NULL, output[5])
-            << "should be NULL terminated";
+    EXPECT_EQ(0, output[5]) << "should be null terminated";
 }
 
 TEST_F(UnicodeTest, strstr16EmptyTarget) {
