Merge "binderRpcBenchmark: test more data sizes"
diff --git a/libs/binder/Android.bp b/libs/binder/Android.bp
index 2027b6e..f34672c 100644
--- a/libs/binder/Android.bp
+++ b/libs/binder/Android.bp
@@ -123,6 +123,7 @@
         "RpcSession.cpp",
         "RpcServer.cpp",
         "RpcState.cpp",
+        "RpcTransportRaw.cpp",
         "Static.cpp",
         "Stability.cpp",
         "Status.cpp",
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 68c12f7..879e462 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -26,6 +26,7 @@
 #include <android-base/scopeguard.h>
 #include <binder/Parcel.h>
 #include <binder/RpcServer.h>
+#include <binder/RpcTransportRaw.h>
 #include <log/log.h>
 
 #include "RpcSocketAddress.h"
@@ -37,13 +38,17 @@
 using base::ScopeGuard;
 using base::unique_fd;
 
-RpcServer::RpcServer() {}
+RpcServer::RpcServer(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory)
+      : mRpcTransportCtxFactory(std::move(rpcTransportCtxFactory)) {}
 RpcServer::~RpcServer() {
     (void)shutdown();
 }
 
-sp<RpcServer> RpcServer::make() {
-    return sp<RpcServer>::make();
+sp<RpcServer> RpcServer::make(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory) {
+    // Default is without TLS.
+    if (rpcTransportCtxFactory == nullptr)
+        rpcTransportCtxFactory = RpcTransportCtxFactoryRaw::make();
+    return sp<RpcServer>::make(std::move(rpcTransportCtxFactory));
 }
 
 void RpcServer::iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction() {
@@ -153,6 +158,10 @@
         mJoinThreadRunning = true;
         mShutdownTrigger = RpcSession::FdTrigger::make();
         LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");
+
+        mCtx = mRpcTransportCtxFactory->newServerCtx();
+        LOG_ALWAYS_FATAL_IF(mCtx == nullptr, "Unable to create RpcTransportCtx with %s sockets",
+                            mRpcTransportCtxFactory->toCString());
     }
 
     status_t status;
@@ -219,6 +228,7 @@
     LOG_RPC_DETAIL("Finished waiting on shutdown.");
 
     mShutdownTrigger = nullptr;
+    mCtx = nullptr;
     return true;
 }
 
@@ -244,14 +254,29 @@
     // mShutdownTrigger can only be cleared once connection threads have joined.
     // It must be set before this thread is started
     LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr);
+    LOG_ALWAYS_FATAL_IF(server->mCtx == nullptr);
+
+    status_t status = OK;
+
+    int clientFdForLog = clientFd.get();
+    auto client = server->mCtx->newTransport(std::move(clientFd));
+    if (client == nullptr) {
+        ALOGE("Dropping accept4()-ed socket because sslAccept fails");
+        status = DEAD_OBJECT;
+        // still need to cleanup before we can return
+    } else {
+        LOG_RPC_DETAIL("Created RpcTransport %p for client fd %d", client.get(), clientFdForLog);
+    }
 
     RpcConnectionHeader header;
-    status_t status = server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &header,
-                                                                       sizeof(header));
-    if (status != OK) {
-        ALOGE("Failed to read ID for client connecting to RPC server: %s",
-              statusToString(status).c_str());
-        // still need to cleanup before we can return
+    if (status == OK) {
+        status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header,
+                                                                  sizeof(header));
+        if (status != OK) {
+            ALOGE("Failed to read ID for client connecting to RPC server: %s",
+                  statusToString(status).c_str());
+            // still need to cleanup before we can return
+        }
     }
 
     bool incoming = false;
@@ -271,7 +296,7 @@
                     .version = protocolVersion,
             };
 
-            status = server->mShutdownTrigger->interruptableWriteFully(clientFd.get(), &response,
+            status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response,
                                                                        sizeof(response));
             if (status != OK) {
                 ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
@@ -341,7 +366,7 @@
         }
 
         if (incoming) {
-            LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(clientFd), true),
+            LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(client), true),
                                 "server state must already be initialized");
             return;
         }
@@ -350,7 +375,7 @@
         session->preJoinThreadOwnership(std::move(thisThread));
     }
 
-    auto setupResult = session->preJoinSetup(std::move(clientFd));
+    auto setupResult = session->preJoinSetup(std::move(client));
 
     // avoid strong cycle
     server = nullptr;
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index 66f31b8..7565f1b 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -30,6 +30,7 @@
 #include <android_runtime/vm.h>
 #include <binder/Parcel.h>
 #include <binder/RpcServer.h>
+#include <binder/RpcTransportRaw.h>
 #include <binder/Stability.h>
 #include <jni.h>
 #include <utils/String8.h>
@@ -46,7 +47,8 @@
 
 using base::unique_fd;
 
-RpcSession::RpcSession() {
+RpcSession::RpcSession(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory)
+      : mRpcTransportCtxFactory(std::move(rpcTransportCtxFactory)) {
     LOG_RPC_DETAIL("RpcSession created %p", this);
 
     mState = std::make_unique<RpcState>();
@@ -59,8 +61,11 @@
                         "Should not be able to destroy a session with servers in use.");
 }
 
-sp<RpcSession> RpcSession::make() {
-    return sp<RpcSession>::make();
+sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory) {
+    // Default is without TLS.
+    if (rpcTransportCtxFactory == nullptr)
+        rpcTransportCtxFactory = RpcTransportCtxFactoryRaw::make();
+    return sp<RpcSession>::make(std::move(rpcTransportCtxFactory));
 }
 
 void RpcSession::setMaxThreads(size_t threads) {
@@ -121,7 +126,19 @@
     return false;
 }
 
