libbinder: RpcServer - shutdown connection threads
Re-use FdTrigger to shutdown connection threads.
Coming next - shutting down sessions as well!
Bug: 185167543
Test: binderRpcTest, binder_rpc_fuzzer
Change-Id: I238f1e2a5f69fdec09ac8b3afc484ab8639852fa
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 50b0465..540c346 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -184,10 +184,18 @@
bool RpcServer::shutdown() {
std::unique_lock<std::mutex> _l(mLock);
- if (mShutdownTrigger == nullptr) return false;
+ if (mShutdownTrigger == nullptr) {
+ LOG_RPC_DETAIL("Cannot shutdown. No shutdown trigger installed.");
+ return false;
+ }
mShutdownTrigger->trigger();
- while (mJoinThreadRunning) mShutdownCv.wait(_l);
+ while (mJoinThreadRunning || !mConnectingThreads.empty()) {
+ ALOGI("Waiting for RpcServer to shut down. Join thread running: %d, Connecting threads: "
+ "%zu",
+ mJoinThreadRunning, mConnectingThreads.size());
+ mShutdownCv.wait(_l);
+ }
// At this point, we know join() is about to exit, but the thread that calls
// join() may not have exited yet.
@@ -222,17 +230,21 @@
void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) {
// TODO(b/183988761): cannot trust this simple ID
LOG_ALWAYS_FATAL_IF(!server->mAgreedExperimental, "no!");
- bool idValid = true;
+
+ // mShutdownTrigger can only be cleared once connection threads have joined.
+ // It must be set before this thread is started
+ LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr);
+
int32_t id;
- if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) {
- ALOGE("Could not read ID from fd %d", clientFd.get());
- idValid = false;
+ bool idValid = server->mShutdownTrigger->interruptableRecv(clientFd.get(), &id, sizeof(id));
+ if (!idValid) {
+ ALOGE("Failed to read ID for client connecting to RPC server.");
}
std::thread thisThread;
sp<RpcSession> session;
{
- std::lock_guard<std::mutex> _l(server->mLock);
+ std::unique_lock<std::mutex> _l(server->mLock);
auto threadId = server->mConnectingThreads.find(std::this_thread::get_id());
LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(),
@@ -241,6 +253,16 @@
ScopeGuard detachGuard = [&]() { thisThread.detach(); };
server->mConnectingThreads.erase(threadId);
+ // TODO(b/185167543): we currently can't disable this because we don't
+ // shutdown sessions as well, only the server itself. So, we need to
+ // keep this separate from the detachGuard, since we temporarily want to
+ // give a notification even when we pass ownership of the thread to
+ // a session.
+ ScopeGuard threadLifetimeGuard = [&]() {
+ _l.unlock();
+ server->mShutdownCv.notify_all();
+ };
+
if (!idValid) {
return;
}
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index ea82f36..771d738 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -144,6 +144,22 @@
}
}
+bool RpcSession::FdTrigger::interruptableRecv(base::borrowed_fd fd, void* data, size_t size) {
+ uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
+ uint8_t* end = buffer + size;
+
+ while (triggerablePollRead(fd)) {
+ ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
+ if (readSize < 0) {
+ ALOGE("Failed to read %s", strerror(errno));
+ return false;
+ }
+ buffer += readSize;
+ if (buffer == end) return true;
+ }
+ return false;
+}
+
status_t RpcSession::readId() {
{
std::lock_guard<std::mutex> _l(mMutex);
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index bcef8dc..3d22002 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -134,6 +134,15 @@
*/
bool triggerablePollRead(base::borrowed_fd fd);
+ /**
+ * Read, but allow the read to be interrupted by this trigger.
+ *
+ * Return:
+ * true - read succeeded at 'size'
+ * false - interrupted (failure or trigger)
+ */
+ bool interruptableRecv(base::borrowed_fd fd, void* data, size_t size);
+
private:
base::unique_fd mWrite;
base::unique_fd mRead;
diff --git a/libs/binder/tests/rpc_fuzzer/main.cpp b/libs/binder/tests/rpc_fuzzer/main.cpp
index 84f5974..e6fd392 100644
--- a/libs/binder/tests/rpc_fuzzer/main.cpp
+++ b/libs/binder/tests/rpc_fuzzer/main.cpp
@@ -20,6 +20,7 @@
#include <binder/Parcel.h>
#include <binder/RpcServer.h>
#include <binder/RpcSession.h>
+#include <fuzzer/FuzzedDataProvider.h>
#include <sys/resource.h>
#include <sys/un.h>
@@ -53,6 +54,7 @@
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
if (size > 50000) return 0;
+ FuzzedDataProvider provider(data, size);
unlink(kSock.c_str());
@@ -84,14 +86,21 @@
CHECK(base::WriteFully(clientFd, &id, sizeof(id)));
#endif
- CHECK(base::WriteFully(clientFd, data, size));
+ bool hangupBeforeShutdown = provider.ConsumeBool();
- clientFd.reset();
+ std::vector<uint8_t> writeData = provider.ConsumeRemainingBytes<uint8_t>();
+ CHECK(base::WriteFully(clientFd, writeData.data(), writeData.size()));
+
+ if (hangupBeforeShutdown) {
+ clientFd.reset();
+ }
// TODO(185167543): currently this is okay because we only shutdown the one
// thread, but once we can shutdown other sessions, we'll need to change
// this behavior in order to make sure all of the input is actually read.
while (!server->shutdown()) usleep(100);
+
+ clientFd.reset();
serverThread.join();
// TODO(b/185167543): better way to force a server to shutdown