libbinder: no temp rpc sess leak w spurious wakeup

RpcSession incoming threads continued to hold a strong reference to
the RpcSession instance after they had already satisfied the
shutdown condition (by removing their associated 1:1 thread
connection object from RpcSession's mConnections). Since the
shutdown path must include cleaning up the RpcSession, we cannot
delay or remove the clearing of those connections. Instead, a new
explicit shutdown condition is added (a count of incoming threads
that have fully ended), which places no restriction on how the
session object is manipulated.
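
A minimal, self-contained sketch of that idea follows (hypothetical
names, not libbinder's real classes): the waiter blocks on an
explicit count of incoming threads that have fully ended, and each
thread bumps that count only after dropping its strong reference to
the session, rather than blocking on the connection list becoming
empty, which a thread does while it still holds that reference.

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <cstddef>
    #include <memory>
    #include <mutex>
    #include <thread>
    #include <vector>

    struct ShutdownListener {
        std::mutex m;
        std::condition_variable cv;
        std::atomic<size_t> shutdownCount{0};  // stand-in for mShutdownCount

        void onIncomingThreadEnded() {
            shutdownCount += 1;
            cv.notify_all();
        }

        void waitForShutdown(std::unique_lock<std::mutex>& lock, size_t maxIncoming) {
            // Old, racy condition: wait until the incoming connection list is
            // empty. A spurious wakeup could observe the empty list while a
            // thread still held a strong reference to the session.
            while (shutdownCount < maxIncoming) {
                cv.wait_for(lock, std::chrono::seconds(1));
            }
        }
    };

    struct Session {
        std::vector<int> incoming{0};  // stand-in for mConnections.mIncoming
    };

    int main() {
        auto listener = std::make_shared<ShutdownListener>();
        auto session = std::make_shared<Session>();

        std::thread worker([session, listener]() mutable {
            session->incoming.clear();          // old "shutdown condition" is now true
            session.reset();                    // drop the strong reference first...
            listener->onIncomingThreadEnded();  // ...then report the thread as done
        });

        std::unique_lock<std::mutex> lock(listener->m);
        listener->waitForShutdown(lock, /*maxIncoming=*/1);
        lock.unlock();
        worker.join();
        // No incoming thread still holds a reference to the session here.
        return 0;
    }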

Interestingly, this issue was blocking several changes on and off
for a few weeks, though the test hub doesn't show it failing at all.
I couldn't reproduce it locally even after running one of these
failing tests in a tight loop for 5 days (24hr/day) with builds
running in the background (devinmoore@ reported a local failure
with a build running). I submitted several changes to debug this,
and one of them (which dumped backtraces) should have caught it,
except the race is just too rare.

When we have this situation, a retrospectively benign problem
causing a big failure, the obvious question to ask is: is the test
too brittle? No! If this is the sensitivity at which it finds a bug,
we can hardly imagine an error going unnoticed here. Only if this
situation repeated several times, or if these issues became too
numerous to maintain, would I think that we needed to "tone down
the tests".

Finally, how did this get fixed: dump every incStrong backtrace in
RpcSession and investigate all the code that is responsible for
maintaining those sp<>s. Wheeee :)
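
For reference, here is a generic sketch of that debugging technique,
not libbinder's actual instrumentation (TrackedSession is a made-up
type): log a backtrace every time a strong reference is taken, using
backtrace()/backtrace_symbols_fd() from <execinfo.h> (glibc, and
bionic on recent Android releases), then match the count left over
after shutdown against the logged stacks.

    #include <execinfo.h>
    #include <unistd.h>

    #include <atomic>
    #include <cstdio>

    // Hypothetical stand-in for a refcounted object such as RpcSession.
    struct TrackedSession {
        std::atomic<int> strongCount{0};

        void incStrong() {
            int count = ++strongCount;
            fprintf(stderr, "incStrong -> %d, taken from:\n", count);
            void* frames[32];
            int n = backtrace(frames, 32);
            backtrace_symbols_fd(frames, n, STDERR_FILENO);
        }

        void decStrong() { --strongCount; }
    };

    int main() {
        TrackedSession session;
        session.incStrong();  // each acquisition logs the stack that took it
        session.decStrong();
        // Any count left over after "shutdown" pairs with a logged backtrace,
        // pointing at the code path that is still holding a reference.
        return 0;
    }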

Bug: 244325464
Test: binderRpcTest

Change-Id: I76ac8f21900d6ce095a1acfb246ca7acf1591e0b
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index ff50c16..7d6bcfc 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -324,16 +324,18 @@
 }
 
 void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() {
+    mShutdownCount += 1;
     mCv.notify_all();
 }
 
 void RpcSession::WaitForShutdownListener::waitForShutdown(RpcMutexUniqueLock& lock,
                                                           const sp<RpcSession>& session) {
-    while (session->mConnections.mIncoming.size() > 0) {
+    while (mShutdownCount < session->mConnections.mMaxIncoming) {
         if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) {
             ALOGE("Waiting for RpcSession to shut down (1s w/o progress): %zu incoming connections "
-                  "still.",
-                  session->mConnections.mIncoming.size());
+                  "still %zu/%zu fully shutdown.",
+                  session->mConnections.mIncoming.size(), mShutdownCount.load(),
+                  session->mConnections.mMaxIncoming);
         }
     }
 }
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 5e5d4ea..40faf2c 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -240,6 +240,7 @@
 
     private:
         RpcConditionVariable mCv;
+        std::atomic<size_t> mShutdownCount = 0;
     };
     friend WaitForShutdownListener;
 
@@ -380,6 +381,7 @@
         // hint index into clients, ++ when sending an async transaction
         size_t mOutgoingOffset = 0;
         std::vector<sp<RpcConnection>> mOutgoing;
+        // max size of mIncoming. Once any thread starts shutting down, no more can be started.
         size_t mMaxIncoming = 0;
         std::vector<sp<RpcConnection>> mIncoming;
         std::map<RpcMaybeThread::id, RpcMaybeThread> mThreads;