Tune snapshot-merge performance

Currently, there is one thread per partition
for snapshot merge. When all these threads are
run in parallel, this may stress the system
as the merge threads are both CPU and I/O bound.

Allow only two merge threads to be in-flight
at any point in time. This will ensure that there
is forward progress done with respect to snapshot-merge
and only two cores are used as against using
5-6 cores.

Additionally, system and prodcut partitions are merged
first. This is primarily because /root is mounted
of system partition and faster the merge completes
on /system partition, we can switch the dm tables
immediately. There is no change in the merge phase
from libsnapshot perspective. This prioritization
is based on each merge phase. If the system partition
merge is in second phase, then it takes priority
in that phase.

As a side benefit, this should also
reduce the memory usage when merge is in-flight
given that we now limit the threads.

There is slight delay in overall merge time as
we now throttle the merge.

No boot time regressions observed.

Full OTA:

Merge time (Without this patch): 42 seconds
Merge time (With this patch): 46 seconds

Incremental OTA:

Merge time (Without this patch): 52 seconds
Merge time (With this patch): 57 seconds

system partition merge completes in the first ~12-16 seconds.

App-launch (COLD) on Pixel:

Baseline (After snapshot-merge is completed when there is no daemon):
==========================

Chrome: 250
youtube: 631
camera: 230

==========================

Without this patch when snapshot-merge is in-progress (in ms):

Full - OTA

Chrome: 1729
youtube: 3126
camera: 1525

==========================

With this patch when snapshot-merge is in-progress (in ms):


Full - OTA

Chrome:  1061
youtube: 820
camera: 1378

Incremental - OTA (350M)

Chrome: 495
youtube: 1442
camera: 641
=====================

Bug: 237490659
Test: Full and incremental OTA
Signed-off-by: Akilesh Kailash <akailash@google.com>
Change-Id: I887d5073dba88e9a8a85ac10c771e4ccee7c84ff
diff --git a/fs_mgr/libsnapshot/snapshot.cpp b/fs_mgr/libsnapshot/snapshot.cpp
index c8684a2..9670706 100644
--- a/fs_mgr/libsnapshot/snapshot.cpp
+++ b/fs_mgr/libsnapshot/snapshot.cpp
@@ -2151,8 +2151,17 @@
         if (!suffix.empty() && !android::base::EndsWith(name, suffix)) {
             continue;
         }
-        snapshots->emplace_back(std::move(name));
+
+        // Insert system and product partition at the beginning so that
+        // during snapshot-merge, these partitions are merged first.
+        if (name == "system_a" || name == "system_b" || name == "product_a" ||
+            name == "product_b") {
+            snapshots->insert(snapshots->begin(), std::move(name));
+        } else {
+            snapshots->emplace_back(std::move(name));
+        }
     }
+
     return true;
 }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index afc653f..8939b78 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -147,17 +147,18 @@
     NotifyRAForMergeReady();
 }
 
