Merge "libbinder: RPC prevent oneway cmd buildup"
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index a27dff5..c563377 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -240,7 +240,8 @@
     sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client));
 
     while (true) {
-        status_t error = session->state()->getAndExecuteCommand(connection->fd, session);
+        status_t error = session->state()->getAndExecuteCommand(connection->fd, session,
+                                                                RpcState::CommandType::ANY);
 
         if (error != OK) {
             LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 4af0852..1f97293 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -386,7 +386,11 @@
 
     if (flags & IBinder::FLAG_ONEWAY) {
         LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", fd.get());
-        return OK; // do not wait for result
+
+        // Do not wait on result.
+        // However, too many oneway calls may cause refcounts to build up and fill up the socket,
+        // so process those.
+        return drainCommands(fd, session, CommandType::CONTROL_ONLY);
     }
 
     LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction.");
@@ -413,7 +417,8 @@
 
         if (command.command == RPC_COMMAND_REPLY) break;
 
-        if (status_t status = processServerCommand(fd, session, command); status != OK)
+        if (status_t status = processServerCommand(fd, session, command, CommandType::ANY);
+            status != OK)
             return status;
     }
 
@@ -471,7 +476,8 @@
     return OK;
 }
 
-status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session) {
+status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                        CommandType type) {
     LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get());
 
     RpcWireHeader command;
@@ -479,11 +485,21 @@
         status != OK)
         return status;
 
-    return processServerCommand(fd, session, command);
+    return processServerCommand(fd, session, command, type);
+}
+
+status_t RpcState::drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                 CommandType type) {
+    uint8_t buf;
+    while (0 < TEMP_FAILURE_RETRY(recv(fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
+        status_t status = getAndExecuteCommand(fd, session, type);
+        if (status != OK) return status;
+    }
+    return OK;
 }
 
 status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcSession>& session,
-                                        const RpcWireHeader& command) {
+                                        const RpcWireHeader& command, CommandType type) {
     IPCThreadState* kernelBinderState = IPCThreadState::selfOrNull();
     IPCThreadState::SpGuard spGuard{
             .address = __builtin_frame_address(0),
@@ -501,6 +517,7 @@
 
     switch (command.command) {
         case RPC_COMMAND_TRANSACT:
+            if (type != CommandType::ANY) return BAD_TYPE;
             return processTransact(fd, session, command);
         case RPC_COMMAND_DEC_STRONG:
             return processDecStrong(fd, session, command);
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index d63c1c3..d448938 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -66,8 +66,15 @@
                                            const sp<RpcSession>& session, Parcel* reply,
                                            uint32_t flags);
     [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address);
+
+    enum class CommandType {
+        ANY,
+        CONTROL_ONLY,
+    };
     [[nodiscard]] status_t getAndExecuteCommand(const base::unique_fd& fd,
-                                                const sp<RpcSession>& session);
+                                                const sp<RpcSession>& session, CommandType type);
+    [[nodiscard]] status_t drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session,
+                                         CommandType type);
 
     /**
      * Called by Parcel for outgoing binders. This implies one refcount of
@@ -129,7 +136,7 @@
                                         Parcel* reply);
     [[nodiscard]] status_t processServerCommand(const base::unique_fd& fd,
                                                 const sp<RpcSession>& session,
-                                                const RpcWireHeader& command);
+                                                const RpcWireHeader& command, CommandType type);
     [[nodiscard]] status_t processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
                                            const RpcWireHeader& command);
     [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd,
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index e48b7c5..0a970fb 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -876,7 +876,7 @@
 TEST_P(BinderRpc, OnewayStressTest) {
     constexpr size_t kNumClientThreads = 10;
     constexpr size_t kNumServerThreads = 10;
-    constexpr size_t kNumCalls = 50;
+    constexpr size_t kNumCalls = 500;
 
     auto proc = createRpcTestSocketServerProcess(kNumServerThreads);