Merge changes from topic "binder_rpc_non_blocking"

* changes:
  binder: RpcTranpsortCtx::newTransport Add FdTrigger arg.
  binder: RpcSession handle post-connect
  binder: RPC uses non-blocking sockets.
  binder: Refactor: move FdTrigger to its own file / class.
  binder: RpcSession initialize mShutdownTrigger earlier
diff --git a/libs/binder/Android.bp b/libs/binder/Android.bp
index f34672c..34121d2 100644
--- a/libs/binder/Android.bp
+++ b/libs/binder/Android.bp
@@ -104,6 +104,7 @@
         "BpBinder.cpp",
         "BufferedTextOutput.cpp",
         "Debug.cpp",
+        "FdTrigger.cpp",
         "IInterface.cpp",
         "IMemory.cpp",
         "IPCThreadState.cpp",
diff --git a/libs/binder/FdTrigger.cpp b/libs/binder/FdTrigger.cpp
new file mode 100644
index 0000000..e38ac63
--- /dev/null
+++ b/libs/binder/FdTrigger.cpp
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "FdTrigger"
+#include <log/log.h>
+
+#include <poll.h>
+
+#include <android-base/macros.h>
+
+#include "FdTrigger.h"
+namespace android {
+
+std::unique_ptr<FdTrigger> FdTrigger::make() {
+    auto ret = std::make_unique<FdTrigger>();
+    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
+        ALOGE("Could not create pipe %s", strerror(errno));
+        return nullptr;
+    }
+    return ret;
+}
+
+void FdTrigger::trigger() {
+    mWrite.reset();
+}
+
+bool FdTrigger::isTriggered() {
+    return mWrite == -1;
+}
+
+status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
+    while (true) {
+        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
+                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
+        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
+        if (ret < 0) {
+            return -errno;
+        }
+        if (ret == 0) {
+            continue;
+        }
+        if (pfd[1].revents & POLLHUP) {
+            return -ECANCELED;
+        }
+        return pfd[0].revents & event ? OK : DEAD_OBJECT;
+    }
+}
+
+} // namespace android
diff --git a/libs/binder/FdTrigger.h b/libs/binder/FdTrigger.h
new file mode 100644
index 0000000..984e685
--- /dev/null
+++ b/libs/binder/FdTrigger.h
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+
+#include <android-base/unique_fd.h>
+#include <utils/Errors.h>
+
+namespace android {
+
+/** This is not a pipe. */
+class FdTrigger {
+public:
+    /** Returns nullptr for error case */
+    static std::unique_ptr<FdTrigger> make();
+
+    /**
+     * Close the write end of the pipe so that the read end receives POLLHUP.
+     * Not threadsafe.
+     */
+    void trigger();
+
+    /**
+     * Whether this has been triggered.
+     */
+    bool isTriggered();
+
+    /**
+     * Poll for a read event.
+     *
+     * event - for pollfd
+     *
+     * Return:
+     *   true - time to read!
+     *   false - trigger happened
+     */
+    status_t triggerablePoll(base::borrowed_fd fd, int16_t event);
+
+private:
+    base::unique_fd mWrite;
+    base::unique_fd mRead;
+};
+} // namespace android
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 4fa99c0..a20445b 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -29,6 +29,7 @@
 #include <binder/RpcTransportRaw.h>
 #include <log/log.h>
 
+#include "FdTrigger.h"
 #include "RpcSocketAddress.h"
 #include "RpcState.h"
 #include "RpcWireFormat.h"
@@ -156,7 +157,7 @@
         LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join.");
         LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined");
         mJoinThreadRunning = true;
-        mShutdownTrigger = RpcSession::FdTrigger::make();
+        mShutdownTrigger = FdTrigger::make();
         LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");
 
         mCtx = mRpcTransportCtxFactory->newServerCtx();
