libbinder: reverse connections
When connecting to an RPC client server, you can request to serve a
threadpool so that you can receive callbacks from it.
Future considerations:
- starting threads dynamically (likely very, very soon after this CL)
- combining threadpools (as needed)
Bug: 185167543
Test: binderRpcTest
Change-Id: I992959e963ebc1b3da2f89fdb6c1ec625cb51af4
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 77cae83..b146bb0 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -239,15 +239,16 @@
// It must be set before this thread is started
LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr);
- int32_t id;
- status_t status =
- server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &id, sizeof(id));
+ RpcConnectionHeader header;
+ status_t status = server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &header,
+ sizeof(header));
bool idValid = status == OK;
if (!idValid) {
ALOGE("Failed to read ID for client connecting to RPC server: %s",
statusToString(status).c_str());
// still need to cleanup before we can return
}
+ bool reverse = header.options & RPC_CONNECTION_OPTION_REVERSE;
std::thread thisThread;
sp<RpcSession> session;
@@ -269,24 +270,37 @@
return;
}
- if (id == RPC_SESSION_ID_NEW) {
+ if (header.sessionId == RPC_SESSION_ID_NEW) {
+ if (reverse) {
+ ALOGE("Cannot create a new session with a reverse connection, would leak");
+ return;
+ }
+
LOG_ALWAYS_FATAL_IF(server->mSessionIdCounter >= INT32_MAX, "Out of session IDs");
server->mSessionIdCounter++;
session = RpcSession::make();
- session->setForServer(wp<RpcServer>(server), server->mSessionIdCounter,
- server->mShutdownTrigger);
+ session->setForServer(server,
+ sp<RpcServer::EventListener>::fromExisting(
+ static_cast<RpcServer::EventListener*>(server.get())),
+ server->mSessionIdCounter, server->mShutdownTrigger);
server->mSessions[server->mSessionIdCounter] = session;
} else {
- auto it = server->mSessions.find(id);
+ auto it = server->mSessions.find(header.sessionId);
if (it == server->mSessions.end()) {
- ALOGE("Cannot add thread, no record of session with ID %d", id);
+ ALOGE("Cannot add thread, no record of session with ID %d", header.sessionId);
return;
}
session = it->second;
}
+ if (reverse) {
+ LOG_ALWAYS_FATAL_IF(!session->addClientConnection(std::move(clientFd)),
+ "server state must already be initialized");
+ return;
+ }
+
detachGuard.Disable();
session->preJoin(std::move(thisThread));
}
@@ -294,7 +308,7 @@
// avoid strong cycle
server = nullptr;
- session->join(std::move(clientFd));
+ RpcSession::join(std::move(session), std::move(clientFd));
}
bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
@@ -341,8 +355,7 @@
(void)mSessions.erase(it);
}
-void RpcServer::onSessionServerThreadEnded(const sp<RpcSession>& session) {
- (void)session;
+void RpcServer::onSessionServerThreadEnded() {
mShutdownCv.notify_all();
}