libbinder: reverse connections
When connecting to an RPC client server, you can request to serve a
threadpool so that you can receive callbacks from it.
Future considerations:
- starting threads dynamically (likely very, very soon after this CL)
- combining threadpools (as needed)
Bug: 185167543
Test: binderRpcTest
Change-Id: I992959e963ebc1b3da2f89fdb6c1ec625cb51af4
diff --git a/libs/binder/tests/Android.bp b/libs/binder/tests/Android.bp
index 9cf433d..c7c899f 100644
--- a/libs/binder/tests/Android.bp
+++ b/libs/binder/tests/Android.bp
@@ -118,6 +118,7 @@
host_supported: true,
unstable: true,
srcs: [
+ "IBinderRpcCallback.aidl",
"IBinderRpcSession.aidl",
"IBinderRpcTest.aidl",
],
diff --git a/libs/binder/tests/IBinderRpcCallback.aidl b/libs/binder/tests/IBinderRpcCallback.aidl
new file mode 100644
index 0000000..0336961
--- /dev/null
+++ b/libs/binder/tests/IBinderRpcCallback.aidl
@@ -0,0 +1,20 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+interface IBinderRpcCallback {
+ void sendCallback(@utf8InCpp String str);
+ oneway void sendOnewayCallback(@utf8InCpp String str);
+}
diff --git a/libs/binder/tests/IBinderRpcTest.aidl b/libs/binder/tests/IBinderRpcTest.aidl
index 646bcc6..b0c8b2d 100644
--- a/libs/binder/tests/IBinderRpcTest.aidl
+++ b/libs/binder/tests/IBinderRpcTest.aidl
@@ -54,6 +54,8 @@
void sleepMs(int ms);
oneway void sleepMsAsync(int ms);
+ void doCallback(IBinderRpcCallback callback, boolean isOneway, boolean delayed, @utf8InCpp String value);
+
void die(boolean cleanup);
void scheduleShutdown();
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index efc70e6..80708df 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -14,6 +14,7 @@
* limitations under the License.
*/
+#include <BnBinderRpcCallback.h>
#include <BnBinderRpcSession.h>
#include <BnBinderRpcTest.h>
#include <aidl/IBinderRpcTest.h>
@@ -34,6 +35,7 @@
#include <cstdlib>
#include <iostream>
#include <thread>
+#include <type_traits>
#include <sys/prctl.h>
#include <unistd.h>
@@ -89,6 +91,22 @@
};
std::atomic<int32_t> MyBinderRpcSession::gNum;
+class MyBinderRpcCallback : public BnBinderRpcCallback {
+ Status sendCallback(const std::string& value) {
+ std::unique_lock _l(mMutex);
+ mValues.push_back(value);
+ _l.unlock();
+ mCv.notify_one();
+ return Status::ok();
+ }
+ Status sendOnewayCallback(const std::string& value) { return sendCallback(value); }
+
+public:
+ std::mutex mMutex;
+ std::condition_variable mCv;
+ std::vector<std::string> mValues;
+};
+
class MyBinderRpcTest : public BnBinderRpcTest {
public:
wp<RpcServer> server;
@@ -187,6 +205,27 @@
return sleepMs(ms);
}
+ Status doCallback(const sp<IBinderRpcCallback>& callback, bool oneway, bool delayed,
+ const std::string& value) override {
+ if (callback == nullptr) {
+ return Status::fromExceptionCode(Status::EX_NULL_POINTER);
+ }
+
+ if (delayed) {
+ std::thread([=]() {
+ ALOGE("Executing delayed callback: '%s'", value.c_str());
+ (void)doCallback(callback, oneway, false, value);
+ }).detach();
+ return Status::ok();
+ }
+
+ if (oneway) {
+ return callback->sendOnewayCallback(value);
+ }
+
+ return callback->sendCallback(value);
+ }
+
Status die(bool cleanup) override {
if (cleanup) {
exit(1);
@@ -308,6 +347,9 @@
BinderRpcTestProcessSession(BinderRpcTestProcessSession&&) = default;
~BinderRpcTestProcessSession() {
+ EXPECT_NE(nullptr, rootIface);
+ if (rootIface == nullptr) return;
+
if (!expectAlreadyShutdown) {
std::vector<int32_t> remoteCounts;
// calling over any sessions counts across all sessions
@@ -348,7 +390,7 @@
// This creates a new process serving an interface on a certain number of
// threads.
ProcessSession createRpcTestSocketServerProcess(
- size_t numThreads, size_t numSessions,
+ size_t numThreads, size_t numSessions, size_t numReverseConnections,
const std::function<void(const sp<RpcServer>&)>& configure) {
CHECK_GE(numSessions, 1) << "Must have at least one session to a server";
@@ -404,6 +446,8 @@
for (size_t i = 0; i < numSessions; i++) {
sp<RpcSession> session = RpcSession::make();
+ session->setMaxReverseConnections(numReverseConnections);
+
switch (socketType) {
case SocketType::UNIX:
if (session->setupUnixDomainClient(addr.c_str())) goto success;
@@ -425,9 +469,11 @@
}
BinderRpcTestProcessSession createRpcTestSocketServerProcess(size_t numThreads,
- size_t numSessions = 1) {
+ size_t numSessions = 1,
+ size_t numReverseConnections = 0) {
BinderRpcTestProcessSession ret{
.proc = createRpcTestSocketServerProcess(numThreads, numSessions,
+ numReverseConnections,
[&](const sp<RpcServer>& server) {
sp<MyBinderRpcTest> service =
new MyBinderRpcTest;
@@ -895,6 +941,38 @@
for (auto& t : threads) t.join();
}
+TEST_P(BinderRpc, Callbacks) {
+ const static std::string kTestString = "good afternoon!";
+
+ for (bool oneway : {true, false}) {
+ for (bool delayed : {true, false}) {
+ auto proc = createRpcTestSocketServerProcess(1, 1, 1);
+ auto cb = sp<MyBinderRpcCallback>::make();
+
+ EXPECT_OK(proc.rootIface->doCallback(cb, oneway, delayed, kTestString));
+
+ using std::literals::chrono_literals::operator""s;
+ std::unique_lock<std::mutex> _l(cb->mMutex);
+ cb->mCv.wait_for(_l, 1s, [&] { return !cb->mValues.empty(); });
+
+ EXPECT_EQ(cb->mValues.size(), 1) << "oneway: " << oneway << "delayed: " << delayed;
+ if (cb->mValues.empty()) continue;
+ EXPECT_EQ(cb->mValues.at(0), kTestString)
+ << "oneway: " << oneway << "delayed: " << delayed;
+
+ // since we are severing the connection, we need to go ahead and
+ // tell the server to shutdown and exit so that waitpid won't hang
+ EXPECT_OK(proc.rootIface->scheduleShutdown());
+
+ // since this session has a reverse connection w/ a threadpool, we
+ // need to manually shut it down
+ EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdown());
+
+ proc.expectAlreadyShutdown = true;
+ }
+ }
+}
+
TEST_P(BinderRpc, Die) {
for (bool doDeathCleanup : {true, false}) {
auto proc = createRpcTestSocketServerProcess(1);