Allow direct reads on source device

Allow O_DIRECT reads on source block device.
This will further cut down the Active and Inactive file pages
during partition verification.

On Pixel 6 after incremental OTA - Post OTA reboot:

		Without patch      With patch     Delta
--------------------------------------------------------
Inactive(File):  4992MB             3887MB         ~22%
Active(File):    1465MB             1014MB         ~30%

Boot time however increases from 25 to 30 seconds.

This is not yet enabled. This will be behind a sysprop flag
or for low memory devices and will be enabled later.

Additionally, set the priority of worker threads to normal.
Merge threads priority is reduced. This will help low memory
devices as tested on Pixel watch.

Bug: 311233916
Test: OTA on Pixel 6
Change-Id: Icacdef08d68e28d3062611477703e7cf393a9f10
Signed-off-by: Akilesh Kailash <akailash@google.com>
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/extractor.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/extractor.cpp
index c5718d5..c85331b 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/extractor.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/extractor.cpp
@@ -41,7 +41,7 @@
 bool Extractor::Init() {
     auto opener = factory_.CreateTestOpener(control_name_);
     handler_ = std::make_shared<SnapshotHandler>(control_name_, cow_path_, base_path_, base_path_,
-                                                 opener, 1, false, false);
+                                                 opener, 1, false, false, false);
     if (!handler_->InitCowDevice()) {
         return false;
     }
@@ -50,7 +50,7 @@
     }
 
     read_worker_ = std::make_unique<ReadWorker>(cow_path_, base_path_, control_name_, base_path_,
-                                                handler_->GetSharedPtr(), opener);
+                                                handler_->GetSharedPtr(), opener, false);
     if (!read_worker_->Init()) {
         return false;
     }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
index ffd7a4b..711e704 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
@@ -51,10 +51,11 @@
 std::shared_ptr<HandlerThread> SnapshotHandlerManager::AddHandler(
         const std::string& misc_name, const std::string& cow_device_path,
         const std::string& backing_device, const std::string& base_path_merge,
-        std::shared_ptr<IBlockServerOpener> opener, int num_worker_threads, bool use_iouring) {
-    auto snapuserd = std::make_shared<SnapshotHandler>(misc_name, cow_device_path, backing_device,
-                                                       base_path_merge, opener, num_worker_threads,
-                                                       use_iouring, perform_verification_);
+        std::shared_ptr<IBlockServerOpener> opener, int num_worker_threads, bool use_iouring,
+        bool o_direct) {
+    auto snapuserd = std::make_shared<SnapshotHandler>(
+            misc_name, cow_device_path, backing_device, base_path_merge, opener, num_worker_threads,
+            use_iouring, perform_verification_, o_direct);
     if (!snapuserd->InitCowDevice()) {
         LOG(ERROR) << "Failed to initialize Snapuserd";
         return nullptr;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
index ff6ee8f..f23f07e 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
@@ -57,7 +57,8 @@
                                                       const std::string& backing_device,
                                                       const std::string& base_path_merge,
                                                       std::shared_ptr<IBlockServerOpener> opener,
-                                                      int num_worker_threads, bool use_iouring) = 0;
+                                                      int num_worker_threads, bool use_iouring,
+                                                      bool o_direct) = 0;
 
     // Start serving requests on a snapshot handler.
     virtual bool StartHandler(const std::string& misc_name) = 0;
@@ -96,7 +97,8 @@
                                               const std::string& backing_device,
                                               const std::string& base_path_merge,
                                               std::shared_ptr<IBlockServerOpener> opener,
-                                              int num_worker_threads, bool use_iouring) override;
+                                              int num_worker_threads, bool use_iouring,
+                                              bool o_direct) override;
     bool StartHandler(const std::string& misc_name) override;
     bool DeleteHandler(const std::string& misc_name) override;
     bool InitiateMerge(const std::string& misc_name) override;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
index 3a56669..bcf9aab 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
@@ -557,7 +557,7 @@
         return true;
     }
 
