libbinder: RpcServer - shut down connection threads

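Reuse FdTrigger to shut down connection threads: RpcServer::shutdown() now interrupts threads that are still reading a new client's ID, and waits until no connection is still being established.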

Coming next - shutting down sessions as well!

Bug: 185167543
Test: binderRpcTest, binder_rpc_fuzzer
Change-Id: I238f1e2a5f69fdec09ac8b3afc484ab8639852fa
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 50b0465..540c346 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -184,10 +184,18 @@
 
 bool RpcServer::shutdown() {
     std::unique_lock<std::mutex> _l(mLock);
-    if (mShutdownTrigger == nullptr) return false;
+    if (mShutdownTrigger == nullptr) {
+        LOG_RPC_DETAIL("Cannot shutdown. No shutdown trigger installed.");
+        return false;
+    }
 
     mShutdownTrigger->trigger();
-    while (mJoinThreadRunning) mShutdownCv.wait(_l);
+    while (mJoinThreadRunning || !mConnectingThreads.empty()) {
+        ALOGI("Waiting for RpcServer to shut down. Join thread running: %d, Connecting threads: "
+              "%zu",
+              mJoinThreadRunning, mConnectingThreads.size());
+        mShutdownCv.wait(_l);
+    }
 
     // At this point, we know join() is about to exit, but the thread that calls
     // join() may not have exited yet.
@@ -222,17 +230,21 @@
 void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) {
     // TODO(b/183988761): cannot trust this simple ID
     LOG_ALWAYS_FATAL_IF(!server->mAgreedExperimental, "no!");
-    bool idValid = true;
+
+    // mShutdownTrigger can only be cleared once connection threads have joined.
+    // It must be set before this thread is started
+    LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr);
+
     int32_t id;
-    if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) {
-        ALOGE("Could not read ID from fd %d", clientFd.get());
-        idValid = false;
+    bool idValid = server->mShutdownTrigger->interruptableRecv(clientFd.get(), &id, sizeof(id));
+    if (!idValid) {
+        ALOGE("Failed to read ID for client connecting to RPC server.");
     }
 
     std::thread thisThread;
     sp<RpcSession> session;
     {
-        std::lock_guard<std::mutex> _l(server->mLock);
+        std::unique_lock<std::mutex> _l(server->mLock);
 
         auto threadId = server->mConnectingThreads.find(std::this_thread::get_id());
         LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(),
@@ -241,6 +253,16 @@
         ScopeGuard detachGuard = [&]() { thisThread.detach(); };
         server->mConnectingThreads.erase(threadId);
 
+        // TODO(b/185167543): we currently can't disable this because we don't
+        // shutdown sessions as well, only the server itself. So, we need to
+        // keep this separate from the detachGuard, since we temporarily want to
+        // give a notification even when we pass ownership of the thread to
+        // a session.
+        ScopeGuard threadLifetimeGuard = [&]() {
+            _l.unlock();
+            server->mShutdownCv.notify_all();
+        };
+
         if (!idValid) {
             return;
         }
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index ea82f36..771d738 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -144,6 +144,22 @@
     }
 }
 
+// Reads exactly 'size' bytes unless interrupted. Returns false if triggerablePollRead() fails
+// (e.g. trigger fired), recv() errors, or the peer hangs up (EOF); size 0 succeeds immediately.
+bool RpcSession::FdTrigger::interruptableRecv(base::borrowed_fd fd, void* data, size_t size) {
+    uint8_t* buffer = static_cast<uint8_t*>(data);
+    uint8_t* const end = buffer + size;
+    while (buffer != end) {
+        if (!triggerablePollRead(fd)) return false;  // interrupted (e.g. by the trigger)
+        ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
+        if (readSize < 0) ALOGE("Failed to read %s", strerror(errno));
+        // 0 means EOF: no more bytes will ever arrive, so looping back would never make progress.
+        if (readSize <= 0) return false;
+        buffer += readSize;
+    }
+    return true;
+}
+
 status_t RpcSession::readId() {
     {
         std::lock_guard<std::mutex> _l(mMutex);
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index bcef8dc..3d22002 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -134,6 +134,15 @@
          */
         bool triggerablePollRead(base::borrowed_fd fd);
 
+        /**
+         * Read, but allow the read to be interrupted by this trigger.
+         *
+         * Return:
+         *   true - read succeeded at 'size'
+         *   false - interrupted (failure or trigger)
+         */
+        bool interruptableRecv(base::borrowed_fd fd, void* data, size_t size);
+
     private:
         base::unique_fd mWrite;
         base::unique_fd mRead;
diff --git a/libs/binder/tests/rpc_fuzzer/main.cpp b/libs/binder/tests/rpc_fuzzer/main.cpp
index 84f5974..e6fd392 100644
--- a/libs/binder/tests/rpc_fuzzer/main.cpp
+++ b/libs/binder/tests/rpc_fuzzer/main.cpp
@@ -20,6 +20,7 @@
 #include <binder/Parcel.h>
 #include <binder/RpcServer.h>
 #include <binder/RpcSession.h>
+#include <fuzzer/FuzzedDataProvider.h>
 
 #include <sys/resource.h>
 #include <sys/un.h>
@@ -53,6 +54,7 @@
 
 extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
     if (size > 50000) return 0;
+    FuzzedDataProvider provider(data, size);
 
     unlink(kSock.c_str());
 
@@ -84,14 +86,21 @@
     CHECK(base::WriteFully(clientFd, &id, sizeof(id)));
 #endif
 
-    CHECK(base::WriteFully(clientFd, data, size));
+    bool hangupBeforeShutdown = provider.ConsumeBool();
 
-    clientFd.reset();
+    std::vector<uint8_t> writeData = provider.ConsumeRemainingBytes<uint8_t>();
+    CHECK(base::WriteFully(clientFd, writeData.data(), writeData.size()));
+
+    if (hangupBeforeShutdown) {
+        clientFd.reset();
+    }
 
     // TODO(185167543): currently this is okay because we only shutdown the one
     // thread, but once we can shutdown other sessions, we'll need to change
     // this behavior in order to make sure all of the input is actually read.
     while (!server->shutdown()) usleep(100);
+
+    clientFd.reset();
     serverThread.join();
 
     // TODO(b/185167543): better way to force a server to shutdown