Merge "RpcServer::establishConnection: make static"
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 424a210..73facc1 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -16,14 +16,12 @@
 
 #define LOG_TAG "RpcServer"
 
-#include <poll.h>
 #include <sys/socket.h>
 #include <sys/un.h>
 
 #include <thread>
 #include <vector>
 
-#include <android-base/macros.h>
 #include <android-base/scopeguard.h>
 #include <binder/Parcel.h>
 #include <binder/RpcServer.h>
@@ -39,7 +37,9 @@
 using base::unique_fd;
 
 RpcServer::RpcServer() {}
-RpcServer::~RpcServer() {}
+RpcServer::~RpcServer() {
+    (void)shutdown();
+}
 
 sp<RpcServer> RpcServer::make() {
     return sp<RpcServer>::make();
@@ -128,16 +128,6 @@
     return ret;
 }
 
-std::unique_ptr<RpcServer::FdTrigger> RpcServer::FdTrigger::make() {
-    auto ret = std::make_unique<RpcServer::FdTrigger>();
-    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr;
-    return ret;
-}
-
-void RpcServer::FdTrigger::trigger() {
-    mWrite.reset();
-}
-
 void RpcServer::join() {
     LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
 
@@ -146,27 +136,12 @@
         LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join.");
         LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined");
         mJoinThreadRunning = true;
-        mShutdownTrigger = FdTrigger::make();
+        mShutdownTrigger = RpcSession::FdTrigger::make();
         LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");
     }
 
-    while (true) {
-        pollfd pfd[]{{.fd = mServer.get(), .events = POLLIN, .revents = 0},
-                     {.fd = mShutdownTrigger->readFd().get(), .events = POLLHUP, .revents = 0}};
-        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
-        if (ret < 0) {
-            ALOGE("Could not poll socket: %s", strerror(errno));
-            continue;
-        }
-        if (ret == 0) {
-            continue;
-        }
-        if (pfd[1].revents & POLLHUP) {
-            LOG_RPC_DETAIL("join() exiting because shutdown requested.");
-            break;
-        }
-
-        (void)acceptOneNoCheck();
+    while (mShutdownTrigger->triggerablePollRead(mServer)) {
+        (void)acceptOne();
     }
 
     {
@@ -177,12 +152,6 @@
 }
 
 bool RpcServer::acceptOne() {
-    LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
-    LOG_ALWAYS_FATAL_IF(!hasServer(), "RpcServer must be setup to acceptOne.");
-    return acceptOneNoCheck();
-}
-
-bool RpcServer::acceptOneNoCheck() {
     unique_fd clientFd(
             TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC)));
 
@@ -203,7 +172,6 @@
 }
 
 bool RpcServer::shutdown() {
-    LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
     std::unique_lock<std::mutex> _l(mLock);
     if (mShutdownTrigger == nullptr) return false;
 
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index 05fa49e..ea82f36 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -19,10 +19,12 @@
 #include <binder/RpcSession.h>
 
 #include <inttypes.h>
+#include <poll.h>
 #include <unistd.h>
 
 #include <string_view>
 
+#include <android-base/macros.h>
 #include <binder/Parcel.h>
 #include <binder/RpcServer.h>
 #include <binder/Stability.h>
@@ -113,6 +115,35 @@
     return state()->sendDecStrong(connection.fd(), address);
 }
 
