Merge changes Ic46ac54b,Id787e70f

* changes:
  Implement keepAliveBinder for setRpcClientDebug
  Add keepAliveBinder argument to setRpcClientDebug.
diff --git a/libs/binder/RpcAddress.cpp b/libs/binder/RpcAddress.cpp
index 5c32320..98dee9a 100644
--- a/libs/binder/RpcAddress.cpp
+++ b/libs/binder/RpcAddress.cpp
@@ -29,7 +29,7 @@
 }
 
 bool RpcAddress::isZero() const {
-    RpcWireAddress ZERO{0};
+    RpcWireAddress ZERO{.options = 0};
     return memcmp(mRawAddr.get(), &ZERO, sizeof(RpcWireAddress)) == 0;
 }
 
@@ -51,13 +51,34 @@
     close(fd);
 }
 
-RpcAddress RpcAddress::unique() {
+RpcAddress RpcAddress::random(bool forServer) {
+    // The remainder of this header acts as reserved space for different kinds
+    // of binder objects.
+    uint64_t options = RPC_WIRE_ADDRESS_OPTION_CREATED;
+
+    // servers and clients allocate addresses independently, so this bit can
+    // tell you where an address originates
+    if (forServer) options |= RPC_WIRE_ADDRESS_OPTION_FOR_SERVER;
+
     RpcAddress ret;
-    ReadRandomBytes((uint8_t*)ret.mRawAddr.get(), sizeof(RpcWireAddress));
+    RpcWireAddress* raw = ret.mRawAddr.get();
+
+    raw->options = options;
+    ReadRandomBytes(raw->address, sizeof(raw->address));
+
     LOG_RPC_DETAIL("Creating new address: %s", ret.toString().c_str());
     return ret;
 }
 
+bool RpcAddress::isForServer() const {
+    return mRawAddr.get()->options & RPC_WIRE_ADDRESS_OPTION_FOR_SERVER;
+}
+
+bool RpcAddress::isRecognizedType() const {
+    uint64_t allKnownOptions = RPC_WIRE_ADDRESS_OPTION_CREATED | RPC_WIRE_ADDRESS_OPTION_FOR_SERVER;
+    return (mRawAddr.get()->options & ~allKnownOptions) == 0;
+}
+
 RpcAddress RpcAddress::fromRawEmbedded(const RpcWireAddress* raw) {
     RpcAddress addr;
     memcpy(addr.mRawAddr.get(), raw, sizeof(RpcWireAddress));
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 60be406..a8f3fa8 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -270,14 +270,25 @@
             return;
         }
 
-        if (header.sessionId == RPC_SESSION_ID_NEW) {
+        RpcAddress sessionId = RpcAddress::fromRawEmbedded(&header.sessionId);
+
+        if (sessionId.isZero()) {
             if (reverse) {
                 ALOGE("Cannot create a new session with a reverse connection, would leak");
                 return;
             }
 
-            LOG_ALWAYS_FATAL_IF(server->mSessionIdCounter >= INT32_MAX, "Out of session IDs");
-            server->mSessionIdCounter++;
+            RpcAddress sessionId = RpcAddress::zero();
+            size_t tries = 0;
+            do {
+                // don't block if there is some entropy issue
+                if (tries++ > 5) {
+                    ALOGE("Cannot find new address: %s", sessionId.toString().c_str());
+                    return;
+                }
+
+                sessionId = RpcAddress::random(true /*forServer*/);
+            } while (server->mSessions.end() != server->mSessions.find(sessionId));
 
             session = RpcSession::make();
             session->setMaxThreads(server->mMaxThreads);
@@ -285,23 +296,24 @@
                                        sp<RpcServer::EventListener>::fromExisting(
                                                static_cast<RpcServer::EventListener*>(
                                                        server.get())),
-                                       server->mSessionIdCounter)) {
+                                       sessionId)) {
                 ALOGE("Failed to attach server to session");
                 return;
             }
 
-            server->mSessions[server->mSessionIdCounter] = session;
+            server->mSessions[sessionId] = session;
         } else {
-            auto it = server->mSessions.find(header.sessionId);
+            auto it = server->mSessions.find(sessionId);
             if (it == server->mSessions.end()) {
-                ALOGE("Cannot add thread, no record of session with ID %d", header.sessionId);
+                ALOGE("Cannot add thread, no record of session with ID %s",
+                      sessionId.toString().c_str());
                 return;
             }
             session = it->second;
         }
 
         if (reverse) {
-            LOG_ALWAYS_FATAL_IF(!session->addClientConnection(std::move(clientFd)),
+            LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(clientFd), true),
                                 "server state must already be initialized");
             return;
         }
@@ -350,19 +362,21 @@
     return true;
 }
 
-void RpcServer::onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) {
+void RpcServer::onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) {
     auto id = session->mId;
     LOG_ALWAYS_FATAL_IF(id == std::nullopt, "Server sessions must be initialized with ID");
-    LOG_RPC_DETAIL("Dropping session %d", *id);
+    LOG_RPC_DETAIL("Dropping session with address %s", id->toString().c_str());
 
     std::lock_guard<std::mutex> _l(mLock);
     auto it = mSessions.find(*id);
-    LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %d", *id);
-    LOG_ALWAYS_FATAL_IF(it->second != session, "Bad state, session has id mismatch %d", *id);
+    LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %s",
+                        id->toString().c_str());
+    LOG_ALWAYS_FATAL_IF(it->second != session, "Bad state, session has id mismatch %s",
+                        id->toString().c_str());
     (void)mSessions.erase(it);
 }
 
-void RpcServer::onSessionServerThreadEnded() {
+void RpcServer::onSessionIncomingThreadEnded() {
     mShutdownCv.notify_all();
 }
 
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index a759ae3..4f55eef 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -51,7 +51,7 @@
     LOG_RPC_DETAIL("RpcSession destroyed %p", this);
 
     std::lock_guard<std::mutex> _l(mMutex);
-    LOG_ALWAYS_FATAL_IF(mServerConnections.size() != 0,
+    LOG_ALWAYS_FATAL_IF(mIncomingConnections.size() != 0,
                         "Should not be able to destroy a session with servers in use.");
 }
 
@@ -61,10 +61,10 @@
 
 void RpcSession::setMaxThreads(size_t threads) {
     std::lock_guard<std::mutex> _l(mMutex);
-    LOG_ALWAYS_FATAL_IF(!mClientConnections.empty() || !mServerConnections.empty(),
+    LOG_ALWAYS_FATAL_IF(!mOutgoingConnections.empty() || !mIncomingConnections.empty(),
                         "Must set max threads before setting up connections, but has %zu client(s) "
                         "and %zu server(s)",
-                        mClientConnections.size(), mServerConnections.size());
+                        mOutgoingConnections.size(), mIncomingConnections.size());
     mMaxThreads = threads;
 }
 
@@ -100,7 +100,7 @@
         return false;
     }
 
-    return addClientConnection(std::move(serverFd));
+    return addOutgoingConnection(std::move(serverFd), false);
 }
 
 sp<IBinder> RpcSession::getRootObject() {
@@ -108,7 +108,7 @@
     status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                                 ConnectionUse::CLIENT, &connection);
     if (status != OK) return nullptr;
-    return state()->getRootObject(connection.fd(), sp<RpcSession>::fromExisting(this));
+    return state()->getRootObject(connection.get(), sp<RpcSession>::fromExisting(this));
 }
 
 status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
@@ -116,7 +116,7 @@
     status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                                 ConnectionUse::CLIENT, &connection);
     if (status != OK) return status;
-    return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
+    return state()->getMaxThreads(connection.get(), sp<RpcSession>::fromExisting(this), maxThreads);
 }
 
 bool RpcSession::shutdownAndWait(bool wait) {
@@ -146,7 +146,7 @@
                                                                      : ConnectionUse::CLIENT,
                                       &connection);
     if (status != OK) return status;
-    return state()->transact(connection.fd(), binder, code, data,
+    return state()->transact(connection.get(), binder, code, data,
                              sp<RpcSession>::fromExisting(this), reply, flags);
 }
 
@@ -155,7 +155,7 @@
     status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                                 ConnectionUse::CLIENT_REFCOUNT, &connection);
     if (status != OK) return status;
-    return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
+    return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address);
 }
 
 std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
@@ -218,28 +218,27 @@
         LOG_ALWAYS_FATAL_IF(mForServer != nullptr, "Can only update ID for client.");
     }
 
-    int32_t id;
-
     ExclusiveConnection connection;
     status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                                 ConnectionUse::CLIENT, &connection);
     if (status != OK) return status;
 
-    status = state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id);
+    mId = RpcAddress::zero();
+    status = state()->getSessionId(connection.get(), sp<RpcSession>::fromExisting(this),
+                                   &mId.value());
     if (status != OK) return status;
 
-    LOG_RPC_DETAIL("RpcSession %p has id %d", this, id);
-    mId = id;
+    LOG_RPC_DETAIL("RpcSession %p has id %s", this, mId->toString().c_str());
     return OK;
 }
 
-void RpcSession::WaitForShutdownListener::onSessionLockedAllServerThreadsEnded(
+void RpcSession::WaitForShutdownListener::onSessionLockedAllIncomingThreadsEnded(
         const sp<RpcSession>& session) {
     (void)session;
     mShutdown = true;
 }
 
-void RpcSession::WaitForShutdownListener::onSessionServerThreadEnded() {
+void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() {
     mCv.notify_all();
 }
 
@@ -263,10 +262,9 @@
 RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
     // must be registered to allow arbitrary client code executing commands to
     // be able to do nested calls (we can't only read from it)
-    sp<RpcConnection> connection = assignServerToThisThread(std::move(fd));
+    sp<RpcConnection> connection = assignIncomingConnectionToThisThread(std::move(fd));
 
-    status_t status =
-            mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
+    status_t status = mState->readConnectionInit(connection, sp<RpcSession>::fromExisting(this));
 
     return PreJoinSetupResult{
             .connection = std::move(connection),
@@ -279,7 +277,7 @@
 
     if (setupResult.status == OK) {
         while (true) {
-            status_t status = session->state()->getAndExecuteCommand(connection->fd, session,
+            status_t status = session->state()->getAndExecuteCommand(connection, session,
                                                                      RpcState::CommandType::ANY);
             if (status != OK) {
                 LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
@@ -292,7 +290,7 @@
               statusToString(setupResult.status).c_str());
     }
 
-    LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection),
+    LOG_ALWAYS_FATAL_IF(!session->removeIncomingConnection(connection),
                         "bad state: connection object guaranteed to be in list");
 
     sp<RpcSession::EventListener> listener;
@@ -309,23 +307,28 @@
     session = nullptr;
 
     if (listener != nullptr) {
-        listener->onSessionServerThreadEnded();
+        listener->onSessionIncomingThreadEnded();
     }
 }
 
-wp<RpcServer> RpcSession::server() {
-    return mForServer;
+sp<RpcServer> RpcSession::server() {
+    RpcServer* unsafeServer = mForServer.unsafe_get();
+    sp<RpcServer> server = mForServer.promote();
+
+    LOG_ALWAYS_FATAL_IF((unsafeServer == nullptr) != (server == nullptr),
+                        "wp<> is to avoid strong cycle only");
+    return server;
 }
 
 bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
     {
         std::lock_guard<std::mutex> _l(mMutex);
-        LOG_ALWAYS_FATAL_IF(mClientConnections.size() != 0,
+        LOG_ALWAYS_FATAL_IF(mOutgoingConnections.size() != 0,
                             "Must only setup session once, but already has %zu clients",
-                            mClientConnections.size());
+                            mOutgoingConnections.size());
     }
 
-    if (!setupOneSocketConnection(addr, RPC_SESSION_ID_NEW, false /*reverse*/)) return false;
+    if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*reverse*/)) return false;
 
     // TODO(b/189955605): we should add additional sessions dynamically
     // instead of all at once.
@@ -362,7 +365,8 @@
     return true;
 }
 
-bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t id, bool reverse) {
+bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id,
+                                          bool reverse) {
     for (size_t tries = 0; tries < 5; tries++) {
         if (tries > 0) usleep(10000);
 
@@ -386,9 +390,9 @@
             return false;
         }
 
-        RpcConnectionHeader header{
-                .sessionId = id,
-        };
+        RpcConnectionHeader header{.options = 0};
+        memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress));
+
         if (reverse) header.options |= RPC_CONNECTION_OPTION_REVERSE;
 
         if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) {
@@ -428,7 +432,7 @@
             LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
             return true;
         } else {
-            return addClientConnection(std::move(serverFd));
+            return addOutgoingConnection(std::move(serverFd), true);
         }
     }
 
