libsnapshot: cap merge op count

Set op processing window during snapshot merge from the build. The lower
the merge count, the lower the memory strain during the merge process

Bug: 332255580
Test: verify merge_size propogates to snapuserd daemon
Change-Id: Ic7770115bca963d923a7a276897c5deac30f9faf
diff --git a/fs_mgr/libsnapshot/android/snapshot/snapshot.proto b/fs_mgr/libsnapshot/android/snapshot/snapshot.proto
index 2e948dd..62f9901 100644
--- a/fs_mgr/libsnapshot/android/snapshot/snapshot.proto
+++ b/fs_mgr/libsnapshot/android/snapshot/snapshot.proto
@@ -229,6 +229,10 @@
 
     // Enable direct reads from source device
     bool o_direct = 12;
+
+    // Number of cow operations to be merged at once
+    uint32 cow_op_merge_size = 13;
+
 }
 
 // Next: 10
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h b/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
index 1ec8634..4a3ec1d 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
@@ -831,6 +831,8 @@
     // Check if direct reads are enabled for the source image
     bool UpdateUsesODirect(LockedFile* lock);
 
+    // Get value of maximum cow op merge size
+    uint32_t GetUpdateCowOpMergeSize(LockedFile* lock);
     // Wrapper around libdm, with diagnostics.
     bool DeleteDeviceIfExists(const std::string& name,
                               const std::chrono::milliseconds& timeout_ms = {});
diff --git a/fs_mgr/libsnapshot/snapshot.cpp b/fs_mgr/libsnapshot/snapshot.cpp
index 11aa3f9..265445b 100644
--- a/fs_mgr/libsnapshot/snapshot.cpp
+++ b/fs_mgr/libsnapshot/snapshot.cpp
@@ -1709,6 +1709,10 @@
         if (UpdateUsesODirect(lock.get())) {
             snapuserd_argv->emplace_back("-o_direct");
         }
+        uint cow_op_merge_size = GetUpdateCowOpMergeSize(lock.get());
+        if (cow_op_merge_size != 0) {
+            snapuserd_argv->emplace_back("-cow_op_merge_size=" + std::to_string(cow_op_merge_size));
+        }
     }
 
     size_t num_cows = 0;
@@ -2131,6 +2135,11 @@
     return update_status.o_direct();
 }
 
+uint32_t SnapshotManager::GetUpdateCowOpMergeSize(LockedFile* lock) {
+    SnapshotUpdateStatus update_status = ReadSnapshotUpdateStatus(lock);
+    return update_status.cow_op_merge_size();
+}
+
 bool SnapshotManager::MarkSnapuserdFromSystem() {
     auto path = GetSnapuserdFromSystemPath();
 
@@ -3092,6 +3101,7 @@
         status.set_io_uring_enabled(old_status.io_uring_enabled());
         status.set_legacy_snapuserd(old_status.legacy_snapuserd());
         status.set_o_direct(old_status.o_direct());
+        status.set_cow_op_merge_size(old_status.cow_op_merge_size());
     }
     return WriteSnapshotUpdateStatus(lock, status);
 }
@@ -3474,6 +3484,8 @@
             status.set_legacy_snapuserd(true);
             LOG(INFO) << "Setting legacy_snapuserd to true";
         }