+std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
+    auto ret = std::make_unique<RpcSession::FdTrigger>();
+    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr;
+    return ret;
+}
+
+void RpcSession::FdTrigger::trigger() {
+    mWrite.reset();
+}
+
+bool RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
+    while (true) {
+        pollfd pfd[]{{.fd = fd.get(), .events = POLLIN, .revents = 0},
+                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
+        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
+        if (ret < 0) {
+            ALOGE("Could not poll: %s", strerror(errno));
+            continue;
+        }
+        if (ret == 0) {
+            continue;
+        }
+        if (pfd[1].revents & POLLHUP) {
+            return false;
+        }
+        return true;
+    }
+}
+
 status_t RpcSession::readId() {
     {
         std::lock_guard<std::mutex> _l(mMutex);
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 4162a2b..f76ecc2 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -135,12 +135,6 @@
     [[nodiscard]] bool shutdown();
 
     /**
-     * Accept one connection on this server. You must have at least one client
-     * session before calling this.
-     */
-    [[nodiscard]] bool acceptOne();
-
-    /**
      * For debugging!
      */
     std::vector<sp<RpcSession>> listSessions();
@@ -153,29 +147,12 @@
     void onSessionTerminating(const sp<RpcSession>& session);
 
 private:
-    /** This is not a pipe. */
-    struct FdTrigger {
-        static std::unique_ptr<FdTrigger> make();
-        /**
-         * poll() on this fd for POLLHUP to get notification when trigger is called
-         */
-        base::borrowed_fd readFd() const { return mRead; }
-        /**
-         * Close the write end of the pipe so that the read end receives POLLHUP.
-         */
-        void trigger();
-
-    private:
-        base::unique_fd mWrite;
-        base::unique_fd mRead;
-    };
-
     friend sp<RpcServer>;
     RpcServer();
 
     static void establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd);
     bool setupSocketServer(const RpcSocketAddress& address);
-    [[nodiscard]] bool acceptOneNoCheck();
+    [[nodiscard]] bool acceptOne();
 
     bool mAgreedExperimental = false;
     size_t mMaxThreads = 1;
@@ -188,7 +165,7 @@
     std::map<int32_t, sp<RpcSession>> mSessions;
     int32_t mSessionIdCounter = 0;
     bool mJoinThreadRunning = false;
-    std::unique_ptr<FdTrigger> mShutdownTrigger;
+    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
     std::condition_variable mShutdownCv;
 };
 
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index bcc213c..bcef8dc 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -112,6 +112,33 @@
     friend RpcServer;
     RpcSession();
 
+    /** This is not a pipe. */
+    struct FdTrigger {
+        static std::unique_ptr<FdTrigger> make();
+        /**
+         * poll() on this fd for POLLHUP to get notification when trigger is called
+         */
+        base::borrowed_fd readFd() const { return mRead; }
+
+        /**
+         * Close the write end of the pipe so that the read end receives POLLHUP.
+         */
+        void trigger();
+
+        /**
+         * Poll for a read event.
+         *
+         * Return:
+         *   true - time to read!
+         *   false - trigger happened
+         */
+        bool triggerablePollRead(base::borrowed_fd fd);
+
+    private:
+        base::unique_fd mWrite;
+        base::unique_fd mRead;
+    };
+
     status_t readId();
 
     // transfer ownership of thread
diff --git a/libs/binder/tests/rpc_fuzzer/main.cpp b/libs/binder/tests/rpc_fuzzer/main.cpp
index 8a12aea..84f5974 100644
--- a/libs/binder/tests/rpc_fuzzer/main.cpp
+++ b/libs/binder/tests/rpc_fuzzer/main.cpp
@@ -61,7 +61,7 @@
     server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
     CHECK(server->setupUnixDomainServer(kSock.c_str()));
 
-    std::thread serverThread([=] { (void)server->acceptOne(); });
+    std::thread serverThread([=] { (void)server->join(); });
 
     sockaddr_un addr{
             .sun_family = AF_UNIX,
@@ -76,8 +76,6 @@
                      connect(clientFd.get(), reinterpret_cast<sockaddr*>(&addr), sizeof(addr))))
             << strerror(errno);
 
-    serverThread.join();
-
     // TODO(b/182938024): fuzz multiple sessions, instead of just one
 
 #if 0
@@ -90,6 +88,12 @@
 
     clientFd.reset();
 
+    // TODO(185167543): currently this is okay because we only shutdown the one
+    // thread, but once we can shutdown other sessions, we'll need to change
+    // this behavior in order to make sure all of the input is actually read.
+    while (!server->shutdown()) usleep(100);
+    serverThread.join();
+
     // TODO(b/185167543): better way to force a server to shutdown
     while (!server->listSessions().empty() && server->numUninitializedSessions()) {
         usleep(1);