diff --git a/fs_mgr/fs_mgr_fstab.cpp b/fs_mgr/fs_mgr_fstab.cpp
index 0ca1946..143e980 100644
--- a/fs_mgr/fs_mgr_fstab.cpp
+++ b/fs_mgr/fs_mgr_fstab.cpp
@@ -657,7 +657,12 @@
             if (partition_ext4 == fstab->end()) {
                 auto new_entry = *GetEntryForMountPoint(fstab, mount_point);
                 new_entry.fs_type = "ext4";
-                fstab->emplace_back(new_entry);
+                auto it = std::find_if(fstab->rbegin(), fstab->rend(),
+                                       [&mount_point](const auto& entry) {
+                                           return entry.mount_point == mount_point;
+                                       });
+                auto end_of_mount_point_group = fstab->begin() + std::distance(it, fstab->rend());
+                fstab->insert(end_of_mount_point_group, new_entry);
             }
         }
     }
diff --git a/fs_mgr/libfiemap/image_manager.cpp b/fs_mgr/libfiemap/image_manager.cpp
index 7ccdbf9..c416f4d 100644
--- a/fs_mgr/libfiemap/image_manager.cpp
+++ b/fs_mgr/libfiemap/image_manager.cpp
@@ -79,7 +79,7 @@
     partition_opener_ = std::make_unique<android::fs_mgr::PartitionOpener>();
 
     // Allow overriding whether ImageManager thinks it's in recovery, for testing.
-#ifdef __ANDROID_RECOVERY__
+#ifdef __ANDROID_RAMDISK__
     device_info_.is_recovery = {true};
 #else
     if (!device_info_.is_recovery.has_value()) {
@@ -523,7 +523,7 @@
 
     auto image_header = GetImageHeaderPath(name);
 
-#if !defined __ANDROID_RECOVERY__
+#ifndef __ANDROID_RAMDISK__
     // If there is a device-mapper node wrapping the block device, then we're
     // able to create another node around it; the dm layer does not carry the
     // exclusion lock down the stack when a mount occurs.
diff --git a/fs_mgr/libsnapshot/Android.bp b/fs_mgr/libsnapshot/Android.bp
index 8b269cd..6db8f13 100644
--- a/fs_mgr/libsnapshot/Android.bp
+++ b/fs_mgr/libsnapshot/Android.bp
@@ -275,6 +275,24 @@
     ],
 }
 
+cc_test {
+    name: "vts_ota_config_test",
+    srcs: [
+        "vts_ota_config_test.cpp",
+    ],
+    shared_libs: [
+        "libbase",
+    ],
+    test_suites: [
+        "vts", // the only suite this test is packaged into
+    ],
+    test_options: {
+        min_shipping_api_level: 33, // presumably skipped when first API level < 33; confirm
+    },
+    auto_gen_config: true, // no checked-in test config; presumably generated by the build
+    require_root: true, // the test is declared as needing root on the device
+}
+
 cc_binary {
     name: "snapshotctl",
     srcs: [
diff --git a/fs_mgr/libsnapshot/cow_reader.cpp b/fs_mgr/libsnapshot/cow_reader.cpp
index 9b5fd2a..746feeb 100644
--- a/fs_mgr/libsnapshot/cow_reader.cpp
+++ b/fs_mgr/libsnapshot/cow_reader.cpp
@@ -550,6 +550,9 @@
     const CowOperation& Get() override;
     void Next() override;
 
+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::vector<CowOperation>::iterator op_iter_;
@@ -560,6 +563,15 @@
     op_iter_ = ops_->begin();
 }
 
+bool CowOpIter::RDone() {
+    return op_iter_ == ops_->begin();  // at the first op: nothing left to step back to
+}
+
+void CowOpIter::Prev() {
+    CHECK(!RDone());  // stepping back from the first op would leave ops_
+    --op_iter_;
+}
+
 bool CowOpIter::Done() {
     return op_iter_ == ops_->end();
 }
@@ -585,6 +597,9 @@
     const CowOperation& Get() override;
     void Next() override;
 
+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_;
@@ -603,6 +618,9 @@
     const CowOperation& Get() override;
     void Next() override;
 
+    void Prev() override;
+    bool RDone() override;
+
   private:
     std::shared_ptr<std::vector<CowOperation>> ops_;
     std::shared_ptr<std::vector<uint32_t>> merge_op_blocks_;
@@ -623,6 +641,15 @@
     block_iter_ = merge_op_blocks->begin() + start;
 }
 
+bool CowMergeOpIter::RDone() {  // NOTE(review): bound is begin(), not begin() + start; confirm
+    return block_iter_ == merge_op_blocks_->begin();
+}
+
+void CowMergeOpIter::Prev() {
+    CHECK(!RDone());  // already at the front of merge_op_blocks_
+    --block_iter_;
+}
+
 bool CowMergeOpIter::Done() {
     return block_iter_ == merge_op_blocks_->end();
 }
@@ -649,6 +676,15 @@
     block_riter_ = merge_op_blocks->rbegin();
 }
 
+bool CowRevMergeOpIter::RDone() {
+    return block_riter_ == merge_op_blocks_->rbegin();  // rbegin() is where this walk starts
+}
+
+void CowRevMergeOpIter::Prev() {
+    CHECK(!RDone());  // cannot move before rbegin()
+    --block_riter_;  // decrementing a reverse iterator moves it back toward rbegin()
+}
+
 bool CowRevMergeOpIter::Done() {
     return block_riter_ == merge_op_blocks_->rend() - start_;
 }
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
index d5b4335..8e6bbd9 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/cow_reader.h
@@ -92,7 +92,7 @@
   public:
     virtual ~ICowOpIter() {}
 
-    // True if there are more items to read, false otherwise.
+    // True if there are no more items to read forward, false otherwise.
     virtual bool Done() = 0;
 
     // Read the current operation.
@@ -100,6 +100,12 @@
 
     // Advance to the next item.
     virtual void Next() = 0;
+
+    // Step back to the previous item. Must not be called when RDone() is true.
+    virtual void Prev() = 0;
+
+    // True if there are no more items to read backwards, false otherwise.
+    virtual bool RDone() = 0;
 };
 
 class CowReader final : public ICowReader {
diff --git a/fs_mgr/libsnapshot/snapshot_test.cpp b/fs_mgr/libsnapshot/snapshot_test.cpp
index d76558b..04d228d 100644
--- a/fs_mgr/libsnapshot/snapshot_test.cpp
+++ b/fs_mgr/libsnapshot/snapshot_test.cpp
@@ -46,8 +46,6 @@
 #include "partition_cow_creator.h"
 #include "utility.h"
 
-#include <android-base/properties.h>
-
 // Mock classes are not used. Header included to ensure mocked class definition aligns with the
 // class itself.
 #include <libsnapshot/mock_device_info.h>
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index b988c7b..692cb74 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -492,22 +492,17 @@
         return;
     }
 
-    if (IsIouringSupported()) {
-        std::async(std::launch::async, &SnapshotHandler::ReadBlocksAsync, this, dm_block_device,
-                   partition_name, dev_sz);
-    } else {
-        int num_threads = 2;
-        size_t num_blocks = dev_sz >> BLOCK_SHIFT;
-        size_t num_blocks_per_thread = num_blocks / num_threads;
-        size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
-        off_t offset = 0;
+    int num_threads = 2;
+    size_t num_blocks = dev_sz >> BLOCK_SHIFT;
+    size_t num_blocks_per_thread = num_blocks / num_threads;
+    size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
+    off_t offset = 0;
 
-        for (int i = 0; i < num_threads; i++) {
-            std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this,
-                       dm_block_device, partition_name, offset, read_sz_per_thread);
+    for (int i = 0; i < num_threads; i++) {
+        std::async(std::launch::async, &SnapshotHandler::ReadBlocksToCache, this, dm_block_device,
+                   partition_name, offset, read_sz_per_thread);
 
-            offset += read_sz_per_thread;
-        }
+        offset += read_sz_per_thread;
     }
 }
 
