libsnapshot: configure num worker threads

Forward number of worker threads build configuration to snapuserd. these
are the threads that will be used to serve i/o requests to dm-user.

Bug: 361438985
Test: OTA with print logs post OTA reboot
Change-Id: Ib70a3cb8766b96232ea6f97effece041534dc922
diff --git a/fs_mgr/libsnapshot/android/snapshot/snapshot.proto b/fs_mgr/libsnapshot/android/snapshot/snapshot.proto
index 62f9901..5fb71a3 100644
--- a/fs_mgr/libsnapshot/android/snapshot/snapshot.proto
+++ b/fs_mgr/libsnapshot/android/snapshot/snapshot.proto
@@ -233,6 +233,8 @@
     // Number of cow operations to be merged at once
     uint32 cow_op_merge_size = 13;
 
+    // Number of worker threads to serve I/O from dm-user
+    uint32 num_worker_threads = 14;
 }
 
 // Next: 10
diff --git a/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h b/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
index 7ae55db..8ff41db 100644
--- a/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
+++ b/fs_mgr/libsnapshot/include/libsnapshot/snapshot.h
@@ -838,6 +838,10 @@
 
     // Get value of maximum cow op merge size
     uint32_t GetUpdateCowOpMergeSize(LockedFile* lock);
+
+    // Get number of threads to perform post OTA boot verification
+    uint32_t GetUpdateWorkerCount(LockedFile* lock);
+
     // Wrapper around libdm, with diagnostics.
     bool DeleteDeviceIfExists(const std::string& name,
                               const std::chrono::milliseconds& timeout_ms = {});
diff --git a/fs_mgr/libsnapshot/snapshot.cpp b/fs_mgr/libsnapshot/snapshot.cpp
index 6c3bedd..05dec68 100644
--- a/fs_mgr/libsnapshot/snapshot.cpp
+++ b/fs_mgr/libsnapshot/snapshot.cpp
@@ -1235,8 +1235,8 @@
                 wrong_phase = true;
                 break;
             default:
-                LOG(ERROR) << "Unknown merge status for \"" << snapshot << "\": "
-                           << "\"" << result.state << "\"";
+                LOG(ERROR) << "Unknown merge status for \"" << snapshot << "\": " << "\""
+                           << result.state << "\"";
                 if (failure_code == MergeFailureCode::Ok) {
                     failure_code = MergeFailureCode::UnexpectedMergeState;
                 }
@@ -1725,6 +1725,10 @@
         if (cow_op_merge_size != 0) {
             snapuserd_argv->emplace_back("-cow_op_merge_size=" + std::to_string(cow_op_merge_size));
         }
+        uint32_t worker_count = GetUpdateWorkerCount(lock.get());
+        if (worker_count != 0) {
+            snapuserd_argv->emplace_back("-worker_count=" + std::to_string(worker_count));
+        }
     }
 
     size_t num_cows = 0;
@@ -2152,6 +2156,11 @@
     return update_status.cow_op_merge_size();
 }
 
+uint32_t SnapshotManager::GetUpdateWorkerCount(LockedFile* lock) {
+    SnapshotUpdateStatus update_status = ReadSnapshotUpdateStatus(lock);
+    return update_status.num_worker_threads();
+}
+
 bool SnapshotManager::MarkSnapuserdFromSystem() {
     auto path = GetSnapuserdFromSystemPath();
 
@@ -3140,6 +3149,7 @@
         status.set_legacy_snapuserd(old_status.legacy_snapuserd());
         status.set_o_direct(old_status.o_direct());
         status.set_cow_op_merge_size(old_status.cow_op_merge_size());
+        status.set_num_worker_threads(old_status.num_worker_threads());
     }
     return WriteSnapshotUpdateStatus(lock, status);
 }
@@ -3524,6 +3534,9 @@
         }
         status.set_cow_op_merge_size(
                 android::base::GetUintProperty<uint32_t>("ro.virtual_ab.cow_op_merge_size", 0));
+        status.set_num_worker_threads(
+                android::base::GetUintProperty<uint32_t>("ro.virtual_ab.num_worker_threads", 0));
+
     } else if (legacy_compression) {
         LOG(INFO) << "Virtual A/B using legacy snapuserd";
     } else {
@@ -3960,6 +3973,7 @@
     ss << "Using io_uring: " << update_status.io_uring_enabled() << std::endl;
     ss << "Using o_direct: " << update_status.o_direct() << std::endl;
     ss << "Cow op merge size (0 for uncapped): " << update_status.cow_op_merge_size() << std::endl;
+    ss << "Worker thread count: " << update_status.num_worker_threads() << std::endl;
     ss << "Using XOR compression: " << GetXorCompressionEnabledProperty() << std::endl;
     ss << "Current slot: " << device_->GetSlotSuffix() << std::endl;
     ss << "Boot indicator: booting from " << GetCurrentSlot() << " slot" << std::endl;
@@ -4576,8 +4590,7 @@
         }
     }
 
