Merge changes I0daebc18,I87df46d1

* changes:
  dumpsys: display the client PIDs of all of the services
  Add API to BpBinder to get handle for debugging
diff --git a/libs/binder/Android.bp b/libs/binder/Android.bp
index f34672c..2127f57 100644
--- a/libs/binder/Android.bp
+++ b/libs/binder/Android.bp
@@ -104,6 +104,7 @@
         "BpBinder.cpp",
         "BufferedTextOutput.cpp",
         "Debug.cpp",
+        "FdTrigger.cpp",
         "IInterface.cpp",
         "IMemory.cpp",
         "IPCThreadState.cpp",
@@ -359,5 +360,6 @@
         "libbinder",
         "liblog",
         "libutils",
+        "android.debug_aidl-cpp",
     ],
 }
diff --git a/libs/binder/Binder.cpp b/libs/binder/Binder.cpp
index 628381c..d3eef4e 100644
--- a/libs/binder/Binder.cpp
+++ b/libs/binder/Binder.cpp
@@ -555,7 +555,9 @@
         return status;
     }
     rpcServer->setRootObjectWeak(weakThis);
-    rpcServer->setupExternalServer(std::move(socketFd));
+    if (auto status = rpcServer->setupExternalServer(std::move(socketFd)); status != OK) {
+        return status;
+    }
     rpcServer->setMaxThreads(binderThreadPoolMaxCount);
     rpcServer->start();
     e->mRpcServerLinks.emplace(link);
diff --git a/libs/binder/FdTrigger.cpp b/libs/binder/FdTrigger.cpp
new file mode 100644
index 0000000..e38ac63
--- /dev/null
+++ b/libs/binder/FdTrigger.cpp
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "FdTrigger"
+#include <log/log.h>
+
+#include <poll.h>
+
+#include <android-base/macros.h>
+
+#include "FdTrigger.h"
+namespace android {
+
+std::unique_ptr<FdTrigger> FdTrigger::make() {
+    auto ret = std::make_unique<FdTrigger>();
+    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
+        ALOGE("Could not create pipe %s", strerror(errno));
+        return nullptr;
+    }
+    return ret;
+}
+
+void FdTrigger::trigger() {
+    mWrite.reset();
+}
+
+bool FdTrigger::isTriggered() {
+    return mWrite == -1;
+}
+
+status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
+    while (true) {
+        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
+                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
+        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
+        if (ret < 0) {
+            return -errno;
+        }
+        if (ret == 0) {
+            continue;
+        }
+        if (pfd[1].revents & POLLHUP) {
+            return -ECANCELED;
+        }
+        return pfd[0].revents & event ? OK : DEAD_OBJECT;
+    }
+}
+
+} // namespace android
diff --git a/libs/binder/FdTrigger.h b/libs/binder/FdTrigger.h
new file mode 100644
index 0000000..984e685
--- /dev/null
+++ b/libs/binder/FdTrigger.h
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include <android-base/unique_fd.h>
+#include <utils/Errors.h>
+
+namespace android {
+
+/** This is not a pipe. */
+class FdTrigger {
+public:
+    /** Returns nullptr for error case */
+    static std::unique_ptr<FdTrigger> make();
+
+    /**
+     * Close the write end of the pipe so that the read end receives POLLHUP.
+     * Not threadsafe.
+     */
+    void trigger();
+
+    /**
+     * Whether this has been triggered.
+     */
+    bool isTriggered();
+
+    /**
+     * Poll for a read event.
+     *
+     * event - for pollfd
+     *
+     * Return:
+     *   true - time to read!
+     *   false - trigger happened
+     */
+    status_t triggerablePoll(base::borrowed_fd fd, int16_t event);
+
+private:
+    base::unique_fd mWrite;
+    base::unique_fd mRead;
+};
+} // namespace android
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 879e462..a20445b 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -29,6 +29,7 @@
 #include <binder/RpcTransportRaw.h>
 #include <log/log.h>
 
+#include "FdTrigger.h"
 #include "RpcSocketAddress.h"
 #include "RpcState.h"
 #include "RpcWireFormat.h"
@@ -55,25 +56,25 @@
     mAgreedExperimental = true;
 }
 
-bool RpcServer::setupUnixDomainServer(const char* path) {
+status_t RpcServer::setupUnixDomainServer(const char* path) {
     return setupSocketServer(UnixSocketAddress(path));
 }
 
-bool RpcServer::setupVsockServer(unsigned int port) {
+status_t RpcServer::setupVsockServer(unsigned int port) {
     // realizing value w/ this type at compile time to avoid ubsan abort
     constexpr unsigned int kAnyCid = VMADDR_CID_ANY;
 
     return setupSocketServer(VsockSocketAddress(kAnyCid, port));
 }
 
-bool RpcServer::setupInetServer(const char* address, unsigned int port,
-                                unsigned int* assignedPort) {
+status_t RpcServer::setupInetServer(const char* address, unsigned int port,
+                                    unsigned int* assignedPort) {
     if (assignedPort != nullptr) *assignedPort = 0;
     auto aiStart = InetSocketAddress::getAddrInfo(address, port);
-    if (aiStart == nullptr) return false;
+    if (aiStart == nullptr) return UNKNOWN_ERROR;
     for (auto ai = aiStart.get(); ai != nullptr; ai = ai->ai_next) {
         InetSocketAddress socketAddress(ai->ai_addr, ai->ai_addrlen, address, port);
-        if (!setupSocketServer(socketAddress)) {
+        if (status_t status = setupSocketServer(socketAddress); status != OK) {
             continue;
         }
 
@@ -84,7 +85,7 @@
             int savedErrno = errno;
             ALOGE("Could not getsockname at %s: %s", socketAddress.toString().c_str(),
                   strerror(savedErrno));
-            return false;
+            return -savedErrno;
         }
         LOG_ALWAYS_FATAL_IF(len != sizeof(addr), "Wrong socket type: len %zu vs len %zu",
                             static_cast<size_t>(len), sizeof(addr));
@@ -97,11 +98,11 @@
             *assignedPort = realPort;
         }
 
-        return true;
+        return OK;
     }
     ALOGE("None of the socket address resolved for %s:%u can be set up as inet server.", address,
           port);
-    return false;
+    return UNKNOWN_ERROR;
 }
 
 void RpcServer::setMaxThreads(size_t threads) {
@@ -156,7 +157,7 @@
         LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join.");
         LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined");
         mJoinThreadRunning = true;
-        mShutdownTrigger = RpcSession::FdTrigger::make();
+        mShutdownTrigger = FdTrigger::make();
         LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");
 
         mCtx = mRpcTransportCtxFactory->newServerCtx();
@@ -167,7 +168,7 @@
     status_t status;
     while ((status = mShutdownTrigger->triggerablePoll(mServer, POLLIN)) == OK) {
         unique_fd clientFd(TEMP_FAILURE_RETRY(
-                accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC)));
+                accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC | SOCK_NONBLOCK)));
 
         if (clientFd < 0) {
             ALOGE("Could not accept4 socket: %s", strerror(errno));
@@ -259,7 +260,7 @@
     status_t status = OK;
 
     int clientFdForLog = clientFd.get();
-    auto client = server->mCtx->newTransport(std::move(clientFd));
+    auto client = server->mCtx->newTransport(std::move(clientFd), server->mShutdownTrigger.get());
     if (client == nullptr) {
         ALOGE("Dropping accept4()-ed socket because sslAccept fails");
         status = DEAD_OBJECT;
@@ -270,8 +271,8 @@
 
     RpcConnectionHeader header;
     if (status == OK) {
-        status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header,
-                                                                  sizeof(header));
+        status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header,
+                                                sizeof(header));
         if (status != OK) {
             ALOGE("Failed to read ID for client connecting to RPC server: %s",
                   statusToString(status).c_str());
@@ -296,8 +297,8 @@
                     .version = protocolVersion,
             };
 
-            status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response,
-                                                                       sizeof(response));
+            status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response,
+                                                     sizeof(response));
             if (status != OK) {
                 ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
                 // still need to cleanup before we can return
@@ -366,7 +367,7 @@
         }
 
         if (incoming) {
-            LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(client), true),
+            LOG_ALWAYS_FATAL_IF(OK != session->addOutgoingConnection(std::move(client), true),
                                 "server state must already be initialized");
             return;
         }
@@ -383,21 +384,22 @@
     RpcSession::join(std::move(session), std::move(setupResult));
 }
 
-bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
+status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
     LOG_RPC_DETAIL("Setting up socket server %s", addr.toString().c_str());
     LOG_ALWAYS_FATAL_IF(hasServer(), "Each RpcServer can only have one server.");
 
-    unique_fd serverFd(
-            TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
+    unique_fd serverFd(TEMP_FAILURE_RETRY(
+            socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)));
     if (serverFd == -1) {
-        ALOGE("Could not create socket: %s", strerror(errno));
-        return false;
+        int savedErrno = errno;
+        ALOGE("Could not create socket: %s", strerror(savedErrno));
+        return -savedErrno;
     }
 
     if (0 != TEMP_FAILURE_RETRY(bind(serverFd.get(), addr.addr(), addr.addrSize()))) {
         int savedErrno = errno;
         ALOGE("Could not bind socket at %s: %s", addr.toString().c_str(), strerror(savedErrno));
-        return false;
+        return -savedErrno;
     }
 
     // Right now, we create all threads at once, making accept4 slow. To avoid hanging the client,
