Merge changes I5f58750a,Ifb74b312,I4eda926a am: 3ca69777c5 am: fd06bd3d5f

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1719912

Change-Id: I735e14e3534ad00b8c4f6d3b69f3348b8b58c1d3
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index d05b848..156a834 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -379,7 +379,7 @@
 
         // CHECK FOR DEDICATED CLIENT SOCKET
         //
-        // A server/looper should always use a dedicated session if available
+        // A server/looper should always use a dedicated connection if available
         findConnection(tid, &exclusive, &available, mSession->mClientConnections,
                        mSession->mClientConnectionsOffset);
 
@@ -407,7 +407,7 @@
                            0 /* index hint */);
         }
 
-        // if our thread is already using a session, prioritize using that
+        // if our thread is already using a connection, prioritize using that
         if (exclusive != nullptr) {
             mConnection = exclusive;
             mReentrant = true;
@@ -420,11 +420,10 @@
 
         // in regular binder, this would usually be a deadlock :)
         LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0,
-                            "Not a client of any session. You must create a session to an "
-                            "RPC server to make any non-nested (e.g. oneway or on another thread) "
-                            "calls.");
+                            "Session has no client connections. This is required for an RPC server "
+                            "to make any non-nested (e.g. oneway or on another thread) calls.");
 
-        LOG_RPC_DETAIL("No available session (have %zu clients and %zu servers). Waiting...",
+        LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...",
                        mSession->mClientConnections.size(), mSession->mServerConnections.size());
         mSession->mAvailableConnectionCv.wait(_l);
     }
@@ -443,13 +442,13 @@
     for (size_t i = 0; i < sockets.size(); i++) {
         sp<RpcConnection>& socket = sockets[(i + socketsIndexHint) % sockets.size()];
 
-        // take first available session (intuition = caching)
+        // take first available connection (intuition = caching)
         if (available && *available == nullptr && socket->exclusiveTid == std::nullopt) {
             *available = socket;
             continue;
         }
 
-        // though, prefer to take session which is already inuse by this thread
+        // though, prefer to take connection which is already in use by this thread
         // (nested transactions)
         if (exclusive && socket->exclusiveTid == tid) {
             *exclusive = socket;
@@ -459,7 +458,7 @@
 }
 
 RpcSession::ExclusiveConnection::~ExclusiveConnection() {
-    // reentrant use of a session means something less deep in the call stack
+    // reentrant use of a connection means something less deep in the call stack
     // is using this fd, and it retains the right to it. So, we don't give up
     // exclusive ownership, and no thread is freed.
     if (!mReentrant) {
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index a801729..2cad2ae 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -221,8 +221,8 @@
 
     if (sent < 0 || sent != static_cast<ssize_t>(size)) {
         int savedErrno = errno;
-        ALOGE("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent, size,
-              fd.get(), strerror(savedErrno));
+        LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
+                       size, fd.get(), strerror(savedErrno));
 
         terminate();
         return -savedErrno;
@@ -241,10 +241,8 @@
 
     if (status_t status = session->mShutdownTrigger->interruptableReadFully(fd.get(), data, size);
         status != OK) {
-        if (status != -ECANCELED) {
-            ALOGE("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(),
-                  statusToString(status).c_str());
-        }
+        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(),
+                       statusToString(status).c_str());
         return status;
     }
 
diff --git a/libs/binder/tests/rpc_fuzzer/main.cpp b/libs/binder/tests/rpc_fuzzer/main.cpp
index e6fd392..072f8dd 100644
--- a/libs/binder/tests/rpc_fuzzer/main.cpp
+++ b/libs/binder/tests/rpc_fuzzer/main.cpp
@@ -71,43 +71,44 @@
     CHECK_LT(kSock.size(), sizeof(addr.sun_path));
     memcpy(&addr.sun_path, kSock.c_str(), kSock.size());
 
-    base::unique_fd clientFd(TEMP_FAILURE_RETRY(socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)));
-    CHECK_NE(clientFd.get(), -1);
-    CHECK_EQ(0,
-             TEMP_FAILURE_RETRY(
-                     connect(clientFd.get(), reinterpret_cast<sockaddr*>(&addr), sizeof(addr))))
-            << strerror(errno);
-
-    // TODO(b/182938024): fuzz multiple sessions, instead of just one
-
-#if 0
-    // make fuzzer more productive locally by forcing it to create a new session
-    int32_t id = -1;
-    CHECK(base::WriteFully(clientFd, &id, sizeof(id)));
-#endif
+    std::vector<base::unique_fd> connections;
 
     bool hangupBeforeShutdown = provider.ConsumeBool();
 
-    std::vector<uint8_t> writeData = provider.ConsumeRemainingBytes<uint8_t>();
-    CHECK(base::WriteFully(clientFd, writeData.data(), writeData.size()));
+    while (provider.remaining_bytes() > 0) {
+        if (connections.empty() || provider.ConsumeBool()) {
+            base::unique_fd fd(TEMP_FAILURE_RETRY(socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0)));
+            CHECK_NE(fd.get(), -1);
+            CHECK_EQ(0,
+                     TEMP_FAILURE_RETRY(
+                             connect(fd.get(), reinterpret_cast<sockaddr*>(&addr), sizeof(addr))))
+                    << strerror(errno);
+            connections.push_back(std::move(fd));
+        } else {
+            size_t idx = provider.ConsumeIntegralInRange<size_t>(0, connections.size() - 1);
+
+            if (provider.ConsumeBool()) {
+                std::vector<uint8_t> writeData = provider.ConsumeBytes<uint8_t>(
+                        provider.ConsumeIntegralInRange<size_t>(0, provider.remaining_bytes()));
+                CHECK(base::WriteFully(connections.at(idx).get(), writeData.data(),
+                                       writeData.size()));
+            } else {
+                connections.erase(connections.begin() + idx); // hang up
+            }
+        }
+    }
 
     if (hangupBeforeShutdown) {
-        clientFd.reset();
+        connections.clear();
+        while (!server->listSessions().empty() && server->numUninitializedSessions()) {
+            // wait for all threads to finish processing existing information
+            usleep(1);
+        }
     }
 
-    // TODO(185167543): currently this is okay because we only shutdown the one
-    // thread, but once we can shutdown other sessions, we'll need to change
-    // this behavior in order to make sure all of the input is actually read.
-    while (!server->shutdown()) usleep(100);
-
-    clientFd.reset();
+    while (!server->shutdown()) usleep(1);
     serverThread.join();
 
-    // TODO(b/185167543): better way to force a server to shutdown
-    while (!server->listSessions().empty() && server->numUninitializedSessions()) {
-        usleep(1);
-    }
-
     return 0;
 }