@@ -436,7 +440,7 @@
     return false;
 }
 
-bool RpcSession::addClientConnection(unique_fd fd) {
+bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) {
     sp<RpcConnection> connection = sp<RpcConnection>::make();
     {
         std::lock_guard<std::mutex> _l(mMutex);
@@ -451,11 +455,13 @@
 
         connection->fd = std::move(fd);
         connection->exclusiveTid = gettid();
-        mClientConnections.push_back(connection);
+        mOutgoingConnections.push_back(connection);
     }
 
-    status_t status =
-            mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
+    status_t status = OK;
+    if (init) {
+        mState->sendConnectionInit(connection, sp<RpcSession>::fromExisting(this));
+    }
 
     {
         std::lock_guard<std::mutex> _l(mMutex);
@@ -466,7 +472,7 @@
 }
 
 bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
-                              int32_t sessionId) {
+                              const RpcAddress& sessionId) {
     LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
     LOG_ALWAYS_FATAL_IF(server == nullptr);
     LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
@@ -482,25 +488,26 @@
     return true;
 }
 
-sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
+sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(unique_fd fd) {
     std::lock_guard<std::mutex> _l(mMutex);
     sp<RpcConnection> session = sp<RpcConnection>::make();
     session->fd = std::move(fd);
     session->exclusiveTid = gettid();
-    mServerConnections.push_back(session);
+    mIncomingConnections.push_back(session);
 
     return session;
 }
 
-bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) {
+bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) {
     std::lock_guard<std::mutex> _l(mMutex);
-    if (auto it = std::find(mServerConnections.begin(), mServerConnections.end(), connection);
-        it != mServerConnections.end()) {
-        mServerConnections.erase(it);
-        if (mServerConnections.size() == 0) {
+    if (auto it = std::find(mIncomingConnections.begin(), mIncomingConnections.end(), connection);
+        it != mIncomingConnections.end()) {
+        mIncomingConnections.erase(it);
+        if (mIncomingConnections.size() == 0) {
             sp<EventListener> listener = mEventListener.promote();
             if (listener) {
-                listener->onSessionLockedAllServerThreadsEnded(sp<RpcSession>::fromExisting(this));
+                listener->onSessionLockedAllIncomingThreadsEnded(
+                        sp<RpcSession>::fromExisting(this));
             }
         }
         return true;
@@ -525,11 +532,11 @@
         // CHECK FOR DEDICATED CLIENT SOCKET
         //
         // A server/looper should always use a dedicated connection if available
-        findConnection(tid, &exclusive, &available, session->mClientConnections,
-                       session->mClientConnectionsOffset);
+        findConnection(tid, &exclusive, &available, session->mOutgoingConnections,
+                       session->mOutgoingConnectionsOffset);
 
         // WARNING: this assumes a server cannot request its client to send
-        // a transaction, as mServerConnections is excluded below.
+        // a transaction, as mIncomingConnections is excluded below.
         //
         // Imagine we have more than one thread in play, and a single thread
         // sends a synchronous, then an asynchronous command. Imagine the
@@ -539,17 +546,31 @@
         // command. So, we move to considering the second available thread
         // for subsequent calls.
         if (use == ConnectionUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) {
-            session->mClientConnectionsOffset =
-                    (session->mClientConnectionsOffset + 1) % session->mClientConnections.size();
+            session->mOutgoingConnectionsOffset = (session->mOutgoingConnectionsOffset + 1) %
+                    session->mOutgoingConnections.size();
         }
 
-        // USE SERVING SOCKET (for nested transaction)
-        //
-        // asynchronous calls cannot be nested
+        // USE SERVING SOCKET (e.g. nested transaction)
         if (use != ConnectionUse::CLIENT_ASYNC) {
+            sp<RpcConnection> exclusiveIncoming;
             // server connections are always assigned to a thread
-            findConnection(tid, &exclusive, nullptr /*available*/, session->mServerConnections,
-                           0 /* index hint */);
+            findConnection(tid, &exclusiveIncoming, nullptr /*available*/,
+                           session->mIncomingConnections, 0 /* index hint */);
+
+            // asynchronous calls cannot be nested, we currently allow ref count
+            // calls to be nested (so that you can use this without having extra
+            // threads). Note 'drainCommands' is used so that these ref counts can't
+            // build up.
+            if (exclusiveIncoming != nullptr) {
+                if (exclusiveIncoming->allowNested) {
+                    // guaranteed to be processed as nested command
+                    exclusive = exclusiveIncoming;
+                } else if (use == ConnectionUse::CLIENT_REFCOUNT && available == nullptr) {
+                    // prefer available socket, but if we don't have one, don't
+                    // wait for one
+                    exclusive = exclusiveIncoming;
+                }
+            }
         }
 
         // if our thread is already using a connection, prioritize using that
@@ -563,16 +584,16 @@
             break;
         }
 
-        if (session->mClientConnections.size() == 0) {
+        if (session->mOutgoingConnections.size() == 0) {
             ALOGE("Session has no client connections. This is required for an RPC server to make "
                   "any non-nested (e.g. oneway or on another thread) calls. Use: %d. Server "
                   "connections: %zu",
-                  static_cast<int>(use), session->mServerConnections.size());
+                  static_cast<int>(use), session->mIncomingConnections.size());
             return WOULD_BLOCK;
         }
 
         LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...",
-                       session->mClientConnections.size(), session->mServerConnections.size());
+                       session->mOutgoingConnections.size(), session->mIncomingConnections.size());
         session->mAvailableConnectionCv.wait(_l);
     }
     session->mWaitingThreads--;
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 53eba5a..fd2eff6 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -83,21 +83,45 @@
     }
     LOG_ALWAYS_FATAL_IF(isRpc, "RPC binder must have known address at this point");
 
-    auto&& [it, inserted] = mNodeForAddress.insert({RpcAddress::unique(),
-                                                    BinderNode{
-                                                            .binder = binder,
-                                                            .timesSent = 1,
-                                                            .sentRef = binder,
-                                                    }});
-    // TODO(b/182939933): better organization could avoid needing this log
-    LOG_ALWAYS_FATAL_IF(!inserted);
+    bool forServer = session->server() != nullptr;
 
-    *outAddress = it->first;
-    return OK;
+    for (size_t tries = 0; tries < 5; tries++) {
+        auto&& [it, inserted] = mNodeForAddress.insert({RpcAddress::random(forServer),
+                                                        BinderNode{
+                                                                .binder = binder,
+                                                                .timesSent = 1,
+                                                                .sentRef = binder,
+                                                        }});
+        if (inserted) {
+            *outAddress = it->first;
+            return OK;
+        }
+
+        // well, we don't have visibility into the header here, but still
+        static_assert(sizeof(RpcWireAddress) == 40, "this log needs updating");
+        ALOGW("2**256 is 1e77. If you see this log, you probably have some entropy issue, or maybe "
+              "you witness something incredible!");
+    }
+
+    ALOGE("Unable to create an address in order to send out %p", binder.get());
+    return WOULD_BLOCK;
 }
 
 status_t RpcState::onBinderEntering(const sp<RpcSession>& session, const RpcAddress& address,
                                     sp<IBinder>* out) {
+    // ensure that: if we want to use addresses for something else in the future (for
+    //   instance, allowing transitive binder sends), that we don't accidentally
+    //   send those addresses to old server. Accidentally ignoring this in that
+    //   case and considering the binder to be recognized could cause this
+    //   process to accidentally proxy transactions for that binder. Of course,
+    //   if we communicate with a binder, it could always be proxying
+    //   information. However, we want to make sure that isn't done on accident
+    //   by a client.
+    if (!address.isRecognizedType()) {
+        ALOGE("Address is of an unknown type, rejecting: %s", address.toString().c_str());
+        return BAD_VALUE;
+    }
+
     std::unique_lock<std::mutex> _l(mNodeMutex);
     if (mTerminated) return DEAD_OBJECT;
 
@@ -117,6 +141,14 @@
         return OK;
     }
 
+    // we don't know about this binder, so the other side of the connection
+    // should have created it.
+    if (address.isForServer() == !!session->server()) {
+        ALOGE("Server received unrecognized address which we should own the creation of %s.",
+              address.toString().c_str());
+        return BAD_VALUE;
+    }
+
     auto&& [it, inserted] = mNodeForAddress.insert({address, BinderNode{}});
     LOG_ALWAYS_FATAL_IF(!inserted, "Failed to insert binder when creating proxy");
 
@@ -222,9 +254,11 @@
     mData.reset(new (std::nothrow) uint8_t[size]);
 }
 
-status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
-                           const char* what, const void* data, size_t size) {
-    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
+status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
+                           const sp<RpcSession>& session, const char* what, const void* data,
+                           size_t size) {
+    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, connection->fd.get(),
+                   hexString(data, size).c_str());
 
     if (size > std::numeric_limits<ssize_t>::max()) {
         ALOGE("Cannot send %s at size %zu (too big)", what, size);
@@ -232,12 +266,12 @@
         return BAD_VALUE;
     }
 
