Add RpcServer::shutdown.

The function terminates any existing execution of join().

After this CL, join() is only allowed to be called in one thread.

Test: binderLibTest
Change-Id: I5f1abbb39ee42a8f94b7394a702a152701537e7e
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 59659bd..e31aea0 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -16,19 +16,21 @@
 
 #define LOG_TAG "RpcServer"
 
+#include <poll.h>
 #include <sys/socket.h>
 #include <sys/un.h>
 
 #include <thread>
 #include <vector>
 
+#include <android-base/macros.h>
 #include <android-base/scopeguard.h>
 #include <binder/Parcel.h>
 #include <binder/RpcServer.h>
 #include <log/log.h>
-#include "RpcState.h"
 
 #include "RpcSocketAddress.h"
+#include "RpcState.h"
 #include "RpcWireFormat.h"
 
 namespace android {
@@ -99,7 +101,7 @@
 
 void RpcServer::setMaxThreads(size_t threads) {
     LOG_ALWAYS_FATAL_IF(threads <= 0, "RpcServer is useless without threads");
-    LOG_ALWAYS_FATAL_IF(mStarted, "must be called before started");
+    LOG_ALWAYS_FATAL_IF(mJoinThreadRunning, "Cannot set max threads while running");
     mMaxThreads = threads;
 }
 
@@ -126,16 +128,61 @@
     return ret;
 }
 
+std::unique_ptr<RpcServer::FdTrigger> RpcServer::FdTrigger::make() {
+    auto ret = std::make_unique<RpcServer::FdTrigger>();
+    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr;
+    return ret;
+}
+
+void RpcServer::FdTrigger::trigger() {
+    mWrite.reset();
+}
+
 void RpcServer::join() {
-    while (true) {
-        (void)acceptOne();
+    LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
+
+    {
+        std::lock_guard<std::mutex> _l(mLock);
+        LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join.");
+        LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined");
+        mJoinThreadRunning = true;
+        mShutdownTrigger = FdTrigger::make();
+        LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");
     }
+
+    while (true) {
+        pollfd pfd[]{{.fd = mServer.get(), .events = POLLIN, .revents = 0},
+                     {.fd = mShutdownTrigger->readFd().get(), .events = POLLHUP, .revents = 0}};
+        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
+        if (ret < 0) {
+            ALOGE("Could not poll socket: %s", strerror(errno));
+            continue;
+        }
+        if (ret == 0) {
+            continue;
+        }
+        if (pfd[1].revents & POLLHUP) {
+            LOG_RPC_DETAIL("join() exiting because shutdown requested.");
+            break;
+        }
+
+        (void)acceptOneNoCheck();
+    }
+
+    {
+        std::lock_guard<std::mutex> _l(mLock);
+        mJoinThreadRunning = false;
+    }
+    mShutdownCv.notify_all();
 }
 
 bool RpcServer::acceptOne() {
     LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
-    LOG_ALWAYS_FATAL_IF(!hasServer(), "RpcServer must be setup to join.");
+    LOG_ALWAYS_FATAL_IF(!hasServer(), "RpcServer must be setup to acceptOne.");
+    return acceptOneNoCheck();
+}
 
+bool RpcServer::acceptOneNoCheck() {
     unique_fd clientFd(
             TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC)));
 
@@ -156,6 +203,18 @@
     return true;
 }
 
+bool RpcServer::shutdown() {
+    LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
+    std::unique_lock<std::mutex> _l(mLock);
+    if (mShutdownTrigger == nullptr) return false;
+
+    mShutdownTrigger->trigger();
+    while (mJoinThreadRunning) mShutdownCv.wait(_l);
+
+    mShutdownTrigger = nullptr;
+    return true;
+}
+
 std::vector<sp<RpcSession>> RpcServer::listSessions() {
     std::lock_guard<std::mutex> _l(mLock);
     std::vector<sp<RpcSession>> sessions;