binder: Use RpcTransport

- after accept() / connect(), call sslAccept() /
  sslConnect(), respectively.
- replace ::send() / ::recv() with RpcTransport::
  send() / recv() / peek() accordingly.

Also refacator binderRpcTest to prepare for TLS implementation.

Test: TH
Test: binderRpcTest
Bug: 190868302

Change-Id: I809345c59a467cd219ebcec7a9db3a3b7776a601
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 62ea187..6ab25e0 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -26,6 +26,7 @@
 #include <android-base/scopeguard.h>
 #include <binder/Parcel.h>
 #include <binder/RpcServer.h>
+#include <binder/RpcTransportRaw.h>
 #include <log/log.h>
 
 #include "RpcSocketAddress.h"
@@ -37,13 +38,17 @@
 using base::ScopeGuard;
 using base::unique_fd;
 
-RpcServer::RpcServer() {}
+RpcServer::RpcServer(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory)
+      : mRpcTransportCtxFactory(std::move(rpcTransportCtxFactory)) {}
 RpcServer::~RpcServer() {
     (void)shutdown();
 }
 
-sp<RpcServer> RpcServer::make() {
-    return sp<RpcServer>::make();
+sp<RpcServer> RpcServer::make(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory) {
+    // Default is without TLS.
+    if (rpcTransportCtxFactory == nullptr)
+        rpcTransportCtxFactory = RpcTransportCtxFactoryRaw::make();
+    return sp<RpcServer>::make(std::move(rpcTransportCtxFactory));
 }
 
 void RpcServer::iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction() {
@@ -154,6 +159,10 @@
         mJoinThreadRunning = true;
         mShutdownTrigger = RpcSession::FdTrigger::make();
         LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");
+
+        mCtx = mRpcTransportCtxFactory->newServerCtx();
+        LOG_ALWAYS_FATAL_IF(mCtx == nullptr, "Unable to create RpcTransportCtx with %s sockets",
+                            mRpcTransportCtxFactory->toCString());
     }
 
     status_t status;
@@ -220,6 +229,7 @@
     LOG_RPC_DETAIL("Finished waiting on shutdown.");
 
     mShutdownTrigger = nullptr;
+    mCtx = nullptr;
     return true;
 }
 
@@ -245,14 +255,29 @@
     // mShutdownTrigger can only be cleared once connection threads have joined.
     // It must be set before this thread is started
     LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr);
+    LOG_ALWAYS_FATAL_IF(server->mCtx == nullptr);
+
+    status_t status = OK;
+
+    int clientFdForLog = clientFd.get();
+    auto client = server->mCtx->newTransport(std::move(clientFd));
+    if (client == nullptr) {
+        ALOGE("Dropping accept4()-ed socket because sslAccept fails");
+        status = DEAD_OBJECT;
+        // still need to cleanup before we can return
+    } else {
+        LOG_RPC_DETAIL("Created RpcTransport %p for client fd %d", client.get(), clientFdForLog);
+    }
 
     RpcConnectionHeader header;
-    status_t status = server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &header,
-                                                                       sizeof(header));
-    if (status != OK) {
-        ALOGE("Failed to read ID for client connecting to RPC server: %s",
-              statusToString(status).c_str());
-        // still need to cleanup before we can return
+    if (status == OK) {
+        status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header,
+                                                                  sizeof(header));
+        if (status != OK) {
+            ALOGE("Failed to read ID for client connecting to RPC server: %s",
+                  statusToString(status).c_str());
+            // still need to cleanup before we can return
+        }
     }
 
     bool incoming = false;
@@ -272,7 +297,7 @@
                     .version = protocolVersion,
             };
 
-            status = server->mShutdownTrigger->interruptableWriteFully(clientFd.get(), &response,
+            status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response,
                                                                        sizeof(response));
             if (status != OK) {
                 ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
@@ -342,7 +367,7 @@
         }
 
         if (incoming) {
-            LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(clientFd), true),
+            LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(client), true),
                                 "server state must already be initialized");
             return;
         }
@@ -351,7 +376,7 @@
         session->preJoinThreadOwnership(std::move(thisThread));
     }
 
-    auto setupResult = session->preJoinSetup(std::move(clientFd));
+    auto setupResult = session->preJoinSetup(std::move(client));
 
     // avoid strong cycle
     server = nullptr;