libbinder: RPC prevent oneway cmd buildup
Previously, binderRpcTest OnewayStressTest was having a flaky hang @ 100
calls because the sockets were filling up with decref counts (these are
currently a bit long because they contain RpcAddress - these may need to
be smaller - see b/182939933). I temporarily reduced this to 50 calls to
avoid the flake, but this is the real fix (which also increases the
number of calls to 500).
Now, when we send a oneway call, we also drain any refcounts ("CONTROL")
commands. We disallow processing of any transactions in this case (this
would indicate a misbehaving client).
Fixes: 189272263
Test: binderRpcTest
Change-Id: If92af65f5e5a920e39046af6b10f675b5c26cd6e
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index a3efa56..a2fe3b9 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -237,7 +237,8 @@
sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client));
while (true) {
- status_t error = session->state()->getAndExecuteCommand(connection->fd, session);
+ status_t error = session->state()->getAndExecuteCommand(connection->fd, session,
+ RpcState::CommandType::ANY);
if (error != OK) {
LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index e18179e..f1cbe0d 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -386,7 +386,11 @@
if (flags & IBinder::FLAG_ONEWAY) {
LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", fd.get());
- return OK; // do not wait for result
+
+ // Do not wait on result.
+ // However, too many oneway calls may cause refcounts to build up and fill up the socket,
+ // so process those.
+ return drainCommands(fd, session, CommandType::CONTROL_ONLY);
}
LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction.");
@@ -413,7 +417,8 @@
if (command.command == RPC_COMMAND_REPLY) break;
- if (status_t status = processServerCommand(fd, session, command); status != OK)
+ if (status_t status = processServerCommand(fd, session, command, CommandType::ANY);
+ status != OK)
return status;
}
@@ -471,7 +476,8 @@
return OK;
}
-status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session) {
+status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session,
+ CommandType type) {
LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get());
RpcWireHeader command;
@@ -479,11 +485,21 @@
status != OK)
return status;
- return processServerCommand(fd, session, command);
+ return processServerCommand(fd, session, command, type);
+}
+
+status_t RpcState::drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session,
+ CommandType type) {
+ uint8_t buf;
+ while (0 < TEMP_FAILURE_RETRY(recv(fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
+ status_t status = getAndExecuteCommand(fd, session, type);
+ if (status != OK) return status;
+ }
+ return OK;
}
status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcSession>& session,
- const RpcWireHeader& command) {
+ const RpcWireHeader& command, CommandType type) {
IPCThreadState* kernelBinderState = IPCThreadState::selfOrNull();
IPCThreadState::SpGuard spGuard{
.address = __builtin_frame_address(0),
@@ -501,6 +517,7 @@
switch (command.command) {
case RPC_COMMAND_TRANSACT:
+ if (type != CommandType::ANY) return BAD_TYPE;
return processTransact(fd, session, command);
case RPC_COMMAND_DEC_STRONG:
return processDecStrong(fd, session, command);
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index d63c1c3..d448938 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -66,8 +66,15 @@
const sp<RpcSession>& session, Parcel* reply,
uint32_t flags);
[[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address);
+
+ enum class CommandType {
+ ANY,
+ CONTROL_ONLY,
+ };
[[nodiscard]] status_t getAndExecuteCommand(const base::unique_fd& fd,
- const sp<RpcSession>& session);
+ const sp<RpcSession>& session, CommandType type);
+ [[nodiscard]] status_t drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session,
+ CommandType type);
/**
* Called by Parcel for outgoing binders. This implies one refcount of
@@ -129,7 +136,7 @@
Parcel* reply);
[[nodiscard]] status_t processServerCommand(const base::unique_fd& fd,
const sp<RpcSession>& session,
- const RpcWireHeader& command);
+ const RpcWireHeader& command, CommandType type);
[[nodiscard]] status_t processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
const RpcWireHeader& command);
[[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd,
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 80708df..601ac6a 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -876,7 +876,7 @@
TEST_P(BinderRpc, OnewayStressTest) {
constexpr size_t kNumClientThreads = 10;
constexpr size_t kNumServerThreads = 10;
- constexpr size_t kNumCalls = 50;
+ constexpr size_t kNumCalls = 500;
auto proc = createRpcTestSocketServerProcess(kNumServerThreads);