libbinder: server sessions shut down independently

Before this CL, a server had a single pipe which, when hung up, would
end every session with that server. In hindsight, this turned out to be
problematic, since we want to shut down problematic sessions without
interrupting other clients.

Assuming we keep the pipe-based interrupt, there are essentially two
solutions to consider here (this CL is option B).
a. instead of hanging up these pipes, write information to them, and
   then wake up all relevant threads which can figure out the right
   thing to do.
   - pro: only need to create one FD
   - con: very complicated because there may be multiple readers of the
          pipe, etc...
   - con: will mess with all clients
b. have a separate pipe per session
   - pro: relatively simple logic
   - con: an extra FD per session

So, for now, going with (b).

Bug: 183140903
Test: binderRpcTest, run binder_rpc_fuzzer for 5 minutes
    (I checked locally, w/o the RpcServer check for isTriggered, this
    also detected that deadlock in less than 1 minute! :D)
Change-Id: I9e290cd0a6df1d33435183fb16f508f38071ad62
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 2f378da..2d2eed2 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -187,6 +187,11 @@
     }
 
     mShutdownTrigger->trigger();
+    for (auto& [id, session] : mSessions) {
+        (void)id;
+        session->mShutdownTrigger->trigger();
+    }
+
     while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
         if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
             ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
@@ -261,7 +266,7 @@
         };
         server->mConnectingThreads.erase(threadId);
 
-        if (!idValid) {
+        if (!idValid || server->mShutdownTrigger->isTriggered()) {
             return;
         }
 
@@ -276,10 +281,14 @@
 
             session = RpcSession::make();
             session->setMaxThreads(server->mMaxThreads);
-            session->setForServer(server,
-                                  sp<RpcServer::EventListener>::fromExisting(
-                                          static_cast<RpcServer::EventListener*>(server.get())),
-                                  server->mSessionIdCounter, server->mShutdownTrigger);
+            if (!session->setForServer(server,
+                                       sp<RpcServer::EventListener>::fromExisting(
+                                               static_cast<RpcServer::EventListener*>(
+                                                       server.get())),
+                                       server->mSessionIdCounter)) {
+                ALOGE("Failed to attach server to session");
+                return;
+            }
 
             server->mSessions[server->mSessionIdCounter] = session;
         } else {
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index c563377..93e04f7 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -144,7 +144,10 @@
 
 std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
     auto ret = std::make_unique<RpcSession::FdTrigger>();
-    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr;
+    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
+        ALOGE("Could not create pipe %s", strerror(errno));
+        return nullptr;
+    }
     return ret;
 }
 
@@ -152,6 +155,10 @@
     mWrite.reset();
 }
 
+bool RpcSession::FdTrigger::isTriggered() {
+    return mWrite == -1;
+}
+
 status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
     while (true) {
         pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
@@ -408,20 +415,21 @@
     return true;
 }
 
-void RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
-                              int32_t sessionId,
-                              const std::shared_ptr<FdTrigger>& shutdownTrigger) {
+bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
+                              int32_t sessionId) {
     LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
     LOG_ALWAYS_FATAL_IF(server == nullptr);
     LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
     LOG_ALWAYS_FATAL_IF(eventListener == nullptr);
     LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
-    LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);
+
+    mShutdownTrigger = FdTrigger::make();
+    if (mShutdownTrigger == nullptr) return false;
 
     mId = sessionId;
     mForServer = server;
     mEventListener = eventListener;
-    mShutdownTrigger = shutdownTrigger;
+    return true;
 }
 
 sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 98db221..b88bf50 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -173,7 +173,7 @@
     wp<IBinder> mRootObjectWeak;
     std::map<int32_t, sp<RpcSession>> mSessions;
     int32_t mSessionIdCounter = 0;
-    std::shared_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
     std::condition_variable mShutdownCv;
 };
 
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 59613c5..2f2c77c 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -130,10 +130,16 @@
 
         /**
          * Close the write end of the pipe so that the read end receives POLLHUP.
+         * Not threadsafe.
          */
         void trigger();
 
         /**
+         * Whether this has been triggered.
+         */
+        bool isTriggered();
+
+        /**
          * Poll for a read event.
          *
          * Return:
@@ -192,9 +198,9 @@
     [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
                                                 bool server);
     [[nodiscard]] bool addClientConnection(base::unique_fd fd);
-    void setForServer(const wp<RpcServer>& server,
-                      const wp<RpcSession::EventListener>& eventListener, int32_t sessionId,
-                      const std::shared_ptr<FdTrigger>& shutdownTrigger);
+    [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
+                                    const wp<RpcSession::EventListener>& eventListener,
+                                    int32_t sessionId);
     sp<RpcConnection> assignServerToThisThread(base::unique_fd fd);
     [[nodiscard]] bool removeServerConnection(const sp<RpcConnection>& connection);
 
@@ -247,7 +253,7 @@
     // TODO(b/183988761): this shouldn't be guessable
     std::optional<int32_t> mId;
 
-    std::shared_ptr<FdTrigger> mShutdownTrigger;
+    std::unique_ptr<FdTrigger> mShutdownTrigger;
 
     std::unique_ptr<RpcState> mState;