diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 2d2eed2..60be406 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -307,13 +307,15 @@
         }
 
         detachGuard.Disable();
-        session->preJoin(std::move(thisThread));
+        session->preJoinThreadOwnership(std::move(thisThread));
     }
 
+    auto setupResult = session->preJoinSetup(std::move(clientFd));
+
     // avoid strong cycle
     server = nullptr;
 
-    RpcSession::join(std::move(session), std::move(clientFd));
+    RpcSession::join(std::move(session), std::move(setupResult));
 }
 
 bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index de9aa22..2a230d2 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -236,7 +236,7 @@
     }
 }
 
-void RpcSession::preJoin(std::thread thread) {
+void RpcSession::preJoinThreadOwnership(std::thread thread) {
     LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");
 
     {
@@ -245,20 +245,36 @@
     }
 }
 
-void RpcSession::join(sp<RpcSession>&& session, unique_fd client) {
+RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
     // must be registered to allow arbitrary client code executing commands to
     // be able to do nested calls (we can't only read from it)
-    sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client));
+    sp<RpcConnection> connection = assignServerToThisThread(std::move(fd));
 
-    while (true) {
-        status_t error = session->state()->getAndExecuteCommand(connection->fd, session,
-                                                                RpcState::CommandType::ANY);
+    status_t status =  // not acted on here; carried in the result for join to check
+            mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
 
-        if (error != OK) {
-            LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
-                           statusToString(error).c_str());
-            break;
+    return PreJoinSetupResult{  // connection is returned even when status != OK
+            .connection = std::move(connection),
+            .status = status,
+    };
+}
+
+void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult) {
+    sp<RpcConnection>& connection = setupResult.connection;
+
+    if (setupResult.status == OK) {
+        while (true) {
+            status_t status = session->state()->getAndExecuteCommand(connection->fd, session,
+                                                                     RpcState::CommandType::ANY);
+            if (status != OK) {
+                LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
+                               statusToString(status).c_str());
+                break;
+            }
         }
+    } else {
+        ALOGE("Connection failed to init, closing with status %s",
+              statusToString(setupResult.status).c_str());
     }
 
     LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection),
@@ -381,14 +397,17 @@
                 unique_fd fd = std::move(serverFd);
                 // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
                 sp<RpcSession> session = thiz;
-                session->preJoin(std::move(thread));
-                ownershipTransferred = true;
-                joinCv.notify_one();
+                session->preJoinThreadOwnership(std::move(thread));
 
+                // only continue once we have a response or the connection fails
+                auto setupResult = session->preJoinSetup(std::move(fd));
+
+                ownershipTransferred = true;
                 threadLock.unlock();
+                joinCv.notify_one();
                 // do not use & vars below
 
-                RpcSession::join(std::move(session), std::move(fd));
+                RpcSession::join(std::move(session), std::move(setupResult));
             });
             joinCv.wait(lock, [&] { return ownershipTransferred; });
             LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
@@ -403,20 +422,32 @@
 }
 
 bool RpcSession::addClientConnection(unique_fd fd) {
-    std::lock_guard<std::mutex> _l(mMutex);
+    sp<RpcConnection> connection = sp<RpcConnection>::make();
+    {
+        std::lock_guard<std::mutex> _l(mMutex);
 
-    // first client connection added, but setForServer not called, so
-    // initializaing for a client.
-    if (mShutdownTrigger == nullptr) {
-        mShutdownTrigger = FdTrigger::make();
-        mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
-        if (mShutdownTrigger == nullptr) return false;
+        // first client connection added, but setForServer not called, so
+        // initializing for a client.
+        if (mShutdownTrigger == nullptr) {
+            mShutdownTrigger = FdTrigger::make();
+            mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
+            if (mShutdownTrigger == nullptr) return false;
+        }
+
+        connection->fd = std::move(fd);
+        connection->exclusiveTid = gettid();  // presumably reserves it for this thread during init
+        mClientConnections.push_back(connection);
     }
 
-    sp<RpcConnection> session = sp<RpcConnection>::make();
-    session->fd = std::move(fd);
-    mClientConnections.push_back(session);
-    return true;
+    status_t status =  // the init message is sent after the block above has released mMutex
+            mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
+
+    {
+        std::lock_guard<std::mutex> _l(mMutex);
+        connection->exclusiveTid = std::nullopt;  // cleared whether or not the send succeeded
+    }
+
+    return status == OK;  // on failure the connection is still left in mClientConnections
 }
 
 bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
@@ -519,7 +550,9 @@
         // in regular binder, this would usually be a deadlock :)
         LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0,
                             "Session has no client connections. This is required for an RPC server "
-                            "to make any non-nested (e.g. oneway or on another thread) calls.");
+                            "to make any non-nested (e.g. oneway or on another thread) calls. "
+                            "Use: %d. Server connections: %zu",
+                            static_cast<int>(use), mSession->mServerConnections.size());
 
         LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...",
                        mSession->mClientConnections.size(), mSession->mServerConnections.size());
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 62eb58a..53eba5a 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -265,6 +265,27 @@
     return OK;
 }
 