+bool RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_fd()>&& request) {
+    return setupClient([&](const RpcAddress& sessionId, bool incoming) {
+        // std::move'd from fd becomes -1 (!ok())
+        if (!fd.ok()) {
+            fd = request();
+            if (!fd.ok()) return false;
+        }
+        return initAndAddConnection(std::move(fd), sessionId, incoming);
+    });
+}
+
 bool RpcSession::addNullDebuggingClient() {
+    // Note: only works on raw sockets.
     unique_fd serverFd(TEMP_FAILURE_RETRY(open("/dev/null", O_WRONLY | O_CLOEXEC)));
 
     if (serverFd == -1) {
@@ -129,7 +146,17 @@
         return false;
     }
 
-    return addOutgoingConnection(std::move(serverFd), false);
+    auto ctx = mRpcTransportCtxFactory->newClientCtx();
+    if (ctx == nullptr) {
+        ALOGE("Unable to create RpcTransportCtx for null debugging client");
+        return false;
+    }
+    auto server = ctx->newTransport(std::move(serverFd));
+    if (server == nullptr) {
+        ALOGE("Unable to set up RpcTransport");
+        return false;
+    }
+    return addOutgoingConnection(std::move(server), false);
 }
 
 sp<IBinder> RpcSession::getRootObject() {
@@ -205,6 +232,10 @@
     return mWrite == -1;
 }
 
+status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) {
+    return triggerablePoll(rpcTransport->pollSocket(), event);
+}
+
 status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
     while (true) {
         pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
@@ -223,28 +254,30 @@
     }
 }
 
-status_t RpcSession::FdTrigger::interruptableWriteFully(base::borrowed_fd fd, const void* data,
-                                                        size_t size) {
+status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport,
+                                                        const void* data, size_t size) {
     const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
     const uint8_t* end = buffer + size;
 
     MAYBE_WAIT_IN_FLAKE_MODE;
 
     status_t status;
-    while ((status = triggerablePoll(fd, POLLOUT)) == OK) {
-        ssize_t writeSize = TEMP_FAILURE_RETRY(send(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
-        if (writeSize == 0) return DEAD_OBJECT;
-
-        if (writeSize < 0) {
-            return -errno;
+    while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) {
+        auto writeSize = rpcTransport->send(buffer, end - buffer);
+        if (!writeSize.ok()) {
+            LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
+            return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
         }
-        buffer += writeSize;
+
+        if (*writeSize == 0) return DEAD_OBJECT;
+
+        buffer += *writeSize;
         if (buffer == end) return OK;
     }
     return status;
 }
 
-status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, void* data,
+status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data,
                                                        size_t size) {
     uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
     uint8_t* end = buffer + size;
@@ -252,14 +285,16 @@
     MAYBE_WAIT_IN_FLAKE_MODE;
 
     status_t status;
-    while ((status = triggerablePoll(fd, POLLIN)) == OK) {
-        ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
-        if (readSize == 0) return DEAD_OBJECT; // EOF
-
-        if (readSize < 0) {
-            return -errno;
+    while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) {
+        auto readSize = rpcTransport->recv(buffer, end - buffer);
+        if (!readSize.ok()) {
+            LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
+            return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
         }
-        buffer += readSize;
+
+        if (*readSize == 0) return DEAD_OBJECT; // EOF
+
+        buffer += *readSize;
         if (buffer == end) return OK;
     }
     return status;
@@ -312,10 +347,11 @@
     }
 }
 
-RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
+RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(
+        std::unique_ptr<RpcTransport> rpcTransport) {
     // must be registered to allow arbitrary client code executing commands to
     // be able to do nested calls (we can't only read from it)
-    sp<RpcConnection> connection = assignIncomingConnectionToThisThread(std::move(fd));
+    sp<RpcConnection> connection = assignIncomingConnectionToThisThread(std::move(rpcTransport));
 
     status_t status;
 
@@ -439,7 +475,8 @@
     return server;
 }
 
-bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
+bool RpcSession::setupClient(
+        const std::function<bool(const RpcAddress& sessionId, bool incoming)>& connectAndInit) {
     {
         std::lock_guard<std::mutex> _l(mMutex);
         LOG_ALWAYS_FATAL_IF(mOutgoingConnections.size() != 0,
@@ -447,7 +484,7 @@
                             mOutgoingConnections.size());
     }
 
-    if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*incoming*/)) return false;
+    if (!connectAndInit(RpcAddress::zero(), false /*incoming*/)) return false;
 
     {
         ExclusiveConnection connection;
@@ -466,37 +503,42 @@
     // TODO(b/186470974): first risk of blocking
     size_t numThreadsAvailable;
     if (status_t status = getRemoteMaxThreads(&numThreadsAvailable); status != OK) {
-        ALOGE("Could not get max threads after initial session to %s: %s", addr.toString().c_str(),
+        ALOGE("Could not get max threads after initial session setup: %s",
               statusToString(status).c_str());
         return false;
     }
 
     if (status_t status = readId(); status != OK) {
-        ALOGE("Could not get session id after initial session to %s; %s", addr.toString().c_str(),
+        ALOGE("Could not get session id after initial session setup: %s",
               statusToString(status).c_str());
         return false;
     }
 
-    // we've already setup one client
-    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
-        // TODO(b/189955605): shutdown existing connections?
-        if (!setupOneSocketConnection(addr, mId.value(), false /*incoming*/)) return false;
-    }
-
     // TODO(b/189955605): we should add additional sessions dynamically
     // instead of all at once - the other side should be responsible for setting
     // up additional connections. We need to create at least one (unless 0 are
     // requested to be set) in order to allow the other side to reliably make
     // any requests at all.
 
+    // we've already setup one client
+    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
+        if (!connectAndInit(mId.value(), false /*incoming*/)) return false;
+    }
+
     for (size_t i = 0; i < mMaxThreads; i++) {
-        if (!setupOneSocketConnection(addr, mId.value(), true /*incoming*/)) return false;
+        if (!connectAndInit(mId.value(), true /*incoming*/)) return false;
     }
 
     return true;
 }
 
-bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id,
+bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
+    return setupClient([&](const RpcAddress& sessionId, bool incoming) {
+        return setupOneSocketConnection(addr, sessionId, incoming);
+    });
+}
+
+bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& sessionId,
                                           bool incoming) {
     for (size_t tries = 0; tries < 5; tries++) {
         if (tries > 0) usleep(10000);
@@ -520,36 +562,60 @@
                   strerror(savedErrno));
             return false;
         }
