RpcBinder: Add AF_UNIX socketpair transport

Add support for running RpcBinder over unnamed Unix domain sockets
created by socketpair(). This is useful, for example, for communication
between parent and child processes.

The implementation uses the initial socket pair only to create more
socket pairs, one for each individual connection. This gives a natural
mapping onto the syscalls used with sockets that are bound to an address:

    socket()                socketpair()
    bind()                  n/a (preconnected)
    connect()               sendmsg()
    listen()/accept()       recvmsg()

Bug: 250685929
Test: atest binderRpcTest
Change-Id: Id4ff3946ddcfefe3592eb1149c61582f7369aa29
diff --git a/libs/binder/RpcTransportRaw.cpp b/libs/binder/RpcTransportRaw.cpp
index 65e8fac..1912d14 100644
--- a/libs/binder/RpcTransportRaw.cpp
+++ b/libs/binder/RpcTransportRaw.cpp
@@ -23,6 +23,7 @@
 #include <binder/RpcTransportRaw.h>
 
 #include "FdTrigger.h"
+#include "OS.h"
 #include "RpcState.h"
 #include "RpcTransportUtils.h"
 
@@ -30,9 +31,6 @@
 
 namespace {
 
-// Linux kernel supports up to 253 (from SCM_MAX_FD) for unix sockets.
-constexpr size_t kMaxFdsPerMsg = 253;
-
 // RpcTransport with TLS disabled.
 class RpcTransportRaw : public RpcTransport {
 public:
@@ -63,57 +61,9 @@
             override {
         bool sentFds = false;
         auto send = [&](iovec* iovs, int niovs) -> ssize_t {
-            if (ancillaryFds != nullptr && !ancillaryFds->empty() && !sentFds) {
-                if (ancillaryFds->size() > kMaxFdsPerMsg) {
-                    // This shouldn't happen because we check the FD count in RpcState.
-                    ALOGE("Saw too many file descriptors in RpcTransportCtxRaw: %zu (max is %zu). "
-                          "Aborting session.",
-                          ancillaryFds->size(), kMaxFdsPerMsg);
-                    errno = EINVAL;
-                    return -1;
-                }
-
-                // CMSG_DATA is not necessarily aligned, so we copy the FDs into a buffer and then
-                // use memcpy.
-                int fds[kMaxFdsPerMsg];
-                for (size_t i = 0; i < ancillaryFds->size(); i++) {
-                    fds[i] = std::visit([](const auto& fd) { return fd.get(); },
-                                        ancillaryFds->at(i));
-                }
-                const size_t fdsByteSize = sizeof(int) * ancillaryFds->size();
-
-                alignas(struct cmsghdr) char msgControlBuf[CMSG_SPACE(sizeof(int) * kMaxFdsPerMsg)];
-
-                msghdr msg{
-                        .msg_iov = iovs,
-                        .msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(niovs),
-                        .msg_control = msgControlBuf,
-                        .msg_controllen = sizeof(msgControlBuf),
-                };
-
-                cmsghdr* cmsg = CMSG_FIRSTHDR(&msg);
-                cmsg->cmsg_level = SOL_SOCKET;
-                cmsg->cmsg_type = SCM_RIGHTS;
-                cmsg->cmsg_len = CMSG_LEN(fdsByteSize);
-                memcpy(CMSG_DATA(cmsg), fds, fdsByteSize);
-
-                msg.msg_controllen = CMSG_SPACE(fdsByteSize);
-
-                ssize_t processedSize = TEMP_FAILURE_RETRY(
-                        sendmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL | MSG_CMSG_CLOEXEC));
-                if (processedSize > 0) {
-                    sentFds = true;
-                }
-                return processedSize;
-            }
-
-            msghdr msg{
-                    .msg_iov = iovs,
-                    // posix uses int, glibc uses size_t.  niovs is a
-                    // non-negative int and can be cast to either.
-                    .msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(niovs),
-            };
-            return TEMP_FAILURE_RETRY(sendmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL));
+            int ret = sendMessageOnSocket(mSocket, iovs, niovs, sentFds ? nullptr : ancillaryFds);
+            sentFds |= ret > 0;
+            return ret;
         };
         return interruptableReadOrWrite(mSocket, fdTrigger, iovs, niovs, send, "sendmsg", POLLOUT,
                                         altPoll);
@@ -124,54 +74,7 @@
             const std::optional<android::base::function_ref<status_t()>>& altPoll,
             std::vector<std::variant<base::unique_fd, base::borrowed_fd>>* ancillaryFds) override {
         auto recv = [&](iovec* iovs, int niovs) -> ssize_t {
-            if (ancillaryFds != nullptr) {
-                int fdBuffer[kMaxFdsPerMsg];
-                alignas(struct cmsghdr) char msgControlBuf[CMSG_SPACE(sizeof(fdBuffer))];
-
-                msghdr msg{
-                        .msg_iov = iovs,
-                        .msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(niovs),
-                        .msg_control = msgControlBuf,
-                        .msg_controllen = sizeof(msgControlBuf),
-                };
-                ssize_t processSize =
-                        TEMP_FAILURE_RETRY(recvmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL));
-                if (processSize < 0) {
-                    return -1;
-                }
-
-                for (cmsghdr* cmsg = CMSG_FIRSTHDR(&msg); cmsg != nullptr;
-                     cmsg = CMSG_NXTHDR(&msg, cmsg)) {
-                    if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
-                        // NOTE: It is tempting to reinterpret_cast, but cmsg(3) explicitly asks
-                        // application devs to memcpy the data to ensure memory alignment.
-                        size_t dataLen = cmsg->cmsg_len - CMSG_LEN(0);
-                        LOG_ALWAYS_FATAL_IF(dataLen > sizeof(fdBuffer)); // sanity check
-                        memcpy(fdBuffer, CMSG_DATA(cmsg), dataLen);
-                        size_t fdCount = dataLen / sizeof(int);
-                        ancillaryFds->reserve(ancillaryFds->size() + fdCount);
-                        for (size_t i = 0; i < fdCount; i++) {
-                            ancillaryFds->emplace_back(base::unique_fd(fdBuffer[i]));
-                        }
-                        break;
-                    }
-                }
-
-                if (msg.msg_flags & MSG_CTRUNC) {
-                    ALOGE("msg was truncated. Aborting session.");
-                    errno = EPIPE;
-                    return -1;
-                }
-
-                return processSize;
-            }
-            msghdr msg{
-                    .msg_iov = iovs,
-                    // posix uses int, glibc uses size_t.  niovs is a
-                    // non-negative int and can be cast to either.
-                    .msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(niovs),
-            };
-            return TEMP_FAILURE_RETRY(recvmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL));
+            return receiveMessageFromSocket(mSocket, iovs, niovs, ancillaryFds);
         };
         return interruptableReadOrWrite(mSocket, fdTrigger, iovs, niovs, recv, "recvmsg", POLLIN,
                                         altPoll);