libudx: Move ServiceDispatcher from libpdx_uds to libpdx

The two separate implementations of ServiceDispatcher for UDS and ServiceFS
are practically identical. Moved the implementation into the base libpdx lib,
and removed the transport-specific versions.

Bug: None
Test: `m -j32` succeeds; pdx_tests, pdx_servicefs_tests, pdx_uds_tests pass
Change-Id: I2344f4f23bc3da27eb7b192344844b87660e8610
diff --git a/libs/vr/libpdx/service_dispatcher.cpp b/libs/vr/libpdx/service_dispatcher.cpp
new file mode 100644
index 0000000..b112fa3
--- /dev/null
+++ b/libs/vr/libpdx/service_dispatcher.cpp
@@ -0,0 +1,192 @@
+#include <pdx/service_dispatcher.h>
+
+#include <errno.h>
+#include <log/log.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+
+#include <pdx/service.h>
+#include <pdx/service_endpoint.h>
+
+static const int kMaxEventsPerLoop = 128;
+
+namespace android {
+namespace pdx {
+
+std::unique_ptr<ServiceDispatcher> ServiceDispatcher::Create() {
+  std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()};
+  if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) {
+    dispatcher.reset();
+  }
+
+  return dispatcher;
+}
+
+ServiceDispatcher::ServiceDispatcher() {
+  event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
+  if (!event_fd_) {
+    ALOGE("Failed to create event fd because: %s\n", strerror(errno));
+    return;
+  }
+
+  epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
+  if (!epoll_fd_) {
+    ALOGE("Failed to create epoll fd because: %s\n", strerror(errno));
+    return;
+  }
+
+  // Use "this" as a unique pointer to distinguish the event fd from all
+  // the other entries that point to instances of Service.
+  epoll_event event;
+  event.events = EPOLLIN;
+  event.data.ptr = this;
+
+  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) {
+    ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno));
+
+    // Close the fds here and signal failure to the factory method.
+    event_fd_.Close();
+    epoll_fd_.Close();
+  }
+}
+
+ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); }
+
+int ServiceDispatcher::ThreadEnter() {
+  std::lock_guard<std::mutex> autolock(mutex_);
+
+  if (canceled_)
+    return -EBUSY;
+
+  thread_count_++;
+  return 0;
+}
+
+void ServiceDispatcher::ThreadExit() {
+  std::lock_guard<std::mutex> autolock(mutex_);
+  thread_count_--;
+  condition_.notify_one();
+}
+
+int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) {
+  std::lock_guard<std::mutex> autolock(mutex_);
+
+  epoll_event event;
+  event.events = EPOLLIN;
+  event.data.ptr = service.get();
+
+  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, service->endpoint()->epoll_fd(),
+                &event) < 0) {
+    ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno));
+    return -errno;
+  }
+
+  services_.push_back(service);
+  return 0;
+}
+
+int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) {
+  std::lock_guard<std::mutex> autolock(mutex_);
+
+  // It's dangerous to remove a service while other threads may be using it.
+  if (thread_count_ > 0)
+    return -EBUSY;
+
+  epoll_event dummy;  // See BUGS in man 2 epoll_ctl.
+  if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, service->endpoint()->epoll_fd(),
+                &dummy) < 0) {
+    ALOGE("Failed to remove service from dispatcher because: %s\n",
+          strerror(errno));
+    return -errno;
+  }
+
+  services_.erase(std::remove(services_.begin(), services_.end(), service),
+                  services_.end());
+  return 0;
+}
+
+int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); }
+
+int ServiceDispatcher::ReceiveAndDispatch(int timeout) {
+  int ret = ThreadEnter();
+  if (ret < 0)
+    return ret;
+
+  epoll_event events[kMaxEventsPerLoop];
+
+  int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout);
+  if (count <= 0) {
+    ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n",
+             strerror(errno));
+    ThreadExit();
+    return count < 0 ? -errno : -ETIMEDOUT;
+  }
+
+  for (int i = 0; i < count; i++) {
+    if (events[i].data.ptr == this) {
+      ThreadExit();
+      return -EBUSY;
+    } else {
+      Service* service = static_cast<Service*>(events[i].data.ptr);
+
+      ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
+               service->endpoint()->epoll_fd());
+      service->ReceiveAndDispatch();
+    }
+  }
+
+  ThreadExit();
+  return 0;
+}
+
+int ServiceDispatcher::EnterDispatchLoop() {
+  int ret = ThreadEnter();
+  if (ret < 0)
+    return ret;
+
+  epoll_event events[kMaxEventsPerLoop];
+
+  while (!IsCanceled()) {
+    int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1);
+    if (count < 0 && errno != EINTR) {
+      ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno));
+      ThreadExit();
+      return -errno;
+    }
+
+    for (int i = 0; i < count; i++) {
+      if (events[i].data.ptr == this) {
+        ThreadExit();
+        return -EBUSY;
+      } else {
+        Service* service = static_cast<Service*>(events[i].data.ptr);
+
+        ALOGI_IF(TRACE, "Dispatching message: fd=%d\n",
+                 service->endpoint()->epoll_fd());
+        service->ReceiveAndDispatch();
+      }
+    }
+  }
+
+  ThreadExit();
+  return 0;
+}
+
+void ServiceDispatcher::SetCanceled(bool cancel) {
+  std::unique_lock<std::mutex> lock(mutex_);
+  canceled_ = cancel;
+
+  if (canceled_ && thread_count_ > 0) {
+    eventfd_write(event_fd_.Get(), 1);  // Signal threads to quit.
+
+    condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); });
+
+    eventfd_t value;
+    eventfd_read(event_fd_.Get(), &value);  // Unsignal.
+  }
+}
+
+bool ServiceDispatcher::IsCanceled() const { return canceled_; }
+
+}  // namespace pdx
+}  // namespace android