@@ -167,7 +168,7 @@
     status_t status;
     while ((status = mShutdownTrigger->triggerablePoll(mServer, POLLIN)) == OK) {
         unique_fd clientFd(TEMP_FAILURE_RETRY(
-                accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC)));
+                accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC | SOCK_NONBLOCK)));
 
         if (clientFd < 0) {
             ALOGE("Could not accept4 socket: %s", strerror(errno));
@@ -259,7 +260,7 @@
     status_t status = OK;
 
     int clientFdForLog = clientFd.get();
-    auto client = server->mCtx->newTransport(std::move(clientFd));
+    auto client = server->mCtx->newTransport(std::move(clientFd), server->mShutdownTrigger.get());
     if (client == nullptr) {
         ALOGE("Dropping accept4()-ed socket because sslAccept fails");
         status = DEAD_OBJECT;
@@ -270,8 +271,8 @@
 
     RpcConnectionHeader header;
     if (status == OK) {
-        status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header,
-                                                                  sizeof(header));
+        status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header,
+                                                sizeof(header));
         if (status != OK) {
             ALOGE("Failed to read ID for client connecting to RPC server: %s",
                   statusToString(status).c_str());
@@ -296,8 +297,8 @@
                     .version = protocolVersion,
             };
 
-            status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response,
-                                                                       sizeof(response));
+            status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response,
+                                                     sizeof(response));
             if (status != OK) {
                 ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
                 // still need to cleanup before we can return
@@ -387,8 +388,8 @@
     LOG_RPC_DETAIL("Setting up socket server %s", addr.toString().c_str());
     LOG_ALWAYS_FATAL_IF(hasServer(), "Each RpcServer can only have one server.");
 
-    unique_fd serverFd(
-            TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
+    unique_fd serverFd(TEMP_FAILURE_RETRY(
+            socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)));
     if (serverFd == -1) {
         int savedErrno = errno;
         ALOGE("Could not create socket: %s", strerror(savedErrno));
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index 107210b..4c47005 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -35,9 +35,11 @@
 #include <jni.h>
 #include <utils/String8.h>
 
+#include "FdTrigger.h"
 #include "RpcSocketAddress.h"
 #include "RpcState.h"
 #include "RpcWireFormat.h"
+#include "Utils.h"
 
 #ifdef __GLIBC__
 extern "C" pid_t gettid();
@@ -133,12 +135,18 @@
             fd = request();
             if (!fd.ok()) return BAD_VALUE;
         }
+        if (auto res = setNonBlocking(fd); !res.ok()) {
+            ALOGE("setupPreconnectedClient: %s", res.error().message().c_str());
+            return res.error().code() == 0 ? UNKNOWN_ERROR : -res.error().code();
+        }
         return initAndAddConnection(std::move(fd), sessionId, incoming);
     });
 }
 
 status_t RpcSession::addNullDebuggingClient() {
     // Note: only works on raw sockets.
+    if (auto status = initShutdownTrigger(); status != OK) return status;
+
     unique_fd serverFd(TEMP_FAILURE_RETRY(open("/dev/null", O_WRONLY | O_CLOEXEC)));
 
     if (serverFd == -1) {
@@ -152,7 +160,7 @@
         ALOGE("Unable to create RpcTransportCtx for null debugging client");
         return NO_MEMORY;
     }
-    auto server = ctx->newTransport(std::move(serverFd));
+    auto server = ctx->newTransport(std::move(serverFd), mShutdownTrigger.get());
     if (server == nullptr) {
         ALOGE("Unable to set up RpcTransport");
         return UNKNOWN_ERROR;
@@ -216,91 +224,6 @@
     return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address);
 }
 
-std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
-    auto ret = std::make_unique<RpcSession::FdTrigger>();
-    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
-        ALOGE("Could not create pipe %s", strerror(errno));
-        return nullptr;
-    }
-    return ret;
-}
-
-void RpcSession::FdTrigger::trigger() {
-    mWrite.reset();
-}
-
-bool RpcSession::FdTrigger::isTriggered() {
-    return mWrite == -1;
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) {
-    return triggerablePoll(rpcTransport->pollSocket(), event);
-}
-
-status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
-    while (true) {
-        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
-                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
-        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
-        if (ret < 0) {
-            return -errno;
-        }
-        if (ret == 0) {
-            continue;
-        }
-        if (pfd[1].revents & POLLHUP) {
-            return -ECANCELED;
-        }
-        return pfd[0].revents & event ? OK : DEAD_OBJECT;
-    }
-}
-
-status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport,
-                                                        const void* data, size_t size) {
-    const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
-    const uint8_t* end = buffer + size;
-
-    MAYBE_WAIT_IN_FLAKE_MODE;
-
-    status_t status;
-    while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) {
-        auto writeSize = rpcTransport->send(buffer, end - buffer);
-        if (!writeSize.ok()) {
-            LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
-            return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
-        }
-
-        if (*writeSize == 0) return DEAD_OBJECT;
-
-        buffer += *writeSize;
-        if (buffer == end) return OK;
-    }
-    return status;
-}
-
-status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data,
-                                                       size_t size) {
-    uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
-    uint8_t* end = buffer + size;
-
-    MAYBE_WAIT_IN_FLAKE_MODE;
-
-    status_t status;
-    while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) {
-        auto readSize = rpcTransport->recv(buffer, end - buffer);
-        if (!readSize.ok()) {
-            LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
-            return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
-        }
-
-        if (*readSize == 0) return DEAD_OBJECT; // EOF
-
-        buffer += *readSize;
-        if (buffer == end) return OK;
-    }
-    return status;
-}
-
 status_t RpcSession::readId() {
     {
         std::lock_guard<std::mutex> _l(mMutex);
@@ -484,6 +407,7 @@
                             "Must only setup session once, but already has %zu clients",
                             mOutgoingConnections.size());
     }