-    ssize_t sent = TEMP_FAILURE_RETRY(send(fd.get(), data, size, MSG_NOSIGNAL));
+    ssize_t sent = TEMP_FAILURE_RETRY(send(connection->fd.get(), data, size, MSG_NOSIGNAL));
 
     if (sent < 0 || sent != static_cast<ssize_t>(size)) {
         int savedErrno = errno;
         LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
-                       size, fd.get(), strerror(savedErrno));
+                       size, connection->fd.get(), strerror(savedErrno));
 
         (void)session->shutdownAndWait(false);
         return -savedErrno;
@@ -246,35 +280,41 @@
     return OK;
 }
 
-status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
-                          const char* what, void* data, size_t size) {
+status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection,
+                          const sp<RpcSession>& session, const char* what, void* data,
+                          size_t size) {
     if (size > std::numeric_limits<ssize_t>::max()) {
         ALOGE("Cannot rec %s at size %zu (too big)", what, size);
         (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
 
-    if (status_t status = session->mShutdownTrigger->interruptableReadFully(fd.get(), data, size);
+    if (status_t status =
+                session->mShutdownTrigger->interruptableReadFully(connection->fd.get(), data, size);
         status != OK) {
-        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(),
-                       statusToString(status).c_str());
+        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size,
+                       connection->fd.get(), statusToString(status).c_str());
         return status;
     }
 
-    LOG_RPC_DETAIL("Received %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
+    LOG_RPC_DETAIL("Received %s on fd %d: %s", what, connection->fd.get(),
+                   hexString(data, size).c_str());
     return OK;
 }
 
-status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
-    RpcClientConnectionInit init{
+status_t RpcState::sendConnectionInit(const sp<RpcSession::RpcConnection>& connection,
+                                      const sp<RpcSession>& session) {
+    RpcOutgoingConnectionInit init{
             .msg = RPC_CONNECTION_INIT_OKAY,
     };
-    return rpcSend(fd, session, "connection init", &init, sizeof(init));
+    return rpcSend(connection, session, "connection init", &init, sizeof(init));
 }
 
-status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
-    RpcClientConnectionInit init;
-    if (status_t status = rpcRec(fd, session, "connection init", &init, sizeof(init)); status != OK)
+status_t RpcState::readConnectionInit(const sp<RpcSession::RpcConnection>& connection,
+                                      const sp<RpcSession>& session) {
+    RpcOutgoingConnectionInit init;
+    if (status_t status = rpcRec(connection, session, "connection init", &init, sizeof(init));
+        status != OK)
         return status;
 
     static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY));
@@ -286,13 +326,14 @@
     return OK;
 }
 
-sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) {
+sp<IBinder> RpcState::getRootObject(const sp<RpcSession::RpcConnection>& connection,
+                                    const sp<RpcSession>& session) {
     Parcel data;
     data.markForRpc(session);
     Parcel reply;
 
-    status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT, data,
-                                      session, &reply, 0);
+    status_t status = transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT,
+                                      data, session, &reply, 0);
     if (status != OK) {
         ALOGE("Error getting root object: %s", statusToString(status).c_str());
         return nullptr;
@@ -301,14 +342,15 @@
     return reply.readStrongBinder();
 }
 
-status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                 size_t* maxThreadsOut) {
+status_t RpcState::getMaxThreads(const sp<RpcSession::RpcConnection>& connection,
+                                 const sp<RpcSession>& session, size_t* maxThreadsOut) {
     Parcel data;
     data.markForRpc(session);
     Parcel reply;
 
-    status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS,
-                                      data, session, &reply, 0);
+    status_t status =
+            transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS,
+                            data, session, &reply, 0);
     if (status != OK) {
         ALOGE("Error getting max threads: %s", statusToString(status).c_str());
         return status;
@@ -326,30 +368,26 @@
     return OK;
 }
 
-status_t RpcState::getSessionId(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                int32_t* sessionIdOut) {
+status_t RpcState::getSessionId(const sp<RpcSession::RpcConnection>& connection,
+                                const sp<RpcSession>& session, RpcAddress* sessionIdOut) {
     Parcel data;
     data.markForRpc(session);
     Parcel reply;
 
-    status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID,
-                                      data, session, &reply, 0);
+    status_t status =
+            transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID,
+                            data, session, &reply, 0);
     if (status != OK) {
         ALOGE("Error getting session ID: %s", statusToString(status).c_str());
         return status;
     }
 
-    int32_t sessionId;
-    status = reply.readInt32(&sessionId);
-    if (status != OK) return status;
-
-    *sessionIdOut = sessionId;
-    return OK;
+    return sessionIdOut->readFromParcel(reply);
 }
 
-status_t RpcState::transact(const base::unique_fd& fd, const sp<IBinder>& binder, uint32_t code,
-                            const Parcel& data, const sp<RpcSession>& session, Parcel* reply,
-                            uint32_t flags) {
+status_t RpcState::transact(const sp<RpcSession::RpcConnection>& connection,
+                            const sp<IBinder>& binder, uint32_t code, const Parcel& data,
+                            const sp<RpcSession>& session, Parcel* reply, uint32_t flags) {
     if (!data.isForRpc()) {
         ALOGE("Refusing to send RPC with parcel not crafted for RPC");
         return BAD_TYPE;
@@ -363,12 +401,12 @@
     RpcAddress address = RpcAddress::zero();
     if (status_t status = onBinderLeaving(session, binder, &address); status != OK) return status;
 
-    return transactAddress(fd, address, code, data, session, reply, flags);
+    return transactAddress(connection, address, code, data, session, reply, flags);
 }
 
-status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& address,
-                                   uint32_t code, const Parcel& data, const sp<RpcSession>& session,
-                                   Parcel* reply, uint32_t flags) {
+status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connection,
+                                   const RpcAddress& address, uint32_t code, const Parcel& data,
+                                   const sp<RpcSession>& session, Parcel* reply, uint32_t flags) {
     LOG_ALWAYS_FATAL_IF(!data.isForRpc());
     LOG_ALWAYS_FATAL_IF(data.objectsCount() != 0);
 
@@ -418,25 +456,25 @@
     memcpy(transactionData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireTransaction), data.data(),
            data.dataSize());
 
-    if (status_t status =
-                rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size());
+    if (status_t status = rpcSend(connection, session, "transaction", transactionData.data(),
+                                  transactionData.size());
         status != OK)
         // TODO(b/167966510): need to undo onBinderLeaving - we know the
         // refcount isn't successfully transferred.
         return status;
 
     if (flags & IBinder::FLAG_ONEWAY) {
-        LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", fd.get());
+        LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", connection->fd.get());
 
         // Do not wait on result.
         // However, too many oneway calls may cause refcounts to build up and fill up the socket,
         // so process those.
-        return drainCommands(fd, session, CommandType::CONTROL_ONLY);
+        return drainCommands(connection, session, CommandType::CONTROL_ONLY);
     }
 
     LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction.");
 
-    return waitForReply(fd, session, reply);
+    return waitForReply(connection, session, reply);
 }
 
 static void cleanup_reply_data(Parcel* p, const uint8_t* data, size_t dataSize,
@@ -448,17 +486,18 @@
     LOG_ALWAYS_FATAL_IF(objectsCount, 0);
 }
 
-status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                Parcel* reply) {
+status_t RpcState::waitForReply(const sp<RpcSession::RpcConnection>& connection,
+                                const sp<RpcSession>& session, Parcel* reply) {
     RpcWireHeader command;
     while (true) {
-        if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command));
+        if (status_t status =
+                    rpcRec(connection, session, "command header", &command, sizeof(command));
             status != OK)
             return status;
 
         if (command.command == RPC_COMMAND_REPLY) break;
 
-        if (status_t status = processServerCommand(fd, session, command, CommandType::ANY);
+        if (status_t status = processCommand(connection, session, command, CommandType::ANY);
             status != OK)
             return status;
     }
@@ -466,7 +505,7 @@
     CommandData data(command.bodySize);
     if (!data.valid()) return NO_MEMORY;
 
-    if (status_t status = rpcRec(fd, session, "reply body", data.data(), command.bodySize);
+    if (status_t status = rpcRec(connection, session, "reply body", data.data(), command.bodySize);
         status != OK)
         return status;
 
@@ -488,8 +527,8 @@
     return OK;
 }
 
-status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                 const RpcAddress& addr) {
+status_t RpcState::sendDecStrong(const sp<RpcSession::RpcConnection>& connection,
+                                 const sp<RpcSession>& session, const RpcAddress& addr) {
     {
         std::lock_guard<std::mutex> _l(mNodeMutex);
         if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
@@ -508,39 +547,42 @@
             .command = RPC_COMMAND_DEC_STRONG,
             .bodySize = sizeof(RpcWireAddress),
     };
-    if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK)
+    if (status_t status = rpcSend(connection, session, "dec ref header", &cmd, sizeof(cmd));
+        status != OK)
         return status;
-    if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(),
+    if (status_t status = rpcSend(connection, session, "dec ref body", &addr.viewRawEmbedded(),
                                   sizeof(RpcWireAddress));
         status != OK)
         return status;
     return OK;
 }
 
-status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                        CommandType type) {
-    LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get());
+status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection,
+                                        const sp<RpcSession>& session, CommandType type) {
+    LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", connection->fd.get());
 
     RpcWireHeader command;
-    if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command));
+    if (status_t status = rpcRec(connection, session, "command header", &command, sizeof(command));
         status != OK)
         return status;
 
-    return processServerCommand(fd, session, command, type);
+    return processCommand(connection, session, command, type);
 }
 