-    if (!SetThreadPriority(kNiceValueForMergeThreads)) {
+    if (!SetThreadPriority(ANDROID_PRIORITY_BACKGROUND)) {
         SNAP_PLOG(ERROR) << "Failed to set thread priority";
     }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
index 431baf0..f1d4065 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.cpp
@@ -31,16 +31,19 @@
 void ReadWorker::CloseFds() {
     block_server_ = {};
     backing_store_fd_ = {};
+    backing_store_direct_fd_ = {};
     Worker::CloseFds();
 }
 
 ReadWorker::ReadWorker(const std::string& cow_device, const std::string& backing_device,
                        const std::string& misc_name, const std::string& base_path_merge,
                        std::shared_ptr<SnapshotHandler> snapuserd,
-                       std::shared_ptr<IBlockServerOpener> opener)
+                       std::shared_ptr<IBlockServerOpener> opener, bool direct_read)
     : Worker(cow_device, misc_name, base_path_merge, snapuserd),
       backing_store_device_(backing_device),
-      block_server_opener_(opener) {}
+      direct_read_(direct_read),
+      block_server_opener_(opener),
+      aligned_buffer_(std::unique_ptr<void, decltype(&::free)>(nullptr, &::free)) {}
 
 // Start the replace operation. This will read the
 // internal COW format and if the block is compressed,
@@ -61,6 +64,17 @@
     }
     SNAP_LOG(DEBUG) << " ReadFromBaseDevice...: new-block: " << cow_op->new_block
                     << " Op: " << *cow_op;
+
+    if (direct_read_ && IsBlockAligned(offset)) {
+        if (!android::base::ReadFullyAtOffset(backing_store_direct_fd_, aligned_buffer_.get(),
+                                              BLOCK_SZ, offset)) {
+            SNAP_PLOG(ERROR) << "O_DIRECT Read failed at offset: " << offset;
+            return false;
+        }
+        std::memcpy(buffer, aligned_buffer_.get(), BLOCK_SZ);
+        return true;
+    }
+
     if (!android::base::ReadFullyAtOffset(backing_store_fd_, buffer, BLOCK_SZ, offset)) {
         std::string op;
         if (cow_op->type() == kCowCopyOp)
@@ -201,6 +215,24 @@
         return false;
     }
 
+    if (direct_read_) {
+        backing_store_direct_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY | O_DIRECT));
+        if (backing_store_direct_fd_ < 0) {
+            SNAP_PLOG(ERROR) << "Open Failed with O_DIRECT: " << backing_store_direct_fd_;
+            direct_read_ = false;
+        } else {
+            void* aligned_addr;
+            ssize_t page_size = getpagesize();
+            if (posix_memalign(&aligned_addr, page_size, page_size) < 0) {
+                direct_read_ = false;
+                SNAP_PLOG(ERROR) << "posix_memalign failed "
+                                 << " page_size: " << page_size << " read_sz: " << page_size;
+            } else {
+                aligned_buffer_.reset(aligned_addr);
+            }
+        }
+    }
+
     block_server_ = block_server_opener_->Open(this, PAYLOAD_BUFFER_SZ);
     if (!block_server_) {
         SNAP_PLOG(ERROR) << "Unable to open block server";
@@ -214,7 +246,7 @@
 
     pthread_setname_np(pthread_self(), "ReadWorker");
 
-    if (!SetThreadPriority(kNiceValueForMergeThreads)) {
+    if (!SetThreadPriority(ANDROID_PRIORITY_NORMAL)) {
         SNAP_PLOG(ERROR) << "Failed to set thread priority";
     }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
index 6dbae81..1aff50c 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/read_worker.h
@@ -28,7 +28,7 @@
     ReadWorker(const std::string& cow_device, const std::string& backing_device,
                const std::string& misc_name, const std::string& base_path_merge,
                std::shared_ptr<SnapshotHandler> snapuserd,
-               std::shared_ptr<IBlockServerOpener> opener);
+               std::shared_ptr<IBlockServerOpener> opener, bool direct_read = false);
 
     bool Run();
     bool Init() override;
@@ -59,11 +59,14 @@
 
     std::string backing_store_device_;
     unique_fd backing_store_fd_;
+    unique_fd backing_store_direct_fd_;
+    bool direct_read_ = false;
 
     std::shared_ptr<IBlockServerOpener> block_server_opener_;
     std::unique_ptr<IBlockServer> block_server_;
 
     std::basic_string<uint8_t> xor_buffer_;
+    std::unique_ptr<void, decltype(&::free)> aligned_buffer_;
 };
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index 9b8c70d..05ba047 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -36,7 +36,7 @@
 SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,
                                  std::string backing_device, std::string base_path_merge,
                                  std::shared_ptr<IBlockServerOpener> opener, int num_worker_threads,
