binder: Refactor: move FdTrigger to its own file / class.
Also move interruptable*Fully functions to RpcTransport so that we no
longer need pending() and pollSocket().
This also allows us to hide send() / recv(); callers should use
interruptableWriteFully / interruptableReadFully instead, because
those repsect the shutdown trigger.
- Fix one place to use interruptableWriteFully() instead of send() when
sending header.
interruptable*Fully are marked as virtual functions because TLS will
need to poll with events dynamically adjusted. See follow-up CLs for
TLS implementation.
Test: TH
Bug: 190868302
Change-Id: I131eed3a637b3a30280b320966e466bbfac0fc45
diff --git a/libs/binder/RpcTransportRaw.cpp b/libs/binder/RpcTransportRaw.cpp
index 2fc1945..995c542 100644
--- a/libs/binder/RpcTransportRaw.cpp
+++ b/libs/binder/RpcTransportRaw.cpp
@@ -17,8 +17,11 @@
#define LOG_TAG "RpcRawTransport"
#include <log/log.h>
+#include <poll.h>
+
#include <binder/RpcTransportRaw.h>
+#include "FdTrigger.h"
#include "RpcState.h"
using android::base::ErrnoError;
@@ -32,14 +35,14 @@
class RpcTransportRaw : public RpcTransport {
public:
explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {}
- Result<size_t> send(const void *buf, size_t size) override {
+ Result<size_t> send(const void* buf, size_t size) {
ssize_t ret = TEMP_FAILURE_RETRY(::send(mSocket.get(), buf, size, MSG_NOSIGNAL));
if (ret < 0) {
return ErrnoError() << "send()";
}
return ret;
}
- Result<size_t> recv(void *buf, size_t size) override {
+ Result<size_t> recv(void* buf, size_t size) {
ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_NOSIGNAL));
if (ret < 0) {
return ErrnoError() << "recv()";
@@ -53,8 +56,50 @@
}
return ret;
}
- bool pending() override { return false; }
- android::base::borrowed_fd pollSocket() const override { return mSocket; }
+
+ status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override {
+ const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
+ const uint8_t* end = buffer + size;
+
+ MAYBE_WAIT_IN_FLAKE_MODE;
+
+ status_t status;
+ while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLOUT)) == OK) {
+ auto writeSize = this->send(buffer, end - buffer);
+ if (!writeSize.ok()) {
+ LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
+ return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
+ }
+
+ if (*writeSize == 0) return DEAD_OBJECT;
+
+ buffer += *writeSize;
+ if (buffer == end) return OK;
+ }
+ return status;
+ }
+
+ status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override {
+ uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
+ uint8_t* end = buffer + size;
+
+ MAYBE_WAIT_IN_FLAKE_MODE;
+
+ status_t status;
+ while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLIN)) == OK) {
+ auto readSize = this->recv(buffer, end - buffer);
+ if (!readSize.ok()) {
+ LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
+ return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
+ }
+
+ if (*readSize == 0) return DEAD_OBJECT; // EOF
+
+ buffer += *readSize;
+ if (buffer == end) return OK;
+ }
+ return status;
+ }
private:
android::base::unique_fd mSocket;