@@ -407,16 +409,16 @@
     if (0 != TEMP_FAILURE_RETRY(listen(serverFd.get(), 50 /*backlog*/))) {
         int savedErrno = errno;
         ALOGE("Could not listen socket at %s: %s", addr.toString().c_str(), strerror(savedErrno));
-        return false;
+        return -savedErrno;
     }
 
     LOG_RPC_DETAIL("Successfully setup socket server %s", addr.toString().c_str());
 
-    if (!setupExternalServer(std::move(serverFd))) {
+    if (status_t status = setupExternalServer(std::move(serverFd)); status != OK) {
         ALOGE("Another thread has set up server while calling setupSocketServer. Race?");
-        return false;
+        return status;
     }
-    return true;
+    return OK;
 }
 
 void RpcServer::onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) {
@@ -449,15 +451,15 @@
     return std::move(mServer);
 }
 
-bool RpcServer::setupExternalServer(base::unique_fd serverFd) {
+status_t RpcServer::setupExternalServer(base::unique_fd serverFd) {
     LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
     std::lock_guard<std::mutex> _l(mLock);
     if (mServer.ok()) {
         ALOGE("Each RpcServer can only have one server.");
-        return false;
+        return INVALID_OPERATION;
     }
     mServer = std::move(serverFd);
-    return true;
+    return OK;
 }
 
 } // namespace android
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index b17191c..4c47005 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -35,9 +35,11 @@
 #include <jni.h>
 #include <utils/String8.h>
 
+#include "FdTrigger.h"
 #include "RpcSocketAddress.h"
 #include "RpcState.h"
 #include "RpcWireFormat.h"
+#include "Utils.h"
 
 #ifdef __GLIBC__
 extern "C" pid_t gettid();
@@ -107,43 +109,61 @@
     return mProtocolVersion;
 }
 
-bool RpcSession::setupUnixDomainClient(const char* path) {
+status_t RpcSession::setupUnixDomainClient(const char* path) {
     return setupSocketClient(UnixSocketAddress(path));
 }
 
-bool RpcSession::setupVsockClient(unsigned int cid, unsigned int port) {
+status_t RpcSession::setupVsockClient(unsigned int cid, unsigned int port) {
     return setupSocketClient(VsockSocketAddress(cid, port));
 }
 
-bool RpcSession::setupInetClient(const char* addr, unsigned int port) {
+status_t RpcSession::setupInetClient(const char* addr, unsigned int port) {
     auto aiStart = InetSocketAddress::getAddrInfo(addr, port);
-    if (aiStart == nullptr) return false;
+    if (aiStart == nullptr) return UNKNOWN_ERROR;
     for (auto ai = aiStart.get(); ai != nullptr; ai = ai->ai_next) {
         InetSocketAddress socketAddress(ai->ai_addr, ai->ai_addrlen, addr, port);
-        if (setupSocketClient(socketAddress)) return true;
+        if (status_t status = setupSocketClient(socketAddress); status == OK) return OK;
     }
     ALOGE("None of the socket address resolved for %s:%u can be added as inet client.", addr, port);
-    return false;
+    return NAME_NOT_FOUND;
 }
 
-bool RpcSession::addNullDebuggingClient() {
+status_t RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_fd()>&& request) {
+    return setupClient([&](const RpcAddress& sessionId, bool incoming) -> status_t {
+        // std::move'd from fd becomes -1 (!ok())
+        if (!fd.ok()) {
+            fd = request();
+            if (!fd.ok()) return BAD_VALUE;
+        }
+        if (auto res = setNonBlocking(fd); !res.ok()) {
+            ALOGE("setupPreconnectedClient: %s", res.error().message().c_str());
+            return res.error().code() == 0 ? UNKNOWN_ERROR : -res.error().code();
+        }
+        return initAndAddConnection(std::move(fd), sessionId, incoming);
+    });
+}
+
+status_t RpcSession::addNullDebuggingClient() {
     // Note: only works on raw sockets.
+    if (auto status = initShutdownTrigger(); status != OK) return status;
+
     unique_fd serverFd(TEMP_FAILURE_RETRY(open("/dev/null", O_WRONLY | O_CLOEXEC)));
 
     if (serverFd == -1) {
-        ALOGE("Could not connect to /dev/null: %s", strerror(errno));
-        return false;
+        int savedErrno = errno;
+        ALOGE("Could not connect to /dev/null: %s", strerror(savedErrno));
+        return -savedErrno;
     }
 
     auto ctx = mRpcTransportCtxFactory->newClientCtx();
     if (ctx == nullptr) {
         ALOGE("Unable to create RpcTransportCtx for null debugging client");
-        return false;
+        return NO_MEMORY;
     }
-    auto server = ctx->newTransport(std::move(serverFd));
+    auto server = ctx->newTransport(std::move(serverFd), mShutdownTrigger.get());
     if (server == nullptr) {
         ALOGE("Unable to set up RpcTransport");
-        return false;
+        return UNKNOWN_ERROR;
     }
     return addOutgoingConnection(std::move(server), false);
 }
@@ -204,91 +224,6 @@
     return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address);
 }
 
-std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
-    auto ret = std::make_unique<RpcSession::FdTrigger>();
-    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
-        ALOGE("Could not create pipe %s", strerror(errno));
-        return nullptr;
-    }
-    return ret;
-}
-
-void RpcSession::FdTrigger::trigger() {
-    mWrite.reset();
-}
-
-bool RpcSession::FdTrigger::isTriggered() {
-    return mWrite == -1;
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) {
-    return triggerablePoll(rpcTransport->pollSocket(), event);
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
-    while (true) {
-        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
-                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
-        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
-        if (ret < 0) {
-            return -errno;
-        }
-        if (ret == 0) {
-            continue;
-        }
-        if (pfd[1].revents & POLLHUP) {
-            return -ECANCELED;
-        }
-        return pfd[0].revents & event ? OK : DEAD_OBJECT;
-    }
-}
-
-status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport,
-                                                        const void* data, size_t size) {
-    const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
-    const uint8_t* end = buffer + size;
-
-    MAYBE_WAIT_IN_FLAKE_MODE;
-
-    status_t status;
-    while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) {
-        auto writeSize = rpcTransport->send(buffer, end - buffer);
-        if (!writeSize.ok()) {
-            LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
-            return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
-        }
-
-        if (*writeSize == 0) return DEAD_OBJECT;
-
-        buffer += *writeSize;
-        if (buffer == end) return OK;
-    }
-    return status;
-}
-
-status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data,
-                                                       size_t size) {
-    uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
-    uint8_t* end = buffer + size;
-
-    MAYBE_WAIT_IN_FLAKE_MODE;
-
-    status_t status;
-    while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) {
-        auto readSize = rpcTransport->recv(buffer, end - buffer);
-        if (!readSize.ok()) {
-            LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
-            return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
-        }
-
-        if (*readSize == 0) return DEAD_OBJECT; // EOF
-
-        buffer += *readSize;
-        if (buffer == end) return OK;
-    }
-    return status;
-}
-
 status_t RpcSession::readId() {
     {
         std::lock_guard<std::mutex> _l(mMutex);
@@ -464,48 +399,48 @@
     return server;
 }
 
-bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
+status_t RpcSession::setupClient(
+        const std::function<status_t(const RpcAddress& sessionId, bool incoming)>& connectAndInit) {
     {
         std::lock_guard<std::mutex> _l(mMutex);
         LOG_ALWAYS_FATAL_IF(mOutgoingConnections.size() != 0,
                             "Must only setup session once, but already has %zu clients",
                             mOutgoingConnections.size());
     }
+    if (auto status = initShutdownTrigger(); status != OK) return status;
 
-    if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*incoming*/)) return false;
+    if (status_t status = connectAndInit(RpcAddress::zero(), false /*incoming*/); status != OK)
+        return status;
 
     {
         ExclusiveConnection connection;
-        status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
-                                                    ConnectionUse::CLIENT, &connection);
-        if (status != OK) return false;
+        if (status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
+                                                        ConnectionUse::CLIENT, &connection);
+            status != OK)
+            return status;
 
         uint32_t version;
-        status = state()->readNewSessionResponse(connection.get(),
-                                                 sp<RpcSession>::fromExisting(this), &version);
-        if (!setProtocolVersion(version)) return false;
+        if (status_t status =
+                    state()->readNewSessionResponse(connection.get(),
+                                                    sp<RpcSession>::fromExisting(this), &version);
+            status != OK)
+            return status;
+        if (!setProtocolVersion(version)) return BAD_VALUE;
     }
 
     // TODO(b/189955605): we should add additional sessions dynamically
     // instead of all at once.
