init: handle property service callbacks asynchronously

A previous change moved property_service into its own thread, since
there was otherwise a deadlock whenever a process called by init would
try to set a property.  This new thread, however, would send a message
via a blocking socket to init for each property that it received,
since init may need to take action depending on which property it is.
Unfortunately, this means that the deadlock is still possible; the
only difference is that the socket's buffer must be filled before init
deadlocks.

There are possible partial solutions here: the socket's buffer may be
increased, or property_service may only send messages for the
properties that init will take action on.  However, all of these
solutions still lead to eventual deadlock.  The only complete solution
is to handle these messages asynchronously.

This change, therefore, adds the following:
1) A lock for instructing init to reboot
2) A lock for waiting on properties
3) A lock for queueing new properties
4) A lock for any actions with ServiceList or any Services, enforced
   through thread annotations, particularly since this code was not
   designed with the intention of being multi-threaded.

Bug: 146877356
Bug: 148236233
Test: boot
Test: kill hwservicemanager without deadlock
Change-Id: I84108e54217866205a48c45e8b59355012c32ea8
diff --git a/init/property_service.cpp b/init/property_service.cpp
index 84644e8..b140ee4 100644
--- a/init/property_service.cpp
+++ b/init/property_service.cpp
@@ -92,8 +92,10 @@
 static bool persistent_properties_loaded = false;
 
 static int property_set_fd = -1;
+static int from_init_socket = -1;
 static int init_socket = -1;
 static bool accept_messages = false;
+static std::thread property_service_thread;
 
 static PropertyInfoAreaFile property_info_area;
 
@@ -147,17 +149,6 @@
     return has_access;
 }
 
-static void SendPropertyChanged(const std::string& name, const std::string& value) {
-    auto property_msg = PropertyMessage{};
-    auto* changed_message = property_msg.mutable_changed_message();
-    changed_message->set_name(name);
-    changed_message->set_value(value);
-
-    if (auto result = SendMessage(init_socket, property_msg); !result.ok()) {
-        LOG(ERROR) << "Failed to send property changed message: " << result.error();
-    }
-}
-
 static uint32_t PropertySet(const std::string& name, const std::string& value, std::string* error) {
     size_t valuelen = value.size();
 
@@ -196,47 +187,137 @@
     // If init hasn't started its main loop, then it won't be handling property changed messages
     // anyway, so there's no need to try to send them.
     if (accept_messages) {
-        SendPropertyChanged(name, value);
+        PropertyChanged(name, value);
     }
     return PROP_SUCCESS;
 }
 