@@ -694,10 +689,8 @@
     // During selinux init transition, libsnapshot will propagate the
     // status of io_uring enablement. As properties are not initialized,
     // we cannot query system property.
-    //
-    // TODO: b/219642530: Intermittent I/O failures observed
     if (is_io_uring_enabled_) {
-        return false;
+        return true;
     }
 
     // Finally check the system property
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index cc82985..83d40f6 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -99,6 +99,7 @@
     void InitializeRAIter();
     bool RAIterDone();
     void RAIterNext();
+    void RAResetIter(uint64_t num_blocks);
     const CowOperation* GetRAOpIter();
 
     void InitializeBuffer();
@@ -151,12 +152,16 @@
     std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_;
     BufferSink bufsink_;
 
+    uint64_t total_ra_blocks_completed_ = 0;
     bool read_ahead_async_ = false;
-    // Queue depth of 32 seems optimal. We don't want
+    // Queue depth of 8 seems optimal. We don't want
     // to have a huge depth as it may put more memory pressure
     // on the kernel worker threads given that we use
-    // IOSQE_ASYNC flag.
-    int queue_depth_ = 32;
+    // IOSQE_ASYNC flag - ASYNC flags can potentially
+    // result in EINTR. Since we don't restart those
+    // syscalls and instead fall back to synchronous
+    // I/O, we don't want a huge queue depth.
+    int queue_depth_ = 8;
     std::unique_ptr<struct io_uring> ring_;
 };
 
@@ -210,11 +215,12 @@
 
     // Merge related ops
     bool Merge();
-    bool MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
-    bool MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter);
-    bool MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter);
+    bool AsyncMerge();
+    bool SyncMerge();
+    bool MergeOrderedOps();
+    bool MergeOrderedOpsAsync();
+    bool MergeReplaceZeroOps();
     int PrepareMerge(uint64_t* source_offset, int* pending_ops,
-                     const std::unique_ptr<ICowOpIter>& cowop_iter,
                      std::vector<const CowOperation*>* replace_zero_vec = nullptr);
 
     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
@@ -238,12 +244,18 @@
     unique_fd base_path_merge_fd_;
     unique_fd ctrl_fd_;
 
+    std::unique_ptr<ICowOpIter> cowop_iter_;
+    size_t ra_block_index_ = 0;
+    uint64_t blocks_merged_in_group_ = 0;
     bool merge_async_ = false;
-    // Queue depth of 32 seems optimal. We don't want
+    // Queue depth of 8 seems optimal. We don't want
     // to have a huge depth as it may put more memory pressure
     // on the kernel worker threads given that we use
-    // IOSQE_ASYNC flag.
-    int queue_depth_ = 32;
+    // IOSQE_ASYNC flag - ASYNC flags can potentially
+    // result in EINTR. Since we don't restart those
+    // syscalls and instead fall back to synchronous
+    // I/O, we don't want a huge queue depth.
+    int queue_depth_ = 8;
     std::unique_ptr<struct io_uring> ring_;
 
     std::shared_ptr<SnapshotHandler> snapuserd_;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
index ffb982a..0cb41d3 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -24,15 +24,14 @@
 using android::base::unique_fd;
 
 int Worker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
-                         const std::unique_ptr<ICowOpIter>& cowop_iter,
                          std::vector<const CowOperation*>* replace_zero_vec) {
     int num_ops = *pending_ops;
     int nr_consecutive = 0;
     bool checkOrderedOp = (replace_zero_vec == nullptr);
 
     do {
-        if (!cowop_iter->Done() && num_ops) {
-            const CowOperation* cow_op = &cowop_iter->Get();
+        if (!cowop_iter_->Done() && num_ops) {
+            const CowOperation* cow_op = &cowop_iter_->Get();
             if (checkOrderedOp && !IsOrderedOp(*cow_op)) {
                 break;
             }
@@ -42,12 +41,12 @@
                 replace_zero_vec->push_back(cow_op);
             }
 
-            cowop_iter->Next();
+            cowop_iter_->Next();
             num_ops -= 1;
             nr_consecutive = 1;
 
-            while (!cowop_iter->Done() && num_ops) {
-                const CowOperation* op = &cowop_iter->Get();
+            while (!cowop_iter_->Done() && num_ops) {
+                const CowOperation* op = &cowop_iter_->Get();
                 if (checkOrderedOp && !IsOrderedOp(*op)) {
                     break;
                 }
@@ -63,7 +62,7 @@
 
                 nr_consecutive += 1;
                 num_ops -= 1;
-                cowop_iter->Next();
+                cowop_iter_->Next();
             }
         }
     } while (0);
@@ -71,7 +70,7 @@
     return nr_consecutive;
 }
 
-bool Worker::MergeReplaceZeroOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeReplaceZeroOps() {
     // Flush every 8192 ops. Since all ops are independent and there is no
     // dependency between COW ops, we will flush the data and the number
     // of ops merged in COW file for every 8192 ops. If there is a crash,
@@ -84,15 +83,17 @@
     int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32;
     int num_ops_merged = 0;
 
-    while (!cowop_iter->Done()) {
+    SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";
+
+    while (!cowop_iter_->Done()) {
         int num_ops = PAYLOAD_BUFFER_SZ / BLOCK_SZ;
         std::vector<const CowOperation*> replace_zero_vec;
         uint64_t source_offset;
 
-        int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter, &replace_zero_vec);
+        int linear_blocks = PrepareMerge(&source_offset, &num_ops, &replace_zero_vec);
         if (linear_blocks == 0) {
             // Merge complete
-            CHECK(cowop_iter->Done());
+            CHECK(cowop_iter_->Done());
             break;
         }
 
@@ -117,8 +118,8 @@
         size_t io_size = linear_blocks * BLOCK_SZ;
 
         // Merge - Write the contents back to base device
-        int ret = pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(), io_size,
-                         source_offset);
+        int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(), bufsink_.GetPayloadBufPtr(),
+                                            io_size, source_offset));
         if (ret < 0 || ret != io_size) {
             SNAP_LOG(ERROR)
                     << "Merge: ReplaceZeroOps: Failed to write to backing device while merging "
@@ -172,16 +173,15 @@
     return true;
 }
 
