binder: Refactor: move FdTrigger to its own file / class.

Also move interruptable*Fully functions to RpcTransport so that we no
longer need pending() and pollSocket().

This also allows us to hide send() / recv(); callers should use
interruptableWriteFully / interruptableReadFully instead, because
those respect the shutdown trigger.
- Fix one place to use interruptableWriteFully() instead of send() when
  sending header.

interruptable*Fully are marked as virtual functions because TLS will
need to poll with events dynamically adjusted. See follow-up CLs for
TLS implementation.

Test: TH
Bug: 190868302
Change-Id: I131eed3a637b3a30280b320966e466bbfac0fc45
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index fe12ed4..c756f2e 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -35,6 +35,7 @@
 #include <jni.h>
 #include <utils/String8.h>
 
+#include "FdTrigger.h"
 #include "RpcSocketAddress.h"
 #include "RpcState.h"
 #include "RpcWireFormat.h"
@@ -218,91 +219,6 @@
     return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address);
 }
 
-std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
-    auto ret = std::make_unique<RpcSession::FdTrigger>();
-    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
-        ALOGE("Could not create pipe %s", strerror(errno));
-        return nullptr;
-    }
-    return ret;
-}
-
-void RpcSession::FdTrigger::trigger() {
-    mWrite.reset();
-}
-
-bool RpcSession::FdTrigger::isTriggered() {
-    return mWrite == -1;
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) {
-    return triggerablePoll(rpcTransport->pollSocket(), event);
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
-    while (true) {
-        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
-                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
-        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
-        if (ret < 0) {
-            return -errno;
-        }
-        if (ret == 0) {
-            continue;
-        }
-        if (pfd[1].revents & POLLHUP) {
-            return -ECANCELED;
-        }
-        return pfd[0].revents & event ? OK : DEAD_OBJECT;
-    }
-}
-
-status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport,
-                                                        const void* data, size_t size) {
-    const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
-    const uint8_t* end = buffer + size;
-
-    MAYBE_WAIT_IN_FLAKE_MODE;
-
-    status_t status;
-    while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) {
-        auto writeSize = rpcTransport->send(buffer, end - buffer);
-        if (!writeSize.ok()) {
-            LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
-            return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
-        }
-
-        if (*writeSize == 0) return DEAD_OBJECT;
-
-        buffer += *writeSize;
-        if (buffer == end) return OK;
-    }
-    return status;
-}
-
-status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data,
-                                                       size_t size) {
-    uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
-    uint8_t* end = buffer + size;
-
-    MAYBE_WAIT_IN_FLAKE_MODE;
-
-    status_t status;
-    while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) {
-        auto readSize = rpcTransport->recv(buffer, end - buffer);
-        if (!readSize.ok()) {
-            LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
-            return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
-        }
-
-        if (*readSize == 0) return DEAD_OBJECT; // EOF
-
-        buffer += *readSize;
-        if (buffer == end) return OK;
-    }
-    return status;
-}
-
 status_t RpcSession::readId() {
     {
         std::lock_guard<std::mutex> _l(mMutex);
@@ -584,6 +500,7 @@
 
 status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId,
                                           bool incoming) {
+    LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr);
     auto ctx = mRpcTransportCtxFactory->newClientCtx();
     if (ctx == nullptr) {
         ALOGE("Unable to create client RpcTransportCtx with %s sockets",
@@ -606,16 +523,12 @@
 
     if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
 
-    auto sentHeader = server->send(&header, sizeof(header));
-    if (!sentHeader.ok()) {
+    auto sendHeaderStatus =
+            server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header));
+    if (sendHeaderStatus != OK) {
         ALOGE("Could not write connection header to socket: %s",
-              sentHeader.error().message().c_str());
-        return -sentHeader.error().code();
-    }
-    if (*sentHeader != sizeof(header)) {
-        ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd",
-              *sentHeader, sizeof(header));
-        return UNKNOWN_ERROR;
+              statusToString(sendHeaderStatus).c_str());
+        return sendHeaderStatus;
     }
 
     LOG_RPC_DETAIL("Socket at client: header sent");