-    LOG(ERROR) << "Device-mapper device " << name << "(" << full_path << ")"
-               << " still in use."
+    LOG(ERROR) << "Device-mapper device " << name << "(" << full_path << ")" << " still in use."
                << "  Probably a file descriptor was leaked or held open, or a loop device is"
                << " attached.";
     return false;
diff --git a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
index dd2dd56..32e16cc 100644
--- a/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/snapuserd_daemon.cpp
@@ -31,6 +31,7 @@
 DEFINE_bool(io_uring, false, "If true, io_uring feature is enabled");
 DEFINE_bool(o_direct, false, "If true, enable direct reads on source device");
 DEFINE_int32(cow_op_merge_size, 0, "number of operations to be processed at once");
+DEFINE_int32(worker_count, 4, "number of worker threads used to serve I/O requests to dm-user");
 
 namespace android {
 namespace snapshot {
@@ -114,8 +115,9 @@
             LOG(ERROR) << "Malformed message, expected at least four sub-arguments.";
             return false;
         }
-        auto handler = user_server_.AddHandler(parts[0], parts[1], parts[2], parts[3],
-                                               FLAGS_o_direct, FLAGS_cow_op_merge_size);
+        auto handler =
+                user_server_.AddHandler(parts[0], parts[1], parts[2], parts[3], FLAGS_worker_count,
+                                        FLAGS_o_direct, FLAGS_cow_op_merge_size);
         if (!handler || !user_server_.StartHandler(parts[0])) {
             return false;
         }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
index 013df35..3bb8a30 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
@@ -35,6 +35,7 @@
 #include <snapuserd/dm_user_block_server.h>
 #include <snapuserd/snapuserd_client.h>
 #include "snapuserd_server.h"
+#include "user-space-merge/snapuserd_core.h"
 
 namespace android {
 namespace snapshot {
@@ -125,7 +126,7 @@
             return Sendmsg(fd, "fail");
         }
 
-        auto handler = AddHandler(out[1], out[2], out[3], out[4]);
+        auto handler = AddHandler(out[1], out[2], out[3], out[4], std::nullopt);
         if (!handler) {
             return Sendmsg(fd, "fail");
         }
@@ -341,12 +342,11 @@
     SetTerminating();
 }
 
-std::shared_ptr<HandlerThread> UserSnapshotServer::AddHandler(const std::string& misc_name,
-                                                              const std::string& cow_device_path,
-                                                              const std::string& backing_device,
-                                                              const std::string& base_path_merge,
-                                                              const bool o_direct,
-                                                              uint32_t cow_op_merge_size) {
+std::shared_ptr<HandlerThread> UserSnapshotServer::AddHandler(
+        const std::string& misc_name, const std::string& cow_device_path,
+        const std::string& backing_device, const std::string& base_path_merge,
+        std::optional<uint32_t> num_worker_threads, const bool o_direct,
+        uint32_t cow_op_merge_size) {
     // We will need multiple worker threads only during
     // device boot after OTA. For all other purposes,
     // one thread is sufficient. We don't want to consume
@@ -355,7 +355,9 @@
     //
     // During boot up, we need multiple threads primarily for
     // update-verification.
-    int num_worker_threads = kNumWorkerThreads;
+    if (!num_worker_threads.has_value()) {
+        num_worker_threads = kNumWorkerThreads;
+    }
     if (is_socket_present_) {
         num_worker_threads = 1;
     }
@@ -368,7 +370,7 @@
     auto opener = block_server_factory_->CreateOpener(misc_name);
 
     return handlers_->AddHandler(misc_name, cow_device_path, backing_device, base_path_merge,
-                                 opener, num_worker_threads, io_uring_enabled_, o_direct,
+                                 opener, num_worker_threads.value(), io_uring_enabled_, o_direct,
                                  cow_op_merge_size);
 }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
index ceea36a..f002e8d 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
@@ -87,6 +87,7 @@
                                               const std::string& cow_device_path,
                                               const std::string& backing_device,
                                               const std::string& base_path_merge,
+                                              std::optional<uint32_t> num_worker_threads,
                                               bool o_direct = false,
                                               uint32_t cow_op_merge_size = 0);
     bool StartHandler(const std::string& misc_name);