-bool Worker::MergeOrderedOpsAsync(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeOrderedOpsAsync() {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
-    size_t block_index = 0;
 
     SNAP_LOG(INFO) << "MergeOrderedOpsAsync started....";
 
-    while (!cowop_iter->Done()) {
-        const CowOperation* cow_op = &cowop_iter->Get();
+    while (!cowop_iter_->Done()) {
+        const CowOperation* cow_op = &cowop_iter_->Get();
         if (!IsOrderedOp(*cow_op)) {
             break;
         }
@@ -190,11 +190,10 @@
         // Wait for RA thread to notify that the merge window
         // is ready for merging.
         if (!snapuserd_->WaitForMergeBegin()) {
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
-        snapuserd_->SetMergeInProgress(block_index);
+        snapuserd_->SetMergeInProgress(ra_block_index_);
 
         loff_t offset = 0;
         int num_ops = snapuserd_->GetTotalBlocksToMerge();
@@ -202,12 +201,13 @@
         int pending_sqe = queue_depth_;
         int pending_ios_to_submit = 0;
         bool flush_required = false;
+        blocks_merged_in_group_ = 0;
 
         SNAP_LOG(DEBUG) << "Merging copy-ops of size: " << num_ops;
         while (num_ops) {
             uint64_t source_offset;
 
-            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
 
             if (linear_blocks != 0) {
                 size_t io_size = (linear_blocks * BLOCK_SZ);
@@ -216,7 +216,6 @@
                 struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
                 if (!sqe) {
                     SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during merge-ordered ops";
-                    snapuserd_->SetMergeFailed(block_index);
                     return false;
                 }
 
@@ -225,10 +224,18 @@
 
                 offset += io_size;
                 num_ops -= linear_blocks;
+                blocks_merged_in_group_ += linear_blocks;
 
                 pending_sqe -= 1;
                 pending_ios_to_submit += 1;
-                sqe->flags |= IOSQE_ASYNC;
+                // These flags are important - We need to make sure that the
+                // blocks are linked and are written in the same order as
+                // populated. This is because of overlapping block writes.
+                //
+                // If there are no dependency, we can optimize this further by
+                // allowing parallel writes; but for now, just link all the SQ
+                // entries.
+                sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
             }
 
             // Ring is full or no more COW ops to be merged in this batch
@@ -256,7 +263,7 @@
                             pending_sqe -= 1;
                             flush_required = false;
                             pending_ios_to_submit += 1;
-                            sqe->flags |= IOSQE_ASYNC;
+                            sqe->flags |= (IOSQE_IO_LINK | IOSQE_ASYNC);
                         }
                     } else {
                         flush_required = true;
@@ -269,35 +276,45 @@
                     SNAP_PLOG(ERROR)
                             << "io_uring_submit failed for read-ahead: "
                             << " io submit: " << ret << " expected: " << pending_ios_to_submit;
-                    snapuserd_->SetMergeFailed(block_index);
                     return false;
                 }
 
                 int pending_ios_to_complete = pending_ios_to_submit;
                 pending_ios_to_submit = 0;
 
+                bool status = true;
+
                 // Reap I/O completions
                 while (pending_ios_to_complete) {
                     struct io_uring_cqe* cqe;
 
+                    // We need to make sure to reap all the I/O's submitted
+                    // even if a completion reports an error (cqe->res < 0).
+                    //
+                    // io_uring_wait_cqe itself can fail, e.g. with -EAGAIN or
+                    // -EINTR. No CQE is handed back in that case, so cqe must
+                    // not be dereferenced: stop reaping right away. We don't
+                    // re-populate and resubmit the SQEs; the caller falls back
+                    // to synchronous I/O instead.
                     ret = io_uring_wait_cqe(ring_.get(), &cqe);
                     if (ret) {
-                        SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
-                        snapuserd_->SetMergeFailed(block_index);
+                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed: " << ret;
                         return false;
                     }
 
                     if (cqe->res < 0) {
-                        SNAP_LOG(ERROR)
-                                << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
-                        snapuserd_->SetMergeFailed(block_index);
-                        return false;
+                        SNAP_LOG(ERROR) << "Merge: io_uring_wait_cqe failed with res: " << cqe->res;
+                        status = false;
                     }
 
                     io_uring_cqe_seen(ring_.get(), cqe);
                     pending_ios_to_complete -= 1;
                 }
 
+                if (!status) {
+                    return false;
+                }
+
                 pending_sqe = queue_depth_;
             }
 
@@ -312,7 +329,6 @@
         // Flush the data
         if (flush_required && (fsync(base_path_merge_fd_.get()) < 0)) {
             SNAP_LOG(ERROR) << " Failed to fsync merged data";
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
@@ -320,35 +336,34 @@
         // the merge completion
         if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
             SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
-            snapuserd_->SetMergeFailed(block_index);
             return false;
         }
 
         SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
+
         // Mark the block as merge complete
-        snapuserd_->SetMergeCompleted(block_index);
+        snapuserd_->SetMergeCompleted(ra_block_index_);
 
         // Notify RA thread that the merge thread is ready to merge the next
         // window
         snapuserd_->NotifyRAForMergeReady();
 
         // Get the next block
-        block_index += 1;
+        ra_block_index_ += 1;
     }
 
     return true;
 }
 
-bool Worker::MergeOrderedOps(const std::unique_ptr<ICowOpIter>& cowop_iter) {
+bool Worker::MergeOrderedOps() {
     void* mapped_addr = snapuserd_->GetMappedAddr();
     void* read_ahead_buffer =
             static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());
-    size_t block_index = 0;
 
     SNAP_LOG(INFO) << "MergeOrderedOps started....";
 
-    while (!cowop_iter->Done()) {
-        const CowOperation* cow_op = &cowop_iter->Get();
+    while (!cowop_iter_->Done()) {
+        const CowOperation* cow_op = &cowop_iter_->Get();
         if (!IsOrderedOp(*cow_op)) {
             break;
         }
@@ -357,11 +372,11 @@
         // Wait for RA thread to notify that the merge window
         // is ready for merging.
         if (!snapuserd_->WaitForMergeBegin()) {
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }
 
-        snapuserd_->SetMergeInProgress(block_index);
+        snapuserd_->SetMergeInProgress(ra_block_index_);
 
         loff_t offset = 0;
         int num_ops = snapuserd_->GetTotalBlocksToMerge();
@@ -369,7 +384,7 @@
         while (num_ops) {
             uint64_t source_offset;
 
-            int linear_blocks = PrepareMerge(&source_offset, &num_ops, cowop_iter);
+            int linear_blocks = PrepareMerge(&source_offset, &num_ops);
             if (linear_blocks == 0) {
                 break;
             }
@@ -378,12 +393,13 @@
             // Write to the base device. Data is already in the RA buffer. Note
             // that XOR ops is already handled by the RA thread. We just write
             // the contents out.
-            int ret = pwrite(base_path_merge_fd_.get(), (char*)read_ahead_buffer + offset, io_size,
-                             source_offset);
+            int ret = TEMP_FAILURE_RETRY(pwrite(base_path_merge_fd_.get(),
+                                                (char*)read_ahead_buffer + offset, io_size,
+                                                source_offset));
             if (ret < 0 || ret != io_size) {
                 SNAP_LOG(ERROR) << "Failed to write to backing device while merging "
                                 << " at offset: " << source_offset << " io_size: " << io_size;
-                snapuserd_->SetMergeFailed(block_index);
+                snapuserd_->SetMergeFailed(ra_block_index_);
                 return false;
             }
 
@@ -397,7 +413,7 @@
         // Flush the data
         if (fsync(base_path_merge_fd_.get()) < 0) {
             SNAP_LOG(ERROR) << " Failed to fsync merged data";
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }
 
@@ -405,47 +421,87 @@
         // the merge completion
         if (!snapuserd_->CommitMerge(snapuserd_->GetTotalBlocksToMerge())) {
             SNAP_LOG(ERROR) << " Failed to commit the merged block in the header";
-            snapuserd_->SetMergeFailed(block_index);
+            snapuserd_->SetMergeFailed(ra_block_index_);
             return false;
         }
 
         SNAP_LOG(DEBUG) << "Block commit of size: " << snapuserd_->GetTotalBlocksToMerge();
         // Mark the block as merge complete
-        snapuserd_->SetMergeCompleted(block_index);
+        snapuserd_->SetMergeCompleted(ra_block_index_);
 
         // Notify RA thread that the merge thread is ready to merge the next
         // window
         snapuserd_->NotifyRAForMergeReady();
 
         // Get the next block
-        block_index += 1;
+        ra_block_index_ += 1;
     }
 
     return true;
 }
 
-bool Worker::Merge() {
-    std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetMergeOpIter();
+bool Worker::AsyncMerge() {
+    if (MergeOrderedOpsAsync()) {
+        SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed";
+        return true;
+    }
 
+    SNAP_LOG(ERROR) << "MergeOrderedOpsAsync failed - Falling back to synchronous I/O";
+    // Walk cowop_iter_ back over the ops consumed for the group that was in
+    // flight, so that the synchronous retry starts from that group's first op.
+    while (blocks_merged_in_group_ != 0 && !cowop_iter_->RDone()) {
+        cowop_iter_->Prev();
+        --blocks_merged_in_group_;
+    }
+    return false;
+}
+
+// Synchronous merge of the ordered ops; logs the outcome and returns it.
+bool Worker::SyncMerge() {
+    if (MergeOrderedOps()) {
+        SNAP_LOG(INFO) << "MergeOrderedOps completed";
+        return true;
+    }
+    SNAP_LOG(ERROR) << "Merge failed for ordered ops";
+    return false;
+}
+
+bool Worker::Merge() {
+    cowop_iter_ = reader_->GetMergeOpIter();
+
+    bool retry = false;
+    bool ordered_ops_merge_status;
+
+    // Start Async Merge
     if (merge_async_) {
-        if (!MergeOrderedOpsAsync(cowop_iter)) {
+        ordered_ops_merge_status = AsyncMerge();
+        if (!ordered_ops_merge_status) {
+            FinalizeIouring();
+            retry = true;
+            merge_async_ = false;
+        }
+    }
+
+    // Check if we need to fallback and retry the merge
+    //
+    // If the device doesn't support async merge, we
+    // will directly enter here (aka devices with 4.x kernels)
+    const bool sync_merge_required = (retry || !merge_async_);
+
+    if (sync_merge_required) {
+        ordered_ops_merge_status = SyncMerge();
+        if (!ordered_ops_merge_status) {
+            // Merge failed. Device will continue to be mounted
+            // off snapshots; merge will be retried during
+            // next reboot
             SNAP_LOG(ERROR) << "Merge failed for ordered ops";
             snapuserd_->MergeFailed();
             return false;
         }
-        SNAP_LOG(INFO) << "MergeOrderedOpsAsync completed.....";
-    } else {
-        // Start with Copy and Xor ops
-        if (!MergeOrderedOps(cowop_iter)) {
-            SNAP_LOG(ERROR) << "Merge failed for ordered ops";
-            snapuserd_->MergeFailed();
-            return false;
-        }
-        SNAP_LOG(INFO) << "MergeOrderedOps completed.....";
     }
 
     // Replace and Zero ops
-    if (!MergeReplaceZeroOps(cowop_iter)) {
+    if (!MergeReplaceZeroOps()) {
         SNAP_LOG(ERROR) << "Merge failed for replace/zero ops";
         snapuserd_->MergeFailed();
         return false;
@@ -461,14 +517,6 @@
         return false;
     }
 
-    {
-        // TODO: b/219642530 - Disable io_uring for merge
-        // until we figure out the cause of intermittent
-        // IO failures.
-        merge_async_ = false;
-        return true;
-    }
-
     ring_ = std::make_unique<struct io_uring>();
 
     int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
@@ -514,7 +562,7 @@
     CloseFds();
     reader_->CloseCowFd();
 
-    SNAP_LOG(INFO) << "Merge finish";
+    SNAP_LOG(INFO) << "Snapshot-Merge completed";
 
     return true;
 }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index 26c5f19..7d9d392 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -279,7 +279,6 @@
             sqe = io_uring_get_sqe(ring_.get());
             if (!sqe) {
                 SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during read-ahead";
-                snapuserd_->ReadAheadIOFailed();
                 return false;
             }
 
@@ -309,7 +308,6 @@
             if (ret != pending_ios_to_submit) {
                 SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: "
                                  << " io submit: " << ret << " expected: " << pending_ios_to_submit;
-                snapuserd_->ReadAheadIOFailed();
                 return false;
             }
 
@@ -321,14 +319,12 @@
             // Read XOR data from COW file in parallel when I/O's are in-flight
             if (xor_processing_required && !ReadXorData(block_index, xor_op_index, xor_op_vec)) {
                 SNAP_LOG(ERROR) << "ReadXorData failed";
-                snapuserd_->ReadAheadIOFailed();
                 return false;
             }
 
             // Fetch I/O completions
             if (!ReapIoCompletions(pending_ios_to_complete)) {
                 SNAP_LOG(ERROR) << "ReapIoCompletions failed";
-                snapuserd_->ReadAheadIOFailed();
                 return false;
             }
 
@@ -393,26 +389,36 @@
 }
 
 bool ReadAhead::ReapIoCompletions(int pending_ios_to_complete) {
+    bool status = true;
+
     // Reap I/O completions
     while (pending_ios_to_complete) {
         struct io_uring_cqe* cqe;
 
+        // We need to make sure to reap all the I/O's submitted
+        // even if a completion reports an error (cqe->res < 0).
+        //
+        // io_uring_wait_cqe itself can fail, e.g. with -EAGAIN or
+        // -EINTR. No CQE is handed back in that case, so cqe must
+        // not be dereferenced: give up reaping right away. We don't
+        // re-populate and resubmit the SQEs; the caller falls back
+        // to synchronous I/O instead.
         int ret = io_uring_wait_cqe(ring_.get(), &cqe);
         if (ret) {
             SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
             return false;
         }
 
         if (cqe->res < 0) {
             SNAP_LOG(ERROR) << "Read-ahead - io_uring_Wait_cqe failed with res: " << cqe->res;
-            return false;
+            status = false;
         }
 
         io_uring_cqe_seen(ring_.get(), cqe);
         pending_ios_to_complete -= 1;
     }
 
-    return true;
+    return status;
 }
 
 void ReadAhead::ProcessXorData(size_t& block_xor_index, size_t& xor_index,
@@ -610,18 +616,38 @@
         return ReconstructDataFromCow();
     }
 
+    bool retry = false;
+    bool ra_status;
+
+    // Start Async read-ahead
     if (read_ahead_async_) {
-        if (!ReadAheadAsyncIO()) {
-            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - io_uring processing failure.";
-            return false;
+        ra_status = ReadAheadAsyncIO();
+        if (!ra_status) {
+            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - Falling back synchronous I/O";
+            FinalizeIouring();
+            RAResetIter(total_blocks_merged_);
+            retry = true;
+            read_ahead_async_ = false;
         }
-    } else {
-        if (!ReadAheadSyncIO()) {
+    }
+
+    // Check if we need to fallback and retry the merge
+    //
+    // If the device doesn't support async operations, we
+    // will directly enter here (aka devices with 4.x kernels)
+
+    const bool ra_sync_required = (retry || !read_ahead_async_);
+
+    if (ra_sync_required) {
+        ra_status = ReadAheadSyncIO();
+        if (!ra_status) {
             SNAP_LOG(ERROR) << "ReadAheadSyncIO failed";
             return false;
         }
     }
 
+    SNAP_LOG(DEBUG) << "Read-ahead: total_ra_blocks_merged: " << total_ra_blocks_completed_;
+
     // Wait for the merge to finish for the previous RA window. We shouldn't
     // be touching the scratch space until merge is complete of previous RA
     // window. If there is a crash during this time frame, merge should resume
@@ -646,6 +672,7 @@
         offset += BLOCK_SZ;
     }
 
+    total_ra_blocks_completed_ += total_blocks_merged_;
     snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);
 
     // Flush the data only if we have a overlapping blocks in the region
@@ -763,6 +790,13 @@
     cowop_iter_->Next();
 }
 
+void ReadAhead::RAResetIter(uint64_t num_blocks) {
+    // Call Prev() on cowop_iter_ up to num_blocks times; stop early once RDone() is true.
+    for (; num_blocks != 0 && !cowop_iter_->RDone(); --num_blocks) {
+        cowop_iter_->Prev();
+    }
+}
+
 const CowOperation* ReadAhead::GetRAOpIter() {
     const CowOperation* cow_op = &cowop_iter_->Get();
     return cow_op;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
index 6dec1e2..d4e1d7c 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
@@ -531,6 +531,13 @@
     {
         std::unique_lock<std::mutex> lock(blk_state->m_lock);
 
+        // We may have fallen back from async merge to synchronous merging
+        // on the existing block. There is no need to reset, as the
+        // merge is already in progress.
+        if (blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_IN_PROGRESS) {
+            return;
+        }
+
         CHECK(blk_state->merge_state_ == MERGE_GROUP_STATE::GROUP_MERGE_PENDING);
 
         // First set the state to RA_READY so that in-flight I/O will drain
diff --git a/fs_mgr/libsnapshot/vts_ota_config_test.cpp b/fs_mgr/libsnapshot/vts_ota_config_test.cpp
new file mode 100644
index 0000000..afc2d81
--- /dev/null
+++ b/fs_mgr/libsnapshot/vts_ota_config_test.cpp
@@ -0,0 +1,23 @@
+//
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <android-base/properties.h>
+#include <gtest/gtest.h>
+
+TEST(VAB, Enabled) {  // Fails unless both properties read as true (the default passed is false).
+    ASSERT_TRUE(android::base::GetBoolProperty("ro.virtual_ab.enabled", false));
+    ASSERT_TRUE(android::base::GetBoolProperty("ro.virtual_ab.userspace.snapshots.enabled", false));
+}
diff --git a/fs_mgr/tests/fs_mgr_test.cpp b/fs_mgr/tests/fs_mgr_test.cpp
index eccb902..1dbee75 100644
--- a/fs_mgr/tests/fs_mgr_test.cpp
+++ b/fs_mgr/tests/fs_mgr_test.cpp
@@ -1104,3 +1104,76 @@
     EXPECT_TRUE(CompareFlags(flags, entry->fs_mgr_flags));
     EXPECT_EQ(0, entry->readahead_size_kb);
 }
+
+TEST(fs_mgr, TransformFstabForDsu) {
+    TemporaryFile fstab_file;
+    ASSERT_NE(-1, fstab_file.fd);
+    const std::string fstab_contents = R"fs(
+system /system      erofs   ro  wait,logical,first_stage_mount
+system /system      ext4    ro  wait,logical,first_stage_mount
+vendor /vendor      ext4    ro  wait,logical,first_stage_mount
+data   /data        f2fs    noatime     wait
+)fs";
+
+    ASSERT_TRUE(android::base::WriteStringToFile(fstab_contents, fstab_file.path));
+
+    Fstab fstab;
+    EXPECT_TRUE(ReadFstabFromFile(fstab_file.path, &fstab));
+    TransformFstabForDsu(&fstab, "dsu", {"system_gsi", "userdata_gsi"});
+    ASSERT_EQ(4U, fstab.size());
+
+    auto it = fstab.begin();
+    // Rows 1-2: both /system entries are expected on system_gsi.
+    EXPECT_EQ("/system", it->mount_point);
+    EXPECT_EQ("system_gsi", it->blk_device);
+    ++it;
+
+    EXPECT_EQ("/system", it->mount_point);
+    EXPECT_EQ("system_gsi", it->blk_device);
+    ++it;
+    // Row 3: /vendor is expected to keep its "vendor" block device.
+    EXPECT_EQ("/vendor", it->mount_point);
+    EXPECT_EQ("vendor", it->blk_device);
+    ++it;
+    // Row 4: /data is expected on userdata_gsi.
+    EXPECT_EQ("/data", it->mount_point);
+    EXPECT_EQ("userdata_gsi", it->blk_device);
+    ++it;
+}
+
+TEST(fs_mgr, TransformFstabForDsu_synthesisExt4Entry) {
+    TemporaryFile fstab_file;
+    ASSERT_NE(-1, fstab_file.fd);
+    const std::string fstab_contents = R"fs(
+system /system      erofs   ro  wait,logical,first_stage_mount
+vendor /vendor      ext4    ro  wait,logical,first_stage_mount
+data   /data        f2fs    noatime     wait
+)fs";
+
+    ASSERT_TRUE(android::base::WriteStringToFile(fstab_contents, fstab_file.path));
+
+    Fstab fstab;
+    EXPECT_TRUE(ReadFstabFromFile(fstab_file.path, &fstab));
+    TransformFstabForDsu(&fstab, "dsu", {"system_gsi", "userdata_gsi"});
+    ASSERT_EQ(4U, fstab.size());
+
+    auto it = fstab.begin();
+    // Three input rows become four: the original erofs /system row comes first...
+    EXPECT_EQ("/system", it->mount_point);
+    EXPECT_EQ("system_gsi", it->blk_device);
+    EXPECT_EQ("erofs", it->fs_type);
+    ++it;
+    // ...and an ext4 /system row is expected directly after it, ahead of /vendor.
+    EXPECT_EQ("/system", it->mount_point);
+    EXPECT_EQ("system_gsi", it->blk_device);
+    EXPECT_EQ("ext4", it->fs_type);
+    ++it;
+
+    EXPECT_EQ("/vendor", it->mount_point);
+    EXPECT_EQ("vendor", it->blk_device);
+    ++it;
+
+    EXPECT_EQ("/data", it->mount_point);
+    EXPECT_EQ("userdata_gsi", it->blk_device);
+    ++it;
+}
diff --git a/init/README.md b/init/README.md
index c102b1f..c82dbfb 100644
--- a/init/README.md
+++ b/init/README.md
@@ -1034,7 +1034,7 @@
 /first_stage_ramdisk to remove the recovery components from the environment, then proceed the same
 as 2). Note that the decision to boot normally into Android instead of booting
 into recovery mode is made if androidboot.force_normal_boot=1 is present in the
-kernel commandline.
+kernel commandline, or in the bootconfig on Android S and later.
 
 Once first stage init finishes it execs /system/bin/init with the "selinux_setup" argument. This
 phase is where SELinux is optionally compiled and loaded onto the system. selinux.cpp contains more
diff --git a/libcutils/include/cutils/trace.h b/libcutils/include/cutils/trace.h
index 97e93f8..17a0070 100644
--- a/libcutils/include/cutils/trace.h
+++ b/libcutils/include/cutils/trace.h
@@ -75,7 +75,8 @@
 #define ATRACE_TAG_AIDL             (1<<24)
 #define ATRACE_TAG_NNAPI            (1<<25)
 #define ATRACE_TAG_RRO              (1<<26)
-#define ATRACE_TAG_LAST             ATRACE_TAG_RRO
+#define ATRACE_TAG_THERMAL          (1<<27)
+#define ATRACE_TAG_LAST             ATRACE_TAG_THERMAL
 
 // Reserved for initialization.
 #define ATRACE_TAG_NOT_READY        (1ULL<<63)
diff --git a/libprocessgroup/task_profiles.cpp b/libprocessgroup/task_profiles.cpp
index c080582..dc7c368 100644
--- a/libprocessgroup/task_profiles.cpp
+++ b/libprocessgroup/task_profiles.cpp
@@ -520,6 +520,7 @@
 bool TaskProfile::ExecuteForProcess(uid_t uid, pid_t pid) const {
     for (const auto& element : elements_) {
         if (!element->ExecuteForProcess(uid, pid)) {
+            LOG(VERBOSE) << "Applying profile action " << element->Name() << " failed";
             return false;
         }
     }
@@ -532,6 +533,7 @@
     }
     for (const auto& element : elements_) {
         if (!element->ExecuteForTask(tid)) {
+            LOG(VERBOSE) << "Applying profile action " << element->Name() << " failed";
             return false;
         }
     }
@@ -643,7 +645,7 @@
 
         std::string profile_name = profile_val["Name"].asString();
         const Json::Value& actions = profile_val["Actions"];
-        auto profile = std::make_shared<TaskProfile>();
+        auto profile = std::make_shared<TaskProfile>(profile_name);
 
         for (Json::Value::ArrayIndex act_idx = 0; act_idx < actions.size(); ++act_idx) {
             const Json::Value& action_val = actions[act_idx];
@@ -755,7 +757,7 @@
             }
         }
         if (ret) {
-            auto profile = std::make_shared<TaskProfile>();
+            auto profile = std::make_shared<TaskProfile>(aggregateprofile_name);
             profile->Add(std::make_unique<ApplyProfileAction>(profiles));
             profiles_[aggregateprofile_name] = profile;
         }
diff --git a/libprocessgroup/task_profiles.h b/libprocessgroup/task_profiles.h
index ecc67a3..9ee3781 100644
--- a/libprocessgroup/task_profiles.h
+++ b/libprocessgroup/task_profiles.h
@@ -59,6 +59,8 @@
 
     virtual ~ProfileAction() {}
 
+    virtual const char* Name() const = 0;
+
     // Default implementations will fail
     virtual bool ExecuteForProcess(uid_t, pid_t) const { return false; };
     virtual bool ExecuteForTask(int) const { return false; };
@@ -75,6 +77,7 @@
   public:
     SetClampsAction(int boost, int clamp) noexcept : boost_(boost), clamp_(clamp) {}
 
+    const char* Name() const override { return "SetClamps"; }
     bool ExecuteForProcess(uid_t uid, pid_t pid) const override;
     bool ExecuteForTask(int tid) const override;
 
@@ -87,6 +90,7 @@
   public:
     SetTimerSlackAction(unsigned long slack) noexcept : slack_(slack) {}
 
+    const char* Name() const override { return "SetTimerSlack"; }
     bool ExecuteForTask(int tid) const override;
 
   private:
@@ -101,6 +105,7 @@
     SetAttributeAction(const IProfileAttribute* attribute, const std::string& value)
         : attribute_(attribute), value_(value) {}
 
+    const char* Name() const override { return "SetAttribute"; }
     bool ExecuteForProcess(uid_t uid, pid_t pid) const override;
     bool ExecuteForTask(int tid) const override;
 
@@ -114,6 +119,7 @@
   public:
     SetCgroupAction(const CgroupController& c, const std::string& p);
 
+    const char* Name() const override { return "SetCgroup"; }
     bool ExecuteForProcess(uid_t uid, pid_t pid) const override;
     bool ExecuteForTask(int tid) const override;
     void EnableResourceCaching(ResourceCacheType cache_type) override;
@@ -137,6 +143,7 @@
     WriteFileAction(const std::string& task_path, const std::string& proc_path,
                     const std::string& value, bool logfailures);
 
+    const char* Name() const override { return "WriteFile"; }
     bool ExecuteForProcess(uid_t uid, pid_t pid) const override;
     bool ExecuteForTask(int tid) const override;
     void EnableResourceCaching(ResourceCacheType cache_type) override;
@@ -155,8 +162,9 @@
 
 class TaskProfile {
   public:
-    TaskProfile() : res_cached_(false) {}
+    explicit TaskProfile(const std::string& name) : name_(name), res_cached_(false) {}
 
+    const std::string& Name() const { return name_; }
     void Add(std::unique_ptr<ProfileAction> e) { elements_.push_back(std::move(e)); }
     void MoveTo(TaskProfile* profile);
 
@@ -166,6 +174,7 @@
     void DropResourceCaching(ProfileAction::ResourceCacheType cache_type);
 
   private:
+    const std::string name_;
     bool res_cached_;
     std::vector<std::unique_ptr<ProfileAction>> elements_;
 };
@@ -176,6 +185,7 @@
     ApplyProfileAction(const std::vector<std::shared_ptr<TaskProfile>>& profiles)
         : profiles_(profiles) {}
 
+    const char* Name() const override { return "ApplyProfileAction"; }
     bool ExecuteForProcess(uid_t uid, pid_t pid) const override;
     bool ExecuteForTask(int tid) const override;
     void EnableResourceCaching(ProfileAction::ResourceCacheType cache_type) override;
diff --git a/libutils/Errors_test.cpp b/libutils/Errors_test.cpp
index 873c994..0d13bb0 100644
--- a/libutils/Errors_test.cpp
+++ b/libutils/Errors_test.cpp
@@ -108,3 +108,65 @@
     status_t b = g(false);
     EXPECT_EQ(PERMISSION_DENIED, b);
 }
