libbinder: send is non-blocking and interruptible

We don't actually use MSG_DONTWAIT now, but we only send after
receiving POLLOUT. Moreover, by polling, send now also plugs into the
shutdown triggers. As a side effect, this causes send calls to fail
earlier.

Bug: 186470974
Test: binderRpcTest
Change-Id: Ia13d0dc1749fa7c10340e07e221b4b682cc59cc8
diff --git a/libs/binder/RpcServer.cpp b/libs/binder/RpcServer.cpp
index 2a87ae4..eacd7bc 100644
--- a/libs/binder/RpcServer.cpp
+++ b/libs/binder/RpcServer.cpp
@@ -16,6 +16,7 @@
 
 #define LOG_TAG "RpcServer"
 
+#include <poll.h>
 #include <sys/socket.h>
 #include <sys/un.h>
 
@@ -152,7 +153,7 @@
     }
 
     status_t status;
-    while ((status = mShutdownTrigger->triggerablePollRead(mServer)) == OK) {
+    while ((status = mShutdownTrigger->triggerablePoll(mServer, POLLIN)) == OK) {
         unique_fd clientFd(TEMP_FAILURE_RETRY(
                 accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC)));
 
diff --git a/libs/binder/RpcSession.cpp b/libs/binder/RpcSession.cpp
index bdf1bbe..fc6a900 100644
--- a/libs/binder/RpcSession.cpp
+++ b/libs/binder/RpcSession.cpp
@@ -178,9 +178,11 @@
     return mWrite == -1;
 }
 
