Merge changes Ibe1a854b,I8c281ad2,I9e290cd0,I0035be2d

* changes:
  libbinder: RPC limit on oneway transactions
  libbinder: RPC state termination shutdown session
  libbinder: server sessions shut down independently
  libbinder: remove unused FdTrigger::readFd
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 2f378da..2d2eed2 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -187,6 +187,11 @@
     }
 
     mShutdownTrigger->trigger();
+    for (auto& [id, session] : mSessions) {
+        (void)id;
+        session->mShutdownTrigger->trigger();
+    }
+
     while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
         if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
             ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
@@ -261,7 +266,7 @@
         };
         server->mConnectingThreads.erase(threadId);
 
-        if (!idValid) {
+        if (!idValid || server->mShutdownTrigger->isTriggered()) {
             return;
         }
 
@@ -276,10 +281,14 @@
 
             session = RpcSession::make();
             session->setMaxThreads(server->mMaxThreads);
-            session->setForServer(server,
-                                  sp<RpcServer::EventListener>::fromExisting(
-                                          static_cast<RpcServer::EventListener*>(server.get())),
-                                  server->mSessionIdCounter, server->mShutdownTrigger);
+            if (!session->setForServer(server,
+                                       sp<RpcServer::EventListener>::fromExisting(
+                                               static_cast<RpcServer::EventListener*>(
+                                                       server.get())),
+                                       server->mSessionIdCounter)) {
+                ALOGE("Failed to attach server to session");
+                return;
+            }
 
             server->mSessions[server->mSessionIdCounter] = session;
         } else {
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index c563377..62118ff 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -113,17 +113,21 @@
     return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
 }
 
-bool RpcSession::shutdown() {
+bool RpcSession::shutdownAndWait(bool wait) {
     std::unique_lock<std::mutex> _l(mMutex);
-    LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session");
     LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed");
-    LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
 
     mShutdownTrigger->trigger();
-    mShutdownListener->waitForShutdown(_l);
-    mState->terminate();
 
-    LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+    if (wait) {
+        LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
+        mShutdownListener->waitForShutdown(_l);
+        LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+    }
+
+    _l.unlock();
+    mState->clear();
+
     return true;
 }
 
@@ -139,12 +143,15 @@
 status_t RpcSession::sendDecStrong(const RpcAddress& address) {
     ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
                                    ConnectionUse::CLIENT_REFCOUNT);
-    return state()->sendDecStrong(connection.fd(), address);
+    return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
 }
 
 std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
     auto ret = std::make_unique<RpcSession::FdTrigger>();
-    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr;
+    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
+        ALOGE("Could not create pipe %s", strerror(errno));
+        return nullptr;
+    }
     return ret;
 }
 
@@ -152,6 +159,10 @@
     mWrite.reset();
 }
 
+bool RpcSession::FdTrigger::isTriggered() {
+    return mWrite == -1;
+}
+
 status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
     while (true) {
         pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
@@ -408,20 +419,21 @@
     return true;
 }
 
-void RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
-                              int32_t sessionId,
-                              const std::shared_ptr<FdTrigger>& shutdownTrigger) {
+bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
+                              int32_t sessionId) {
     LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
     LOG_ALWAYS_FATAL_IF(server == nullptr);
     LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
     LOG_ALWAYS_FATAL_IF(eventListener == nullptr);
     LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
-    LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);
+
+    mShutdownTrigger = FdTrigger::make();
+    if (mShutdownTrigger == nullptr) return false;
 
     mId = sessionId;
     mForServer = server;
     mEventListener = eventListener;
-    mShutdownTrigger = shutdownTrigger;
+    return true;
 }
 
 sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 93f1529..3113841 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -137,9 +137,39 @@
     dumpLocked();
 }
 
