|  | #include <pdx/service_dispatcher.h> | 
|  |  | 
|  | #include <errno.h> | 
|  | #include <log/log.h> | 
|  | #include <sys/epoll.h> | 
|  | #include <sys/eventfd.h> | 
|  |  | 
|  | #include <pdx/service.h> | 
|  | #include <pdx/service_endpoint.h> | 
|  |  | 
|  | static const int kMaxEventsPerLoop = 128; | 
|  |  | 
|  | namespace android { | 
|  | namespace pdx { | 
|  |  | 
|  | std::unique_ptr<ServiceDispatcher> ServiceDispatcher::Create() { | 
|  | std::unique_ptr<ServiceDispatcher> dispatcher{new ServiceDispatcher()}; | 
|  | if (!dispatcher->epoll_fd_ || !dispatcher->event_fd_) { | 
|  | dispatcher.reset(); | 
|  | } | 
|  |  | 
|  | return dispatcher; | 
|  | } | 
|  |  | 
|  | ServiceDispatcher::ServiceDispatcher() { | 
|  | event_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)); | 
|  | if (!event_fd_) { | 
|  | ALOGE("Failed to create event fd because: %s\n", strerror(errno)); | 
|  | return; | 
|  | } | 
|  |  | 
|  | epoll_fd_.Reset(epoll_create1(EPOLL_CLOEXEC)); | 
|  | if (!epoll_fd_) { | 
|  | ALOGE("Failed to create epoll fd because: %s\n", strerror(errno)); | 
|  | return; | 
|  | } | 
|  |  | 
|  | // Use "this" as a unique pointer to distinguish the event fd from all | 
|  | // the other entries that point to instances of Service. | 
|  | epoll_event event; | 
|  | event.events = EPOLLIN; | 
|  | event.data.ptr = this; | 
|  |  | 
|  | if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd_.Get(), &event) < 0) { | 
|  | ALOGE("Failed to add event fd to epoll fd because: %s\n", strerror(errno)); | 
|  |  | 
|  | // Close the fds here and signal failure to the factory method. | 
|  | event_fd_.Close(); | 
|  | epoll_fd_.Close(); | 
|  | } | 
|  | } | 
|  |  | 
|  | ServiceDispatcher::~ServiceDispatcher() { SetCanceled(true); } | 
|  |  | 
|  | int ServiceDispatcher::ThreadEnter() { | 
|  | std::lock_guard<std::mutex> autolock(mutex_); | 
|  |  | 
|  | if (canceled_) | 
|  | return -EBUSY; | 
|  |  | 
|  | thread_count_++; | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | void ServiceDispatcher::ThreadExit() { | 
|  | std::lock_guard<std::mutex> autolock(mutex_); | 
|  | thread_count_--; | 
|  | condition_.notify_one(); | 
|  | } | 
|  |  | 
|  | int ServiceDispatcher::AddService(const std::shared_ptr<Service>& service) { | 
|  | std::lock_guard<std::mutex> autolock(mutex_); | 
|  |  | 
|  | epoll_event event; | 
|  | event.events = EPOLLIN; | 
|  | event.data.ptr = service.get(); | 
|  |  | 
|  | if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, service->endpoint()->epoll_fd(), | 
|  | &event) < 0) { | 
|  | ALOGE("Failed to add service to dispatcher because: %s\n", strerror(errno)); | 
|  | return -errno; | 
|  | } | 
|  |  | 
|  | services_.push_back(service); | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | int ServiceDispatcher::RemoveService(const std::shared_ptr<Service>& service) { | 
|  | std::lock_guard<std::mutex> autolock(mutex_); | 
|  |  | 
|  | // It's dangerous to remove a service while other threads may be using it. | 
|  | if (thread_count_ > 0) | 
|  | return -EBUSY; | 
|  |  | 
|  | epoll_event ee;  // See BUGS in man 2 epoll_ctl. | 
|  | if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_DEL, service->endpoint()->epoll_fd(), | 
|  | &ee) < 0) { | 
|  | ALOGE("Failed to remove service from dispatcher because: %s\n", | 
|  | strerror(errno)); | 
|  | return -errno; | 
|  | } | 
|  |  | 
|  | services_.erase(std::remove(services_.begin(), services_.end(), service), | 
|  | services_.end()); | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | int ServiceDispatcher::ReceiveAndDispatch() { return ReceiveAndDispatch(-1); } | 
|  |  | 
|  | int ServiceDispatcher::ReceiveAndDispatch(int timeout) { | 
|  | int ret = ThreadEnter(); | 
|  | if (ret < 0) | 
|  | return ret; | 
|  |  | 
|  | epoll_event events[kMaxEventsPerLoop]; | 
|  |  | 
|  | int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, timeout); | 
|  | if (count <= 0) { | 
|  | ALOGE_IF(count < 0, "Failed to wait for epoll events because: %s\n", | 
|  | strerror(errno)); | 
|  | ThreadExit(); | 
|  | return count < 0 ? -errno : -ETIMEDOUT; | 
|  | } | 
|  |  | 
|  | for (int i = 0; i < count; i++) { | 
|  | if (events[i].data.ptr == this) { | 
|  | ThreadExit(); | 
|  | return -EBUSY; | 
|  | } else { | 
|  | Service* service = static_cast<Service*>(events[i].data.ptr); | 
|  |  | 
|  | ALOGI_IF(TRACE, "Dispatching message: fd=%d\n", | 
|  | service->endpoint()->epoll_fd()); | 
|  | service->ReceiveAndDispatch(); | 
|  | } | 
|  | } | 
|  |  | 
|  | ThreadExit(); | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | int ServiceDispatcher::EnterDispatchLoop() { | 
|  | int ret = ThreadEnter(); | 
|  | if (ret < 0) | 
|  | return ret; | 
|  |  | 
|  | epoll_event events[kMaxEventsPerLoop]; | 
|  |  | 
|  | while (!IsCanceled()) { | 
|  | int count = epoll_wait(epoll_fd_.Get(), events, kMaxEventsPerLoop, -1); | 
|  | if (count < 0 && errno != EINTR) { | 
|  | ALOGE("Failed to wait for epoll events because: %s\n", strerror(errno)); | 
|  | ThreadExit(); | 
|  | return -errno; | 
|  | } | 
|  |  | 
|  | for (int i = 0; i < count; i++) { | 
|  | if (events[i].data.ptr == this) { | 
|  | ThreadExit(); | 
|  | return -EBUSY; | 
|  | } else { | 
|  | Service* service = static_cast<Service*>(events[i].data.ptr); | 
|  |  | 
|  | ALOGI_IF(TRACE, "Dispatching message: fd=%d\n", | 
|  | service->endpoint()->epoll_fd()); | 
|  | service->ReceiveAndDispatch(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | ThreadExit(); | 
|  | return 0; | 
|  | } | 
|  |  | 
|  | void ServiceDispatcher::SetCanceled(bool cancel) { | 
|  | std::unique_lock<std::mutex> lock(mutex_); | 
|  | canceled_ = cancel; | 
|  |  | 
|  | if (canceled_ && thread_count_ > 0) { | 
|  | eventfd_write(event_fd_.Get(), 1);  // Signal threads to quit. | 
|  |  | 
|  | condition_.wait(lock, [this] { return !(canceled_ && thread_count_ > 0); }); | 
|  |  | 
|  | eventfd_t value; | 
|  | eventfd_read(event_fd_.Get(), &value);  // Unsignal. | 
|  | } | 
|  | } | 
|  |  | 
|  | bool ServiceDispatcher::IsCanceled() const { return canceled_; } | 
|  |  | 
|  | }  // namespace pdx | 
|  | }  // namespace android |