snapuserd: Use io_uring APIs for snapshot merge

Use io_uring READ/WRITE opcodes for the snapshot merge. Specifically,
io_uring is used only in the read-ahead and ordered-ops (copy/xor)
code paths.
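
A minimal sketch of the batched submit/reap pattern that both paths follow
(illustrative only; the helper name, the fixed 4K block size and the
single-batch assumption are mine, not the snapuserd code):

  #include <liburing.h>

  // Queue up to 'nr' (<= queue depth) 4K reads from 'fd' into 'buf',
  // submit them with one syscall, then reap every completion.
  static bool BatchedRead(int fd, void* buf, unsigned nr, off_t off) {
      struct io_uring ring;
      if (io_uring_queue_init(32, &ring, 0) != 0) return false;
      bool ok = true;
      for (unsigned i = 0; ok && i < nr; i++) {
          struct io_uring_sqe* sqe = io_uring_get_sqe(&ring);
          if (!sqe) { ok = false; break; }
          io_uring_prep_read(sqe, fd, (char*)buf + i * 4096UL, 4096, off + i * 4096UL);
          sqe->flags |= IOSQE_ASYNC;  // always punt to kernel worker threads
      }
      // One io_uring_submit() replaces 'nr' individual pread() calls.
      if (ok && io_uring_submit(&ring) != (int)nr) ok = false;
      for (unsigned i = 0; ok && i < nr; i++) {
          struct io_uring_cqe* cqe;
          if (io_uring_wait_cqe(&ring, &cqe) != 0 || cqe->res < 0) { ok = false; break; }
          io_uring_cqe_seen(&ring, cqe);
      }
      io_uring_queue_exit(&ring);
      return ok;
  }

snapuserd applies the same pattern with a persistent ring of queue depth 32
and, for the merge path, a trailing fsync SQE marked IOSQE_IO_DRAIN at the
end of each batch.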

Snapshot merge perf:

===========================================================

Incremental OTA of 300M between two git_master branches on Pixel 6:

===========================================================

On Android S (with dm-snapshot): ~15 minutes:

update_engine: [INFO:cleanup_previous_update_action.cc(330)] Merge finished with state MergeCompleted.
update_engine: [INFO:cleanup_previous_update_action.cc(130)] Stopping/suspending/completing CleanupPreviousUpdateAction
update_engine: [INFO:cleanup_previous_update_action.cc(501)] Reporting merge stats: MergeCompleted in 926508ms (resumed 0 times), using 0 bytes of COW image.

===========================================================

On Android T (with io_uring): ~38 seconds:

update_engine: [INFO:cleanup_previous_update_action.cc(330)] Merge finished with state MergeCompleted.
update_engine: [INFO:cleanup_previous_update_action.cc(130)] Stopping/suspending/completing CleanupPreviousUpdateAction
update_engine: [INFO:cleanup_previous_update_action.cc(501)] Reporting merge stats: MergeCompleted in 38868ms (resumed 0 times), using 0 bytes of COW image.

===========================================================

Bug: 202784286
Test: Full/Incremental OTA
Signed-off-by: Akilesh Kailash <akailash@google.com>
Change-Id: I24ed3ae16730d0a18be0350c162dc67e1a7b74e1
diff --git a/fs_mgr/libsnapshot/snapuserd/Android.bp b/fs_mgr/libsnapshot/snapuserd/Android.bp
index 84bcb94..ef2233d 100644
--- a/fs_mgr/libsnapshot/snapuserd/Android.bp
+++ b/fs_mgr/libsnapshot/snapuserd/Android.bp
@@ -86,7 +86,9 @@
         "libsnapshot_cow",
         "libz",
         "libext4_utils",
+        "liburing",
     ],
+    include_dirs: ["bionic/libc/kernel"],
 }
 
 cc_binary {
@@ -182,7 +184,9 @@
         "libfs_mgr",
         "libdm",
         "libext4_utils",
+        "liburing",
     ],
+    include_dirs: ["bionic/libc/kernel"],
     header_libs: [
         "libstorage_literals_headers",
         "libfiemap_headers",
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index 95d95cd..2c84ff9 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -16,6 +16,9 @@
 
 #include "snapuserd_core.h"
 
+#include <sys/utsname.h>
+
+#include <android-base/properties.h>
 #include <android-base/strings.h>
 
 namespace android {
@@ -513,5 +516,28 @@
     return ra_state;
 }
 
+bool SnapshotHandler::IsIouringSupported() {
+    struct utsname uts;
+    unsigned int major, minor;
+
+    if ((uname(&uts) != 0) || (sscanf(uts.release, "%u.%u", &major, &minor) != 2)) {
+        SNAP_LOG(ERROR) << "Could not parse the kernel version from uname. "
+                        << " io_uring not supported";
+        return false;
+    }
+
+    // We only support kernels 5.6 and above, as the IOSQE_ASYNC flag and the
+    // IORING_OP_READ/WRITE opcodes were introduced in the 5.6 kernel.
+    if (major < 5 || (major == 5 && minor < 6)) {
+        return false;
+    }
+
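+    // Even with kernel support, io_uring is used only when explicitly
+    // enabled via the property below (defaults to disabled).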
+    return android::base::GetBoolProperty("ro.virtual_ab.io_uring.enabled", false);
+}
+
 }  // namespace snapshot
 }  // namespace android
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index 1953316..f36866a 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -39,6 +39,7 @@
 #include <libdm/dm.h>
 #include <libsnapshot/cow_reader.h>
 #include <libsnapshot/cow_writer.h>
+#include <liburing.h>
 #include <snapuserd/snapuserd_buffer.h>
 #include <snapuserd/snapuserd_kernel.h>
 
@@ -113,6 +114,19 @@
     bool ReconstructDataFromCow();
     void CheckOverlap(const CowOperation* cow_op);
 
+    bool ReadAheadAsyncIO();
+    bool ReapIoCompletions(int pending_ios_to_complete);
+    bool ReadXorData(size_t block_index, size_t xor_op_index,
+                     std::vector<const CowOperation*>& xor_op_vec);
+    void ProcessXorData(size_t& block_xor_index, size_t& xor_index,
+                        std::vector<const CowOperation*>& xor_op_vec, void* buffer,
+                        loff_t& buffer_offset);
+    void UpdateScratchMetadata();
+
+    bool ReadAheadSyncIO();
+    bool InitializeIouring();
+    void FinalizeIouring();
+
     void* read_ahead_buffer_;
     void* metadata_buffer_;
 
@@ -131,7 +145,19 @@
     std::unordered_set<uint64_t> dest_blocks_;
     std::unordered_set<uint64_t> source_blocks_;
     bool overlap_;
+    std::vector<uint64_t> blocks_;
+    int total_blocks_merged_ = 0;
+    std::unique_ptr<uint8_t[]> ra_temp_buffer_;
+    std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_;
     BufferSink bufsink_;
+
+    bool read_ahead_async_ = false;
+    // A queue depth of 32 seems optimal. We don't want a huge depth,
+    // as it may put more memory pressure on the kernel worker threads
+    // given that we use the IOSQE_ASYNC flag.
+    int queue_depth_ = 32;
+    std::unique_ptr<struct io_uring> ring_;
 };
 
 class Worker {
@@ -185,6 +211,7 @@
     // Merge related ops
     bool Merge();
     bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
+    bool MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter);
     bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
     int PrepareMerge(uint64_t* source_offset, int* pending_ops,
                      const std::unique_ptr<ICowOpIter>& cowop_iter,
@@ -193,6 +220,9 @@
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
     chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
 
+    bool InitializeIouring();
+    void FinalizeIouring();
+
     std::unique_ptr<CowReader> reader_;
     BufferSink bufsink_;
     XorSink xorsink_;
@@ -208,6 +238,14 @@
     unique_fd base_path_merge_fd_;
     unique_fd ctrl_fd_;
 
+    bool merge_async_ = false;
+    // A queue depth of 32 seems optimal. We don't want a huge depth,
+    // as it may put more memory pressure on the kernel worker threads
+    // given that we use the IOSQE_ASYNC flag.
+    int queue_depth_ = 32;
+    std::unique_ptr<struct io_uring> ring_;
+
     std::shared_ptr<SnapshotHandler> snapuserd_;
 };
 
@@ -292,6 +330,8 @@
     bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
     MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);
 
+    bool IsIouringSupported();
+
   private:
     bool ReadMetadata();
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
index fa055b7..d4d4efe 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -72,16 +72,16 @@
 }
 
 bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
-    // Flush every 2048 ops. Since all ops are independent and there is no
+    // Flush every 8192 ops. Since all ops are independent and there is no
     // dependency between COW ops, we will flush the data and the number
-    // of ops merged in COW file for every 2048 ops. If there is a crash,
+    // of ops merged in COW file for every 8192 ops. If there is a crash,
     // we will end up replaying some of the COW ops which were already merged.
     // That is ok.
     //
-    // Why 2048 ops ? We can probably increase this to bigger value but just
-    // need to ensure that merge makes forward progress if there are
-    // crashes repeatedly which is highly unlikely.
-    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 8;
+    // Why 8192 ops? Increasing this further may improve merge time by 3-4
+    // seconds, but we still need to checkpoint regularly; 8k ops seems optimal.
+    // In case of a crash, the merge should always make forward progress.
+    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32;
     int num_ops_merged = 0;
 
     while (!cowop_iter->Done()) {
@@ -128,7 +128,7 @@
 
         num_ops_merged += linear_blocks;
 
-        if (num_ops_merged == total_ops_merged_per_commit) {
+        if (num_ops_merged >= total_ops_merged_per_commit) {
             // Flush the data
             if (fsync(base_path_merge_fd_.get()) < 0) {
                 SNAP_LOG(ERROR) << "Merge: ReplaceZeroOps: Failed to fsync merged data";
@@ -172,6 +172,173 @@
     return true;
 }
 
+bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+    void* mapped_addr = snapuserd_->GetMappedAddr();
+    void* read_ahead_buffer =
+            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
+    size_t block_index = 0;
+
+    SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";
+
+    while (!cowop_iter->Done()) {
+        const CowOperation* cow_op = &cowop_iter->Get();
+        if (!IsOrderedOp(*cow_op)) {
+            break;
+        }
+
+        SNAP_LOG(DEBUG) << "Waiting for merge begin...";
+        // Wait for RA thread to notify that the merge window
+        // is ready for merging.
+        if (!snapuserd_->WaitForMergeBegin()) {
+            snapuserd_->SetMergeFailed(block_index);
+            return false;
+        }
+
+        snapuserd_->SetMergeInProgress(block_index);
+
+        loff_t offset = 0;
+        int num_ops = snapuserd_->GetTotalBlocksToMerge();
+
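+        // pending_sqe tracks the free SQE slots left in the ring for this
+        // batch; pending_ios_to_submit counts requests queued but not yet
+        // submitted to the kernel.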
+        int pending_sqe = queue_depth_;
+        int pending_ios_to_submit = 0;
+        bool flush_required = false;
+
+        SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
+        while (num_ops) {
+            uint64_t source_offset;
+
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+
+            if (linear_blocks != 0) {
+                size_t io_size = (linear_blocks * BLOCK_SZ);
+
+                // Get an SQE entry from the ring and populate the I/O variables
+                struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
+                if (!sqe) {
+                    SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
+                    snapuserd_->SetMergeFailed(block_index);
+                    return false;
+                }
+
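+                // Queue an async write of the already read-ahead data from
+                // the scratch buffer back to the base device at source_offset.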
+                io_uring_prep_write(sqe, base_path_merge_fd_.get(),
+                                    (char*)read_ahead_buffer + offset, io_size, source_offset);
+
+                offset += io_size;
+                num_ops -= linear_blocks;
+
+                pending_sqe -= 1;
+                pending_ios_to_submit += 1;
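+                // IOSQE_ASYNC: always issue this request from a kernel worker
+                // thread instead of attempting an inline, non-blocking submission.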
+                sqe->flags |= IOSQE_ASYNC;
+            }
+
+            // Ring is full or no more COW ops to be merged in this batch
+            if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
+                // If this is the last set of COW ops to be merged in this batch, we
+                // need to sync the merged data. We will try to grab an SQE entry
+                // and queue an FSYNC command; additionally, make sure that the
+                // fsync is done only after all the I/O operations queued in the
+                // ring are completed by setting IOSQE_IO_DRAIN.
+                //
+                // If there is no space in the ring, we will flush the data later
+                // by explicitly calling the fsync() system call.
+                if (num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
+                    if (pending_sqe != 0) {
+                        struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
+                        if (!sqe) {
+                            // very unlikely but let's continue and not fail the
+                            // merge - we will flush it later
+                            SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
+                            flush_required = true;
+                        } else {
+                            io_uring_prep_fsync(sqe, base_path_merge_fd_.get(), 0);
+                            // Drain the queue before fsync
+                            io_uring_sqe_set_flags(sqe, IOSQE_IO_DRAIN);
+                            pending_sqe -= 1;
+                            flush_required = false;
+                            pending_ios_to_submit += 1;
+                            sqe->flags |= IOSQE_ASYNC;
+                        }
+                    } else {
+                        flush_required = true;
+                    }
+                }
+
+                // Submit the IO for all the COW ops in a single syscall
+                int ret = io_uring_submit(ring_.get());
+                if (ret != pending_ios_to_submit) {
+                    SNAP_PLOG(ERROR)
+                            << "io_uring_submit failed for merge-ordered ops: "
+                            << "io submit: " << ret << " expected: " << pending_ios_to_submit;
+                    snapuserd_->SetMergeFailed(block_index);
+                    return false;
+                }
+
+                int pending_ios_to_complete = pending_ios_to_submit;
+                pending_ios_to_submit = 0;
+
+                // Reap I/O completions
+                while (pending_ios_to_complete) {
+                    struct io_uring_cqe* cqe;
+
+                    ret = io_uring_wait_cqe(ring_.get(), &cqe);
+                    if (ret) {
+                        SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
+                        snapuserd_->SetMergeFailed(block_index);
+                        return false;
+                    }
+
+                    if (cqe->res < 0) {
+                        SNAP_LOG(ERROR)
+                                << "Merge - io_uring_wait_cqe failed with res: " << cqe->res;
+                        snapuserd_->SetMergeFailed(block_index);
+                        return false;
+                    }
+
+                    io_uring_cqe_seen(ring_.get(), cqe);
+                    pending_ios_to_complete -= 1;
+                }
+
+                pending_sqe = queue_depth_;
+            }
+
+            if (linear_blocks == 0) {
+                break;
+            }
+        }
+
+        // Verify all ops are merged
+        CHECK(num_ops == 0);
+
+        // Flush the data
+        if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
+            SNAP_LOG(ERROR) << " Failed to fsync merged data";
+            snapuserd_->SetMergeFailed(block_index);
+            return false;
+        }
+
+        // Merge is done and data is on disk. Update the COW Header about
+        // the merge completion
+        if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
+            SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
+            snapuserd_->SetMergeFailed(block_index);
+            return false;
+        }
+
+        SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
+        // Mark the block as merge complete
+        snapuserd_->SetMergeCompleted(block_index);
+
+        // Notify RA thread that the merge thread is ready to merge the next
+        // window
+        snapuserd_->NotifyRAForMergeReady();
+
+        // Get the next block
+        block_index += 1;
+    }
+
+    return true;
+}
+
 bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
@@ -260,15 +427,23 @@
 bool Worker::Merge() {
     std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
 
-    // Start with Copy and Xor ops
-    if (!MergeOrderedOps(cowop_iter)) {
-        SNAP_LOG(ERROR) << "Merge failed for ordered ops";
-        snapuserd_->MergeFailed();
-        return false;
+    if (merge_async_) {
+        if (!MergeOrderedOpsAsync(cowop_iter)) {
+            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
+            snapuserd_->MergeFailed();
+            return false;
+        }
+        SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed.....";
+    } else {
+        // Start with Copy and Xor ops
+        if (!MergeOrderedOps(cowop_iter)) {
+            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
+            snapuserd_->MergeFailed();
+            return false;
+        }
+        SNAP_LOG(INFO) << "MergeOrderedOps completed.....";
     }
 
-    SNAP_LOG(INFO) << "MergeOrderedOps completed...";
-
     // Replace and Zero ops
     if (!MergeReplaceZeroOps(cowop_iter)) {
         SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
@@ -281,6 +456,31 @@
     return true;
 }
 
+bool Worker::InitializeIouring() {
+    if (!snapuserd_->IsIouringSupported()) {
+        return false;
+    }
+
+    ring_ = std::make_unique<struct io_uring>();
+
+    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
+    if (ret) {
+        LOG(ERROR) << "Merge: io_uring_queue_init failed with ret: " << ret;
+        return false;
+    }
+
+    merge_async_ = true;
+
+    LOG(INFO) << "Merge: io_uring initialized with queue depth: " << queue_depth_;
+    return true;
+}
+
+void Worker::FinalizeIouring() {
+    if (merge_async_) {
+        io_uring_queue_exit(ring_.get());
+    }
+}
+
 bool Worker::RunMergeThread() {
     SNAP_LOG(DEBUG) << "Waiting for merge begin...";
     if (!snapuserd_->WaitForMergeBegin()) {
@@ -296,10 +496,13 @@
         return false;
     }
 
+    InitializeIouring();
+
     if (!Merge()) {
         return false;
     }
 
+    FinalizeIouring();
     CloseFds();
     reader_->CloseCowFd();
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index 9e8ccfb..90f87e7 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -183,25 +183,311 @@
     return true;
 }
 
-bool ReadAhead::ReadAheadIOStart() {
-    // Check if the data has to be constructed from the COW file.
-    // This will be true only once during boot up after a crash
-    // during merge.
-    if (snapuserd_->ShouldReconstructDataFromCow()) {
-        return ReconstructDataFromCow();
-    }
+/*
+ * With io_uring, the data flow is slightly different.
+ *
+ * The data flow is as follows:
+ *
+ * 1: Queue the I/O requests to be read from the backing source device.
+ * This is done by retrieving an SQE entry from the ring and populating
+ * it. Note that the I/O is not submitted yet.
+ *
+ * 2: Once the ring is full (i.e. queue_depth entries are queued), submit
+ * all the queued I/O requests with a single system call. This essentially
+ * cuts "queue_depth" system calls down to a single system call.
+ *
+ * 3: Once the I/O is submitted, the user-space thread starts processing
+ * the XOR operations. This happens in parallel while the I/O requests are
+ * serviced by the kernel. This is ok because, for XOR operations, we first
+ * need to retrieve the compressed data from the COW block device. Thus, the
+ * backing source I/O is offloaded to the kernel while user-space fetches
+ * the data for the XOR operations in parallel.
+ *
+ * 4: After the XOR data is read from the COW device, poll the completion
+ * queue for all the submitted I/O. If the I/Os have already completed,
+ * the user-space thread just reads the CQEs from the ring without doing
+ * any system call. If none of the I/Os have completed yet, the user-space
+ * thread makes a system call and waits for the I/O completions.
+ *
+ * Flow diagram:
+ *                                                    SQ-RING
+ *  SQE1 <----------- Fetch SQE1 Entry ---------- |SQE1||SQE2|SQE3|
+ *
+ *  SQE1  ------------ Populate SQE1 Entry ------> |SQE1-X||SQE2|SQE3|
+ *
+ *  SQE2 <----------- Fetch SQE2 Entry ---------- |SQE1-X||SQE2|SQE3|
+ *
+ *  SQE2  ------------ Populate SQE2 Entry ------> |SQE1-X||SQE2-X|SQE3|
+ *
+ *  SQE3 <----------- Fetch SQE3 Entry ---------- |SQE1-X||SQE2-X|SQE3|
+ *
+ *  SQE3  ------------ Populate SQE3 Entry ------> |SQE1-X||SQE2-X|SQE3-X|
+ *
+ *  Submit-IO ---------------------------------> |SQE1-X||SQE2-X|SQE3-X|
+ *     |                                                  |
+ *     |                                        Process I/O entries in kernel
+ *     |                                                  |
+ *  Retrieve XOR                                          |
+ *  data from COW                                         |
+ *     |                                                  |
+ *     |                                                  |
+ *  Fetch CQ completions
+ *     |                                              CQ-RING
+ *                                               |CQE1-X||CQE2-X|CQE3-X|
+ *                                                        |
+ *   CQE1 <------------Fetch CQE1 Entry          |CQE1||CQE2-X|CQE3-X|
+ *   CQE2 <------------Fetch CQE2 Entry          |CQE1||CQE2-|CQE3-X|
+ *   CQE3 <------------Fetch CQE3 Entry          |CQE1||CQE2-|CQE3-|
+ *    |
+ *    |
+ *  Continue Next set of operations in the RING
+ */
 
-    std::vector<uint64_t> blocks;
-
+bool ReadAhead::ReadAheadAsyncIO() {
     int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
     loff_t buffer_offset = 0;
-    int total_blocks_merged = 0;
+    total_blocks_merged_ = 0;
     overlap_ = false;
     dest_blocks_.clear();
     source_blocks_.clear();
+    blocks_.clear();
     std::vector<const CowOperation*> xor_op_vec;
 
-    auto ra_temp_buffer = std::make_unique<uint8_t[]>(snapuserd_->GetBufferDataSize());
+    int pending_sqe = queue_depth_;
+    int pending_ios_to_submit = 0;
+
+    size_t xor_op_index = 0;
+    size_t block_index = 0;
+
+    loff_t offset = 0;
+
+    bufsink_.ResetBufferOffset();
+
+    // Number of ops to be merged in this window. This is a fixed size
+    // except for the last window wherein the number of ops can be less
+    // than the size of the RA window.
+    while (num_ops) {
+        uint64_t source_offset;
+        struct io_uring_sqe* sqe;
+
+        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);
+
+        if (linear_blocks != 0) {
+            size_t io_size = (linear_blocks * BLOCK_SZ);
+
+            // Get an SQE entry from the ring and populate the I/O variables
+            sqe = io_uring_get_sqe(ring_.get());
+            if (!sqe) {
+                SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during read-ahead";
+                snapuserd_->ReadAheadIOFailed();
+                return false;
+            }
+
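+            // Queue an async read of 'linear_blocks' contiguous blocks from
+            // the backing source device into the temporary read-ahead buffer.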
+            io_uring_prep_read(sqe, backing_store_fd_.get(),
+                               (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
+                               source_offset);
+
+            buffer_offset += io_size;
+            num_ops -= linear_blocks;
+            total_blocks_merged_ += linear_blocks;
+
+            pending_sqe -= 1;
+            pending_ios_to_submit += 1;
+            sqe->flags |= IOSQE_ASYNC;
+        }
+
+        // pending_sqe == 0 : Ring is full
+        //
+        // num_ops == 0 : All the COW ops in this batch are processed - Submit
+        // pending I/O requests in the ring
+        //
+        // linear_blocks == 0 : All the COW ops processing is done. Submit
+        // pending I/O requests in the ring
+        if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
+            // Submit the IO for all the COW ops in a single syscall
+            int ret = io_uring_submit(ring_.get());
+            if (ret != pending_ios_to_submit) {
+                SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: "
+                                 << " io submit: " << ret << " expected: " << pending_ios_to_submit;
+                snapuserd_->ReadAheadIOFailed();
+                return false;
+            }
+
+            int pending_ios_to_complete = pending_ios_to_submit;
+            pending_ios_to_submit = 0;
+
+            bool xor_processing_required = (xor_op_vec.size() > 0);
+
+            // Read XOR data from COW file in parallel when I/O's are in-flight
+            if (xor_processing_required && !ReadXorData(block_index, xor_op_index, xor_op_vec)) {
+                SNAP_LOG(ERROR) << "ReadXorData failed";
+                snapuserd_->ReadAheadIOFailed();
+                return false;
+            }
+
+            // Fetch I/O completions
+            if (!ReapIoCompletions(pending_ios_to_complete)) {
+                SNAP_LOG(ERROR) << "ReapIoCompletions failed";
+                snapuserd_->ReadAheadIOFailed();
+                return false;
+            }
+
+            // Retrieve XOR'ed data
+            if (xor_processing_required) {
+                ProcessXorData(block_index, xor_op_index, xor_op_vec, ra_temp_buffer_.get(),
+                               offset);
+            }
+
+            // All the I/O in the ring is processed.
+            pending_sqe = queue_depth_;
+        }
+
+        if (linear_blocks == 0) {
+            break;
+        }
+    }
+
+    // Done with merging ordered ops
+    if (RAIterDone() && total_blocks_merged_ == 0) {
+        return true;
+    }
+
+    CHECK(blocks_.size() == total_blocks_merged_);
+
+    UpdateScratchMetadata();
+
+    return true;
+}
+
+void ReadAhead::UpdateScratchMetadata() {
+    loff_t metadata_offset = 0;
+
+    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
+            (char*)ra_temp_meta_buffer_.get() + metadata_offset);
+
+    bm->new_block = 0;
+    bm->file_offset = 0;
+
+    loff_t file_offset = snapuserd_->GetBufferDataOffset();
+
+    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
+        uint64_t new_block = blocks_[block_index];
+        // Track the metadata blocks which are stored in scratch space
+        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
+                                                       metadata_offset);
+
+        bm->new_block = new_block;
+        bm->file_offset = file_offset;
+
+        metadata_offset += sizeof(struct ScratchMetadata);
+        file_offset += BLOCK_SZ;
+    }
+
+    // This is important - explicitly set the contents to zero. This is used
+    // when re-constructing the data after crash. This indicates end of
+    // reading metadata contents when re-constructing the data
+    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
+                                                   metadata_offset);
+    bm->new_block = 0;
+    bm->file_offset = 0;
+}
+
+bool ReadAhead::ReapIoCompletions(int pending_ios_to_complete) {
+    // Reap I/O completions
+    while (pending_ios_to_complete) {
+        struct io_uring_cqe* cqe;
+
+        int ret = io_uring_wait_cqe(ring_.get(), &cqe);
+        if (ret) {
+            SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
+            return false;
+        }
+
+        if (cqe->res < 0) {
+            SNAP_LOG(ERROR) << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
+            return false;
+        }
+
+        io_uring_cqe_seen(ring_.get(), cqe);
+        pending_ios_to_complete -= 1;
+    }
+
+    return true;
+}
+
+void ReadAhead::ProcessXorData(size_t& block_xor_index, size_t& xor_index,
+                               std::vector<const CowOperation*>& xor_op_vec, void* buffer,
+                               loff_t& buffer_offset) {
+    loff_t xor_buf_offset = 0;
+
+    while (block_xor_index < blocks_.size()) {
+        void* bufptr = static_cast<void*>((char*)buffer + buffer_offset);
+        uint64_t new_block = blocks_[block_xor_index];
+
+        if (xor_index < xor_op_vec.size()) {
+            const CowOperation* xor_op = xor_op_vec[xor_index];
+
+            // Check if this block is an XOR op
+            if (xor_op->new_block == new_block) {
+                // Pointer to the data read from base device
+                uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
+                // Get the xor'ed data read from COW device
+                uint8_t* xor_data = reinterpret_cast<uint8_t*>((char*)bufsink_.GetPayloadBufPtr() +
+                                                               xor_buf_offset);
+
+                for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
+                    buffer[byte_offset] ^= xor_data[byte_offset];
+                }
+
+                // Move to next XOR op
+                xor_index += 1;
+                xor_buf_offset += BLOCK_SZ;
+            }
+        }
+
+        buffer_offset += BLOCK_SZ;
+        block_xor_index += 1;
+    }
+
+    bufsink_.ResetBufferOffset();
+}
+
+bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index,
+                            std::vector<const CowOperation*>& xor_op_vec) {
+    // Process the XOR ops in parallel - We will be reading data
+    // from COW file for XOR ops processing.
+    while (block_index < blocks_.size()) {
+        uint64_t new_block = blocks_[block_index];
+
+        if (xor_op_index < xor_op_vec.size()) {
+            const CowOperation* xor_op = xor_op_vec[xor_op_index];
+            if (xor_op->new_block == new_block) {
+                if (!reader_->ReadData(*xor_op, &bufsink_)) {
+                    SNAP_LOG(ERROR)
+                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
+                    return false;
+                }
+
+                xor_op_index += 1;
+                bufsink_.UpdateBufferOffset(BLOCK_SZ);
+            }
+        }
+        block_index += 1;
+    }
+    return true;
+}
+
+bool ReadAhead::ReadAheadSyncIO() {
+    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
+    loff_t buffer_offset = 0;
+    total_blocks_merged_ = 0;
+    overlap_ = false;
+    dest_blocks_.clear();
+    source_blocks_.clear();
+    blocks_.clear();
+    std::vector<const CowOperation*> xor_op_vec;
+
+    bufsink_.ResetBufferOffset();
 
     // Number of ops to be merged in this window. This is a fixed size
     // except for the last window wherein the number of ops can be less
@@ -209,7 +495,7 @@
     while (num_ops) {
         uint64_t source_offset;
 
-        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks, xor_op_vec);
+        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);
         if (linear_blocks == 0) {
             // No more blocks to read
             SNAP_LOG(DEBUG) << " Read-ahead completed....";
@@ -220,7 +506,7 @@
 
         // Read from the base device consecutive set of blocks in one shot
         if (!android::base::ReadFullyAtOffset(backing_store_fd_,
-                                              (char*)ra_temp_buffer.get() + buffer_offset, io_size,
+                                              (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
                                               source_offset)) {
             SNAP_PLOG(ERROR) << "Ordered-op failed. Read from backing store: "
                              << backing_store_device_ << "at block :" << source_offset / BLOCK_SZ
@@ -233,21 +519,19 @@
         }
 
         buffer_offset += io_size;
-        total_blocks_merged += linear_blocks;
+        total_blocks_merged_ += linear_blocks;
         num_ops -= linear_blocks;
     }
 
     // Done with merging ordered ops
-    if (RAIterDone() && total_blocks_merged == 0) {
+    if (RAIterDone() && total_blocks_merged_ == 0) {
         return true;
     }
 
     loff_t metadata_offset = 0;
 
-    auto ra_temp_meta_buffer = std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
-
     struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
-            (char*)ra_temp_meta_buffer.get() + metadata_offset);
+            (char*)ra_temp_meta_buffer_.get() + metadata_offset);
 
     bm->new_block = 0;
     bm->file_offset = 0;
@@ -255,12 +539,15 @@
     loff_t file_offset = snapuserd_->GetBufferDataOffset();
 
     loff_t offset = 0;
-    CHECK(blocks.size() == total_blocks_merged);
+    CHECK(blocks_.size() == total_blocks_merged_);
 
     size_t xor_index = 0;
-    for (size_t block_index = 0; block_index < blocks.size(); block_index++) {
-        void* bufptr = static_cast<void*>((char*)ra_temp_buffer.get() + offset);
-        uint64_t new_block = blocks[block_index];
+    BufferSink bufsink;
+    bufsink.Initialize(BLOCK_SZ * 2);
+
+    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
+        void* bufptr = static_cast<void*>((char*)ra_temp_buffer_.get() + offset);
+        uint64_t new_block = blocks_[block_index];
 
         if (xor_index < xor_op_vec.size()) {
             const CowOperation* xor_op = xor_op_vec[xor_index];
@@ -268,17 +555,16 @@
             // Check if this block is an XOR op
             if (xor_op->new_block == new_block) {
                 // Read the xor'ed data from COW
-                if (!reader_->ReadData(*xor_op, &bufsink_)) {
+                if (!reader_->ReadData(*xor_op, &bufsink)) {
                     SNAP_LOG(ERROR)
                             << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
                     snapuserd_->ReadAheadIOFailed();
                     return false;
                 }
-
                 // Pointer to the data read from base device
                 uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
                 // Get the xor'ed data read from COW device
-                uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink_.GetPayloadBufPtr());
+                uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink.GetPayloadBufPtr());
 
                 // Retrieve the original data
                 for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
@@ -292,7 +578,7 @@
 
         offset += BLOCK_SZ;
         // Track the metadata blocks which are stored in scratch space
-        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer.get() +
+        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                        metadata_offset);
 
         bm->new_block = new_block;
@@ -308,11 +594,34 @@
     // This is important - explicitly set the contents to zero. This is used
     // when re-constructing the data after crash. This indicates end of
     // reading metadata contents when re-constructing the data
-    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer.get() +
+    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                    metadata_offset);
     bm->new_block = 0;
     bm->file_offset = 0;
 
+    return true;
+}
+
+bool ReadAhead::ReadAheadIOStart() {
+    // Check if the data has to be constructed from the COW file.
+    // This will be true only once during boot up after a crash
+    // during merge.
+    if (snapuserd_->ShouldReconstructDataFromCow()) {
+        return ReconstructDataFromCow();
+    }
+
+    if (read_ahead_async_) {
+        if (!ReadAheadAsyncIO()) {
+            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - io_uring processing failure.";
+            return false;
+        }
+    } else {
+        if (!ReadAheadSyncIO()) {
+            SNAP_LOG(ERROR) << "ReadAheadSyncIO failed";
+            return false;
+        }
+    }
+
     // Wait for the merge to finish for the previous RA window. We shouldn't
     // be touching the scratch space until merge is complete of previous RA
     // window. If there is a crash during this time frame, merge should resume
@@ -322,22 +631,22 @@
     }
 
     // Copy the data to scratch space
-    memcpy(metadata_buffer_, ra_temp_meta_buffer.get(), snapuserd_->GetBufferMetadataSize());
-    memcpy(read_ahead_buffer_, ra_temp_buffer.get(), total_blocks_merged * BLOCK_SZ);
+    memcpy(metadata_buffer_, ra_temp_meta_buffer_.get(), snapuserd_->GetBufferMetadataSize());
+    memcpy(read_ahead_buffer_, ra_temp_buffer_.get(), total_blocks_merged_ * BLOCK_SZ);
 
-    offset = 0;
+    loff_t offset = 0;
     std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
     read_ahead_buffer_map.clear();
 
-    for (size_t block_index = 0; block_index < blocks.size(); block_index++) {
+    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
         void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + offset);
-        uint64_t new_block = blocks[block_index];
+        uint64_t new_block = blocks_[block_index];
 
         read_ahead_buffer_map[new_block] = bufptr;
         offset += BLOCK_SZ;
     }
 
-    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);
+    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);
 
     // Flush the data only if we have a overlapping blocks in the region
     // Notify the Merge thread to resume merging this window
@@ -350,6 +659,33 @@
     return true;
 }
 
+bool ReadAhead::InitializeIouring() {
+    if (!snapuserd_->IsIouringSupported()) {
+        return false;
+    }
+
+    ring_ = std::make_unique<struct io_uring>();
+
+    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
+    if (ret) {
+        SNAP_LOG(ERROR) << "io_uring_queue_init failed with ret: " << ret;
+        return false;
+    }
+
+    // For xor ops processing
+    bufsink_.Initialize(PAYLOAD_BUFFER_SZ * 2);
+    read_ahead_async_ = true;
+
+    SNAP_LOG(INFO) << "Read-ahead: io_uring initialized with queue depth: " << queue_depth_;
+    return true;
+}
+
+void ReadAhead::FinalizeIouring() {
+    if (read_ahead_async_) {
+        io_uring_queue_exit(ring_.get());
+    }
+}
+
 bool ReadAhead::RunThread() {
     if (!InitializeFds()) {
         return false;
@@ -363,15 +699,19 @@
 
     InitializeRAIter();
 
+    InitializeIouring();
+
     while (!RAIterDone()) {
         if (!ReadAheadIOStart()) {
             break;
         }
     }
 
+    FinalizeIouring();
     CloseFds();
     reader_->CloseCowFd();
-    SNAP_LOG(INFO) << " ReadAhead thread terminating....";
+
+    SNAP_LOG(INFO) << " ReadAheadAsync thread terminating....";
     return true;
 }
 
@@ -434,8 +774,9 @@
     metadata_buffer_ =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferMetadataOffset());
     read_ahead_buffer_ = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
-    // For xor ops
-    bufsink_.Initialize(PAYLOAD_BUFFER_SZ);
+
+    ra_temp_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferDataSize());
+    ra_temp_meta_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
 }
 
 }  // namespace snapshot