-void RpcState::terminate() {
+void RpcState::clear() {
     std::unique_lock<std::mutex> _l(mNodeMutex);
-    terminate(_l);
+
+    if (mTerminated) {
+        LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(),
+                            "New state should be impossible after terminating!");
+        return;
+    }
+
+    if (SHOULD_LOG_RPC_DETAIL) {
+        ALOGE("RpcState::clear()");
+        dumpLocked();
+    }
+
+    // if the destructor of a binder object makes another RPC call, then calling
+    // decStrong could deadlock. So, we must hold onto these binders until
+    // mNodeMutex is no longer taken.
+    std::vector<sp<IBinder>> tempHoldBinder;
+
+    mTerminated = true;
+    for (auto& [address, node] : mNodeForAddress) {
+        sp<IBinder> binder = node.binder.promote();
+        LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
+
+        if (node.sentRef != nullptr) {
+            tempHoldBinder.push_back(node.sentRef);
+        }
+    }
+
+    mNodeForAddress.clear();
+
+    _l.unlock();
+    tempHoldBinder.clear(); // explicit
 }
 
 void RpcState::dumpLocked() {
@@ -170,32 +200,6 @@
     ALOGE("END DUMP OF RpcState");
 }
 
-void RpcState::terminate(std::unique_lock<std::mutex>& lock) {
-    if (SHOULD_LOG_RPC_DETAIL) {
-        ALOGE("RpcState::terminate()");
-        dumpLocked();
-    }
-
-    // if the destructor of a binder object makes another RPC call, then calling
-    // decStrong could deadlock. So, we must hold onto these binders until
-    // mNodeMutex is no longer taken.
-    std::vector<sp<IBinder>> tempHoldBinder;
-
-    mTerminated = true;
-    for (auto& [address, node] : mNodeForAddress) {
-        sp<IBinder> binder = node.binder.promote();
-        LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
-
-        if (node.sentRef != nullptr) {
-            tempHoldBinder.push_back(node.sentRef);
-        }
-    }
-
-    mNodeForAddress.clear();
-
-    lock.unlock();
-    tempHoldBinder.clear(); // explicit
-}
 
 RpcState::CommandData::CommandData(size_t size) : mSize(size) {
     // The maximum size for regular binder is 1MB for all concurrent
@@ -218,13 +222,13 @@
     mData.reset(new (std::nothrow) uint8_t[size]);
 }
 
-status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data,
-                           size_t size) {
+status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+                           const char* what, const void* data, size_t size) {
     LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
 
     if (size > std::numeric_limits<ssize_t>::max()) {
         ALOGE("Cannot send %s at size %zu (too big)", what, size);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
 
@@ -235,7 +239,7 @@
         LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
                        size, fd.get(), strerror(savedErrno));
 
-        terminate();
+        (void)session->shutdownAndWait(false);
         return -savedErrno;
     }
 
@@ -246,7 +250,7 @@
                           const char* what, void* data, size_t size) {
     if (size > std::numeric_limits<ssize_t>::max()) {
         ALOGE("Cannot rec %s at size %zu (too big)", what, size);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
 
@@ -358,7 +362,11 @@
 
         if (flags & IBinder::FLAG_ONEWAY) {
             asyncNumber = it->second.asyncNumber;
-            if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+            if (!nodeProgressAsyncNumber(&it->second)) {
+                _l.unlock();
+                (void)session->shutdownAndWait(false);
+                return DEAD_OBJECT;
+            }
         }
     }
 
@@ -390,7 +398,7 @@
            data.dataSize());
 
     if (status_t status =
-                rpcSend(fd, "transaction", transactionData.data(), transactionData.size());
+                rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size());
         status != OK)
         return status;
 
@@ -442,7 +450,7 @@
     if (command.bodySize < sizeof(RpcWireReply)) {
         ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireReply. Terminating!",
               sizeof(RpcWireReply), command.bodySize);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
     RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data.data());
@@ -457,7 +465,8 @@
     return OK;
 }
 