-status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
+status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
     while (true) {
-        pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
+        pollfd pfd[]{{.fd = fd.get(),
+                      .events = static_cast<int16_t>(event | POLLHUP),
+                      .revents = 0},
                      {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
         int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
         if (ret < 0) {
@@ -192,10 +194,31 @@
         if (pfd[1].revents & POLLHUP) {
             return -ECANCELED;
         }
-        return pfd[0].revents & POLLIN ? OK : DEAD_OBJECT;
+        return pfd[0].revents & event ? OK : DEAD_OBJECT;
     }
 }
 
+status_t RpcSession::FdTrigger::interruptableWriteFully(base::borrowed_fd fd, const void* data,
+                                                        size_t size) {
+    const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data); // next unsent byte
+    const uint8_t* end = buffer + size; // one past the last byte to send
+
+    MAYBE_WAIT_IN_FLAKE_MODE;
+
+    status_t status;
+    while ((status = triggerablePoll(fd, POLLOUT)) == OK) { // only send once fd is writable
+        ssize_t writeSize = TEMP_FAILURE_RETRY(send(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
+        if (writeSize == 0) return DEAD_OBJECT; // NOTE(review): also hit if size == 0 - confirm
+
+        if (writeSize < 0) {
+            return -errno;
+        }
+        buffer += writeSize; // partial send: advance, then poll again for the rest
+        if (buffer == end) return OK;
+    }
+    return status; // non-OK result from triggerablePoll (poll failure, trigger, or hangup)
+}
+
 status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, void* data,
                                                        size_t size) {
     uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
@@ -204,7 +227,7 @@
     MAYBE_WAIT_IN_FLAKE_MODE;
 
     status_t status;
-    while ((status = triggerablePollRead(fd)) == OK) {
+    while ((status = triggerablePoll(fd, POLLIN)) == OK) {
         ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
         if (readSize == 0) return DEAD_OBJECT; // EOF
 
diff --git a/libs/binder/RpcState.cpp b/libs/binder/RpcState.cpp
index 5881703..332c75f 100644
--- a/libs/binder/RpcState.cpp
+++ b/libs/binder/RpcState.cpp
@@ -275,23 +275,19 @@
     LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, connection->fd.get(),
                    hexString(data, size).c_str());
 
-    MAYBE_WAIT_IN_FLAKE_MODE;
-
     if (size > std::numeric_limits<ssize_t>::max()) {
         ALOGE("Cannot send %s at size %zu (too big)", what, size);
         (void)session->shutdownAndWait(false);
         return BAD_VALUE;
     }
 
-    ssize_t sent = TEMP_FAILURE_RETRY(send(connection->fd.get(), data, size, MSG_NOSIGNAL));
-
-    if (sent < 0 || sent != static_cast<ssize_t>(size)) {
-        int savedErrno = errno;
-        LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
-                       size, connection->fd.get(), strerror(savedErrno));
-
+    if (status_t status = session->mShutdownTrigger->interruptableWriteFully(connection->fd.get(),
+                                                                             data, size);
+        status != OK) {
+        LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on fd %d, error: %s", what, size,
+                       connection->fd.get(), statusToString(status).c_str());
         (void)session->shutdownAndWait(false);
-        return -savedErrno;
+        return status;
     }
 
     return OK;
diff --git a/libs/binder/include/binder/RpcSession.h b/libs/binder/include/binder/RpcSession.h
index 69c2a1a..662390a 100644
--- a/libs/binder/include/binder/RpcSession.h
+++ b/libs/binder/include/binder/RpcSession.h
@@ -152,20 +152,23 @@
         /**
          * Poll for a read event.
          *
+         * event - poll(2) event to wait for on fd (e.g. POLLIN or POLLOUT)
+         *
          * Return:
          *   true - time to read!
          *   false - trigger happened
          */
-        status_t triggerablePollRead(base::borrowed_fd fd);
+        status_t triggerablePoll(base::borrowed_fd fd, int16_t event);
 
         /**
-         * Read, but allow the read to be interrupted by this trigger.
+         * Read (or write), but allow the operation to be interrupted by this trigger.
          *
          * Return:
-         *   true - read succeeded at 'size'
+         *   true - succeeded in completely processing 'size'
          *   false - interrupted (failure or trigger)
          */
         status_t interruptableReadFully(base::borrowed_fd fd, void* data, size_t size);
+        status_t interruptableWriteFully(base::borrowed_fd fd, const void* data, size_t size);
 
     private:
         base::unique_fd mWrite;
diff --git a/libs/binder/tests/binderRpcTest.cpp b/libs/binder/tests/binderRpcTest.cpp
index 29bde34..980d3cf 100644
--- a/libs/binder/tests/binderRpcTest.cpp
+++ b/libs/binder/tests/binderRpcTest.cpp
@@ -127,11 +127,6 @@
         out->clear();
         for (auto session : spServer->listSessions()) {
             size_t count = session->state()->countBinders();
-            if (count != 1) {
-                // this is called when there is only one binder held remaining,
-                // so to aid debugging
-                session->state()->dump();
-            }
             out->push_back(count);
         }
         return Status::ok();
@@ -360,7 +355,11 @@
                 EXPECT_EQ(remoteCount, 1);
             }
 
-            EXPECT_OK(rootIface->scheduleShutdown());
+            // even though it is on another thread, shutdown races with
+            // the transaction reply being written
+            if (auto status = rootIface->scheduleShutdown(); !status.isOk()) {
+                EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
+            }
         }
 
         rootIface = nullptr;
@@ -968,6 +967,12 @@
     Status status = iface->sleepMsAsync(kTooLongMs);
     EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
 
+    // now that it has died, wait for the remote session to shutdown
+    std::vector<int32_t> remoteCounts;
+    do {
+        EXPECT_OK(proc.rootIface->countBinders(&remoteCounts));
+    } while (remoteCounts.size() == kNumClients);
+
     // the second session should be shutdown in the other process by the time we
     // are able to join above (it'll only be hung up once it finishes processing
     // any pending commands). We need to erase this session from the record
@@ -1007,7 +1012,9 @@
 
                 // since we are severing the connection, we need to go ahead and
                 // tell the server to shutdown and exit so that waitpid won't hang
-                EXPECT_OK(proc.rootIface->scheduleShutdown());
+                if (auto status = proc.rootIface->scheduleShutdown(); !status.isOk()) {
+                    EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
+                }
 
                 // since this session has a reverse connection w/ a threadpool, we
                 // need to manually shut it down
@@ -1112,7 +1119,7 @@
 
     sp<RpcSession> session = RpcSession::make();
     bool okay = session->setupVsockClient(VMADDR_CID_LOCAL, vsockPort);
-    CHECK(server->shutdown());
+    while (!server->shutdown()) usleep(10000);
     ALOGE("Detected vsock loopback supported: %d", okay);
     return okay;
 }