+status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
+    RpcClientConnectionInit init{  // members not named here (reserved) are zero-initialized
+            .msg = RPC_CONNECTION_INIT_OKAY,
+    };
+    return rpcSend(fd, session, "connection init", &init, sizeof(init));  // rpcSend's status
+}
+
+status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
+    RpcClientConnectionInit init;  // uninitialized; rpcRec is given its address and size
+    if (status_t status = rpcRec(fd, session, "connection init", &init, sizeof(init)); status != OK)
+        return status;  // receive failed: hand rpcRec's status back unchanged
+
+    static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY));  // literal size incl. NUL
+    if (0 != strncmp(init.msg, RPC_CONNECTION_INIT_OKAY, sizeof(init.msg))) {
+        ALOGE("Connection init message unrecognized %.*s", static_cast<int>(sizeof(init.msg)),
+              init.msg);  // %.*s caps the print at sizeof(msg): msg may lack a NUL here
+        return BAD_VALUE;
+    }
+    return OK;
+}
+
 sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) {
     Parcel data;
     data.markForRpc(session);
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index db142a1..5bfef69 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -51,6 +51,9 @@
     RpcState();
     ~RpcState();
 
+    status_t sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
+    status_t readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
+
     // TODO(b/182940634): combine some special transactions into one "getServerInfo" call?
     sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session);
     status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session,
diff --git a/libs/binder/RpcWireFormat.h b/libs/binder/RpcWireFormat.h
index 649c1ee..b5e5bc1 100644
--- a/libs/binder/RpcWireFormat.h
+++ b/libs/binder/RpcWireFormat.h
@@ -26,12 +26,28 @@
     RPC_CONNECTION_OPTION_REVERSE = 0x1,
 };
 
+/**
+ * This is sent to an RpcServer in order to request that a new connection be
+ * created, either as part of a new session or an existing session.
+ */
 struct RpcConnectionHeader {
     int32_t sessionId;
     uint8_t options;
     uint8_t reserved[3];
 };
 
+#define RPC_CONNECTION_INIT_OKAY "cci"
+
+/**
+ * Whenever a client connection is set up, this is sent as the initial
+ * transaction. The main use of this is to control the timing for when a
+ * reverse connection is set up.
+ */
+struct RpcClientConnectionInit {
+    char msg[4];  // RPC_CONNECTION_INIT_OKAY ("cci" plus its terminating NUL) when valid
+    uint8_t reserved[4];  // not examined by RpcState::readConnectionInit
+};
+
 enum : uint32_t {
     /**
      * follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 4650cf2..6ad15f2 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -185,13 +185,6 @@
         bool mShutdown = false;
     };
 
-    status_t readId();
-
-    // transfer ownership of thread
-    void preJoin(std::thread thread);
-    // join on thread passed to preJoin
-    static void join(sp<RpcSession>&& session, base::unique_fd client);
-
     struct RpcConnection : public RefBase {
         base::unique_fd fd;
 
@@ -200,6 +193,27 @@
         std::optional<pid_t> exclusiveTid;
     };
 
+    status_t readId();
+
+    // A thread joining a server must always call these functions in order.
+    // Cleanup is implemented in exactly one place: join. The steps are split
+    // into separate functions so that different locks can be held during
+    // different parts of setup.
+    //
+    // transfer ownership of thread (usually done while a lock is taken on the
+    // structure which originally owns the thread)
+    void preJoinThreadOwnership(std::thread thread);
+    // pass FD to thread and read initial connection information
+    struct PreJoinSetupResult {  // produced by preJoinSetup, consumed by join
+        // Server connection assigned to the calling thread by preJoinSetup
+        sp<RpcConnection> connection;
+        // Status of reading the connection init message; join only runs commands when OK
+        status_t status;
+    };
+    PreJoinSetupResult preJoinSetup(base::unique_fd fd);
+    // join on thread passed to preJoinThreadOwnership
+    static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result);
+
     [[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address);
     [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
                                                 bool server);
