auto import from //depot/cupcake/@135843
diff --git a/libcutils/mq.c b/libcutils/mq.c
new file mode 100644
index 0000000..3b65f1f
--- /dev/null
+++ b/libcutils/mq.c
@@ -0,0 +1,1357 @@
+/*
+ * Copyright (C) 2007 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#define LOG_TAG "mq"
+
+#include <assert.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <pthread.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <sys/uio.h>
+
+#include <cutils/array.h>
+#include <cutils/hashmap.h>
+#include <cutils/selector.h>
+
+#include "loghack.h"
+#include "buffer.h"
+
+/** Number of dead peers to remember. */
+#define PEER_HISTORY (16)
+
+typedef struct sockaddr SocketAddress;
+typedef struct sockaddr_un UnixAddress;
+
+/** 
+ * Process/user/group ID. We don't use ucred directly because it's only 
+ * available on Linux.
+ */
+typedef struct {
+    pid_t pid;
+    uid_t uid;
+    gid_t gid;
+} Credentials;
+
+/** Listens for bytes coming from remote peers. */
+typedef void BytesListener(Credentials credentials, char* bytes, size_t size);
+
+/** Listens for the deaths of remote peers. */
+typedef void DeathListener(pid_t pid);
+
+/** Types of packets. */
+typedef enum {
+    /** Request for a connection to another peer. */
+    CONNECTION_REQUEST, 
+
+    /** A connection to another peer. */
+    CONNECTION, 
+
+    /** Reports a failed connection attempt. */
+    CONNECTION_ERROR, 
+
+    /** A generic packet of bytes. */
+    BYTES,
+} PacketType;
+
+typedef enum {
+    /** Reading a packet header. */
+    READING_HEADER,
+
+    /** Waiting for a connection from the master. */
+    ACCEPTING_CONNECTION,
+
+    /** Reading bytes. */
+    READING_BYTES,
+} InputState;
+
+/** A packet header. */
+// TODO: Use custom headers for master->peer, peer->master, peer->peer.
+typedef struct {
+    PacketType type;
+    union {
+        /** Packet size. Used for BYTES. */
+        size_t size;
+
+        /** Credentials. Used for CONNECTION and CONNECTION_REQUEST. */
+        Credentials credentials; 
+    };
+} Header;
+
+/** A packet which will be sent to a peer. */
+typedef struct OutgoingPacket OutgoingPacket;
+struct OutgoingPacket {
+    /** Packet header. */
+    Header header; 
+    
+    union {
+        /** Connection to peer. Used with CONNECTION. */
+        int socket;
+        
+        /** Buffer of bytes. Used with BYTES. */
+        Buffer* bytes;
+    };
+
+    /** Frees all resources associated with this packet. */
+    void (*free)(OutgoingPacket* packet);
+   
+    /** Optional context. */
+    void* context;
+    
+    /** Next packet in the queue. */
+    OutgoingPacket* nextPacket;
+};
+
+/** Represents a remote peer. */
+typedef struct PeerProxy PeerProxy;
+
+/** Local peer state. You typically have one peer per process. */
+typedef struct {
+    /** This peer's PID. */
+    pid_t pid;
+    
+    /** 
+     * Map from pid to peer proxy. The peer has a peer proxy for each remote
+     * peer it's connected to. 
+     *
+     * Acquire mutex before use.
+     */
+    Hashmap* peerProxies;
+
+    /** Manages I/O. */
+    Selector* selector;
+   
+    /** Used to synchronize operations with the selector thread. */
+    pthread_mutex_t mutex; 
+
+    /** Is this peer the master? */
+    bool master;
+
+    /** Peer proxy for the master. */
+    PeerProxy* masterProxy;
+    
+    /** Listens for packets from remote peers. */
+    BytesListener* onBytes;
+    
+    /** Listens for deaths of remote peers. */
+    DeathListener* onDeath;
+    
+    /** Keeps track of recently dead peers. Requires mutex. */
+    pid_t deadPeers[PEER_HISTORY];
+    size_t deadPeerCursor;
+} Peer;
+
+struct PeerProxy {
+    /** Credentials of the remote process. */
+    Credentials credentials;
+
+    /** Keeps track of data coming in from the remote peer. */
+    InputState inputState;
+    Buffer* inputBuffer;
+    PeerProxy* connecting;
+
+    /** File descriptor for this peer. */
+    SelectableFd* fd;
+
+    /** 
+     * Queue of packets to be written out to the remote peer.
+     *
+     * Requires mutex.
+     */
+    // TODO: Limit queue length.
+    OutgoingPacket* currentPacket;
+    OutgoingPacket* lastPacket;
+    
+    /** Used to write outgoing header. */
+    Buffer outgoingHeader;
+    
+    /** True if this is the master's proxy. */
+    bool master;
+
+    /** Reference back to the local peer. */
+    Peer* peer;
+
+    /**
+     * Used in master only. Maps this peer proxy to other peer proxies to
+     * which the peer has been connected to. Maps pid to PeerProxy. Helps
+     * keep track of which connections we've sent to whom.
+     */
+    Hashmap* connections;
+};
+
+/** Server socket path. */
+static const char* MASTER_PATH = "/master.peer";
+
+/** Credentials of the master peer. */
+static const Credentials MASTER_CREDENTIALS = {0, 0, 0};
+
+/** Creates a peer proxy and adds it to the peer proxy map. */
+static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials);
+
+/** Sets the non-blocking flag on a descriptor. */
+static void setNonBlocking(int fd) {
+    int flags;
+    if ((flags = fcntl(fd, F_GETFL, 0)) < 0) { 
+        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
+    } 
+    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { 
+        LOG_ALWAYS_FATAL("fcntl() error: %s", strerror(errno));
+    } 
+}
+
+/** Closes a fd and logs a warning if the close fails. */
+static void closeWithWarning(int fd) {
+    int result = close(fd);
+    if (result == -1) {
+        LOGW("close() error: %s", strerror(errno));
+    }
+}
+
+/** Hashes pid_t keys. */
+static int pidHash(void* key) {
+    pid_t* pid = (pid_t*) key;
+    return (int) (*pid);
+}
+
+/** Compares pid_t keys. */
+static bool pidEquals(void* keyA, void* keyB) {
+    pid_t* a = (pid_t*) keyA;
+    pid_t* b = (pid_t*) keyB;
+    return *a == *b;
+}
+
+/** Gets the master address. Not thread safe. */
+static UnixAddress* getMasterAddress() {
+    static UnixAddress masterAddress;
+    static bool initialized = false;
+    if (initialized == false) {
+        masterAddress.sun_family = AF_LOCAL;
+        strcpy(masterAddress.sun_path, MASTER_PATH); 
+        initialized = true;
+    }
+    return &masterAddress;
+}
+
+/** Gets exclusive access to the peer for this thread. */
+static void peerLock(Peer* peer) {
+    pthread_mutex_lock(&peer->mutex);
+}
+
+/** Releases exclusive access to the peer. */
+static void peerUnlock(Peer* peer) {
+    pthread_mutex_unlock(&peer->mutex);
+}
+
+/** Frees a simple, i.e. header-only, outgoing packet. */
+static void outgoingPacketFree(OutgoingPacket* packet) {
+    LOGD("Freeing outgoing packet.");
+	free(packet);
+}
+
+/**
+ * Prepare to read a new packet from the peer.
+ */
+static void peerProxyExpectHeader(PeerProxy* peerProxy) {
+    peerProxy->inputState = READING_HEADER;
+    bufferPrepareForRead(peerProxy->inputBuffer, sizeof(Header));
+}
+
+/** Sets up the buffer for the outgoing header. */
+static void peerProxyPrepareOutgoingHeader(PeerProxy* peerProxy) {
+    peerProxy->outgoingHeader.data 
+        = (char*) &(peerProxy->currentPacket->header);
+    peerProxy->outgoingHeader.size = sizeof(Header);
+    bufferPrepareForWrite(&peerProxy->outgoingHeader);
+}
+
+/** Adds a packet to the end of the queue. Callers must have the mutex. */
+static void peerProxyEnqueueOutgoingPacket(PeerProxy* peerProxy,
+        OutgoingPacket* newPacket) {
+    newPacket->nextPacket = NULL; // Just in case.
+    if (peerProxy->currentPacket == NULL) {
+        // The queue is empty.
+        peerProxy->currentPacket = newPacket;
+        peerProxy->lastPacket = newPacket;
+        
+        peerProxyPrepareOutgoingHeader(peerProxy); 
+    } else {
+        peerProxy->lastPacket->nextPacket = newPacket;
+    }
+}
+
+/** Takes the peer lock and enqueues the given packet. */
+static void peerProxyLockAndEnqueueOutgoingPacket(PeerProxy* peerProxy,
+        OutgoingPacket* newPacket) {
+    Peer* peer = peerProxy->peer;
+    peerLock(peer);
+    peerProxyEnqueueOutgoingPacket(peerProxy, newPacket);
+    peerUnlock(peer);
+}
+
+/** 
+ * Frees current packet and moves to the next one. Returns true if there is
+ * a next packet or false if the queue is empty.
+ */
+static bool peerProxyNextPacket(PeerProxy* peerProxy) {
+    Peer* peer = peerProxy->peer;
+    peerLock(peer);
+    
+    OutgoingPacket* current = peerProxy->currentPacket;
+    
+    if (current == NULL) {
+    	// The queue is already empty.
+        peerUnlock(peer);
+        return false;
+    }
+    
+    OutgoingPacket* next = current->nextPacket;
+    peerProxy->currentPacket = next;
+    current->nextPacket = NULL;
+    current->free(current);
+    if (next == NULL) {
+        // The queue is empty.
+        peerProxy->lastPacket = NULL;
+        peerUnlock(peer);
+        return false;
+    } else {
+        peerUnlock(peer);
+        peerProxyPrepareOutgoingHeader(peerProxy); 
+
+        // TODO: Start writing next packet? It would reduce the number of
+        // system calls, but we could also starve other peers.
+        return true;
+    }
+}
+
+/**
+ * Checks whether a peer died recently.
+ */
+static bool peerIsDead(Peer* peer, pid_t pid) {
+    size_t i;
+    for (i = 0; i < PEER_HISTORY; i++) {
+        pid_t deadPeer = peer->deadPeers[i];
+        if (deadPeer == 0) {
+            return false;
+        }
+        if (deadPeer == pid) {
+            return true;
+        }
+    }
+    return false;
+}
+
+/**
+ * Cleans up connection information.
+ */
+static bool peerProxyRemoveConnection(void* key, void* value, void* context) {
+    PeerProxy* deadPeer = (PeerProxy*) context;
+    PeerProxy* otherPeer = (PeerProxy*) value;
+    hashmapRemove(otherPeer->connections, &(deadPeer->credentials.pid));
+    return true;
+}
+
+/**
+ * Called when the peer dies.
+ */
+static void peerProxyKill(PeerProxy* peerProxy, bool errnoIsSet) {
+    if (errnoIsSet) {
+        LOGI("Peer %d died. errno: %s", peerProxy->credentials.pid, 
+                strerror(errno));
+    } else {
+        LOGI("Peer %d died.", peerProxy->credentials.pid);
+    }
+    
+    // If we lost the master, we're up a creek. We can't let this happen.
+    if (peerProxy->master) {    
+        LOG_ALWAYS_FATAL("Lost connection to master.");
+    }
+
+    Peer* localPeer = peerProxy->peer;
+    pid_t pid = peerProxy->credentials.pid;
+    
+    peerLock(localPeer);
+    
+    // Remember for awhile that the peer died.
+    localPeer->deadPeers[localPeer->deadPeerCursor] 
+        = peerProxy->credentials.pid;
+    localPeer->deadPeerCursor++;
+    if (localPeer->deadPeerCursor == PEER_HISTORY) {
+        localPeer->deadPeerCursor = 0;
+    }
+  
+    // Remove from peer map.
+    hashmapRemove(localPeer->peerProxies, &pid);
+    
+    // External threads can no longer get to this peer proxy, so we don't 
+    // need the lock anymore.
+    peerUnlock(localPeer);
+    
+    // Remove the fd from the selector.
+    if (peerProxy->fd != NULL) {
+        peerProxy->fd->remove = true;
+    }
+
+    // Clear outgoing packet queue.
+    while (peerProxyNextPacket(peerProxy)) {}
+
+    bufferFree(peerProxy->inputBuffer);
+
+    // This only applies to the master.
+    if (peerProxy->connections != NULL) {
+        // We can't leave these other maps pointing to freed memory.
+        hashmapForEach(peerProxy->connections, &peerProxyRemoveConnection, 
+                peerProxy);
+        hashmapFree(peerProxy->connections);
+    }
+
+    // Invoke death listener.
+    localPeer->onDeath(pid);
+
+    // Free the peer proxy itself.
+    free(peerProxy);
+}
+
+static void peerProxyHandleError(PeerProxy* peerProxy, char* functionName) {
+    if (errno == EINTR) {
+        // Log interruptions but otherwise ignore them.
+        LOGW("%s() interrupted.", functionName);
+    } else if (errno == EAGAIN) {
+    	LOGD("EWOULDBLOCK");
+        // Ignore.
+    } else {
+        LOGW("Error returned by %s().", functionName);
+        peerProxyKill(peerProxy, true);
+    }
+}
+
+/**
+ * Buffers output sent to a peer. May be called multiple times until the entire
+ * buffer is filled. Returns true when the buffer is empty.
+ */
+static bool peerProxyWriteFromBuffer(PeerProxy* peerProxy, Buffer* outgoing) {
+    ssize_t size = bufferWrite(outgoing, peerProxy->fd->fd);
+    if (size < 0) {
+        peerProxyHandleError(peerProxy, "write");
+        return false;
+    } else {
+        return bufferWriteComplete(outgoing);
+    }
+}
+
+/** Writes packet bytes to peer. */
+static void peerProxyWriteBytes(PeerProxy* peerProxy) {	
+	Buffer* buffer = peerProxy->currentPacket->bytes;
+	if (peerProxyWriteFromBuffer(peerProxy, buffer)) {
+        LOGD("Bytes written.");
+        peerProxyNextPacket(peerProxy);
+    }    
+}
+
+/** Sends a socket to the peer. */
+static void peerProxyWriteConnection(PeerProxy* peerProxy) {
+    int socket = peerProxy->currentPacket->socket;
+
+    // Why does sending and receiving fds have to be such a PITA?
+    struct msghdr msg;
+    struct iovec iov[1];
+    
+    union {
+        struct cmsghdr cm;
+        char control[CMSG_SPACE(sizeof(int))];
+    } control_un;
+   
+    struct cmsghdr *cmptr;
+    
+    msg.msg_control = control_un.control;
+    msg.msg_controllen = sizeof(control_un.control);
+    cmptr = CMSG_FIRSTHDR(&msg);
+    cmptr->cmsg_len = CMSG_LEN(sizeof(int));
+    cmptr->cmsg_level = SOL_SOCKET;
+    cmptr->cmsg_type = SCM_RIGHTS;
+   
+    // Store the socket in the message.
+    *((int *) CMSG_DATA(cmptr)) = peerProxy->currentPacket->socket;
+
+    msg.msg_name = NULL;
+    msg.msg_namelen = 0;
+    iov[0].iov_base = "";
+    iov[0].iov_len = 1;
+    msg.msg_iov = iov;
+    msg.msg_iovlen = 1;
+
+    ssize_t result = sendmsg(peerProxy->fd->fd, &msg, 0);
+    
+    if (result < 0) {
+        peerProxyHandleError(peerProxy, "sendmsg");
+    } else {
+        // Success. Queue up the next packet.
+        peerProxyNextPacket(peerProxy);
+        
+    }
+}
+
+/**
+ * Writes some outgoing data.
+ */
+static void peerProxyWrite(SelectableFd* fd) {
+    // TODO: Try to write header and body with one system call.
+
+    PeerProxy* peerProxy = (PeerProxy*) fd->data;
+    OutgoingPacket* current = peerProxy->currentPacket;
+    
+    if (current == NULL) {
+        // We have nothing left to write.
+        return;
+    }
+
+    // Write the header.
+    Buffer* outgoingHeader = &peerProxy->outgoingHeader;
+    bool headerWritten = bufferWriteComplete(outgoingHeader);
+    if (!headerWritten) {
+        LOGD("Writing header...");
+        headerWritten = peerProxyWriteFromBuffer(peerProxy, outgoingHeader);
+        if (headerWritten) {
+            LOGD("Header written.");
+        }
+    }    
+
+    // Write body.
+    if (headerWritten) {
+        PacketType type = current->header.type;
+        switch (type) {
+            case CONNECTION:
+                peerProxyWriteConnection(peerProxy);
+                break;
+            case BYTES:
+                peerProxyWriteBytes(peerProxy);
+                break;
+            case CONNECTION_REQUEST:
+            case CONNECTION_ERROR:
+                // These packets consist solely of a header.
+                peerProxyNextPacket(peerProxy);
+                break;
+            default:
+                LOG_ALWAYS_FATAL("Unknown packet type: %d", type); 
+        }
+    }
+}
+
+/**
+ * Sets up a peer proxy's fd before we try to select() it.
+ */
+static void peerProxyBeforeSelect(SelectableFd* fd) {
+    LOGD("Before select...");
+
+    PeerProxy* peerProxy = (PeerProxy*) fd->data;
+  
+    peerLock(peerProxy->peer);
+    bool hasPackets = peerProxy->currentPacket != NULL;
+    peerUnlock(peerProxy->peer);
+    
+    if (hasPackets) {
+        LOGD("Packets found. Setting onWritable().");
+            
+        fd->onWritable = &peerProxyWrite;
+    } else {
+        // We have nothing to write.
+        fd->onWritable = NULL;
+    }
+}
+
+/** Prepare to read bytes from the peer. */
+static void peerProxyExpectBytes(PeerProxy* peerProxy, Header* header) {
+	LOGD("Expecting %d bytes.", header->size);
+	
+	peerProxy->inputState = READING_BYTES;
+    if (bufferPrepareForRead(peerProxy->inputBuffer, header->size) == -1) {
+        LOGW("Couldn't allocate memory for incoming data. Size: %u",
+                (unsigned int) header->size);    
+        
+        // TODO: Ignore the packet and log a warning?
+        peerProxyKill(peerProxy, false);
+    }
+}
+
+/**
+ * Gets a peer proxy for the given ID. Creates a peer proxy if necessary.
+ * Sends a connection request to the master if desired.
+ *
+ * Returns NULL if an error occurs. Sets errno to EHOSTDOWN if the peer died
+ * or ENOMEM if memory couldn't be allocated.
+ */
+static PeerProxy* peerProxyGetOrCreate(Peer* peer, pid_t pid, 
+        bool requestConnection) {
+    if (pid == peer->pid) {
+        errno = EINVAL;
+        return NULL;
+    }
+    
+    if (peerIsDead(peer, pid)) {
+        errno = EHOSTDOWN;
+        return NULL;
+    }
+    
+    PeerProxy* peerProxy = hashmapGet(peer->peerProxies, &pid);
+    if (peerProxy != NULL) {
+        return peerProxy;
+    }
+
+    // If this is the master peer, we already know about all peers.
+    if (peer->master) {
+        errno = EHOSTDOWN;
+        return NULL;
+    }
+
+    // Try to create a peer proxy.
+    Credentials credentials;
+    credentials.pid = pid;
+
+    // Fake gid and uid until we have the real thing. The real creds are
+    // filled in by masterProxyExpectConnection(). These fake creds will
+    // never be exposed to the user.
+    credentials.uid = 0;
+    credentials.gid = 0;
+
+    // Make sure we can allocate the connection request packet.
+    OutgoingPacket* packet = NULL;
+    if (requestConnection) {
+        packet = calloc(1, sizeof(OutgoingPacket));
+        if (packet == NULL) {
+            errno = ENOMEM;
+            return NULL;
+        }
+
+        packet->header.type = CONNECTION_REQUEST;
+        packet->header.credentials = credentials;
+        packet->free = &outgoingPacketFree;
+    }
+    
+    peerProxy = peerProxyCreate(peer, credentials);
+    if (peerProxy == NULL) {
+        free(packet);
+        errno = ENOMEM;
+        return NULL;
+    } else {
+        // Send a connection request to the master.
+        if (requestConnection) {
+            PeerProxy* masterProxy = peer->masterProxy;
+            peerProxyEnqueueOutgoingPacket(masterProxy, packet);
+        }
+        
+        return peerProxy;
+    }
+}
+
+/**
+ * Switches the master peer proxy into a state where it's waiting for a
+ * connection from the master.
+ */
+static void masterProxyExpectConnection(PeerProxy* masterProxy,
+        Header* header) {
+    // TODO: Restructure things so we don't need this check.
+    // Verify that this really is the master.
+    if (!masterProxy->master) {
+        LOGW("Non-master process %d tried to send us a connection.", 
+            masterProxy->credentials.pid);
+        // Kill off the evil peer.
+        peerProxyKill(masterProxy, false);
+        return;
+    }
+    
+    masterProxy->inputState = ACCEPTING_CONNECTION;
+    Peer* localPeer = masterProxy->peer;
+   
+    // Create a peer proxy so we have somewhere to stash the creds.
+    // See if we already have a proxy set up.
+    pid_t pid = header->credentials.pid;
+    peerLock(localPeer);
+    PeerProxy* peerProxy = peerProxyGetOrCreate(localPeer, pid, false);
+    if (peerProxy == NULL) {
+        LOGW("Peer proxy creation failed: %s", strerror(errno));
+    } else {
+        // Fill in full credentials.
+        peerProxy->credentials = header->credentials;
+    }
+    peerUnlock(localPeer);
+    
+    // Keep track of which peer proxy we're accepting a connection for.
+    masterProxy->connecting = peerProxy;
+}
+
+/**
+ * Reads input from a peer process.
+ */
+static void peerProxyRead(SelectableFd* fd);
+
+/** Sets up fd callbacks. */
+static void peerProxySetFd(PeerProxy* peerProxy, SelectableFd* fd) {
+    peerProxy->fd = fd;
+    fd->data = peerProxy;
+    fd->onReadable = &peerProxyRead;
+    fd->beforeSelect = &peerProxyBeforeSelect;
+    
+    // Make the socket non-blocking.
+    setNonBlocking(fd->fd);
+}
+
+/**
+ * Accepts a connection sent by the master proxy.
+ */
+static void masterProxyAcceptConnection(PeerProxy* masterProxy) {
+    struct msghdr msg;
+    struct iovec iov[1];
+    ssize_t size;
+    char ignored;
+    int incomingFd;
+    
+    // TODO: Reuse code which writes the connection. Who the heck designed
+    // this API anyway?
+    union {
+        struct cmsghdr cm;
+        char control[CMSG_SPACE(sizeof(int))];
+    } control_un;
+    struct cmsghdr *cmptr;
+    msg.msg_control = control_un.control;
+    msg.msg_controllen = sizeof(control_un.control);
+
+    msg.msg_name = NULL;
+    msg.msg_namelen = 0;
+
+    // We sent 1 byte of data so we can detect EOF.
+    iov[0].iov_base = &ignored;
+    iov[0].iov_len = 1;
+    msg.msg_iov = iov;
+    msg.msg_iovlen = 1;
+
+    size = recvmsg(masterProxy->fd->fd, &msg, 0);
+    if (size < 0) {
+        if (errno == EINTR) {
+            // Log interruptions but otherwise ignore them.
+            LOGW("recvmsg() interrupted.");
+            return;
+        } else if (errno == EAGAIN) {
+            // Keep waiting for the connection.
+            return;
+        } else {
+            LOG_ALWAYS_FATAL("Error reading connection from master: %s",
+                    strerror(errno));
+        }
+    } else if (size == 0) {
+        // EOF.
+        LOG_ALWAYS_FATAL("Received EOF from master.");
+    }
+
+    // Extract fd from message.
+    if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL 
+            && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
+        if (cmptr->cmsg_level != SOL_SOCKET) {
+            LOG_ALWAYS_FATAL("Expected SOL_SOCKET.");
+        }
+        if (cmptr->cmsg_type != SCM_RIGHTS) {
+            LOG_ALWAYS_FATAL("Expected SCM_RIGHTS.");
+        }
+        incomingFd = *((int*) CMSG_DATA(cmptr));
+    } else {
+        LOG_ALWAYS_FATAL("Expected fd.");
+    }
+    
+    // The peer proxy this connection is for.
+    PeerProxy* peerProxy = masterProxy->connecting;
+    if (peerProxy == NULL) {
+        LOGW("Received connection for unknown peer.");
+        closeWithWarning(incomingFd);
+    } else {
+        Peer* peer = masterProxy->peer;
+        
+        SelectableFd* selectableFd = selectorAdd(peer->selector, incomingFd);
+        if (selectableFd == NULL) {
+            LOGW("Error adding fd to selector for %d.",
+                    peerProxy->credentials.pid);
+            closeWithWarning(incomingFd);
+            peerProxyKill(peerProxy, false);
+        }
+
+        peerProxySetFd(peerProxy, selectableFd);
+    }
+    
+    peerProxyExpectHeader(masterProxy);
+}
+
+/**
+ * Frees an outgoing packet containing a connection.
+ */
+static void outgoingPacketFreeSocket(OutgoingPacket* packet) {
+    closeWithWarning(packet->socket);
+    outgoingPacketFree(packet);
+}
+
+/**
+ * Connects two known peers.
+ */
+static void masterConnectPeers(PeerProxy* peerA, PeerProxy* peerB) {
+    int sockets[2];
+    int result = socketpair(AF_LOCAL, SOCK_STREAM, 0, sockets);
+    if (result == -1) {
+        LOGW("socketpair() error: %s", strerror(errno));
+        // TODO: Send CONNECTION_FAILED packets to peers.
+        return;
+    }
+
+    OutgoingPacket* packetA = calloc(1, sizeof(OutgoingPacket));
+    OutgoingPacket* packetB = calloc(1, sizeof(OutgoingPacket));
+    if (packetA == NULL || packetB == NULL) {
+        free(packetA);
+        free(packetB);
+        LOGW("malloc() error. Failed to tell process %d that process %d is"
+                " dead.", peerA->credentials.pid, peerB->credentials.pid);
+        return;
+    }
+   
+    packetA->header.type = CONNECTION;
+    packetB->header.type = CONNECTION;
+    
+    packetA->header.credentials = peerB->credentials;
+    packetB->header.credentials = peerA->credentials;
+
+    packetA->socket = sockets[0];
+    packetB->socket = sockets[1];
+
+    packetA->free = &outgoingPacketFreeSocket;
+    packetB->free = &outgoingPacketFreeSocket;
+   
+    peerLock(peerA->peer);
+    peerProxyEnqueueOutgoingPacket(peerA, packetA);   
+    peerProxyEnqueueOutgoingPacket(peerB, packetB);   
+    peerUnlock(peerA->peer);
+}
+
+/**
+ * Informs a peer that the peer they're trying to connect to couldn't be
+ * found.
+ */
+static void masterReportConnectionError(PeerProxy* peerProxy,
+        Credentials credentials) {
+    OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
+    if (packet == NULL) {
+        LOGW("malloc() error. Failed to tell process %d that process %d is"
+                " dead.", peerProxy->credentials.pid, credentials.pid);
+        return;
+    }
+   
+    packet->header.type = CONNECTION_ERROR;
+    packet->header.credentials = credentials;
+    packet->free = &outgoingPacketFree;
+    
+    peerProxyLockAndEnqueueOutgoingPacket(peerProxy, packet);   
+}
+
+/**
+ * Handles a request to be connected to another peer.
+ */
+static void masterHandleConnectionRequest(PeerProxy* peerProxy, 
+        Header* header) {
+    Peer* master = peerProxy->peer;
+    pid_t targetPid = header->credentials.pid;
+    if (!hashmapContainsKey(peerProxy->connections, &targetPid)) {
+        // We haven't connected these peers yet.
+        PeerProxy* targetPeer 
+            = (PeerProxy*) hashmapGet(master->peerProxies, &targetPid);
+        if (targetPeer == NULL) {
+            // Unknown process.
+            masterReportConnectionError(peerProxy, header->credentials);
+        } else {
+            masterConnectPeers(peerProxy, targetPeer);
+        }
+    }
+    
+    // This packet is complete. Get ready for the next one.
+    peerProxyExpectHeader(peerProxy);
+}
+
+/**
+ * The master told us this peer is dead.
+ */
+static void masterProxyHandleConnectionError(PeerProxy* masterProxy,
+        Header* header) {
+    Peer* peer = masterProxy->peer;
+    
+    // Look up the peer proxy.
+    pid_t pid = header->credentials.pid;
+    PeerProxy* peerProxy = NULL;
+    peerLock(peer);
+    peerProxy = hashmapGet(peer->peerProxies, &pid);
+    peerUnlock(peer);
+
+    if (peerProxy != NULL) {
+        LOGI("Couldn't connect to %d.", pid);
+        peerProxyKill(peerProxy, false);
+    } else {
+        LOGW("Peer proxy for %d not found. This shouldn't happen.", pid);
+    }
+    
+    peerProxyExpectHeader(masterProxy);
+}
+
+/**
+ * Handles a packet header.
+ */
+static void peerProxyHandleHeader(PeerProxy* peerProxy, Header* header) {
+    switch (header->type) {
+        case CONNECTION_REQUEST:
+            masterHandleConnectionRequest(peerProxy, header);
+            break;
+        case CONNECTION:
+            masterProxyExpectConnection(peerProxy, header);
+            break;
+        case CONNECTION_ERROR:
+            masterProxyHandleConnectionError(peerProxy, header);
+            break;
+        case BYTES:    
+            peerProxyExpectBytes(peerProxy, header);
+            break;
+        default:
+            LOGW("Invalid packet type from %d: %d", peerProxy->credentials.pid, 
+                    header->type);
+            peerProxyKill(peerProxy, false);
+    }
+}
+
+/**
+ * Buffers input sent by peer. May be called multiple times until the entire
+ * buffer is filled. Returns true when the buffer is full.
+ */
+static bool peerProxyBufferInput(PeerProxy* peerProxy) {
+    Buffer* in = peerProxy->inputBuffer;
+    ssize_t size = bufferRead(in, peerProxy->fd->fd);
+    if (size < 0) {
+        peerProxyHandleError(peerProxy, "read");
+        return false;
+    } else if (size == 0) {
+        // EOF.
+    	LOGI("EOF");
+        peerProxyKill(peerProxy, false);
+        return false;
+    } else if (bufferReadComplete(in)) {
+        // We're done!
+        return true;
+    } else {
+        // Continue reading.
+        return false;
+    }
+}
+
+/**
+ * Reads input from a peer process.
+ */
+static void peerProxyRead(SelectableFd* fd) {
+    LOGD("Reading...");
+    PeerProxy* peerProxy = (PeerProxy*) fd->data;
+    int state = peerProxy->inputState;
+    Buffer* in = peerProxy->inputBuffer;
+    switch (state) {
+        case READING_HEADER:
+            if (peerProxyBufferInput(peerProxy)) {
+                LOGD("Header read.");
+                // We've read the complete header.
+                Header* header = (Header*) in->data;
+                peerProxyHandleHeader(peerProxy, header);
+            }
+            break;
+        case READING_BYTES:
+            LOGD("Reading bytes...");
+            if (peerProxyBufferInput(peerProxy)) {
+                LOGD("Bytes read.");
+                // We have the complete packet. Notify bytes listener.
+                peerProxy->peer->onBytes(peerProxy->credentials,
+                    in->data, in->size);
+                        
+                // Get ready for the next packet.
+                peerProxyExpectHeader(peerProxy);
+            }
+            break;
+        case ACCEPTING_CONNECTION:
+            masterProxyAcceptConnection(peerProxy);
+            break;
+        default:
+            LOG_ALWAYS_FATAL("Unknown state: %d", state);
+    }
+}
+
+static PeerProxy* peerProxyCreate(Peer* peer, Credentials credentials) {
+    PeerProxy* peerProxy = calloc(1, sizeof(PeerProxy));
+    if (peerProxy == NULL) {
+        return NULL;
+    }
+   
+    peerProxy->inputBuffer = bufferCreate(sizeof(Header));
+    if (peerProxy->inputBuffer == NULL) {
+        free(peerProxy);
+        return NULL;
+    }
+
+    peerProxy->peer = peer;
+    peerProxy->credentials = credentials;
+
+    // Initial state == expecting a header.
+    peerProxyExpectHeader(peerProxy); 
+  
+    // Add this proxy to the map. Make sure the key points to the stable memory
+    // inside of the peer proxy itself.
+    pid_t* pid = &(peerProxy->credentials.pid);
+    hashmapPut(peer->peerProxies, pid, peerProxy);
+    return peerProxy;
+}
+
+/** Accepts a connection to the master peer. */
+static void masterAcceptConnection(SelectableFd* listenerFd) {
+    // Accept connection.
+    int socket = accept(listenerFd->fd, NULL, NULL);
+    if (socket == -1) {
+        LOGW("accept() error: %s", strerror(errno));
+        return;
+    }
+    
+    LOGD("Accepted connection as fd %d.", socket);
+    
+    // Get credentials.
+    Credentials credentials;
+    struct ucred ucredentials;
+    socklen_t credentialsSize = sizeof(struct ucred);
+    int result = getsockopt(socket, SOL_SOCKET, SO_PEERCRED, 
+                &ucredentials, &credentialsSize);
+    // We might want to verify credentialsSize.
+    if (result == -1) {
+        LOGW("getsockopt() error: %s", strerror(errno));
+        closeWithWarning(socket);
+        return;
+    }
+
+    // Copy values into our own structure so we know we have the types right.
+    credentials.pid = ucredentials.pid;
+    credentials.uid = ucredentials.uid;
+    credentials.gid = ucredentials.gid;
+    
+    LOGI("Accepted connection from process %d.", credentials.pid);
+   
+    Peer* masterPeer = (Peer*) listenerFd->data;
+    
+    peerLock(masterPeer);
+    
+    // Make sure we don't already have a connection from that process.
+    PeerProxy* peerProxy 
+        = hashmapGet(masterPeer->peerProxies, &credentials.pid);
+    if (peerProxy != NULL) {
+        peerUnlock(masterPeer);
+        LOGW("Alread connected to process %d.", credentials.pid);
+        closeWithWarning(socket);
+        return;
+    }
+   
+    // Add connection to the selector.
+    SelectableFd* socketFd = selectorAdd(masterPeer->selector, socket);
+    if (socketFd == NULL) {
+        peerUnlock(masterPeer);
+        LOGW("malloc() failed.");
+        closeWithWarning(socket);
+        return;
+    }
+
+    // Create a peer proxy.
+    peerProxy = peerProxyCreate(masterPeer, credentials);
+    peerUnlock(masterPeer);
+    if (peerProxy == NULL) {
+        LOGW("malloc() failed.");
+        socketFd->remove = true;
+        closeWithWarning(socket);
+    }
+    peerProxy->connections = hashmapCreate(10, &pidHash, &pidEquals);
+    peerProxySetFd(peerProxy, socketFd);
+}
+
+/**
+ * Creates the local peer.
+ */
+static Peer* peerCreate() {
+    Peer* peer = calloc(1, sizeof(Peer));
+    if (peer == NULL) {
+        LOG_ALWAYS_FATAL("malloc() error.");
+    }
+    peer->peerProxies = hashmapCreate(10, &pidHash, &pidEquals);
+    peer->selector = selectorCreate();
+    
+    pthread_mutexattr_t attributes;
+    if (pthread_mutexattr_init(&attributes) != 0) {
+        LOG_ALWAYS_FATAL("pthread_mutexattr_init() error.");
+    }
+    if (pthread_mutexattr_settype(&attributes, PTHREAD_MUTEX_RECURSIVE) != 0) {
+        LOG_ALWAYS_FATAL("pthread_mutexattr_settype() error.");
+    }
+    if (pthread_mutex_init(&peer->mutex, &attributes) != 0) {
+        LOG_ALWAYS_FATAL("pthread_mutex_init() error.");
+    }
+    
+    peer->pid = getpid();
+    return peer;
+}
+
+/** The local peer. */
+static Peer* localPeer;
+
+/** Frees a packet of bytes. */
+static void outgoingPacketFreeBytes(OutgoingPacket* packet) {
+    LOGD("Freeing outgoing packet.");
+    bufferFree(packet->bytes);
+    free(packet);
+}
+
+/**
+ * Sends a packet of bytes to a remote peer. Returns 0 on success.
+ *
+ * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
+ * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
+ * to EINVAL if pid is the same as the local pid.
+ */
+int peerSendBytes(pid_t pid, const char* bytes, size_t size) {
+	Peer* peer = localPeer;
+    assert(peer != NULL);
+
+    OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
+    if (packet == NULL) {
+        errno = ENOMEM;
+        return -1;
+    }
+
+    Buffer* copy = bufferCreate(size); 
+    if (copy == NULL) {
+        free(packet);
+        errno = ENOMEM;
+        return -1;
+    }
+
+    // Copy data.
+    memcpy(copy->data, bytes, size);
+    copy->size = size;
+    
+    packet->bytes = copy;
+    packet->header.type = BYTES;
+    packet->header.size = size;
+    packet->free = outgoingPacketFreeBytes;
+    bufferPrepareForWrite(packet->bytes);
+    
+    peerLock(peer);
+    
+    PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
+    if (peerProxy == NULL) {
+        // The peer is already dead or we couldn't alloc memory. Either way,
+        // errno is set.
+        peerUnlock(peer);
+        packet->free(packet); 
+        return -1;
+    } else {
+        peerProxyEnqueueOutgoingPacket(peerProxy, packet);
+        peerUnlock(peer);
+        selectorWakeUp(peer->selector);
+        return 0; 
+    }
+}
+
+/** Keeps track of how to free shared bytes. */
+typedef struct {
+    void (*free)(void* context);
+    void* context;
+} SharedBytesFreer;
+
+/** Frees shared bytes. */
+static void outgoingPacketFreeSharedBytes(OutgoingPacket* packet) {
+    SharedBytesFreer* sharedBytesFreer 
+        = (SharedBytesFreer*) packet->context;
+    sharedBytesFreer->free(sharedBytesFreer->context);
+    free(sharedBytesFreer);
+    free(packet);
+}
+
+/**
+ * Sends a packet of bytes to a remote peer without copying the bytes. Calls
+ * free() with context after the bytes have been sent.
+ *
+ * Returns -1 if an error occurs. Sets errno to ENOMEM if memory couldn't be
+ * allocated. Sets errno to EHOSTDOWN if the peer died recently. Sets errno
+ * to EINVAL if pid is the same as the local pid.
+ */
+int peerSendSharedBytes(pid_t pid, char* bytes, size_t size,
+        void (*free)(void* context), void* context) {
+    Peer* peer = localPeer;
+    assert(peer != NULL);
+
+    OutgoingPacket* packet = calloc(1, sizeof(OutgoingPacket));
+    if (packet == NULL) {
+        errno = ENOMEM;
+        return -1;
+    }
+
+    Buffer* wrapper = bufferWrap(bytes, size, size);
+    if (wrapper == NULL) {
+        free(packet);
+        errno = ENOMEM;
+        return -1;
+    }
+
+    SharedBytesFreer* sharedBytesFreer = malloc(sizeof(SharedBytesFreer));
+    if (sharedBytesFreer == NULL) {
+        free(packet);
+        free(wrapper);
+        errno = ENOMEM;
+        return -1;
+    }
+    sharedBytesFreer->free = free;
+    sharedBytesFreer->context = context;
+    
+    packet->bytes = wrapper;
+    packet->context = sharedBytesFreer;
+    packet->header.type = BYTES;
+    packet->header.size = size;
+    packet->free = &outgoingPacketFreeSharedBytes;
+    bufferPrepareForWrite(packet->bytes);
+    
+    peerLock(peer);
+    
+    PeerProxy* peerProxy = peerProxyGetOrCreate(peer, pid, true);
+    if (peerProxy == NULL) {
+        // The peer is already dead or we couldn't alloc memory. Either way,
+        // errno is set.
+        peerUnlock(peer);
+        packet->free(packet); 
+        return -1;
+    } else {
+        peerProxyEnqueueOutgoingPacket(peerProxy, packet);
+        peerUnlock(peer);
+        selectorWakeUp(peer->selector);
+        return 0; 
+    }
+}
+
+/**
+ * Starts the master peer. The master peer differs from other peers in that
+ * it is responsible for connecting the other peers. You can only have one
+ * master peer.
+ *
+ * Goes into an I/O loop and does not return.
+ */
+void masterPeerInitialize(BytesListener* bytesListener, 
+        DeathListener* deathListener) {
+    // Create and bind socket.
+    int listenerSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
+    if (listenerSocket == -1) {
+        LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
+    }
+    unlink(MASTER_PATH);
+    int result = bind(listenerSocket, (SocketAddress*) getMasterAddress(), 
+            sizeof(UnixAddress));
+    if (result == -1) {
+        LOG_ALWAYS_FATAL("bind() error: %s", strerror(errno));
+    }
+
+    LOGD("Listener socket: %d",  listenerSocket);   
+    
+    // Queue up to 16 connections.
+    result = listen(listenerSocket, 16);
+    if (result != 0) {
+        LOG_ALWAYS_FATAL("listen() error: %s", strerror(errno));
+    }
+
+    // Make socket non-blocking.
+    setNonBlocking(listenerSocket);
+
+    // Create the peer for this process. Fail if we already have one.
+    if (localPeer != NULL) {
+        LOG_ALWAYS_FATAL("Peer is already initialized.");
+    }
+    localPeer = peerCreate();
+    if (localPeer == NULL) {
+        LOG_ALWAYS_FATAL("malloc() failed.");
+    }
+    localPeer->master = true;
+    localPeer->onBytes = bytesListener;
+    localPeer->onDeath = deathListener;
+    
+    // Make listener socket selectable.
+    SelectableFd* listenerFd = selectorAdd(localPeer->selector, listenerSocket);
+    if (listenerFd == NULL) {
+        LOG_ALWAYS_FATAL("malloc() error.");
+    }
+    listenerFd->data = localPeer;
+    listenerFd->onReadable = &masterAcceptConnection;
+}
+
+/**
+ * Starts a local peer.
+ *
+ * Goes into an I/O loop and does not return.
+ */
+void peerInitialize(BytesListener* bytesListener, 
+        DeathListener* deathListener) {
+    // Connect to master peer.
+    int masterSocket = socket(AF_LOCAL, SOCK_STREAM, 0);
+    if (masterSocket == -1) {
+        LOG_ALWAYS_FATAL("socket() error: %s", strerror(errno));
+    }
+    int result = connect(masterSocket, (SocketAddress*) getMasterAddress(),
+            sizeof(UnixAddress));
+    if (result != 0) {
+        LOG_ALWAYS_FATAL("connect() error: %s", strerror(errno));
+    }
+
+    // Create the peer for this process. Fail if we already have one.
+    if (localPeer != NULL) {
+        LOG_ALWAYS_FATAL("Peer is already initialized.");
+    }
+    localPeer = peerCreate();
+    if (localPeer == NULL) {
+        LOG_ALWAYS_FATAL("malloc() failed.");
+    }
+    localPeer->onBytes = bytesListener;
+    localPeer->onDeath = deathListener;
+    
+    // Make connection selectable.
+    SelectableFd* masterFd = selectorAdd(localPeer->selector, masterSocket);
+    if (masterFd == NULL) {
+        LOG_ALWAYS_FATAL("malloc() error.");
+    }
+
+    // Create a peer proxy for the master peer.
+    PeerProxy* masterProxy = peerProxyCreate(localPeer, MASTER_CREDENTIALS);
+    if (masterProxy == NULL) {
+        LOG_ALWAYS_FATAL("malloc() error.");
+    }
+    peerProxySetFd(masterProxy, masterFd);
+    masterProxy->master = true;
+    localPeer->masterProxy = masterProxy;
+}
+
+/** Starts the master peer I/O loop. Doesn't return. */
+void peerLoop() {
+    assert(localPeer != NULL);
+    
+    // Start selector.
+    selectorLoop(localPeer->selector);
+}
+