libbinder: RPC know when connections are set up
Previously, there was a race where:
a. client creates connection to server
b. client sends request for reverse connection to server (but this
may still be traveling on the wire)
c. client sends transaction to server
d. server tries to make a callback
e. server fails to make callback because no reverse connection is set up
Now, when a new connection is set up, an init header is sent on this
connection. So, we can wait for this header to be received in (b).
Note: currently, (e) results in an abort; this is tracked in b/167966510
with a TODO in the ExclusiveConnection code. Fixing that would make this a
less obvious flake (or perhaps the problem would be ignored), but this race
still needs to be fixed for well-behaved clients to be able to function
reliably.
Fixes: 190639665
Test: binderRpcTest (callback test 10,000s of times)
Change-Id: I13bc912692d63ea73d46c5441fa7d51121df2f58
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 2d2eed2..60be406 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -307,13 +307,15 @@
}
detachGuard.Disable();
- session->preJoin(std::move(thisThread));
+ session->preJoinThreadOwnership(std::move(thisThread));
}
+ auto setupResult = session->preJoinSetup(std::move(clientFd));
+
// avoid strong cycle
server = nullptr;
- RpcSession::join(std::move(session), std::move(clientFd));
+ RpcSession::join(std::move(session), std::move(setupResult));
}
bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index de9aa22..2a230d2 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -236,7 +236,7 @@
}
}
-void RpcSession::preJoin(std::thread thread) {
+void RpcSession::preJoinThreadOwnership(std::thread thread) {
LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");
{
@@ -245,20 +245,36 @@
}
}
-void RpcSession::join(sp<RpcSession>&& session, unique_fd client) {
+RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
// must be registered to allow arbitrary client code executing commands to
// be able to do nested calls (we can't only read from it)
- sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client));
+ sp<RpcConnection> connection = assignServerToThisThread(std::move(fd));
- while (true) {
- status_t error = session->state()->getAndExecuteCommand(connection->fd, session,
- RpcState::CommandType::ANY);
+ status_t status =
+ mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
- if (error != OK) {
- LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
- statusToString(error).c_str());
- break;
+ return PreJoinSetupResult{
+ .connection = std::move(connection),
+ .status = status,
+ };
+}
+
+void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult) {
+ sp<RpcConnection>& connection = setupResult.connection;
+
+ if (setupResult.status == OK) {
+ while (true) {
+ status_t status = session->state()->getAndExecuteCommand(connection->fd, session,
+ RpcState::CommandType::ANY);
+ if (status != OK) {
+ LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
+ statusToString(status).c_str());
+ break;
+ }
}
+ } else {
+ ALOGE("Connection failed to init, closing with status %s",
+ statusToString(setupResult.status).c_str());
}
LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection),
@@ -381,14 +397,17 @@
unique_fd fd = std::move(serverFd);
// NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
sp<RpcSession> session = thiz;
- session->preJoin(std::move(thread));
- ownershipTransferred = true;
- joinCv.notify_one();
+ session->preJoinThreadOwnership(std::move(thread));
+ // only continue once we have a response or the connection fails
+ auto setupResult = session->preJoinSetup(std::move(fd));
+
+ ownershipTransferred = true;
threadLock.unlock();
+ joinCv.notify_one();
// do not use & vars below
- RpcSession::join(std::move(session), std::move(fd));
+ RpcSession::join(std::move(session), std::move(setupResult));
});
joinCv.wait(lock, [&] { return ownershipTransferred; });
LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
@@ -403,20 +422,32 @@
}
bool RpcSession::addClientConnection(unique_fd fd) {
- std::lock_guard<std::mutex> _l(mMutex);
+ sp<RpcConnection> connection = sp<RpcConnection>::make();
+ {
+ std::lock_guard<std::mutex> _l(mMutex);
- // first client connection added, but setForServer not called, so
- // initializaing for a client.
- if (mShutdownTrigger == nullptr) {
- mShutdownTrigger = FdTrigger::make();
- mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
- if (mShutdownTrigger == nullptr) return false;
+ // first client connection added, but setForServer not called, so
+ // initializaing for a client.
+ if (mShutdownTrigger == nullptr) {
+ mShutdownTrigger = FdTrigger::make();
+ mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
+ if (mShutdownTrigger == nullptr) return false;
+ }
+
+ connection->fd = std::move(fd);
+ connection->exclusiveTid = gettid();
+ mClientConnections.push_back(connection);
}
- sp<RpcConnection> session = sp<RpcConnection>::make();
- session->fd = std::move(fd);
- mClientConnections.push_back(session);
- return true;
+ status_t status =
+ mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
+
+ {
+ std::lock_guard<std::mutex> _l(mMutex);
+ connection->exclusiveTid = std::nullopt;
+ }
+
+ return status == OK;
}
bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
@@ -519,7 +550,9 @@
// in regular binder, this would usually be a deadlock :)
LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0,
"Session has no client connections. This is required for an RPC server "
- "to make any non-nested (e.g. oneway or on another thread) calls.");
+ "to make any non-nested (e.g. oneway or on another thread) calls. "
+ "Use: %d. Server connections: %zu",
+ static_cast<int>(use), mSession->mServerConnections.size());
LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...",
mSession->mClientConnections.size(), mSession->mServerConnections.size());
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 62eb58a..53eba5a 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -265,6 +265,27 @@
return OK;
}
+status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
+ RpcClientConnectionInit init{
+ .msg = RPC_CONNECTION_INIT_OKAY,
+ };
+ return rpcSend(fd, session, "connection init", &init, sizeof(init));
+}
+
+status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
+ RpcClientConnectionInit init;
+ if (status_t status = rpcRec(fd, session, "connection init", &init, sizeof(init)); status != OK)
+ return status;
+
+ static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY));
+ if (0 != strncmp(init.msg, RPC_CONNECTION_INIT_OKAY, sizeof(init.msg))) {
+ ALOGE("Connection init message unrecognized %.*s", static_cast<int>(sizeof(init.msg)),
+ init.msg);
+ return BAD_VALUE;
+ }
+ return OK;
+}
+
sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) {
Parcel data;
data.markForRpc(session);
diff --git a/libs/binder/RpcState.h b/libs/binder/RpcState.h
index db142a1..5bfef69 100644
--- a/libs/binder/RpcState.h
+++ b/libs/binder/RpcState.h
@@ -51,6 +51,9 @@
RpcState();
~RpcState();
+ status_t sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
+ status_t readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
+
// TODO(b/182940634): combine some special transactions into one "getServerInfo" call?
sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session);
status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session,
diff --git a/libs/binder/RpcWireFormat.h b/libs/binder/RpcWireFormat.h
index 649c1ee..b5e5bc1 100644
--- a/libs/binder/RpcWireFormat.h
+++ b/libs/binder/RpcWireFormat.h
@@ -26,12 +26,28 @@
RPC_CONNECTION_OPTION_REVERSE = 0x1,
};
+/**
+ * This is sent to an RpcServer in order to request a new connection is created,
+ * either as part of a new session or an existing session
+ */
struct RpcConnectionHeader {
int32_t sessionId;
uint8_t options;
uint8_t reserved[3];
};
+#define RPC_CONNECTION_INIT_OKAY "cci"
+
+/**
+ * Whenever a client connection is setup, this is sent as the initial
+ * transaction. The main use of this is in order to control the timing for when
+ * a reverse connection is setup.
+ */
+struct RpcClientConnectionInit {
+ char msg[4];
+ uint8_t reserved[4];
+};
+
enum : uint32_t {
/**
* follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 4650cf2..6ad15f2 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -185,13 +185,6 @@
bool mShutdown = false;
};
- status_t readId();
-
- // transfer ownership of thread
- void preJoin(std::thread thread);
- // join on thread passed to preJoin
- static void join(sp<RpcSession>&& session, base::unique_fd client);
-
struct RpcConnection : public RefBase {
base::unique_fd fd;
@@ -200,6 +193,27 @@
std::optional<pid_t> exclusiveTid;
};
+ status_t readId();
+
+ // A thread joining a server must always call these functions in order, and
+ // cleanup is only programmed once into join. These are in separate
+ // functions in order to allow for different locks to be taken during
+ // different parts of setup.
+ //
+ // transfer ownership of thread (usually done while a lock is taken on the
+ // structure which originally owns the thread)
+ void preJoinThreadOwnership(std::thread thread);
+ // pass FD to thread and read initial connection information
+ struct PreJoinSetupResult {
+ // Server connection object associated with this
+ sp<RpcConnection> connection;
+ // Status of setup
+ status_t status;
+ };
+ PreJoinSetupResult preJoinSetup(base::unique_fd fd);
+ // join on thread passed to preJoinThreadOwnership
+ static void join(sp<RpcSession>&& session, PreJoinSetupResult&& result);
+
[[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address);
[[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address, int32_t sessionId,
bool server);