-status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& addr) {
+status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                 const RpcAddress& addr) {
     {
         std::lock_guard<std::mutex> _l(mNodeMutex);
         if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
@@ -476,10 +485,10 @@
             .command = RPC_COMMAND_DEC_STRONG,
             .bodySize = sizeof(RpcWireAddress),
     };
-    if (status_t status = rpcSend(fd, "dec ref header", &cmd, sizeof(cmd)); status != OK)
+    if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK)
         return status;
-    if (status_t status =
-                rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress));
+    if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(),
+                                  sizeof(RpcWireAddress));
         status != OK)
         return status;
     return OK;
@@ -538,7 +547,7 @@
     // also can't consider it a fatal error because this would allow any client
     // to kill us, so ending the session for misbehaving client.
     ALOGE("Unknown RPC command %d - terminating session", command.command);
-    terminate();
+    (void)session->shutdownAndWait(false);
     return DEAD_OBJECT;
 }
 status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -571,7 +580,7 @@
     if (transactionData.size() < sizeof(RpcWireTransaction)) {
         ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
               sizeof(RpcWireTransaction), transactionData.size());
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
     RpcWireTransaction* transaction = reinterpret_cast<RpcWireTransaction*>(transactionData.data());
@@ -600,15 +609,15 @@
             // session.
             ALOGE("While transacting, binder has been deleted at address %s. Terminating!",
                   addr.toString().c_str());
-            terminate();
+            (void)session->shutdownAndWait(false);
             replyStatus = BAD_VALUE;
         } else if (target->localBinder() == nullptr) {
             ALOGE("Unknown binder address or non-local binder, not address %s. Terminating!",
                   addr.toString().c_str());
-            terminate();
+            (void)session->shutdownAndWait(false);
             replyStatus = BAD_VALUE;
         } else if (transaction->flags & IBinder::FLAG_ONEWAY) {
-            std::lock_guard<std::mutex> _l(mNodeMutex);
+            std::unique_lock<std::mutex> _l(mNodeMutex);
             auto it = mNodeForAddress.find(addr);
             if (it->second.binder.promote() != target) {
                 ALOGE("Binder became invalid during transaction. Bad client? %s",
@@ -617,16 +626,33 @@
             } else if (transaction->asyncNumber != it->second.asyncNumber) {
                 // we need to process some other asynchronous transaction
                 // first
-                // TODO(b/183140903): limit enqueues/detect overfill for bad client
-                // TODO(b/183140903): detect when an object is deleted when it still has
-                //        pending async transactions
                 it->second.asyncTodo.push(BinderNode::AsyncTodo{
                         .ref = target,
                         .data = std::move(transactionData),
                         .asyncNumber = transaction->asyncNumber,
                 });
-                LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s", transaction->asyncNumber,
-                               addr.toString().c_str());
+
+                size_t numPending = it->second.asyncTodo.size();
+                LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s (%zu pending)",
+                               transaction->asyncNumber, addr.toString().c_str(), numPending);
+
+                constexpr size_t kArbitraryOnewayCallTerminateLevel = 10000;
+                constexpr size_t kArbitraryOnewayCallWarnLevel = 1000;
+                constexpr size_t kArbitraryOnewayCallWarnPer = 1000;
+
+                if (numPending >= kArbitraryOnewayCallWarnLevel) {
+                    if (numPending >= kArbitraryOnewayCallTerminateLevel) {
+                        ALOGE("WARNING: %zu pending oneway transactions. Terminating!", numPending);
+                        _l.unlock();
+                        (void)session->shutdownAndWait(false);
+                        return FAILED_TRANSACTION;
+                    }
+
+                    if (numPending % kArbitraryOnewayCallWarnPer == 0) {
+                        ALOGW("Warning: many oneway transactions built up on %p (%zu)",
+                              target.get(), numPending);
+                    }
+                }
                 return OK;
             }
         }
@@ -707,7 +733,11 @@
             // last refcount dropped after this transaction happened
             if (it == mNodeForAddress.end()) return OK;
 