-
-        RpcConnectionHeader header{
-                .version = mProtocolVersion.value_or(RPC_WIRE_PROTOCOL_VERSION),
-                .options = 0,
-        };
-        memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress));
-
-        if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
-
-        if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) {
-            int savedErrno = errno;
-            ALOGE("Could not write connection header to socket at %s: %s", addr.toString().c_str(),
-                  strerror(savedErrno));
-            return false;
-        }
-
         LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());
 
-        if (incoming) {
-            return addIncomingConnection(std::move(serverFd));
-        } else {
-            return addOutgoingConnection(std::move(serverFd), true);
-        }
+        return initAndAddConnection(std::move(serverFd), sessionId, incoming);
     }
 
     ALOGE("Ran out of retries to connect to %s", addr.toString().c_str());
     return false;
 }
 
-bool RpcSession::addIncomingConnection(unique_fd fd) {
+bool RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId, bool incoming) {
+    auto ctx = mRpcTransportCtxFactory->newClientCtx();
+    if (ctx == nullptr) {
+        ALOGE("Unable to create client RpcTransportCtx with %s sockets",
+              mRpcTransportCtxFactory->toCString());
+        return false;
+    }
+    auto server = ctx->newTransport(std::move(fd));
+    if (server == nullptr) {
+        ALOGE("Unable to set up RpcTransport in %s context", mRpcTransportCtxFactory->toCString());
+        return false;
+    }
+
+    LOG_RPC_DETAIL("Socket at client with RpcTransport %p", server.get());
+
+    RpcConnectionHeader header{
+            .version = mProtocolVersion.value_or(RPC_WIRE_PROTOCOL_VERSION),
+            .options = 0,
+    };
+    memcpy(&header.sessionId, &sessionId.viewRawEmbedded(), sizeof(RpcWireAddress));
+
+    if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
+
+    auto sentHeader = server->send(&header, sizeof(header));
+    if (!sentHeader.ok()) {
+        ALOGE("Could not write connection header to socket: %s",
+              sentHeader.error().message().c_str());
+        return false;
+    }
+    if (*sentHeader != sizeof(header)) {
+        ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd",
+              *sentHeader, sizeof(header));
+        return false;
+    }
+
+    LOG_RPC_DETAIL("Socket at client: header sent");
+
+    if (incoming) {
+        return addIncomingConnection(std::move(server));
+    } else {
+        return addOutgoingConnection(std::move(server), true /*init*/);
+    }
+}
+
+bool RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) {
     std::mutex mutex;
     std::condition_variable joinCv;
     std::unique_lock<std::mutex> lock(mutex);
@@ -558,13 +624,13 @@
     bool ownershipTransferred = false;
     thread = std::thread([&]() {
         std::unique_lock<std::mutex> threadLock(mutex);
-        unique_fd movedFd = std::move(fd);
+        std::unique_ptr<RpcTransport> movedRpcTransport = std::move(rpcTransport);
         // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
         sp<RpcSession> session = thiz;
         session->preJoinThreadOwnership(std::move(thread));
 
         // only continue once we have a response or the connection fails
-        auto setupResult = session->preJoinSetup(std::move(movedFd));
+        auto setupResult = session->preJoinSetup(std::move(movedRpcTransport));
 
         ownershipTransferred = true;
         threadLock.unlock();
@@ -578,7 +644,7 @@
     return true;
 }
 
-bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) {
+bool RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) {
     sp<RpcConnection> connection = sp<RpcConnection>::make();
     {
         std::lock_guard<std::mutex> _l(mMutex);
@@ -591,7 +657,7 @@
             if (mShutdownTrigger == nullptr) return false;
         }
 
-        connection->fd = std::move(fd);
+        connection->rpcTransport = std::move(rpcTransport);
         connection->exclusiveTid = gettid();
         mOutgoingConnections.push_back(connection);
     }
@@ -626,7 +692,8 @@
     return true;
 }
 
-sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(unique_fd fd) {
+sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
+        std::unique_ptr<RpcTransport> rpcTransport) {
     std::lock_guard<std::mutex> _l(mMutex);
 
     if (mIncomingConnections.size() >= mMaxThreads) {
@@ -644,7 +711,7 @@
     }
 
     sp<RpcConnection> session = sp<RpcConnection>::make();
-    session->fd = std::move(fd);
+    session->rpcTransport = std::move(rpcTransport);
     session->exclusiveTid = gettid();
 
     mIncomingConnections.push_back(session);
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 36c03c5..23382c3 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -273,7 +273,7 @@
 status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
                            const sp<RpcSession>& session, const char* what, const void* data,
                            size_t size) {
-    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, connection->fd.get(),
+    LOG_RPC_DETAIL("Sending %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
                    android::base::HexString(data, size).c_str());
 
     if (size > std::numeric_limits<ssize_t>::max()) {
@@ -282,11 +282,12 @@
         return BAD_VALUE;
     }
 
-    if (status_t status = session->mShutdownTrigger->interruptableWriteFully(connection->fd.get(),
-                                                                             data, size);
+    if (status_t status =
+                session->mShutdownTrigger->interruptableWriteFully(connection->rpcTransport.get(),
+                                                                   data, size);
         status != OK) {
-        LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on fd %d, error: %s", what, size,
-                       connection->fd.get(), statusToString(status).c_str());
+        LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
+                       connection->rpcTransport.get(), statusToString(status).c_str());
         (void)session->shutdownAndWait(false);
         return status;
     }
@@ -304,14 +305,15 @@
     }
 
     if (status_t status =
-                session->mShutdownTrigger->interruptableReadFully(connection->fd.get(), data, size);
+                session->mShutdownTrigger->interruptableReadFully(connection->rpcTransport.get(),
+                                                                  data, size);
         status != OK) {
-        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size,
-                       connection->fd.get(), statusToString(status).c_str());
+        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
+                       connection->rpcTransport.get(), statusToString(status).c_str());
         return status;
     }
 
-    LOG_RPC_DETAIL("Received %s on fd %d: %s", what, connection->fd.get(),
+    LOG_RPC_DETAIL("Received %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
                    android::base::HexString(data, size).c_str());
     return OK;
 }