+        status.set_cow_op_merge_size(
+                android::base::GetUintProperty<uint32_t>("ro.virtual_ab.cow_op_merge_size", 0));
     } else if (legacy_compression) {
         LOG(INFO) << "Virtual A/B using legacy snapuserd";
     } else {
@@ -3909,6 +3921,7 @@
     ss << "Using userspace snapshots: " << update_status.userspace_snapshots() << std::endl;
     ss << "Using io_uring: " << update_status.io_uring_enabled() << std::endl;
     ss << "Using o_direct: " << update_status.o_direct() << std::endl;
+    ss << "Cow op merge size (0 for uncapped): " << update_status.cow_op_merge_size() << std::endl;
     ss << "Using XOR compression: " << GetXorCompressionEnabledProperty() << std::endl;
     ss << "Current slot: " << device_->GetSlotSuffix() << std::endl;
     ss << "Boot indicator: booting from " << GetCurrentSlot() << " slot" << std::endl;
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
index 67e9e52..dd2dd56 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
@@ -30,6 +30,7 @@
 DEFINE_bool(user_snapshot, false, "If true, user-space snapshots are used");
 DEFINE_bool(io_uring, false, "If true, io_uring feature is enabled");
 DEFINE_bool(o_direct, false, "If true, enable direct reads on source device");
+DEFINE_int32(cow_op_merge_size, 0, "number of operations to be processed at once");
 
 namespace android {
 namespace snapshot {
@@ -106,7 +107,6 @@
         }
         return user_server_.Run();
     }
-
     for (int i = arg_start; i < argc; i++) {
         auto parts = android::base::Split(argv[i], ",");
 
@@ -114,8 +114,8 @@
             LOG(ERROR) << "Malformed message, expected at least four sub-arguments.";
             return false;
         }
-        auto handler =
-                user_server_.AddHandler(parts[0], parts[1], parts[2], parts[3], FLAGS_o_direct);
+        auto handler = user_server_.AddHandler(parts[0], parts[1], parts[2], parts[3],
+                                               FLAGS_o_direct, FLAGS_cow_op_merge_size);
         if (!handler || !user_server_.StartHandler(parts[0])) {
             return false;
         }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/extractor.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/extractor.cpp
index c85331b..ef4ba93 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/extractor.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/extractor.cpp
@@ -41,7 +41,7 @@
 bool Extractor::Init() {
     auto opener = factory_.CreateTestOpener(control_name_);
     handler_ = std::make_shared<SnapshotHandler>(control_name_, cow_path_, base_path_, base_path_,
-                                                 opener, 1, false, false, false);
+                                                 opener, 1, false, false, false, 0);
     if (!handler_->InitCowDevice()) {
         return false;
     }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
index ea11f0e..fdd9cce 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.cpp
@@ -53,10 +53,10 @@
         const std::string& misc_name, const std::string& cow_device_path,
         const std::string& backing_device, const std::string& base_path_merge,
         std::shared_ptr<IBlockServerOpener> opener, int num_worker_threads, bool use_iouring,
-        bool o_direct) {
+        bool o_direct, uint32_t cow_op_merge_size) {
     auto snapuserd = std::make_shared<SnapshotHandler>(
             misc_name, cow_device_path, backing_device, base_path_merge, opener, num_worker_threads,
-            use_iouring, perform_verification_, o_direct);
+            use_iouring, perform_verification_, o_direct, cow_op_merge_size);
     if (!snapuserd->InitCowDevice()) {
         LOG(ERROR) << "Failed to initialize Snapuserd";
         return nullptr;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
index f23f07e..ecf5d5c 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/handler_manager.h
@@ -52,13 +52,11 @@
     virtual ~ISnapshotHandlerManager() {}
 
     // Add a new snapshot handler but do not start serving requests yet.
-    virtual std::shared_ptr<HandlerThread> AddHandler(const std::string& misc_name,
-                                                      const std::string& cow_device_path,
-                                                      const std::string& backing_device,
-                                                      const std::string& base_path_merge,
-                                                      std::shared_ptr<IBlockServerOpener> opener,
-                                                      int num_worker_threads, bool use_iouring,
-                                                      bool o_direct) = 0;
+    virtual std::shared_ptr<HandlerThread> AddHandler(
+            const std::string& misc_name, const std::string& cow_device_path,
+            const std::string& backing_device, const std::string& base_path_merge,
+            std::shared_ptr<IBlockServerOpener> opener, int num_worker_threads, bool use_iouring,
+            bool o_direct, uint32_t cow_op_merge_size) = 0;
 
     // Start serving requests on a snapshot handler.
     virtual bool StartHandler(const std::string& misc_name) = 0;
@@ -98,7 +96,7 @@
                                               const std::string& base_path_merge,
                                               std::shared_ptr<IBlockServerOpener> opener,
                                               int num_worker_threads, bool use_iouring,
-                                              bool o_direct) override;
+                                              bool o_direct, uint32_t cow_op_merge_size) override;
     bool StartHandler(const std::string& misc_name) override;
     bool DeleteHandler(const std::string& misc_name) override;
     bool InitiateMerge(const std::string& misc_name) override;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
