libbinder: RPC disallow nested oneway transactions
Previously, nested transactions were accidentally allowed while
processing oneway transactions. This change allows nested transactions
only while a synchronous transaction is being processed (matching how
kernel binder behaves).
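
For context, a sketch of the pattern this affects (hypothetical
service and callback names, not from this CL):

  // A nested transaction: while the client is blocked in a synchronous
  // transact(), the server calls back into a binder the client hosts,
  // reusing the client's serving connection.
  Status MyService::doWork(const sp<ICallback>& cb) {  // synchronous call
      // OK: the caller is parked in transact(), so its connection can
      // serve this as a nested transaction.
      return cb->notify();
  }
  Status MyService::doWorkAsync(const sp<ICallback>& cb) {  // oneway call
      // After this change, nesting is disallowed here: the caller is
      // not blocked, so this callback must instead wait for an
      // available thread on the caller's side.
      return cb->notify();
  }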
Future considerations: this CL makes it more explicit that we allow
refcount transactions as nested commands. This is okay because
'drainCommands' will eventually process them, but there may be some
delay (see the sketch below). We could make refcount behavior nicer
by always preferring an active threadpool (if one is available) to
process them.
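
To illustrate the delay concern (a hypothetical example; 'MyService'
and 'handleRequest' are made up, the queuing behavior is as described
above):

  // A proxy dropped while this thread is busy serving a transaction
  // produces a refcount (dec strong) command.
  void MyService::handleRequest(sp<IBinder> remote) {
      // Releasing the reference sends a refcount command. With no
      // available client connection, it is written to this thread's
      // serving connection as a nested command and consumed later by
      // 'drainCommands', possibly with some delay.
      remote = nullptr;
  }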
Bug: 167966510
Test: binderRpcTest
Change-Id: Iaeb472896654ff4bcd75b20394f8f3230febaabf
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index b2d1a1a..4a6362a 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -541,13 +541,27 @@
(session->mClientConnectionsOffset + 1) % session->mClientConnections.size();
}
- // USE SERVING SOCKET (for nested transaction)
- //
- // asynchronous calls cannot be nested
+ // USE SERVING SOCKET (e.g. nested transaction)
if (use != ConnectionUse::CLIENT_ASYNC) {
+ sp<RpcConnection> exclusiveServer;
// server connections are always assigned to a thread
- findConnection(tid, &exclusive, nullptr /*available*/, session->mServerConnections,
- 0 /* index hint */);
+ findConnection(tid, &exclusiveServer, nullptr /*available*/,
+ session->mServerConnections, 0 /* index hint */);
+
+ // asynchronous calls cannot be nested. We currently allow refcount
+ // commands to be nested (so that this can be used without extra
+ // threads). Note: 'drainCommands' is used so that these refcounts
+ // can't build up.
+ if (exclusiveServer != nullptr) {
+ if (exclusiveServer->allowNested) {
+ // guaranteed to be processed as nested command
+ exclusive = exclusiveServer;
+ } else if (use == ConnectionUse::CLIENT_REFCOUNT && available == nullptr) {
+ // prefer available socket, but if we don't have one, don't
+ // wait for one
+ exclusive = exclusiveServer;
+ }
+ }
}
// if our thread is already using a connection, prioritize using that
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 5a20156..050f4fb 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -633,6 +633,7 @@
// TODO(b/182939933): heap allocation just for lookup in mNodeForAddress,
// maybe add an RpcAddress 'view' if the type remains 'heavy'
auto addr = RpcAddress::fromRawEmbedded(&transaction->address);
+ bool oneway = transaction->flags & IBinder::FLAG_ONEWAY;
status_t replyStatus = OK;
sp<IBinder> target;
@@ -661,7 +662,7 @@
addr.toString().c_str());
(void)session->shutdownAndWait(false);
replyStatus = BAD_VALUE;
- } else if (transaction->flags & IBinder::FLAG_ONEWAY) {
+ } else if (oneway) {
std::unique_lock<std::mutex> _l(mNodeMutex);
auto it = mNodeForAddress.find(addr);
if (it->second.binder.promote() != target) {
@@ -718,7 +719,12 @@
data.markForRpc(session);
if (target) {
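+ // While 'target' processes this transaction, outgoing calls made
+ // from the handler may nest on this connection only if the
+ // transaction is synchronous (the peer is blocked waiting on a
+ // reply).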
+ bool origAllowNested = connection->allowNested;
+ connection->allowNested = !oneway;
+
replyStatus = target->transact(transaction->code, data, &reply, transaction->flags);
+
+ connection->allowNested = origAllowNested;
} else {
LOG_RPC_DETAIL("Got special transaction %u", transaction->code);
@@ -754,7 +760,7 @@
}
}
- if (transaction->flags & IBinder::FLAG_ONEWAY) {
+ if (oneway) {
if (replyStatus != OK) {
ALOGW("Oneway call failed with error: %d", replyStatus);
}
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 27baa9d..e40154b 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -191,6 +191,8 @@
// whether this or another thread is currently using this fd to make
// or receive transactions.
std::optional<pid_t> exclusiveTid;
+
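+ // whether nested transactions may be made on this connection: true
+ // only while a synchronous transaction is being processed on it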
+ bool allowNested = false;
};
status_t readId();
diff --git a/libs/binder/tests/IBinderRpcTest.aidl b/libs/binder/tests/IBinderRpcTest.aidl
index b0c8b2d..9e10788 100644
--- a/libs/binder/tests/IBinderRpcTest.aidl
+++ b/libs/binder/tests/IBinderRpcTest.aidl
@@ -55,6 +55,7 @@
oneway void sleepMsAsync(int ms);
void doCallback(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value);
+ oneway void doCallbackAsync(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value);
void die(boolean cleanup);
void scheduleShutdown();
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index a79295a..69682b0 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -214,7 +214,8 @@
if (delayed) {
std::thread([=]() {
ALOGE("Executing delayed callback: '%s'", value.c_str());
- (void)doCallback(callback, oneway, false, value);
+ Status status = doCallback(callback, oneway, false, value);
+ ALOGE("Delayed callback status: '%s'", status.toString8().c_str());
}).detach();
return Status::ok();
}
@@ -226,6 +227,11 @@
return callback->sendCallback(value);
}
+ Status doCallbackAsync(const sp<IBinderRpcCallback>& callback, bool oneway, bool delayed,
+ const std::string& value) override {
+ return doCallback(callback, oneway, delayed, value);
+ }
+
Status die(bool cleanup) override {
if (cleanup) {
exit(1);
@@ -978,31 +984,42 @@
TEST_P(BinderRpc, Callbacks) {
const static std::string kTestString = "good afternoon!";
- for (bool oneway : {true, false}) {
- for (bool delayed : {true, false}) {
- auto proc = createRpcTestSocketServerProcess(1, 1, 1);
- auto cb = sp<MyBinderRpcCallback>::make();
+ for (bool callIsOneway : {true, false}) {
+ for (bool callbackIsOneway : {true, false}) {
+ for (bool delayed : {true, false}) {
+ auto proc = createRpcTestSocketServerProcess(1, 1, 1);
+ auto cb = sp<MyBinderRpcCallback>::make();
- EXPECT_OK(proc.rootIface->doCallback(cb, oneway, delayed, kTestString));
+ if (callIsOneway) {
+ EXPECT_OK(proc.rootIface->doCallbackAsync(cb, callbackIsOneway, delayed,
+ kTestString));
+ } else {
+ EXPECT_OK(
+ proc.rootIface->doCallback(cb, callbackIsOneway, delayed, kTestString));
+ }
- using std::literals::chrono_literals::operator""s;
- std::unique_lock<std::mutex> _l(cb->mMutex);
- cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); });
+ using std::literals::chrono_literals::operator""s;
+ std::unique_lock<std::mutex> _l(cb->mMutex);
+ cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); });
- EXPECT_EQ(cb->mValues.size(), 1) << "oneway: " << oneway << "delayed: " << delayed;
- if (cb->mValues.empty()) continue;
- EXPECT_EQ(cb->mValues.at(0), kTestString)
- << "oneway: " << oneway << "delayed: " << delayed;
+ EXPECT_EQ(cb->mValues.size(), 1)
+ << "callIsOneway: " << callIsOneway
+ << " callbackIsOneway: " << callbackIsOneway << " delayed: " << delayed;
+ if (cb->mValues.empty()) continue;
+ EXPECT_EQ(cb->mValues.at(0), kTestString)
+ << "callIsOneway: " << callIsOneway
+ << " callbackIsOneway: " << callbackIsOneway << " delayed: " << delayed;
- // since we are severing the connection, we need to go ahead and
- // tell the server to shutdown and exit so that waitpid won't hang
- EXPECT_OK(proc.rootIface->scheduleShutdown());
+ // since we are severing the connection, we need to go ahead and
+ // tell the server to shutdown and exit so that waitpid won't hang
+ EXPECT_OK(proc.rootIface->scheduleShutdown());
- // since this session has a reverse connection w/ a threadpool, we
- // need to manually shut it down
- EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
+ // since this session has a reverse connection w/ a threadpool, we
+ // need to manually shut it down
+ EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));
- proc.expectAlreadyShutdown = true;
+ proc.expectAlreadyShutdown = true;
+ }
}
}
}