+    if (auto status = initShutdownTrigger(); status != OK) return status;
 
     if (status_t status = connectAndInit(RpcAddress::zero(), false /*incoming*/); status != OK)
         return status;
@@ -550,8 +474,8 @@
     for (size_t tries = 0; tries < 5; tries++) {
         if (tries > 0) usleep(10000);
 
-        unique_fd serverFd(
-                TEMP_FAILURE_RETRY(socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC, 0)));
+        unique_fd serverFd(TEMP_FAILURE_RETRY(
+                socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)));
         if (serverFd == -1) {
             int savedErrno = errno;
             ALOGE("Could not create socket at %s: %s", addr.toString().c_str(),
@@ -564,10 +488,34 @@
                 ALOGW("Connection reset on %s", addr.toString().c_str());
                 continue;
             }
-            int savedErrno = errno;
-            ALOGE("Could not connect socket at %s: %s", addr.toString().c_str(),
-                  strerror(savedErrno));
-            return -savedErrno;
+            if (errno != EAGAIN && errno != EINPROGRESS) {
+                int savedErrno = errno;
+                ALOGE("Could not connect socket at %s: %s", addr.toString().c_str(),
+                      strerror(savedErrno));
+                return -savedErrno;
+            }
+            // For non-blocking sockets, connect() may return EAGAIN (for unix domain socket) or
+            // EINPROGRESS (for others). Call poll() and getsockopt() to get the error.
+            status_t pollStatus = mShutdownTrigger->triggerablePoll(serverFd, POLLOUT);
+            if (pollStatus != OK) {
+                ALOGE("Could not POLLOUT after connect() on non-blocking socket: %s",
+                      statusToString(pollStatus).c_str());
+                return pollStatus;
+            }
+            int soError;
+            socklen_t soErrorLen = sizeof(soError);
+            int ret = getsockopt(serverFd.get(), SOL_SOCKET, SO_ERROR, &soError, &soErrorLen);
+            if (ret == -1) {
+                int savedErrno = errno;
+                ALOGE("Could not getsockopt() after connect() on non-blocking socket: %s",
+                      strerror(savedErrno));
+                return -savedErrno;
+            }
+            if (soError != 0) {
+                ALOGE("After connect(), getsockopt() returns error for socket at %s: %s",
+                      addr.toString().c_str(), strerror(soError));
+                return -soError;
+            }
         }
         LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());
 
@@ -580,13 +528,14 @@
 
 status_t RpcSession::initAndAddConnection(unique_fd fd, const RpcAddress& sessionId,
                                           bool incoming) {
+    LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr);
     auto ctx = mRpcTransportCtxFactory->newClientCtx();
     if (ctx == nullptr) {
         ALOGE("Unable to create client RpcTransportCtx with %s sockets",
               mRpcTransportCtxFactory->toCString());
         return NO_MEMORY;
     }
-    auto server = ctx->newTransport(std::move(fd));
+    auto server = ctx->newTransport(std::move(fd), mShutdownTrigger.get());
     if (server == nullptr) {
         ALOGE("Unable to set up RpcTransport in %s context", mRpcTransportCtxFactory->toCString());
         return UNKNOWN_ERROR;
@@ -602,16 +551,12 @@
 
     if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;
 
-    auto sentHeader = server->send(&header, sizeof(header));
-    if (!sentHeader.ok()) {
+    auto sendHeaderStatus =
+            server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header));
+    if (sendHeaderStatus != OK) {
         ALOGE("Could not write connection header to socket: %s",
-              sentHeader.error().message().c_str());
-        return -sentHeader.error().code();
-    }
-    if (*sentHeader != sizeof(header)) {
-        ALOGE("Could not write connection header to socket: sent %zd bytes, expected %zd",
-              *sentHeader, sizeof(header));
-        return UNKNOWN_ERROR;
+              statusToString(sendHeaderStatus).c_str());
+        return sendHeaderStatus;
     }
 
     LOG_RPC_DETAIL("Socket at client: header sent");
