Add RpcServer::shutdown.
shutdown() stops a running join() and waits until it has exited.
After this CL, join() may only be running in one thread at a time.
Test: binderLibTest
Change-Id: I5f1abbb39ee42a8f94b7394a702a152701537e7e
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 59659bd..e31aea0 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -16,19 +16,21 @@
#define LOG_TAG "RpcServer"
+#include <poll.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <thread>
#include <vector>
+#include <android-base/macros.h>
#include <android-base/scopeguard.h>
#include <binder/Parcel.h>
#include <binder/RpcServer.h>
#include <log/log.h>
-#include "RpcState.h"
#include "RpcSocketAddress.h"
+#include "RpcState.h"
#include "RpcWireFormat.h"
namespace android {
@@ -99,7 +101,7 @@
void RpcServer::setMaxThreads(size_t threads) { // Stores the thread limit in mMaxThreads; fatal-logs on 0 or while join() is running.
LOG_ALWAYS_FATAL_IF(threads <= 0, "RpcServer is useless without threads");
- LOG_ALWAYS_FATAL_IF(mStarted, "must be called before started");
+ LOG_ALWAYS_FATAL_IF(mJoinThreadRunning, "Cannot set max threads while running"); // NOTE(review): flag is read here without mLock, while join() writes it under mLock - confirm callers serialize.
mMaxThreads = threads;
}
@@ -126,16 +128,61 @@
return ret;
}
+std::unique_ptr<RpcServer::FdTrigger> RpcServer::FdTrigger::make() { // Factory: returns a trigger whose mRead/mWrite were filled by android::base::Pipe, or nullptr on failure.
+ auto ret = std::make_unique<RpcServer::FdTrigger>();
+ if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr; // Pipe() reported failure, so there is no usable trigger to hand back.
+ return ret;
+}
+
+void RpcServer::FdTrigger::trigger() { // Fires the trigger by resetting mWrite.
+ mWrite.reset(); // presumably closes the write end so that polling readFd() reports POLLHUP (which join() checks for) - TODO confirm
+}
+
void RpcServer::join() { // Accepts connections in a loop until shutdown() fires mShutdownTrigger, then notifies mShutdownCv.
- while (true) {
- (void)acceptOne();
+ LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
+ // Under mLock: verify the server socket, refuse a second join(), and create the shutdown trigger.
+ {
+ std::lock_guard<std::mutex> _l(mLock);
+ LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join.");
+ LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined"); // a non-null trigger means an earlier join() has not yet been cleared by shutdown()
+ mJoinThreadRunning = true;
+ mShutdownTrigger = FdTrigger::make();
+ LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");
}
+ // Block on both the server socket (new connection) and the trigger's read end (shutdown request).
+ while (true) {
+ pollfd pfd[]{{.fd = mServer.get(), .events = POLLIN, .revents = 0},
+ {.fd = mShutdownTrigger->readFd().get(), .events = POLLHUP, .revents = 0}}; // NOTE(review): POSIX poll() reports POLLHUP in revents regardless of events, so requesting it here is redundant.
+ int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1)); // -1 timeout: wait indefinitely
+ if (ret < 0) {
+ ALOGE("Could not poll socket: %s", strerror(errno));
+ continue; // NOTE(review): retries immediately; a persistent poll() error would spin and log repeatedly.
+ }
+ if (ret == 0) { // no descriptor ready; just poll again
+ continue;
+ }
+ if (pfd[1].revents & POLLHUP) { // shutdown trigger fired: stop accepting
+ LOG_RPC_DETAIL("join() exiting because shutdown requested.");
+ break;
+ }
+
+ (void)acceptOneNoCheck(); // accept result is deliberately discarded
+ }
+ // Clear the running flag under mLock, then wake any shutdown() waiting on mShutdownCv.
+ {
+ std::lock_guard<std::mutex> _l(mLock);
+ mJoinThreadRunning = false;
+ }
+ mShutdownCv.notify_all();
}
bool RpcServer::acceptOne() { // Checked entry point: validates state, then delegates to acceptOneNoCheck().
LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
- LOG_ALWAYS_FATAL_IF(!hasServer(), "RpcServer must be setup to join.");
+ LOG_ALWAYS_FATAL_IF(!hasServer(), "RpcServer must be setup to acceptOne.");
+ return acceptOneNoCheck(); // performs the accept itself; join() calls it directly after doing its own checks once
+}
+bool RpcServer::acceptOneNoCheck() {
unique_fd clientFd(
TEMP_FAILURE_RETRY(accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC)));
@@ -156,6 +203,18 @@
return true;
}
+bool RpcServer::shutdown() { // Stops a running join() and waits for it to exit; returns false when there is no shutdown trigger (no join() to stop).
+ LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
+ std::unique_lock<std::mutex> _l(mLock); // unique_lock (not lock_guard) because it is handed to mShutdownCv.wait() below
+ if (mShutdownTrigger == nullptr) return false;
+ // Fire the trigger so join()'s poll loop breaks, then wait until join() clears mJoinThreadRunning.
+ mShutdownTrigger->trigger();
+ while (mJoinThreadRunning) mShutdownCv.wait(_l); // loop re-checks the flag after every wakeup
+ // Resetting the trigger lets a later join() pass its "Already joined" check.
+ mShutdownTrigger = nullptr;
+ return true;
+}
+
std::vector<sp<RpcSession>> RpcServer::listSessions() {
std::lock_guard<std::mutex> _l(mLock);
std::vector<sp<RpcSession>> sessions;