libpdx: Add support for more event bits.

- Add generic abstraction for multiple event bits with an implementation-
  defined mechanism to deliver the bits.
- Update ServiceFS backend to pass through event bits.
- Implement EPOLLIN, EPOLLPRI, and EPOLLHUP event bits for the UDS backend.

Bug: 34466748
Test: Build/flash system; observe stable operation.

Change-Id: I86afb5645b72ec69c095734c7891a690432150a3
diff --git a/libs/vr/libpdx/private/pdx/client.h b/libs/vr/libpdx/private/pdx/client.h
index 4eafe76..a590087 100644
--- a/libs/vr/libpdx/private/pdx/client.h
+++ b/libs/vr/libpdx/private/pdx/client.h
@@ -108,8 +108,22 @@
    */
   void DisableAutoReconnect();
 
+  /*
+   * Returns an fd that the client may use to check/wait for asynchronous
+   * notifications to the channel. It is implementation dependent how the
+   * transport backend handles this feature, however all implementations must
+   * support at least POLLIN/EPOLLIN/readable.
+   *
+   * For uses that require more than one type of event, use
+   * ClientChannel::GetEventMask() to distinguish between events.
+   */
   int event_fd() const;
+
+  /*
+   * Returns the underlying ClientChannel object.
+   */
   ClientChannel* GetChannel() const { return channel_.get(); }
+  std::unique_ptr<ClientChannel>&& TakeChannel() { return std::move(channel_); }
 
  private:
   Client(const Client&) = delete;
diff --git a/libs/vr/libpdx/private/pdx/client_channel.h b/libs/vr/libpdx/private/pdx/client_channel.h
index e7ea475..dbfd626 100644
--- a/libs/vr/libpdx/private/pdx/client_channel.h
+++ b/libs/vr/libpdx/private/pdx/client_channel.h
@@ -18,6 +18,8 @@
   virtual uint32_t GetIpcTag() const = 0;
 
   virtual int event_fd() const = 0;
+  virtual Status<int> GetEventMask(int events) = 0;
+
   virtual LocalChannelHandle& GetChannelHandle() = 0;
   virtual void* AllocateTransactionState() = 0;
   virtual void FreeTransactionState(void* state) = 0;
diff --git a/libs/vr/libpdx/private/pdx/mock_client_channel.h b/libs/vr/libpdx/private/pdx/mock_client_channel.h
index 7780ee3..561c939 100644
--- a/libs/vr/libpdx/private/pdx/mock_client_channel.h
+++ b/libs/vr/libpdx/private/pdx/mock_client_channel.h
@@ -11,6 +11,7 @@
  public:
   MOCK_CONST_METHOD0(GetIpcTag, uint32_t());
   MOCK_CONST_METHOD0(event_fd, int());
+  MOCK_METHOD1(GetEventMask, Status<int>(int));
   MOCK_METHOD0(GetChannelHandle, LocalChannelHandle&());
   MOCK_METHOD0(AllocateTransactionState, void*());
   MOCK_METHOD1(FreeTransactionState, void(void* state));
diff --git a/libs/vr/libpdx_uds/Android.bp b/libs/vr/libpdx_uds/Android.bp
index 9c4a3b0..09eeaa0 100644
--- a/libs/vr/libpdx_uds/Android.bp
+++ b/libs/vr/libpdx_uds/Android.bp
@@ -5,10 +5,13 @@
         "-Wall",
         "-Wextra",
         "-Werror",
