libbinder: RPC avoid poll

One of the major costs of RPC binder right now, compared to kernel
binder, is that we need to make two syscalls (poll + send/recv) to read
or write a command, whereas kernel binder needs only one (ioctl).

By trying send/recv first and only polling when that does not complete
the transfer, we can get more comparable performance:
        .../0 is KERNEL
        .../1 is RPC

------------------------------------------------------------------------------------
Benchmark                                          Time             CPU   Iterations
------------------------------------------------------------------------------------
BM_pingTransaction/0                           37075 ns        18940 ns        36734
BM_pingTransaction/1                           43729 ns        22184 ns        29429
BM_repeatTwoPageString/0                      266736 ns       133091 ns         5273
BM_repeatTwoPageString/1                      311444 ns       155527 ns         5016
BM_throughputForTransportAndBytes/0/64         43458 ns        22226 ns        28221
BM_throughputForTransportAndBytes/1/64         49153 ns        25038 ns        36104
BM_throughputForTransportAndBytes/0/1024       44020 ns        22418 ns        26449
BM_throughputForTransportAndBytes/1/1024       49634 ns        25554 ns        30230
BM_throughputForTransportAndBytes/0/2048       41932 ns        21246 ns        34684
BM_throughputForTransportAndBytes/1/2048       49055 ns        24907 ns        29689
BM_throughputForTransportAndBytes/0/4096       49634 ns        25179 ns        26992
BM_throughputForTransportAndBytes/1/4096       53318 ns        27001 ns        20076
BM_throughputForTransportAndBytes/0/8182       59537 ns        30068 ns        26722
BM_throughputForTransportAndBytes/1/8182       69677 ns        35005 ns        19992
BM_throughputForTransportAndBytes/0/16364      67281 ns        30455 ns        24654
BM_throughputForTransportAndBytes/1/16364      86123 ns        42752 ns        18558
BM_throughputForTransportAndBytes/0/32728      83229 ns        37705 ns        16238
BM_throughputForTransportAndBytes/1/32728     116709 ns        57592 ns        12981
BM_throughputForTransportAndBytes/0/65535     223220 ns       104757 ns         6015
BM_throughputForTransportAndBytes/1/65535     380800 ns       187026 ns         4544
BM_throughputForTransportAndBytes/0/65536     202564 ns        95486 ns         7548
BM_throughputForTransportAndBytes/1/65536     347559 ns       170957 ns         4795
BM_throughputForTransportAndBytes/0/65537     293614 ns       128131 ns         5816
BM_throughputForTransportAndBytes/1/65537     524383 ns       241437 ns         2927
BM_repeatBinder/0                              62491 ns        33405 ns        19466
BM_repeatBinder/1                              68013 ns        33611 ns        23083

Bug: 182940634
Test: binderRpcBenchmark (above)
Change-Id: I5cbaf40e5936bdce04b5f158ceac970e8f6ff2fa
diff --git a/libs/binder/RpcTransportRaw.cpp b/libs/binder/RpcTransportRaw.cpp
index 41f4a9f..a22bc6f 100644
--- a/libs/binder/RpcTransportRaw.cpp
+++ b/libs/binder/RpcTransportRaw.cpp
@@ -43,56 +43,61 @@
         return ret;
     }
 
-    status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override {
-        const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
-        const uint8_t* end = buffer + size;
+    template <typename Buffer, typename SendOrReceive>
+    status_t interruptableReadOrWrite(FdTrigger* fdTrigger, Buffer buffer, size_t size,
+                                      SendOrReceive sendOrReceiveFun, const char* funName,
+                                      int16_t event) {
+        const Buffer end = buffer + size;
 
         MAYBE_WAIT_IN_FLAKE_MODE;
 
+        // The first sendOrReceiveFun call below is not preceded by a poll, so check the trigger
+        // manually here. Otherwise, we may never know we should be shutting down.
+        if (fdTrigger->isTriggered()) {
+            return DEAD_OBJECT;
+        }
+
+        bool first = true;
         status_t status;
-        while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLOUT)) == OK) {
-            ssize_t writeSize =
-                    TEMP_FAILURE_RETRY(::send(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL));
-            if (writeSize < 0) {
+        do {
+            ssize_t processSize = TEMP_FAILURE_RETRY(
+                    sendOrReceiveFun(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL));
+
+            if (processSize < 0) {
                 int savedErrno = errno;
-                LOG_RPC_DETAIL("RpcTransport send(): %s", strerror(savedErrno));
-                return -savedErrno;
+
+                // EAGAIN/EWOULDBLOCK is expected only on the first pass (we have not polled yet).
+                // After a successful poll it would expose a polling problem, so return it.
+                if (!first || (first && savedErrno != EAGAIN && savedErrno != EWOULDBLOCK)) {
+                    LOG_RPC_DETAIL("RpcTransport %s(): %s", funName, strerror(savedErrno));
+                    return -savedErrno;
+                }
+            } else if (processSize == 0) {
+                return DEAD_OBJECT;
+            } else {
+                buffer += processSize;
+                if (buffer == end) {
+                    return OK;
+                }
             }
 
-            if (writeSize == 0) return DEAD_OBJECT;
-
-            buffer += writeSize;
-            if (buffer == end) return OK;
-        }
+            if (first) first = false;
+        } while ((status = fdTrigger->triggerablePoll(mSocket.get(), event)) == OK);
         return status;
     }
 
+    status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override {
+        return interruptableReadOrWrite(fdTrigger, reinterpret_cast<const uint8_t*>(data), size,
+                                        send, "send", POLLOUT);
+    }
+
     status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override {
-        uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
-        uint8_t* end = buffer + size;
-
-        MAYBE_WAIT_IN_FLAKE_MODE;
-
-        status_t status;
-        while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLIN)) == OK) {
-            ssize_t readSize =
-                    TEMP_FAILURE_RETRY(::recv(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL));
-            if (readSize < 0) {
-                int savedErrno = errno;
-                LOG_RPC_DETAIL("RpcTransport recv(): %s", strerror(savedErrno));
-                return -savedErrno;
-            }
-
-            if (readSize == 0) return DEAD_OBJECT; // EOF
-
-            buffer += readSize;
-            if (buffer == end) return OK;
-        }
-        return status;
+        return interruptableReadOrWrite(fdTrigger, reinterpret_cast<uint8_t*>(data), size, recv,
+                                        "recv", POLLIN);
     }
 
 private:
-    android::base::unique_fd mSocket;
+    base::unique_fd mSocket;
 };
 
 // RpcTransportCtx with TLS disabled.