-status_t RpcState::drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                 CommandType type) {
+status_t RpcState::drainCommands(const sp<RpcSession::RpcConnection>& connection,
+                                 const sp<RpcSession>& session, CommandType type) {
     uint8_t buf;
-    while (0 < TEMP_FAILURE_RETRY(recv(fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
-        status_t status = getAndExecuteCommand(fd, session, type);
+    while (0 < TEMP_FAILURE_RETRY(
+                       recv(connection->fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
+        status_t status = getAndExecuteCommand(connection, session, type);
         if (status != OK) return status;
     }
     return OK;
 }
 
-status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                        const RpcWireHeader& command, CommandType type) {
+status_t RpcState::processCommand(const sp<RpcSession::RpcConnection>& connection,
+                                  const sp<RpcSession>& session, const RpcWireHeader& command,
+                                  CommandType type) {
     IPCThreadState* kernelBinderState = IPCThreadState::selfOrNull();
     IPCThreadState::SpGuard spGuard{
             .address = __builtin_frame_address(0),
@@ -559,9 +601,9 @@
     switch (command.command) {
         case RPC_COMMAND_TRANSACT:
             if (type != CommandType::ANY) return BAD_TYPE;
-            return processTransact(fd, session, command);
+            return processTransact(connection, session, command);
         case RPC_COMMAND_DEC_STRONG:
-            return processDecStrong(fd, session, command);
+            return processDecStrong(connection, session, command);
     }
 
     // We should always know the version of the opposing side, and since the
@@ -573,20 +615,20 @@
     (void)session->shutdownAndWait(false);
     return DEAD_OBJECT;
 }
-status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                   const RpcWireHeader& command) {
+status_t RpcState::processTransact(const sp<RpcSession::RpcConnection>& connection,
+                                   const sp<RpcSession>& session, const RpcWireHeader& command) {
     LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_TRANSACT, "command: %d", command.command);
 
     CommandData transactionData(command.bodySize);
     if (!transactionData.valid()) {
         return NO_MEMORY;
     }
-    if (status_t status = rpcRec(fd, session, "transaction body", transactionData.data(),
+    if (status_t status = rpcRec(connection, session, "transaction body", transactionData.data(),
                                  transactionData.size());
         status != OK)
         return status;
 
-    return processTransactInternal(fd, session, std::move(transactionData));
+    return processTransactInternal(connection, session, std::move(transactionData));
 }
 
 static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t dataSize,
@@ -598,7 +640,8 @@
     (void)objectsCount;
 }
 
-status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session,
+status_t RpcState::processTransactInternal(const sp<RpcSession::RpcConnection>& connection,
+                                           const sp<RpcSession>& session,
                                            CommandData transactionData) {
     // for 'recursive' calls to this, we have already read and processed the
     // binder from the transaction data and taken reference counts into account,
@@ -617,6 +660,7 @@
     // TODO(b/182939933): heap allocation just for lookup in mNodeForAddress,
     // maybe add an RpcAddress 'view' if the type remains 'heavy'
     auto addr = RpcAddress::fromRawEmbedded(&transaction->address);
+    bool oneway = transaction->flags & IBinder::FLAG_ONEWAY;
 
     status_t replyStatus = OK;
     sp<IBinder> target;
@@ -645,7 +689,7 @@
                   addr.toString().c_str());
             (void)session->shutdownAndWait(false);
             replyStatus = BAD_VALUE;
-        } else if (transaction->flags & IBinder::FLAG_ONEWAY) {
+        } else if (oneway) {
             std::unique_lock<std::mutex> _l(mNodeMutex);
             auto it = mNodeForAddress.find(addr);
             if (it->second.binder.promote() != target) {
@@ -702,7 +746,12 @@
         data.markForRpc(session);
 
         if (target) {
+            bool origAllowNested = connection->allowNested;
+            connection->allowNested = !oneway;
+
             replyStatus = target->transact(transaction->code, data, &reply, transaction->flags);
+
+            connection->allowNested = origAllowNested;
         } else {
             LOG_RPC_DETAIL("Got special transaction %u", transaction->code);
 
@@ -713,13 +762,13 @@
                 }
                 case RPC_SPECIAL_TRANSACT_GET_SESSION_ID: {
                     // for client connections, this should always report the value
-                    // originally returned from the server
-                    int32_t id = session->mId.value();
-                    replyStatus = reply.writeInt32(id);
+                    // originally returned from the server, so this is asserting
+                    // that it exists
+                    replyStatus = session->mId.value().writeToParcel(&reply);
                     break;
                 }
                 default: {
-                    sp<RpcServer> server = session->server().promote();
+                    sp<RpcServer> server = session->server();
                     if (server) {
                         switch (transaction->code) {
                             case RPC_SPECIAL_TRANSACT_GET_ROOT: {
@@ -738,7 +787,7 @@
         }
     }
 
-    if (transaction->flags & IBinder::FLAG_ONEWAY) {
+    if (oneway) {
         if (replyStatus != OK) {
             ALOGW("Oneway call failed with error: %d", replyStatus);
         }
@@ -811,11 +860,11 @@
     memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
            reply.dataSize());
 
-    return rpcSend(fd, session, "reply", replyData.data(), replyData.size());
+    return rpcSend(connection, session, "reply", replyData.data(), replyData.size());
 }
 
-status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                    const RpcWireHeader& command) {
+status_t RpcState::processDecStrong(const sp<RpcSession::RpcConnection>& connection,
+                                    const sp<RpcSession>& session, const RpcWireHeader& command) {
     LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command);
 
     CommandData commandData(command.bodySize);
@@ -823,7 +872,7 @@
         return NO_MEMORY;
     }
     if (status_t status =
-                rpcRec(fd, session, "dec ref body", commandData.data(), commandData.size());
+                rpcRec(connection, session, "dec ref body", commandData.data(), commandData.size());
         status != OK)
         return status;
 
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index 5bfef69..529dee5 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -51,34 +51,37 @@
     RpcState();
     ~RpcState();
 
-    status_t sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
-    status_t readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
+    status_t sendConnectionInit(const sp<RpcSession::RpcConnection>& connection,
+                                const sp<RpcSession>& session);
+    status_t readConnectionInit(const sp<RpcSession::RpcConnection>& connection,
+                                const sp<RpcSession>& session);
 
     // TODO(b/182940634): combine some special transactions into one "getServerInfo" call?
-    sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session);
-    status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session,
-                           size_t* maxThreadsOut);
-    status_t getSessionId(const base::unique_fd& fd, const sp<RpcSession>& session,
-                          int32_t* sessionIdOut);
+    sp<IBinder> getRootObject(const sp<RpcSession::RpcConnection>& connection,
+                              const sp<RpcSession>& session);
+    status_t getMaxThreads(const sp<RpcSession::RpcConnection>& connection,
+                           const sp<RpcSession>& session, size_t* maxThreadsOut);
+    status_t getSessionId(const sp<RpcSession::RpcConnection>& connection,
+                          const sp<RpcSession>& session, RpcAddress* sessionIdOut);
 
-    [[nodiscard]] status_t transact(const base::unique_fd& fd, const sp<IBinder>& address,
-                                    uint32_t code, const Parcel& data,
+    [[nodiscard]] status_t transact(const sp<RpcSession::RpcConnection>& connection,
+                                    const sp<IBinder>& address, uint32_t code, const Parcel& data,
                                     const sp<RpcSession>& session, Parcel* reply, uint32_t flags);
-    [[nodiscard]] status_t transactAddress(const base::unique_fd& fd, const RpcAddress& address,
-                                           uint32_t code, const Parcel& data,
-                                           const sp<RpcSession>& session, Parcel* reply,
-                                           uint32_t flags);
-    [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                         const RpcAddress& address);
+    [[nodiscard]] status_t transactAddress(const sp<RpcSession::RpcConnection>& connection,
+                                           const RpcAddress& address, uint32_t code,
+                                           const Parcel& data, const sp<RpcSession>& session,
+                                           Parcel* reply, uint32_t flags);
+    [[nodiscard]] status_t sendDecStrong(const sp<RpcSession::RpcConnection>& connection,
+                                         const sp<RpcSession>& session, const RpcAddress& address);
 
     enum class CommandType {
         ANY,
         CONTROL_ONLY,
     };
-    [[nodiscard]] status_t getAndExecuteCommand(const base::unique_fd& fd,
+    [[nodiscard]] status_t getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection,
                                                 const sp<RpcSession>& session, CommandType type);
-    [[nodiscard]] status_t drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                         CommandType type);
+    [[nodiscard]] status_t drainCommands(const sp<RpcSession::RpcConnection>& connection,
+                                         const sp<RpcSession>& session, CommandType type);
 
     /**
      * Called by Parcel for outgoing binders. This implies one refcount of
@@ -133,22 +136,25 @@
         size_t mSize;
     };
 
-    [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                   const char* what, const void* data, size_t size);
-    [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                  const char* what, void* data, size_t size);
+    [[nodiscard]] status_t rpcSend(const sp<RpcSession::RpcConnection>& connection,
+                                   const sp<RpcSession>& session, const char* what,
+                                   const void* data, size_t size);
+    [[nodiscard]] status_t rpcRec(const sp<RpcSession::RpcConnection>& connection,
+                                  const sp<RpcSession>& session, const char* what, void* data,
+                                  size_t size);
 
-    [[nodiscard]] status_t waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                        Parcel* reply);
-    [[nodiscard]] status_t processServerCommand(const base::unique_fd& fd,
-                                                const sp<RpcSession>& session,
-                                                const RpcWireHeader& command, CommandType type);
-    [[nodiscard]] status_t processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
+    [[nodiscard]] status_t waitForReply(const sp<RpcSession::RpcConnection>& connection,
+                                        const sp<RpcSession>& session, Parcel* reply);
+    [[nodiscard]] status_t processCommand(const sp<RpcSession::RpcConnection>& connection,
+                                          const sp<RpcSession>& session,
+                                          const RpcWireHeader& command, CommandType type);
+    [[nodiscard]] status_t processTransact(const sp<RpcSession::RpcConnection>& connection,
+                                           const sp<RpcSession>& session,
                                            const RpcWireHeader& command);
-    [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd,
+    [[nodiscard]] status_t processTransactInternal(const sp<RpcSession::RpcConnection>& connection,
                                                    const sp<RpcSession>& session,
                                                    CommandData transactionData);
-    [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd,
+    [[nodiscard]] status_t processDecStrong(const sp<RpcSession::RpcConnection>& connection,
                                             const sp<RpcSession>& session,
                                             const RpcWireHeader& command);
 
diff --git a/libs/binder/RpcWireFormat.h b/libs/binder/RpcWireFormat.h
index b5e5bc1..2016483 100644
--- a/libs/binder/RpcWireFormat.h
+++ b/libs/binder/RpcWireFormat.h
@@ -20,20 +20,26 @@
 #pragma clang diagnostic push
 #pragma clang diagnostic error "-Wpadded"
 
-constexpr int32_t RPC_SESSION_ID_NEW = -1;
-
 enum : uint8_t {
     RPC_CONNECTION_OPTION_REVERSE = 0x1,
 };
 
+constexpr uint64_t RPC_WIRE_ADDRESS_OPTION_CREATED = 1 << 0; // distinguish from '0' address
+constexpr uint64_t RPC_WIRE_ADDRESS_OPTION_FOR_SERVER = 1 << 1;
+
+struct RpcWireAddress {
+    uint64_t options;
+    uint8_t address[32];
+};
+
 /**
  * This is sent to an RpcServer in order to request a new connection is created,
  * either as part of a new session or an existing session
  */
 struct RpcConnectionHeader {
-    int32_t sessionId;
+    RpcWireAddress sessionId;
     uint8_t options;
-    uint8_t reserved[3];
+    uint8_t reserved[7];
 };
 
 #define RPC_CONNECTION_INIT_OKAY "cci"
@@ -43,7 +49,7 @@
  * transaction. The main use of this is in order to control the timing for when
  * a reverse connection is setup.
  */
-struct RpcClientConnectionInit {
+struct RpcOutgoingConnectionInit {
     char msg[4];
     uint8_t reserved[4];
 };
@@ -89,10 +95,6 @@
     uint32_t reserved[2];
 };
 
-struct RpcWireAddress {
-    uint8_t address[32];
-};
-
 struct RpcWireTransaction {
     RpcWireAddress address;
     uint32_t code;
diff --git a/libs/binder/include/binder/RpcAddress.h b/libs/binder/include/binder/RpcAddress.h
index 5a3f3a6..e428908 100644
--- a/libs/binder/include/binder/RpcAddress.h
+++ b/libs/binder/include/binder/RpcAddress.h
@@ -29,11 +29,7 @@
 struct RpcWireAddress;
 
 /**
- * This class represents an identifier of a binder object.
- *
- * The purpose of this class it to hide the ABI of an RpcWireAddress, and
- * potentially allow us to change the size of it in the future (RpcWireAddress
- * is PIMPL, essentially - although the type that is used here is not exposed).
+ * This class represents an identifier across an RPC boundary.
  */
 class RpcAddress {
 public:
@@ -46,9 +42,20 @@
     bool isZero() const;
 
     /**
-     * Create a new address which is unique
+     * Create a new random address.
      */
-    static RpcAddress unique();
+    static RpcAddress random(bool forServer);
+
+    /**
+     * Whether this address was created with 'bool forServer' true
+     */
+    bool isForServer() const;
+
+    /**
+     * Whether this address is one that could be created with this version of
+     * libbinder.
+     */
+    bool isRecognizedType() const;
 
     /**
      * Creates a new address as a copy of an embedded object.
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 4e6934b..c8d2857 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -17,6 +17,7 @@
 
 #include <android-base/unique_fd.h>
 #include <binder/IBinder.h>
+#include <binder/RpcAddress.h>
 #include <binder/RpcSession.h>
 #include <utils/Errors.h>
 #include <utils/RefBase.h>
@@ -155,8 +156,8 @@
     friend sp<RpcServer>;
     RpcServer();
 
-    void onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) override;
-    void onSessionServerThreadEnded() override;
+    void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) override;
+    void onSessionIncomingThreadEnded() override;
 
     static void establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd);
     bool setupSocketServer(const RpcSocketAddress& address);
@@ -171,8 +172,7 @@
     std::map<std::thread::id, std::thread> mConnectingThreads;
     sp<IBinder> mRootObject;
     wp<IBinder> mRootObjectWeak;
-    std::map<int32_t, sp<RpcSession>> mSessions;
-    int32_t mSessionIdCounter = 0;
+    std::map<RpcAddress, sp<RpcSession>> mSessions;
     std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
     std::condition_variable mShutdownCv;
 };
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 4ddf422..69c2a1a 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -118,7 +118,11 @@
 
     ~RpcSession();
 
-    wp<RpcServer> server();
+    /**
+     * Server if this session is created as part of a server (symmetrical to
+     * client servers). Otherwise, nullptr.
+     */
+    sp<RpcServer> server();
 
     // internal only
     const std::unique_ptr<RpcState>& state() { return mState; }
@@ -170,14 +174,14 @@
 
     class EventListener : public virtual RefBase {
     public:
-        virtual void onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) = 0;
-        virtual void onSessionServerThreadEnded() = 0;
+        virtual void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0;
+        virtual void onSessionIncomingThreadEnded() = 0;
     };
 
     class WaitForShutdownListener : public EventListener {
     public:
-        void onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& session) override;
-        void onSessionServerThreadEnded() override;
+        void onSessionLockedAllIncomingThreadsEnded(const sp<RpcSession>& session) override;
+        void onSessionIncomingThreadEnded() override;
         void waitForShutdown(std::unique_lock<std::mutex>& lock);
 
     private:
@@ -191,6 +195,8 @@
         // whether this or another thread is currently using this fd to make
         // or receive transactions.
         std::optional<pid_t> exclusiveTid;
+
+        bool allowNested = false;
     };
 
     status_t readId();
@@ -215,14 +221,14 @@
     static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result);
 
     [[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address);
-    [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
-                                                bool server);
-    [[nodiscard]] bool addClientConnection(base::unique_fd fd);
+    [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address,
+                                                const RpcAddress& sessionId, bool server);
+    [[nodiscard]] bool addOutgoingConnection(base::unique_fd fd, bool init);
     [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
                                     const wp<RpcSession::EventListener>& eventListener,
-                                    int32_t sessionId);
-    sp<RpcConnection> assignServerToThisThread(base::unique_fd fd);
-    [[nodiscard]] bool removeServerConnection(const sp<RpcConnection>& connection);
+                                    const RpcAddress& sessionId);
+    sp<RpcConnection> assignIncomingConnectionToThisThread(base::unique_fd fd);
+    [[nodiscard]] bool removeIncomingConnection(const sp<RpcConnection>& connection);
 
     enum class ConnectionUse {
         CLIENT,
@@ -237,7 +243,7 @@
                              ExclusiveConnection* connection);
 
         ~ExclusiveConnection();
-        const base::unique_fd& fd() { return mConnection->fd; }
+        const sp<RpcConnection>& get() { return mConnection; }
 
     private:
         static void findConnection(pid_t tid, sp<RpcConnection>* exclusive,
@@ -254,13 +260,13 @@
         bool mReentrant = false;
     };
 
-    // On the other side of a session, for each of mClientConnections here, there should
-    // be one of mServerConnections on the other side (and vice versa).
+    // On the other side of a session, for each of mOutgoingConnections here, there should
+    // be one of mIncomingConnections on the other side (and vice versa).
     //
     // For the simplest session, a single server with one client, you would
     // have:
-    //  - the server has a single 'mServerConnections' and a thread listening on this
-    //  - the client has a single 'mClientConnections' and makes calls to this
+    //  - the server has a single 'mIncomingConnections' and a thread listening on this
+    //  - the client has a single 'mOutgoingConnections' and makes calls to this
     //  - here, when the client makes a call, the server can call back into it
     //    (nested calls), but outside of this, the client will only ever read
     //    calls from the server when it makes a call itself.
@@ -272,8 +278,7 @@
     sp<WaitForShutdownListener> mShutdownListener; // used for client sessions
     wp<EventListener> mEventListener; // mForServer if server, mShutdownListener if client
 
-    // TODO(b/183988761): this shouldn't be guessable
-    std::optional<int32_t> mId;
+    std::optional<RpcAddress> mId;
 
     std::unique_ptr<FdTrigger> mShutdownTrigger;
 
@@ -286,9 +291,9 @@
     std::condition_variable mAvailableConnectionCv; // for mWaitingThreads
     size_t mWaitingThreads = 0;
     // hint index into clients, ++ when sending an async transaction
-    size_t mClientConnectionsOffset = 0;
-    std::vector<sp<RpcConnection>> mClientConnections;
-    std::vector<sp<RpcConnection>> mServerConnections;
+    size_t mOutgoingConnectionsOffset = 0;
+    std::vector<sp<RpcConnection>> mOutgoingConnections;
+    std::vector<sp<RpcConnection>> mIncomingConnections;
     std::map<std::thread::id, std::thread> mThreads;
 };
 
diff --git a/libs/binder/rust/src/binder.rs b/libs/binder/rust/src/binder.rs
index 695a83e..2a09afc 100644
--- a/libs/binder/rust/src/binder.rs
+++ b/libs/binder/rust/src/binder.rs
@@ -25,6 +25,7 @@
 use std::cmp::Ordering;
 use std::ffi::{c_void, CStr, CString};
 use std::fmt;
+use std::fs::File;
 use std::marker::PhantomData;
 use std::ops::Deref;
 use std::os::raw::c_char;
@@ -54,6 +55,14 @@
     fn as_binder(&self) -> SpIBinder {
         panic!("This object was not a Binder object and cannot be converted into an SpIBinder.")
     }
+
+    /// Dump transaction handler for this Binder object.
+    ///
+    /// This handler is a no-op by default and should be implemented for each
+    /// Binder service struct that wishes to respond to dump transactions.
+    fn dump(&self, _file: &File, _args: &[&CStr]) -> Result<()> {
+        Ok(())
+    }
 }
 
 /// Interface stability promise
@@ -98,6 +107,10 @@
     /// `reply` may be [`None`] if the sender does not expect a reply.
     fn on_transact(&self, code: TransactionCode, data: &Parcel, reply: &mut Parcel) -> Result<()>;
 
+    /// Handle a request to invoke the dump transaction on this
+    /// object.
+    fn on_dump(&self, file: &File, args: &[&CStr]) -> Result<()>;
+
     /// Retrieve the class of this remote object.
     ///
     /// This method should always return the same InterfaceClass for the same
@@ -218,7 +231,7 @@
             if class.is_null() {
                 panic!("Expected non-null class pointer from AIBinder_Class_define!");
             }
-            sys::AIBinder_Class_setOnDump(class, None);
+            sys::AIBinder_Class_setOnDump(class, Some(I::on_dump));
             sys::AIBinder_Class_setHandleShellCommand(class, None);
             class
         };
@@ -492,6 +505,16 @@
     /// returned by `on_create` for this class. This function takes ownership of
     /// the provided pointer and destroys it.
     unsafe extern "C" fn on_destroy(object: *mut c_void);
+
+    /// Called to handle the `dump` transaction.
+    ///
+    /// # Safety
+    ///
+    /// Must be called with a non-null, valid pointer to a local `AIBinder` that
+    /// contains a `T` pointer in its user data. fd should be a non-owned file
+    /// descriptor, and args must be an array of null-terminated string
+    /// poiinters with length num_args.
+    unsafe extern "C" fn on_dump(binder: *mut sys::AIBinder, fd: i32, args: *mut *const c_char, num_args: u32) -> status_t;
 }
 
 /// Interface for transforming a generic SpIBinder into a specific remote
@@ -778,6 +801,10 @@
                 }
             }
 