-class AsyncRestorecon {
+template <typename T>
+class SingleThreadExecutor {
   public:
-    void TriggerRestorecon(const std::string& path) {
-        auto guard = std::lock_guard{mutex_};
-        paths_.emplace(path);
+    virtual ~SingleThreadExecutor() {}
 
-        if (!thread_started_) {
-            thread_started_ = true;
-            std::thread{&AsyncRestorecon::ThreadFunction, this}.detach();
+    template <typename F = T>
+    void Run(F&& item) {
+        auto guard = std::lock_guard{mutex_};
+        items_.emplace(std::forward<F>(item));
+
+        if (thread_state_ == ThreadState::kRunning || thread_state_ == ThreadState::kStopped) {
+            return;
+        }
+
+        if (thread_state_ == ThreadState::kPendingJoin) {
+            thread_.join();
+        }
+
+        StartThread();
+    }
+
+    void StopAndJoin() {
+        auto lock = std::unique_lock{mutex_};
+        if (thread_state_ == ThreadState::kPendingJoin) {
+            thread_.join();
+        } else if (thread_state_ == ThreadState::kRunning) {
+            thread_state_ = ThreadState::kStopped;
+            lock.unlock();
+            thread_.join();
+            lock.lock();
+        }
+
+        thread_state_ = ThreadState::kStopped;
+    }
+
+    void Restart() {
+        auto guard = std::lock_guard{mutex_};
+        if (items_.empty()) {
+            thread_state_ = ThreadState::kNotStarted;
+        } else {
+            StartThread();
+        }
+    }
+
+    void MaybeJoin() {
+        auto guard = std::lock_guard{mutex_};
+        if (thread_state_ == ThreadState::kPendingJoin) {
+            thread_.join();
+            thread_state_ = ThreadState::kNotStarted;
         }
     }
 
   private:
+    virtual void Execute(T&& item) = 0;
+
+    void StartThread() {
+        thread_state_ = ThreadState::kRunning;
+        auto thread = std::thread{&SingleThreadExecutor::ThreadFunction, this};
+        std::swap(thread_, thread);
+    }
+
     void ThreadFunction() {
         auto lock = std::unique_lock{mutex_};
 
-        while (!paths_.empty()) {
-            auto path = paths_.front();
-            paths_.pop();
+        while (!items_.empty()) {
+            auto item = items_.front();
+            items_.pop();
 
             lock.unlock();
-            if (selinux_android_restorecon(path.c_str(), SELINUX_ANDROID_RESTORECON_RECURSE) != 0) {
-                LOG(ERROR) << "Asynchronous restorecon of '" << path << "' failed'";
-            }
-            android::base::SetProperty(kRestoreconProperty, path);
+            Execute(std::move(item));
             lock.lock();
         }
 
-        thread_started_ = false;
+        if (thread_state_ != ThreadState::kStopped) {
+            thread_state_ = ThreadState::kPendingJoin;
+        }
     }
 
     std::mutex mutex_;
-    std::queue<std::string> paths_;
-    bool thread_started_ = false;
+    std::queue<T> items_;
+    enum class ThreadState {
+        kNotStarted,  // Initial state when starting the program or when restarting with no items to
+                      // process.
+        kRunning,     // The thread is running and is in a state that it will process new items if
+                      // they are run.
+        kPendingJoin,  // The thread has run to completion and is pending join().  A new thread must
+                       // be launched for new items to be processed.
+        kStopped,  // This executor has stopped and will not process more items until Restart() is
+                   // called.  Currently pending items will be processed and the thread will be
+                   // joined.
+    };
+    ThreadState thread_state_ = ThreadState::kNotStarted;
+    std::thread thread_;
 };
 
+class RestoreconThread : public SingleThreadExecutor<std::string> {
+    virtual void Execute(std::string&& path) override {
+        if (selinux_android_restorecon(path.c_str(), SELINUX_ANDROID_RESTORECON_RECURSE) != 0) {
+            LOG(ERROR) << "Asynchronous restorecon of '" << path << "' failed'";
+        }
+        android::base::SetProperty(kRestoreconProperty, path);
+    }
+};
+
+struct ControlMessageInfo {
+    std::string message;
+    std::string name;
+    pid_t pid;
+    int fd;
+};
+
+class ControlMessageThread : public SingleThreadExecutor<ControlMessageInfo> {
+    virtual void Execute(ControlMessageInfo&& info) override {
+        bool success = HandleControlMessage(info.message, info.name, info.pid);
+
+        uint32_t response = success ? PROP_SUCCESS : PROP_ERROR_HANDLE_CONTROL_MESSAGE;
+        if (info.fd != -1) {
+            TEMP_FAILURE_RETRY(send(info.fd, &response, sizeof(response), 0));
+            close(info.fd);
+        }
+    }
+};
+
+static RestoreconThread restorecon_thread;
+static ControlMessageThread control_message_thread;
+
 class SocketConnection {
   public:
     SocketConnection(int socket, const ucred& cred) : socket_(socket), cred_(cred) {}
@@ -378,29 +459,17 @@
         return PROP_ERROR_HANDLE_CONTROL_MESSAGE;
     }
 
-    auto property_msg = PropertyMessage{};
-    auto* control_message = property_msg.mutable_control_message();
-    control_message->set_msg(msg);
-    control_message->set_name(name);
-    control_message->set_pid(pid);
-
-    // We must release the fd before sending it to init, otherwise there will be a race with init.
-    // If init calls close() before Release(), then fdsan will see the wrong tag and abort().
+    // We must release the fd before spawning the thread, otherwise there will be a race with the
+    // thread. If the thread calls close() before this function calls Release(), then fdsan will see
+    // the wrong tag and abort().
     int fd = -1;
     if (socket != nullptr && SelinuxGetVendorAndroidVersion() > __ANDROID_API_Q__) {
         fd = socket->Release();
-        control_message->set_fd(fd);
     }
 
-    if (auto result = SendMessage(init_socket, property_msg); !result.ok()) {
-        // We've already released the fd above, so if we fail to send the message to init, we need
-        // to manually free it here.
-        if (fd != -1) {
-            close(fd);
-        }
-        *error = "Failed to send control message: " + result.error().message();
-        return PROP_ERROR_HANDLE_CONTROL_MESSAGE;
-    }
+    // Handling a control message likely calls SetProperty, which we must synchronously handle,
+    // therefore we must fork a thread to handle it.
+    control_message_thread.Run({msg, name, pid, fd});
 
     return PROP_SUCCESS;
 }
@@ -502,8 +571,7 @@
     // We use a thread to do this restorecon operation to prevent holding up init, as it may take
     // a long time to complete.
     if (name == kRestoreconProperty && cr.pid != 1 && !value.empty()) {
-        static AsyncRestorecon async_restorecon;
-        async_restorecon.TriggerRestorecon(value);
+        restorecon_thread.Run(value);
         return PROP_SUCCESS;
     }
 
@@ -1082,6 +1150,8 @@
     PropertyLoadBootDefaults();
 }
 
+static bool pause_property_service = false;
+
 static void HandleInitSocket() {
     auto message = ReadMessage(init_socket);
     if (!message.ok()) {
@@ -1116,6 +1186,10 @@
             accept_messages = true;
             break;
         }
+        case InitMessage::kPausePropertyService: {
+            pause_property_service = true;
+            break;
+        }
         default:
             LOG(ERROR) << "Unknown message type from init: " << init_message.msg_case();
     }
@@ -1136,7 +1210,7 @@
         LOG(FATAL) << result.error();
     }
 
-    while (true) {
+    while (!pause_property_service) {
         auto pending_functions = epoll.Wait(std::nullopt);
         if (!pending_functions.ok()) {
             LOG(ERROR) << pending_functions.error();
@@ -1145,9 +1219,34 @@
                 (*function)();
             }
         }
+        control_message_thread.MaybeJoin();
+        restorecon_thread.MaybeJoin();
     }
 }
 
