Merge "binder: setRpcClientDebug drops numThreads argument."
diff --git a/cmds/installd/dexopt.cpp b/cmds/installd/dexopt.cpp
index 204953c..cc0434d 100644
--- a/cmds/installd/dexopt.cpp
+++ b/cmds/installd/dexopt.cpp
@@ -292,8 +292,8 @@
     }
 }
 
-static unique_fd create_profile(uid_t uid, const std::string& profile, int32_t flags) {
-    unique_fd fd(TEMP_FAILURE_RETRY(open(profile.c_str(), flags, 0600)));
+static unique_fd create_profile(uid_t uid, const std::string& profile, int32_t flags, mode_t mode) {
+    unique_fd fd(TEMP_FAILURE_RETRY(open(profile.c_str(), flags, mode)));
     if (fd.get() < 0) {
         if (errno != EEXIST) {
             PLOG(ERROR) << "Failed to create profile " << profile;
@@ -310,7 +310,7 @@
     return fd;
 }
 
-static unique_fd open_profile(uid_t uid, const std::string& profile, int32_t flags) {
+static unique_fd open_profile(uid_t uid, const std::string& profile, int32_t flags, mode_t mode) {
     // Do not follow symlinks when opening a profile:
     //   - primary profiles should not contain symlinks in their paths
     //   - secondary dex paths should have been already resolved and validated
@@ -320,7 +320,7 @@
     // Reference profiles and snapshots are created on the fly; so they might not exist beforehand.
     unique_fd fd;
     if ((flags & O_CREAT) != 0) {
-        fd = create_profile(uid, profile, flags);
+        fd = create_profile(uid, profile, flags, mode);
     } else {
         fd.reset(TEMP_FAILURE_RETRY(open(profile.c_str(), flags)));
     }
@@ -336,6 +336,16 @@
             PLOG(ERROR) << "Failed to open profile " << profile;
         }
         return invalid_unique_fd();
+    } else {
+        // If we just created the file, we need to set its mode because on
+        // Android open() applies a mask that only allows owner access.
+        if ((flags & O_CREAT) != 0) {
+            if (fchmod(fd.get(), mode) != 0) {
+                PLOG(ERROR) << "Could not set mode " << std::hex << mode << std::dec
+                        << " on profile " << profile;
+                // Not a terminal failure.
+            }
+        }
     }
 
     return fd;
@@ -345,20 +355,29 @@
         const std::string& location, bool is_secondary_dex) {
     std::string profile = create_current_profile_path(user, package_name, location,
             is_secondary_dex);
-    return open_profile(uid, profile, O_RDONLY);
+    return open_profile(uid, profile, O_RDONLY, /*mode=*/ 0);
 }
 
 static unique_fd open_reference_profile(uid_t uid, const std::string& package_name,
         const std::string& location, bool read_write, bool is_secondary_dex) {
     std::string profile = create_reference_profile_path(package_name, location, is_secondary_dex);
-    return open_profile(uid, profile, read_write ? (O_CREAT | O_RDWR) : O_RDONLY);
+    return open_profile(
+        uid,
+        profile,
+        read_write ? (O_CREAT | O_RDWR) : O_RDONLY,
+        S_IRUSR | S_IWUSR | S_IRGRP);  // so that ART can also read it when apps run.
 }
 
 static UniqueFile open_reference_profile_as_unique_file(uid_t uid, const std::string& package_name,
         const std::string& location, bool read_write, bool is_secondary_dex) {
     std::string profile_path = create_reference_profile_path(package_name, location,
                                                              is_secondary_dex);
-    unique_fd ufd = open_profile(uid, profile_path, read_write ? (O_CREAT | O_RDWR) : O_RDONLY);
+    unique_fd ufd = open_profile(
+        uid,
+        profile_path,
+        read_write ? (O_CREAT | O_RDWR) : O_RDONLY,
+        S_IRUSR | S_IWUSR | S_IRGRP);  // so that ART can also read it when apps run.
+
     return UniqueFile(ufd.release(), profile_path, [](const std::string& path) {
         clear_profile(path);
     });
@@ -367,7 +386,7 @@
 static unique_fd open_spnashot_profile(uid_t uid, const std::string& package_name,
         const std::string& location) {
     std::string profile = create_snapshot_profile_path(package_name, location);
-    return open_profile(uid, profile, O_CREAT | O_RDWR | O_TRUNC);
+    return open_profile(uid, profile, O_CREAT | O_RDWR | O_TRUNC, S_IRUSR | S_IWUSR);
 }
 
 static void open_profile_files(uid_t uid, const std::string& package_name,
@@ -2484,7 +2503,7 @@
     for (size_t i = 0; i < profiles.size(); )  {
         std::vector<unique_fd> profiles_fd;
         for (size_t k = 0; k < kAggregationBatchSize && i < profiles.size(); k++, i++) {
-            unique_fd fd = open_profile(AID_SYSTEM, profiles[i], O_RDONLY);
+            unique_fd fd = open_profile(AID_SYSTEM, profiles[i], O_RDONLY, /*mode=*/ 0);
             if (fd.get() >= 0) {
                 profiles_fd.push_back(std::move(fd));
             }
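For reference on the mode handling above: the mode passed to open(2) is
filtered through the process umask, and on Android that mask leaves only
owner bits (per the comment in the diff), which is why O_CREAT is followed
by fchmod(). A minimal standalone sketch of the same pattern; the helper
name and error handling are illustrative, not part of this change:

    #include <fcntl.h>
    #include <sys/stat.h>
    #include <cstdio>

    // Open (or create) a file and then force the requested mode, since the
    // mode argument to open() is masked by the process umask.
    static int open_with_mode(const char* path, int flags, mode_t mode) {
        int fd = open(path, flags, mode);  // EINTR handling omitted
        if (fd < 0) return -1;
        if ((flags & O_CREAT) != 0 && fchmod(fd, mode) != 0) {
            perror("fchmod");  // non-fatal: file may remain owner-only
        }
        return fd;
    }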
diff --git a/cmds/installd/tests/installd_dexopt_test.cpp b/cmds/installd/tests/installd_dexopt_test.cpp
index e272025..216347e 100644
--- a/cmds/installd/tests/installd_dexopt_test.cpp
+++ b/cmds/installd/tests/installd_dexopt_test.cpp
@@ -919,7 +919,7 @@
             return;
         }
 
-        // Check that the snapshot was created witht he expected acess flags.
+        // Check that the snapshot was created with the expected access flags.
         CheckFileAccess(snap_profile_, kSystemUid, kSystemGid, 0600 | S_IFREG);
 
         // The snapshot should be equivalent to the merge of profiles.
@@ -962,8 +962,8 @@
             return;
         }
 
-        // Check that the snapshot was created witht he expected acess flags.
-        CheckFileAccess(ref_profile_, kTestAppUid, kTestAppUid, 0600 | S_IFREG);
+        // Check that the snapshot was created with the expected access flags.
+        CheckFileAccess(ref_profile_, kTestAppUid, kTestAppUid, 0640 | S_IFREG);
 
         // The snapshot should be equivalent to the merge of profiles.
         std::string ref_profile_content = ref_profile_ + ".expected";
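The updated expectation 0640 is S_IRUSR | S_IWUSR | S_IRGRP, matching the
mode now passed for reference profiles so ART can read them from app
processes. A hedged sketch of what such a check amounts to, with a
hypothetical standalone helper in place of the harness's CheckFileAccess:

    #include <sys/stat.h>
    #include <sys/types.h>

    // Hypothetical equivalent of the harness check: compare owner, group,
    // and the full st_mode (type bits included) against expectations.
    static bool check_file_access(const char* path, uid_t uid, gid_t gid,
                                  mode_t mode) {
        struct stat st;
        if (stat(path, &st) != 0) return false;
        return st.st_uid == uid && st.st_gid == gid && st.st_mode == mode;
    }
    // e.g. check_file_access(path, uid, gid, 0640 | S_IFREG) for a regular
    // file the owner can read/write and the group can read.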
diff --git a/libs/binder/Parcel.cpp b/libs/binder/Parcel.cpp
index 232a70c..ebba375 100644
--- a/libs/binder/Parcel.cpp
+++ b/libs/binder/Parcel.cpp
@@ -206,6 +206,7 @@
             status_t status = writeInt32(1); // non-null
             if (status != OK) return status;
             RpcAddress address = RpcAddress::zero();
+            // TODO(b/167966510): need to undo this if the Parcel is not sent
             status = mSession->state()->onBinderLeaving(mSession, binder, &address);
             if (status != OK) return status;
             status = address.writeToParcel(this);
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 2f378da..2d2eed2 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -187,6 +187,11 @@
     }
 
     mShutdownTrigger->trigger();
+    for (auto& [id, session] : mSessions) {
+        (void)id;
+        session->mShutdownTrigger->trigger();
+    }
+
     while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
         if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
             ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
@@ -261,7 +266,7 @@
         };
         server->mConnectingThreads.erase(threadId);
 
-        if (!idValid) {
+        if (!idValid || server->mShutdownTrigger->isTriggered()) {
             return;
         }
 
@@ -276,10 +281,14 @@
 
             session = RpcSession::make();
             session->setMaxThreads(server->mMaxThreads);
-            session->setForServer(server,
-                                  sp<RpcServer::EventListener>::fromExisting(
-                                          static_cast<RpcServer::EventListener*>(server.get())),
-                                  server->mSessionIdCounter, server->mShutdownTrigger);
+            if (!session->setForServer(server,
+                                       sp<RpcServer::EventListener>::fromExisting(
+                                               static_cast<RpcServer::EventListener*>(
+                                                       server.get())),
+                                       server->mSessionIdCounter)) {
+                ALOGE("Failed to attach server to session");
+                return;
+            }
 
             server->mSessions[server->mSessionIdCounter] = session;
         } else {
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index c563377..de9aa22 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -113,17 +113,21 @@
     return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
 }
 
-bool RpcSession::shutdown() {
+bool RpcSession::shutdownAndWait(bool wait) {
     std::unique_lock<std::mutex> _l(mMutex);
-    LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session");
     LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed");
-    LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
 
     mShutdownTrigger->trigger();
-    mShutdownListener->waitForShutdown(_l);
-    mState->terminate();
 
-    LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+    if (wait) {
+        LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
+        mShutdownListener->waitForShutdown(_l);
+        LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+    }
+
+    _l.unlock();
+    mState->clear();
+
     return true;
 }
 
@@ -139,12 +143,15 @@
 status_t RpcSession::sendDecStrong(const RpcAddress& address) {
     ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
                                    ConnectionUse::CLIENT_REFCOUNT);
-    return state()->sendDecStrong(connection.fd(), address);
+    return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
 }
 
 std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
     auto ret = std::make_unique<RpcSession::FdTrigger>();
-    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr;
+    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
+        ALOGE("Could not create pipe %s", strerror(errno));
+        return nullptr;
+    }
     return ret;
 }
 
@@ -152,6 +159,10 @@
     mWrite.reset();
 }
 
+bool RpcSession::FdTrigger::isTriggered() {
+    return mWrite == -1;
+}
+
 status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
     while (true) {
         pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
@@ -285,7 +296,7 @@
 
     if (!setupOneSocketConnection(addr, RPC_SESSION_ID_NEW, false /*reverse*/)) return false;
 
-    // TODO(b/185167543): we should add additional sessions dynamically
+    // TODO(b/189955605): we should add additional sessions dynamically
     // instead of all at once.
     // TODO(b/186470974): first risk of blocking
     size_t numThreadsAvailable;
@@ -303,11 +314,11 @@
 
     // we've already setup one client
     for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
-        // TODO(b/185167543): shutdown existing connections?
+        // TODO(b/189955605): shutdown existing connections?
         if (!setupOneSocketConnection(addr, mId.value(), false /*reverse*/)) return false;
     }
 
-    // TODO(b/185167543): we should add additional sessions dynamically
+    // TODO(b/189955605): we should add additional sessions dynamically
     // instead of all at once - the other side should be responsible for setting
     // up additional connections. We need to create at least one (unless 0 are
     // requested to be set) in order to allow the other side to reliably make
@@ -408,20 +419,21 @@
     return true;
 }
 
-void RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
-                              int32_t sessionId,
-                              const std::shared_ptr<FdTrigger>& shutdownTrigger) {
+bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
+                              int32_t sessionId) {
     LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
     LOG_ALWAYS_FATAL_IF(server == nullptr);
     LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
     LOG_ALWAYS_FATAL_IF(eventListener == nullptr);
     LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
-    LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);
+
+    mShutdownTrigger = FdTrigger::make();
+    if (mShutdownTrigger == nullptr) return false;
 
     mId = sessionId;
     mForServer = server;
     mEventListener = eventListener;
-    mShutdownTrigger = shutdownTrigger;
+    return true;
 }
 
 sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 93f1529..6899981 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -137,9 +137,39 @@
     dumpLocked();
 }
 
-void RpcState::terminate() {
+void RpcState::clear() {
     std::unique_lock<std::mutex> _l(mNodeMutex);
-    terminate(_l);
+
+    if (mTerminated) {
+        LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(),
+                            "New state should be impossible after terminating!");
+        return;
+    }
+
+    if (SHOULD_LOG_RPC_DETAIL) {
+        ALOGE("RpcState::clear()");
+        dumpLocked();
+    }
+
+    // if the destructor of a binder object makes another RPC call, then calling
+    // decStrong could deadlock. So, we must hold onto these binders until
+    // mNodeMutex is no longer taken.
+    std::vector<sp<IBinder>> tempHoldBinder;
+
+    mTerminated = true;
+    for (auto& [address, node] : mNodeForAddress) {
+        sp<IBinder> binder = node.binder.promote();
+        LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
+
+        if (node.sentRef != nullptr) {
+            tempHoldBinder.push_back(node.sentRef);
+        }
+    }
+
+    mNodeForAddress.clear();
+
+    _l.unlock();
+    tempHoldBinder.clear(); // explicit
 }
 
 void RpcState::dumpLocked() {
@@ -170,32 +200,6 @@
     ALOGE("END DUMP OF RpcState");
 }
 
-void RpcState::terminate(std::unique_lock<std::mutex>& lock) {
-    if (SHOULD_LOG_RPC_DETAIL) {
-        ALOGE("RpcState::terminate()");
-        dumpLocked();
-    }
-
-    // if the destructor of a binder object makes another RPC call, then calling
-    // decStrong could deadlock. So, we must hold onto these binders until
-    // mNodeMutex is no longer taken.
-    std::vector<sp<IBinder>> tempHoldBinder;
-
-    mTerminated = true;
-    for (auto& [address, node] : mNodeForAddress) {
-        sp<IBinder> binder = node.binder.promote();
-        LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());
-
-        if (node.sentRef != nullptr) {
-            tempHoldBinder.push_back(node.sentRef);
-        }
-    }
-
-    mNodeForAddress.clear();
-
-    lock.unlock();
-    tempHoldBinder.clear(); // explicit
-}
 
 RpcState::CommandData::CommandData(size_t size) : mSize(size) {
     // The maximum size for regular binder is 1MB for all concurrent
@@ -218,13 +222,13 @@
     mData.reset(new (std::nothrow) uint8_t[size]);
 }
 
-status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data,
-                           size_t size) {
+status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+                           const char* what, const void* data, size_t size) {
     LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
 
     if (size > std::numeric_limits<ssize_t>::max()) {
         ALOGE("Cannot send %s at size %zu (too big)", what, size);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
 
@@ -235,7 +239,7 @@
         LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
                        size, fd.get(), strerror(savedErrno));
 
-        terminate();
+        (void)session->shutdownAndWait(false);
         return -savedErrno;
     }
 
@@ -246,7 +250,7 @@
                           const char* what, void* data, size_t size) {
     if (size > std::numeric_limits<ssize_t>::max()) {
         ALOGE("Cannot rec %s at size %zu (too big)", what, size);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
 
@@ -358,7 +362,11 @@
 
         if (flags & IBinder::FLAG_ONEWAY) {
             asyncNumber = it->second.asyncNumber;
-            if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+            if (!nodeProgressAsyncNumber(&it->second)) {
+                _l.unlock();
+                (void)session->shutdownAndWait(false);
+                return DEAD_OBJECT;
+            }
         }
     }
 
@@ -390,8 +398,10 @@
            data.dataSize());
 
     if (status_t status =
-                rpcSend(fd, "transaction", transactionData.data(), transactionData.size());
+                rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size());
         status != OK)
+        // TODO(b/167966510): need to undo onBinderLeaving - we know the
+        // refcount isn't successfully transferred.
         return status;
 
     if (flags & IBinder::FLAG_ONEWAY) {
@@ -442,7 +452,7 @@
     if (command.bodySize < sizeof(RpcWireReply)) {
         ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireReply. Terminating!",
               sizeof(RpcWireReply), command.bodySize);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
     RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data.data());
@@ -457,7 +467,8 @@
     return OK;
 }
 
-status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& addr) {
+status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                 const RpcAddress& addr) {
     {
         std::lock_guard<std::mutex> _l(mNodeMutex);
         if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
@@ -476,10 +487,10 @@
             .command = RPC_COMMAND_DEC_STRONG,
             .bodySize = sizeof(RpcWireAddress),
     };
-    if (status_t status = rpcSend(fd, "dec ref header", &cmd, sizeof(cmd)); status != OK)
+    if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK)
         return status;
-    if (status_t status =
-                rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress));
+    if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(),
+                                  sizeof(RpcWireAddress));
         status != OK)
         return status;
     return OK;
@@ -538,7 +549,7 @@
     // also can't consider it a fatal error because this would allow any client
     // to kill us, so ending the session for misbehaving client.
     ALOGE("Unknown RPC command %d - terminating session", command.command);
-    terminate();
+    (void)session->shutdownAndWait(false);
     return DEAD_OBJECT;
 }
 status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -571,7 +582,7 @@
     if (transactionData.size() < sizeof(RpcWireTransaction)) {
         ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
               sizeof(RpcWireTransaction), transactionData.size());
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
     RpcWireTransaction* transaction = reinterpret_cast<RpcWireTransaction*>(transactionData.data());
@@ -600,15 +611,15 @@
             // session.
             ALOGE("While transacting, binder has been deleted at address %s. Terminating!",
                   addr.toString().c_str());
-            terminate();
+            (void)session->shutdownAndWait(false);
             replyStatus = BAD_VALUE;
         } else if (target->localBinder() == nullptr) {
             ALOGE("Unknown binder address or non-local binder, not address %s. Terminating!",
                   addr.toString().c_str());
-            terminate();
+            (void)session->shutdownAndWait(false);
             replyStatus = BAD_VALUE;
         } else if (transaction->flags & IBinder::FLAG_ONEWAY) {
-            std::lock_guard<std::mutex> _l(mNodeMutex);
+            std::unique_lock<std::mutex> _l(mNodeMutex);
             auto it = mNodeForAddress.find(addr);
             if (it->second.binder.promote() != target) {
                 ALOGE("Binder became invalid during transaction. Bad client? %s",
@@ -617,16 +628,33 @@
             } else if (transaction->asyncNumber != it->second.asyncNumber) {
                 // we need to process some other asynchronous transaction
                 // first
-                // TODO(b/183140903): limit enqueues/detect overfill for bad client
-                // TODO(b/183140903): detect when an object is deleted when it still has
-                //        pending async transactions
                 it->second.asyncTodo.push(BinderNode::AsyncTodo{
                         .ref = target,
                         .data = std::move(transactionData),
                         .asyncNumber = transaction->asyncNumber,
                 });
-                LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s", transaction->asyncNumber,
-                               addr.toString().c_str());
+
+                size_t numPending = it->second.asyncTodo.size();
+                LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s (%zu pending)",
+                               transaction->asyncNumber, addr.toString().c_str(), numPending);
+
+                constexpr size_t kArbitraryOnewayCallTerminateLevel = 10000;
+                constexpr size_t kArbitraryOnewayCallWarnLevel = 1000;
+                constexpr size_t kArbitraryOnewayCallWarnPer = 1000;
+
+                if (numPending >= kArbitraryOnewayCallWarnLevel) {
+                    if (numPending >= kArbitraryOnewayCallTerminateLevel) {
+                        ALOGE("WARNING: %zu pending oneway transactions. Terminating!", numPending);
+                        _l.unlock();
+                        (void)session->shutdownAndWait(false);
+                        return FAILED_TRANSACTION;
+                    }
+
+                    if (numPending % kArbitraryOnewayCallWarnPer == 0) {
+                        ALOGW("Warning: many oneway transactions built up on %p (%zu)",
+                              target.get(), numPending);
+                    }
+                }
                 return OK;
             }
         }
@@ -707,7 +735,11 @@
             // last refcount dropped after this transaction happened
             if (it == mNodeForAddress.end()) return OK;
 
-            if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
+            if (!nodeProgressAsyncNumber(&it->second)) {
+                _l.unlock();
+                (void)session->shutdownAndWait(false);
+                return DEAD_OBJECT;
+            }
 
             if (it->second.asyncTodo.size() == 0) return OK;
             if (it->second.asyncTodo.top().asyncNumber == it->second.asyncNumber) {
@@ -753,7 +785,7 @@
     memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
            reply.dataSize());
 
-    return rpcSend(fd, "reply", replyData.data(), replyData.size());
+    return rpcSend(fd, session, "reply", replyData.data(), replyData.size());
 }
 
 status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -772,7 +804,7 @@
     if (command.bodySize < sizeof(RpcWireAddress)) {
         ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireAddress. Terminating!",
               sizeof(RpcWireAddress), command.bodySize);
-        terminate();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
     RpcWireAddress* address = reinterpret_cast<RpcWireAddress*>(commandData.data());
@@ -790,7 +822,8 @@
     if (target == nullptr) {
         ALOGE("While requesting dec strong, binder has been deleted at address %s. Terminating!",
               addr.toString().c_str());
-        terminate();
+        _l.unlock();
+        (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
 
@@ -826,12 +859,11 @@
     return ref;
 }
 
-bool RpcState::nodeProgressAsyncNumber(BinderNode* node, std::unique_lock<std::mutex>& lock) {
+bool RpcState::nodeProgressAsyncNumber(BinderNode* node) {
     // 2**64 =~ 10**19 =~ 1000 transactions per second for 585 million years to
     // a single binder
     if (node->asyncNumber >= std::numeric_limits<decltype(node->asyncNumber)>::max()) {
         ALOGE("Out of async transaction IDs. Terminating");
-        terminate(lock);
         return false;
     }
     node->asyncNumber++;
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index 81ff458..13c3115 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -65,7 +65,8 @@
                                            uint32_t code, const Parcel& data,
                                            const sp<RpcSession>& session, Parcel* reply,
                                            uint32_t flags);
-    [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address);
+    [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                         const RpcAddress& address);
 
     enum class CommandType {
         ANY,
@@ -110,11 +111,10 @@
      * WARNING: RpcState is responsible for calling this when the session is
      * no longer recoverable.
      */
-    void terminate();
+    void clear();
 
 private:
     void dumpLocked();
-    void terminate(std::unique_lock<std::mutex>& lock);
 
     // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps
     // large allocations to avoid being requested from allocating too much data.
@@ -130,8 +130,8 @@
         size_t mSize;
     };
 
-    [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const char* what, const void* data,
-                                   size_t size);
+    [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                   const char* what, const void* data, size_t size);
     [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
                                   const char* what, void* data, size_t size);
 
@@ -204,9 +204,8 @@
     // dropped after any locks are removed.
     sp<IBinder> tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& it);
     // true - success
-    // false - state terminated, lock gone, halt
-    [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node,
-                                               std::unique_lock<std::mutex>& lock);
+    // false - session shutdown, halt
+    [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node);
 
     std::mutex mNodeMutex;
     bool mTerminated = false;
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index 98db221..4e6934b 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -97,7 +97,7 @@
      *
      * If this is not specified, this will be a single-threaded server.
      *
-     * TODO(b/185167543): these are currently created per client, but these
+     * TODO(b/167966510): these are currently created per client, but these
      * should be shared.
      */
     void setMaxThreads(size_t threads);
@@ -173,7 +173,7 @@
     wp<IBinder> mRootObjectWeak;
     std::map<int32_t, sp<RpcSession>> mSessions;
     int32_t mSessionIdCounter = 0;
-    std::shared_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
     std::condition_variable mShutdownCv;
 };
 
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index a6bc1a9..4650cf2 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -54,7 +54,7 @@
      * If this is called, 'shutdown' on this session must also be called.
      * Otherwise, a threadpool will leak.
      *
-     * TODO(b/185167543): start these dynamically
+     * TODO(b/189955605): start these dynamically
      */
     void setMaxThreads(size_t threads);
     size_t getMaxThreads();
@@ -97,14 +97,20 @@
     status_t getRemoteMaxThreads(size_t* maxThreads);
 
     /**
-     * Shuts down the service. Only works for client sessions (server-side
-     * sessions currently only support shutting down the entire server).
+     * Shuts down the service.
+     *
+     * For client sessions, wait can be true or false. For server sessions,
+     * waiting is not currently supported (will abort).
      *
      * Warning: this is currently not active/nice (the server isn't told we're
      * shutting down). Being nicer to the server could potentially make it
      * reclaim resources faster.
+     *
+     * If this is called with 'wait' true, then this will wait for shutdown to
+     * complete before returning. This will hang if it is called from the
+     * session threadpool (when processing received calls).
      */
-    [[nodiscard]] bool shutdown();
+    [[nodiscard]] bool shutdownAndWait(bool wait);
 
     [[nodiscard]] status_t transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
                                     Parcel* reply, uint32_t flags);
@@ -129,16 +135,17 @@
         static std::unique_ptr<FdTrigger> make();
 
         /**
-         * poll() on this fd for POLLHUP to get notification when trigger is called
-         */
-        base::borrowed_fd readFd() const { return mRead; }
-
-        /**
          * Close the write end of the pipe so that the read end receives POLLHUP.
+         * Not threadsafe.
          */
         void trigger();
 
         /**
+         * Whether this has been triggered.
+         */
+        bool isTriggered();
+
+        /**
          * Poll for a read event.
          *
          * Return:
@@ -197,9 +204,9 @@
     [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
                                                 bool server);
     [[nodiscard]] bool addClientConnection(base::unique_fd fd);
-    void setForServer(const wp<RpcServer>& server,
-                      const wp<RpcSession::EventListener>& eventListener, int32_t sessionId,
-                      const std::shared_ptr<FdTrigger>& shutdownTrigger);
+    [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
+                                    const wp<RpcSession::EventListener>& eventListener,
+                                    int32_t sessionId);
     sp<RpcConnection> assignServerToThisThread(base::unique_fd fd);
     [[nodiscard]] bool removeServerConnection(const sp<RpcConnection>& connection);
 
@@ -252,7 +259,7 @@
     // TODO(b/183988761): this shouldn't be guessable
     std::optional<int32_t> mId;
 
-    std::shared_ptr<FdTrigger> mShutdownTrigger;
+    std::unique_ptr<FdTrigger> mShutdownTrigger;
 
     std::unique_ptr<RpcState> mState;
 
@@ -266,9 +273,6 @@
     size_t mClientConnectionsOffset = 0;
     std::vector<sp<RpcConnection>> mClientConnections;
     std::vector<sp<RpcConnection>> mServerConnections;
-
-    // TODO(b/185167543): allow sharing between different sessions in a
-    // process? (or combine with mServerConnections)
     std::map<std::thread::id, std::thread> mThreads;
 };
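Per the updated contract in this header, a session that started a threadpool
must be shut down explicitly or the threadpool leaks. A sketch of the
expected client-side teardown using the names declared above (the wrapper
function is illustrative):

    #include <binder/RpcSession.h>

    // wait=true blocks until the session's threads exit, so this must not
    // be called from the session's own threadpool.
    void teardownSession(const android::sp<android::RpcSession>& session) {
        if (!session->shutdownAndWait(/*wait=*/true)) {
            // shutdown did not complete cleanly; nothing more to do here
        }
    }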
 
diff --git a/libs/binder/rust/src/lib.rs b/libs/binder/rust/src/lib.rs
index 2694cba..7c0584b 100644
--- a/libs/binder/rust/src/lib.rs
+++ b/libs/binder/rust/src/lib.rs
@@ -115,14 +115,14 @@
 pub use native::add_service;
 pub use native::Binder;
 pub use parcel::Parcel;
-pub use proxy::{get_interface, get_service};
+pub use proxy::{get_interface, get_service, wait_for_interface, wait_for_service};
 pub use proxy::{AssociateClass, DeathRecipient, Proxy, SpIBinder, WpIBinder};
 pub use state::{ProcessState, ThreadState};
 
 /// The public API usable outside AIDL-generated interface crates.
 pub mod public_api {
     pub use super::parcel::ParcelFileDescriptor;
-    pub use super::{add_service, get_interface};
+    pub use super::{add_service, get_interface, wait_for_interface};
     pub use super::{
         BinderFeatures, DeathRecipient, ExceptionCode, IBinder, Interface, ProcessState, SpIBinder,
         Status, StatusCode, Strong, ThreadState, Weak, WpIBinder,
diff --git a/libs/binder/rust/src/proxy.rs b/libs/binder/rust/src/proxy.rs
index 52036f5..4a6d118 100644
--- a/libs/binder/rust/src/proxy.rs
+++ b/libs/binder/rust/src/proxy.rs
@@ -653,6 +653,18 @@
     }
 }
 
+/// Retrieve an existing service, or start it if it is configured as a dynamic
+/// service and isn't yet started.
+pub fn wait_for_service(name: &str) -> Option<SpIBinder> {
+    let name = CString::new(name).ok()?;
+    unsafe {
+        // Safety: `AServiceManager_waitforService` returns either a null
+        // pointer or a valid pointer to an owned `AIBinder`. Either of these
+        // values is safe to pass to `SpIBinder::from_raw`.
+        SpIBinder::from_raw(sys::AServiceManager_waitForService(name.as_ptr()))
+    }
+}
+
 /// Retrieve an existing service for a particular interface, blocking for a few
 /// seconds if it doesn't yet exist.
 pub fn get_interface<T: FromIBinder + ?Sized>(name: &str) -> Result<Strong<T>> {
@@ -663,6 +675,16 @@
     }
 }
 
+/// Retrieve an existing service for a particular interface, or start it if it
+/// is configured as a dynamic service and isn't yet started.
+pub fn wait_for_interface<T: FromIBinder + ?Sized>(name: &str) -> Result<Strong<T>> {
+    let service = wait_for_service(name);
+    match service {
+        Some(service) => FromIBinder::try_from(service),
+        None => Err(StatusCode::NAME_NOT_FOUND),
+    }
+}
+
 /// # Safety
 ///
 /// `SpIBinder` guarantees that `binder` always contains a valid pointer to an
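wait_for_service above is a thin wrapper over libbinder_ndk; for comparison,
the direct C++/NDK usage would look roughly like this, assuming an API level
where AServiceManager_waitForService is available:

    #include <android/binder_auto_utils.h>
    #include <android/binder_manager.h>

    // Block until the named service is up (starting it if it is registered
    // as a dynamic service), taking ownership of the returned strong ref.
    // Returns a null SpAIBinder if the wait fails.
    ndk::SpAIBinder waitForService(const char* instance) {
        return ndk::SpAIBinder(AServiceManager_waitForService(instance));
    }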
diff --git a/libs/binder/rust/tests/integration.rs b/libs/binder/rust/tests/integration.rs
index 0332007..10b77f4 100644
--- a/libs/binder/rust/tests/integration.rs
+++ b/libs/binder/rust/tests/integration.rs
@@ -274,6 +274,20 @@
     }
 
     #[test]
+    fn check_wait_for_service() {
+        let mut sm =
+            binder::wait_for_service("manager").expect("Did not get manager binder service");
+        assert!(sm.is_binder_alive());
+        assert!(sm.ping_binder().is_ok());
+
+        // The service manager service isn't an ITest, so this must fail.
+        assert_eq!(
+            binder::wait_for_interface::<dyn ITest>("manager").err(),
+            Some(StatusCode::BAD_TYPE)
+        );
+    }
+
+    #[test]
     fn trivial_client() {
         let service_name = "trivial_client_test";
         let _process = ScopedServiceProcess::new(service_name);
@@ -283,6 +297,15 @@
     }
 
     #[test]
+    fn wait_for_trivial_client() {
+        let service_name = "wait_for_trivial_client_test";
+        let _process = ScopedServiceProcess::new(service_name);
+        let test_client: Strong<dyn ITest> =
+            binder::wait_for_interface(service_name).expect("Did not get test binder service");
+        assert_eq!(test_client.test().unwrap(), "wait_for_trivial_client_test");
+    }
+
+    #[test]
     fn get_selinux_context() {
         let service_name = "get_selinux_context";
         let _process = ScopedServiceProcess::new(service_name);
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 0a970fb..82f8a3e 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -941,6 +941,40 @@
     for (auto& t : threads) t.join();
 }
 
+TEST_P(BinderRpc, OnewayCallExhaustion) {
+    constexpr size_t kNumClients = 2;
+    constexpr size_t kTooLongMs = 1000;
+
+    auto proc = createRpcTestSocketServerProcess(kNumClients /*threads*/, 2 /*sessions*/);
+
+    // Build up oneway calls on the second session to make sure it terminates
+    // and shuts down. The first session should be unaffected (proc destructor
+    // checks the first session).
+    auto iface = interface_cast<IBinderRpcTest>(proc.proc.sessions.at(1).root);
+
+    std::vector<std::thread> threads;
+    for (size_t i = 0; i < kNumClients; i++) {
+        // one of these threads will eventually block writing once the socket
+        // fills up; the other will keep queueing oneway transactions on this
+        // object until the server terminates the session
+        threads.push_back(std::thread([&] {
+            while (iface->sleepMsAsync(kTooLongMs).isOk()) {
+            }
+        }));
+    }
+    for (auto& t : threads) t.join();
+
+    Status status = iface->sleepMsAsync(kTooLongMs);
+    EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
+
+    // the second session should be shut down in the other process by the time
+    // we are able to join above (it'll only be hung up once it finishes
+    // processing any pending commands). We need to erase this session from the
+    // record here so that our proc destructor won't check that this session is
+    // valid, but we still want it to check the other session.
+    proc.proc.sessions.erase(proc.proc.sessions.begin() + 1);
+}
+
 TEST_P(BinderRpc, Callbacks) {
     const static std::string kTestString = "good afternoon!";
 
@@ -966,7 +1000,7 @@
 
             // since this session has a reverse connection w/ a threadpool, we
             // need to manually shut it down
-            EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdown());
+            EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
 
             proc.expectAlreadyShutdown = true;
         }