+
+TEST(errors, conversion_promotion) {  // OR_RETURN inside a function returning Result<size_t, StatusT>
+    constexpr size_t successVal = 10ull;
+    const auto run = [&](bool success) -> Result<size_t, StatusT> {
+        OR_RETURN(success_or_fail(success));
+        return successVal;
+    };
+    auto passed = run(true);
+    ASSERT_TRUE(passed.ok());
+    EXPECT_EQ(passed.value(), successVal);
+    auto failed = run(false);  // the failing path must carry PERMISSION_DENIED
+    EXPECT_FALSE(failed.ok());
+    EXPECT_EQ(PERMISSION_DENIED, failed.error().code());
+}
+
+TEST(errors, conversion_promotion_bool) {  // same flow with a Result<bool, StatusT> return type
+    constexpr size_t successVal = true;  // NOTE(review): size_t, not bool - presumably deliberate; confirm
+    const auto run = [&](bool success) -> Result<bool, StatusT> {
+        OR_RETURN(success_or_fail(success));
+        return successVal;
+    };
+    auto passed = run(true);
+    ASSERT_TRUE(passed.ok());
+    EXPECT_EQ(passed.value(), successVal);
+    auto failed = run(false);  // the failing path must carry PERMISSION_DENIED
+    EXPECT_FALSE(failed.ok());
+    EXPECT_EQ(PERMISSION_DENIED, failed.error().code());
+}
+
+TEST(errors, conversion_promotion_char) {  // a char returned through Result<unsigned char, StatusT>
+    constexpr char successVal = 'a';
+    const auto run = [&](bool success) -> Result<unsigned char, StatusT> {
+        OR_RETURN(success_or_fail(success));
+        return successVal;
+    };
+    auto passed = run(true);
+    ASSERT_TRUE(passed.ok());
+    EXPECT_EQ(passed.value(), successVal);
+    auto failed = run(false);  // the failing path must carry PERMISSION_DENIED
+    EXPECT_FALSE(failed.ok());
+    EXPECT_EQ(PERMISSION_DENIED, failed.error().code());
+}
+
+struct IntContainer {
+    // Deliberately non-explicit: implicit construction from int is the point of this type.
+    IntContainer(int value) : val_(value) {}
+    int val_;
+};
+
+TEST(errors, conversion_construct) {  // an int returned through Result<IntContainer, StatusT>
+    constexpr int successVal = 10;
+    const auto run = [&](bool success) -> Result<IntContainer, StatusT> {
+        OR_RETURN(success_or_fail(success));
+        return successVal;
+    };
+    auto passed = run(true);
+    ASSERT_TRUE(passed.ok());
+    EXPECT_EQ(passed.value().val_, successVal);
+    auto failed = run(false);  // the failing path must carry PERMISSION_DENIED
+    EXPECT_FALSE(failed.ok());
+    EXPECT_EQ(PERMISSION_DENIED, failed.error().code());
+}
diff --git a/libutils/include/utils/ErrorsMacros.h b/libutils/include/utils/ErrorsMacros.h
index 048c538..fdc46e6 100644
--- a/libutils/include/utils/ErrorsMacros.h
+++ b/libutils/include/utils/ErrorsMacros.h
@@ -25,6 +25,7 @@
 // [1] build/soong/cc/config/global.go#commonGlobalIncludes
 #include <android-base/errors.h>
 #include <android-base/result.h>