@@ -652,19 +597,21 @@
     return OK;
 }
 
+status_t RpcSession::initShutdownTrigger() {
+    // first client connection added, but setForServer not called, so
+    // initializaing for a client.
+    if (mShutdownTrigger == nullptr) {
+        mShutdownTrigger = FdTrigger::make();
+        mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
+        if (mShutdownTrigger == nullptr) return INVALID_OPERATION;
+    }
+    return OK;
+}
+
 status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) {
     sp<RpcConnection> connection = sp<RpcConnection>::make();
     {
         std::lock_guard<std::mutex> _l(mMutex);
-
-        // first client connection added, but setForServer not called, so
-        // initializaing for a client.
-        if (mShutdownTrigger == nullptr) {
-            mShutdownTrigger = FdTrigger::make();
-            mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
-            if (mShutdownTrigger == nullptr) return INVALID_OPERATION;
-        }
-
         connection->rpcTransport = std::move(rpcTransport);
         connection->exclusiveTid = gettid();
         mOutgoingConnections.push_back(connection);
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 23382c3..b58f1b3 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -283,8 +283,8 @@
     }
 
     if (status_t status =
-                session->mShutdownTrigger->interruptableWriteFully(connection->rpcTransport.get(),
-                                                                   data, size);
+                connection->rpcTransport->interruptableWriteFully(session->mShutdownTrigger.get(),
+                                                                  data, size);
         status != OK) {
         LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                        connection->rpcTransport.get(), statusToString(status).c_str());
@@ -305,8 +305,8 @@
     }
 
     if (status_t status =
-                session->mShutdownTrigger->interruptableReadFully(connection->rpcTransport.get(),
-                                                                  data, size);
+                connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(),
+                                                                 data, size);
         status != OK) {
         LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                        connection->rpcTransport.get(), statusToString(status).c_str());
diff --git a/libs/binder/RpcTransportRaw.cpp b/libs/binder/RpcTransportRaw.cpp
index 2fc1945..d77fc52 100644
--- a/libs/binder/RpcTransportRaw.cpp
+++ b/libs/binder/RpcTransportRaw.cpp
@@ -17,8 +17,11 @@
 #define LOG_TAG "RpcRawTransport"
 #include <log/log.h>
 
+#include <poll.h>
+
 #include <binder/RpcTransportRaw.h>
 
+#include "FdTrigger.h"
 #include "RpcState.h"
 
 using android::base::ErrnoError;
@@ -32,14 +35,14 @@
 class RpcTransportRaw : public RpcTransport {
 public:
     explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {}
-    Result<size_t> send(const void *buf, size_t size) override {
+    Result<size_t> send(const void* buf, size_t size) {
         ssize_t ret = TEMP_FAILURE_RETRY(::send(mSocket.get(), buf, size, MSG_NOSIGNAL));
         if (ret < 0) {
             return ErrnoError() << "send()";
         }
         return ret;
     }
-    Result<size_t> recv(void *buf, size_t size) override {
+    Result<size_t> recv(void* buf, size_t size) {
         ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_NOSIGNAL));
         if (ret < 0) {
             return ErrnoError() << "recv()";
@@ -47,14 +50,56 @@
         return ret;
     }
     Result<size_t> peek(void *buf, size_t size) override {
-        ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_PEEK | MSG_DONTWAIT));
+        ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_PEEK));
         if (ret < 0) {
             return ErrnoError() << "recv(MSG_PEEK)";
         }
         return ret;
     }