-void SnapshotHandler::CheckMergeCompletionStatus() {
+bool SnapshotHandler::CheckMergeCompletionStatus() {
     if (!merge_initiated_) {
         SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: "
                        << reader_->get_num_total_data_ops();
-        return;
+        return false;
     }
 
     struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
 
     SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << ch->num_merge_ops
                    << " Total-data-ops: " << reader_->get_num_total_data_ops();
+    return true;
 }
 
 bool SnapshotHandler::ReadMetadata() {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index 90fba75..ea09b65 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -306,7 +306,7 @@
     const bool& IsAttached() const { return attached_; }
     void AttachControlDevice() { attached_ = true; }
 
-    void CheckMergeCompletionStatus();
+    bool CheckMergeCompletionStatus();
     bool CommitMerge(int num_merge_ops);
 
     void CloseFds() { cow_fd_ = {}; }
@@ -337,6 +337,8 @@
 
     // State transitions for merge
     void InitiateMerge();
+    void MonitorMerge();
+    void WakeupMonitorMergeThread();
     void WaitForMergeComplete();
     bool WaitForMergeBegin();
     void NotifyRAForMergeReady();
@@ -365,6 +367,7 @@
     void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
     void SetIouringEnabled(bool io_uring_enabled) { is_io_uring_enabled_ = io_uring_enabled; }
     bool MergeInitiated() { return merge_initiated_; }
+    bool MergeMonitored() { return merge_monitored_; }
     double GetMergePercentage() { return merge_completion_percentage_; }
 
     // Merge Block State Transitions
@@ -431,6 +434,7 @@
     double merge_completion_percentage_;
 
     bool merge_initiated_ = false;
+    bool merge_monitored_ = false;
     bool attached_ = false;
     bool is_socket_present_;
     bool is_io_uring_enabled_ = false;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
index b7f7f54..1bf33c8 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
@@ -60,6 +60,14 @@
     return DaemonOps::INVALID;
 }
 
+UserSnapshotServer::UserSnapshotServer() {
+    monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
+    if (monitor_merge_event_fd_ == -1) {
+        PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
+    }
+    terminating_ = false;
+}
+
 UserSnapshotServer::~UserSnapshotServer() {
     // Close any client sockets that were added via AcceptClient().
     for (size_t i = 1; i < watched_fds_.size(); i++) {
@@ -250,7 +258,7 @@
                     return Sendmsg(fd, "fail");
                 }
 
-                if (!StartMerge(*iter)) {
+                if (!StartMerge(&lock, *iter)) {
                     return Sendmsg(fd, "fail");
                 }
 
@@ -307,7 +315,7 @@
     }
 
     handler->snapuserd()->CloseFds();
-    handler->snapuserd()->CheckMergeCompletionStatus();
+    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
     handler->snapuserd()->UnmapBufferRegion();
 
     auto misc_name = handler->misc_name();
@@ -315,7 +323,11 @@
 
     {
         std::lock_guard<std::mutex> lock(lock_);
-        num_partitions_merge_complete_ += 1;
+        if (merge_completed) {
+            num_partitions_merge_complete_ += 1;
+            active_merge_threads_ -= 1;
+            WakeupMonitorMergeThread();
+        }
         handler->SetThreadTerminated();
         auto iter = FindHandler(&lock, handler->misc_name());
         if (iter == dm_users_.end()) {
@@ -427,6 +439,9 @@
 
         if (th.joinable()) th.join();
     }
+
+    stop_monitor_merge_thread_ = true;
+    WakeupMonitorMergeThread();
 }
 
 void UserSnapshotServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
@@ -511,13 +526,24 @@
     return true;
 }
 
-bool UserSnapshotServer::StartMerge(const std::shared_ptr<UserSnapshotDmUserHandler>& handler) {
+bool UserSnapshotServer::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+                                    const std::shared_ptr<UserSnapshotDmUserHandler>& handler) {
+    CHECK(proof_of_lock);
+
     if (!handler->snapuserd()->IsAttached()) {
         LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
         return false;
     }
 
-    handler->snapuserd()->InitiateMerge();
+    handler->snapuserd()->MonitorMerge();
+
+    if (!is_merge_monitor_started_.has_value()) {
+        std::thread(&UserSnapshotServer::MonitorMerge, this).detach();
+        is_merge_monitor_started_ = true;
+    }
+
+    merge_handlers_.push(handler);
+    WakeupMonitorMergeThread();
     return true;
 }
 
@@ -599,6 +625,42 @@
     return true;
 }
 
+void UserSnapshotServer::WakeupMonitorMergeThread() {
+    uint64_t notify = 1;
+    ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
+    if (rc < 0) {
+        PLOG(FATAL) << "failed to notify monitor merge thread";
+    }
+}
+
+void UserSnapshotServer::MonitorMerge() {
+    while (!stop_monitor_merge_thread_) {
+        uint64_t testVal;
+        ssize_t ret =
+                TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
+        if (ret == -1) {
+            PLOG(FATAL) << "Failed to read from eventfd";
+        } else if (ret == 0) {
+            LOG(FATAL) << "Hit EOF on eventfd";
+        }
+
+        LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
+        {
+            std::lock_guard<std::mutex> lock(lock_);
+            while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
+                auto handler = merge_handlers_.front();
+                merge_handlers_.pop();
+                LOG(INFO) << "Starting merge for partition: "
+                          << handler->snapuserd()->GetMiscName();
+                handler->snapuserd()->InitiateMerge();
+                active_merge_threads_ += 1;
+            }
+        }
+    }
+
+    LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
+}
+
 bool UserSnapshotServer::WaitForSocket() {
     auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
 
@@ -655,6 +717,7 @@
     if (!StartWithSocket(true)) {
         return false;
     }
+
     return Run();
 }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