@@ -490,7 +492,8 @@
         return status;
 
     if (flags & IBinder::FLAG_ONEWAY) {
-        LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", connection->fd.get());
+        LOG_RPC_DETAIL("Oneway command, so no longer waiting on RpcTransport %p",
+                       connection->rpcTransport.get());
 
         // Do not wait on result.
         // However, too many oneway calls may cause refcounts to build up and fill up the socket,
@@ -585,7 +588,7 @@
 
 status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection,
                                         const sp<RpcSession>& session, CommandType type) {
-    LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", connection->fd.get());
+    LOG_RPC_DETAIL("getAndExecuteCommand on RpcTransport %p", connection->rpcTransport.get());
 
     RpcWireHeader command;
     if (status_t status = rpcRec(connection, session, "command header", &command, sizeof(command));
@@ -598,8 +601,7 @@
 status_t RpcState::drainCommands(const sp<RpcSession::RpcConnection>& connection,
                                  const sp<RpcSession>& session, CommandType type) {
     uint8_t buf;
-    while (0 < TEMP_FAILURE_RETRY(
-                       recv(connection->fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
+    while (connection->rpcTransport->peek(&buf, sizeof(buf)).value_or(0) > 0) {
         status_t status = getAndExecuteCommand(connection, session, type);
         if (status != OK) return status;
     }
diff --git a/libs/binder/RpcTransportRaw.cpp b/libs/binder/RpcTransportRaw.cpp
new file mode 100644
index 0000000..2fc1945
--- /dev/null
+++ b/libs/binder/RpcTransportRaw.cpp
@@ -0,0 +1,88 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "RpcRawTransport"
+#include <log/log.h>
+
+#include <binder/RpcTransportRaw.h>
+
+#include "RpcState.h"
+
+using android::base::ErrnoError;
+using android::base::Result;
+
+namespace android {
+
+namespace {
+
+// RpcTransport with TLS disabled.
+class RpcTransportRaw : public RpcTransport {
+public:
+    explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {}
+    Result<size_t> send(const void *buf, size_t size) override {
+        ssize_t ret = TEMP_FAILURE_RETRY(::send(mSocket.get(), buf, size, MSG_NOSIGNAL));
+        if (ret < 0) {
+            return ErrnoError() << "send()";
+        }
+        return ret;
+    }
+    Result<size_t> recv(void *buf, size_t size) override {
+        ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_NOSIGNAL));
+        if (ret < 0) {
+            return ErrnoError() << "recv()";
+        }
+        return ret;
+    }
+    Result<size_t> peek(void *buf, size_t size) override {
+        ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_PEEK | MSG_DONTWAIT));
+        if (ret < 0) {
+            return ErrnoError() << "recv(MSG_PEEK)";
+        }
+        return ret;
+    }
+    bool pending() override { return false; }
+    android::base::borrowed_fd pollSocket() const override { return mSocket; }
+
+private:
+    android::base::unique_fd mSocket;
+};
+
+// RpcTransportCtx with TLS disabled.
+class RpcTransportCtxRaw : public RpcTransportCtx {
+public:
+    std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd) const {
+        return std::make_unique<RpcTransportRaw>(std::move(fd));
+    }
+};
+} // namespace
+
+std::unique_ptr<RpcTransportCtx> RpcTransportCtxFactoryRaw::newServerCtx() const {
+    return std::make_unique<RpcTransportCtxRaw>();
+}
+
+std::unique_ptr<RpcTransportCtx> RpcTransportCtxFactoryRaw::newClientCtx() const {
+    return std::make_unique<RpcTransportCtxRaw>();
+}
+
+const char *RpcTransportCtxFactoryRaw::toCString() const {
+    return "raw";
+}
+
+std::unique_ptr<RpcTransportCtxFactory> RpcTransportCtxFactoryRaw::make() {
+    return std::unique_ptr<RpcTransportCtxFactoryRaw>(new RpcTransportCtxFactoryRaw());
+}
+
+} // namespace android
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index a1fba8a..4abf3b9 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -19,6 +19,7 @@
 #include <binder/IBinder.h>
 #include <binder/RpcAddress.h>
 #include <binder/RpcSession.h>
+#include <binder/RpcTransport.h>
 #include <utils/Errors.h>
 #include <utils/RefBase.h>
 
@@ -47,7 +48,8 @@
  */
 class RpcServer final : public virtual RefBase, private RpcSession::EventListener {
 public:
-    static sp<RpcServer> make();
+    static sp<RpcServer> make(
+            std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory = nullptr);
 
     /**
      * This represents a session for responses, e.g.:
@@ -167,7 +169,7 @@
 
 private:
     friend sp<RpcServer>;
-    RpcServer();
+    explicit RpcServer(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory);
 
     void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) override;
     void onSessionIncomingThreadEnded() override;
@@ -175,6 +177,7 @@
     static void establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd);
     bool setupSocketServer(const RpcSocketAddress& address);
 
+    const std::unique_ptr<RpcTransportCtxFactory> mRpcTransportCtxFactory;
     bool mAgreedExperimental = false;
     size_t mMaxThreads = 1;
     std::optional<uint32_t> mProtocolVersion;
@@ -189,6 +192,7 @@
     std::map<RpcAddress, sp<RpcSession>> mSessions;
     std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
     std::condition_variable mShutdownCv;
+    std::unique_ptr<RpcTransportCtx> mCtx;
 };
 
 } // namespace android
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 1f7c029..0b46606 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -18,6 +18,8 @@
 #include <android-base/unique_fd.h>
 #include <binder/IBinder.h>
 #include <binder/RpcAddress.h>
+#include <binder/RpcSession.h>
+#include <binder/RpcTransport.h>
 #include <utils/Errors.h>
 #include <utils/RefBase.h>
 
@@ -36,6 +38,7 @@
 class RpcServer;
 class RpcSocketAddress;
 class RpcState;
+class RpcTransport;
 
 constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_NEXT = 0;
 constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL = 0xF0000000;
@@ -48,7 +51,8 @@
  */
 class RpcSession final : public virtual RefBase {
 public:
-    static sp<RpcSession> make();
+    static sp<RpcSession> make(
+            std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory = nullptr);
 
     /**
      * Set the maximum number of threads allowed to be made (for things like callbacks).
@@ -87,6 +91,19 @@
     [[nodiscard]] bool setupInetClient(const char* addr, unsigned int port);
 
     /**
+     * Starts talking to an RPC server which has already been connected to. This
+     * is expected to be used when another process has permission to connect to
+     * a binder RPC service, but this process only has permission to talk to
+     * that service.
+     *
+     * For convenience, if 'fd' is -1, 'request' will be called.
+     *
+     * For future compatibility, 'request' should not reference any stack data.
+     */
+    [[nodiscard]] bool setupPreconnectedClient(base::unique_fd fd,
+                                               std::function<base::unique_fd()>&& request);
+
+    /**
      * For debugging!
      *
      * Sets up an empty connection. All queries to this connection which require a
@@ -142,7 +159,7 @@
     friend sp<RpcSession>;
     friend RpcServer;
     friend RpcState;
-    RpcSession();
+    explicit RpcSession(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory);
 
     /** This is not a pipe. */
     struct FdTrigger {
@@ -178,10 +195,12 @@
          *   true - succeeded in completely processing 'size'
          *   false - interrupted (failure or trigger)
          */
-        status_t interruptableReadFully(base::borrowed_fd fd, void* data, size_t size);
-        status_t interruptableWriteFully(base::borrowed_fd fd, const void* data, size_t size);
+        status_t interruptableReadFully(RpcTransport* rpcTransport, void* data, size_t size);
+        status_t interruptableWriteFully(RpcTransport* rpcTransport, const void* data, size_t size);
 
     private:
+        status_t triggerablePoll(RpcTransport* rpcTransport, int16_t event);
+
         base::unique_fd mWrite;
         base::unique_fd mRead;
     };
@@ -204,7 +223,7 @@
     };
 
     struct RpcConnection : public RefBase {
-        base::unique_fd fd;
+        std::unique_ptr<RpcTransport> rpcTransport;
 
         // whether this or another thread is currently using this fd to make
         // or receive transactions.
@@ -230,19 +249,24 @@
         // Status of setup
         status_t status;
     };
-    PreJoinSetupResult preJoinSetup(base::unique_fd fd);
+    PreJoinSetupResult preJoinSetup(std::unique_ptr<RpcTransport> rpcTransport);
     // join on thread passed to preJoinThreadOwnership
     static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result);
 
+    [[nodiscard]] bool setupClient(
+            const std::function<bool(const RpcAddress& sessionId, bool incoming)>& connectAndInit);
     [[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address);
     [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address,
-                                                const RpcAddress& sessionId, bool server);
-    [[nodiscard]] bool addIncomingConnection(base::unique_fd fd);
-    [[nodiscard]] bool addOutgoingConnection(base::unique_fd fd, bool init);
+                                                const RpcAddress& sessionId, bool incoming);
+    [[nodiscard]] bool initAndAddConnection(base::unique_fd fd, const RpcAddress& sessionId,
+                                            bool incoming);
+    [[nodiscard]] bool addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport);
+    [[nodiscard]] bool addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init);
     [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
                                     const wp<RpcSession::EventListener>& eventListener,
                                     const RpcAddress& sessionId);