+void SendStopPropertyServiceMessage() {
+    auto init_message = InitMessage{};
+    init_message.set_pause_property_service(true);
+    if (auto result = SendMessage(from_init_socket, init_message); !result.ok()) {
+        LOG(ERROR) << "Failed to send stop property service message: " << result.error();
+    }
+}
+
+void PausePropertyService() {
+    control_message_thread.StopAndJoin();
+    restorecon_thread.StopAndJoin();
+    SendStopPropertyServiceMessage();
+    property_service_thread.join();
+}
+
+void ResumePropertyService() {
+    pause_property_service = false;
+    auto new_thread = std::thread{PropertyServiceThread};
+    property_service_thread.swap(new_thread);
+    restorecon_thread.Restart();
+    control_message_thread.Restart();
+}
+
 void StartPropertyService(int* epoll_socket) {
     InitPropertySet("ro.property_service.version", "2");
 
@@ -1155,7 +1254,7 @@
     if (socketpair(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0, sockets) != 0) {
         PLOG(FATAL) << "Failed to socketpair() between property_service and init";
     }
-    *epoll_socket = sockets[0];
+    *epoll_socket = from_init_socket = sockets[0];
     init_socket = sockets[1];
     accept_messages = true;
 
@@ -1169,7 +1268,8 @@
 
     listen(property_set_fd, 8);
 
-    std::thread{PropertyServiceThread}.detach();
+    auto new_thread = std::thread{PropertyServiceThread};
+    property_service_thread.swap(new_thread);
 }
 
 }  // namespace init