+            fn on_dump(&self, file: &std::fs::File, args: &[&std::ffi::CStr]) -> $crate::Result<()> {
+                self.0.dump(file, args)
+            }
+
             fn get_class() -> $crate::InterfaceClass {
                 static CLASS_INIT: std::sync::Once = std::sync::Once::new();
                 static mut CLASS: Option<$crate::InterfaceClass> = None;
diff --git a/libs/binder/rust/src/native.rs b/libs/binder/rust/src/native.rs
index 3b3fd08..5e324b3 100644
--- a/libs/binder/rust/src/native.rs
+++ b/libs/binder/rust/src/native.rs
@@ -21,10 +21,13 @@
 use crate::sys;
 
 use std::convert::TryFrom;
-use std::ffi::{c_void, CString};
+use std::ffi::{c_void, CStr, CString};
+use std::fs::File;
 use std::mem::ManuallyDrop;
 use std::ops::Deref;
-use std::ptr;
+use std::os::raw::c_char;
+use std::os::unix::io::FromRawFd;
+use std::slice;
 
 /// Rust wrapper around Binder remotable objects.
 ///
@@ -273,7 +276,7 @@
     /// Must be called with a valid pointer to a `T` object. After this call,
     /// the pointer will be invalid and should not be dereferenced.
     unsafe extern "C" fn on_destroy(object: *mut c_void) {
-        ptr::drop_in_place(object as *mut T)
+        Box::from_raw(object as *mut T);
     }
 
     /// Called whenever a new, local `AIBinder` object is needed of a specific
@@ -290,6 +293,37 @@
         // object created by Box.
         args
     }