index e6a9a29..e2c5874 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.cpp
@@ -32,12 +32,18 @@
 
 MergeWorker::MergeWorker(const std::string& cow_device, const std::string& misc_name,
                          const std::string& base_path_merge,
-                         std::shared_ptr<SnapshotHandler> snapuserd)
-    : Worker(cow_device, misc_name, base_path_merge, snapuserd) {}
+                         std::shared_ptr<SnapshotHandler> snapuserd, uint32_t cow_op_merge_size)
+    : Worker(cow_device, misc_name, base_path_merge, snapuserd),
+      cow_op_merge_size_(cow_op_merge_size) {}
 
 int MergeWorker::PrepareMerge(uint64_t* source_offset, int* pending_ops,
                               std::vector<const CowOperation*>* replace_zero_vec) {
     int num_ops = *pending_ops;
+    // 0 indicates ro.virtual_ab.cow_op_merge_size was not set in the build
+    if (cow_op_merge_size_ != 0) {
+        num_ops = std::min(cow_op_merge_size_, static_cast<uint32_t>(*pending_ops));
+    }
+
     int nr_consecutive = 0;
     bool checkOrderedOp = (replace_zero_vec == nullptr);
     size_t num_blocks = 1;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.h
index 478d4c8..a19352d 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/merge_worker.h
@@ -23,7 +23,8 @@
 class MergeWorker : public Worker {
   public:
     MergeWorker(const std::string& cow_device, const std::string& misc_name,
-                const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
+                const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd,
+                uint32_t cow_op_merge_size);
     bool Run();
 
   private:
@@ -53,6 +54,7 @@
     // syscalls and fallback to synchronous I/O, we
     // don't want huge queue depth
     int queue_depth_ = 8;
+    uint32_t cow_op_merge_size_ = 0;
 };
 
 }  // namespace snapshot
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index 05ba047..7c9a64e 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -36,7 +36,8 @@
 SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,
                                  std::string backing_device, std::string base_path_merge,
                                  std::shared_ptr<IBlockServerOpener> opener, int num_worker_threads,
-                                 bool use_iouring, bool perform_verification, bool o_direct) {
+                                 bool use_iouring, bool perform_verification, bool o_direct,
+                                 uint32_t cow_op_merge_size) {
     misc_name_ = std::move(misc_name);
     cow_device_ = std::move(cow_device);
     backing_store_device_ = std::move(backing_device);
@@ -46,6 +47,7 @@
     is_io_uring_enabled_ = use_iouring;
     perform_verification_ = perform_verification;
     o_direct_ = o_direct;
+    cow_op_merge_size_ = cow_op_merge_size;
 }
 
 bool SnapshotHandler::InitializeWorkers() {
@@ -60,12 +62,11 @@
 
         worker_threads_.push_back(std::move(wt));
     }
-
     merge_thread_ = std::make_unique<MergeWorker>(cow_device_, misc_name_, base_path_merge_,
-                                                  GetSharedPtr());
+                                                  GetSharedPtr(), cow_op_merge_size_);
 
     read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
-                                                     GetSharedPtr());
+                                                     GetSharedPtr(), cow_op_merge_size_);
 
     update_verify_ = std::make_unique<UpdateVerify>(misc_name_);
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index 9b7238a..c7de995 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -104,7 +104,8 @@
   public:
     SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
                     std::string base_path_merge, std::shared_ptr<IBlockServerOpener> opener,
-                    int num_workers, bool use_iouring, bool perform_verification, bool o_direct);
+                    int num_workers, bool use_iouring, bool perform_verification, bool o_direct,
+                    uint32_t cow_op_merge_size);
     bool InitCowDevice();
     bool Start();
 
@@ -247,7 +248,7 @@
     bool resume_merge_ = false;
     bool merge_complete_ = false;
     bool o_direct_ = false;
-
+    uint32_t cow_op_merge_size_ = 0;
     std::unique_ptr<UpdateVerify> update_verify_;
     std::shared_ptr<IBlockServerOpener> block_server_opener_;
 };
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index 2baf20d..6b1ed0c 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -18,6 +18,7 @@
 
 #include <pthread.h>
 
+#include "android-base/properties.h"
 #include "snapuserd_core.h"
 #include "utility.h"
 
