libbinder: reverse connections

When connecting to an RPC client server, you can request to serve a
threadpool so that you can receive callbacks from it.

Future considerations:
- starting threads dynamically (likely very, very soon after this CL)
- combining threadpools (as needed)

Bug: 185167543
Test: binderRpcTest
Change-Id: I992959e963ebc1b3da2f89fdb6c1ec625cb51af4
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 77cae83..b146bb0 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -239,15 +239,16 @@
     // It must be set before this thread is started
     LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr);
 
-    int32_t id;
-    status_t status =
-            server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &id, sizeof(id));
+    RpcConnectionHeader header;
+    status_t status = server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &header,
+                                                                       sizeof(header));
     bool idValid = status == OK;
     if (!idValid) {
         ALOGE("Failed to read ID for client connecting to RPC server: %s",
               statusToString(status).c_str());
         // still need to cleanup before we can return
     }
+    bool reverse = header.options & RPC_CONNECTION_OPTION_REVERSE;
 
     std::thread thisThread;
     sp<RpcSession> session;
@@ -269,24 +270,37 @@
             return;
         }
 
-        if (id == RPC_SESSION_ID_NEW) {
+        if (header.sessionId == RPC_SESSION_ID_NEW) {
+            if (reverse) {
+                ALOGE("Cannot create a new session with a reverse connection, would leak");
+                return;
+            }
+
             LOG_ALWAYS_FATAL_IF(server->mSessionIdCounter >= INT32_MAX, "Out of session IDs");
             server->mSessionIdCounter++;
 
             session = RpcSession::make();
-            session->setForServer(wp<RpcServer>(server), server->mSessionIdCounter,
-                                  server->mShutdownTrigger);
+            session->setForServer(server,
+                                  sp<RpcServer::EventListener>::fromExisting(
+                                          static_cast<RpcServer::EventListener*>(server.get())),
+                                  server->mSessionIdCounter, server->mShutdownTrigger);
 
             server->mSessions[server->mSessionIdCounter] = session;
         } else {
-            auto it = server->mSessions.find(id);
+            auto it = server->mSessions.find(header.sessionId);
             if (it == server->mSessions.end()) {
-                ALOGE("Cannot add thread, no record of session with ID %d", id);
+                ALOGE("Cannot add thread, no record of session with ID %d", header.sessionId);
                 return;
             }
             session = it->second;
         }
 
+        if (reverse) {
+            LOG_ALWAYS_FATAL_IF(!session->addClientConnection(std::move(clientFd)),
+                                "server state must already be initialized");
+            return;
+        }
+
         detachGuard.Disable();
         session->preJoin(std::move(thisThread));
     }
@@ -294,7 +308,7 @@
     // avoid strong cycle
     server = nullptr;
 
-    session->join(std::move(clientFd));
+    RpcSession::join(std::move(session), std::move(clientFd));
 }
 
 bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
@@ -341,8 +355,7 @@
     (void)mSessions.erase(it);
 }
 
-void RpcServer::onSessionServerThreadEnded(const sp<RpcSession>& session) {
-    (void)session;
+void RpcServer::onSessionServerThreadEnded() {
     mShutdownCv.notify_all();
 }