+        "-DLOG_TAG=\"libpdx_uds\"",
+        "-DTRACE=0",
     ],
     export_include_dirs: ["private"],
     local_include_dirs: ["private"],
     srcs: [
+        "channel_event_set.cpp",
         "channel_manager.cpp",
         "client_channel_factory.cpp",
         "client_channel.cpp",
diff --git a/libs/vr/libpdx_uds/channel_event_set.cpp b/libs/vr/libpdx_uds/channel_event_set.cpp
new file mode 100644
index 0000000..f8baeab
--- /dev/null
+++ b/libs/vr/libpdx_uds/channel_event_set.cpp
@@ -0,0 +1,115 @@
+#include "private/uds/channel_event_set.h"
+
+#include <log/log.h>
+
+#include <uds/ipc_helper.h>
+
+namespace android {
+namespace pdx {
+namespace uds {
+
+ChannelEventSet::ChannelEventSet() {
+  const int flags = EFD_CLOEXEC | EFD_NONBLOCK;
+  LocalHandle epoll_fd, event_fd;
+  // Create both fds close-on-exec so neither leaks into exec'd children.
+  if (!SetupHandle(epoll_create1(EPOLL_CLOEXEC), &epoll_fd, "epoll") ||
+      !SetupHandle(eventfd(0, flags), &event_fd, "event")) {
+    return;  // SetupHandle already logged; epoll_fd_/event_fd_ are not set.
+  }
+  // Register the eventfd with an empty mask; ModifyEvents() updates it later.
+  epoll_event event;
+  event.events = 0;
+  event.data.u32 = 0;
+  if (epoll_ctl(epoll_fd.Get(), EPOLL_CTL_ADD, event_fd.Get(), &event) < 0) {
+    const int error = errno;
+    ALOGE("ChannelEventSet::ChannelEventSet: Failed to add event_fd: %s",
+          strerror(error));
+    return;
+  }
+  // Commit the handles only once the whole setup has succeeded.
+  epoll_fd_ = std::move(epoll_fd);
+  event_fd_ = std::move(event_fd);
+}
+
+Status<void> ChannelEventSet::AddDataFd(const LocalHandle& data_fd) {
+  epoll_event event;  // Watch |data_fd| for hangup conditions only.
+  event.events = EPOLLHUP | EPOLLRDHUP;
+  event.data.u32 = event.events;  // Payload carries the same bits as the mask.
+  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, data_fd.Get(), &event) < 0) {
+    const int error = errno;  // Save errno before logging can clobber it.
+    ALOGE("ChannelEventSet::AddDataFd: Failed to add data_fd: %s",
+          strerror(error));
+    return ErrorStatus{error};  // Failure: the errno from epoll_ctl.
+  } else {
+    return {};  // Success: empty status.
+  }
+}
+
+int ChannelEventSet::ModifyEvents(int clear_mask, int set_mask) {
+  ALOGD_IF(TRACE, "ChannelEventSet::ModifyEvents: clear_mask=%x set_mask=%x",
+           clear_mask, set_mask);
+  const int old_bits = event_bits_;  // Snapshot used to detect a change.
+  const int new_bits = (event_bits_ & ~clear_mask) | set_mask;
+  event_bits_ = new_bits;
+
+  // If anything changed clear the event and update the event mask.
+  if (old_bits != new_bits) {
+    eventfd_t value;
+    eventfd_read(event_fd_.Get(), &value);  // Result intentionally ignored.
+
+    epoll_event event;
+    event.events = EPOLLIN;
+    event.data.u32 = event_bits_;  // New bits travel in the epoll payload.
+    if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_MOD, event_fd_.Get(), &event) <
+        0) {
+      const int error = errno;
+      ALOGE("ChannelEventSet::ModifyEvents: Failed to update event: %s",
+            strerror(error));
+      return -error;  // Note: event_bits_ has already been updated.
+    }
+  }
+
+  // If there are any bits set, re-trigger the eventfd.
+  if (new_bits)
+    eventfd_write(event_fd_.Get(), 1);
+
+  return 0;  // Success; the epoll_ctl failure path above returns -errno.
+}
+
+Status<void> ChannelEventSet::SetupHandle(int fd, LocalHandle* handle,
+                                          const char* error_name) {
+  const int error = errno;  // Read errno first, before any call below runs.
+  handle->Reset(fd);  // |handle| is reset to |fd| whether or not it is valid.
+  if (!*handle) {  // Handle tests false: log using |error_name| and fail.
+    ALOGE("ChannelEventSet::SetupHandle: Failed to setup %s handle: %s",
+          error_name, strerror(error));
+    return ErrorStatus{error};
+  }
+  return {};  // Success: empty status.
+}
+
+Status<int> ChannelEventReceiver::GetPendingEvents() const {
+  constexpr long kTimeoutMs = 0;  // Zero timeout: poll without blocking.
+  epoll_event event;
+  const int count =
+      RETRY_EINTR(epoll_wait(epoll_fd_.Get(), &event, 1, kTimeoutMs));
+
+  Status<int> status;
+  if (count < 0) {
+    status.SetError(errno);
+    ALOGE("ChannelEventReceiver::GetPendingEvents: Failed to get events: %s",
+          status.GetErrorMessage().c_str());
+    return status;
+  }
+  // With no ready event |event| was never written; report an empty mask.
+  const int mask_out = (count > 0) ? event.data.u32 : 0u;
+  ALOGD_IF(TRACE, "ChannelEventReceiver::GetPendingEvents: mask_out=%x",
+           mask_out);
+
+  status.SetValue(mask_out);  // Value is the data.u32 payload of the event.
+  return status;
+}
+
+}  // namespace uds
+}  // namespace pdx
+}  // namespace android
diff --git a/libs/vr/libpdx_uds/channel_manager.cpp b/libs/vr/libpdx_uds/channel_manager.cpp
index 387625c..afc0a4f 100644
--- a/libs/vr/libpdx_uds/channel_manager.cpp
+++ b/libs/vr/libpdx_uds/channel_manager.cpp
@@ -33,11 +33,11 @@
   return LocalChannelHandle(nullptr, -1);
 }
 
