libbinder: RPC know when connections are set up

Previously, there was a race where:
a. client creates connection to server
b. client sends request for reverse connection to server (but this
  may still be traveling on the wire)
c. client sends transaction to server
d. server tries to make a callback
e. server fails to make callback because no reverse connection is set up

Now, when a new connection is set up, a header is sent on this
connection. So, in (b), we can wait for this header to be received.

Note: currently, (e) results in an abort; this is tracked in b/167966510
with a TODO in the ExclusiveConnection code. This would make a less
obvious flake (or perhaps the problem would be ignored), but this race
still needs to be fixed for well-behaved clients to be able to function
reliably.

Fixes: 190639665
Test: binderRpcTest (callback test 10,000s of times)
Change-Id: I13bc912692d63ea73d46c5441fa7d51121df2f58
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index de9aa22..2a230d2 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -236,7 +236,7 @@
     }
 }
 
-void RpcSession::preJoin(std::thread thread) {
+void RpcSession::preJoinThreadOwnership(std::thread thread) {
     LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");
 
     {
@@ -245,20 +245,36 @@
     }
 }
 
-void RpcSession::join(sp<RpcSession>&& session, unique_fd client) {
+RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
     // must be registered to allow arbitrary client code executing commands to
     // be able to do nested calls (we can't only read from it)
-    sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client));
+    sp<RpcConnection> connection = assignServerToThisThread(std::move(fd));
 
-    while (true) {
-        status_t error = session->state()->getAndExecuteCommand(connection->fd, session,
-                                                                RpcState::CommandType::ANY);
+    status_t status =
+            mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
 
-        if (error != OK) {
-            LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
-                           statusToString(error).c_str());
-            break;
+    return PreJoinSetupResult{
+            .connection = std::move(connection),
+            .status = status,
+    };
+}
+
+void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult) {
+    sp<RpcConnection>& connection = setupResult.connection;
+
+    if (setupResult.status == OK) {
+        while (true) {
+            status_t status = session->state()->getAndExecuteCommand(connection->fd, session,
+                                                                     RpcState::CommandType::ANY);
+            if (status != OK) {
+                LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
+                               statusToString(status).c_str());
+                break;
+            }
         }
+    } else {
+        ALOGE("Connection failed to init, closing with status %s",
+              statusToString(setupResult.status).c_str());
     }
 
     LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection),
@@ -381,14 +397,17 @@
                 unique_fd fd = std::move(serverFd);
                 // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
                 sp<RpcSession> session = thiz;
-                session->preJoin(std::move(thread));
-                ownershipTransferred = true;
-                joinCv.notify_one();
+                session->preJoinThreadOwnership(std::move(thread));
 
+                // only continue once we have a response or the connection fails
+                auto setupResult = session->preJoinSetup(std::move(fd));
+
+                ownershipTransferred = true;
                 threadLock.unlock();
+                joinCv.notify_one();
                 // do not use & vars below
 
-                RpcSession::join(std::move(session), std::move(fd));
+                RpcSession::join(std::move(session), std::move(setupResult));
             });
             joinCv.wait(lock, [&] { return ownershipTransferred; });
             LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
@@ -403,20 +422,32 @@
 }
 
 bool RpcSession::addClientConnection(unique_fd fd) {
-    std::lock_guard<std::mutex> _l(mMutex);
+    sp<RpcConnection> connection = sp<RpcConnection>::make();
+    {
+        std::lock_guard<std::mutex> _l(mMutex);
 
-    // first client connection added, but setForServer not called, so
-    // initializaing for a client.
-    if (mShutdownTrigger == nullptr) {
-        mShutdownTrigger = FdTrigger::make();
-        mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
-        if (mShutdownTrigger == nullptr) return false;
+        // first client connection added, but setForServer not called, so
+        // initializing for a client.
+        if (mShutdownTrigger == nullptr) {
+            mShutdownTrigger = FdTrigger::make();
+            mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
+            if (mShutdownTrigger == nullptr) return false;
+        }
+
+        connection->fd = std::move(fd);
+        connection->exclusiveTid = gettid();
+        mClientConnections.push_back(connection);
     }
 
-    sp<RpcConnection> session = sp<RpcConnection>::make();
-    session->fd = std::move(fd);
-    mClientConnections.push_back(session);
-    return true;
+    status_t status =
+            mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
+
+    {
+        std::lock_guard<std::mutex> _l(mMutex);
+        connection->exclusiveTid = std::nullopt;
+    }
+
+    return status == OK;
 }
 
 bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
@@ -519,7 +550,9 @@
         // in regular binder, this would usually be a deadlock :)
         LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0,
                             "Session has no client connections. This is required for an RPC server "
-                            "to make any non-nested (e.g. oneway or on another thread) calls.");
+                            "to make any non-nested (e.g. oneway or on another thread) calls. "
+                            "Use: %d. Server connections: %zu",
+                            static_cast<int>(use), mSession->mServerConnections.size());
 
         LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...",
                        mSession->mClientConnections.size(), mSession->mServerConnections.size());