+
+    /// Called to handle the `dump` transaction.
+    ///
+    /// # Safety
+    ///
+    /// Must be called with a non-null, valid pointer to a local `AIBinder` that
+    /// contains a `T` pointer in its user data. fd should be a non-owned file
+    /// descriptor, and args must be an array of null-terminated string
+    /// poiinters with length num_args.
+    unsafe extern "C" fn on_dump(binder: *mut sys::AIBinder, fd: i32, args: *mut *const c_char, num_args: u32) -> status_t {
+        if fd < 0 {
+            return StatusCode::UNEXPECTED_NULL as status_t;
+        }
+        // We don't own this file, so we need to be careful not to drop it.
+        let file = ManuallyDrop::new(File::from_raw_fd(fd));
+
+        if args.is_null() {
+            return StatusCode::UNEXPECTED_NULL as status_t;
+        }
+        let args = slice::from_raw_parts(args, num_args as usize);
+        let args: Vec<_> = args.iter().map(|s| CStr::from_ptr(*s)).collect();
+
+        let object = sys::AIBinder_getUserData(binder);
+        let binder: &T = &*(object as *const T);
+        let res = binder.on_dump(&file, &args);
+
+        match res {
+            Ok(()) => 0,
+            Err(e) => e as status_t,
+        }
+    }
 }
 
 impl<T: Remotable> Drop for Binder<T> {
@@ -410,6 +444,10 @@
         Ok(())
     }
 
+    fn on_dump(&self, _file: &File, _args: &[&CStr]) -> Result<()> {
+        Ok(())
+    }
+
     binder_fn_get_class!(Binder::<Self>);
 }
 
diff --git a/libs/binder/rust/src/parcel.rs b/libs/binder/rust/src/parcel.rs
index 6c34824..a3f7620 100644
--- a/libs/binder/rust/src/parcel.rs
+++ b/libs/binder/rust/src/parcel.rs
@@ -184,11 +184,17 @@
         }
     }
 
+    /// Returns the total size of the parcel.
+    pub fn get_data_size(&self) -> i32 {
+        unsafe {
+            // Safety: `Parcel` always contains a valid pointer to an `AParcel`,
+            // and this call is otherwise safe.
+            sys::AParcel_getDataSize(self.as_native())
+        }
+    }
+
     /// Move the current read/write position in the parcel.
     ///
-    /// The new position must be a position previously returned by
-    /// `self.get_data_position()`.
-    ///
     /// # Safety
     ///
     /// This method is safe if `pos` is less than the current size of the parcel
@@ -219,6 +225,72 @@
         D::deserialize(self)
     }
 
+    /// Attempt to read a type that implements [`Deserialize`] from this
+    /// `Parcel` onto an existing value. This operation will overwrite the old
+    /// value partially or completely, depending on how much data is available.
+    pub fn read_onto<D: Deserialize>(&self, x: &mut D) -> Result<()> {
+        x.deserialize_from(self)
+    }
+
+    /// Safely read a sized parcelable.
+    ///
+    /// Read the size of a parcelable, compute the end position
+    /// of that parcelable, then build a sized readable sub-parcel
+    /// and call a closure with the sub-parcel as its parameter.
+    /// The closure can keep reading data from the sub-parcel
+    /// until it runs out of input data. The closure is responsible
+    /// for calling [`ReadableSubParcel::has_more_data`] to check for
+    /// more data before every read, at least until Rust generators
+    /// are stabilized.
+    /// After the closure returns, skip to the end of the current
+    /// parcelable regardless of how much the closure has read.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// let mut parcelable = Default::default();
+    /// parcel.sized_read(|subparcel| {
+    ///     if subparcel.has_more_data() {
+    ///         parcelable.a = subparcel.read()?;
+    ///     }
+    ///     if subparcel.has_more_data() {
+    ///         parcelable.b = subparcel.read()?;
+    ///     }
+    ///     Ok(())
+    /// });
+    /// ```
+    ///
+    pub fn sized_read<F>(&self, mut f: F) -> Result<()>
+    where
+        for<'a> F: FnMut(ReadableSubParcel<'a>) -> Result<()>
+    {
+        let start = self.get_data_position();
+        let parcelable_size: i32 = self.read()?;
+        if parcelable_size < 0 {
+            return Err(StatusCode::BAD_VALUE);
+        }
+
+        let end = start.checked_add(parcelable_size)
+            .ok_or(StatusCode::BAD_VALUE)?;
+        if end > self.get_data_size() {
+            return Err(StatusCode::NOT_ENOUGH_DATA);
+        }
+
+        let subparcel = ReadableSubParcel {
+            parcel: self,
+            end_position: end,
+        };
+        f(subparcel)?;
+
+        // Advance the data position to the actual end,
+        // in case the closure read less data than was available
+        unsafe {
+            self.set_data_position(end)?;
+        }
+
+        Ok(())
+    }
+
     /// Read a vector size from the `Parcel` and resize the given output vector
     /// to be correctly sized for that amount of data.
     ///
@@ -264,6 +336,27 @@
     }
 }
 
+/// A segment of a readable parcel, used for [`Parcel::sized_read`].
+pub struct ReadableSubParcel<'a> {
+    parcel: &'a Parcel,
+    end_position: i32,
+}
+
+impl<'a> ReadableSubParcel<'a> {
+    /// Read a type that implements [`Deserialize`] from the sub-parcel.
+    pub fn read<D: Deserialize>(&self) -> Result<D> {
+        // The caller should have checked this,
+        // but it can't hurt to double-check
+        assert!(self.has_more_data());
+        D::deserialize(self.parcel)
+    }
+
+    /// Check if the sub-parcel has more data to read
+    pub fn has_more_data(&self) -> bool {
+        self.parcel.get_data_position() < self.end_position
+    }
+}
+
 // Internal APIs
 impl Parcel {
     pub(crate) fn write_binder(&mut self, binder: Option<&SpIBinder>) -> Result<()> {
diff --git a/libs/binder/rust/src/parcel/parcelable.rs b/libs/binder/rust/src/parcel/parcelable.rs
index f57788b..956ecfe 100644
--- a/libs/binder/rust/src/parcel/parcelable.rs
+++ b/libs/binder/rust/src/parcel/parcelable.rs
@@ -39,6 +39,14 @@
 pub trait Deserialize: Sized {
     /// Deserialize an instance from the given [`Parcel`].
     fn deserialize(parcel: &Parcel) -> Result<Self>;
+
+    /// Deserialize an instance from the given [`Parcel`] onto the
+    /// current object. This operation will overwrite the old value
+    /// partially or completely, depending on how much data is available.
+    fn deserialize_from(&mut self, parcel: &Parcel) -> Result<()> {
+        *self = Self::deserialize(parcel)?;
+        Ok(())
+    }
 }
 
 /// Helper trait for types that can be serialized as arrays.
@@ -184,6 +192,14 @@
             parcel.read().map(Some)
         }
     }
+
+    /// Deserialize an Option of this type from the given [`Parcel`] onto the
+    /// current object. This operation will overwrite the current value
+    /// partially or completely, depending on how much data is available.
+    fn deserialize_option_from(this: &mut Option<Self>, parcel: &Parcel) -> Result<()> {
+        *this = Self::deserialize_option(parcel)?;
+        Ok(())
+    }
 }
 
 /// Callback to allocate a vector for parcel array read functions.
@@ -677,6 +693,75 @@
     fn deserialize(parcel: &Parcel) -> Result<Self> {
         DeserializeOption::deserialize_option(parcel)
     }
+
+    fn deserialize_from(&mut self, parcel: &Parcel) -> Result<()> {
+        DeserializeOption::deserialize_option_from(self, parcel)
+    }
+}
+
+/// Implement `Deserialize` trait and friends for a parcelable
+///
+/// This is an internal macro used by the AIDL compiler to implement
+/// `Deserialize`, `DeserializeArray` and `DeserializeOption` for
+/// structured parcelables. The target type must implement a
+/// `deserialize_parcelable` method with the following signature:
+/// ```no_run
+/// fn deserialize_parcelable(
+///     &mut self,
+///     parcel: &binder::parcel::Parcelable,
+/// ) -> binder::Result<()> {
+///     // ...
+/// }
+/// ```
+#[macro_export]
+macro_rules! impl_deserialize_for_parcelable {
+    ($parcelable:ident) => {
+        impl $crate::parcel::Deserialize for $parcelable {
+            fn deserialize(
+                parcel: &$crate::parcel::Parcel,
+            ) -> $crate::Result<Self> {
+                $crate::parcel::DeserializeOption::deserialize_option(parcel)
+                    .transpose()
+                    .unwrap_or(Err($crate::StatusCode::UNEXPECTED_NULL))
+            }
+            fn deserialize_from(
+                &mut self,
+                parcel: &$crate::parcel::Parcel,
+            ) -> $crate::Result<()> {
+                let status: i32 = parcel.read()?;
+                if status == 0 {
+                    Err($crate::StatusCode::UNEXPECTED_NULL)
+                } else {
+                    self.deserialize_parcelable(parcel)
+                }
+            }
+        }
+
+        impl $crate::parcel::DeserializeArray for $parcelable {}
+
+        impl $crate::parcel::DeserializeOption for $parcelable {
+            fn deserialize_option(
+                parcel: &$crate::parcel::Parcel,
+            ) -> $crate::Result<Option<Self>> {
+                let mut result = None;
+                Self::deserialize_option_from(&mut result, parcel)?;
+                Ok(result)
+            }
+            fn deserialize_option_from(
+                this: &mut Option<Self>,
+                parcel: &$crate::parcel::Parcel,
+            ) -> $crate::Result<()> {
+                let status: i32 = parcel.read()?;
+                if status == 0 {
+                    *this = None;
+                    Ok(())
+                } else {
+                    this.get_or_insert_with(Self::default)
+                        .deserialize_parcelable(parcel)
+                }
+            }
+        }
+    }
 }
 
 #[test]
diff --git a/libs/binder/rust/tests/integration.rs b/libs/binder/rust/tests/integration.rs
index 10b77f4..da8907d 100644
--- a/libs/binder/rust/tests/integration.rs
+++ b/libs/binder/rust/tests/integration.rs
@@ -23,6 +23,9 @@
     FIRST_CALL_TRANSACTION,
 };
 use std::convert::{TryFrom, TryInto};
+use std::ffi::CStr;
+use std::fs::File;
+use std::sync::Mutex;
 
 /// Name of service runner.
 ///