-int ChannelManager::GetEventFd(int32_t handle) {
+ChannelManager::ChannelData* ChannelManager::GetChannelData(int32_t handle) {
   std::lock_guard<std::mutex> autolock(mutex_);
   auto channel = channels_.find(handle);
-  return channel != channels_.end() ? channel->second.event_fd.Get() : -1;
-};
+  return channel != channels_.end() ? &channel->second : nullptr;
+}
 
 }  // namespace uds
 }  // namespace pdx
diff --git a/libs/vr/libpdx_uds/client_channel.cpp b/libs/vr/libpdx_uds/client_channel.cpp
index 3394759..4cbdb94 100644
--- a/libs/vr/libpdx_uds/client_channel.cpp
+++ b/libs/vr/libpdx_uds/client_channel.cpp
@@ -50,12 +50,17 @@
   ChannelReference PushChannelHandle(BorrowedChannelHandle handle) {
     if (!handle)
       return handle.value();
-    ChannelInfo<BorrowedHandle> channel_info;
-    channel_info.data_fd.Reset(handle.value());
-    channel_info.event_fd.Reset(
-        ChannelManager::Get().GetEventFd(handle.value()));
-    request.channels.push_back(std::move(channel_info));
-    return request.channels.size() - 1;
+
+    if (auto* channel_data =
+            ChannelManager::Get().GetChannelData(handle.value())) {
+      ChannelInfo<BorrowedHandle> channel_info;
+      channel_info.data_fd.Reset(handle.value());
+      channel_info.event_fd = channel_data->event_receiver.event_fd();
+      request.channels.push_back(std::move(channel_info));
+      return request.channels.size() - 1;
+    } else {
+      return -1;
+    }
   }
 
   RequestHeader<BorrowedHandle> request;
