auto import from //depot/cupcake/@135843
diff --git a/libcutils/mq.c b/libcutils/mq.c
new file mode 100644
index 0000000..3b65f1f
--- /dev/null
+++ b/libcutils/mq.c
@@ -0,0 +1,1357 @@
+/*
+ * Copyright (C) 2007 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "mq"
+
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <sys/uio.h>
+
+#include <cutils/array.h>
+#include <cutils/hashmap.h>
+#include <cutils/selector.h>
+
+#include "loghack.h"
+#include "buffer.h"
+
+/**
+ * Number of recently dead peers to remember. peerProxyKill() records dead
+ * pids in a ring buffer of this size and peerIsDead() consults it.
+ */
+#define PEER_HISTORY (16)
+
+/** Shorthands for the socket address types (the master's address is bound with bind()). */
+typedef struct sockaddr SocketAddress;
+typedef struct sockaddr_un UnixAddress;
+
+/**
+ * Process/user/group ID. We don't use ucred directly because it's only
+ * available on Linux.
+ *
+ * Instances are embedded in Header and written to sockets as raw bytes, so
+ * the field order and types must match on both ends of a connection.
+ */
+typedef struct {
+ /** Process ID; also the key peers are looked up by. */
+ pid_t pid;
+ /** User ID. */
+ uid_t uid;
+ /** Group ID. */
+ gid_t gid;
+} Credentials;
+
+/**
+ * Listens for bytes coming from remote peers. Called with the sender's
+ * credentials once a complete BYTES packet has been read. The bytes live in
+ * the sender proxy's input buffer, which is re-prepared for the next packet
+ * right after the listener returns, so listeners should not keep the pointer.
+ */
+typedef void BytesListener(Credentials credentials, char* bytes, size_t size);
+
+/** Listens for the deaths of remote peers. Called by peerProxyKill() with the dead peer's pid. */
+typedef void DeathListener(pid_t pid);
+
+/** Types of packets. Every packet starts with a Header carrying one of these. */
+typedef enum {
+ /**
+ * Peer -> master: asks the master to connect us to another peer. The
+ * header's credentials carry the target pid.
+ */
+ CONNECTION_REQUEST,
+
+ /**
+ * Master -> peer: a connection to another peer. The header's credentials
+ * describe that peer; the socket itself follows as SCM_RIGHTS ancillary
+ * data.
+ */
+ CONNECTION,
+
+ /**
+ * Master -> peer: reports a failed connection attempt. The header's
+ * credentials identify the peer that couldn't be found.
+ */
+ CONNECTION_ERROR,
+
+ /** A generic packet of bytes; header.size bytes of payload follow. */
+ BYTES,
+} PacketType;
+
+/** What a peer proxy expects to read next from its socket. */
+typedef enum {
+ /** Reading a packet header. */
+ READING_HEADER,
+
+ /**
+ * Waiting for a connection from the master: a CONNECTION header has been
+ * read and the fd it announces will arrive via recvmsg().
+ */
+ ACCEPTING_CONNECTION,
+
+ /** Reading the payload of a BYTES packet (size taken from its header). */
+ READING_BYTES,
+} InputState;
+
+/**
+ * A packet header. Headers are written and read as raw sizeof(Header)
+ * bytes, so both ends must agree on this layout.
+ */
+// TODO: Use custom headers for master->peer, peer->master, peer->peer.
+typedef struct {
+ PacketType type;
+ union {
+ /** Packet size. Used for BYTES. */
+ size_t size;
+
+ /** Credentials. Used for CONNECTION, CONNECTION_REQUEST and CONNECTION_ERROR. */
+ Credentials credentials;
+ };
+} Header;
+
+/**
+ * A packet which will be sent to a peer. Packets form a singly linked queue
+ * per peer proxy (currentPacket .. lastPacket).
+ */
+typedef struct OutgoingPacket OutgoingPacket;
+struct OutgoingPacket {
+ /** Packet header; written first (see peerProxyPrepareOutgoingHeader()). */
+ Header header;
+
+ union {
+ /**
+ * Connection to peer. Used with CONNECTION. The packet owns this fd;
+ * outgoingPacketFreeSocket() closes it.
+ */
+ int socket;
+
+ /** Buffer of bytes. Used with BYTES. */
+ Buffer* bytes;
+ };
+
+ /**
+ * Frees all resources associated with this packet. Called by
+ * peerProxyNextPacket() once the packet is sent or the queue is discarded.
+ */
+ void (*free)(OutgoingPacket* packet);
+
+ /** Optional context, e.g. a SharedBytesFreer for shared-byte packets. */
+ void* context;
+
+ /** Next packet in the queue. */
+ OutgoingPacket* nextPacket;
+};
+
+/** Represents a remote peer. */
+typedef struct PeerProxy PeerProxy;
+
+/** Local peer state. You typically have one peer per process. */
+typedef struct {
+ /** This peer's PID. */
+ pid_t pid;
+
+ /**
+ * Map from pid to peer proxy. The peer has a peer proxy for each remote
+ * peer it's connected to.
+ *
+ * Acquire mutex before use.
+ */
+ Hashmap* peerProxies;
+
+ /** Manages I/O. */
+ Selector* selector;
+
+ /**
+ * Used to synchronize operations with the selector thread. Created as a
+ * recursive mutex by peerCreate().
+ */
+ pthread_mutex_t mutex;
+
+ /** Is this peer the master? */
+ bool master;
+
+ /**
+ * Peer proxy for the master. Connection requests are queued on it.
+ * NOTE(review): presumably unset in the master itself; confirm.
+ */
+ PeerProxy* masterProxy;
+
+ /** Listens for packets from remote peers. */
+ BytesListener* onBytes;
+
+ /** Listens for deaths of remote peers. */
+ DeathListener* onDeath;
+
+ /**
+ * Keeps track of recently dead peers. Requires mutex. A ring buffer
+ * written by peerProxyKill() and read by peerIsDead(); unused slots are
+ * 0. deadPeerCursor is the next slot to overwrite.
+ */
+ pid_t deadPeers[PEER_HISTORY];
+ size_t deadPeerCursor;
+} Peer;
+
+/** Local stand-in for one remote peer (or for the master). */
+struct PeerProxy {
+ /**
+ * Credentials of the remote process. peerProxyCreate() uses the address of
+ * credentials.pid as this proxy's key in Peer.peerProxies.
+ */
+ Credentials credentials;
+
+ /** Keeps track of data coming in from the remote peer. */
+ InputState inputState;
+ Buffer* inputBuffer;
+ /**
+ * Only meaningful on the master's proxy: the proxy whose connection fd we
+ * are about to receive (set by masterProxyExpectConnection()).
+ */
+ PeerProxy* connecting;
+
+ /** File descriptor for this peer. NULL until peerProxySetFd() attaches one. */
+ SelectableFd* fd;
+
+ /**
+ * Queue of packets to be written out to the remote peer: currentPacket is
+ * the head (being written), lastPacket the tail.
+ *
+ * Requires mutex.
+ */
+ // TODO: Limit queue length.
+ OutgoingPacket* currentPacket;
+ OutgoingPacket* lastPacket;
+
+ /** Used to write outgoing header; points at currentPacket->header. */
+ Buffer outgoingHeader;
+
+ /** True if this is the master's proxy. */
+ bool master;
+
+ /** Reference back to the local peer. */
+ Peer* peer;
+
+ /**
+ * Used in master only. Maps pid to PeerProxy for the other peers this
+ * peer has been connected to. Helps keep track of which connections
+ * we've sent to whom.
+ */
+ Hashmap* connections;
+};
+
+/**
+ * Server socket path: the filesystem path of the master's listening
+ * Unix-domain socket (unlinked and bound in masterPeerInitialize()).
+ */
+static const char* MASTER_PATH = "/master.peer";
+
+/** Credentials of the master peer (all zero). */
+static const Credentials MASTER_CREDENTIALS = {0, 0, 0};
+
+/**
+ * Creates a peer proxy and adds it to the peer proxy map. Returns NULL if
+ * memory can't be allocated. Callers must hold the peer lock, because the
+ * proxy map requires it.
+ */
+static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials);
+
+/** Switches a descriptor to non-blocking mode; any fcntl() failure is fatal. */
+static void setNonBlocking(int fd) {
+    int currentFlags = fcntl(fd, F_GETFL, 0);
+    if (currentFlags < 0) {
+        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
+    }
+
+    int status = fcntl(fd, F_SETFL, currentFlags | O_NONBLOCK);
+    if (status < 0) {
+        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
+    }
+}
+
+/** Closes a descriptor. A failing close() is only logged, never fatal. */
+static void closeWithWarning(int fd) {
+    if (close(fd) != -1) {
+        return;
+    }
+    LOGW("close() error: %s", strerror(errno));
+}
+
+/** Hash function for pid_t map keys: the pid value itself. */
+static int pidHash(void* key) {
+    return (int) *((pid_t*) key);
+}
+
+/** Equality function for pid_t map keys: compares the pointed-to pids. */
+static bool pidEquals(void* keyA, void* keyB) {
+    pid_t first = *((pid_t*) keyA);
+    pid_t second = *((pid_t*) keyB);
+    return first == second;
+}
+
+/**
+ * Returns the Unix-domain address of the master's socket, filling in the
+ * static storage on first use. The lazy initialization is unsynchronized,
+ * so this is not thread safe.
+ */
+static UnixAddress* getMasterAddress() {
+    static UnixAddress masterAddress;
+    static bool initialized = false;
+
+    if (initialized) {
+        return &masterAddress;
+    }
+
+    masterAddress.sun_family = AF_LOCAL;
+    strcpy(masterAddress.sun_path, MASTER_PATH);
+    initialized = true;
+    return &masterAddress;
+}
+
+/** Acquires the peer's mutex, giving the calling thread exclusive access to the peer. */
+static void peerLock(Peer* peer) {
+    pthread_mutex_t* mutex = &peer->mutex;
+    pthread_mutex_lock(mutex);
+}
+
+/** Releases the peer's mutex previously taken with peerLock(). */
+static void peerUnlock(Peer* peer) {
+    pthread_mutex_t* mutex = &peer->mutex;
+    pthread_mutex_unlock(mutex);
+}
+
+/**
+ * Free callback for header-only packets (such as CONNECTION_REQUEST and
+ * CONNECTION_ERROR) that own nothing besides the packet struct.
+ */
+static void outgoingPacketFree(OutgoingPacket* packet) {
+    LOGD("Freeing outgoing packet.");
+    void* memory = packet;
+    free(memory);
+}
+
+/**
+ * Resets the proxy's input side so that the next bytes read from the peer
+ * are collected as a packet header.
+ */
+static void peerProxyExpectHeader(PeerProxy* peerProxy) {
+    Buffer* input = peerProxy->inputBuffer;
+    peerProxy->inputState = READING_HEADER;
+    bufferPrepareForRead(input, sizeof(Header));
+}
+
+/**
+ * Points the proxy's reusable header buffer at the current packet's header
+ * and readies it for writing. currentPacket must be non-NULL.
+ */
+static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) {
+    Buffer* header = &peerProxy->outgoingHeader;
+    header->data = (char*) &peerProxy->currentPacket->header;
+    header->size = sizeof(Header);
+    bufferPrepareForWrite(header);
+}
+
+/**
+ * Adds a packet to the end of the proxy's outgoing queue. If the queue was
+ * empty, the packet becomes the current packet and its header is prepared
+ * for writing. Callers must have the mutex.
+ */
+static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy,
+        OutgoingPacket* newPacket) {
+    newPacket->nextPacket = NULL; // Just in case.
+    if (peerProxy->currentPacket == NULL) {
+        // The queue is empty.
+        peerProxy->currentPacket = newPacket;
+        peerProxyPrepareOutgoingHeader(peerProxy);
+    } else {
+        peerProxy->lastPacket->nextPacket = newPacket;
+    }
+
+    // The new packet is always the new tail. Without this, a later append
+    // would overwrite the previous tail's link and drop (and leak) packets.
+    peerProxy->lastPacket = newPacket;
+}
+
+/** Enqueues a packet on the proxy while holding the peer lock. */
+static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy,
+        OutgoingPacket* newPacket) {
+    peerLock(peerProxy->peer);
+    peerProxyEnqueueOutgoingPacket(peerProxy, newPacket);
+    peerUnlock(peerProxy->peer);
+}
+
+/**
+ * Frees the current outgoing packet and advances to the next one, preparing
+ * its header for writing. Returns true if there is a next packet, or false
+ * if the queue is now (or already was) empty. Takes the peer lock.
+ */
+static bool peerProxyNextPacket(PeerProxy* peerProxy) {
+    Peer* peer = peerProxy->peer;
+    peerLock(peer);
+
+    OutgoingPacket* current = peerProxy->currentPacket;
+    if (current == NULL) {
+        // The queue is already empty.
+        peerUnlock(peer);
+        return false;
+    }
+
+    OutgoingPacket* next = current->nextPacket;
+    peerProxy->currentPacket = next;
+    current->nextPacket = NULL;
+    current->free(current);
+
+    bool hasNext = next != NULL;
+    if (hasNext) {
+        // currentPacket requires the mutex, so prepare the header before
+        // unlocking, just as peerProxyEnqueueOutgoingPacket() does.
+        peerProxyPrepareOutgoingHeader(peerProxy);
+
+        // TODO: Start writing next packet? It would reduce the number of
+        // system calls, but we could also starve other peers.
+    } else {
+        // The queue is empty.
+        peerProxy->lastPacket = NULL;
+    }
+
+    peerUnlock(peer);
+    return hasNext;
+}
+
+/**
+ * Returns true if pid is among the last PEER_HISTORY peers that died.
+ * Slots are filled in order, so the first zero entry marks the end of the
+ * recorded history. deadPeers requires the peer lock.
+ */
+static bool peerIsDead(Peer* peer, pid_t pid) {
+    const pid_t* history = peer->deadPeers;
+    for (size_t index = 0; index < PEER_HISTORY && history[index] != 0;
+            index++) {
+        if (history[index] == pid) {
+            return true;
+        }
+    }
+    return false;
+}
+
+/**
+ * hashmapForEach() callback used when a proxy dies. value is a proxy the
+ * dead peer was connected to and context is the dead proxy; the dead peer's
+ * pid is removed from value's connection map. Always returns true
+ * (presumably meaning "keep iterating").
+ */
+static bool peerProxyRemoveConnection(void* key, void* value, void* context) {
+    PeerProxy* otherPeer = (PeerProxy*) value;
+    PeerProxy* deadPeer = (PeerProxy*) context;
+    pid_t* deadPid = &deadPeer->credentials.pid;
+    hashmapRemove(otherPeer->connections, deadPid);
+    return true;
+}
+
+/**
+ * Tears down the proxy for a remote peer whose connection failed or which
+ * misbehaved. Losing the master is fatal.
+ *
+ * Records the pid in the dead-peer history, removes the proxy from the peer
+ * map, flags its fd for removal from the selector, discards queued outgoing
+ * packets, drops master-side connection bookkeeping, notifies the death
+ * listener and frees the proxy. The proxy must not be used after this
+ * returns.
+ *
+ * errnoIsSet: true if errno holds the reason the peer died; it is logged.
+ */
+static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) {
+    if (errnoIsSet) {
+        LOGI("Peer %d died. errno: %s", peerProxy->credentials.pid,
+                strerror(errno));
+    } else {
+        LOGI("Peer %d died.", peerProxy->credentials.pid);
+    }
+
+    // If we lost the master, we're up a creek. We can't let this happen.
+    if (peerProxy->master) {
+        LOG_ALWAYS_FATAL("Lost connection to master.");
+    }
+
+    Peer* localPeer = peerProxy->peer;
+    pid_t pid = peerProxy->credentials.pid;
+
+    peerLock(localPeer);
+
+    // Remember for a while that the peer died. deadPeers is a ring buffer.
+    localPeer->deadPeers[localPeer->deadPeerCursor] = pid;
+    localPeer->deadPeerCursor++;
+    if (localPeer->deadPeerCursor == PEER_HISTORY) {
+        localPeer->deadPeerCursor = 0;
+    }
+
+    // Remove from peer map.
+    hashmapRemove(localPeer->peerProxies, &pid);
+
+    // External threads can no longer get to this peer proxy, so we don't
+    // need the lock anymore.
+    peerUnlock(localPeer);
+
+    // If the master proxy is waiting to hand us a connection for this peer,
+    // forget it. Otherwise masterProxyAcceptConnection() would attach the
+    // incoming fd to freed memory.
+    PeerProxy* masterProxy = localPeer->masterProxy;
+    if (masterProxy != NULL && masterProxy->connecting == peerProxy) {
+        masterProxy->connecting = NULL;
+    }
+
+    // Remove the fd from the selector.
+    if (peerProxy->fd != NULL) {
+        peerProxy->fd->remove = true;
+    }
+
+    // Clear outgoing packet queue.
+    while (peerProxyNextPacket(peerProxy)) {}
+
+    bufferFree(peerProxy->inputBuffer);
+
+    // Only master-side proxies have a connection map.
+    if (peerProxy->connections != NULL) {
+        // Other proxies' maps refer to this one; remove those entries so
+        // they don't point to freed memory.
+        hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection,
+                peerProxy);
+        hashmapFree(peerProxy->connections);
+    }
+
+    // Invoke death listener.
+    localPeer->onDeath(pid);
+
+    // Free the peer proxy itself.
+    free(peerProxy);
+}
+
+/**
+ * Reacts to a failed I/O call on the peer's socket, based on errno.
+ * Interruptions and would-block conditions are logged and ignored, so the
+ * operation is retried on a later selector pass. Any other error kills
+ * (and frees) the peer proxy.
+ *
+ * functionName: name of the failed call, used in log messages.
+ */
+static void peerProxyHandleError(PeerProxy* peerProxy,
+        const char* functionName) {
+    if (errno == EINTR) {
+        // Log interruptions but otherwise ignore them.
+        LOGW("%s() interrupted.", functionName);
+    } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        // EAGAIN and EWOULDBLOCK may be distinct values on some platforms.
+        LOGD("EWOULDBLOCK");
+        // Ignore.
+    } else {
+        // Logging may overwrite errno; keep the original so peerProxyKill()
+        // reports the real cause of death.
+        int savedErrno = errno;
+        LOGW("Error returned by %s().", functionName);
+        errno = savedErrno;
+        peerProxyKill(peerProxy, true);
+    }
+}
+
+/**
+ * Writes as much of outgoing to the peer's socket as it will accept; may
+ * take several calls. Returns true once the whole buffer has been written.
+ * A write error goes to peerProxyHandleError() (which may kill and free the
+ * proxy) and false is returned.
+ */
+static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) {
+    if (bufferWrite(outgoing, peerProxy->fd->fd) >= 0) {
+        return bufferWriteComplete(outgoing);
+    }
+    peerProxyHandleError(peerProxy, "write");
+    return false;
+}
+
+/**
+ * Continues writing the payload of the current BYTES packet. Moves on to the
+ * next packet once the payload has been fully sent.
+ */
+static void peerProxyWriteBytes(PeerProxy* peerProxy) {
+    Buffer* body = peerProxy->currentPacket->bytes;
+    if (!peerProxyWriteFromBuffer(peerProxy, body)) {
+        return;
+    }
+    LOGD("Bytes written.");
+    peerProxyNextPacket(peerProxy);
+}
+
+/**
+ * Sends the current CONNECTION packet's socket to the peer as SCM_RIGHTS
+ * ancillary data. On success the packet is dequeued (its free callback
+ * closes our copy of the socket). On failure, peerProxyHandleError() decides
+ * whether to retry later or kill the peer.
+ */
+static void peerProxyWriteConnection(PeerProxy* peerProxy) {
+    int socketToSend = peerProxy->currentPacket->socket;
+
+    // Control buffer for exactly one fd; the union guarantees cmsghdr
+    // alignment.
+    union {
+        struct cmsghdr cm;
+        char control[CMSG_SPACE(sizeof(int))];
+    } control_un;
+
+    // On stream sockets ancillary data must accompany at least one byte of
+    // regular data. The receiver also relies on this byte to tell a real
+    // message from EOF.
+    struct iovec iov[1];
+    iov[0].iov_base = "";
+    iov[0].iov_len = 1;
+
+    // Zero everything first so no field (e.g. msg_flags) is left
+    // uninitialized.
+    struct msghdr msg;
+    memset(&msg, 0, sizeof(msg));
+    msg.msg_name = NULL;
+    msg.msg_namelen = 0;
+    msg.msg_iov = iov;
+    msg.msg_iovlen = 1;
+    msg.msg_control = control_un.control;
+    msg.msg_controllen = sizeof(control_un.control);
+
+    struct cmsghdr* cmptr = CMSG_FIRSTHDR(&msg);
+    cmptr->cmsg_len = CMSG_LEN(sizeof(int));
+    cmptr->cmsg_level = SOL_SOCKET;
+    cmptr->cmsg_type = SCM_RIGHTS;
+
+    // Store the socket in the message. memcpy avoids relying on CMSG_DATA()
+    // being suitably aligned for an int store.
+    memcpy(CMSG_DATA(cmptr), &socketToSend, sizeof(socketToSend));
+
+    ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0);
+    if (result < 0) {
+        peerProxyHandleError(peerProxy, "sendmsg");
+    } else {
+        // Success. Queue up the next packet.
+        peerProxyNextPacket(peerProxy);
+    }
+}
+
+/**
+ * Selector callback: makes progress on the proxy's outgoing queue. The
+ * current packet's header is written first, then its body (a socket for
+ * CONNECTION, the payload for BYTES, nothing for header-only packets).
+ */
+static void peerProxyWrite(SelectableFd* fd) {
+    // TODO: Try to write header and body with one system call.
+
+    PeerProxy* peerProxy = (PeerProxy*) fd->data;
+    OutgoingPacket* current = peerProxy->currentPacket;
+    if (current == NULL) {
+        // Nothing is queued.
+        return;
+    }
+
+    Buffer* outgoingHeader = &peerProxy->outgoingHeader;
+    if (!bufferWriteComplete(outgoingHeader)) {
+        LOGD("Writing header...");
+        if (!peerProxyWriteFromBuffer(peerProxy, outgoingHeader)) {
+            // Header not finished yet (or the peer died); try again later.
+            return;
+        }
+        LOGD("Header written.");
+    }
+
+    PacketType type = current->header.type;
+    switch (type) {
+        case CONNECTION:
+            peerProxyWriteConnection(peerProxy);
+            break;
+        case BYTES:
+            peerProxyWriteBytes(peerProxy);
+            break;
+        case CONNECTION_REQUEST:
+        case CONNECTION_ERROR:
+            // Nothing follows the header for these packets.
+            peerProxyNextPacket(peerProxy);
+            break;
+        default:
+            LOG_ALWAYS_FATAL("Unknown packet type: %d", type);
+    }
+}
+
+/**
+ * Selector callback run before each select(): registers interest in
+ * writability only while the proxy has packets queued.
+ */
+static void peerProxyBeforeSelect(SelectableFd* fd) {
+    LOGD("Before select...");
+
+    PeerProxy* peerProxy = (PeerProxy*) fd->data;
+    Peer* peer = peerProxy->peer;
+
+    peerLock(peer);
+    OutgoingPacket* head = peerProxy->currentPacket;
+    peerUnlock(peer);
+
+    if (head == NULL) {
+        // We have nothing to write.
+        fd->onWritable = NULL;
+        return;
+    }
+
+    LOGD("Packets found. Setting onWritable().");
+    fd->onWritable = &peerProxyWrite;
+}
+
+/**
+ * Handles a BYTES header by preparing the input buffer for header->size
+ * bytes of payload and switching to READING_BYTES. An empty payload is
+ * delivered to the bytes listener immediately. Kills the peer if the buffer
+ * can't be allocated.
+ */
+static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) {
+    // header points into the input buffer, which is re-prepared below, so
+    // read the size first.
+    size_t size = header->size;
+    LOGD("Expecting %u bytes.", (unsigned int) size);
+
+    if (size == 0) {
+        // There is no payload to wait for. NOTE(review): presumably
+        // bufferRead() would issue a zero-length read, whose 0 result
+        // peerProxyBufferInput() treats as EOF, so deliver it right away.
+        peerProxy->peer->onBytes(peerProxy->credentials,
+                peerProxy->inputBuffer->data, 0);
+        peerProxyExpectHeader(peerProxy);
+        return;
+    }
+
+    peerProxy->inputState = READING_BYTES;
+    if (bufferPrepareForRead(peerProxy->inputBuffer, size) == -1) {
+        LOGW("Couldn't allocate memory for incoming data. Size: %u",
+                (unsigned int) size);
+
+        // TODO: Ignore the packet and log a warning?
+        peerProxyKill(peerProxy, false);
+    }
+}
+
+/**
+ * Looks up the proxy for pid, creating one if necessary. When
+ * requestConnection is true, creating a proxy also queues a
+ * CONNECTION_REQUEST to the master. Callers must hold the peer lock.
+ *
+ * Returns NULL if an error occurs, with errno set to:
+ *   EINVAL    if pid is our own pid,
+ *   EHOSTDOWN if the peer died recently (or, in the master, is unknown),
+ *   ENOMEM    if memory couldn't be allocated.
+ */
+static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid,
+        bool requestConnection) {
+    if (pid == peer->pid) {
+        errno = EINVAL;
+        return NULL;
+    }
+    if (peerIsDead(peer, pid)) {
+        errno = EHOSTDOWN;
+        return NULL;
+    }
+
+    PeerProxy* existing = hashmapGet(peer->peerProxies, &pid);
+    if (existing != NULL) {
+        return existing;
+    }
+
+    // The master creates a proxy for every peer as it connects, so an
+    // unknown pid means there is no such peer.
+    if (peer->master) {
+        errno = EHOSTDOWN;
+        return NULL;
+    }
+
+    // Only the pid is known so far. uid and gid are placeholders until
+    // masterProxyExpectConnection() fills in the real credentials; they are
+    // never exposed to the user.
+    Credentials credentials;
+    credentials.pid = pid;
+    credentials.uid = 0;
+    credentials.gid = 0;
+
+    // Allocate the request packet first so that failing here leaves no
+    // proxy behind.
+    OutgoingPacket* request = NULL;
+    if (requestConnection) {
+        request = calloc(1, sizeof(OutgoingPacket));
+        if (request == NULL) {
+            errno = ENOMEM;
+            return NULL;
+        }
+        request->header.type = CONNECTION_REQUEST;
+        request->header.credentials = credentials;
+        request->free = &outgoingPacketFree;
+    }
+
+    PeerProxy* created = peerProxyCreate(peer, credentials);
+    if (created == NULL) {
+        free(request);
+        errno = ENOMEM;
+        return NULL;
+    }
+
+    if (request != NULL) {
+        // Ask the master to connect us to the new peer.
+        peerProxyEnqueueOutgoingPacket(peer->masterProxy, request);
+    }
+    return created;
+}
+
+/**
+ * Handles a CONNECTION header. The master proxy switches into
+ * ACCEPTING_CONNECTION so the next read picks up the fd the master sends.
+ * The proxy the connection is for is looked up (or created) and given its
+ * real credentials. A CONNECTION from anyone but the master is treated as
+ * hostile and the sender is killed.
+ */
+static void masterProxyExpectConnection(PeerProxy* masterProxy,
+        Header* header) {
+    // TODO: Restructure things so we don't need this check.
+    if (!masterProxy->master) {
+        LOGW("Non-master process %d tried to send us a connection.",
+                masterProxy->credentials.pid);
+        // Kill off the evil peer.
+        peerProxyKill(masterProxy, false);
+        return;
+    }
+
+    masterProxy->inputState = ACCEPTING_CONNECTION;
+
+    Peer* localPeer = masterProxy->peer;
+    pid_t pid = header->credentials.pid;
+
+    peerLock(localPeer);
+    PeerProxy* target = peerProxyGetOrCreate(localPeer, pid, false);
+    if (target != NULL) {
+        // Replace the placeholder credentials with the real ones.
+        target->credentials = header->credentials;
+    } else {
+        LOGW("Peer proxy creation failed: %s", strerror(errno));
+    }
+    peerUnlock(localPeer);
+
+    // Remember which proxy the incoming fd belongs to.
+    masterProxy->connecting = target;
+}
+
+/**
+ * Reads input from a peer process. Forward-declared because
+ * peerProxySetFd() installs it as the onReadable callback before the
+ * definition appears below.
+ */
+static void peerProxyRead(SelectableFd* fd);
+
+/**
+ * Attaches a selectable fd to a proxy. The two are linked to each other,
+ * the read and before-select callbacks are installed, and the socket is
+ * made non-blocking.
+ */
+static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) {
+    fd->data = peerProxy;
+    fd->beforeSelect = &peerProxyBeforeSelect;
+    fd->onReadable = &peerProxyRead;
+    peerProxy->fd = fd;
+
+    // Make the socket non-blocking.
+    setNonBlocking(fd->fd);
+}
+
+/**
+ * Receives the fd the master sends after a CONNECTION header and attaches
+ * it to the proxy recorded in masterProxy->connecting. The following are
+ * fatal, since losing the master can't be recovered from:
+ *   - read errors other than EINTR/EAGAIN,
+ *   - EOF,
+ *   - a message without an fd.
+ * Afterwards the master proxy goes back to expecting a header.
+ */
+static void masterProxyAcceptConnection(PeerProxy* masterProxy) {
+    struct msghdr msg;
+    struct iovec iov[1];
+    ssize_t size;
+    char ignored;
+    // Initialized because LOG_ALWAYS_FATAL isn't known to the compiler as
+    // non-returning; every path that leaves it unset is fatal.
+    int incomingFd = -1;
+
+    // TODO: Share the msghdr setup with peerProxyWriteConnection().
+    union {
+        struct cmsghdr cm;
+        char control[CMSG_SPACE(sizeof(int))];
+    } control_un;
+    struct cmsghdr *cmptr;
+
+    memset(&msg, 0, sizeof(msg));
+    msg.msg_control = control_un.control;
+    msg.msg_controllen = sizeof(control_un.control);
+    msg.msg_name = NULL;
+    msg.msg_namelen = 0;
+
+    // The master sends 1 byte of data with the fd so we can detect EOF.
+    iov[0].iov_base = &ignored;
+    iov[0].iov_len = 1;
+    msg.msg_iov = iov;
+    msg.msg_iovlen = 1;
+
+    size = recvmsg(masterProxy->fd->fd, &msg, 0);
+    if (size < 0) {
+        if (errno == EINTR) {
+            // Log interruptions but otherwise ignore them.
+            LOGW("recvmsg() interrupted.");
+            return;
+        } else if (errno == EAGAIN) {
+            // Keep waiting for the connection.
+            return;
+        } else {
+            LOG_ALWAYS_FATAL("Error reading connection from master: %s",
+                    strerror(errno));
+        }
+    } else if (size == 0) {
+        // EOF.
+        LOG_ALWAYS_FATAL("Received EOF from master.");
+    }
+
+    // Extract fd from message.
+    if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
+            && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
+        if (cmptr->cmsg_level != SOL_SOCKET) {
+            LOG_ALWAYS_FATAL("Expected SOL_SOCKET.");
+        }
+        if (cmptr->cmsg_type != SCM_RIGHTS) {
+            LOG_ALWAYS_FATAL("Expected SCM_RIGHTS.");
+        }
+        memcpy(&incomingFd, CMSG_DATA(cmptr), sizeof(incomingFd));
+    } else {
+        LOG_ALWAYS_FATAL("Expected fd.");
+    }
+
+    // The peer proxy this connection is for. The pending connection is
+    // consumed either way, so don't leave a stale pointer behind.
+    PeerProxy* peerProxy = masterProxy->connecting;
+    masterProxy->connecting = NULL;
+
+    if (peerProxy == NULL) {
+        LOGW("Received connection for unknown peer.");
+        closeWithWarning(incomingFd);
+    } else if (peerProxy->fd != NULL) {
+        // Attaching a second fd would orphan the first one, whose callbacks
+        // still point at this proxy.
+        LOGW("Already connected to process %d. Ignoring duplicate connection.",
+                peerProxy->credentials.pid);
+        closeWithWarning(incomingFd);
+    } else {
+        Peer* peer = masterProxy->peer;
+        SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd);
+        if (selectableFd == NULL) {
+            LOGW("Error adding fd to selector for %d.",
+                    peerProxy->credentials.pid);
+            closeWithWarning(incomingFd);
+            // peerProxyKill() frees the proxy; it must not be used after.
+            peerProxyKill(peerProxy, false);
+        } else {
+            peerProxySetFd(peerProxy, selectableFd);
+        }
+    }
+
+    peerProxyExpectHeader(masterProxy);
+}
+
+/**
+ * Free callback for CONNECTION packets. Closes the packet's copy of the
+ * socket, then frees the packet itself.
+ */
+static void outgoingPacketFreeSocket(OutgoingPacket* packet) {
+    int socketFd = packet->socket;
+    closeWithWarning(socketFd);
+    outgoingPacketFree(packet);
+}
+
+/**
+ * Master only: connects two known peers. A socket pair is created and one
+ * end is queued to each peer in a CONNECTION packet carrying the other
+ * peer's credentials. The pairing is recorded in both proxies' connection
+ * maps, so repeated requests (e.g. both peers asking for each other at
+ * once) don't create duplicate connections. Failures are logged.
+ */
+static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) {
+    int sockets[2];
+    int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets);
+    if (result == -1) {
+        LOGW("socketpair() error: %s", strerror(errno));
+        // TODO: Send CONNECTION_FAILED packets to peers.
+        return;
+    }
+
+    OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket));
+    OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket));
+    if (packetA == NULL || packetB == NULL) {
+        free(packetA);
+        free(packetB);
+        // Neither end will be sent, so don't leak the descriptors.
+        closeWithWarning(sockets[0]);
+        closeWithWarning(sockets[1]);
+        LOGW("malloc() error. Failed to connect process %d to process %d.",
+                peerA->credentials.pid, peerB->credentials.pid);
+        return;
+    }
+
+    packetA->header.type = CONNECTION;
+    packetB->header.type = CONNECTION;
+
+    // Each peer learns who is on the other end of its socket.
+    packetA->header.credentials = peerB->credentials;
+    packetB->header.credentials = peerA->credentials;
+
+    // The packets own the sockets. outgoingPacketFreeSocket() closes our
+    // copies when the packets are freed.
+    packetA->socket = sockets[0];
+    packetB->socket = sockets[1];
+
+    packetA->free = &outgoingPacketFreeSocket;
+    packetB->free = &outgoingPacketFreeSocket;
+
+    peerLock(peerA->peer);
+    peerProxyEnqueueOutgoingPacket(peerA, packetA);
+    peerProxyEnqueueOutgoingPacket(peerB, packetB);
+
+    // Remember the pairing so masterHandleConnectionRequest() won't connect
+    // these peers again. Keys point at the pids stored inside the proxies;
+    // peerProxyKill() removes a dying proxy from its partners' maps before
+    // freeing it. A self-connection is not recorded, because a proxy must
+    // not appear in the map peerProxyKill() iterates while removing entries.
+    if (peerA != peerB) {
+        if (peerA->connections != NULL) {
+            hashmapPut(peerA->connections, &peerB->credentials.pid, peerB);
+        }
+        if (peerB->connections != NULL) {
+            hashmapPut(peerB->connections, &peerA->credentials.pid, peerA);
+        }
+    }
+    peerUnlock(peerA->peer);
+}
+
+/**
+ * Queues a CONNECTION_ERROR packet telling peerProxy that the peer
+ * described by credentials couldn't be found. If the packet can't be
+ * allocated, the failure is only logged.
+ */
+static void masterReportConnectionError(PeerProxy* peerProxy,
+        Credentials credentials) {
+    OutgoingPacket* errorPacket = calloc(1, sizeof(OutgoingPacket));
+    if (errorPacket != NULL) {
+        errorPacket->header.type = CONNECTION_ERROR;
+        errorPacket->header.credentials = credentials;
+        errorPacket->free = &outgoingPacketFree;
+        peerProxyLockAndEnqueueOutgoingPacket(peerProxy, errorPacket);
+        return;
+    }
+
+    LOGW("malloc() error. Failed to tell process %d that process %d is"
+            " dead.", peerProxy->credentials.pid, credentials.pid);
+}
+
+/**
+ * Master only: handles a peer's request to be connected to the peer whose
+ * pid is in header->credentials. If the two haven't been connected yet,
+ * they are connected now; an unknown target gets a CONNECTION_ERROR. Then
+ * the proxy prepares to read the next header. A request received by a
+ * non-master peer is a protocol violation, and the sender is killed.
+ */
+static void masterHandleConnectionRequest(PeerProxy* peerProxy,
+        Header* header) {
+    Peer* master = peerProxy->peer;
+
+    // Only the master brokers connections. Non-master proxies have no
+    // connection map, so processing this request would dereference NULL.
+    if (!master->master) {
+        LOGW("Process %d sent a connection request to a non-master peer.",
+                peerProxy->credentials.pid);
+        peerProxyKill(peerProxy, false);
+        return;
+    }
+
+    pid_t targetPid = header->credentials.pid;
+    // connections is NULL if its allocation failed; we then can't tell
+    // whether these peers were already connected.
+    bool alreadyConnected = peerProxy->connections != NULL
+            && hashmapContainsKey(peerProxy->connections, &targetPid);
+    if (!alreadyConnected) {
+        PeerProxy* targetPeer
+                = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid);
+        if (targetPeer == NULL) {
+            // Unknown process.
+            masterReportConnectionError(peerProxy, header->credentials);
+        } else {
+            masterConnectPeers(peerProxy, targetPeer);
+        }
+    }
+
+    // This packet is complete. Get ready for the next one.
+    peerProxyExpectHeader(peerProxy);
+}
+
+/**
+ * The master told us a peer we asked to connect to is dead or unknown; kill
+ * our proxy for it. The report is only accepted from the master. Otherwise
+ * any peer could get us to kill arbitrary proxies, so the sender is killed
+ * instead (mirroring masterProxyExpectConnection()).
+ */
+static void masterProxyHandleConnectionError(PeerProxy* masterProxy,
+        Header* header) {
+    if (!masterProxy->master) {
+        LOGW("Non-master process %d tried to report a connection error.",
+                masterProxy->credentials.pid);
+        peerProxyKill(masterProxy, false);
+        return;
+    }
+
+    Peer* peer = masterProxy->peer;
+
+    // Look up the peer proxy.
+    pid_t pid = header->credentials.pid;
+    PeerProxy* peerProxy = NULL;
+    peerLock(peer);
+    peerProxy = hashmapGet(peer->peerProxies, &pid);
+    peerUnlock(peer);
+
+    if (peerProxy != NULL) {
+        LOGI("Couldn't connect to %d.", pid);
+        peerProxyKill(peerProxy, false);
+    } else {
+        LOGW("Peer proxy for %d not found. This shouldn't happen.", pid);
+    }
+
+    peerProxyExpectHeader(masterProxy);
+}
+
+/**
+ * Dispatches a fully read packet header by type. An unknown type is a
+ * protocol violation, and the sender is killed.
+ */
+static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) {
+    PacketType type = header->type;
+    if (type == BYTES) {
+        peerProxyExpectBytes(peerProxy, header);
+    } else if (type == CONNECTION) {
+        masterProxyExpectConnection(peerProxy, header);
+    } else if (type == CONNECTION_REQUEST) {
+        masterHandleConnectionRequest(peerProxy, header);
+    } else if (type == CONNECTION_ERROR) {
+        masterProxyHandleConnectionError(peerProxy, header);
+    } else {
+        LOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid,
+                type);
+        peerProxyKill(peerProxy, false);
+    }
+}
+
+/**
+ * Reads whatever is available into the proxy's input buffer. Returns true
+ * only once the buffer holds everything it was prepared for. On EOF, or on a
+ * read error other than EINTR/EAGAIN, the peer is killed (and freed) and
+ * false is returned.
+ */
+static bool peerProxyBufferInput(PeerProxy* peerProxy) {
+    Buffer* in = peerProxy->inputBuffer;
+    ssize_t bytesRead = bufferRead(in, peerProxy->fd->fd);
+
+    if (bytesRead < 0) {
+        peerProxyHandleError(peerProxy, "read");
+        return false;
+    }
+    if (bytesRead == 0) {
+        // EOF.
+        LOGI("EOF");
+        peerProxyKill(peerProxy, false);
+        return false;
+    }
+
+    // Done once the buffer is full; otherwise continue on a later call.
+    return bufferReadComplete(in);
+}
+
+/**
+ * Selector callback invoked when the peer's socket is readable. Advances
+ * the proxy's input state machine (see InputState).
+ */
+static void peerProxyRead(SelectableFd* fd) {
+    LOGD("Reading...");
+    PeerProxy* peerProxy = (PeerProxy*) fd->data;
+    Buffer* in = peerProxy->inputBuffer;
+    int state = peerProxy->inputState;
+
+    if (state == READING_HEADER) {
+        if (!peerProxyBufferInput(peerProxy)) {
+            return;
+        }
+        LOGD("Header read.");
+        // We've read the complete header.
+        peerProxyHandleHeader(peerProxy, (Header*) in->data);
+    } else if (state == READING_BYTES) {
+        LOGD("Reading bytes...");
+        if (!peerProxyBufferInput(peerProxy)) {
+            return;
+        }
+        LOGD("Bytes read.");
+        // Hand the complete payload to the bytes listener, then get ready
+        // for the next packet.
+        peerProxy->peer->onBytes(peerProxy->credentials,
+                in->data, in->size);
+        peerProxyExpectHeader(peerProxy);
+    } else if (state == ACCEPTING_CONNECTION) {
+        masterProxyAcceptConnection(peerProxy);
+    } else {
+        LOG_ALWAYS_FATAL("Unknown state: %d", state);
+    }
+}
+
+/**
+ * Allocates a proxy for a remote peer with the given credentials, ready to
+ * read a header, and registers it in peer->peerProxies. The caller must hold
+ * the peer lock. Returns NULL if allocation fails.
+ */
+static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) {
+    PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy));
+    if (peerProxy == NULL) {
+        return NULL;
+    }
+
+    Buffer* inputBuffer = bufferCreate(sizeof(Header));
+    if (inputBuffer == NULL) {
+        free(peerProxy);
+        return NULL;
+    }
+    peerProxy->inputBuffer = inputBuffer;
+    peerProxy->credentials = credentials;
+    peerProxy->peer = peer;
+
+    // Initial state == expecting a header.
+    peerProxyExpectHeader(peerProxy);
+
+    // The map key must stay valid as long as the entry, so use the pid
+    // stored inside the proxy rather than a local copy.
+    hashmapPut(peer->peerProxies, &peerProxy->credentials.pid, peerProxy);
+    return peerProxy;
+}
+
+/**
+ * Accepts a connection to the master peer. listenerFd->data holds the
+ * master Peer; presumably this runs as the listening socket's read callback
+ * (registration isn't shown here). The remote process is identified with
+ * SO_PEERCRED and gets a proxy with a connection map. A second connection
+ * from an already-connected process is rejected. Failures are logged and
+ * the new socket is discarded.
+ */
+static void masterAcceptConnection(SelectableFd* listenerFd) {
+    // Accept connection.
+    int socket = accept(listenerFd->fd, NULL, NULL);
+    if (socket == -1) {
+        LOGW("accept() error: %s", strerror(errno));
+        return;
+    }
+
+    LOGD("Accepted connection as fd %d.", socket);
+
+    // Get credentials.
+    Credentials credentials;
+    struct ucred ucredentials;
+    socklen_t credentialsSize = sizeof(struct ucred);
+    int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED,
+            &ucredentials, &credentialsSize);
+    // We might want to verify credentialsSize.
+    if (result == -1) {
+        LOGW("getsockopt() error: %s", strerror(errno));
+        closeWithWarning(socket);
+        return;
+    }
+
+    // Copy values into our own structure so we know we have the types right.
+    credentials.pid = ucredentials.pid;
+    credentials.uid = ucredentials.uid;
+    credentials.gid = ucredentials.gid;
+
+    LOGI("Accepted connection from process %d.", credentials.pid);
+
+    Peer* masterPeer = (Peer*) listenerFd->data;
+
+    peerLock(masterPeer);
+
+    // Make sure we don't already have a connection from that process.
+    PeerProxy* peerProxy
+            = hashmapGet(masterPeer->peerProxies, &credentials.pid);
+    if (peerProxy != NULL) {
+        peerUnlock(masterPeer);
+        LOGW("Alread connected to process %d.", credentials.pid);
+        closeWithWarning(socket);
+        return;
+    }
+
+    // Add connection to the selector.
+    SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket);
+    if (socketFd == NULL) {
+        peerUnlock(masterPeer);
+        LOGW("malloc() failed.");
+        closeWithWarning(socket);
+        return;
+    }
+
+    // Create a peer proxy.
+    peerProxy = peerProxyCreate(masterPeer, credentials);
+    peerUnlock(masterPeer);
+    if (peerProxy == NULL) {
+        LOGW("malloc() failed.");
+        // NOTE(review): if the selector closes the fds it removes, this
+        // explicit close is redundant. Confirm against the selector.
+        socketFd->remove = true;
+        closeWithWarning(socket);
+        // There is no proxy to set up; continuing would dereference NULL.
+        return;
+    }
+
+    // Only master-side proxies track which peers they've been connected to.
+    peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals);
+    peerProxySetFd(peerProxy, socketFd);
+}
+
+/**
+ * Creates the local peer: its proxy map, selector and mutex. The mutex is
+ * recursive, presumably so code already holding the lock can call helpers
+ * that take it again. Any allocation or initialization failure is fatal.
+ */
+static Peer* peerCreate(void) {
+    Peer* peer = calloc(1, sizeof(Peer));
+    if (peer == NULL) {
+        LOG_ALWAYS_FATAL("malloc() error.");
+    }
+
+    peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals);
+    if (peer->peerProxies == NULL) {
+        LOG_ALWAYS_FATAL("malloc() error.");
+    }
+
+    peer->selector = selectorCreate();
+    if (peer->selector == NULL) {
+        LOG_ALWAYS_FATAL("selectorCreate() error.");
+    }
+
+    pthread_mutexattr_t attributes;
+    if (pthread_mutexattr_init(&attributes) != 0) {
+        LOG_ALWAYS_FATAL("pthread_mutexattr_init() error.");
+    }
+    if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) {
+        LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error.");
+    }
+    if (pthread_mutex_init(&peer->mutex, &attributes) != 0) {
+        LOG_ALWAYS_FATAL("pthread_mutex_init() error.");
+    }
+    // The initialized mutex doesn't depend on the attributes object, so
+    // release it now.
+    pthread_mutexattr_destroy(&attributes);
+
+    peer->pid = getpid();
+    return peer;
+}
+
+/**
+ * The local peer (this process's end of the message queue). Must be set
+ * before peerSendBytes() or peerSendSharedBytes() is called; both assert it
+ * is non-NULL.
+ */
+static Peer* localPeer;
+
+/** Free callback for BYTES packets that own a private copy of their payload. */
+static void outgoingPacketFreeBytes(OutgoingPacket* packet) {
+    LOGD("Freeing outgoing packet.");
+    Buffer* payload = packet->bytes;
+    bufferFree(payload);
+    free(packet);
+}
+
+/**
+ * Sends a packet of bytes to a remote peer. The bytes are copied, so the
+ * caller may reuse its buffer as soon as this returns. Returns 0 on success.
+ *
+ * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
+ * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
+ * to EINVAL if pid is the same as the local pid.
+ */
+int peerSendBytes(pid_t pid, const char* bytes, size_t size) {
+    Peer* peer = localPeer;
+    assert(peer != NULL);
+
+    OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
+    if (packet == NULL) {
+        errno = ENOMEM;
+        return -1;
+    }
+
+    Buffer* copy = bufferCreate(size);
+    if (copy == NULL) {
+        free(packet);
+        errno = ENOMEM;
+        return -1;
+    }
+
+    // Copy data.
+    memcpy(copy->data, bytes, size);
+    copy->size = size;
+
+    packet->bytes = copy;
+    packet->header.type = BYTES;
+    packet->header.size = size;
+    packet->free = outgoingPacketFreeBytes;
+    bufferPrepareForWrite(packet->bytes);
+
+    peerLock(peer);
+
+    PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
+    if (peerProxy == NULL) {
+        // The peer is already dead or we couldn't alloc memory. Either way,
+        // errno is set; keep it intact across the cleanup below.
+        int savedErrno = errno;
+        peerUnlock(peer);
+        packet->free(packet);
+        errno = savedErrno;
+        return -1;
+    }
+
+    peerProxyEnqueueOutgoingPacket(peerProxy, packet);
+    peerUnlock(peer);
+    // Wake the selector so it notices the newly queued packet.
+    selectorWakeUp(peer->selector);
+    return 0;
+}
+
+/**
+ * Keeps track of how to free shared bytes. Stored in OutgoingPacket.context
+ * for packets created by peerSendSharedBytes(); outgoingPacketFreeSharedBytes()
+ * invokes the callback.
+ */
+typedef struct {
+ /** Caller-supplied callback that releases the shared bytes. */
+ void (*free)(void* context);
+ /** Argument passed to the callback. */
+ void* context;
+} SharedBytesFreer;
+
+/**
+ * Free callback for packets sent with peerSendSharedBytes(). Hands the
+ * shared bytes back to the caller's callback, then frees the bookkeeping:
+ * the SharedBytesFreer, the Buffer wrapper and the packet.
+ */
+static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) {
+    SharedBytesFreer* sharedBytesFreer
+            = (SharedBytesFreer*) packet->context;
+    sharedBytesFreer->free(sharedBytesFreer->context);
+    free(sharedBytesFreer);
+
+    // The bufferWrap() wrapper doesn't own the bytes, so release only the
+    // wrapper struct itself (not bufferFree()), the same way
+    // peerSendSharedBytes() discards it on its own error path. Previously
+    // it leaked on every shared-bytes packet.
+    free(packet->bytes);
+    free(packet);
+}
+
+/**
+ * Sends a packet of bytes to a remote peer without copying the bytes. The
+ * caller must keep the bytes alive until freeBytes(context) is called; that
+ * happens after the bytes have been sent (or the queued packet is
+ * discarded). Returns 0 on success.
+ *
+ * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
+ * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
+ * to EINVAL if pid is the same as the local pid.
+ *
+ * Ownership on failure: if an allocation fails, freeBytes is NOT called.
+ * If the peer lookup fails (EHOSTDOWN/EINVAL, or ENOMEM while creating the
+ * proxy), the packet is discarded and freeBytes(context) IS called.
+ */
+int peerSendSharedBytes(pid_t pid, char* bytes, size_t size,
+        void (*freeBytes)(void* context), void* context) {
+    // The callback is named freeBytes rather than free so that it doesn't
+    // shadow the C library free() used for cleanup below.
+    Peer* peer = localPeer;
+    assert(peer != NULL);
+
+    OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
+    if (packet == NULL) {
+        errno = ENOMEM;
+        return -1;
+    }
+
+    // Wrap (don't copy) the caller's bytes.
+    Buffer* wrapper = bufferWrap(bytes, size, size);
+    if (wrapper == NULL) {
+        free(packet);
+        errno = ENOMEM;
+        return -1;
+    }
+
+    SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer));
+    if (sharedBytesFreer == NULL) {
+        free(packet);
+        // The wrapper doesn't own the bytes; free only the wrapper itself.
+        free(wrapper);
+        errno = ENOMEM;
+        return -1;
+    }
+    sharedBytesFreer->free = freeBytes;
+    sharedBytesFreer->context = context;
+
+    packet->bytes = wrapper;
+    packet->context = sharedBytesFreer;
+    packet->header.type = BYTES;
+    packet->header.size = size;
+    packet->free = &outgoingPacketFreeSharedBytes;
+    bufferPrepareForWrite(packet->bytes);
+
+    peerLock(peer);
+
+    PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
+    if (peerProxy == NULL) {
+        // The peer is already dead or we couldn't alloc memory. Either way,
+        // errno is set. Discarding the packet runs the caller's callback,
+        // which may change errno, so preserve it.
+        int savedErrno = errno;
+        peerUnlock(peer);
+        packet->free(packet);
+        errno = savedErrno;
+        return -1;
+    }
+
+    peerProxyEnqueueOutgoingPacket(peerProxy, packet);
+    peerUnlock(peer);
+    // Wake the selector so it notices the newly queued packet.
+    selectorWakeUp(peer->selector);
+    return 0;
+}
+
+/**
+ * Initializes this process as the master peer. The master peer differs from
+ * other peers in that it is responsible for connecting the other peers. You
+ * can only have one master peer.
+ *
+ * Creates, binds and listens on the master socket, then registers it with
+ * the local peer's selector so incoming connections go to
+ * masterAcceptConnection. This function returns; it does NOT start the I/O
+ * loop. Call peerLoop() afterwards. Errors are reported with
+ * LOG_ALWAYS_FATAL.
+ */
+void masterPeerInitialize(BytesListener* bytesListener,
+        DeathListener* deathListener) {
+    // Fail if we already have a peer. Check this before touching MASTER_PATH,
+    // so a repeated call can't unlink the path a running master listens on.
+    if (localPeer != NULL) {
+        LOG_ALWAYS_FATAL("Peer is already initialized.");
+    }
+
+    // Create and bind socket. Remove any stale socket file first, since
+    // bind() fails if the path already exists.
+    int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
+    if (listenerSocket == -1) {
+        LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
+    }
+    unlink(MASTER_PATH);
+    int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(),
+            sizeof(UnixAddress));
+    if (result == -1) {
+        LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno));
+    }
+
+    LOGD("Listener socket: %d", listenerSocket);
+
+    // Queue up to 16 connections.
+    result = listen(listenerSocket, 16);
+    if (result != 0) {
+        LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno));
+    }
+
+    // Make socket non-blocking.
+    setNonBlocking(listenerSocket);
+
+    // Create the peer for this process.
+    localPeer = peerCreate();
+    if (localPeer == NULL) {
+        LOG_ALWAYS_FATAL("malloc() failed.");
+    }
+    localPeer->master = true;
+    localPeer->onBytes = bytesListener;
+    localPeer->onDeath = deathListener;
+
+    // Make listener socket selectable.
+    SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket);
+    if (listenerFd == NULL) {
+        LOG_ALWAYS_FATAL("malloc() error.");
+    }
+    listenerFd->data = localPeer;
+    listenerFd->onReadable = &masterAcceptConnection;
+}
+
+/**
+ * Initializes this process as a regular (non-master) peer. Connects to the
+ * master peer's socket, creates the local peer, and registers the master
+ * connection with the local peer's selector through a proxy for the master.
+ *
+ * This function returns; it does NOT start the I/O loop. Call peerLoop()
+ * afterwards. Errors are reported with LOG_ALWAYS_FATAL.
+ */
+void peerInitialize(BytesListener* bytesListener,
+        DeathListener* deathListener) {
+    // Fail if we already have a peer, before opening another connection to
+    // the master.
+    if (localPeer != NULL) {
+        LOG_ALWAYS_FATAL("Peer is already initialized.");
+    }
+
+    // Connect to master peer.
+    int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
+    if (masterSocket == -1) {
+        LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
+    }
+    int result = connect(masterSocket, (SocketAddress*) getMasterAddress(),
+            sizeof(UnixAddress));
+    if (result != 0) {
+        LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno));
+    }
+
+    // Create the peer for this process.
+    localPeer = peerCreate();
+    if (localPeer == NULL) {
+        LOG_ALWAYS_FATAL("malloc() failed.");
+    }
+    localPeer->onBytes = bytesListener;
+    localPeer->onDeath = deathListener;
+
+    // Make connection selectable.
+    SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket);
+    if (masterFd == NULL) {
+        LOG_ALWAYS_FATAL("malloc() error.");
+    }
+
+    // Create a peer proxy for the master peer.
+    PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS);
+    if (masterProxy == NULL) {
+        LOG_ALWAYS_FATAL("malloc() error.");
+    }
+    peerProxySetFd(masterProxy, masterFd);
+    masterProxy->master = true;
+    localPeer->masterProxy = masterProxy;
+}
+
+/**
+ * Runs the local peer's I/O loop by running its selector. Works for both the
+ * master peer and regular peers. Call masterPeerInitialize() or
+ * peerInitialize() first. Doesn't return.
+ */
+void peerLoop(void) {
+    assert(localPeer != NULL);
+
+    // Start selector.
+    selectorLoop(localPeer->selector);
+}
+