-    // TODO(b/186470974): first risk of blocking
     size_t numThreadsAvailable;
     if (status_t status = getRemoteMaxThreads(&numThreadsAvailable); status != OK) {
-        ALOGE("Could not get max threads after initial session to %s: %s", addr.toString().c_str(),
+        ALOGE("Could not get max threads after initial session setup: %s",
               statusToString(status).c_str());
-        return false;
+        return status;
     }
 
     if (status_t status = readId(); status != OK) {
-        ALOGE("Could not get session id after initial session to %s; %s", addr.toString().c_str(),
+        ALOGE("Could not get session id after initial session setup: %s",
               statusToString(status).c_str());
-        return false;
-    }
-
-    // we've already setup one client
-    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
-        // TODO(b/189955605): shutdown existing connections?
-        if (!setupOneSocketConnection(addr, mId.value(), false /*incoming*/)) return false;
+        return status;
     }
 
     // TODO(b/189955605): we should add additional sessions dynamically
@@ -514,25 +449,38 @@
     // requested to be set) in order to allow the other side to reliably make
     // any requests at all.
 
-    for (size_t i = 0; i < mMaxThreads; i++) {
-        if (!setupOneSocketConnection(addr, mId.value(), true /*incoming*/)) return false;
+    // we've already setup one client
+    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
+        if (status_t status = connectAndInit(mId.value(), false /*incoming*/); status != OK)
+            return status;
     }
 
-    return true;
+    for (size_t i = 0; i < mMaxThreads; i++) {
+        if (status_t status = connectAndInit(mId.value(), true /*incoming*/); status != OK)
+            return status;
+    }
+
+    return OK;
 }
 
-bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id,
-                                          bool incoming) {
+status_t RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
+    return setupClient([&](const RpcAddress& sessionId, bool incoming) {
+        return setupOneSocketConnection(addr, sessionId, incoming);
+    });
+}
+
+status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr,
+                                              const RpcAddress& sessionId, bool incoming) {
     for (size_t tries = 0; tries < 5; tries++) {
         if (tries > 0) usleep(10000);
 
-        unique_fd serverFd(
-                TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
+        unique_fd serverFd(TEMP_FAILURE_RETRY(
+                socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)));
         if (serverFd == -1) {
             int savedErrno = errno;
             ALOGE("Could not create socket at %s: %s", addr.toString().c_str(),
                   strerror(savedErrno));
-            return false;
+            return -savedErrno;
         }
 
         if (0 != TEMP_FAILURE_RETRY(connect(serverFd.get(), addr.addr(), addr.addrSize()))) {
@@ -540,62 +488,87 @@
                 ALOGW("Connection reset on %s", addr.toString().c_str());
                 continue;
             }
-            int savedErrno = errno;
-            ALOGE("Could not connect socket at %s: %s", addr.toString().c_str(),
-                  strerror(savedErrno));
-            return false;
+            if (errno != EAGAIN && errno != EINPROGRESS) {
+                int savedErrno = errno;
+                ALOGE("Could not connect socket at %s: %s", addr.toString().c_str(),
+                      strerror(savedErrno));
+                return -savedErrno;
+            }
+            // For non-blocking sockets, connect() may return EAGAIN (for unix domain socket) or
+            // EINPROGRESS (for others). Call poll() and getsockopt() to get the error.
+            status_t pollStatus = mShutdownTrigger->triggerablePoll(serverFd, POLLOUT);
+            if (pollStatus != OK) {
+                ALOGE("Could not POLLOUT after connect() on non-blocking socket: %s",
+                      statusToString(pollStatus).c_str());
+                return pollStatus;
+            }
+            int soError;
+            socklen_t soErrorLen = sizeof(soError);
+            int ret = getsockopt(serverFd.get(), SOL_SOCKET, SO_ERROR, &soError, &soErrorLen);
+            if (ret == -1) {
+                int savedErrno = errno;
+                ALOGE("Could not getsockopt() after connect() on non-blocking socket: %s",
+                      strerror(savedErrno));
+                return -savedErrno;
+            }
+            if (soError != 0) {
+                ALOGE("After connect(), getsockopt() returns error for socket at %s: %s",
+                      addr.toString().c_str(), strerror(soError));
+                return -soError;
+            }
         }
         LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());
 
-        auto ctx = mRpcTransportCtxFactory->newClientCtx();
-        if (ctx == nullptr) {
-            ALOGE("Unable to create client RpcTransportCtx with %s sockets",
-                  mRpcTransportCtxFactory->toCString());
-            return false;
-        }
-        auto server = ctx->newTransport(std::move(serverFd));
-        if (server == nullptr) {
-            ALOGE("Unable to set up RpcTransport for %s", addr.toString().c_str());
-            return false;
-        }
-
-        LOG_RPC_DETAIL("Socket at %s client with RpcTransport %p", addr.toString().c_str(),
-                       server.get());
-
-        RpcConnectionHeader header{
-                .version = mProtocolVersion.value_or(RPC_WIRE_PROTOCOL_VERSION),
-                .options = 0,
-        };
-        memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress));
-
-        if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
-
-        auto sentHeader = server->send(&header, sizeof(header));
-        if (!sentHeader.ok()) {
-            ALOGE("Could not write connection header to socket at %s: %s", addr.toString().c_str(),
-                  sentHeader.error().message().c_str());
-            return false;
-        }
-        if (*sentHeader != sizeof(header)) {
-            ALOGE("Could not write connection header to socket at %s: sent %zd bytes, expected %zd",
-                  addr.toString().c_str(), *sentHeader, sizeof(header));
-            return false;
-        }
-
-        LOG_RPC_DETAIL("Socket at %s client: header sent", addr.toString().c_str());
-
-        if (incoming) {
-            return addIncomingConnection(std::move(server));
-        } else {
-            return addOutgoingConnection(std::move(server), true);
-        }
+        return initAndAddConnection(std::move(serverFd), sessionId, incoming);
     }
 
     ALOGE("Ran out of retries to connect to %s", addr.toString().c_str());
-    return false;
+    return UNKNOWN_ERROR;
 }
 
-bool RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) {
+status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId,
+                                          bool incoming) {
+    LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr);
+    auto ctx = mRpcTransportCtxFactory->newClientCtx();
+    if (ctx == nullptr) {
+        ALOGE("Unable to create client RpcTransportCtx with %s sockets",
+              mRpcTransportCtxFactory->toCString());
+        return NO_MEMORY;
+    }
+    auto server = ctx->newTransport(std::move(fd), mShutdownTrigger.get());
+    if (server == nullptr) {
+        ALOGE("Unable to set up RpcTransport in %s context", mRpcTransportCtxFactory->toCString());
+        return UNKNOWN_ERROR;
+    }
+
+    LOG_RPC_DETAIL("Socket at client with RpcTransport %p", server.get());
+
+    RpcConnectionHeader header{
+            .version = mProtocolVersion.value_or(RPC_WIRE_PROTOCOL_VERSION),
+            .options = 0,
+    };
+    memcpy(&header.sessionId, &sessionId.viewRawEmbedded(), sizeof(RpcWireAddress));
+
+    if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
+
+    auto sendHeaderStatus =
+            server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header));
+    if (sendHeaderStatus != OK) {
+        ALOGE("Could not write connection header to socket: %s",
+              statusToString(sendHeaderStatus).c_str());
+        return sendHeaderStatus;
+    }
+
+    LOG_RPC_DETAIL("Socket at client: header sent");
+
+    if (incoming) {
+        return addIncomingConnection(std::move(server));
+    } else {
+        return addOutgoingConnection(std::move(server), true /*init*/);
+    }
+}
+
+status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) {
     std::mutex mutex;
     std::condition_variable joinCv;
     std::unique_lock<std::mutex> lock(mutex);
@@ -621,22 +594,24 @@
     });
     joinCv.wait(lock, [&] { return ownershipTransferred; });
     LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
-    return true;
+    return OK;
 }
 
-bool RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) {
+status_t RpcSession::initShutdownTrigger() {
+    // first client connection added, but setForServer not called, so
+    // initializaing for a client.
+    if (mShutdownTrigger == nullptr) {
+        mShutdownTrigger = FdTrigger::make();
+        mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
+        if (mShutdownTrigger == nullptr) return INVALID_OPERATION;
+    }
+    return OK;
+}
+
+status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) {
     sp<RpcConnection> connection = sp<RpcConnection>::make();
     {
         std::lock_guard<std::mutex> _l(mMutex);
-
-        // first client connection added, but setForServer not called, so
-        // initializaing for a client.
-        if (mShutdownTrigger == nullptr) {
-            mShutdownTrigger = FdTrigger::make();
-            mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
-            if (mShutdownTrigger == nullptr) return false;
-        }
-
         connection->rpcTransport = std::move(rpcTransport);
         connection->exclusiveTid = gettid();
         mOutgoingConnections.push_back(connection);
@@ -652,7 +627,7 @@
         connection->exclusiveTid = std::nullopt;
     }
 
-    return status == OK;
+    return status;
 }
 
 bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 6563bc8..b58f1b3 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -283,8 +283,8 @@
     }
 
     if (status_t status =
-                session->mShutdownTrigger->interruptableWriteFully(connection->rpcTransport.get(),
-                                                                   data, size);
+                connection->rpcTransport->interruptableWriteFully(session->mShutdownTrigger.get(),
+                                                                  data, size);
         status != OK) {
         LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                        connection->rpcTransport.get(), statusToString(status).c_str());