@@ -127,27 +132,7 @@
 
 ClientChannel::ClientChannel(LocalChannelHandle channel_handle)
     : channel_handle_{std::move(channel_handle)} {
-  int data_fd = channel_handle_.value();
-  int event_fd =
-      channel_handle_ ? ChannelManager::Get().GetEventFd(data_fd) : -1;
-
-  if (event_fd >= 0) {
-    epoll_fd_.Reset(epoll_create(1));
-    if (epoll_fd_) {
-      epoll_event data_ev;
-      data_ev.events = EPOLLHUP;
-      data_ev.data.fd = data_fd;
-
-      epoll_event event_ev;
-      event_ev.events = EPOLLIN;
-      event_ev.data.fd = event_fd;
-      if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, data_fd, &data_ev) < 0 ||
-          epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd, &event_ev) < 0) {
-        ALOGE("Failed to add fd to epoll fd because: %s\n", strerror(errno));
-        epoll_fd_.Close();
-      }
-    }
-  }
+  channel_data_ = ChannelManager::Get().GetChannelData(channel_handle_.value());
 }
 
 std::unique_ptr<pdx::ClientChannel> ClientChannel::Create(
diff --git a/libs/vr/libpdx_uds/ipc_helper.cpp b/libs/vr/libpdx_uds/ipc_helper.cpp
index dca23ef..ee7299e 100644
--- a/libs/vr/libpdx_uds/ipc_helper.cpp
+++ b/libs/vr/libpdx_uds/ipc_helper.cpp
@@ -134,7 +134,9 @@
       RETRY_EINTR(recv(socket_fd, &preamble, sizeof(preamble), MSG_WAITALL));
   if (ret < 0)
     return ErrorStatus(errno);
-  if (ret != sizeof(preamble) || preamble.magic != kMagicPreamble)
+  else if (ret == 0)
+    return ErrorStatus(ESHUTDOWN);
+  else if (ret != sizeof(preamble) || preamble.magic != kMagicPreamble)
     return ErrorStatus(EIO);
 
   buffer_.resize(preamble.data_size);
@@ -157,7 +159,9 @@
   ret = RETRY_EINTR(recvmsg(socket_fd, &msg, MSG_WAITALL));
   if (ret < 0)
     return ErrorStatus(errno);
-  if (static_cast<uint32_t>(ret) != preamble.data_size)
+  else if (ret == 0)
+    return ErrorStatus(ESHUTDOWN);
+  else if (static_cast<uint32_t>(ret) != preamble.data_size)
     return ErrorStatus(EIO);
 
   bool cred_available = false;
@@ -239,7 +243,9 @@
   ssize_t size_read = RETRY_EINTR(recv(socket_fd, data, size, MSG_WAITALL));
   if (size_read < 0)
     return ErrorStatus(errno);
-  if (static_cast<size_t>(size_read) != size)
+  else if (size_read == 0)
+    return ErrorStatus(ESHUTDOWN);
+  else if (static_cast<size_t>(size_read) != size)
     return ErrorStatus(EIO);
   return {};
 }
@@ -251,7 +257,9 @@
   ssize_t size_read = RETRY_EINTR(recvmsg(socket_fd, &msg, MSG_WAITALL));
   if (size_read < 0)
     return ErrorStatus(errno);
-  if (static_cast<size_t>(size_read) != CountVectorSize(data, count))
+  else if (size_read == 0)
+    return ErrorStatus(ESHUTDOWN);
+  else if (static_cast<size_t>(size_read) != CountVectorSize(data, count))
     return ErrorStatus(EIO);
   return {};
 }
