binder: RpcSession limit outgoing threads

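Similar to the number of incoming threads,
the number of outgoing threads can be limited via
RpcSession::setMaxOutgoingThreads(). It must be called
before any connections are set up; calling it afterwards
is a fatal error. If set, only
min(maxOutgoingThreads, remoteMaxThreads) outgoing threads
are instantiated, where remoteMaxThreads is the number of
threads the server reports as available during client setup.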

Test: binderRpcTest
Bug: 194225767

Change-Id: I15686bae4317d0ced5af999f3a3d21f9a03037e1
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index 486b67b..9eef3e8 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -90,6 +90,20 @@
     return mMaxIncomingThreads;
 }
 
+void RpcSession::setMaxOutgoingThreads(size_t threads) {
+    std::lock_guard<std::mutex> _l(mMutex);
+    LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(),
+                        "Must set max outgoing threads before setting up connections, but has %zu "
+                        "client(s) and %zu server(s)",
+                        mConnections.mOutgoing.size(), mConnections.mIncoming.size());
+    mMaxOutgoingThreads = threads;
+}
+
+size_t RpcSession::getMaxOutgoingThreads() {
+    std::lock_guard<std::mutex> _l(mMutex);
+    return mMaxOutgoingThreads;
+}
+
 bool RpcSession::setProtocolVersion(uint32_t version) {
     if (version >= RPC_WIRE_PROTOCOL_VERSION_NEXT &&
         version != RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL) {
@@ -473,6 +487,12 @@
         return status;
     }
 
+    size_t outgoingThreads = std::min(numThreadsAvailable, mMaxOutgoingThreads);
+    ALOGI_IF(outgoingThreads != numThreadsAvailable,
+             "Server hints client to start %zu outgoing threads, but client will only start %zu "
+             "because it is preconfigured to start at most %zu outgoing threads.",
+             numThreadsAvailable, outgoingThreads, mMaxOutgoingThreads);
+
     // TODO(b/189955605): we should add additional sessions dynamically
     // instead of all at once - the other side should be responsible for setting
     // up additional connections. We need to create at least one (unless 0 are
@@ -480,7 +500,10 @@
     // any requests at all.
 
     // we've already setup one client
-    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
+    LOG_RPC_DETAIL("RpcSession::setupClient() instantiating %zu outgoing (server max: %zu) and %zu "
+                   "incoming threads",
+                   outgoingThreads, numThreadsAvailable, mMaxIncomingThreads);
+    for (size_t i = 0; i + 1 < outgoingThreads; i++) {
         if (status_t status = connectAndInit(mId, false /*incoming*/); status != OK) return status;
     }