-    bool pending() override { return false; }
-    android::base::borrowed_fd pollSocket() const override { return mSocket; }
+
+    status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override {
+        const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
+        const uint8_t* end = buffer + size;
+
+        MAYBE_WAIT_IN_FLAKE_MODE;
+
+        status_t status;
+        while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLOUT)) == OK) {
+            auto writeSize = this->send(buffer, end - buffer);
+            if (!writeSize.ok()) {
+                LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
+                return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
+            }
+
+            if (*writeSize == 0) return DEAD_OBJECT;
+
+            buffer += *writeSize;
+            if (buffer == end) return OK;
+        }
+        return status;
+    }
+
+    status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override {
+        uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
+        uint8_t* end = buffer + size;
+
+        MAYBE_WAIT_IN_FLAKE_MODE;
+
+        status_t status;
+        while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLIN)) == OK) {
+            auto readSize = this->recv(buffer, end - buffer);
+            if (!readSize.ok()) {
+                LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
+                return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
+            }
+
+            if (*readSize == 0) return DEAD_OBJECT; // EOF
+
+            buffer += *readSize;
+            if (buffer == end) return OK;
+        }
+        return status;
+    }
 
 private:
     android::base::unique_fd mSocket;
@@ -63,7 +108,7 @@
 // RpcTransportCtx with TLS disabled.
 class RpcTransportCtxRaw : public RpcTransportCtx {
 public:
-    std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd) const {
+    std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd, FdTrigger*) const {
         return std::make_unique<RpcTransportRaw>(std::move(fd));
     }
 };
diff --git a/libs/binder/Utils.cpp b/libs/binder/Utils.cpp
index 90a4502..d2a5be1 100644
--- a/libs/binder/Utils.cpp
+++ b/libs/binder/Utils.cpp
@@ -18,10 +18,24 @@
 
 #include <string.h>
 
+using android::base::ErrnoError;
+using android::base::Result;
+
 namespace android {
 
 void zeroMemory(uint8_t* data, size_t size) {
     memset(data, 0, size);
 }
 
-}   // namespace android
+Result<void> setNonBlocking(android::base::borrowed_fd fd) {
+    int flags = TEMP_FAILURE_RETRY(fcntl(fd.get(), F_GETFL));
+    if (flags == -1) {
+        return ErrnoError() << "Could not get flags for fd";
+    }
+    if (int ret = TEMP_FAILURE_RETRY(fcntl(fd.get(), F_SETFL, flags | O_NONBLOCK)); ret == -1) {
+        return ErrnoError() << "Could not set non-blocking flag for fd";
+    }
+    return {};
+}
+
+} // namespace android
diff --git a/libs/binder/Utils.h b/libs/binder/Utils.h
index f94b158..1e383da 100644
--- a/libs/binder/Utils.h
+++ b/libs/binder/Utils.h
@@ -17,9 +17,14 @@
 #include <cstdint>
 #include <stddef.h>
 
+#include <android-base/result.h>
+#include <android-base/unique_fd.h>
+
 namespace android {
 
 // avoid optimizations
 void zeroMemory(uint8_t* data, size_t size);
 
+android::base::Result<void> setNonBlocking(android::base::borrowed_fd fd);
+
 }   // namespace android
diff --git a/libs/binder/include/binder/RpcServer.h b/libs/binder/include/binder/RpcServer.h
index f79d85f..bf3e7e0 100644
--- a/libs/binder/include/binder/RpcServer.h
+++ b/libs/binder/include/binder/RpcServer.h
@@ -32,6 +32,7 @@
 
 namespace android {
 
+class FdTrigger;
 class RpcSocketAddress;
 
 /**
@@ -190,7 +191,7 @@
     sp<IBinder> mRootObject;
     wp<IBinder> mRootObjectWeak;
     std::map<RpcAddress, sp<RpcSession>> mSessions;
-    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
+    std::unique_ptr<FdTrigger> mShutdownTrigger;
     std::condition_variable mShutdownCv;
     std::unique_ptr<RpcTransportCtx> mCtx;
 };
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 7ed6e43..761c50d 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -39,6 +39,7 @@
 class RpcSocketAddress;
 class RpcState;
 class RpcTransport;
+class FdTrigger;
 
 constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_NEXT = 0;
 constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL = 0xF0000000;
@@ -161,50 +162,6 @@
     friend RpcState;
     explicit RpcSession(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory);
 
-    /** This is not a pipe. */
-    struct FdTrigger {
-        /** Returns nullptr for error case */
-        static std::unique_ptr<FdTrigger> make();
-
-        /**
-         * Close the write end of the pipe so that the read end receives POLLHUP.
-         * Not threadsafe.
-         */
-        void trigger();
-
-        /**
-         * Whether this has been triggered.
-         */
-        bool isTriggered();
-
-        /**
-         * Poll for a read event.
-         *
-         * event - for pollfd
-         *
-         * Return:
-         *   true - time to read!
-         *   false - trigger happened
-         */
-        status_t triggerablePoll(base::borrowed_fd fd, int16_t event);
-
-        /**
-         * Read (or write), but allow to be interrupted by this trigger.
-         *
-         * Return:
-         *   true - succeeded in completely processing 'size'
-         *   false - interrupted (failure or trigger)
-         */
-        status_t interruptableReadFully(RpcTransport* rpcTransport, void* data, size_t size);
-        status_t interruptableWriteFully(RpcTransport* rpcTransport, const void* data, size_t size);
-
-    private:
-        status_t triggerablePoll(RpcTransport* rpcTransport, int16_t event);
-
-        base::unique_fd mWrite;
-        base::unique_fd mRead;
-    };
-
     class EventListener : public virtual RefBase {
     public:
         virtual void onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session) = 0;
@@ -271,6 +228,8 @@
             std::unique_ptr<RpcTransport> rpcTransport);
     [[nodiscard]] bool removeIncomingConnection(const sp<RpcConnection>& connection);
 