+#include <log/log_main.h>
 
 #include <assert.h>
 
@@ -44,13 +45,58 @@
     status_t val_;
 };
 
+
 namespace base {
+// TODO(b/221235365): Make StatusT fulfill the ResultError contract and clean up.
+
+// Unlike typical ResultError types, the underlying code should be a status_t
+// instead of a StatusT. We also special-case message generation.
+template<>
+struct ResultError<StatusT, false> {  // Code-only variant: stores a status_t and no message text.
+    ResultError(status_t s) : val_(s) {
+        LOG_FATAL_IF(s == OK, "Result error should not hold success");
+    }
+
+    template <typename T>
+    operator expected<T, ResultError<StatusT, false>>() const {  // wraps this error in unexpected()
+        return unexpected(*this);
+    }
+
+    std::string message() const { return statusToString(val_); }  // text derived from the code only
+    status_t code() const { return val_; }
+
+ private:
+    const status_t val_;
+};
+
+template<>
+struct ResultError<StatusT, true> {  // Variant holding a status_t plus a caller-supplied message.
+    template <typename T>
+    ResultError(T&& message, status_t s) : val_(s), message_(std::forward<T>(message)) {
+        LOG_FATAL_IF(s == OK, "Result error should not hold success");
+    }
+    // Message-less form: delegates so the same "must not be OK" check is applied.
+    ResultError(status_t s) : ResultError(std::string(), s) {}
+
+    template <typename T>
+    operator expected<T, ResultError<StatusT, true>>() const {
+        return unexpected(*this);
+    }
+
+    status_t code() const { return val_; }
+    // Returns statusToString(val_) immediately followed by the stored message.
+    std::string message() const { return statusToString(val_) + message_; }
+ private:
+    const status_t val_;
+    std::string message_;
+};
 
 // Specialization of android::base::OkOrFail<V> for V = status_t. This is used to use the OR_RETURN
 // and OR_FATAL macros with statements that yields a value of status_t. See android-base/errors.h
 // for the detailed contract.
 template <>
 struct OkOrFail<status_t> {
+    static_assert(std::is_same_v<status_t, int>);
     // Tests if status_t is a success value of not.
     static bool IsOk(const status_t& s) { return s == OK; }
 
@@ -71,16 +117,70 @@
 
     // Or converts into Result<T, StatusT>. This is used when OR_RETURN is used in a function whose
     // return type is Result<T, StatusT>.
-    template <typename T, typename = std::enable_if_t<!std::is_same_v<T, status_t>>>
+
+    template <typename T>
     operator Result<T, StatusT>() && {
-        return Error<StatusT>(std::move(val_));
+        return ResultError<StatusT>(std::move(val_));
     }
 
-    operator Result<int, StatusT>() && { return Error<StatusT>(std::move(val_)); }
+    template<typename T>
+    operator Result<T, StatusT, false>() && {
+        return ResultError<StatusT, false>(std::move(val_));
+    }
 
+    // Since user defined conversion can be followed by numeric conversion,
+    // we have to specialize all conversions to results holding numeric types to
+    // avoid conversion ambiguities with the constructor of expected.
+#pragma push_macro("SPECIALIZED_CONVERSION")
+#define SPECIALIZED_CONVERSION(type)\
+  operator Result<type, StatusT>() && { return ResultError<StatusT>(std::move(val_)); }\
+  operator Result<type, StatusT, false>() && { return ResultError<StatusT, false>(std::move(val_));}
+
+    SPECIALIZED_CONVERSION(int)
+    SPECIALIZED_CONVERSION(short int)
+    SPECIALIZED_CONVERSION(unsigned short int)
+    SPECIALIZED_CONVERSION(unsigned int)
+    SPECIALIZED_CONVERSION(long int)
+    SPECIALIZED_CONVERSION(unsigned long int)
+    SPECIALIZED_CONVERSION(long long int)
+    SPECIALIZED_CONVERSION(unsigned long long int)
+    SPECIALIZED_CONVERSION(bool)
+    SPECIALIZED_CONVERSION(char)
+    SPECIALIZED_CONVERSION(unsigned char)
+    SPECIALIZED_CONVERSION(signed char)
+    SPECIALIZED_CONVERSION(wchar_t)
+    SPECIALIZED_CONVERSION(char16_t)
+    SPECIALIZED_CONVERSION(char32_t)
+    SPECIALIZED_CONVERSION(float)
+    SPECIALIZED_CONVERSION(double)
+    SPECIALIZED_CONVERSION(long double)
+#undef SPECIALIZED_CONVERSION
+#pragma pop_macro("SPECIALIZED_CONVERSION")
     // String representation of the error value.
     static std::string ErrorMessage(const status_t& s) { return statusToString(s); }
 };
-
 }  // namespace base