@@ -305,8 +305,8 @@
     }
 
     if (status_t status =
-                session->mShutdownTrigger->interruptableReadFully(connection->rpcTransport.get(),
-                                                                  data, size);
+                connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(),
+                                                                 data, size);
         status != OK) {
         LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                        connection->rpcTransport.get(), statusToString(status).c_str());
@@ -601,7 +601,7 @@
 status_t RpcState::drainCommands(const sp<RpcSession::RpcConnection>& connection,
                                  const sp<RpcSession>& session, CommandType type) {
     uint8_t buf;
-    while (connection->rpcTransport->peek(&buf, sizeof(buf)).value_or(-1) > 0) {
+    while (connection->rpcTransport->peek(&buf, sizeof(buf)).value_or(0) > 0) {
         status_t status = getAndExecuteCommand(connection, session, type);
         if (status != OK) return status;
     }
diff --git a/libs/binder/RpcTransportRaw.cpp b/libs/binder/RpcTransportRaw.cpp
index 953d233..d77fc52 100644
--- a/libs/binder/RpcTransportRaw.cpp
+++ b/libs/binder/RpcTransportRaw.cpp
@@ -17,8 +17,11 @@
 #define LOG_TAG "RpcRawTransport"
 #include <log/log.h>
 
+#include <poll.h>
+
 #include <binder/RpcTransportRaw.h>
 
+#include "FdTrigger.h"
 #include "RpcState.h"
 
 using android::base::ErrnoError;
@@ -32,29 +35,71 @@
 class RpcTransportRaw : public RpcTransport {
 public:
     explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {}
-    Result<ssize_t> send(const void *buf, int size) override {
+    Result<size_t> send(const void* buf, size_t size) {
         ssize_t ret = TEMP_FAILURE_RETRY(::send(mSocket.get(), buf, size, MSG_NOSIGNAL));
         if (ret < 0) {
             return ErrnoError() << "send()";
         }
         return ret;
     }
-    Result<ssize_t> recv(void *buf, int size) override {
+    Result<size_t> recv(void* buf, size_t size) {
         ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_NOSIGNAL));
         if (ret < 0) {
             return ErrnoError() << "recv()";
         }
         return ret;
     }
-    Result<ssize_t> peek(void *buf, int size) override {
-        ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_PEEK | MSG_DONTWAIT));
+    Result<size_t> peek(void *buf, size_t size) override {
+        ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_PEEK));
         if (ret < 0) {
             return ErrnoError() << "recv(MSG_PEEK)";
         }
         return ret;
     }
-    bool pending() override { return false; }
-    android::base::borrowed_fd pollSocket() const override { return mSocket; }
+
+    status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override {
+        const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
+        const uint8_t* end = buffer + size;
+
+        MAYBE_WAIT_IN_FLAKE_MODE;
+
+        status_t status;
+        while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLOUT)) == OK) {
+            auto writeSize = this->send(buffer, end - buffer);
+            if (!writeSize.ok()) {
+                LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
+                return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
+            }
+
+            if (*writeSize == 0) return DEAD_OBJECT;
+
+            buffer += *writeSize;
+            if (buffer == end) return OK;
+        }
+        return status;
+    }
+
+    status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override {
+        uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
+        uint8_t* end = buffer + size;
+
+        MAYBE_WAIT_IN_FLAKE_MODE;
+
+        status_t status;
+        while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLIN)) == OK) {
+            auto readSize = this->recv(buffer, end - buffer);
+            if (!readSize.ok()) {
+                LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
+                return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
+            }
+
+            if (*readSize == 0) return DEAD_OBJECT; // EOF
+
+            buffer += *readSize;
+            if (buffer == end) return OK;
+        }
+        return status;
+    }
 
 private:
     android::base::unique_fd mSocket;
@@ -63,7 +108,7 @@
 // RpcTransportCtx with TLS disabled.
 class RpcTransportCtxRaw : public RpcTransportCtx {
 public:
-    std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd) const {
+    std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd, FdTrigger*) const {
         return std::make_unique<RpcTransportRaw>(std::move(fd));
     }
 };
diff --git a/libs/binder/ServiceManagerHost.cpp b/libs/binder/ServiceManagerHost.cpp
index 1c2f9b4..59334b7 100644
--- a/libs/binder/ServiceManagerHost.cpp
+++ b/libs/binder/ServiceManagerHost.cpp
@@ -158,8 +158,10 @@
     LOG_ALWAYS_FATAL_IF(!forwardResult->hostPort().has_value());
 
     auto rpcSession = RpcSession::make();
-    if (!rpcSession->setupInetClient("127.0.0.1", *forwardResult->hostPort())) {
-        ALOGE("Unable to set up inet client on host port %u", *forwardResult->hostPort());
+    if (status_t status = rpcSession->setupInetClient("127.0.0.1", *forwardResult->hostPort());
+        status != OK) {
+        ALOGE("Unable to set up inet client on host port %u: %s", *forwardResult->hostPort(),
+              statusToString(status).c_str());
         return nullptr;
     }
     auto binder = rpcSession->getRootObject();
diff --git a/libs/binder/Utils.cpp b/libs/binder/Utils.cpp
index 90a4502..d2a5be1 100644
--- a/libs/binder/Utils.cpp
+++ b/libs/binder/Utils.cpp
@@ -18,10 +18,24 @@
 
 #include <string.h>
 
+using android::base::ErrnoError;
+using android::base::Result;
+
 namespace android {
 
 void zeroMemory(uint8_t* data, size_t size) {
     memset(data, 0, size);
 }
 
-}   // namespace android
+Result<void> setNonBlocking(android::base::borrowed_fd fd) {
+    int flags = TEMP_FAILURE_RETRY(fcntl(fd.get(), F_GETFL));
+    if (flags == -1) {
+        return ErrnoError() << "Could not get flags for fd";
+    }
+    if (int ret = TEMP_FAILURE_RETRY(fcntl(fd.get(), F_SETFL, flags | O_NONBLOCK)); ret == -1) {
+        return ErrnoError() << "Could not set non-blocking flag for fd";
+    }
+    return {};
+}
+
+} // namespace android
diff --git a/libs/binder/Utils.h b/libs/binder/Utils.h
index f94b158..1e383da 100644
--- a/libs/binder/Utils.h
+++ b/libs/binder/Utils.h
@@ -17,9 +17,14 @@
 #include <cstdint>
 #include <stddef.h>
 
+#include <android-base/result.h>
+#include <android-base/unique_fd.h>
+
 namespace android {
 
 // avoid optimizations
 void zeroMemory(uint8_t* data, size_t size);
 
+android::base::Result<void> setNonBlocking(android::base::borrowed_fd fd);
+
 }   // namespace android
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 4abf3b9..bf3e7e0 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -32,6 +32,7 @@
 
 namespace android {
 
+class FdTrigger;
 class RpcSocketAddress;
 
 /**
@@ -59,12 +60,12 @@
      *     process B makes binder b and sends it to A
      *     A uses this 'back session' to send things back to B
      */
-    [[nodiscard]] bool setupUnixDomainServer(const char* path);
+    [[nodiscard]] status_t setupUnixDomainServer(const char* path);
 
     /**
      * Creates an RPC server at the current port.
      */
-    [[nodiscard]] bool setupVsockServer(unsigned int port);
+    [[nodiscard]] status_t setupVsockServer(unsigned int port);
 
     /**
      * Creates an RPC server at the current port using IPv4.
@@ -80,8 +81,8 @@
      * "0.0.0.0" allows for connections on any IP address that the device may
      * have
      */
-    [[nodiscard]] bool setupInetServer(const char* address, unsigned int port,
-                                       unsigned int* assignedPort);
+    [[nodiscard]] status_t setupInetServer(const char* address, unsigned int port,
+                                           unsigned int* assignedPort);
 
     /**
      * If setup*Server has been successful, return true. Otherwise return false.
@@ -97,7 +98,7 @@
      * Set up server using an external FD previously set up by releaseServer().
      * Return false if there's already a server.
      */
-    bool setupExternalServer(base::unique_fd serverFd);
+    [[nodiscard]] status_t setupExternalServer(base::unique_fd serverFd);
 
     void iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
 
@@ -175,7 +176,7 @@
     void onSessionIncomingThreadEnded() override;
 
     static void establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd);
-    bool setupSocketServer(const RpcSocketAddress& address);
+    status_t setupSocketServer(const RpcSocketAddress& address);
 
     const std::unique_ptr<RpcTransportCtxFactory> mRpcTransportCtxFactory;
     bool mAgreedExperimental = false;
@@ -190,7 +191,7 @@
     sp<IBinder> mRootObject;
     wp<IBinder> mRootObjectWeak;
     std::map<RpcAddress, sp<RpcSession>> mSessions;
-    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+    std::unique_ptr<FdTrigger> mShutdownTrigger;
     std::condition_variable mShutdownCv;
     std::unique_ptr<RpcTransportCtx> mCtx;
 };
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index e3d6bba..761c50d 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -39,6 +39,7 @@
 class RpcSocketAddress;
 class RpcState;
 class RpcTransport;
+class FdTrigger;
 
 constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_NEXT = 0;
 constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL = 0xF0000000;
