binder: Refactor: move FdTrigger to its own file / class.
Also move interruptable*Fully functions to RpcTransport so that we no
longer need pending() and pollSocket().
This also allows us to hide send() / recv(); callers should use
interruptableWriteFully / interruptableReadFully instead, because
those repsect the shutdown trigger.
- Fix one place to use interruptableWriteFully() instead of send() when
sending header.
interruptable*Fully are marked as virtual functions because TLS will
need to poll with events dynamically adjusted. See follow-up CLs for
TLS implementation.
Test: TH
Bug: 190868302
Change-Id: I131eed3a637b3a30280b320966e466bbfac0fc45
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index fe12ed4..c756f2e 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -35,6 +35,7 @@
#include <jni.h>
#include <utils/String8.h>
+#include "FdTrigger.h"
#include "RpcSocketAddress.h"
#include "RpcState.h"
#include "RpcWireFormat.h"
@@ -218,91 +219,6 @@
return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address);
}
-std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
- auto ret = std::make_unique<RpcSession::FdTrigger>();
- if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
- ALOGE("Could not create pipe %s", strerror(errno));
- return nullptr;
- }
- return ret;
-}
-
-void RpcSession::FdTrigger::trigger() {
- mWrite.reset();
-}
-
-bool RpcSession::FdTrigger::isTriggered() {
- return mWrite == -1;
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) {
- return triggerablePoll(rpcTransport->pollSocket(), event);
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
- while (true) {
- pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
- {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
- int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
- if (ret < 0) {
- return -errno;
- }
- if (ret == 0) {
- continue;
- }
- if (pfd[1].revents & POLLHUP) {
- return -ECANCELED;
- }
- return pfd[0].revents & event ? OK : DEAD_OBJECT;
- }
-}
-
-status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport,
- const void* data, size_t size) {
- const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
- const uint8_t* end = buffer + size;
-
- MAYBE_WAIT_IN_FLAKE_MODE;
-
- status_t status;
- while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) {
- auto writeSize = rpcTransport->send(buffer, end - buffer);
- if (!writeSize.ok()) {
- LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
- return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
- }
-
- if (*writeSize == 0) return DEAD_OBJECT;
-
- buffer += *writeSize;
- if (buffer == end) return OK;
- }
- return status;
-}
-
-status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data,
- size_t size) {
- uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
- uint8_t* end = buffer + size;
-
- MAYBE_WAIT_IN_FLAKE_MODE;
-
- status_t status;
- while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) {
- auto readSize = rpcTransport->recv(buffer, end - buffer);
- if (!readSize.ok()) {
- LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
- return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
- }
-
- if (*readSize == 0) return DEAD_OBJECT; // EOF
-
- buffer += *readSize;
- if (buffer == end) return OK;
- }
- return status;
-}
-
status_t RpcSession::readId() {
{
std::lock_guard<std::mutex> _l(mMutex);
@@ -584,6 +500,7 @@
status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId,
bool incoming) {
+ LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr);
auto ctx = mRpcTransportCtxFactory->newClientCtx();
if (ctx == nullptr) {
ALOGE("Unable to create client RpcTransportCtx with %s sockets",
@@ -606,16 +523,12 @@
if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
- auto sentHeader = server->send(&header, sizeof(header));
- if (!sentHeader.ok()) {
+ auto sendHeaderStatus =
+ server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header));
+ if (sendHeaderStatus != OK) {
ALOGE("Could not write connection header to socket: %s",
- sentHeader.error().message().c_str());
- return -sentHeader.error().code();
- }
- if (*sentHeader != sizeof(header)) {
- ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd",
- *sentHeader, sizeof(header));
- return UNKNOWN_ERROR;
+ statusToString(sendHeaderStatus).c_str());
+ return sendHeaderStatus;
}
LOG_RPC_DETAIL("Socket at client: header sent");