+    status_t initShutdownTrigger();
+
     enum class ConnectionUse {
         CLIENT,
         CLIENT_ASYNC,
diff --git a/libs/binder/include/binder/RpcTransport.h b/libs/binder/include/binder/RpcTransport.h
index 1164600..1b69519 100644
--- a/libs/binder/include/binder/RpcTransport.h
+++ b/libs/binder/include/binder/RpcTransport.h
@@ -23,42 +23,30 @@
 
 #include <android-base/result.h>
 #include <android-base/unique_fd.h>
+#include <utils/Errors.h>
 
 namespace android {
 
+class FdTrigger;
+
 // Represents a socket connection.
 class RpcTransport {
 public:
     virtual ~RpcTransport() = default;
 
-    // replacement of ::send(). errno may not be set if TLS is enabled.
-    virtual android::base::Result<size_t> send(const void *buf, size_t size) = 0;
-
-    // replacement of ::recv(). errno may not be set if TLS is enabled.
-    virtual android::base::Result<size_t> recv(void *buf, size_t size) = 0;
-
-    // replacement of ::recv(MSG_PEEK). errno may not be set if TLS is enabled.
-    //
-    // Implementation details:
-    // - For TLS, this may invoke syscalls and read data from the transport
-    // into an internal buffer in userspace. After that, pending() == true.
-    // - For raw sockets, this calls ::recv(MSG_PEEK), which leaves the data in the kernel buffer;
-    // pending() is always false.
+    // replacement of ::recv(MSG_PEEK). Error code may not be set if TLS is enabled.
     virtual android::base::Result<size_t> peek(void *buf, size_t size) = 0;
 
-    // Returns true if there are data pending in a userspace buffer that RpcTransport holds.
-    //
-    // Implementation details:
-    // - For TLS, this does not invoke any syscalls or read any data from the
-    // transport. This only returns whether there are data pending in the internal buffer in
-    // userspace.
-    // - For raw sockets, this always returns false.
-    virtual bool pending() = 0;
-
-    // Returns fd for polling.
-    //
-    // Do not directly read / write on this raw fd!
-    [[nodiscard]] virtual android::base::borrowed_fd pollSocket() const = 0;
+    /**
+     * Read (or write), but allow to be interrupted by a trigger.
+     *
+     * Return:
+     *   OK - succeeded in completely processing 'size'
+     *   error - interrupted (failure or trigger)
+     */
+    virtual status_t interruptableWriteFully(FdTrigger *fdTrigger, const void *buf,
+                                             size_t size) = 0;
+    virtual status_t interruptableReadFully(FdTrigger *fdTrigger, void *buf, size_t size) = 0;
 
 protected:
     RpcTransport() = default;
@@ -68,8 +56,13 @@
 class RpcTransportCtx {
 public:
     virtual ~RpcTransportCtx() = default;
+
+    // Create a new RpcTransport object.
+    //
+    // Implemenion details: for TLS, this function may incur I/O. |fdTrigger| may be used
+    // to interrupt I/O. This function blocks until handshake is finished.
     [[nodiscard]] virtual std::unique_ptr<RpcTransport> newTransport(
-            android::base::unique_fd fd) const = 0;
+            android::base::unique_fd fd, FdTrigger *fdTrigger) const = 0;
 
 protected:
     RpcTransportCtx() = default;