@@ -78,17 +79,30 @@
      * This should be called once per thread, matching 'join' in the remote
      * process.
      */
-    [[nodiscard]] bool setupUnixDomainClient(const char* path);
+    [[nodiscard]] status_t setupUnixDomainClient(const char* path);
 
     /**
      * Connects to an RPC server at the CVD & port.
      */
-    [[nodiscard]] bool setupVsockClient(unsigned int cvd, unsigned int port);
+    [[nodiscard]] status_t setupVsockClient(unsigned int cvd, unsigned int port);
 
     /**
      * Connects to an RPC server at the given address and port.
      */
-    [[nodiscard]] bool setupInetClient(const char* addr, unsigned int port);
+    [[nodiscard]] status_t setupInetClient(const char* addr, unsigned int port);
+
+    /**
+     * Starts talking to an RPC server which has already been connected to. This
+     * is expected to be used when another process has permission to connect to
+     * a binder RPC service, but this process only has permission to talk to
+     * that service.
+     *
+     * For convenience, if 'fd' is -1, 'request' will be called.
+     *
+     * For future compatibility, 'request' should not reference any stack data.
+     */
+    [[nodiscard]] status_t setupPreconnectedClient(base::unique_fd fd,
+                                                   std::function<base::unique_fd()>&& request);
 
     /**
      * For debugging!
@@ -97,7 +111,7 @@
      * response will never be satisfied. All data sent here will be
      * unceremoniously cast down the bottomless pit, /dev/null.
      */
-    [[nodiscard]] bool addNullDebuggingClient();
+    [[nodiscard]] status_t addNullDebuggingClient();
 
     /**
      * Query the other side of the session for the root object hosted by that
@@ -148,50 +162,6 @@
     friend RpcState;
     explicit RpcSession(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory);
 
-    /** This is not a pipe. */
-    struct FdTrigger {
-        /** Returns nullptr for error case */
-        static std::unique_ptr<FdTrigger> make();
-
-        /**
-         * Close the write end of the pipe so that the read end receives POLLHUP.
-         * Not threadsafe.
-         */
-        void trigger();
-
-        /**
-         * Whether this has been triggered.
-         */
-        bool isTriggered();
-
-        /**
-         * Poll for a read event.
-         *
-         * event - for pollfd
-         *
-         * Return:
-         *   true - time to read!
-         *   false - trigger happened
-         */
-        status_t triggerablePoll(base::borrowed_fd fd, int16_t event);
-
-        /**
-         * Read (or write), but allow to be interrupted by this trigger.
-         *
-         * Return:
-         *   true - succeeded in completely processing 'size'
-         *   false - interrupted (failure or trigger)
-         */
-        status_t interruptableReadFully(RpcTransport* rpcTransport, void* data, size_t size);
-        status_t interruptableWriteFully(RpcTransport* rpcTransport, const void* data, size_t size);
-
-    private:
-        status_t triggerablePoll(RpcTransport* rpcTransport, int16_t event);
-
-        base::unique_fd mWrite;
-        base::unique_fd mRead;
-    };
-
     class EventListener : public virtual RefBase {
     public:
         virtual void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0;
@@ -240,11 +210,17 @@
     // join on thread passed to preJoinThreadOwnership
     static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result);
 
-    [[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address);
-    [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address,
-                                                const RpcAddress& sessionId, bool server);
-    [[nodiscard]] bool addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport);
-    [[nodiscard]] bool addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init);
+    [[nodiscard]] status_t setupClient(
+            const std::function<status_t(const RpcAddress& sessionId, bool incoming)>&
+                    connectAndInit);
+    [[nodiscard]] status_t setupSocketClient(const RpcSocketAddress& address);
+    [[nodiscard]] status_t setupOneSocketConnection(const RpcSocketAddress& address,
+                                                    const RpcAddress& sessionId, bool incoming);
+    [[nodiscard]] status_t initAndAddConnection(base::unique_fd fd, const RpcAddress& sessionId,
+                                                bool incoming);
+    [[nodiscard]] status_t addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport);
+    [[nodiscard]] status_t addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport,
+                                                 bool init);
     [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
                                     const wp<RpcSession::EventListener>& eventListener,
                                     const RpcAddress& sessionId);
@@ -252,6 +228,8 @@
             std::unique_ptr<RpcTransport> rpcTransport);
     [[nodiscard]] bool removeIncomingConnection(const sp<RpcConnection>& connection);
 
