libbinder: RPC socket bound to service thread

This is in preparation for transforming binder RPC to have a more
traditional server architecture where the server accepts new connections,
and arbitrarily many can be added.

Before this change, there were a few issues:
- threads might call accept, but not use the FD they accepted (threads
would accidentally trade FDs, and this could create confusing logs)
- if a server thread disconnected or finished, another thread might try
to reuse the FD being used there

Now:
- server sockets are associated with the thread from the beginning
- when a connection disconnects, the server also removes the connection

Bug: 185167543
Test: binderRpcTest (note added ~RpcConnection assert)

Change-Id: Icf50a23ba52167c6fbe04ad1a77cfcb6fb3fcc9c
diff --git a/libs/binder/RpcConnection.cpp b/libs/binder/RpcConnection.cpp
index 22e0466..1388a80 100644
--- a/libs/binder/RpcConnection.cpp
+++ b/libs/binder/RpcConnection.cpp
@@ -57,6 +57,10 @@
 }
 RpcConnection::~RpcConnection() {
     LOG_RPC_DETAIL("RpcConnection destroyed %p", this);
+
+    std::lock_guard<std::mutex> _l(mSocketMutex);
+    LOG_ALWAYS_FATAL_IF(mServers.size() != 0,
+                        "Should not be able to destroy a connection with servers in use.");
 }
 
 sp<RpcConnection> RpcConnection::make() {
@@ -222,36 +226,35 @@
 }
 
 void RpcConnection::join() {
-    // establish a connection
-    {
-        unique_fd clientFd(
-                TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC)));
-        if (clientFd < 0) {
-            // If this log becomes confusing, should save more state from setupUnixDomainServer
-            // in order to output here.
-            ALOGE("Could not accept4 socket: %s", strerror(errno));
-            return;
-        }
-
-        LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
-
-        assignServerToThisThread(std::move(clientFd));
+    // TODO(b/185167543): do this dynamically, instead of from a static number
+    // of threads
+    unique_fd clientFd(
+            TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, 0 /*length*/, SOCK_CLOEXEC)));
+    if (clientFd < 0) {
+        // If this log becomes confusing, should save more state from setupUnixDomainServer
+        // in order to output here.
+        ALOGE("Could not accept4 socket: %s", strerror(errno));
+        return;
     }
 
-    // We may not use the connection we just established (two threads might
-    // establish connections for each other), but for now, just use one
-    // server/socket connection.
-    ExclusiveSocket socket(sp<RpcConnection>::fromExisting(this), SocketUse::SERVER);
+    LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
+
+    // must be registered to allow arbitrary client code executing commands to
+    // be able to do nested calls (we can't only read from it)
+    sp<ConnectionSocket> socket = assignServerToThisThread(std::move(clientFd));
 
     while (true) {
         status_t error =
-                state()->getAndExecuteCommand(socket.fd(), sp<RpcConnection>::fromExisting(this));
+                state()->getAndExecuteCommand(socket->fd, sp<RpcConnection>::fromExisting(this));
 
         if (error != OK) {
             ALOGI("Binder socket thread closing w/ status %s", statusToString(error).c_str());
-            return;
+            break;
         }
     }
+
+    LOG_ALWAYS_FATAL_IF(!removeServerSocket(socket),
+                        "bad state: socket object guaranteed to be in list");
 }
 
 void RpcConnection::setForServer(const wp<RpcServer>& server) {
@@ -316,11 +319,23 @@
     mClients.push_back(connection);
 }
 
-void RpcConnection::assignServerToThisThread(unique_fd&& fd) {
+sp<RpcConnection::ConnectionSocket> RpcConnection::assignServerToThisThread(unique_fd&& fd) {
     std::lock_guard<std::mutex> _l(mSocketMutex);
     sp<ConnectionSocket> connection = sp<ConnectionSocket>::make();
     connection->fd = std::move(fd);
+    connection->exclusiveTid = gettid();
     mServers.push_back(connection);
+
+    return connection;
+}
+
+bool RpcConnection::removeServerSocket(const sp<ConnectionSocket>& socket) {
+    std::lock_guard<std::mutex> _l(mSocketMutex);
+    if (auto it = std::find(mServers.begin(), mServers.end(), socket); it != mServers.end()) {
+        mServers.erase(it);
+        return true;
+    }
+    return false;
 }
 
 RpcConnection::ExclusiveSocket::ExclusiveSocket(const sp<RpcConnection>& connection, SocketUse use)
@@ -335,37 +350,31 @@
 
         // CHECK FOR DEDICATED CLIENT SOCKET
         //
-        // A server/looper should always use a dedicated connection.
-        if (use != SocketUse::SERVER) {
-            findSocket(tid, &exclusive, &available, mConnection->mClients,
-                       mConnection->mClientsOffset);
+        // A server/looper should always use a dedicated connection if available
+        findSocket(tid, &exclusive, &available, mConnection->mClients, mConnection->mClientsOffset);
 
-            // WARNING: this assumes a server cannot request its client to send
-            // a transaction, as mServers is excluded below.
-            //
-            // Imagine we have more than one thread in play, and a single thread
-            // sends a synchronous, then an asynchronous command. Imagine the
-            // asynchronous command is sent on the first client socket. Then, if
-            // we naively send a synchronous command to that same socket, the
-            // thread on the far side might be busy processing the asynchronous
-            // command. So, we move to considering the second available thread
-            // for subsequent calls.
-            if (use == SocketUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) {
-                mConnection->mClientsOffset =
-                        (mConnection->mClientsOffset + 1) % mConnection->mClients.size();
-            }
+        // WARNING: this assumes a server cannot request its client to send
+        // a transaction, as mServers is excluded below.
+        //
+        // Imagine we have more than one thread in play, and a single thread
+        // sends a synchronous, then an asynchronous command. Imagine the
+        // asynchronous command is sent on the first client socket. Then, if
+        // we naively send a synchronous command to that same socket, the
+        // thread on the far side might be busy processing the asynchronous
+        // command. So, we move to considering the second available thread
+        // for subsequent calls.
+        if (use == SocketUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) {
+            mConnection->mClientsOffset =
+                    (mConnection->mClientsOffset + 1) % mConnection->mClients.size();
         }
 
-        // USE SERVING SOCKET (to start serving or for nested transaction)
+        // USE SERVING SOCKET (for nested transaction)
         //
         // asynchronous calls cannot be nested
         if (use != SocketUse::CLIENT_ASYNC) {
-            // servers should start serving on an available thread only
-            // otherwise, this should only be a nested call
-            bool useAvailable = use == SocketUse::SERVER;
-
-            findSocket(tid, &exclusive, (useAvailable ? &available : nullptr),
-                       mConnection->mServers, 0 /* index hint */);
+            // server sockets are always assigned to a thread
+            findSocket(tid, &exclusive, nullptr /*available*/, mConnection->mServers,
+                       0 /* index hint */);
         }
 
         // if our thread is already using a connection, prioritize using that
@@ -379,8 +388,6 @@
             break;
         }
 
-        LOG_ALWAYS_FATAL_IF(use == SocketUse::SERVER, "Must create connection to join one.");
-
         // in regular binder, this would usually be a deadlock :)
         LOG_ALWAYS_FATAL_IF(mConnection->mClients.size() == 0,
                             "Not a client of any connection. You must create a connection to an "