@@ -29,11 +30,13 @@
 using android::base::unique_fd;
 
 ReadAhead::ReadAhead(const std::string& cow_device, const std::string& backing_device,
-                     const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd) {
+                     const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd,
+                     uint32_t cow_op_merge_size) {
     cow_device_ = cow_device;
     backing_store_device_ = backing_device;
     misc_name_ = misc_name;
     snapuserd_ = snapuserd;
+    cow_op_merge_size_ = cow_op_merge_size;
 }
 
 void ReadAhead::CheckOverlap(const CowOperation* cow_op) {
@@ -62,8 +65,11 @@
                                     std::vector<uint64_t>& blocks,
                                     std::vector<const CowOperation*>& xor_op_vec) {
     int num_ops = *pending_ops;
-    int nr_consecutive = 0;
+    if (cow_op_merge_size_ != 0) {
+        num_ops = std::min(static_cast<int>(cow_op_merge_size_), *pending_ops);
+    }
 
+    int nr_consecutive = 0;
     bool is_ops_present = (!RAIterDone() && num_ops);
 
     if (!is_ops_present) {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.h
index d3ba126..4885c96 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.h
@@ -35,7 +35,8 @@
 class ReadAhead {
   public:
     ReadAhead(const std::string& cow_device, const std::string& backing_device,
-              const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd);
+              const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd,
+              uint32_t cow_op_merge_size);
     bool RunThread();
 
   private:
@@ -106,6 +107,7 @@
     // syscalls and fallback to synchronous I/O, we
     // don't want huge queue depth
     int queue_depth_ = 8;
+    uint32_t cow_op_merge_size_;
     std::unique_ptr<struct io_uring> ring_;
 };
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
index 0b881b6..c0af5c5 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
@@ -347,7 +347,8 @@
                                                               const std::string& cow_device_path,
                                                               const std::string& backing_device,
                                                               const std::string& base_path_merge,
-                                                              const bool o_direct) {
+                                                              const bool o_direct,
+                                                              uint32_t cow_op_merge_size) {
     // We will need multiple worker threads only during
     // device boot after OTA. For all other purposes,
     // one thread is sufficient. We don't want to consume
@@ -369,7 +370,8 @@
     auto opener = block_server_factory_->CreateOpener(misc_name);
 
     return handlers_->AddHandler(misc_name, cow_device_path, backing_device, base_path_merge,
-                                 opener, num_worker_threads, io_uring_enabled_, o_direct);
+                                 opener, num_worker_threads, io_uring_enabled_, o_direct,
+                                 cow_op_merge_size);
 }
 
 bool UserSnapshotServer::WaitForSocket() {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
index d9cf97f..ceea36a 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
@@ -87,7 +87,8 @@
                                               const std::string& cow_device_path,
                                               const std::string& backing_device,
                                               const std::string& base_path_merge,
-                                              bool o_direct = false);
+                                              bool o_direct = false,
+                                              uint32_t cow_op_merge_size = 0);
     bool StartHandler(const std::string& misc_name);
 
     void SetTerminating() { terminating_ = true; }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
index 6d0ae3d..9042f2b 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_test.cpp
@@ -67,6 +67,7 @@
     std::string compression;
     int block_size;
     int num_threads;
+    uint32_t cow_op_merge_size;
 };
 
 class SnapuserdTestBase : public ::testing::TestWithParam<TestParam> {
@@ -708,9 +709,9 @@
     auto opener = factory->CreateOpener(system_device_ctrl_name_);
     handlers_->DisableVerification();
     const TestParam params = GetParam();
-    auto handler = handlers_->AddHandler(system_device_ctrl_name_, cow_system_->path,
-                                         base_dev_->GetPath(), base_dev_->GetPath(), opener, 1,
-                                         params.io_uring, params.o_direct);
+    auto handler = handlers_->AddHandler(
+            system_device_ctrl_name_, cow_system_->path, base_dev_->GetPath(), base_dev_->GetPath(),
+            opener, 1, params.io_uring, params.o_direct, params.cow_op_merge_size);
     ASSERT_NE(handler, nullptr);
     ASSERT_NE(handler->snapuserd(), nullptr);
 #ifdef __ANDROID__
@@ -1227,9 +1228,9 @@
     ASSERT_NE(opener_, nullptr);
 
     const TestParam params = GetParam();
-    handler_ = std::make_shared<SnapshotHandler>(system_device_ctrl_name_, cow_system_->path,
-                                                 base_dev_->GetPath(), base_dev_->GetPath(),
-                                                 opener_, 1, false, false, params.o_direct);
+    handler_ = std::make_shared<SnapshotHandler>(
+            system_device_ctrl_name_, cow_system_->path, base_dev_->GetPath(), base_dev_->GetPath(),
+            opener_, 1, false, false, params.o_direct, params.cow_op_merge_size);
     ASSERT_TRUE(handler_->InitCowDevice());
     ASSERT_TRUE(handler_->InitializeWorkers());
 
@@ -1507,6 +1508,7 @@
                     param.num_threads = thread;
                     param.io_uring = io_uring;
                     param.o_direct = false;
+                    param.cow_op_merge_size = 0;
                     testParams.push_back(std::move(param));
                 }
             }