+
+
+// These operators make ResultError<StatusT> directly comparable to status_t, so
+// callers need not call code() whenever a comparison is desired.
+
+template <bool include_message>
+bool operator==(const base::ResultError<StatusT, include_message>& lhs, const status_t& rhs) {
+    return lhs.code() == rhs;  // error on the left, raw status_t on the right
+}
+template <bool include_message>
+bool operator==(const status_t& lhs, const base::ResultError<StatusT, include_message>& rhs) {
+    return lhs == rhs.code();  // raw status_t on the left, error on the right
+}
+
+template <bool include_message>
+bool operator!=(const base::ResultError<StatusT, include_message>& lhs, const status_t& rhs) {
+    return lhs.code() != rhs;  // error on the left, raw status_t on the right
+}
+template <bool include_message>
+bool operator!=(const status_t& lhs, const base::ResultError<StatusT, include_message>& rhs) {
+    return lhs != rhs.code();  // raw status_t on the left, error on the right
+}
+
 }  // namespace android
diff --git a/trusty/keymaster/include/trusty_keymaster/TrustyKeyMintDevice.h b/trusty/keymaster/include/trusty_keymaster/TrustyKeyMintDevice.h
index 5fd628f..c8d8932 100644
--- a/trusty/keymaster/include/trusty_keymaster/TrustyKeyMintDevice.h
+++ b/trusty/keymaster/include/trusty_keymaster/TrustyKeyMintDevice.h
@@ -27,6 +27,7 @@
 using ::keymaster::TrustyKeymaster;
 using ::ndk::ScopedAStatus;
 using secureclock::TimeStampToken;