index 00734a9..c2af61f 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
@@ -15,6 +15,7 @@
 #pragma once
 
 #include <poll.h>
+#include <sys/eventfd.h>
 
 #include <cstdio>
 #include <cstring>
@@ -22,6 +23,8 @@
 #include <future>
 #include <iostream>
 #include <mutex>
+#include <optional>
+#include <queue>
 #include <sstream>
 #include <string>
 #include <thread>
@@ -34,6 +37,7 @@
 namespace snapshot {
 
 static constexpr uint32_t kMaxPacketSize = 512;
+static constexpr uint8_t kMaxMergeThreads = 2;
 
 enum class DaemonOps {
     INIT,
@@ -85,13 +89,19 @@
     std::vector<struct pollfd> watched_fds_;
     bool is_socket_present_ = false;
     int num_partitions_merge_complete_ = 0;
+    int active_merge_threads_ = 0;
+    bool stop_monitor_merge_thread_ = false;
     bool is_server_running_ = false;
     bool io_uring_enabled_ = false;
+    std::optional<bool> is_merge_monitor_started_;
+
+    android::base::unique_fd monitor_merge_event_fd_;
 
     std::mutex lock_;
 
     using HandlerList = std::vector<std::shared_ptr<UserSnapshotDmUserHandler>>;
     HandlerList dm_users_;
+    std::queue<std::shared_ptr<UserSnapshotDmUserHandler>> merge_handlers_;
 
     void AddWatchedFd(android::base::borrowed_fd fd, int events);
     void AcceptClient();
@@ -109,6 +119,8 @@
     bool IsTerminating() { return terminating_; }
 
     void RunThread(std::shared_ptr<UserSnapshotDmUserHandler> handler);
+    void MonitorMerge();
+
     void JoinAllThreads();
     bool StartWithSocket(bool start_listening);
 
@@ -122,7 +134,7 @@
     bool UpdateVerification(std::lock_guard<std::mutex>* proof_of_lock);
 
   public:
-    UserSnapshotServer() { terminating_ = false; }
+    UserSnapshotServer();
     ~UserSnapshotServer();
 
     bool Start(const std::string& socketname);
@@ -136,9 +148,11 @@
                                                           const std::string& backing_device,
                                                           const std::string& base_path_merge);
     bool StartHandler(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
-    bool StartMerge(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
+    bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+                    const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
     std::string GetMergeStatus(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
 
+    void WakeupMonitorMergeThread();
     void SetTerminating() { terminating_ = true; }
     void ReceivedSocketSignal() { received_socket_signal_ = true; }
     void SetServerRunning() { is_server_running_ = true; }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
index d4e1d7c..28c9f68 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
@@ -165,6 +165,13 @@
 using namespace android::dm;
 using android::base::unique_fd;
 
+void SnapshotHandler::MonitorMerge() {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        merge_monitored_ = true;
+    }
+}
+
 // This is invoked once primarily by update-engine to initiate
 // the merge
 void SnapshotHandler::InitiateMerge() {
@@ -361,10 +368,16 @@
 
 std::string SnapshotHandler::GetMergeStatus() {
     bool merge_not_initiated = false;
+    bool merge_monitored = false;
     bool merge_failed = false;
 
     {
         std::lock_guard<std::mutex> lock(lock_);
+
+        if (MergeMonitored()) {
+            merge_monitored = true;
+        }
+
         if (!MergeInitiated()) {
             merge_not_initiated = true;
         }
@@ -387,6 +400,12 @@
             return "snapshot-merge-complete";
         }
 
+        // Merge monitor thread is tracking the merge but the merge thread
+        // is not started yet.
+        if (merge_monitored) {
+            return "snapshot-merge";
+        }
+
         // Return the state as "snapshot". If the device was rebooted during
         // merge, we will return the status as "snapshot". This is ok, as
         // libsnapshot will explicitly resume the merge. This is slightly