storageproxy: Report fsync failures with a distinct error code am: cf458bae1e

Original change: https://googleplex-android-review.googlesource.com/c/platform/system/core/+/19517837

Change-Id: I0cec6b12306b037503160b56ceecdb95acc4bde3
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/bootstat/bootstat.cpp b/bootstat/bootstat.cpp
index 2c878f0..af0c89e 100644
--- a/bootstat/bootstat.cpp
+++ b/bootstat/bootstat.cpp
@@ -463,6 +463,11 @@
     {"watchdog,gsa,hard", 215},
     {"watchdog,gsa,soft", 216},
     {"watchdog,pmucal", 217},
+    {"reboot,early,bl", 218},
+    {"watchdog,apc,gsa,crashed", 219},
+    {"watchdog,apc,bl31,crashed", 220},
+    {"watchdog,apc,pbl,crashed", 221},
+    {"reboot,memory_protect,hyp", 222},
 };
 
 // Converts a string value representing the reason the system booted to an
diff --git a/fs_mgr/libsnapshot/snapshot.cpp b/fs_mgr/libsnapshot/snapshot.cpp
index 019b64a..870801e 100644
--- a/fs_mgr/libsnapshot/snapshot.cpp
+++ b/fs_mgr/libsnapshot/snapshot.cpp
@@ -2151,8 +2151,17 @@
         if (!suffix.empty() && !android::base::EndsWith(name, suffix)) {
             continue;
         }
-        snapshots->emplace_back(std::move(name));
+
+        // Insert the system and product partitions at the beginning so that
+        // during snapshot-merge, these partitions are merged first.
+        if (name == "system_a" || name == "system_b" || name == "product_a" ||
+            name == "product_b") {
+            snapshots->insert(snapshots->begin(), std::move(name));
+        } else {
+            snapshots->emplace_back(std::move(name));
+        }
     }
+
     return true;
 }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
index 692cb74..718c13c 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.cpp
@@ -143,17 +143,18 @@
     NotifyRAForMergeReady();
 }
 