diff --git a/libs/vr/libpdx_uds/private/uds/channel_event_set.h b/libs/vr/libpdx_uds/private/uds/channel_event_set.h
new file mode 100644
index 0000000..1f464d5
--- /dev/null
+++ b/libs/vr/libpdx_uds/private/uds/channel_event_set.h
@@ -0,0 +1,62 @@
+#ifndef ANDROID_PDX_UDS_CHANNEL_EVENT_SET_H_
+#define ANDROID_PDX_UDS_CHANNEL_EVENT_SET_H_
+
+#include <errno.h>
+#include <poll.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+
+#include <pdx/file_handle.h>
+#include <pdx/status.h>
+
+namespace android {
+namespace pdx {
+namespace uds {
+
+class ChannelEventSet {  // Holds an epoll fd, an eventfd and a set of bits.
+ public:
+  ChannelEventSet();
+  ChannelEventSet(ChannelEventSet&&) = default;
+  ChannelEventSet& operator=(ChannelEventSet&&) = default;
+  // Returns a borrowed handle to the epoll fd (not to event_fd_).
+  BorrowedHandle event_fd() const { return epoll_fd_.Borrow(); }
+  // True only when both the epoll fd and the eventfd handles test true.
+  explicit operator bool() const { return !!epoll_fd_ && !!event_fd_; }
+  // Add a data fd to the epoll set / clear and set bits in event_bits_.
+  Status<void> AddDataFd(const LocalHandle& data_fd);
+  int ModifyEvents(int clear_mask, int set_mask);
+
+ private:
+  LocalHandle epoll_fd_;  // Epoll set handed out by event_fd().
+  LocalHandle event_fd_;  // eventfd registered in the epoll set.
+  uint32_t event_bits_ = 0;  // Current event bits; starts empty.
+
+  static Status<void> SetupHandle(int fd, LocalHandle* handle,
+                                  const char* error_name);
+  // Copying is disabled; only the defaulted move operations are available.
+  ChannelEventSet(const ChannelEventSet&) = delete;
+  void operator=(const ChannelEventSet&) = delete;
+};
+
+class ChannelEventReceiver {  // Wraps an epoll fd and polls it for events.
+ public:
+  ChannelEventReceiver() = default;
+  ChannelEventReceiver(LocalHandle epoll_fd) : epoll_fd_{std::move(epoll_fd)} {}
+  ChannelEventReceiver(ChannelEventReceiver&&) = default;
+  ChannelEventReceiver& operator=(ChannelEventReceiver&&) = default;
+  // event_fd() lends out the epoll fd; GetPendingEvents() polls it once.
+  BorrowedHandle event_fd() const { return epoll_fd_.Borrow(); }
+  Status<int> GetPendingEvents() const;
+
+ private:
+  LocalHandle epoll_fd_;  // Owned epoll fd taken in the constructor.
+  // Copying is disabled; only the defaulted move operations are available.
+  ChannelEventReceiver(const ChannelEventReceiver&) = delete;
+  void operator=(const ChannelEventReceiver&) = delete;
+};
+
+}  // namespace uds
+}  // namespace pdx
+}  // namespace android
+
+#endif  // ANDROID_PDX_UDS_CHANNEL_EVENT_SET_H_
diff --git a/libs/vr/libpdx_uds/private/uds/channel_manager.h b/libs/vr/libpdx_uds/private/uds/channel_manager.h
index e8d3f6e..2aca414 100644
--- a/libs/vr/libpdx_uds/private/uds/channel_manager.h
+++ b/libs/vr/libpdx_uds/private/uds/channel_manager.h
@@ -6,6 +6,7 @@
 
 #include <pdx/channel_handle.h>
 #include <pdx/file_handle.h>
+#include <uds/channel_event_set.h>
 
 namespace android {
 namespace pdx {
@@ -16,13 +17,14 @@
   static ChannelManager& Get();
 
   LocalChannelHandle CreateHandle(LocalHandle data_fd, LocalHandle event_fd);
-  int GetEventFd(int32_t handle);
-
- private:
   struct ChannelData {
     LocalHandle data_fd;
-    LocalHandle event_fd;
+    ChannelEventReceiver event_receiver;
   };
+
+  ChannelData* GetChannelData(int32_t handle);
+
+ private:
   ChannelManager() = default;
 
   void CloseHandle(int32_t handle) override;
diff --git a/libs/vr/libpdx_uds/private/uds/client_channel.h b/libs/vr/libpdx_uds/private/uds/client_channel.h
index 12a40e7..45f6473 100644
--- a/libs/vr/libpdx_uds/private/uds/client_channel.h
+++ b/libs/vr/libpdx_uds/private/uds/client_channel.h
@@ -3,6 +3,7 @@
 
 #include <pdx/client_channel.h>
 
+#include <uds/channel_event_set.h>
 #include <uds/channel_manager.h>
 #include <uds/service_endpoint.h>
 
@@ -18,7 +19,17 @@
       LocalChannelHandle channel_handle);
 
   uint32_t GetIpcTag() const override { return Endpoint::kIpcTag; }
-  int event_fd() const override { return epoll_fd_.Get(); }
+
+  int event_fd() const override {
+    return channel_data_ ? channel_data_->event_receiver.event_fd().Get() : -1;
+  }
+  Status<int> GetEventMask(int /*events*/) override {
+    if (channel_data_)
+      return channel_data_->event_receiver.GetPendingEvents();
+    else
+      return ErrorStatus(EINVAL);
+  }
+
   LocalChannelHandle& GetChannelHandle() override { return channel_handle_; }
   void* AllocateTransactionState() override;
   void FreeTransactionState(void* state) override;
@@ -61,7 +72,7 @@
                              const iovec* receive_vector, size_t receive_count);
 
   LocalChannelHandle channel_handle_;
