binder: Refactor: move FdTrigger to its own file / class.

Also move interruptable*Fully functions to RpcTransport so that we no
longer need pending() and pollSocket().

This also allows us to hide send() / recv(); callers should use
interruptableWriteFully / interruptableReadFully instead, because
those respect the shutdown trigger.
- Fix one place to use interruptableWriteFully() instead of send() when
  sending header.

interruptable*Fully are marked as virtual functions because TLS will
need to poll with events dynamically adjusted. See follow-up CLs for
TLS implementation.

Test: TH
Bug: 190868302
Change-Id: I131eed3a637b3a30280b320966e466bbfac0fc45
diff --git a/libs/binder/Android.bp b/libs/binder/Android.bp
index f34672c..34121d2 100644
--- a/libs/binder/Android.bp
+++ b/libs/binder/Android.bp
@@ -104,6 +104,7 @@
         "BpBinder.cpp",
         "BufferedTextOutput.cpp",
         "Debug.cpp",
+        "FdTrigger.cpp",
         "IInterface.cpp",
         "IMemory.cpp",
         "IPCThreadState.cpp",
diff --git a/libs/binder/FdTrigger.cpp b/libs/binder/FdTrigger.cpp
new file mode 100644
index 0000000..e38ac63
--- /dev/null
+++ b/libs/binder/FdTrigger.cpp
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "FdTrigger"
+#include <log/log.h>
+
+#include <poll.h>
+
+#include <android-base/macros.h>
+
+#include "FdTrigger.h"
+namespace android {
+
+std::unique_ptr<FdTrigger> FdTrigger::make() {
+    auto ret = std::make_unique<FdTrigger>();
+    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
+        ALOGE("Could not create pipe %s", strerror(errno));
+        return nullptr;
+    }
+    return ret;
+}
+
+void FdTrigger::trigger() {
+    mWrite.reset();
+}
+
+bool FdTrigger::isTriggered() {
+    return mWrite == -1;
+}
+
+status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
+    while (true) {
+        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
+                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
+        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
+        if (ret < 0) {
+            return -errno;
+        }
+        if (ret == 0) {
+            continue;
+        }
+        if (pfd[1].revents & POLLHUP) {
+            return -ECANCELED;
+        }
+        return pfd[0].revents & event ? OK : DEAD_OBJECT;
+    }
+}
+
+} // namespace android
diff --git a/libs/binder/FdTrigger.h b/libs/binder/FdTrigger.h
new file mode 100644
index 0000000..984e685
--- /dev/null
+++ b/libs/binder/FdTrigger.h
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include <android-base/unique_fd.h>
+#include <utils/Errors.h>
+
+namespace android {
+
+/** This is not a pipe. */
+class FdTrigger {
+public:
+    /** Returns nullptr for error case */
+    static std::unique_ptr<FdTrigger> make();
+
+    /**
+     * Close the write end of the pipe so that the read end receives POLLHUP.
+     * Not threadsafe.
+     */
+    void trigger();
+
+    /**
+     * Whether this has been triggered.
+     */
+    bool isTriggered();
+
+    /**
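+     * Poll fd for the given event, returning early if this trigger fires.
+     *
+     * event - events to request in pollfd (e.g. POLLIN, POLLOUT)
+     *
+     * Return:
+     *   OK - the requested event is ready on fd
+     *   error - -ECANCELED if triggered, else DEAD_OBJECT or -errno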
+     */
+    status_t triggerablePoll(base::borrowed_fd fd, int16_t event);
+
+private:
+    base::unique_fd mWrite;
+    base::unique_fd mRead;
+};
+} // namespace android
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 4fa99c0..66483ed 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -29,6 +29,7 @@
 #include <binder/RpcTransportRaw.h>
 #include <log/log.h>
 
+#include "FdTrigger.h"
 #include "RpcSocketAddress.h"
 #include "RpcState.h"
 #include "RpcWireFormat.h"
@@ -156,7 +157,7 @@
         LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join.");
         LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined");
         mJoinThreadRunning = true;
-        mShutdownTrigger = RpcSession::FdTrigger::make();
+        mShutdownTrigger = FdTrigger::make();
         LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");
 
         mCtx = mRpcTransportCtxFactory->newServerCtx();
@@ -270,8 +271,8 @@
 
     RpcConnectionHeader header;
     if (status == OK) {
-        status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header,
-                                                                  sizeof(header));
+        status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header,
+                                                sizeof(header));
         if (status != OK) {
             ALOGE("Failed to read ID for client connecting to RPC server: %s",
                   statusToString(status).c_str());
@@ -296,8 +297,8 @@
                     .version = protocolVersion,
             };
 
-            status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response,
-                                                                       sizeof(response));
+            status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response,
+                                                     sizeof(response));
             if (status != OK) {
                 ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
                 // still need to cleanup before we can return
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index fe12ed4..c756f2e 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -35,6 +35,7 @@
 #include <jni.h>
 #include <utils/String8.h>
 
+#include "FdTrigger.h"
 #include "RpcSocketAddress.h"
 #include "RpcState.h"
 #include "RpcWireFormat.h"
@@ -218,91 +219,6 @@
     return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address);
 }
 
-std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
-    auto ret = std::make_unique<RpcSession::FdTrigger>();
-    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
-        ALOGE("Could not create pipe %s", strerror(errno));
-        return nullptr;
-    }
-    return ret;
-}
-
-void RpcSession::FdTrigger::trigger() {
-    mWrite.reset();
-}
-
-bool RpcSession::FdTrigger::isTriggered() {
-    return mWrite == -1;
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) {
-    return triggerablePoll(rpcTransport->pollSocket(), event);
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
-    while (true) {
-        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
-                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
-        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
-        if (ret < 0) {
-            return -errno;
-        }
-        if (ret == 0) {
-            continue;
-        }
-        if (pfd[1].revents & POLLHUP) {
-            return -ECANCELED;
-        }
-        return pfd[0].revents & event ? OK : DEAD_OBJECT;
-    }
-}
-
-status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport,
-                                                        const void* data, size_t size) {
-    const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
-    const uint8_t* end = buffer + size;
-
-    MAYBE_WAIT_IN_FLAKE_MODE;
-
-    status_t status;
-    while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) {
-        auto writeSize = rpcTransport->send(buffer, end - buffer);
-        if (!writeSize.ok()) {
-            LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
-            return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
-        }
-
-        if (*writeSize == 0) return DEAD_OBJECT;
-
-        buffer += *writeSize;
-        if (buffer == end) return OK;
-    }
-    return status;
-}
-
-status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data,
-                                                       size_t size) {
-    uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
-    uint8_t* end = buffer + size;
-
-    MAYBE_WAIT_IN_FLAKE_MODE;
-
-    status_t status;
-    while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) {
-        auto readSize = rpcTransport->recv(buffer, end - buffer);
-        if (!readSize.ok()) {
-            LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
-            return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
-        }
-
-        if (*readSize == 0) return DEAD_OBJECT; // EOF
-
-        buffer += *readSize;
-        if (buffer == end) return OK;
-    }
-    return status;
-}
-
 status_t RpcSession::readId() {
     {
         std::lock_guard<std::mutex> _l(mMutex);
@@ -584,6 +500,7 @@
 
 status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId,
                                           bool incoming) {
+    LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr);
     auto ctx = mRpcTransportCtxFactory->newClientCtx();
     if (ctx == nullptr) {
         ALOGE("Unable to create client RpcTransportCtx with %s sockets",
@@ -606,16 +523,12 @@
 
     if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
 
-    auto sentHeader = server->send(&header, sizeof(header));
-    if (!sentHeader.ok()) {
+    auto sendHeaderStatus =
+            server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header));
+    if (sendHeaderStatus != OK) {
         ALOGE("Could not write connection header to socket: %s",
-              sentHeader.error().message().c_str());
-        return -sentHeader.error().code();
-    }
-    if (*sentHeader != sizeof(header)) {
-        ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd",
-              *sentHeader, sizeof(header));
-        return UNKNOWN_ERROR;
+              statusToString(sendHeaderStatus).c_str());
+        return sendHeaderStatus;
     }
 
     LOG_RPC_DETAIL("Socket at client: header sent");
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 23382c3..b58f1b3 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -283,8 +283,8 @@
     }
 
     if (status_t status =
-                session->mShutdownTrigger->interruptableWriteFully(connection->rpcTransport.get(),
-                                                                   data, size);
+                connection->rpcTransport->interruptableWriteFully(session->mShutdownTrigger.get(),
+                                                                  data, size);
         status != OK) {
         LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                        connection->rpcTransport.get(), statusToString(status).c_str());
@@ -305,8 +305,8 @@
     }
 
     if (status_t status =
-                session->mShutdownTrigger->interruptableReadFully(connection->rpcTransport.get(),
-                                                                  data, size);
+                connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(),
+                                                                 data, size);
         status != OK) {
         LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                        connection->rpcTransport.get(), statusToString(status).c_str());
diff --git a/libs/binder/RpcTransportRaw.cpp b/libs/binder/RpcTransportRaw.cpp
index 2fc1945..995c542 100644
--- a/libs/binder/RpcTransportRaw.cpp
+++ b/libs/binder/RpcTransportRaw.cpp
@@ -17,8 +17,11 @@
 #define LOG_TAG "RpcRawTransport"
 #include <log/log.h>
 
+#include <poll.h>
+
 #include <binder/RpcTransportRaw.h>
 
+#include "FdTrigger.h"
 #include "RpcState.h"
 
 using android::base::ErrnoError;
@@ -32,14 +35,14 @@
 class RpcTransportRaw : public RpcTransport {
 public:
     explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {}
-    Result<size_t> send(const void *buf, size_t size) override {
+    Result<size_t> send(const void* buf, size_t size) {
         ssize_t ret = TEMP_FAILURE_RETRY(::send(mSocket.get(), buf, size, MSG_NOSIGNAL));
         if (ret < 0) {
             return ErrnoError() << "send()";
         }
         return ret;
     }
-    Result<size_t> recv(void *buf, size_t size) override {
+    Result<size_t> recv(void* buf, size_t size) {
         ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_NOSIGNAL));
         if (ret < 0) {
             return ErrnoError() << "recv()";
@@ -53,8 +56,50 @@
         }
         return ret;
     }
-    bool pending() override { return false; }
-    android::base::borrowed_fd pollSocket() const override { return mSocket; }
+
+    status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override {
+        const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
+        const uint8_t* end = buffer + size;
+
+        MAYBE_WAIT_IN_FLAKE_MODE;
+
+        status_t status;
+        while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLOUT)) == OK) {
+            auto writeSize = this->send(buffer, end - buffer);
+            if (!writeSize.ok()) {
+                LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
+                return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
+            }
+
+            if (*writeSize == 0) return DEAD_OBJECT;
+
+            buffer += *writeSize;
+            if (buffer == end) return OK;
+        }
+        return status;
+    }
+
+    status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override {
+        uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
+        uint8_t* end = buffer + size;
+
+        MAYBE_WAIT_IN_FLAKE_MODE;
+
+        status_t status;
+        while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLIN)) == OK) {
+            auto readSize = this->recv(buffer, end - buffer);
+            if (!readSize.ok()) {
+                LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
+                return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
+            }
+
+            if (*readSize == 0) return DEAD_OBJECT; // EOF
+
+            buffer += *readSize;
+            if (buffer == end) return OK;
+        }
+        return status;
+    }
 
 private:
     android::base::unique_fd mSocket;
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index f79d85f..bf3e7e0 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -32,6 +32,7 @@
 
 namespace android {
 
+class FdTrigger;
 class RpcSocketAddress;
 
 /**
@@ -190,7 +191,7 @@
     sp<IBinder> mRootObject;
     wp<IBinder> mRootObjectWeak;
     std::map<RpcAddress, sp<RpcSession>> mSessions;
-    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+    std::unique_ptr<FdTrigger> mShutdownTrigger;
     std::condition_variable mShutdownCv;
     std::unique_ptr<RpcTransportCtx> mCtx;
 };
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 3320623..761c50d 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -39,6 +39,7 @@
 class RpcSocketAddress;
 class RpcState;
 class RpcTransport;
+class FdTrigger;
 
 constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_NEXT = 0;
 constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL = 0xF0000000;
@@ -161,50 +162,6 @@
     friend RpcState;
     explicit RpcSession(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory);
 
-    /** This is not a pipe. */
-    struct FdTrigger {
-        /** Returns nullptr for error case */
-        static std::unique_ptr<FdTrigger> make();
-
-        /**
-         * Close the write end of the pipe so that the read end receives POLLHUP.
-         * Not threadsafe.
-         */
-        void trigger();
-
-        /**
-         * Whether this has been triggered.
-         */
-        bool isTriggered();
-
-        /**
-         * Poll for a read event.
-         *
-         * event - for pollfd
-         *
-         * Return:
-         *   true - time to read!
-         *   false - trigger happened
-         */
-        status_t triggerablePoll(base::borrowed_fd fd, int16_t event);
-
-        /**
-         * Read (or write), but allow to be interrupted by this trigger.
-         *
-         * Return:
-         *   true - succeeded in completely processing 'size'
-         *   false - interrupted (failure or trigger)
-         */
-        status_t interruptableReadFully(RpcTransport* rpcTransport, void* data, size_t size);
-        status_t interruptableWriteFully(RpcTransport* rpcTransport, const void* data, size_t size);
-
-    private:
-        status_t triggerablePoll(RpcTransport* rpcTransport, int16_t event);
-
-        base::unique_fd mWrite;
-        base::unique_fd mRead;
-    };
-
     class EventListener : public virtual RefBase {
     public:
         virtual void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0;
diff --git a/libs/binder/include/binder/RpcTransport.h b/libs/binder/include/binder/RpcTransport.h
index 1164600..afca585 100644
--- a/libs/binder/include/binder/RpcTransport.h
+++ b/libs/binder/include/binder/RpcTransport.h
@@ -23,42 +23,30 @@
 
 #include <android-base/result.h>
 #include <android-base/unique_fd.h>
+#include <utils/Errors.h>
 
 namespace android {
 
+class FdTrigger;
+
 // Represents a socket connection.
 class RpcTransport {
 public:
     virtual ~RpcTransport() = default;
 
-    // replacement of ::send(). errno may not be set if TLS is enabled.
-    virtual android::base::Result<size_t> send(const void *buf, size_t size) = 0;
-
-    // replacement of ::recv(). errno may not be set if TLS is enabled.
-    virtual android::base::Result<size_t> recv(void *buf, size_t size) = 0;
-
-    // replacement of ::recv(MSG_PEEK). errno may not be set if TLS is enabled.
-    //
-    // Implementation details:
-    // - For TLS, this may invoke syscalls and read data from the transport
-    // into an internal buffer in userspace. After that, pending() == true.
-    // - For raw sockets, this calls ::recv(MSG_PEEK), which leaves the data in the kernel buffer;
-    // pending() is always false.
+    // replacement of ::recv(MSG_PEEK). Error code may not be set if TLS is enabled.
     virtual android::base::Result<size_t> peek(void *buf, size_t size) = 0;
 
-    // Returns true if there are data pending in a userspace buffer that RpcTransport holds.
-    //
-    // Implementation details:
-    // - For TLS, this does not invoke any syscalls or read any data from the
-    // transport. This only returns whether there are data pending in the internal buffer in
-    // userspace.
-    // - For raw sockets, this always returns false.
-    virtual bool pending() = 0;
-
-    // Returns fd for polling.
-    //
-    // Do not directly read / write on this raw fd!
-    [[nodiscard]] virtual android::base::borrowed_fd pollSocket() const = 0;
+    /**
+     * Read (or write), but allow to be interrupted by a trigger.
+     *
+     * Return:
+     *   OK - succeeded in completely processing 'size'
+     *   error - interrupted (failure or trigger)
+     */
+    virtual status_t interruptableWriteFully(FdTrigger *fdTrigger, const void *buf,
+                                             size_t size) = 0;
+    virtual status_t interruptableReadFully(FdTrigger *fdTrigger, void *buf, size_t size) = 0;
 
 protected:
     RpcTransport() = default;