-void SnapshotHandler::CheckMergeCompletionStatus() {
+bool SnapshotHandler::CheckMergeCompletionStatus() {
     if (!merge_initiated_) {
         SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: "
                        << reader_->get_num_total_data_ops();
-        return;
+        return false;
     }
 
     struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
 
     SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << ch->num_merge_ops
                    << " Total-data-ops: " << reader_->get_num_total_data_ops();
+    return true;
 }
 
 bool SnapshotHandler::ReadMetadata() {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
index 83d40f6..c6805e5 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_core.h
@@ -18,6 +18,9 @@
 #include <stdint.h>
 #include <stdlib.h>
 #include <sys/mman.h>
+#include <sys/resource.h>
+#include <sys/time.h>
+#include <unistd.h>
 
 #include <condition_variable>
 #include <cstring>
@@ -54,6 +57,8 @@
 
 static constexpr int kNumWorkerThreads = 4;
 
+static constexpr int kNiceValueForMergeThreads = -5;
+
 #define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
 #define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
 
@@ -274,7 +279,7 @@
     const bool& IsAttached() const { return attached_; }
     void AttachControlDevice() { attached_ = true; }
 
-    void CheckMergeCompletionStatus();
+    bool CheckMergeCompletionStatus();
     bool CommitMerge(int num_merge_ops);
 
     void CloseFds() { cow_fd_ = {}; }
@@ -305,6 +310,8 @@
 
     // State transitions for merge
     void InitiateMerge();
+    void MonitorMerge();
+    void WakeupMonitorMergeThread();
     void WaitForMergeComplete();
     bool WaitForMergeBegin();
     void NotifyRAForMergeReady();
@@ -333,6 +340,7 @@
     void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
     void SetIouringEnabled(bool io_uring_enabled) { is_io_uring_enabled_ = io_uring_enabled; }
     bool MergeInitiated() { return merge_initiated_; }
+    bool MergeMonitored() { return merge_monitored_; }
     double GetMergePercentage() { return merge_completion_percentage_; }
 
     // Merge Block State Transitions
@@ -407,6 +415,7 @@
     double merge_completion_percentage_;
 
     bool merge_initiated_ = false;
+    bool merge_monitored_ = false;
     bool attached_ = false;
     bool is_socket_present_;
     bool is_io_uring_enabled_ = false;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
index c26a2cd..63f47d6 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_merge.cpp
@@ -71,16 +71,16 @@
 }
 
 bool Worker::MergeReplaceZeroOps() {
-    // Flush every 8192 ops. Since all ops are independent and there is no
+    // Flush after merging 2MB. Since all ops are independent and there is no
     // dependency between COW ops, we will flush the data and the number
-    // of ops merged in COW file for every 8192 ops. If there is a crash,
-    // we will end up replaying some of the COW ops which were already merged.
-    // That is ok.
+    // of ops merged in the COW block device. If there is a crash, we will
+    // end up replaying some of the COW ops which were already merged. That is
+    // ok.
     //
-    // Why 8192 ops ? Increasing this may improve merge time 3-4 seconds but
-    // we need to make sure that we checkpoint; 8k ops seems optimal. In-case
-    // if there is a crash merge should always make forward progress.
-    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 32;
+    // Increasing this beyond 2MB may improve merge times; however, on
+    // devices with low memory, this can be problematic when multiple
+    // merge threads run in parallel.
+    int total_ops_merged_per_commit = (PAYLOAD_BUFFER_SZ / BLOCK_SZ) * 2;
     int num_ops_merged = 0;
 
     SNAP_LOG(INFO) << "MergeReplaceZeroOps started....";
@@ -543,6 +543,10 @@
         return true;
     }
 
+    if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
+        SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
+    }
+
     SNAP_LOG(INFO) << "Merge starting..";
 
     if (!Init()) {
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
index fa2866f..b9e4255 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_readahead.cpp
@@ -727,6 +727,10 @@
 
     InitializeIouring();
 
+    if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
+        SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
+    }
+
     while (!RAIterDone()) {
         if (!ReadAheadIOStart()) {
             break;
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
index 82b2b25..5d93f01 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.cpp
@@ -59,6 +59,14 @@
     return DaemonOps::INVALID;
 }
 
+UserSnapshotServer::UserSnapshotServer() {
+    monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
+    if (monitor_merge_event_fd_ == -1) {
+        PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
+    }
+    terminating_ = false;
+}
+
 UserSnapshotServer::~UserSnapshotServer() {
     // Close any client sockets that were added via AcceptClient().
     for (size_t i = 1; i < watched_fds_.size(); i++) {
@@ -249,7 +257,7 @@
                     return Sendmsg(fd, "fail");
                 }
 
-                if (!StartMerge(*iter)) {
+                if (!StartMerge(&lock, *iter)) {
                     return Sendmsg(fd, "fail");
                 }
 
@@ -298,7 +306,7 @@
     }
 
     handler->snapuserd()->CloseFds();
-    handler->snapuserd()->CheckMergeCompletionStatus();
+    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
     handler->snapuserd()->UnmapBufferRegion();
 
     auto misc_name = handler->misc_name();
@@ -306,7 +314,11 @@
 
     {
         std::lock_guard<std::mutex> lock(lock_);
-        num_partitions_merge_complete_ += 1;
+        if (merge_completed) {
+            num_partitions_merge_complete_ += 1;
+            active_merge_threads_ -= 1;
+            WakeupMonitorMergeThread();
+        }
         handler->SetThreadTerminated();
         auto iter = FindHandler(&lock, handler->misc_name());
         if (iter == dm_users_.end()) {
@@ -418,6 +430,9 @@
 
         if (th.joinable()) th.join();
     }
+
+    stop_monitor_merge_thread_ = true;
+    WakeupMonitorMergeThread();
 }
 
 void UserSnapshotServer::AddWatchedFd(android::base::borrowed_fd fd, int events) {
@@ -502,13 +517,24 @@
     return true;
 }
 
-bool UserSnapshotServer::StartMerge(const std::shared_ptr<UserSnapshotDmUserHandler>& handler) {
+bool UserSnapshotServer::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+                                    const std::shared_ptr<UserSnapshotDmUserHandler>& handler) {
+    CHECK(proof_of_lock);
+
     if (!handler->snapuserd()->IsAttached()) {
         LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
         return false;
     }
 
-    handler->snapuserd()->InitiateMerge();
+    handler->snapuserd()->MonitorMerge();
+
+    if (!is_merge_monitor_started_.has_value()) {
+        std::thread(&UserSnapshotServer::MonitorMerge, this).detach();
+        is_merge_monitor_started_ = true;
+    }
+
+    merge_handlers_.push(handler);
+    WakeupMonitorMergeThread();
     return true;
 }
 
@@ -590,6 +616,42 @@
     return true;
 }
 
+void UserSnapshotServer::WakeupMonitorMergeThread() {
+    uint64_t notify = 1;
+    ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
+    if (rc < 0) {
+        PLOG(FATAL) << "failed to notify monitor merge thread";
+    }
+}
+
+void UserSnapshotServer::MonitorMerge() {
+    while (!stop_monitor_merge_thread_) {
+        uint64_t testVal;
+        ssize_t ret =
+                TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
+        if (ret == -1) {
+            PLOG(FATAL) << "Failed to read from eventfd";
+        } else if (ret == 0) {
+            LOG(FATAL) << "Hit EOF on eventfd";
+        }
+
+        LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
+        {
+            std::lock_guard<std::mutex> lock(lock_);
+            while (active_merge_threads_ < kMaxMergeThreads && merge_handlers_.size() > 0) {
+                auto handler = merge_handlers_.front();
+                merge_handlers_.pop();
+                LOG(INFO) << "Starting merge for partition: "
+                          << handler->snapuserd()->GetMiscName();
+                handler->snapuserd()->InitiateMerge();
+                active_merge_threads_ += 1;
+            }
+        }
+    }
+
+    LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
+}
+
 bool UserSnapshotServer::WaitForSocket() {
     auto scope_guard = android::base::make_scope_guard([this]() -> void { JoinAllThreads(); });
 
@@ -646,6 +708,7 @@
     if (!StartWithSocket(true)) {
         return false;
     }
+
     return Run();
 }
 
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
index 34e7941..e0844ae 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_server.h
@@ -15,6 +15,7 @@
 #pragma once
 
 #include <poll.h>
+#include <sys/eventfd.h>
 
 #include <cstdio>
 #include <cstring>
@@ -22,6 +23,8 @@
 #include <future>
 #include <iostream>
 #include <mutex>
+#include <optional>
+#include <queue>
 #include <sstream>
 #include <string>
 #include <thread>
@@ -34,6 +37,7 @@
 namespace snapshot {
 
 static constexpr uint32_t kMaxPacketSize = 512;
+static constexpr uint8_t kMaxMergeThreads = 2;
 
 enum class DaemonOps {
     INIT,
@@ -84,13 +88,19 @@
     std::vector<struct pollfd> watched_fds_;
     bool is_socket_present_ = false;
     int num_partitions_merge_complete_ = 0;
+    int active_merge_threads_ = 0;
+    bool stop_monitor_merge_thread_ = false;
     bool is_server_running_ = false;
     bool io_uring_enabled_ = false;
+    std::optional<bool> is_merge_monitor_started_;
+
+    android::base::unique_fd monitor_merge_event_fd_;
 
     std::mutex lock_;
 
     using HandlerList = std::vector<std::shared_ptr<UserSnapshotDmUserHandler>>;
     HandlerList dm_users_;
+    std::queue<std::shared_ptr<UserSnapshotDmUserHandler>> merge_handlers_;
 
     void AddWatchedFd(android::base::borrowed_fd fd, int events);
     void AcceptClient();
@@ -108,6 +118,8 @@
     bool IsTerminating() { return terminating_; }
 
     void RunThread(std::shared_ptr<UserSnapshotDmUserHandler> handler);
+    void MonitorMerge();
+
     void JoinAllThreads();
     bool StartWithSocket(bool start_listening);
 
@@ -119,7 +131,7 @@
     void TerminateMergeThreads(std::lock_guard<std::mutex>* proof_of_lock);
 
   public:
-    UserSnapshotServer() { terminating_ = false; }
+    UserSnapshotServer();
     ~UserSnapshotServer();
 
     bool Start(const std::string& socketname);
@@ -133,9 +145,11 @@
                                                           const std::string& backing_device,
                                                           const std::string& base_path_merge);
     bool StartHandler(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
-    bool StartMerge(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
+    bool StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
+                    const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
     std::string GetMergeStatus(const std::shared_ptr<UserSnapshotDmUserHandler>& handler);
 
+    void WakeupMonitorMergeThread();
     void SetTerminating() { terminating_ = true; }
     void ReceivedSocketSignal() { received_socket_signal_ = true; }
     void SetServerRunning() { is_server_running_ = true; }
diff --git a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
index d4e1d7c..28c9f68 100644
--- a/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
+++ b/fs_mgr/libsnapshot/snapuserd/user-space-merge/snapuserd_transitions.cpp
@@ -165,6 +165,13 @@
 using namespace android::dm;
 using android::base::unique_fd;
 
+void SnapshotHandler::MonitorMerge() {
+    {
+        std::lock_guard<std::mutex> lock(lock_);
+        merge_monitored_ = true;
+    }
+}
+
 // This is invoked once primarily by update-engine to initiate
 // the merge
 void SnapshotHandler::InitiateMerge() {
@@ -361,10 +368,16 @@
 
 std::string SnapshotHandler::GetMergeStatus() {
     bool merge_not_initiated = false;
+    bool merge_monitored = false;
     bool merge_failed = false;
 
     {
         std::lock_guard<std::mutex> lock(lock_);
+
+        if (MergeMonitored()) {
+            merge_monitored = true;
+        }
+
         if (!MergeInitiated()) {
             merge_not_initiated = true;
         }
@@ -387,6 +400,12 @@
             return "snapshot-merge-complete";
         }
 
+        // Merge monitor thread is tracking the merge but the merge thread
+        // is not started yet.
+        if (merge_monitored) {
+            return "snapshot-merge";
+        }
+
         // Return the state as "snapshot". If the device was rebooted during
         // merge, we will return the status as "snapshot". This is ok, as
         // libsnapshot will explicitly resume the merge. This is slightly
diff --git a/rootdir/init.rc b/rootdir/init.rc
index 870a97b..cd71aa8 100644
--- a/rootdir/init.rc
+++ b/rootdir/init.rc
@@ -823,6 +823,7 @@
     mkdir /data/misc/odsign 0710 root system
     # directory used for odsign metrics
     mkdir /data/misc/odsign/metrics 0770 root system
+
     # Directory for VirtualizationService temporary image files.
     # Delete any stale files owned by the old virtualizationservice uid (b/230056726).
     chmod 0770 /data/misc/virtualizationservice
@@ -1029,7 +1030,8 @@
     exec_start update_verifier_nonencrypted
     start statsd
     start netd
-    trigger zygote-run
+    start zygote
+    start zygote_secondary
 
 on zygote-start && property:ro.crypto.state=unsupported
     wait_for_prop odsign.verification.done 1
@@ -1037,7 +1039,8 @@
     exec_start update_verifier_nonencrypted
     start statsd
     start netd
-    trigger zygote-run
+    start zygote
+    start zygote_secondary
 
 on zygote-start && property:ro.crypto.state=encrypted && property:ro.crypto.type=file
     wait_for_prop odsign.verification.done 1
@@ -1045,15 +1048,6 @@
     exec_start update_verifier_nonencrypted
     start statsd
     start netd
-    trigger zygote-run
-
-on zygote-run && property:ro.zygote=zygote32
-    start zygote
-
-on zygote-run && property:ro.zygote=zygote64
-    start zygote
-
-on zygote-run && property:ro.zygote=zygote64_32
     start zygote
     start zygote_secondary
 
diff --git a/rootdir/init.zygote64_32.rc b/rootdir/init.zygote64_32.rc
index dfe1645..efb30d6 100644
--- a/rootdir/init.zygote64_32.rc
+++ b/rootdir/init.zygote64_32.rc
@@ -25,4 +25,3 @@
     socket usap_pool_secondary stream 660 root system
     onrestart restart zygote
     task_profiles ProcessCapacityHigh MaxPerformance
-    disabled