@@ -50,13 +53,11 @@
     let extension_name = args.next();
 
     {
-        let mut service = Binder::new(BnTest(Box::new(TestService {
-            s: service_name.clone(),
-        })));
+        let mut service = Binder::new(BnTest(Box::new(TestService::new(&service_name))));
         service.set_requesting_sid(true);
         if let Some(extension_name) = extension_name {
             let extension =
-                BnTest::new_binder(TestService { s: extension_name }, BinderFeatures::default());
+                BnTest::new_binder(TestService::new(&extension_name), BinderFeatures::default());
             service
                 .set_extension(&mut extension.as_binder())
                 .expect("Could not add extension");
@@ -80,14 +81,24 @@
     ));
 }
 
-#[derive(Clone)]
 struct TestService {
     s: String,
+    dump_args: Mutex<Vec<String>>,
+}
+
+impl TestService {
+    fn new(s: &str) -> Self {
+        Self {
+            s: s.to_string(),
+            dump_args: Mutex::new(Vec::new()),
+        }
+    }
 }
 
 #[repr(u32)]
 enum TestTransactionCode {
     Test = FIRST_CALL_TRANSACTION,
+    GetDumpArgs,
     GetSelinuxContext,
 }
 
@@ -97,6 +108,7 @@
     fn try_from(c: u32) -> Result<Self, Self::Error> {
         match c {
             _ if c == TestTransactionCode::Test as u32 => Ok(TestTransactionCode::Test),
+            _ if c == TestTransactionCode::GetDumpArgs as u32 => Ok(TestTransactionCode::GetDumpArgs),
             _ if c == TestTransactionCode::GetSelinuxContext as u32 => {
                 Ok(TestTransactionCode::GetSelinuxContext)
             }
@@ -105,13 +117,24 @@
     }
 }
 
-impl Interface for TestService {}
+impl Interface for TestService {
+    fn dump(&self, _file: &File, args: &[&CStr]) -> binder::Result<()> {
+        let mut dump_args = self.dump_args.lock().unwrap();
+        dump_args.extend(args.iter().map(|s| s.to_str().unwrap().to_owned()));
+        Ok(())
+    }
+}
 
 impl ITest for TestService {
     fn test(&self) -> binder::Result<String> {
         Ok(self.s.clone())
     }
 
+    fn get_dump_args(&self) -> binder::Result<Vec<String>> {
+        let args = self.dump_args.lock().unwrap().clone();
+        Ok(args)
+    }
+
     fn get_selinux_context(&self) -> binder::Result<String> {
         let sid =
             ThreadState::with_calling_sid(|sid| sid.map(|s| s.to_string_lossy().into_owned()));
@@ -124,6 +147,9 @@
     /// Returns a test string
     fn test(&self) -> binder::Result<String>;
 
+    /// Return the arguments sent via dump
+    fn get_dump_args(&self) -> binder::Result<Vec<String>>;
+
     /// Returns the caller's SELinux context
     fn get_selinux_context(&self) -> binder::Result<String>;
 }
@@ -145,6 +171,7 @@
 ) -> binder::Result<()> {
     match code.try_into()? {
         TestTransactionCode::Test => reply.write(&service.test()?),
+        TestTransactionCode::GetDumpArgs => reply.write(&service.get_dump_args()?),
         TestTransactionCode::GetSelinuxContext => reply.write(&service.get_selinux_context()?),
     }
 }
@@ -157,6 +184,13 @@
         reply.read()
     }
 