-                                 bool use_iouring, bool perform_verification) {
+                                 bool use_iouring, bool perform_verification, bool o_direct) {
     misc_name_ = std::move(misc_name);
     cow_device_ = std::move(cow_device);
     backing_store_device_ = std::move(backing_device);
@@ -45,13 +45,14 @@
     num_worker_threads_ = num_worker_threads;
     is_io_uring_enabled_ = use_iouring;
     perform_verification_ = perform_verification;
+    o_direct_ = o_direct;
 }
 
 bool SnapshotHandler::InitializeWorkers() {
     for (int i = 0; i < num_worker_threads_; i++) {
         auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, misc_name_,
                                                base_path_merge_, GetSharedPtr(),
-                                               block_server_opener_);
+                                               block_server_opener_, o_direct_);
         if (!wt->Init()) {
             SNAP_LOG(ERROR) << "Thread initialization failed";
             return false;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index fa1e7a0..9b7238a 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -47,6 +47,7 @@
 #include <snapuserd/snapuserd_buffer.h>
 #include <snapuserd/snapuserd_kernel.h>
 #include <storage_literals/storage_literals.h>
+#include <system/thread_defs.h>
 #include "snapuserd_readahead.h"
 #include "snapuserd_verify.h"
 
@@ -62,8 +63,6 @@
 
 static constexpr int kNumWorkerThreads = 4;
 
-static constexpr int kNiceValueForMergeThreads = -5;
-
 #define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
 #define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
 
@@ -105,7 +104,7 @@
   public:
     SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
                     std::string base_path_merge, std::shared_ptr<IBlockServerOpener> opener,
-                    int num_workers, bool use_iouring, bool perform_verification);
+                    int num_workers, bool use_iouring, bool perform_verification, bool o_direct);
     bool InitCowDevice();
     bool Start();
 
@@ -247,6 +246,7 @@
     bool perform_verification_ = true;
     bool resume_merge_ = false;
     bool merge_complete_ = false;
+    bool o_direct_ = false;
 
     std::unique_ptr<UpdateVerify> update_verify_;
     std::shared_ptr<IBlockServerOpener> block_server_opener_;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index 034cda1..c08c1b1 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -778,7 +778,7 @@
 
     InitializeIouring();
 
-    if (!SetThreadPriority(kNiceValueForMergeThreads)) {
+    if (!SetThreadPriority(ANDROID_PRIORITY_BACKGROUND)) {
         SNAP_PLOG(ERROR) << "Failed to set thread priority";
     }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
index 6eee357..0b881b6 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
@@ -346,7 +346,8 @@
 std::shared_ptr<HandlerThread> UserSnapshotServer::AddHandler(const std::string& misc_name,
                                                               const std::string& cow_device_path,
                                                               const std::string& backing_device,
-                                                              const std::string& base_path_merge) {
+                                                              const std::string& base_path_merge,
+                                                              const bool o_direct) {
     // We will need multiple worker threads only during
     // device boot after OTA. For all other purposes,
     // one thread is sufficient. We don't want to consume
@@ -368,7 +369,7 @@
     auto opener = block_server_factory_->CreateOpener(misc_name);
 
     return handlers_->AddHandler(misc_name, cow_device_path, backing_device, base_path_merge,
-                                 opener, num_worker_threads, io_uring_enabled_);
+                                 opener, num_worker_threads, io_uring_enabled_, o_direct);
 }
 
 bool UserSnapshotServer::WaitForSocket() {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
index 9926071..3013c47 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
@@ -86,7 +86,8 @@
     std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
                                               const std::string& cow_device_path,
                                               const std::string& backing_device,
-                                              const std::string& base_path_merge);
+                                              const std::string& base_path_merge,
+                                              bool o_direct = false);
     bool StartHandler(const std::string& misc_name);
 
     void SetTerminating() { terminating_ = true; }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