+using ::std::array;
 using ::std::optional;
 using ::std::shared_ptr;
 using ::std::vector;
@@ -77,8 +78,13 @@
                                const optional<TimeStampToken>& timestampToken) override;
     ScopedAStatus earlyBootEnded() override;
 
-    ScopedAStatus convertStorageKeyToEphemeral(const std::vector<uint8_t>& storageKeyBlob,
-                                               std::vector<uint8_t>* ephemeralKeyBlob) override;
+    ScopedAStatus convertStorageKeyToEphemeral(const vector<uint8_t>& storageKeyBlob,
+                                               vector<uint8_t>* ephemeralKeyBlob) override;
+
+    ScopedAStatus getRootOfTrustChallenge(array<uint8_t, 16>* challenge) override;
+    ScopedAStatus getRootOfTrust(const array<uint8_t, 16>& challenge,
+                                 vector<uint8_t>* rootOfTrust) override;
+    ScopedAStatus sendRootOfTrust(const vector<uint8_t>& rootOfTrust) override;
 
   protected:
     std::shared_ptr<TrustyKeymaster> impl_;
diff --git a/trusty/keymaster/keymint/TEST_MAPPING b/trusty/keymaster/keymint/TEST_MAPPING
new file mode 100644
index 0000000..2400ccd
--- /dev/null
+++ b/trusty/keymaster/keymint/TEST_MAPPING
@@ -0,0 +1,7 @@
+{
+  "presubmit" : [
+    {
+      "name" : "vts_treble_vintf_framework_test"
+    }
+  ]
+}
diff --git a/trusty/keymaster/keymint/TrustyKeyMintDevice.cpp b/trusty/keymaster/keymint/TrustyKeyMintDevice.cpp
index 68a7912..44780e8 100644
--- a/trusty/keymaster/keymint/TrustyKeyMintDevice.cpp
+++ b/trusty/keymaster/keymint/TrustyKeyMintDevice.cpp
@@ -306,7 +306,7 @@
 }
 
 ScopedAStatus TrustyKeyMintDevice::convertStorageKeyToEphemeral(
-        const std::vector<uint8_t>& storageKeyBlob, std::vector<uint8_t>* ephemeralKeyBlob) {
+        const vector<uint8_t>& storageKeyBlob, vector<uint8_t>* ephemeralKeyBlob) {
     keymaster::ExportKeyRequest request(impl_->message_version());
     request.SetKeyMaterial(storageKeyBlob.data(), storageKeyBlob.size());
     request.key_format = KM_KEY_FORMAT_RAW;
@@ -321,4 +321,17 @@
     return ScopedAStatus::ok();
 }
 