+    status_t initShutdownTrigger();
+
     enum class ConnectionUse {
         CLIENT,
         CLIENT_ASYNC,
diff --git a/libs/binder/include/binder/RpcTransport.h b/libs/binder/include/binder/RpcTransport.h
index 1778cae..1b69519 100644
--- a/libs/binder/include/binder/RpcTransport.h
+++ b/libs/binder/include/binder/RpcTransport.h
@@ -23,42 +23,30 @@
 
 #include <android-base/result.h>
 #include <android-base/unique_fd.h>
+#include <utils/Errors.h>
 
 namespace android {
 
+class FdTrigger;
+
 // Represents a socket connection.
 class RpcTransport {
 public:
     virtual ~RpcTransport() = default;
 
-    // replacement of ::send(). errno may not be set if TLS is enabled.
-    virtual android::base::Result<ssize_t> send(const void *buf, int size) = 0;
+    // replacement of ::recv(MSG_PEEK). Error code may not be set if TLS is enabled.
+    virtual android::base::Result<size_t> peek(void *buf, size_t size) = 0;
 
-    // replacement of ::recv(). errno may not be set if TLS is enabled.
-    virtual android::base::Result<ssize_t> recv(void *buf, int size) = 0;
-
-    // replacement of ::recv(MSG_PEEK). errno may not be set if TLS is enabled.
-    //
-    // Implementation details:
-    // - For TLS, this may invoke syscalls and read data from the transport
-    // into an internal buffer in userspace. After that, pending() == true.
-    // - For raw sockets, this calls ::recv(MSG_PEEK), which leaves the data in the kernel buffer;
-    // pending() is always false.
-    virtual android::base::Result<ssize_t> peek(void *buf, int size) = 0;
-
-    // Returns true if there are data pending in a userspace buffer that RpcTransport holds.
-    //
-    // Implementation details:
-    // - For TLS, this does not invoke any syscalls or read any data from the
-    // transport. This only returns whether there are data pending in the internal buffer in
-    // userspace.
-    // - For raw sockets, this always returns false.
-    virtual bool pending() = 0;
-
-    // Returns fd for polling.
-    //
-    // Do not directly read / write on this raw fd!
-    [[nodiscard]] virtual android::base::borrowed_fd pollSocket() const = 0;
+    /**
+     * Read (or write), but allow to be interrupted by a trigger.
+     *
+     * Return:
+     *   OK - succeeded in completely processing 'size'
+     *   error - interrupted (failure or trigger)
+     */
+    virtual status_t interruptableWriteFully(FdTrigger *fdTrigger, const void *buf,
+                                             size_t size) = 0;
+    virtual status_t interruptableReadFully(FdTrigger *fdTrigger, void *buf, size_t size) = 0;
 
 protected:
     RpcTransport() = default;
@@ -68,8 +56,13 @@
 class RpcTransportCtx {
 public:
     virtual ~RpcTransportCtx() = default;
+
+    // Create a new RpcTransport object.
+    //
+    // Implemenion details: for TLS, this function may incur I/O. |fdTrigger| may be used
+    // to interrupt I/O. This function blocks until handshake is finished.
     [[nodiscard]] virtual std::unique_ptr<RpcTransport> newTransport(
-            android::base::unique_fd fd) const = 0;
+            android::base::unique_fd fd, FdTrigger *fdTrigger) const = 0;
 
 protected:
     RpcTransportCtx() = default;
diff --git a/libs/binder/libbinder_rpc_unstable.cpp b/libs/binder/libbinder_rpc_unstable.cpp
index 68ec669..bcb13ae 100644
--- a/libs/binder/libbinder_rpc_unstable.cpp
+++ b/libs/binder/libbinder_rpc_unstable.cpp
@@ -19,16 +19,20 @@
 #include <binder/RpcServer.h>
 #include <binder/RpcSession.h>
 
+using android::OK;
 using android::RpcServer;
 using android::RpcSession;
+using android::status_t;
+using android::statusToString;
 
 extern "C" {
 
 bool RunRpcServer(AIBinder* service, unsigned int port) {
     auto server = RpcServer::make();
     server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
-    if (!server->setupVsockServer(port)) {
-        LOG(ERROR) << "Failed to set up vsock server with port " << port;
+    if (status_t status = server->setupVsockServer(port); status != OK) {
+        LOG(ERROR) << "Failed to set up vsock server with port " << port
+                   << " error: " << statusToString(status).c_str();
         return false;
     }
     server->setRootObject(AIBinder_toPlatformBinder(service));
@@ -41,8 +45,9 @@
 
 AIBinder* RpcClient(unsigned int cid, unsigned int port) {
     auto session = RpcSession::make();
-    if (!session->setupVsockClient(cid, port)) {
-        LOG(ERROR) << "Failed to set up vsock client with CID " << cid << " and port " << port;
+    if (status_t status = session->setupVsockClient(cid, port); status != OK) {
+        LOG(ERROR) << "Failed to set up vsock client with CID " << cid << " and port " << port
+                   << " error: " << statusToString(status).c_str();
         return nullptr;
     }
     return AIBinder_fromPlatformBinder(session->getRootObject());
diff --git a/libs/binder/ndk/include_cpp/android/binder_interface_utils.h b/libs/binder/ndk/include_cpp/android/binder_interface_utils.h
index 70a3906..5de64f8 100644
--- a/libs/binder/ndk/include_cpp/android/binder_interface_utils.h
+++ b/libs/binder/ndk/include_cpp/android/binder_interface_utils.h
@@ -43,23 +43,37 @@
 namespace ndk {
 
 /**
+ * analog using std::shared_ptr for internally held refcount
+ *
  * ref must be called at least one time during the lifetime of this object. The recommended way to
  * construct this object is with SharedRefBase::make.
- *
- * Note - this class used to not inherit from enable_shared_from_this, so
- * std::make_shared works, but it won't be portable against old copies of this
- * class.
  */
-class SharedRefBase : public std::enable_shared_from_this<SharedRefBase> {
+class SharedRefBase {
    public:
     SharedRefBase() {}
-    virtual ~SharedRefBase() {}
+    virtual ~SharedRefBase() {
+        std::call_once(mFlagThis, [&]() {
+            __assert(__FILE__, __LINE__, "SharedRefBase: no ref created during lifetime");
+        });
+
+        if (ref() != nullptr) {
+            __assert(__FILE__, __LINE__,
+                     "SharedRefBase: destructed but still able to lock weak_ptr. Is this object "
+                     "double-owned?");
+        }
+    }
 
     /**
      * A shared_ptr must be held to this object when this is called. This must be called once during
      * the lifetime of this object.
      */
-    std::shared_ptr<SharedRefBase> ref() { return shared_from_this(); }
+    std::shared_ptr<SharedRefBase> ref() {
+        std::shared_ptr<SharedRefBase> thiz = mThis.lock();
+
+        std::call_once(mFlagThis, [&]() { mThis = thiz = std::shared_ptr<SharedRefBase>(this); });
+
+        return thiz;
+    }
 
     /**
      * Convenience method for a ref (see above) which automatically casts to the desired child type.
@@ -78,13 +92,8 @@
 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
         T* t = new T(std::forward<Args>(args)...);
 #pragma clang diagnostic pop
-
-        // T may have a private (though necessarily virtual!) destructor, so we
-        // can't use refbase. For getting the first ref, we don't use ref()
-        // since the internal shared_ptr isn't guaranteed to exist yet.
-        SharedRefBase* base = static_cast<SharedRefBase*>(t);
-        std::shared_ptr<SharedRefBase> strongBase(base);
-        return std::static_pointer_cast<T>(strongBase);
+        // warning: Potential leak of memory pointed to by 't' [clang-analyzer-unix.Malloc]
+        return t->template ref<T>();  // NOLINT(clang-analyzer-unix.Malloc)
     }
 
     static void operator delete(void* p) { std::free(p); }
@@ -97,9 +106,13 @@
 #if !defined(__ANDROID_API__) || __ANDROID_API__ >= 30 || defined(__ANDROID_APEX__)
    private:
 #else
-    [[deprecated("Prefer SharedRefBase::make<T>(...) or std::make_shared<T>() if possible.")]]
+    [[deprecated("Prefer SharedRefBase::make<T>(...) if possible.")]]
 #endif
     static void* operator new(size_t s) { return std::malloc(s); }
+
+   private:
+    std::once_flag mFlagThis;
+    std::weak_ptr<SharedRefBase> mThis;
 };
 
 /**
diff --git a/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp b/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp
index 94cc086..b5c06e9 100644
--- a/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp
+++ b/libs/binder/ndk/tests/libbinder_ndk_unit_test.cpp
@@ -54,15 +54,6 @@
 constexpr uint64_t kContextTestValue = 0xb4e42fb4d9a1d715;
 
 class MyBinderNdkUnitTest : public aidl::BnBinderNdkUnitTest {
-   public:
-    MyBinderNdkUnitTest() = default;
-    MyBinderNdkUnitTest(bool* deleted) : deleted(deleted) {}
-    ~MyBinderNdkUnitTest() {
-        if (deleted) {
-            *deleted = true;
-        }
-    }
-
     ndk::ScopedAStatus repeatInt(int32_t in, int32_t* out) {
         *out = in;
         return ndk::ScopedAStatus::ok();
@@ -131,7 +122,6 @@
     }
 
     uint64_t contextTestValue = kContextTestValue;
-    bool* deleted = nullptr;
 };
 
 int generatedService() {
@@ -234,25 +224,15 @@
     return true;
 }
 
-TEST(NdkBinder, MakeShared) {
-    const char* kInstance = "make_shared_test_instance";
-    bool deleted = false;
+TEST(NdkBinder, DetectDoubleOwn) {
+    auto badService = ndk::SharedRefBase::make<MyBinderNdkUnitTest>();
+    EXPECT_DEATH(std::shared_ptr<MyBinderNdkUnitTest>(badService.get()),
+                 "Is this object double-owned?");
+}
 
-    {
-        auto service = std::make_shared<MyBinderNdkUnitTest>(&deleted);
-        auto binder = service->asBinder();
-        ASSERT_EQ(EX_NONE, AServiceManager_addService(binder.get(), kInstance));
-        auto binder2 = ndk::SpAIBinder(AServiceManager_checkService(kInstance));
-        ASSERT_EQ(binder.get(), binder2.get());
-
-        // overwrite service
-        ASSERT_EQ(EX_NONE,
-                  AServiceManager_addService(
-                          std::make_shared<MyBinderNdkUnitTest>(&deleted)->asBinder().get(),
-                          kInstance));
-    }
-
-    EXPECT_TRUE(deleted);
+TEST(NdkBinder, DetectNoSharedRefBaseCreated) {
+    EXPECT_DEATH(std::make_shared<MyBinderNdkUnitTest>(),
+                 "SharedRefBase: no ref created during lifetime");
 }
 
 TEST(NdkBinder, GetServiceThatDoesntExist) {
diff --git a/libs/binder/rust/src/parcel/file_descriptor.rs b/libs/binder/rust/src/parcel/file_descriptor.rs
index 20e9178..179b7c8 100644
--- a/libs/binder/rust/src/parcel/file_descriptor.rs
+++ b/libs/binder/rust/src/parcel/file_descriptor.rs
@@ -23,7 +23,7 @@
 use crate::sys;
 
 use std::fs::File;
-use std::os::unix::io::{AsRawFd, FromRawFd};
+use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
 
 /// Rust version of the Java class android.os.ParcelFileDescriptor
 #[derive(Debug)]
@@ -48,6 +48,12 @@
     }
 }
 
+impl AsRawFd for ParcelFileDescriptor {
+    fn as_raw_fd(&self) -> RawFd {
+        self.0.as_raw_fd()
+    }
+}
+
 impl Serialize for ParcelFileDescriptor {
     fn serialize(&self, parcel: &mut Parcel) -> Result<()> {
         let fd = self.0.as_raw_fd();
diff --git a/libs/binder/servicedispatcher.cpp b/libs/binder/servicedispatcher.cpp
index a6e3f7d..9811cdf 100644
--- a/libs/binder/servicedispatcher.cpp
+++ b/libs/binder/servicedispatcher.cpp
@@ -23,9 +23,12 @@
 #include <android-base/logging.h>
 #include <android-base/properties.h>
 #include <android-base/stringprintf.h>
+#include <android/debug/BnAdbCallback.h>
+#include <android/debug/IAdbManager.h>
 #include <android/os/BnServiceManager.h>
 #include <android/os/IServiceManager.h>
 #include <binder/IServiceManager.h>
+#include <binder/ProcessState.h>
 #include <binder/RpcServer.h>
 
 using android::BBinder;
@@ -33,6 +36,7 @@
 using android::OK;
 using android::RpcServer;
 using android::sp;
+using android::status_t;
 using android::statusToString;
 using android::String16;
 using android::base::Basename;
@@ -49,6 +53,7 @@
 
 const char* kLocalInetAddress = "127.0.0.1";
 using ServiceRetriever = decltype(&android::IServiceManager::checkService);
+using android::debug::IAdbManager;
 
 int Usage(const char* program) {
     auto basename = Basename(program);
@@ -87,8 +92,8 @@
     }
     rpcServer->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
     unsigned int port;
-    if (!rpcServer->setupInetServer(kLocalInetAddress, 0, &port)) {
-        LOG(ERROR) << "setupInetServer failed";
+    if (status_t status = rpcServer->setupInetServer(kLocalInetAddress, 0, &port); status != OK) {
+        LOG(ERROR) << "setupInetServer failed: " << statusToString(status);
         return EX_SOFTWARE;
     }
     auto socket = rpcServer->releaseServer();
@@ -200,8 +205,8 @@
     rpcServer->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
     rpcServer->setRootObject(service);
     unsigned int port;
-    if (!rpcServer->setupInetServer(kLocalInetAddress, 0, &port)) {
-        LOG(ERROR) << "Unable to set up inet server";
+    if (status_t status = rpcServer->setupInetServer(kLocalInetAddress, 0, &port); status != OK) {
+        LOG(ERROR) << "Unable to set up inet server: " << statusToString(status);
         return EX_SOFTWARE;
     }
     LOG(INFO) << "Finish wrapping servicemanager with RPC on port " << port;
@@ -212,6 +217,25 @@
     __builtin_unreachable();
 }
 
+class AdbCallback : public android::debug::BnAdbCallback {
+public:
+    android::binder::Status onDebuggingChanged(bool enabled,
+                                               android::debug::AdbTransportType) override {
+        if (!enabled) {
+            LOG(ERROR) << "ADB debugging disabled, exiting.";
+            exit(EX_SOFTWARE);
+        }
+        return android::binder::Status::ok();
+    }
+};
+
+void exitOnAdbDebuggingDisabled() {
+    auto adb = android::waitForService<IAdbManager>(String16("adb"));
+    CHECK(adb != nullptr) << "Unable to retrieve service adb";
+    auto status = adb->registerCallback(sp<AdbCallback>::make());
+    CHECK(status.isOk()) << "Unable to call IAdbManager::registerCallback: " << status;
+}
+
 // Log to logd. For warning and more severe messages, also log to stderr.
 class ServiceDispatcherLogger {
 public:
@@ -252,6 +276,10 @@
         }
     }
 
+    android::ProcessState::self()->setThreadPoolMaxThreadCount(1);
+    android::ProcessState::self()->startThreadPool();
+    exitOnAdbDebuggingDisabled();
+
     if (optind + 1 != argc) return Usage(argv[0]);
     auto name = argv[optind];
 
diff --git a/libs/binder/tests/IBinderRpcBenchmark.aidl b/libs/binder/tests/IBinderRpcBenchmark.aidl
index 1457422..2baf680 100644
--- a/libs/binder/tests/IBinderRpcBenchmark.aidl
+++ b/libs/binder/tests/IBinderRpcBenchmark.aidl
@@ -17,4 +17,5 @@
 interface IBinderRpcBenchmark {
     @utf8InCpp String repeatString(@utf8InCpp String str);
     IBinder repeatBinder(IBinder binder);
+    byte[] repeatBytes(in byte[] bytes);
 }
diff --git a/libs/binder/tests/binderHostDeviceTest.cpp b/libs/binder/tests/binderHostDeviceTest.cpp
index 5dd9212..3f72b8f 100644
--- a/libs/binder/tests/binderHostDeviceTest.cpp
+++ b/libs/binder/tests/binderHostDeviceTest.cpp
@@ -101,8 +101,9 @@
 
     [[nodiscard]] static sp<IBinder> get(unsigned int hostPort) {
         auto rpcSession = RpcSession::make();
-        if (!rpcSession->setupInetClient("127.0.0.1", hostPort)) {
-            ADD_FAILURE() << "Failed to setupInetClient on " << hostPort;
+        if (status_t status = rpcSession->setupInetClient("127.0.0.1", hostPort); status != OK) {
+            ADD_FAILURE() << "Failed to setupInetClient on " << hostPort << ": "
+                          << statusToString(status);
             return nullptr;
         }
         return rpcSession->getRootObject();
diff --git a/libs/binder/tests/binderLibTest.cpp b/libs/binder/tests/binderLibTest.cpp
index 65db7f6..eea7d8c 100644
--- a/libs/binder/tests/binderLibTest.cpp
+++ b/libs/binder/tests/binderLibTest.cpp
@@ -1192,8 +1192,8 @@
         if (rpcServer == nullptr) return {};
         rpcServer->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
         unsigned int port;
-        if (!rpcServer->setupInetServer("127.0.0.1", 0, &port)) {
-            ADD_FAILURE() << "setupInetServer failed";
+        if (status_t status = rpcServer->setupInetServer("127.0.0.1", 0, &port); status != OK) {
+            ADD_FAILURE() << "setupInetServer failed" << statusToString(status);
             return {};
         }
         return {rpcServer->releaseServer(), port};
diff --git a/libs/binder/tests/binderRpcBenchmark.cpp b/libs/binder/tests/binderRpcBenchmark.cpp
index 5f4a7b5..0c452ff 100644
--- a/libs/binder/tests/binderRpcBenchmark.cpp
+++ b/libs/binder/tests/binderRpcBenchmark.cpp
@@ -42,6 +42,8 @@
 using android::RpcServer;
 using android::RpcSession;
 using android::sp;
+using android::status_t;
+using android::statusToString;
 using android::String16;
 using android::binder::Status;
 
@@ -50,8 +52,12 @@
         *out = str;
         return Status::ok();
     }
-    Status repeatBinder(const sp<IBinder>& str, sp<IBinder>* out) override {
-        *out = str;
+    Status repeatBinder(const sp<IBinder>& binder, sp<IBinder>* out) override {
+        *out = binder;
+        return Status::ok();
+    }
+    Status repeatBytes(const std::vector<uint8_t>& bytes, std::vector<uint8_t>* out) override {
+        *out = bytes;
         return Status::ok();
     }
 };
@@ -61,12 +67,11 @@
     RPC,
 };
 
-static void EachTransport(benchmark::internal::Benchmark* b) {
+static const std::initializer_list<int64_t> kTransportList = {
 #ifdef __BIONIC__
-    b->Args({Transport::KERNEL});
+        Transport::KERNEL,
 #endif
-    b->Args({Transport::RPC});
-}
+        Transport::RPC};
 
 static sp<RpcSession> gSession = RpcSession::make();
 #ifdef __BIONIC__
@@ -96,9 +101,9 @@
         CHECK_EQ(OK, binder->pingBinder());
     }
 }
-BENCHMARK(BM_pingTransaction)->Apply(EachTransport);
+BENCHMARK(BM_pingTransaction)->ArgsProduct({kTransportList});
 
-void BM_repeatString(benchmark::State& state) {
+void BM_repeatTwoPageString(benchmark::State& state) {
     sp<IBinder> binder = getBinderForOptions(state);
 
     sp<IBinderRpcBenchmark> iface = interface_cast<IBinderRpcBenchmark>(binder);
@@ -125,7 +130,27 @@
         CHECK(ret.isOk()) << ret;
     }
 }
-BENCHMARK(BM_repeatString)->Apply(EachTransport);
+BENCHMARK(BM_repeatTwoPageString)->ArgsProduct({kTransportList});
+
+void BM_throughputForTransportAndBytes(benchmark::State& state) {
+    sp<IBinder> binder = getBinderForOptions(state);
+    sp<IBinderRpcBenchmark> iface = interface_cast<IBinderRpcBenchmark>(binder);
+    CHECK(iface != nullptr);
+
+    std::vector<uint8_t> bytes = std::vector<uint8_t>(state.range(1));
+    for (size_t i = 0; i < bytes.size(); i++) {
+        bytes[i] = i % 256;
+    }
+
+    while (state.KeepRunning()) {
+        std::vector<uint8_t> out;
+        Status ret = iface->repeatBytes(bytes, &out);
+        CHECK(ret.isOk()) << ret;
+    }
+}
+BENCHMARK(BM_throughputForTransportAndBytes)
+        ->ArgsProduct({kTransportList,
+                       {64, 1024, 2048, 4096, 8182, 16364, 32728, 65535, 65536, 65537}});
 
 void BM_repeatBinder(benchmark::State& state) {
     sp<IBinder> binder = gSession->getRootObject();
@@ -142,7 +167,7 @@
         CHECK(ret.isOk()) << ret;
     }
 }
-BENCHMARK(BM_repeatBinder)->Apply(EachTransport);
+BENCHMARK(BM_repeatBinder)->ArgsProduct({kTransportList});
 
 int main(int argc, char** argv) {
     ::benchmark::Initialize(&argc, argv);
@@ -152,15 +177,15 @@
     (void)unlink(addr.c_str());
 
     std::cerr << "Tests suffixes:" << std::endl;
-    std::cerr << "\t\\" << Transport::KERNEL << " is KERNEL" << std::endl;
-    std::cerr << "\t\\" << Transport::RPC << " is RPC" << std::endl;
+    std::cerr << "\t.../" << Transport::KERNEL << " is KERNEL" << std::endl;
+    std::cerr << "\t.../" << Transport::RPC << " is RPC" << std::endl;
 
     if (0 == fork()) {
         prctl(PR_SET_PDEATHSIG, SIGHUP); // racey, okay
         sp<RpcServer> server = RpcServer::make();
         server->setRootObject(sp<MyBinderRpcBenchmark>::make());
         server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
-        CHECK(server->setupUnixDomainServer(addr.c_str()));
+        CHECK_EQ(OK, server->setupUnixDomainServer(addr.c_str()));
         server->join();
         exit(1);
     }
@@ -182,11 +207,13 @@
     CHECK_NE(nullptr, gKernelBinder.get());
 #endif
 
+    status_t status;
     for (size_t tries = 0; tries < 5; tries++) {
         usleep(10000);
-        if (gSession->setupUnixDomainClient(addr.c_str())) goto success;
+        status = gSession->setupUnixDomainClient(addr.c_str());
+        if (status == OK) goto success;
     }
-    LOG(FATAL) << "Could not connect.";
+    LOG(FATAL) << "Could not connect: " << statusToString(status).c_str();
 success:
 
     ::benchmark::RunSpecifiedBenchmarks();
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 98839f7..15ccae9 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -42,6 +42,7 @@
 #include <sys/prctl.h>
 #include <unistd.h>
 
+#include "../RpcSocketAddress.h" // for testing preconnected clients
 #include "../RpcState.h"   // for debugging
 #include "../vm_sockets.h" // for VMADDR_*
 
@@ -88,7 +89,7 @@
     auto server = RpcServer::make(newFactory(GetParam()));
     server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
     ASSERT_FALSE(server->hasServer());
-    ASSERT_TRUE(server->setupExternalServer(std::move(sink)));
+    ASSERT_EQ(OK, server->setupExternalServer(std::move(sink)));
     ASSERT_TRUE(server->hasServer());
     base::unique_fd retrieved = server->releaseServer();
     ASSERT_FALSE(server->hasServer());
@@ -409,12 +410,15 @@
 };
 
 enum class SocketType {
+    PRECONNECTED,
     UNIX,
     VSOCK,
     INET,
 };
 static inline std::string PrintToString(SocketType socketType) {
     switch (socketType) {
+        case SocketType::PRECONNECTED:
+            return "preconnected_uds";
         case SocketType::UNIX:
             return "unix_domain_socket";
         case SocketType::VSOCK:
@@ -427,6 +431,20 @@
     }
 }
 
+static base::unique_fd connectToUds(const char* addrStr) {
+    UnixSocketAddress addr(addrStr);
+    base::unique_fd serverFd(
+            TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
+    int savedErrno = errno;
+    CHECK(serverFd.ok()) << "Could not create socket " << addrStr << ": " << strerror(savedErrno);
+
+    if (0 != TEMP_FAILURE_RETRY(connect(serverFd.get(), addr.addr(), addr.addrSize()))) {
+        int savedErrno = errno;
+        LOG(FATAL) << "Could not connect to socket " << addrStr << ": " << strerror(savedErrno);
+    }
+    return serverFd;
+}
+
 class BinderRpc : public ::testing::TestWithParam<std::tuple<SocketType, RpcSecurity>> {
 public:
     struct Options {
@@ -463,14 +481,16 @@
                     unsigned int outPort = 0;
 
                     switch (socketType) {
+                        case SocketType::PRECONNECTED:
+                            [[fallthrough]];
                         case SocketType::UNIX:
-                            CHECK(server->setupUnixDomainServer(addr.c_str())) << addr;
+                            CHECK_EQ(OK, server->setupUnixDomainServer(addr.c_str())) << addr;
                             break;
                         case SocketType::VSOCK:
-                            CHECK(server->setupVsockServer(vsockPort));
+                            CHECK_EQ(OK, server->setupVsockServer(vsockPort));
                             break;
                         case SocketType::INET: {
-                            CHECK(server->setupInetServer(kLocalInetAddress, 0, &outPort));
+                            CHECK_EQ(OK, server->setupInetServer(kLocalInetAddress, 0, &outPort));
                             CHECK_NE(0, outPort);
                             break;
                         }
@@ -496,24 +516,35 @@
             CHECK_NE(0, outPort);
         }
 
+        status_t status;
+
         for (size_t i = 0; i < options.numSessions; i++) {
             sp<RpcSession> session = RpcSession::make(newFactory(rpcSecurity));
             session->setMaxThreads(options.numIncomingConnections);
 
             switch (socketType) {
+                case SocketType::PRECONNECTED:
+                    status = session->setupPreconnectedClient({}, [=]() {
+                        return connectToUds(addr.c_str());
+                    });
+                    if (status == OK) goto success;
+                    break;
                 case SocketType::UNIX:
-                    if (session->setupUnixDomainClient(addr.c_str())) goto success;
+                    status = session->setupUnixDomainClient(addr.c_str());
+                    if (status == OK) goto success;
                     break;
                 case SocketType::VSOCK:
-                    if (session->setupVsockClient(VMADDR_CID_LOCAL, vsockPort)) goto success;
+                    status = session->setupVsockClient(VMADDR_CID_LOCAL, vsockPort);
+                    if (status == OK) goto success;
                     break;
                 case SocketType::INET:
-                    if (session->setupInetClient("127.0.0.1", outPort)) goto success;
+                    status = session->setupInetClient("127.0.0.1", outPort);
+                    if (status == OK) goto success;
                     break;
                 default:
                     LOG_ALWAYS_FATAL("Unknown socket type");
             }
-            LOG_ALWAYS_FATAL("Could not connect");
+            LOG_ALWAYS_FATAL("Could not connect %s", statusToString(status).c_str());
         success:
             ret.sessions.push_back({session, session->getRootObject()});
         }
@@ -1165,18 +1196,23 @@
     unsigned int vsockPort = allocateVsockPort();
     sp<RpcServer> server = RpcServer::make(RpcTransportCtxFactoryRaw::make());
     server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
-    CHECK(server->setupVsockServer(vsockPort));
+    if (status_t status = server->setupVsockServer(vsockPort); status != OK) {
+        if (status == -EAFNOSUPPORT) {
+            return false;
+        }
+        LOG_ALWAYS_FATAL("Could not setup vsock server: %s", statusToString(status).c_str());
+    }
     server->start();
 
     sp<RpcSession> session = RpcSession::make(RpcTransportCtxFactoryRaw::make());
-    bool okay = session->setupVsockClient(VMADDR_CID_LOCAL, vsockPort);
+    status_t status = session->setupVsockClient(VMADDR_CID_LOCAL, vsockPort);
     while (!server->shutdown()) usleep(10000);
-    ALOGE("Detected vsock loopback supported: %d", okay);
-    return okay;
+    ALOGE("Detected vsock loopback supported: %s", statusToString(status).c_str());
+    return status == OK;
 }
 
 static std::vector<SocketType> testSocketTypes() {
-    std::vector<SocketType> ret = {SocketType::UNIX, SocketType::INET};
+    std::vector<SocketType> ret = {SocketType::PRECONNECTED, SocketType::UNIX, SocketType::INET};
 
     static bool hasVsockLoopback = testSupportVsockLoopback();
 
@@ -1248,7 +1284,7 @@
     unlink(addr.c_str());
     auto server = RpcServer::make(newFactory(GetParam()));
     server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
-    ASSERT_TRUE(server->setupUnixDomainServer(addr.c_str()));
+    ASSERT_EQ(OK, server->setupUnixDomainServer(addr.c_str()));
     auto joinEnds = std::make_shared<OneOffSignal>();
 
     // If things are broken and the thread never stops, don't block other tests. Because the thread
@@ -1289,14 +1325,14 @@
     auto rpcServer = RpcServer::make();
     rpcServer->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
     unsigned int port;
-    ASSERT_TRUE(rpcServer->setupInetServer(kLocalInetAddress, 0, &port));
+    ASSERT_EQ(OK, rpcServer->setupInetServer(kLocalInetAddress, 0, &port));
     auto socket = rpcServer->releaseServer();
 
     auto keepAlive = sp<BBinder>::make();
     ASSERT_EQ(OK, binder->setRpcClientDebug(std::move(socket), keepAlive));
 
     auto rpcSession = RpcSession::make();
-    ASSERT_TRUE(rpcSession->setupInetClient("127.0.0.1", port));
+    ASSERT_EQ(OK, rpcSession->setupInetClient("127.0.0.1", port));
     auto rpcBinder = rpcSession->getRootObject();
     ASSERT_NE(nullptr, rpcBinder);
 
diff --git a/libs/binder/tests/parcel_fuzzer/random_parcel.cpp b/libs/binder/tests/parcel_fuzzer/random_parcel.cpp
index a2472f8..8bf04cc 100644
--- a/libs/binder/tests/parcel_fuzzer/random_parcel.cpp
+++ b/libs/binder/tests/parcel_fuzzer/random_parcel.cpp
@@ -37,7 +37,7 @@
 void fillRandomParcel(Parcel* p, FuzzedDataProvider&& provider) {
     if (provider.ConsumeBool()) {
         auto session = RpcSession::make(RpcTransportCtxFactoryRaw::make());
-        CHECK(session->addNullDebuggingClient());
+        CHECK_EQ(OK, session->addNullDebuggingClient());
         p->markForRpc(session);
         fillRandomParcelData(p, std::move(provider));
         return;
diff --git a/libs/binder/tests/rpc_fuzzer/main.cpp b/libs/binder/tests/rpc_fuzzer/main.cpp
index 9fc496f..230f5c7 100644
--- a/libs/binder/tests/rpc_fuzzer/main.cpp
+++ b/libs/binder/tests/rpc_fuzzer/main.cpp
@@ -60,7 +60,7 @@
     sp<RpcServer> server = RpcServer::make();
     server->setRootObject(sp<SomeBinder>::make());
     server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
-    CHECK(server->setupUnixDomainServer(kSock.c_str()));
+    CHECK_EQ(OK, server->setupUnixDomainServer(kSock.c_str()));
 
     std::thread serverThread([=] { (void)server->join(); });