-    sp<RpcConnection> assignIncomingConnectionToThisThread(base::unique_fd fd);
+    sp<RpcConnection> assignIncomingConnectionToThisThread(
+            std::unique_ptr<RpcTransport> rpcTransport);
     [[nodiscard]] bool removeIncomingConnection(const sp<RpcConnection>& connection);
 
     enum class ConnectionUse {
@@ -275,6 +299,8 @@
         bool mReentrant = false;
     };
 
+    const std::unique_ptr<RpcTransportCtxFactory> mRpcTransportCtxFactory;
+
     // On the other side of a session, for each of mOutgoingConnections here, there should
     // be one of mIncomingConnections on the other side (and vice versa).
     //
diff --git a/libs/binder/include/binder/RpcTransport.h b/libs/binder/include/binder/RpcTransport.h
new file mode 100644
index 0000000..1164600
--- /dev/null
+++ b/libs/binder/include/binder/RpcTransport.h
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Wraps the transport layer of RPC. Implementation may use plain sockets or TLS.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include <android-base/result.h>
+#include <android-base/unique_fd.h>
+
+namespace android {
+
+// Represents a socket connection.
+class RpcTransport {
+public:
+    virtual ~RpcTransport() = default;
+
+    // replacement of ::send(). errno may not be set if TLS is enabled.
+    virtual android::base::Result<size_t> send(const void *buf, size_t size) = 0;
+
+    // replacement of ::recv(). errno may not be set if TLS is enabled.
+    virtual android::base::Result<size_t> recv(void *buf, size_t size) = 0;
+
+    // replacement of ::recv(MSG_PEEK). errno may not be set if TLS is enabled.
+    //
+    // Implementation details:
+    // - For TLS, this may invoke syscalls and read data from the transport
+    // into an internal buffer in userspace. After that, pending() == true.
+    // - For raw sockets, this calls ::recv(MSG_PEEK), which leaves the data in the kernel buffer;
+    // pending() is always false.
+    virtual android::base::Result<size_t> peek(void *buf, size_t size) = 0;
+
+    // Returns true if there are data pending in a userspace buffer that RpcTransport holds.
+    //
+    // Implementation details:
+    // - For TLS, this does not invoke any syscalls or read any data from the
+    // transport. This only returns whether there are data pending in the internal buffer in
+    // userspace.
+    // - For raw sockets, this always returns false.
+    virtual bool pending() = 0;
+
+    // Returns fd for polling.
+    //
+    // Do not directly read / write on this raw fd!
+    [[nodiscard]] virtual android::base::borrowed_fd pollSocket() const = 0;
+
+protected:
+    RpcTransport() = default;
+};
+
+// Represents the context that generates the socket connection.
+class RpcTransportCtx {
+public:
+    virtual ~RpcTransportCtx() = default;
+    [[nodiscard]] virtual std::unique_ptr<RpcTransport> newTransport(
+            android::base::unique_fd fd) const = 0;
+
+protected:
+    RpcTransportCtx() = default;
+};
+
+// A factory class that generates RpcTransportCtx.
+class RpcTransportCtxFactory {
+public:
+    virtual ~RpcTransportCtxFactory() = default;
+    // Creates server context.
+    [[nodiscard]] virtual std::unique_ptr<RpcTransportCtx> newServerCtx() const = 0;
+
+    // Creates client context.
+    [[nodiscard]] virtual std::unique_ptr<RpcTransportCtx> newClientCtx() const = 0;
+
+    // Return a short description of this transport (e.g. "raw"). For logging / debugging / testing
+    // only.
+    [[nodiscard]] virtual const char *toCString() const = 0;
+
+protected:
+    RpcTransportCtxFactory() = default;
+};
+
+} // namespace android
diff --git a/libs/binder/include/binder/RpcTransportRaw.h b/libs/binder/include/binder/RpcTransportRaw.h
new file mode 100644
index 0000000..6fb1f92
--- /dev/null
+++ b/libs/binder/include/binder/RpcTransportRaw.h
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Wraps the transport layer of RPC. Implementation uses plain sockets.
+// Note: don't use directly. You probably want newServerRpcTransportCtx / newClientRpcTransportCtx.
+
+#pragma once
+
+#include <memory>
+
+#include <binder/RpcTransport.h>
+
+namespace android {
+
+// RpcTransportCtxFactory with TLS disabled.
+class RpcTransportCtxFactoryRaw : public RpcTransportCtxFactory {
+public:
+    static std::unique_ptr<RpcTransportCtxFactory> make();
+
+    std::unique_ptr<RpcTransportCtx> newServerCtx() const override;
+    std::unique_ptr<RpcTransportCtx> newClientCtx() const override;
+    const char* toCString() const override;
+
+private:
+    RpcTransportCtxFactoryRaw() = default;
+};
+
+} // namespace android
diff --git a/libs/binder/ndk/include_cpp/android/binder_interface_utils.h b/libs/binder/ndk/include_cpp/android/binder_interface_utils.h
index 70a3906..6c44726 100644
--- a/libs/binder/ndk/include_cpp/android/binder_interface_utils.h
+++ b/libs/binder/ndk/include_cpp/android/binder_interface_utils.h
@@ -43,23 +43,31 @@
 namespace ndk {
 
 /**
+ * analog using std::shared_ptr for internally held refcount
+ *
  * ref must be called at least one time during the lifetime of this object. The recommended way to
  * construct this object is with SharedRefBase::make.
- *
- * Note - this class used to not inherit from enable_shared_from_this, so
- * std::make_shared works, but it won't be portable against old copies of this
- * class.
  */
-class SharedRefBase : public std::enable_shared_from_this<SharedRefBase> {
+class SharedRefBase {
    public:
     SharedRefBase() {}
-    virtual ~SharedRefBase() {}
+    virtual ~SharedRefBase() {
+        std::call_once(mFlagThis, [&]() {
+            __assert(__FILE__, __LINE__, "SharedRefBase: no ref created during lifetime");
+        });
+    }
 
     /**
      * A shared_ptr must be held to this object when this is called. This must be called once during
      * the lifetime of this object.
      */
-    std::shared_ptr<SharedRefBase> ref() { return shared_from_this(); }
+    std::shared_ptr<SharedRefBase> ref() {
+        std::shared_ptr<SharedRefBase> thiz = mThis.lock();
+
+        std::call_once(mFlagThis, [&]() { mThis = thiz = std::shared_ptr<SharedRefBase>(this); });
+
+        return thiz;
+    }
 
     /**
      * Convenience method for a ref (see above) which automatically casts to the desired child type.
@@ -78,13 +86,8 @@
 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
         T* t = new T(std::forward<Args>(args)...);
 #pragma clang diagnostic pop
-
-        // T may have a private (though necessarily virtual!) destructor, so we
-        // can't use refbase. For getting the first ref, we don't use ref()
-        // since the internal shared_ptr isn't guaranteed to exist yet.
-        SharedRefBase* base = static_cast<SharedRefBase*>(t);
-        std::shared_ptr<SharedRefBase> strongBase(base);
-        return std::static_pointer_cast<T>(strongBase);
+        // warning: Potential leak of memory pointed to by 't' [clang-analyzer-unix.Malloc]
+        return t->template ref<T>();  // NOLINT(clang-analyzer-unix.Malloc)
     }
 
     static void operator delete(void* p) { std::free(p); }
@@ -97,9 +100,13 @@
 #if !defined(__ANDROID_API__) || __ANDROID_API__ >= 30 || defined(__ANDROID_APEX__)
    private:
 #else
-    [[deprecated("Prefer SharedRefBase::make<T>(...) or std::make_shared<T>() if possible.")]]
+    [[deprecated("Prefer SharedRefBase::make<T>(...) if possible.")]]
 #endif
     static void* operator new(size_t s) { return std::malloc(s); }
+
+   private:
+    std::once_flag mFlagThis;
+    std::weak_ptr<SharedRefBase> mThis;
 };
 
 /**
diff --git a/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp b/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp
index 94cc086..5ad390e 100644
--- a/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp
+++ b/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp
@@ -54,15 +54,6 @@
 constexpr uint64_t kContextTestValue = 0xb4e42fb4d9a1d715;
 
 class MyBinderNdkUnitTest : public aidl::BnBinderNdkUnitTest {
-   public:
-    MyBinderNdkUnitTest() = default;
-    MyBinderNdkUnitTest(bool* deleted) : deleted(deleted) {}
-    ~MyBinderNdkUnitTest() {
-        if (deleted) {
-            *deleted = true;
-        }
-    }
-
     ndk::ScopedAStatus repeatInt(int32_t in, int32_t* out) {
         *out = in;
         return ndk::ScopedAStatus::ok();
@@ -131,7 +122,6 @@
     }
 
     uint64_t contextTestValue = kContextTestValue;
-    bool* deleted = nullptr;
 };
 
 int generatedService() {
@@ -234,27 +224,6 @@
     return true;
 }
 
-TEST(NdkBinder, MakeShared) {
-    const char* kInstance = "make_shared_test_instance";
-    bool deleted = false;
-
-    {
-        auto service = std::make_shared<MyBinderNdkUnitTest>(&deleted);
-        auto binder = service->asBinder();
-        ASSERT_EQ(EX_NONE, AServiceManager_addService(binder.get(), kInstance));
-        auto binder2 = ndk::SpAIBinder(AServiceManager_checkService(kInstance));
-        ASSERT_EQ(binder.get(), binder2.get());
-
-        // overwrite service
-        ASSERT_EQ(EX_NONE,
-                  AServiceManager_addService(
-                          std::make_shared<MyBinderNdkUnitTest>(&deleted)->asBinder().get(),
-                          kInstance));
-    }
-
-    EXPECT_TRUE(deleted);
-}
-
 TEST(NdkBinder, GetServiceThatDoesntExist) {
     sp<IFoo> foo = IFoo::getService("asdfghkl;");
     EXPECT_EQ(nullptr, foo.get());
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 44ed229..36a6804 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -29,6 +29,8 @@
 #include <binder/ProcessState.h>
 #include <binder/RpcServer.h>
 #include <binder/RpcSession.h>
+#include <binder/RpcTransport.h>
+#include <binder/RpcTransportRaw.h>
 #include <gtest/gtest.h>
 
 #include <chrono>
@@ -40,6 +42,7 @@
 #include <sys/prctl.h>
 #include <unistd.h>
 
+#include "../RpcSocketAddress.h" // for testing preconnected clients
 #include "../RpcState.h"   // for debugging
 #include "../vm_sockets.h" // for VMADDR_*
 
@@ -51,6 +54,21 @@
               RPC_WIRE_PROTOCOL_VERSION == RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL);
 const char* kLocalInetAddress = "127.0.0.1";
 
+enum class RpcSecurity { RAW };
+
+static inline std::vector<RpcSecurity> RpcSecurityValues() {
+    return {RpcSecurity::RAW};
+}
+
+static inline std::unique_ptr<RpcTransportCtxFactory> newFactory(RpcSecurity rpcSecurity) {
+    switch (rpcSecurity) {
+        case RpcSecurity::RAW:
+            return RpcTransportCtxFactoryRaw::make();
+        default:
+            LOG_ALWAYS_FATAL("Unknown RpcSecurity %d", rpcSecurity);
+    }
+}
+
 TEST(BinderRpcParcel, EntireParcelFormatted) {
     Parcel p;
     p.writeInt32(3);
@@ -58,10 +76,17 @@
     EXPECT_DEATH(p.markForBinder(sp<BBinder>::make()), "");
 }
 
-TEST(BinderRpc, SetExternalServer) {
+class BinderRpcSimple : public ::testing::TestWithParam<RpcSecurity> {
+public:
+    static std::string PrintTestParam(const ::testing::TestParamInfo<ParamType>& info) {
+        return newFactory(info.param)->toCString();
+    }
+};
+
+TEST_P(BinderRpcSimple, SetExternalServerTest) {
     base::unique_fd sink(TEMP_FAILURE_RETRY(open("/dev/null", O_RDWR)));
     int sinkFd = sink.get();
-    auto server = RpcServer::make();
+    auto server = RpcServer::make(newFactory(GetParam()));
     server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
     ASSERT_FALSE(server->hasServer());
     ASSERT_TRUE(server->setupExternalServer(std::move(sink)));
@@ -385,12 +410,15 @@
 };
 
 enum class SocketType {
+    PRECONNECTED,
     UNIX,
     VSOCK,
     INET,
 };
-static inline std::string PrintSocketType(const testing::TestParamInfo<SocketType>& info) {
-    switch (info.param) {
+static inline std::string PrintToString(SocketType socketType) {
+    switch (socketType) {
+        case SocketType::PRECONNECTED:
+            return "preconnected_uds";
         case SocketType::UNIX:
             return "unix_domain_socket";
         case SocketType::VSOCK:
@@ -403,7 +431,21 @@
     }
 }
 
-class BinderRpc : public ::testing::TestWithParam<SocketType> {
+static base::unique_fd connectToUds(const char* addrStr) {
+    UnixSocketAddress addr(addrStr);
+    base::unique_fd serverFd(
+            TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
+    int savedErrno = errno;
+    CHECK(serverFd.ok()) << "Could not create socket " << addrStr << ": " << strerror(savedErrno);
+
+    if (0 != TEMP_FAILURE_RETRY(connect(serverFd.get(), addr.addr(), addr.addrSize()))) {
+        int savedErrno = errno;
+        LOG(FATAL) << "Could not connect to socket " << addrStr << ": " << strerror(savedErrno);
+    }
+    return serverFd;
+}
+
+class BinderRpc : public ::testing::TestWithParam<std::tuple<SocketType, RpcSecurity>> {
 public:
     struct Options {
         size_t numThreads = 1;
@@ -411,13 +453,19 @@
         size_t numIncomingConnections = 0;
     };
 
+    static inline std::string PrintParamInfo(const testing::TestParamInfo<ParamType>& info) {
+        auto [type, security] = info.param;
+        return PrintToString(type) + "_" + newFactory(security)->toCString();
+    }
+
     // This creates a new process serving an interface on a certain number of
     // threads.
     ProcessSession createRpcTestSocketServerProcess(
             const Options& options, const std::function<void(const sp<RpcServer>&)>& configure) {
         CHECK_GE(options.numSessions, 1) << "Must have at least one session to a server";
 
-        SocketType socketType = GetParam();
+        SocketType socketType = std::get<0>(GetParam());
+        RpcSecurity rpcSecurity = std::get<1>(GetParam());
 
         unsigned int vsockPort = allocateVsockPort();
         std::string addr = allocateSocketAddress();
@@ -425,7 +473,7 @@
 
         auto ret = ProcessSession{
                 .host = Process([&](android::base::borrowed_fd writeEnd) {
-                    sp<RpcServer> server = RpcServer::make();
+                    sp<RpcServer> server = RpcServer::make(newFactory(rpcSecurity));
 
                     server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
                     server->setMaxThreads(options.numThreads);
@@ -433,6 +481,8 @@
                     unsigned int outPort = 0;
 
                     switch (socketType) {
+                        case SocketType::PRECONNECTED:
+                            [[fallthrough]];
                         case SocketType::UNIX:
                             CHECK(server->setupUnixDomainServer(addr.c_str())) << addr;
                             break;
@@ -467,10 +517,16 @@
         }
 
         for (size_t i = 0; i < options.numSessions; i++) {
-            sp<RpcSession> session = RpcSession::make();
+            sp<RpcSession> session = RpcSession::make(newFactory(rpcSecurity));
             session->setMaxThreads(options.numIncomingConnections);
 
             switch (socketType) {
+                case SocketType::PRECONNECTED:
+                    if (session->setupPreconnectedClient({}, [=]() {
+                            return connectToUds(addr.c_str());
+                        }))
+                        goto success;
+                    break;
                 case SocketType::UNIX:
                     if (session->setupUnixDomainClient(addr.c_str())) goto success;
                     break;
@@ -1131,13 +1187,14 @@
 }
 
 static bool testSupportVsockLoopback() {
+    // We don't need to enable TLS to know if vsock is supported.
     unsigned int vsockPort = allocateVsockPort();
-    sp<RpcServer> server = RpcServer::make();
+    sp<RpcServer> server = RpcServer::make(RpcTransportCtxFactoryRaw::make());
     server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
     CHECK(server->setupVsockServer(vsockPort));
     server->start();
 
-    sp<RpcSession> session = RpcSession::make();
+    sp<RpcSession> session = RpcSession::make(RpcTransportCtxFactoryRaw::make());
     bool okay = session->setupVsockClient(VMADDR_CID_LOCAL, vsockPort);
     while (!server->shutdown()) usleep(10000);
     ALOGE("Detected vsock loopback supported: %d", okay);
@@ -1145,7 +1202,7 @@
 }
 
 static std::vector<SocketType> testSocketTypes() {
-    std::vector<SocketType> ret = {SocketType::UNIX, SocketType::INET};
+    std::vector<SocketType> ret = {SocketType::PRECONNECTED, SocketType::UNIX, SocketType::INET};
 
     static bool hasVsockLoopback = testSupportVsockLoopback();
 
@@ -1156,10 +1213,13 @@
     return ret;
 }
 
-INSTANTIATE_TEST_CASE_P(PerSocket, BinderRpc, ::testing::ValuesIn(testSocketTypes()),
-                        PrintSocketType);
+INSTANTIATE_TEST_CASE_P(PerSocket, BinderRpc,
+                        ::testing::Combine(::testing::ValuesIn(testSocketTypes()),
+                                           ::testing::ValuesIn(RpcSecurityValues())),
+                        BinderRpc::PrintParamInfo);
 
-class BinderRpcServerRootObject : public ::testing::TestWithParam<std::tuple<bool, bool>> {};
+class BinderRpcServerRootObject
+      : public ::testing::TestWithParam<std::tuple<bool, bool, RpcSecurity>> {};
 
 TEST_P(BinderRpcServerRootObject, WeakRootObject) {
     using SetFn = std::function<void(RpcServer*, sp<IBinder>)>;
@@ -1167,8 +1227,8 @@
         return isStrong ? SetFn(&RpcServer::setRootObject) : SetFn(&RpcServer::setRootObjectWeak);
     };
 
-    auto server = RpcServer::make();
-    auto [isStrong1, isStrong2] = GetParam();
+    auto [isStrong1, isStrong2, rpcSecurity] = GetParam();
+    auto server = RpcServer::make(newFactory(rpcSecurity));
     auto binder1 = sp<BBinder>::make();
     IBinder* binderRaw1 = binder1.get();
     setRootObject(isStrong1)(server.get(), binder1);
@@ -1185,7 +1245,8 @@
 }
 
 INSTANTIATE_TEST_CASE_P(BinderRpc, BinderRpcServerRootObject,
-                        ::testing::Combine(::testing::Bool(), ::testing::Bool()));
+                        ::testing::Combine(::testing::Bool(), ::testing::Bool(),
+                                           ::testing::ValuesIn(RpcSecurityValues())));
 
 class OneOffSignal {
 public:
@@ -1208,10 +1269,10 @@
     bool mValue = false;
 };
 
-TEST(BinderRpc, Shutdown) {
+TEST_P(BinderRpcSimple, Shutdown) {
     auto addr = allocateSocketAddress();
     unlink(addr.c_str());
-    auto server = RpcServer::make();
+    auto server = RpcServer::make(newFactory(GetParam()));
     server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
     ASSERT_TRUE(server->setupUnixDomainServer(addr.c_str()));
     auto joinEnds = std::make_shared<OneOffSignal>();
@@ -1272,6 +1333,9 @@
     ASSERT_EQ(OK, rpcBinder->pingBinder());
 }
 
+INSTANTIATE_TEST_CASE_P(BinderRpc, BinderRpcSimple, ::testing::ValuesIn(RpcSecurityValues()),
+                        BinderRpcSimple::PrintTestParam);
+
 } // namespace android
 
 int main(int argc, char** argv) {
diff --git a/libs/binder/tests/parcel_fuzzer/random_parcel.cpp b/libs/binder/tests/parcel_fuzzer/random_parcel.cpp
index 92fdc72..a2472f8 100644
--- a/libs/binder/tests/parcel_fuzzer/random_parcel.cpp
+++ b/libs/binder/tests/parcel_fuzzer/random_parcel.cpp
@@ -19,6 +19,7 @@
 #include <android-base/logging.h>
 #include <binder/IServiceManager.h>
 #include <binder/RpcSession.h>
+#include <binder/RpcTransportRaw.h>
 #include <fuzzbinder/random_fd.h>
 #include <utils/String16.h>
 
@@ -35,7 +36,7 @@
 
 void fillRandomParcel(Parcel* p, FuzzedDataProvider&& provider) {
     if (provider.ConsumeBool()) {
-        auto session = sp<RpcSession>::make();
+        auto session = RpcSession::make(RpcTransportCtxFactoryRaw::make());
         CHECK(session->addNullDebuggingClient());
         p->markForRpc(session);
         fillRandomParcelData(p, std::move(provider));