+ScopedAStatus TrustyKeyMintDevice::getRootOfTrustChallenge(array<uint8_t, 16>* /* challenge */) {
+    return kmError2ScopedAStatus(KM_ERROR_UNIMPLEMENTED);  // stub: the out-param is never written
+}
+
+ScopedAStatus TrustyKeyMintDevice::getRootOfTrust(const array<uint8_t, 16>& /* challenge */,
+                                                  vector<uint8_t>* /* rootOfTrust */) {
+    return kmError2ScopedAStatus(KM_ERROR_UNIMPLEMENTED);  // stub: both arguments are ignored
+}
+
+ScopedAStatus TrustyKeyMintDevice::sendRootOfTrust(const vector<uint8_t>& /* rootOfTrust */) {
+    return kmError2ScopedAStatus(KM_ERROR_UNIMPLEMENTED);  // stub: the argument is ignored
+}
+
 }  // namespace aidl::android::hardware::security::keymint::trusty
diff --git a/trusty/keymaster/keymint/TrustyRemotelyProvisionedComponentDevice.cpp b/trusty/keymaster/keymint/TrustyRemotelyProvisionedComponentDevice.cpp
index 5664829..099f189 100644
--- a/trusty/keymaster/keymint/TrustyRemotelyProvisionedComponentDevice.cpp
+++ b/trusty/keymaster/keymint/TrustyRemotelyProvisionedComponentDevice.cpp
@@ -71,9 +71,10 @@
 }  // namespace
 
 ScopedAStatus TrustyRemotelyProvisionedComponentDevice::getHardwareInfo(RpcHardwareInfo* info) {
-    info->versionNumber = 1;
+    info->versionNumber = 2;
     info->rpcAuthorName = "Google";
     info->supportedEekCurve = RpcHardwareInfo::CURVE_25519;
+    info->uniqueId = "Trusty: My password is ******";
     return ScopedAStatus::ok();
 }
 
diff --git a/trusty/keymaster/keymint/android.hardware.security.keymint-service.trusty.xml b/trusty/keymaster/keymint/android.hardware.security.keymint-service.trusty.xml
index 7ca5050..0b995a2 100644
--- a/trusty/keymaster/keymint/android.hardware.security.keymint-service.trusty.xml
+++ b/trusty/keymaster/keymint/android.hardware.security.keymint-service.trusty.xml
@@ -1,6 +1,7 @@
 <manifest version="1.0" type="device">
     <hal format="aidl">
         <name>android.hardware.security.keymint</name>
+        <version>2</version>
         <fqname>IKeyMintDevice/default</fqname>
     </hal>
     <hal format="aidl">
@@ -13,6 +14,7 @@
     </hal>
     <hal format="aidl">
         <name>android.hardware.security.keymint</name>
+        <version>2</version>
         <fqname>IRemotelyProvisionedComponent/default</fqname>
     </hal>
 </manifest>
diff --git a/trusty/test/driver/Android.bp b/trusty/test/driver/Android.bp
new file mode 100644
index 0000000..b813a04
--- /dev/null
+++ b/trusty/test/driver/Android.bp
@@ -0,0 +1,32 @@
+// Copyright (C) 2022 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package {
+    // See: http://go/android-license-faq
+    default_applicable_licenses: ["Android-Apache-2.0"],
+}
+
+python_test {
+    name: "trusty_driver_test",  // also added to PRODUCT_PACKAGES in trusty/trusty-test.mk
+    srcs: [
+        "**/*.py",  // every .py file under this directory tree
+    ],
+    test_suites: ["general-tests"],
+    version: {
+        py3: {
+            embedded_launcher: true,
+            enabled: true,
+        },
+    },
+}
diff --git a/trusty/test/driver/trusty_driver_test.py b/trusty/test/driver/trusty_driver_test.py
new file mode 100644
index 0000000..608fd47
--- /dev/null
+++ b/trusty/test/driver/trusty_driver_test.py
@@ -0,0 +1,81 @@
+#!/usr/bin/python
+#
+# Copyright 2022 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Test cases for Trusty Linux Driver."""
+
+import os
+import unittest
+
+def ReadFile(file_path):
+    """Return the text of file_path minus trailing spaces, newlines and NUL characters."""
+    with open(file_path, 'r') as src:
+        return src.read().rstrip(' \n\x00')
+
+def WriteFile(file_path, s):  # Opens file_path in 'w' mode and writes the string s to it.
+    with open(file_path, 'w') as sink:
+        sink.write(s)
+
+def IsTrustySupported():  # True when the /dev/trusty-ipc-dev0 path exists.
+    return os.path.exists("/dev/trusty-ipc-dev0")
+
+@unittest.skipIf(not IsTrustySupported(), "Device does not support Trusty")
+class TrustyDriverTest(unittest.TestCase):  # Whole class is skipped when IsTrustySupported() is false.
+    def testIrqDriverBinding(self):  # Unbind then rebind trusty:irq; only an exception fails this.
+        WriteFile("/sys/bus/platform/drivers/trusty-irq/unbind", "trusty:irq")
+        WriteFile("/sys/bus/platform/drivers/trusty-irq/bind", "trusty:irq")
+
+    def testLogDriverBinding(self):  # Same unbind/rebind round trip for trusty:log.
+        WriteFile("/sys/bus/platform/drivers/trusty-log/unbind", "trusty:log")
+        WriteFile("/sys/bus/platform/drivers/trusty-log/bind", "trusty:log")
+
+    @unittest.skip("TODO(b/142275662): virtio remove currently hangs")
+    def testVirtioDriverBinding(self):  # Unconditionally skipped by the decorator above.
+        WriteFile("/sys/bus/platform/drivers/trusty-virtio/unbind",
+                  "trusty:virtio")
+        WriteFile("/sys/bus/platform/drivers/trusty-virtio/bind",
+                  "trusty:virtio")
+
+    @unittest.skip("TODO(b/142275662): virtio remove currently hangs")
+    def testTrustyDriverBinding(self):  # Unconditionally skipped by the decorator above.
+        WriteFile("/sys/bus/platform/drivers/trusty/unbind", "trusty")
+        WriteFile("/sys/bus/platform/drivers/trusty/bind", "trusty")
+
+    def testTrustyDriverVersion(self):  # The version string must begin with "Project:".
+        ver = ReadFile("/sys/bus/platform/devices/trusty/trusty_version")
+        self.assertTrue(ver.startswith("Project:"))
+
+    def testUntaintedLinux(self):  # /proc/sys/kernel/tainted must read exactly "0".
+        tainted = ReadFile("/proc/sys/kernel/tainted")
+        self.assertEqual(tainted, "0")
+
+    # stdcall test with shared memory buffers.
+    # Each test run takes up to 4 arguments:
+    # <obj_size>,<obj_count=1>,<repeat_share=1>,<repeat_access=3>
+    #
+    # Test single 4K shared memory object.
+    # Test 10 8MB objects, shared twice, each accessed twice. (8MB non-
+    # contiguous object is large enough to need several 4KB messages to
+    # describe)
+    # Test sharing 2 8MB objects 100 times without accessing it.
+    # Test 10 4K shared memory objects, shared 10 times, each accessed
+    # 10 times.
+    def testStdCall(self):
+        test = "/sys/devices/platform/trusty/trusty:test/trusty_test_run"
+        args = "0x1000 0x800000,10,2,2 0x800000,2,100,0 0x1000,10,10,10"
+        WriteFile(test, args)  # No assertion here: the test passes unless this write raises.
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/trusty/trusty-test.mk b/trusty/trusty-test.mk
index 74106ec..3a43774 100644
--- a/trusty/trusty-test.mk
+++ b/trusty/trusty-test.mk
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 PRODUCT_PACKAGES += \
-	spiproxyd \
-	trusty_keymaster_set_attestation_key \
 	keymaster_soft_attestation_keys.xml \
-
+	spiproxyd \
+	trusty_driver_test \
+	trusty_keymaster_set_attestation_key