-            if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+            if (!nodeProgressAsyncNumber(&it->second)) {
+                _l.unlock();
+                (void)session->shutdownAndWait(false);
+                return DEAD_OBJECT;
+            }
 
             if (it->second.asyncTodo.size() == 0) return OK;
             if (it->second.asyncTodo.top().asyncNumber == it->second.asyncNumber) {
@@ -753,7 +783,7 @@
     memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
            reply.dataSize());
 
-    return rpcSend(fd, "reply", replyData.data(), replyData.size());
+    return rpcSend(fd, session, "reply", replyData.data(), replyData.size());
 }
 
 status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -772,7 +802,7 @@
     if (command.bodySize < sizeof(RpcWireAddress)) {
         ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireAddress. Terminating!",
               sizeof(RpcWireAddress), command.bodySize);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
     RpcWireAddress* address = reinterpret_cast<RpcWireAddress*>(commandData.data());
@@ -790,7 +820,8 @@
     if (target == nullptr) {
         ALOGE("While requesting dec strong, binder has been deleted at address %s. Terminating!",
               addr.toString().c_str());
-        terminate();
+        _l.unlock();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
 
@@ -826,12 +857,11 @@
     return ref;
 }
 
-bool RpcState::nodeProgressAsyncNumber(BinderNode* node, std::unique_lock<std::mutex>& lock) {
+bool RpcState::nodeProgressAsyncNumber(BinderNode* node) {
     // 2**64 =~ 10**19 =~ 1000 transactions per second for 585 million years to
     // a single binder
     if (node->asyncNumber >= std::numeric_limits<decltype(node->asyncNumber)>::max()) {
         ALOGE("Out of async transaction IDs. Terminating");
-        terminate(lock);
         return false;
     }
     node->asyncNumber++;
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index 81ff458..13c3115 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -65,7 +65,8 @@
                                            uint32_t code, const Parcel& data,
                                            const sp<RpcSession>& session, Parcel* reply,
                                            uint32_t flags);
-    [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address);
+    [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                         const RpcAddress& address);
 
     enum class CommandType {
         ANY,
@@ -110,11 +111,10 @@
      * WARNING: RpcState is responsible for calling this when the session is
      * no longer recoverable.
      */
-    void terminate();
+    void clear();
 
 private:
     void dumpLocked();
-    void terminate(std::unique_lock<std::mutex>& lock);
 
     // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps
     // large allocations to avoid being requested from allocating too much data.
@@ -130,8 +130,8 @@
         size_t mSize;
     };
 
-    [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const char* what, const void* data,
-                                   size_t size);
+    [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                   const char* what, const void* data, size_t size);
     [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
                                   const char* what, void* data, size_t size);
 
@@ -204,9 +204,8 @@
     // dropped after any locks are removed.
     sp<IBinder> tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& it);
     // true - success
-    // false - state terminated, lock gone, halt
-    [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node,
-                                               std::unique_lock<std::mutex>& lock);
+    // false - out of async transaction IDs; caller must shut down the session and halt
+    [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node);
 
     std::mutex mNodeMutex;
     bool mTerminated = false;
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 98db221..b88bf50 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -173,7 +173,7 @@
     wp<IBinder> mRootObjectWeak;
     std::map<int32_t, sp<RpcSession>> mSessions;
     int32_t mSessionIdCounter = 0;
-    std::shared_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
     std::condition_variable mShutdownCv;
 };
 
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index a6bc1a9..7aa6d02 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -97,14 +97,20 @@
     status_t getRemoteMaxThreads(size_t* maxThreads);
 
     /**
-     * Shuts down the service. Only works for client sessions (server-side
-     * sessions currently only support shutting down the entire server).
+     * Shuts down the service.
+     *
+     * For client sessions, wait can be true or false. For server sessions,
+     * waiting is not currently supported (will abort).
      *
      * Warning: this is currently not active/nice (the server isn't told we're
      * shutting down). Being nicer to the server could potentially make it
      * reclaim resources faster.
+     *
+     * If this is called w/ 'wait' true, then this will wait for shutdown to
+     * complete before returning. This will hang if it is called from the
+     * session threadpool (when processing received calls).
      */
