binder: Refactor: move FdTrigger to its own file / class.
Also move interruptable*Fully functions to RpcTransport so that we no
longer need pending() and pollSocket().
This also allows us to hide send() / recv(); callers should use
interruptableWriteFully / interruptableReadFully instead, because
those respect the shutdown trigger.
- Fix one place to use interruptableWriteFully() instead of send() when
sending header.
interruptable*Fully are marked as virtual functions because TLS will
need to poll with events dynamically adjusted. See follow-up CLs for
TLS implementation.
Test: TH
Bug: 190868302
Change-Id: I131eed3a637b3a30280b320966e466bbfac0fc45
diff --git a/libs/binder/Android.bp b/libs/binder/Android.bp
index f34672c..34121d2 100644
--- a/libs/binder/Android.bp
+++ b/libs/binder/Android.bp
@@ -104,6 +104,7 @@
"BpBinder.cpp",
"BufferedTextOutput.cpp",
"Debug.cpp",
+ "FdTrigger.cpp",
"IInterface.cpp",
"IMemory.cpp",
"IPCThreadState.cpp",
diff --git a/libs/binder/FdTrigger.cpp b/libs/binder/FdTrigger.cpp
new file mode 100644
index 0000000..e38ac63
--- /dev/null
+++ b/libs/binder/FdTrigger.cpp
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "FdTrigger"
+#include <log/log.h>
+
+#include <poll.h>
+
+#include <android-base/macros.h>
+
+#include "FdTrigger.h"
+namespace android {
+
+std::unique_ptr<FdTrigger> FdTrigger::make() {
+ auto ret = std::make_unique<FdTrigger>();
+ if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
+ ALOGE("Could not create pipe %s", strerror(errno));
+ return nullptr;
+ }
+ return ret;
+}
+
+void FdTrigger::trigger() {
+ mWrite.reset();
+}
+
+bool FdTrigger::isTriggered() {
+ return mWrite == -1;
+}
+
+status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
+ while (true) {
+ pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
+ {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
+ int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
+ if (ret < 0) {
+ return -errno;
+ }
+ if (ret == 0) {
+ continue;
+ }
+ if (pfd[1].revents & POLLHUP) {
+ return -ECANCELED;
+ }
+ return pfd[0].revents & event ? OK : DEAD_OBJECT;
+ }
+}
+
+} // namespace android
diff --git a/libs/binder/FdTrigger.h b/libs/binder/FdTrigger.h
new file mode 100644
index 0000000..984e685
--- /dev/null
+++ b/libs/binder/FdTrigger.h
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include <android-base/unique_fd.h>
+#include <utils/Errors.h>
+
+namespace android {
+
+/** This is not a pipe. */
+class FdTrigger {
+public:
+ /** Returns nullptr for error case */
+ static std::unique_ptr<FdTrigger> make();
+
+ /**
+ * Close the write end of the pipe so that the read end receives POLLHUP.
+ * Not threadsafe.
+ */
+ void trigger();
+
+ /**
+ * Whether this has been triggered.
+ */
+ bool isTriggered();
+
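+ /**
+ * Poll fd for the given event until it occurs or this is triggered.
+ *
+ * event - for pollfd
+ *
+ * Return:
+ * OK - the requested event occurred on fd
+ * error - -ECANCELED if triggered; DEAD_OBJECT or -errno otherwise
+ */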
+ status_t triggerablePoll(base::borrowed_fd fd, int16_t event);
+
+private:
+ base::unique_fd mWrite;
+ base::unique_fd mRead;
+};
+} // namespace android
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 4fa99c0..66483ed 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -29,6 +29,7 @@
#include <binder/RpcTransportRaw.h>
#include <log/log.h>
+#include "FdTrigger.h"
#include "RpcSocketAddress.h"
#include "RpcState.h"
#include "RpcWireFormat.h"
@@ -156,7 +157,7 @@
LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join.");
LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined");
mJoinThreadRunning = true;
- mShutdownTrigger = RpcSession::FdTrigger::make();
+ mShutdownTrigger = FdTrigger::make();
LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");
mCtx = mRpcTransportCtxFactory->newServerCtx();
@@ -270,8 +271,8 @@
RpcConnectionHeader header;
if (status == OK) {
- status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header,
- sizeof(header));
+ status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header,
+ sizeof(header));
if (status != OK) {
ALOGE("Failed to read ID for client connecting to RPC server: %s",
statusToString(status).c_str());
@@ -296,8 +297,8 @@
.version = protocolVersion,
};
- status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response,
- sizeof(response));
+ status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response,
+ sizeof(response));
if (status != OK) {
ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
// still need to cleanup before we can return
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index fe12ed4..c756f2e 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -35,6 +35,7 @@
#include <jni.h>
#include <utils/String8.h>
+#include "FdTrigger.h"
#include "RpcSocketAddress.h"
#include "RpcState.h"
#include "RpcWireFormat.h"
@@ -218,91 +219,6 @@
return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address);
}
-std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
- auto ret = std::make_unique<RpcSession::FdTrigger>();
- if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
- ALOGE("Could not create pipe %s", strerror(errno));
- return nullptr;
- }
- return ret;
-}
-
-void RpcSession::FdTrigger::trigger() {
- mWrite.reset();
-}
-
-bool RpcSession::FdTrigger::isTriggered() {
- return mWrite == -1;
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) {
- return triggerablePoll(rpcTransport->pollSocket(), event);
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
- while (true) {
- pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
- {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
- int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
- if (ret < 0) {
- return -errno;
- }
- if (ret == 0) {
- continue;
- }
- if (pfd[1].revents & POLLHUP) {
- return -ECANCELED;
- }
- return pfd[0].revents & event ? OK : DEAD_OBJECT;
- }
-}
-
-status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport,
- const void* data, size_t size) {
- const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
- const uint8_t* end = buffer + size;
-
- MAYBE_WAIT_IN_FLAKE_MODE;
-
- status_t status;
- while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) {
- auto writeSize = rpcTransport->send(buffer, end - buffer);
- if (!writeSize.ok()) {
- LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
- return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
- }
-
- if (*writeSize == 0) return DEAD_OBJECT;
-
- buffer += *writeSize;
- if (buffer == end) return OK;
- }
- return status;
-}
-
-status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data,
- size_t size) {
- uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
- uint8_t* end = buffer + size;
-
- MAYBE_WAIT_IN_FLAKE_MODE;
-
- status_t status;
- while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) {
- auto readSize = rpcTransport->recv(buffer, end - buffer);
- if (!readSize.ok()) {
- LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
- return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
- }
-
- if (*readSize == 0) return DEAD_OBJECT; // EOF
-
- buffer += *readSize;
- if (buffer == end) return OK;
- }
- return status;
-}
-
status_t RpcSession::readId() {
{
std::lock_guard<std::mutex> _l(mMutex);
@@ -584,6 +500,7 @@
status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId,
bool incoming) {
+ LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr);
auto ctx = mRpcTransportCtxFactory->newClientCtx();
if (ctx == nullptr) {
ALOGE("Unable to create client RpcTransportCtx with %s sockets",
@@ -606,16 +523,12 @@
if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
- auto sentHeader = server->send(&header, sizeof(header));
- if (!sentHeader.ok()) {
+ auto sendHeaderStatus =
+ server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header));
+ if (sendHeaderStatus != OK) {
ALOGE("Could not write connection header to socket: %s",
- sentHeader.error().message().c_str());
- return -sentHeader.error().code();
- }
- if (*sentHeader != sizeof(header)) {
- ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd",
- *sentHeader, sizeof(header));
- return UNKNOWN_ERROR;
+ statusToString(sendHeaderStatus).c_str());
+ return sendHeaderStatus;
}
LOG_RPC_DETAIL("Socket at client: header sent");
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 23382c3..b58f1b3 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -283,8 +283,8 @@
}
if (status_t status =
- session->mShutdownTrigger->interruptableWriteFully(connection->rpcTransport.get(),
- data, size);
+ connection->rpcTransport->interruptableWriteFully(session->mShutdownTrigger.get(),
+ data, size);
status != OK) {
LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
connection->rpcTransport.get(), statusToString(status).c_str());
@@ -305,8 +305,8 @@
}
if (status_t status =
- session->mShutdownTrigger->interruptableReadFully(connection->rpcTransport.get(),
- data, size);
+ connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(),
+ data, size);
status != OK) {
LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
connection->rpcTransport.get(), statusToString(status).c_str());
diff --git a/libs/binder/RpcTransportRaw.cpp b/libs/binder/RpcTransportRaw.cpp
index 2fc1945..995c542 100644
--- a/libs/binder/RpcTransportRaw.cpp
+++ b/libs/binder/RpcTransportRaw.cpp
@@ -17,8 +17,11 @@
#define LOG_TAG "RpcRawTransport"
#include <log/log.h>
+#include <poll.h>
+
#include <binder/RpcTransportRaw.h>
+#include "FdTrigger.h"
#include "RpcState.h"
using android::base::ErrnoError;
@@ -32,14 +35,14 @@
class RpcTransportRaw : public RpcTransport {
public:
explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {}
- Result<size_t> send(const void *buf, size_t size) override {
+ Result<size_t> send(const void* buf, size_t size) {
ssize_t ret = TEMP_FAILURE_RETRY(::send(mSocket.get(), buf, size, MSG_NOSIGNAL));
if (ret < 0) {
return ErrnoError() << "send()";
}
return ret;
}
- Result<size_t> recv(void *buf, size_t size) override {
+ Result<size_t> recv(void* buf, size_t size) {
ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_NOSIGNAL));
if (ret < 0) {
return ErrnoError() << "recv()";
@@ -53,8 +56,50 @@
}
return ret;
}
- bool pending() override { return false; }
- android::base::borrowed_fd pollSocket() const override { return mSocket; }
+
+ status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override {
+ const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
+ const uint8_t* end = buffer + size;
+
+ MAYBE_WAIT_IN_FLAKE_MODE;
+
+ status_t status;
+ while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLOUT)) == OK) {
+ auto writeSize = this->send(buffer, end - buffer);
+ if (!writeSize.ok()) {
+ LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
+ return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
+ }
+
+ if (*writeSize == 0) return DEAD_OBJECT;
+
+ buffer += *writeSize;
+ if (buffer == end) return OK;
+ }
+ return status;
+ }
+
+ status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override {
+ uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
+ uint8_t* end = buffer + size;
+
+ MAYBE_WAIT_IN_FLAKE_MODE;
+
+ status_t status;
+ while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLIN)) == OK) {
+ auto readSize = this->recv(buffer, end - buffer);
+ if (!readSize.ok()) {
+ LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
+ return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
+ }
+
+ if (*readSize == 0) return DEAD_OBJECT; // EOF
+
+ buffer += *readSize;
+ if (buffer == end) return OK;
+ }
+ return status;
+ }
private:
android::base::unique_fd mSocket;
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index f79d85f..bf3e7e0 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -32,6 +32,7 @@
namespace android {
+class FdTrigger;
class RpcSocketAddress;
/**
@@ -190,7 +191,7 @@
sp<IBinder> mRootObject;
wp<IBinder> mRootObjectWeak;
std::map<RpcAddress, sp<RpcSession>> mSessions;
- std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+ std::unique_ptr<FdTrigger> mShutdownTrigger;
std::condition_variable mShutdownCv;
std::unique_ptr<RpcTransportCtx> mCtx;
};
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 3320623..761c50d 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -39,6 +39,7 @@
class RpcSocketAddress;
class RpcState;
class RpcTransport;
+class FdTrigger;
constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_NEXT = 0;
constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL = 0xF0000000;
@@ -161,50 +162,6 @@
friend RpcState;
explicit RpcSession(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory);
- /** This is not a pipe. */
- struct FdTrigger {
- /** Returns nullptr for error case */
- static std::unique_ptr<FdTrigger> make();
-
- /**
- * Close the write end of the pipe so that the read end receives POLLHUP.
- * Not threadsafe.
- */
- void trigger();
-
- /**
- * Whether this has been triggered.
- */
- bool isTriggered();
-
- /**
- * Poll for a read event.
- *
- * event - for pollfd
- *
- * Return:
- * true - time to read!
- * false - trigger happened
- */
- status_t triggerablePoll(base::borrowed_fd fd, int16_t event);
-
- /**
- * Read (or write), but allow to be interrupted by this trigger.
- *
- * Return:
- * true - succeeded in completely processing 'size'
- * false - interrupted (failure or trigger)
- */
- status_t interruptableReadFully(RpcTransport* rpcTransport, void* data, size_t size);
- status_t interruptableWriteFully(RpcTransport* rpcTransport, const void* data, size_t size);
-
- private:
- status_t triggerablePoll(RpcTransport* rpcTransport, int16_t event);
-
- base::unique_fd mWrite;
- base::unique_fd mRead;
- };
-
class EventListener : public virtual RefBase {
public:
virtual void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0;
diff --git a/libs/binder/include/binder/RpcTransport.h b/libs/binder/include/binder/RpcTransport.h
index 1164600..afca585 100644
--- a/libs/binder/include/binder/RpcTransport.h
+++ b/libs/binder/include/binder/RpcTransport.h
@@ -23,42 +23,30 @@
#include <android-base/result.h>
#include <android-base/unique_fd.h>
+#include <utils/Errors.h>
namespace android {
+class FdTrigger;
+
// Represents a socket connection.
class RpcTransport {
public:
virtual ~RpcTransport() = default;
- // replacement of ::send(). errno may not be set if TLS is enabled.
- virtual android::base::Result<size_t> send(const void *buf, size_t size) = 0;
-
- // replacement of ::recv(). errno may not be set if TLS is enabled.
- virtual android::base::Result<size_t> recv(void *buf, size_t size) = 0;
-
- // replacement of ::recv(MSG_PEEK). errno may not be set if TLS is enabled.
- //
- // Implementation details:
- // - For TLS, this may invoke syscalls and read data from the transport
- // into an internal buffer in userspace. After that, pending() == true.
- // - For raw sockets, this calls ::recv(MSG_PEEK), which leaves the data in the kernel buffer;
- // pending() is always false.
+ // replacement of ::recv(MSG_PEEK). Error code may not be set if TLS is enabled.
virtual android::base::Result<size_t> peek(void *buf, size_t size) = 0;
- // Returns true if there are data pending in a userspace buffer that RpcTransport holds.
- //
- // Implementation details:
- // - For TLS, this does not invoke any syscalls or read any data from the
- // transport. This only returns whether there are data pending in the internal buffer in
- // userspace.
- // - For raw sockets, this always returns false.
- virtual bool pending() = 0;
-
- // Returns fd for polling.
- //
- // Do not directly read / write on this raw fd!
- [[nodiscard]] virtual android::base::borrowed_fd pollSocket() const = 0;
+ /**
+ * Read (or write), but allow to be interrupted by a trigger.
+ *
+ * Return:
+ * OK - succeeded in completely processing 'size'
+ * error - interrupted (failure or trigger)
+ */
+ virtual status_t interruptableWriteFully(FdTrigger *fdTrigger, const void *buf,
+ size_t size) = 0;
+ virtual status_t interruptableReadFully(FdTrigger *fdTrigger, void *buf, size_t size) = 0;
protected:
RpcTransport() = default;