index 73c3cbf..8ddb0f4 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
@@ -59,10 +59,16 @@
 using testing::AssertionFailure;
 using testing::AssertionResult;
 using testing::AssertionSuccess;
+using ::testing::TestWithParam;
 
-class SnapuserdTestBase : public ::testing::TestWithParam<bool> {
+struct TestParam {
+    bool io_uring;
+    bool o_direct;
+};
+
+class SnapuserdTestBase : public ::testing::TestWithParam<TestParam> {
   protected:
-    void SetUp() override;
+    virtual void SetUp() override;
     void TearDown() override;
     void CreateBaseDevice();
     void CreateCowDevice();
@@ -628,9 +634,10 @@
     auto factory = harness_->GetBlockServerFactory();
     auto opener = factory->CreateOpener(system_device_ctrl_name_);
     handlers_->DisableVerification();
-    auto handler =
-            handlers_->AddHandler(system_device_ctrl_name_, cow_system_->path, base_dev_->GetPath(),
-                                  base_dev_->GetPath(), opener, 1, GetParam());
+    const TestParam params = GetParam();
+    auto handler = handlers_->AddHandler(system_device_ctrl_name_, cow_system_->path,
+                                         base_dev_->GetPath(), base_dev_->GetPath(), opener, 1,
+                                         params.io_uring, params.o_direct);
     ASSERT_NE(handler, nullptr);
     ASSERT_NE(handler->snapuserd(), nullptr);
 #ifdef __ANDROID__
@@ -898,9 +905,10 @@
     opener_ = factory_.CreateTestOpener(system_device_ctrl_name_);
     ASSERT_NE(opener_, nullptr);
 
+    const TestParam params = GetParam();
     handler_ = std::make_shared<SnapshotHandler>(system_device_ctrl_name_, cow_system_->path,
                                                  base_dev_->GetPath(), base_dev_->GetPath(),
-                                                 opener_, 1, false, false);
+                                                 opener_, 1, false, false, params.o_direct);
     ASSERT_TRUE(handler_->InitCowDevice());
     ASSERT_TRUE(handler_->InitializeWorkers());
 
@@ -990,14 +998,28 @@
     return {false, true};
 }
 
-std::string IoUringConfigName(const testing::TestParamInfo<SnapuserdTest::ParamType>& info) {
-    return info.param ? "io_uring" : "sync";
+std::vector<TestParam> GetTestConfigs() {
+    std::vector<TestParam> testParams;
+    std::vector<bool> uring_configs = GetIoUringConfigs();
+
+    for (bool config : uring_configs) {
+        TestParam param;
+        param.io_uring = config;
+        param.o_direct = false;
+        testParams.push_back(std::move(param));
+    }
+
+    for (bool config : uring_configs) {
+        TestParam param;
+        param.io_uring = config;
+        param.o_direct = true;
+        testParams.push_back(std::move(param));
+    }
+    return testParams;
 }
 
-INSTANTIATE_TEST_SUITE_P(Io, SnapuserdTest, ::testing::ValuesIn(GetIoUringConfigs()),
-                         IoUringConfigName);
-INSTANTIATE_TEST_SUITE_P(Io, HandlerTest, ::testing::ValuesIn(GetIoUringConfigs()),
-                         IoUringConfigName);
+INSTANTIATE_TEST_SUITE_P(Io, SnapuserdTest, ::testing::ValuesIn(GetTestConfigs()));
+INSTANTIATE_TEST_SUITE_P(Io, HandlerTest, ::testing::ValuesIn(GetTestConfigs()));
 
 }  // namespace snapshot
 }  // namespace android