-  LocalHandle epoll_fd_;
+  ChannelManager::ChannelData* channel_data_;
 };
 
 }  // namespace uds
diff --git a/libs/vr/libpdx_uds/private/uds/service_endpoint.h b/libs/vr/libpdx_uds/private/uds/service_endpoint.h
index 0f69400..3ec8519 100644
--- a/libs/vr/libpdx_uds/private/uds/service_endpoint.h
+++ b/libs/vr/libpdx_uds/private/uds/service_endpoint.h
@@ -11,6 +11,7 @@
 
 #include <pdx/service.h>
 #include <pdx/service_endpoint.h>
+#include <uds/channel_event_set.h>
 #include <uds/service_dispatcher.h>
 
 namespace android {
@@ -95,8 +96,7 @@
  private:
   struct ChannelData {
     LocalHandle data_fd;
-    LocalHandle event_fd;
-    uint32_t event_mask{0};
+    ChannelEventSet event_set;
     Channel* channel_state{nullptr};
   };
 
@@ -110,6 +110,8 @@
     return next_message_id_.fetch_add(1, std::memory_order_relaxed);
   }
 
+  void BuildCloseMessage(int channel_id, Message* message);
+
   Status<void> AcceptConnection(Message* message);
   Status<void> ReceiveMessageForChannel(int channel_id, Message* message);
   Status<void> OnNewChannel(LocalHandle channel_fd);
@@ -130,9 +132,6 @@
   mutable std::mutex channel_mutex_;
   std::map<int, ChannelData> channels_;
 
-  mutable std::mutex service_mutex_;
-  std::condition_variable condition_;
-
   Service* service_{nullptr};
   std::atomic<uint32_t> next_message_id_;
 };
diff --git a/libs/vr/libpdx_uds/service_dispatcher.cpp b/libs/vr/libpdx_uds/service_dispatcher.cpp
index 8a40158..fa98f26 100644
--- a/libs/vr/libpdx_uds/service_dispatcher.cpp
+++ b/libs/vr/libpdx_uds/service_dispatcher.cpp
@@ -1,4 +1,3 @@
-#define LOG_TAG "ServiceDispatcher"
 #include "uds/service_dispatcher.h"
 
 #include <errno.h>
@@ -9,8 +8,6 @@
 #include "pdx/service.h"
 #include "uds/service_endpoint.h"
 
-#define TRACE 0
-
 static const int kMaxEventsPerLoop = 128;
 
 namespace android {
diff --git a/libs/vr/libpdx_uds/service_endpoint.cpp b/libs/vr/libpdx_uds/service_endpoint.cpp
index b6021e8..7bf753d 100644
--- a/libs/vr/libpdx_uds/service_endpoint.cpp
+++ b/libs/vr/libpdx_uds/service_endpoint.cpp
@@ -61,12 +61,17 @@
   ChannelReference PushChannelHandle(BorrowedChannelHandle handle) {
     if (!handle)
       return handle.value();
-    ChannelInfo<BorrowedHandle> channel_info;
-    channel_info.data_fd.Reset(handle.value());
-    channel_info.event_fd.Reset(
-        ChannelManager::Get().GetEventFd(handle.value()));
-    response.channels.push_back(std::move(channel_info));
-    return response.channels.size() - 1;
+
+    if (auto* channel_data =
+            ChannelManager::Get().GetChannelData(handle.value())) {
+      ChannelInfo<BorrowedHandle> channel_info;
+      channel_info.data_fd.Reset(handle.value());
+      channel_info.event_fd = channel_data->event_receiver.event_fd();
+      response.channels.push_back(std::move(channel_info));
+      return response.channels.size() - 1;
+    } else {
+      return -1;
+    }
   }
 
   ChannelReference PushChannelHandle(BorrowedHandle data_fd,
@@ -156,8 +161,6 @@
     return;
   }
 
-  // Use "this" as a unique pointer to distinguish the event fd from all
-  // the other entries that point to instances of Service.
   epoll_event socket_event;
   socket_event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
   socket_event.data.fd = fd.Get();
@@ -244,9 +247,9 @@
     return ErrorStatus(errno);
   }
   ChannelData channel_data;
