binder: Use RpcTransport

- after accept() / connect(), call sslAccept() /
  sslConnect(), respectively.
- replace ::send() / ::recv() with RpcTransport::send() /
  recv() / peek() accordingly.

Also refactor binderRpcTest to prepare for TLS implementation.

Test: TH
Test: binderRpcTest
Bug: 190868302

Change-Id: I809345c59a467cd219ebcec7a9db3a3b7776a601
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 36c03c5..6563bc8 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -273,7 +273,7 @@
 status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
                            const sp<RpcSession>& session, const char* what, const void* data,
                            size_t size) {
-    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, connection->fd.get(),
+    LOG_RPC_DETAIL("Sending %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
                    android::base::HexString(data, size).c_str());
 
     if (size > std::numeric_limits<ssize_t>::max()) {
@@ -282,11 +282,12 @@
         return BAD_VALUE;
     }
 
-    if (status_t status = session->mShutdownTrigger->interruptableWriteFully(connection->fd.get(),
-                                                                             data, size);
+    if (status_t status =
+                session->mShutdownTrigger->interruptableWriteFully(connection->rpcTransport.get(),
+                                                                   data, size);
         status != OK) {
-        LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on fd %d, error: %s", what, size,
-                       connection->fd.get(), statusToString(status).c_str());
+        LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
+                       connection->rpcTransport.get(), statusToString(status).c_str());
         (void)session->shutdownAndWait(false);
         return status;
     }
@@ -304,14 +305,15 @@
     }
 
     if (status_t status =
-                session->mShutdownTrigger->interruptableReadFully(connection->fd.get(), data, size);
+                session->mShutdownTrigger->interruptableReadFully(connection->rpcTransport.get(),
+                                                                  data, size);
         status != OK) {
-        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size,
-                       connection->fd.get(), statusToString(status).c_str());
+        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
+                       connection->rpcTransport.get(), statusToString(status).c_str());
         return status;
     }
 
-    LOG_RPC_DETAIL("Received %s on fd %d: %s", what, connection->fd.get(),
+    LOG_RPC_DETAIL("Received %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
                    android::base::HexString(data, size).c_str());
     return OK;
 }
@@ -490,7 +492,8 @@
         return status;
 
     if (flags & IBinder::FLAG_ONEWAY) {
-        LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", connection->fd.get());
+        LOG_RPC_DETAIL("Oneway command, so no longer waiting on RpcTransport %p",
+                       connection->rpcTransport.get());
 
         // Do not wait on result.
         // However, too many oneway calls may cause refcounts to build up and fill up the socket,
@@ -585,7 +588,7 @@
 
 status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection,
                                         const sp<RpcSession>& session, CommandType type) {
-    LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", connection->fd.get());
+    LOG_RPC_DETAIL("getAndExecuteCommand on RpcTransport %p", connection->rpcTransport.get());
 
     RpcWireHeader command;
     if (status_t status = rpcRec(connection, session, "command header", &command, sizeof(command));
@@ -598,8 +601,7 @@
 status_t RpcState::drainCommands(const sp<RpcSession::RpcConnection>& connection,
                                  const sp<RpcSession>& session, CommandType type) {
     uint8_t buf;
-    while (0 < TEMP_FAILURE_RETRY(
-                       recv(connection->fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
+    while (connection->rpcTransport->peek(&buf, sizeof(buf)).value_or(-1) > 0) {
         status_t status = getAndExecuteCommand(connection, session, type);
         if (status != OK) return status;
     }