-    [[nodiscard]] bool shutdown();
+    [[nodiscard]] bool shutdownAndWait(bool wait);
 
     [[nodiscard]] status_t transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
                                     Parcel* reply, uint32_t flags);
@@ -129,16 +135,17 @@
         static std::unique_ptr<FdTrigger> make();
 
         /**
-         * poll() on this fd for POLLHUP to get notification when trigger is called
-         */
-        base::borrowed_fd readFd() const { return mRead; }
-
-        /**
          * Close the write end of the pipe so that the read end receives POLLHUP.
+         * Not threadsafe.
          */
         void trigger();
 
         /**
+         * Whether this has been triggered.
+         */
+        bool isTriggered();
+
+        /**
          * Poll for a read event.
          *
          * Return:
@@ -197,9 +204,9 @@
     [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
                                                 bool server);
     [[nodiscard]] bool addClientConnection(base::unique_fd fd);
-    void setForServer(const wp<RpcServer>& server,
-                      const wp<RpcSession::EventListener>& eventListener, int32_t sessionId,
-                      const std::shared_ptr<FdTrigger>& shutdownTrigger);
+    [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
+                                    const wp<RpcSession::EventListener>& eventListener,
+                                    int32_t sessionId);
     sp<RpcConnection> assignServerToThisThread(base::unique_fd fd);
     [[nodiscard]] bool removeServerConnection(const sp<RpcConnection>& connection);
 
@@ -252,7 +259,7 @@
     // TODO(b/183988761): this shouldn't be guessable
     std::optional<int32_t> mId;
 
-    std::shared_ptr<FdTrigger> mShutdownTrigger;
+    std::unique_ptr<FdTrigger> mShutdownTrigger;
 
     std::unique_ptr<RpcState> mState;
 
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 0a970fb..82f8a3e 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -941,6 +941,40 @@
     for (auto& t : threads) t.join();
 }
 
+TEST_P(BinderRpc, OnewayCallExhaustion) {
+    constexpr size_t kNumClients = 2;
+    constexpr size_t kTooLongMs = 1000;
+
+    auto proc = createRpcTestSocketServerProcess(kNumClients /*threads*/, 2 /*sessions*/);
+
+    // Build up oneway calls on the second session to make sure it terminates
+    // and shuts down. The first session should be unaffected (proc destructor
+    // checks the first session).
+    auto iface = interface_cast<IBinderRpcTest>(proc.proc.sessions.at(1).root);
+
+    std::vector<std::thread> threads;
+    for (size_t i = 0; i < kNumClients; i++) {
+        // one of these threads will get stuck queueing a transaction once the
+        // socket fills up, the other will be able to fill up transactions on
+        // this object
+        threads.push_back(std::thread([&] {
+            while (iface->sleepMsAsync(kTooLongMs).isOk()) {
+            }
+        }));
+    }
+    for (auto& t : threads) t.join();
+
+    Status status = iface->sleepMsAsync(kTooLongMs);
+    EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
+
+    // The second session should already be shut down in the other process by
+    // the time the threads above can be joined (it is only hung up once it
+    // finishes processing any pending commands). Erase this session from the
+    // record here so that the proc destructor does not check that it is still
+    // valid, while still letting it check the other session.
+    proc.proc.sessions.erase(proc.proc.sessions.begin() + 1);
+}
+
 TEST_P(BinderRpc, Callbacks) {
     const static std::string kTestString = "good afternoon!";
 
@@ -966,7 +1000,7 @@
 
             // since this session has a reverse connection w/ a threadpool, we
             // need to manually shut it down
-            EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdown());
+            EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
 
             proc.expectAlreadyShutdown = true;
         }