-  int channel_id = channel_fd.Get();
+  const int channel_id = channel_fd.Get();
+  channel_data.event_set.AddDataFd(channel_fd);
   channel_data.data_fd = std::move(channel_fd);
-  channel_data.event_fd.Reset(eventfd(0, 0));
   channel_data.channel_state = channel_state;
   auto pair = channels_.emplace(channel_id, std::move(channel_data));
   return &pair.first->second;
@@ -272,6 +275,8 @@
 }
 
 int Endpoint::CloseChannelLocked(int channel_id) {
+  ALOGD_IF(TRACE, "Endpoint::CloseChannelLocked: channel_id=%d", channel_id);
+
   auto channel_data = channels_.find(channel_id);
   if (channel_data == channels_.end())
     return -EINVAL;
@@ -293,31 +298,15 @@
 int Endpoint::ModifyChannelEvents(int channel_id, int clear_mask,
                                   int set_mask) {
   std::lock_guard<std::mutex> autolock(channel_mutex_);
-  auto channel_data = channels_.find(channel_id);
-  if (channel_data == channels_.end())
-    return -EINVAL;
-  int old_mask = channel_data->second.event_mask;
-  int new_mask = (old_mask & ~clear_mask) | set_mask;
 
-  // EPOLLHUP shares the same bitmask with POLLHUP.
-  if ((new_mask & POLLHUP) && !(old_mask & POLLHUP)) {
-    CloseChannelLocked(channel_id);
+  auto search = channels_.find(channel_id);
+  if (search != channels_.end()) {
+    auto& channel_data = search->second;
+    channel_data.event_set.ModifyEvents(clear_mask, set_mask);
     return 0;
   }
 
-  // EPOLLIN shares the same bitmask with POLLIN and EPOLLPRI shares the same
-  // bitmask with POLLPRI
-  eventfd_t value = 1;
-  if (((new_mask & POLLIN) && !(old_mask & POLLIN)) ||
-      ((new_mask & POLLPRI) && !(old_mask & POLLPRI))) {
-    eventfd_write(channel_data->second.event_fd.Get(), value);
-  } else if ((!(new_mask & POLLIN) && (old_mask & POLLIN)) ||
-             (!(new_mask & POLLPRI) && (old_mask & POLLPRI))) {
-    eventfd_read(channel_data->second.event_fd.Get(), &value);
-  }
-
-  channel_data->second.event_mask = new_mask;
-  return 0;
+  return -EINVAL;
 }
 
 Status<RemoteChannelHandle> Endpoint::PushChannel(Message* message,
@@ -355,7 +344,8 @@
 
   auto* state = static_cast<MessageState*>(message->GetState());
   ChannelReference ref = state->PushChannelHandle(
-      remote_socket.Borrow(), channel_data.get()->event_fd.Borrow());
+      remote_socket.Borrow(),
+      channel_data.get()->event_set.event_fd().Borrow());
   state->sockets_to_close.push_back(std::move(remote_socket));
   return RemoteChannelHandle{ref};
 }
@@ -391,8 +381,9 @@
 int Endpoint::GetChannelEventFd(int channel_id) {
   std::lock_guard<std::mutex> autolock(channel_mutex_);
   auto channel_data = channels_.find(channel_id);
-  return (channel_data != channels_.end()) ? channel_data->second.event_fd.Get()
-                                           : -1;
+  return (channel_data != channels_.end())
+             ? channel_data->second.event_set.event_fd().Get()
+             : -1;
 }
 
 Status<void> Endpoint::ReceiveMessageForChannel(int channel_id,
@@ -400,9 +391,15 @@
   RequestHeader<LocalHandle> request;
   auto status = ReceiveData(channel_id, &request);
   if (!status) {
-    CloseChannel(channel_id);
-    return status;
+    if (status.error() == ESHUTDOWN) {
+      BuildCloseMessage(channel_id, message);
+      return {};
+    } else {
+      CloseChannel(channel_id);
+      return status;
+    }
   }
+
   MessageInfo info;
   info.pid = request.cred.pid;
   info.tid = -1;
@@ -435,19 +432,42 @@
   if (status && request.is_impulse)
     status = ReenableEpollEvent(channel_id);
 
-  if (!status)
-    CloseChannel(channel_id);
+  if (!status) {
+    if (status.error() == ESHUTDOWN) {
+      BuildCloseMessage(channel_id, message);
+      return {};
+    } else {
+      CloseChannel(channel_id);
+      return status;
+    }
+  }
 
   return status;
 }
 
-int Endpoint::MessageReceive(Message* message) {
-  {
-    std::unique_lock<std::mutex> lock(service_mutex_);
-    condition_.wait(lock, [this] { return service_ != nullptr; });
-  }
+void Endpoint::BuildCloseMessage(int channel_id, Message* message) {
+  ALOGD_IF(TRACE, "Endpoint::BuildCloseMessage: channel_id=%d", channel_id);
+  MessageInfo info;  // Describes a CHANNEL_CLOSE message for |channel_id|.
+  info.pid = -1;  // pid, tid, euid and egid are all set to -1 here.
+  info.tid = -1;
+  info.cid = channel_id;
+  info.mid = GetNextAvailableMessageId();
+  info.euid = -1;
+  info.egid = -1;
+  info.op = opcodes::CHANNEL_CLOSE;
+  info.flags = 0;
+  info.service = service_;
+  info.channel = GetChannelState(channel_id);
+  info.send_len = 0;  // Lengths and fd count are all zero.
+  info.recv_len = 0;
+  info.fd_count = 0;
+  *message = Message{info};  // Overwrites the caller-provided |message|.
+}
 
-  // One event at a time.
+int Endpoint::MessageReceive(Message* message) {
+  // Receive at most one event from the epoll set. This should prevent multiple
+  // dispatch threads from attempting to handle messages on the same socket at
+  // the same time.
   epoll_event event;
   int count = RETRY_EINTR(
       epoll_wait(epoll_fd_.Get(), &event, 1, is_blocking_ ? -1 : 0));
@@ -472,22 +492,8 @@
   }
 
   int channel_id = event.data.fd;
-  if (event.events & EPOLLRDHUP) {
-    MessageInfo info;
-    info.pid = -1;
-    info.tid = -1;
-    info.cid = channel_id;
-    info.mid = GetNextAvailableMessageId();
-    info.euid = -1;
-    info.egid = -1;
-    info.op = opcodes::CHANNEL_CLOSE;
-    info.flags = 0;
-    info.service = service_;
-    info.channel = GetChannelState(channel_id);
-    info.send_len = 0;
-    info.recv_len = 0;
-    info.fd_count = 0;
-    *message = Message{info};
+  if (event.events & (EPOLLRDHUP | EPOLLHUP)) {
+    BuildCloseMessage(channel_id, message);
     return 0;
   }
 
@@ -498,18 +504,19 @@
 }
 
 int Endpoint::MessageReply(Message* message, int return_code) {
-  int channel_socket = GetChannelSocketFd(message->GetChannelId());
+  const int channel_id = message->GetChannelId();
+  const int channel_socket = GetChannelSocketFd(channel_id);
   if (channel_socket < 0)
     return -EBADF;
 
   auto* state = static_cast<MessageState*>(message->GetState());
   switch (message->GetOp()) {
     case opcodes::CHANNEL_CLOSE:
-      return CloseChannel(channel_socket);
+      return CloseChannel(channel_id);
 
     case opcodes::CHANNEL_OPEN:
       if (return_code < 0)
-        return CloseChannel(channel_socket);
+        return CloseChannel(channel_id);
       // Reply with the event fd.
       return_code = state->PushFileHandle(
           BorrowedHandle{GetChannelEventFd(channel_socket)});