+    fn get_dump_args(&self) -> binder::Result<Vec<String>> {
+        let reply =
+            self.binder
+                .transact(TestTransactionCode::GetDumpArgs as TransactionCode, 0, |_| Ok(()))?;
+        reply.read()
+    }
+
     fn get_selinux_context(&self) -> binder::Result<String> {
         let reply = self.binder.transact(
             TestTransactionCode::GetSelinuxContext as TransactionCode,
@@ -172,6 +206,10 @@
         self.0.test()
     }
 
+    fn get_dump_args(&self) -> binder::Result<Vec<String>> {
+        self.0.get_dump_args()
+    }
+
     fn get_selinux_context(&self) -> binder::Result<String> {
         self.0.get_selinux_context()
     }
@@ -432,18 +470,22 @@
         {
             let _process = ScopedServiceProcess::new(service_name);
 
-            let mut remote = binder::get_service(service_name);
+            let test_client: Strong<dyn ITest> =
+                binder::get_interface(service_name)
+                .expect("Did not get test binder service");
+            let mut remote = test_client.as_binder();
             assert!(remote.is_binder_alive());
             remote.ping_binder().expect("Could not ping remote service");
 
-            // We're not testing the output of dump here, as that's really a
-            // property of the C++ implementation. There is the risk that the
-            // method just does nothing, but we don't want to depend on any
-            // particular output from the underlying library.
+            let dump_args = ["dump", "args", "for", "testing"];
+
             let null_out = File::open("/dev/null").expect("Could not open /dev/null");
             remote
-                .dump(&null_out, &[])
+                .dump(&null_out, &dump_args)
                 .expect("Could not dump remote service");
+
+            let remote_args = test_client.get_dump_args().expect("Could not fetched dumped args");
+            assert_eq!(dump_args, remote_args[..], "Remote args don't match call to dump");
         }
 
         // get/set_extensions is tested in test_extensions()
@@ -504,9 +546,7 @@
     /// rust_ndk_interop.rs
     #[test]
     fn associate_existing_class() {
-        let service = Binder::new(BnTest(Box::new(TestService {
-            s: "testing_service".to_string(),
-        })));
+        let service = Binder::new(BnTest(Box::new(TestService::new("testing_service"))));
 
         // This should succeed although we will have to treat the service as
         // remote.
@@ -520,9 +560,7 @@
     fn reassociate_rust_binder() {
         let service_name = "testing_service";
         let service_ibinder = BnTest::new_binder(
-            TestService {
-                s: service_name.to_string(),
-            },
+            TestService::new(service_name),
             BinderFeatures::default(),
         )
         .as_binder();
@@ -538,9 +576,7 @@
     fn weak_binder_upgrade() {
         let service_name = "testing_service";
         let service = BnTest::new_binder(
-            TestService {
-                s: service_name.to_string(),
-            },
+            TestService::new(service_name),
             BinderFeatures::default(),
         );
 
@@ -556,9 +592,7 @@
         let service_name = "testing_service";
         let weak = {
             let service = BnTest::new_binder(
-                TestService {
-                    s: service_name.to_string(),
-                },
+                TestService::new(service_name),
                 BinderFeatures::default(),
             );
 
@@ -572,9 +606,7 @@
     fn weak_binder_clone() {
         let service_name = "testing_service";
         let service = BnTest::new_binder(
-            TestService {
-                s: service_name.to_string(),
-            },
+            TestService::new(service_name),
             BinderFeatures::default(),
         );
 
@@ -593,15 +625,11 @@
     #[allow(clippy::eq_op)]
     fn binder_ord() {
         let service1 = BnTest::new_binder(
-            TestService {
-                s: "testing_service1".to_string(),
-            },
+            TestService::new("testing_service1"),
             BinderFeatures::default(),
         );
         let service2 = BnTest::new_binder(
-            TestService {
-                s: "testing_service2".to_string(),
-            },
+            TestService::new("testing_service2"),
             BinderFeatures::default(),
         );
 
diff --git a/libs/binder/tests/IBinderRpcTest.aidl b/libs/binder/tests/IBinderRpcTest.aidl
index b0c8b2d..9e10788 100644
--- a/libs/binder/tests/IBinderRpcTest.aidl
+++ b/libs/binder/tests/IBinderRpcTest.aidl
@@ -55,6 +55,7 @@
     oneway void sleepMsAsync(int ms);
 
     void doCallback(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value);
+    oneway void doCallbackAsync(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value);
 
     void die(boolean cleanup);
     void scheduleShutdown();
diff --git a/libs/binder/tests/binderLibTest.cpp b/libs/binder/tests/binderLibTest.cpp
index 1ecb0b7..4c3225f 100644
--- a/libs/binder/tests/binderLibTest.cpp
+++ b/libs/binder/tests/binderLibTest.cpp
@@ -114,7 +114,6 @@
     BINDER_LIB_TEST_ECHO_VECTOR,
     BINDER_LIB_TEST_REJECT_BUF,
     BINDER_LIB_TEST_CAN_GET_SID,
-    BINDER_LIB_TEST_CREATE_TEST_SERVICE,
 };
 
 pid_t start_server_process(int arg2, bool usePoll = false)
@@ -1256,89 +1255,6 @@
 INSTANTIATE_TEST_CASE_P(BinderLibTest, BinderLibRpcTestP, testing::Bool(),
                         BinderLibRpcTestP::ParamToString);
 
-class BinderLibTestService;
-class BinderLibRpcClientTest : public BinderLibRpcTestBase,
-                               public WithParamInterface<std::tuple<bool, uint32_t>> {
-public:
-    static std::string ParamToString(const testing::TestParamInfo<ParamType> &info) {
-        auto [isRemote, numThreads] = info.param;
-        return (isRemote ? "remote" : "local") + "_server_with_"s + std::to_string(numThreads) +
-                "_threads";
-    }
-    sp<IBinder> CreateRemoteService(int32_t id) {
-        Parcel data, reply;
-        status_t status = data.writeInt32(id);
-        EXPECT_THAT(status, StatusEq(OK));
-        if (status != OK) return nullptr;
-        status = m_server->transact(BINDER_LIB_TEST_CREATE_TEST_SERVICE, data, &reply);
-        EXPECT_THAT(status, StatusEq(OK));
-        if (status != OK) return nullptr;
-        sp<IBinder> ret;
-        status = reply.readStrongBinder(&ret);
-        EXPECT_THAT(status, StatusEq(OK));
-        if (status != OK) return nullptr;
-        return ret;
-    }
-};
-
-TEST_P(BinderLibRpcClientTest, Test) {
-    auto [isRemote, numThreadsParam] = GetParam();
-    uint32_t numThreads = numThreadsParam; // ... to be captured in lambda
-    int32_t id = 0xC0FFEE00 + numThreads;
-    sp<IBinder> server = isRemote ? sp<IBinder>(CreateRemoteService(id))
-                                  : sp<IBinder>(sp<BinderLibTestService>::make(id, false));
-    ASSERT_EQ(isRemote, !!server->remoteBinder());
-    ASSERT_THAT(GetId(server), HasValue(id));
-
-    unsigned int port = 0;
-    // Fake servicedispatcher.
-    {
-        auto [socket, socketPort] = CreateSocket();
-        ASSERT_TRUE(socket.ok());
-        port = socketPort;
-        ASSERT_THAT(server->setRpcClientDebug(std::move(socket)), StatusEq(OK));
-    }
-
-    std::mutex mutex;
-    std::condition_variable cv;
-    bool start = false;
-
-    auto threadFn = [&](size_t threadNum) {
-        usleep(threadNum * 10 * 1000); // threadNum * 10ms. Need this to avoid SYN flooding.
-        auto rpcSession = RpcSession::make();
-        ASSERT_TRUE(rpcSession->setupInetClient("127.0.0.1", port));
-        auto rpcServerBinder = rpcSession->getRootObject();
-        ASSERT_NE(nullptr, rpcServerBinder);
-        // Check that |rpcServerBinder| and |server| points to the same service.
-        EXPECT_THAT(GetId(rpcServerBinder), HasValue(id)) << "For thread #" << threadNum;
-
-        {
-            std::unique_lock<std::mutex> lock(mutex);
-            ASSERT_TRUE(cv.wait_for(lock, 1s, [&] { return start; }));
-        }
-        // Let all threads almost simultaneously ping the service.
-        for (size_t i = 0; i < 100; ++i) {
-            EXPECT_THAT(rpcServerBinder->pingBinder(), StatusEq(OK))
-                    << "For thread #" << threadNum << ", iteration " << i;
-        }
-    };
-
-    std::vector<std::thread> threads;
-    for (size_t i = 0; i < numThreads; ++i) threads.emplace_back(std::bind(threadFn, i));
-
-    {
-        std::lock_guard<std::mutex> lock(mutex);
-        start = true;
-    }
-    cv.notify_all();
-
-    for (auto &t : threads) t.join();
-}
-
-INSTANTIATE_TEST_CASE_P(BinderLibTest, BinderLibRpcClientTest,
-                        testing::Combine(testing::Bool(), testing::Values(1u, 10u)),
-                        BinderLibRpcClientTest::ParamToString);
-
 class BinderLibTestService : public BBinder {
 public:
     explicit BinderLibTestService(int32_t id, bool exitOnDestroy = true)
@@ -1653,12 +1569,6 @@
             case BINDER_LIB_TEST_CAN_GET_SID: {
                 return IPCThreadState::self()->getCallingSid() == nullptr ? BAD_VALUE : NO_ERROR;
             }
-            case BINDER_LIB_TEST_CREATE_TEST_SERVICE: {
-                int32_t id;
-                if (status_t status = data.readInt32(&id); status != NO_ERROR) return status;
-                reply->writeStrongBinder(sp<BinderLibTestService>::make(id, false));
-                return NO_ERROR;
-            }
             default:
                 return UNKNOWN_TRANSACTION;
         };
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index a79295a..368a24e 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -214,7 +214,8 @@
         if (delayed) {
             std::thread([=]() {
                 ALOGE("Executing delayed callback: '%s'", value.c_str());
-                (void)doCallback(callback, oneway, false, value);
+                Status status = doCallback(callback, oneway, false, value);
+                ALOGE("Delayed callback status: '%s'", status.toString8().c_str());
             }).detach();
             return Status::ok();
         }
@@ -226,6 +227,11 @@
         return callback->sendCallback(value);
     }
 
+    Status doCallbackAsync(const sp<IBinderRpcCallback>& callback, bool oneway, bool delayed,
+                           const std::string& value) override {
+        return doCallback(callback, oneway, delayed, value);
+    }
+
     Status die(bool cleanup) override {
         if (cleanup) {
             exit(1);
@@ -298,6 +304,11 @@
     return temp + "/binderRpcTest_" + std::to_string(id++);
 };
 
+static unsigned int allocateVsockPort() {
+    static unsigned int vsockPort = 3456;
+    return vsockPort++;
+}
+
 struct ProcessSession {
     // reference to process hosting a socket server
     Process host;
@@ -385,6 +396,7 @@
             return "";
     }
 }
+
 class BinderRpc : public ::testing::TestWithParam<SocketType> {
 public:
     // This creates a new process serving an interface on a certain number of
@@ -396,10 +408,9 @@
 
         SocketType socketType = GetParam();
 
+        unsigned int vsockPort = allocateVsockPort();
         std::string addr = allocateSocketAddress();
         unlink(addr.c_str());
-        static unsigned int vsockPort = 3456;
-        vsockPort++;
 
         auto ret = ProcessSession{
                 .host = Process([&](Pipe* pipe) {
@@ -978,31 +989,42 @@
 TEST_P(BinderRpc, Callbacks) {
     const static std::string kTestString = "good afternoon!";
 
-    for (bool oneway : {true, false}) {
-        for (bool delayed : {true, false}) {
-            auto proc = createRpcTestSocketServerProcess(1, 1, 1);
-            auto cb = sp<MyBinderRpcCallback>::make();
+    for (bool callIsOneway : {true, false}) {
+        for (bool callbackIsOneway : {true, false}) {
+            for (bool delayed : {true, false}) {
+                auto proc = createRpcTestSocketServerProcess(1, 1, 1);
+                auto cb = sp<MyBinderRpcCallback>::make();
 
-            EXPECT_OK(proc.rootIface->doCallback(cb, oneway, delayed, kTestString));
+                if (callIsOneway) {
+                    EXPECT_OK(proc.rootIface->doCallbackAsync(cb, callbackIsOneway, delayed,
+                                                              kTestString));
+                } else {
+                    EXPECT_OK(
+                            proc.rootIface->doCallback(cb, callbackIsOneway, delayed, kTestString));
+                }
 
-            using std::literals::chrono_literals::operator""s;
-            std::unique_lock<std::mutex> _l(cb->mMutex);
-            cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); });
+                using std::literals::chrono_literals::operator""s;
+                std::unique_lock<std::mutex> _l(cb->mMutex);
+                cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); });
 
-            EXPECT_EQ(cb->mValues.size(), 1) << "oneway: " << oneway << "delayed: " << delayed;
-            if (cb->mValues.empty()) continue;
-            EXPECT_EQ(cb->mValues.at(0), kTestString)
-                    << "oneway: " << oneway << "delayed: " << delayed;
+                EXPECT_EQ(cb->mValues.size(), 1)
+                        << "callIsOneway: " << callIsOneway
+                        << " callbackIsOneway: " << callbackIsOneway << " delayed: " << delayed;
+                if (cb->mValues.empty()) continue;
+                EXPECT_EQ(cb->mValues.at(0), kTestString)
+                        << "callIsOneway: " << callIsOneway
+                        << " callbackIsOneway: " << callbackIsOneway << " delayed: " << delayed;
 
-            // since we are severing the connection, we need to go ahead and
-            // tell the server to shutdown and exit so that waitpid won't hang
-            EXPECT_OK(proc.rootIface->scheduleShutdown());
+                // since we are severing the connection, we need to go ahead and
+                // tell the server to shutdown and exit so that waitpid won't hang
+                EXPECT_OK(proc.rootIface->scheduleShutdown());
 
-            // since this session has a reverse connection w/ a threadpool, we
-            // need to manually shut it down
-            EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
+                // since this session has a reverse connection w/ a threadpool, we
+                // need to manually shut it down
+                EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
 
-            proc.expectAlreadyShutdown = true;
+                proc.expectAlreadyShutdown = true;
+            }
         }
     }
 }
@@ -1091,15 +1113,33 @@
     ASSERT_EQ(beforeFds, countFds()) << (system("ls -l /proc/self/fd/"), "fd leak?");
 }
 
-INSTANTIATE_TEST_CASE_P(PerSocket, BinderRpc,
-                        ::testing::ValuesIn({
-                                SocketType::UNIX,
-// TODO(b/185269356): working on host
-#ifdef __BIONIC__
-                                SocketType::VSOCK,
-#endif
-                                SocketType::INET,
-                        }),
+static bool testSupportVsockLoopback() {
+    unsigned int vsockPort = allocateVsockPort();
+    sp<RpcServer> server = RpcServer::make();
+    server->iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction();
+    CHECK(server->setupVsockServer(vsockPort));
+    server->start();
+
+    sp<RpcSession> session = RpcSession::make();
+    bool okay = session->setupVsockClient(VMADDR_CID_LOCAL, vsockPort);
+    CHECK(server->shutdown());
+    ALOGE("Detected vsock loopback supported: %d", okay);
+    return okay;
+}
+
+static std::vector<SocketType> testSocketTypes() {
+    std::vector<SocketType> ret = {SocketType::UNIX, SocketType::INET};
+
+    static bool hasVsockLoopback = testSupportVsockLoopback();
+
+    if (hasVsockLoopback) {
+        ret.push_back(SocketType::VSOCK);
+    }
+
+    return ret;
+}
+
+INSTANTIATE_TEST_CASE_P(PerSocket, BinderRpc, ::testing::ValuesIn(testSocketTypes()),
                         PrintSocketType);
 
 class BinderRpcServerRootObject : public ::testing::TestWithParam<std::tuple<bool, bool>> {};
diff --git a/libs/binder/tests/parcel_fuzzer/hwbinder.cpp b/libs/binder/tests/parcel_fuzzer/hwbinder.cpp
index 0fec393..35b5ebc 100644
--- a/libs/binder/tests/parcel_fuzzer/hwbinder.cpp
+++ b/libs/binder/tests/parcel_fuzzer/hwbinder.cpp
@@ -148,28 +148,6 @@
         // should be null since we don't create any IPC objects
         CHECK(data == nullptr) << data;
     },
-    [] (const ::android::hardware::Parcel& p, uint8_t size) {
-        FUZZ_LOG() << "about to readEmbeddedNativeHandle";
-        size_t parent_buffer_handle = size & 0xf;
-        size_t parent_offset = size >> 4;
-        const native_handle_t* handle = nullptr;
-        status_t status = p.readEmbeddedNativeHandle(parent_buffer_handle, parent_offset, &handle);
-        FUZZ_LOG() << "readEmbeddedNativeHandle status: " << status << " handle: " << handle << " handle: " << handle;
-
-        // should be null since we don't create any IPC objects
-        CHECK(handle == nullptr) << handle;
-    },
-    [] (const ::android::hardware::Parcel& p, uint8_t size) {
-        FUZZ_LOG() << "about to readNullableEmbeddedNativeHandle";
-        size_t parent_buffer_handle = size & 0xf;
-        size_t parent_offset = size >> 4;
-        const native_handle_t* handle = nullptr;
-        status_t status = p.readNullableEmbeddedNativeHandle(parent_buffer_handle, parent_offset, &handle);
-        FUZZ_LOG() << "readNullableEmbeddedNativeHandle status: " << status << " handle: " << handle << " handle: " << handle;
-
-        // should be null since we don't create any IPC objects
-        CHECK(handle == nullptr) << handle;
-    },
     [] (const ::android::hardware::Parcel& p, uint8_t /*data*/) {
         FUZZ_LOG() << "about to readNativeHandleNoDup";
         const native_handle_t* handle = nullptr;
@@ -180,14 +158,5 @@
         CHECK(handle == nullptr) << handle;
         CHECK(status != ::android::OK);
     },
-    [] (const ::android::hardware::Parcel& p, uint8_t /*data*/) {
-        FUZZ_LOG() << "about to readNullableNativeHandleNoDup";
-        const native_handle_t* handle = nullptr;
-        status_t status = p.readNullableNativeHandleNoDup(&handle);
-        FUZZ_LOG() << "readNullableNativeHandleNoDup status: " << status << " handle: " << handle;
-
-        // should be null since we don't create any IPC objects
-        CHECK(handle == nullptr) << handle;
-    },
 };
 // clang-format on