libpdx: Add support for more event bits.
- Add generic abstraction for multiple event bits with an implementation-
defined mechanism to deliver the bits.
- Update ServiceFS backend to pass through event bits.
- Implement EPOLLIN, EPOLLPRI, and EPOLLHUP event bit for UDS backed.
Bug: 34466748
Test: Build/flash system; observe stable operation.
Change-Id: I86afb5645b72ec69c095734c7891a690432150a3
diff --git a/libs/vr/libpdx/private/pdx/client.h b/libs/vr/libpdx/private/pdx/client.h
index 4eafe76..a590087 100644
--- a/libs/vr/libpdx/private/pdx/client.h
+++ b/libs/vr/libpdx/private/pdx/client.h
@@ -108,8 +108,22 @@
*/
void DisableAutoReconnect();
+ /*
+ * Returns an fd that the client may use to check/wait for asynchronous
+ * notifications to the channel. It is implementation dependent how the
+ * transport backend handles this feature, however all implementations must
+ * support at least POLLIN/EPOLLIN/readable.
+ *
+ * For uses that require more than one type of event, use
+ * ClientChannel::GetEventMask() to distinguish between events.
+ */
int event_fd() const;
+
+ /*
+ * Returns the underlying ClientChannel object.
+ */
ClientChannel* GetChannel() const { return channel_.get(); }
+ std::unique_ptr<ClientChannel>&& TakeChannel() { return std::move(channel_); }
private:
Client(const Client&) = delete;
diff --git a/libs/vr/libpdx/private/pdx/client_channel.h b/libs/vr/libpdx/private/pdx/client_channel.h
index e7ea475..dbfd626 100644
--- a/libs/vr/libpdx/private/pdx/client_channel.h
+++ b/libs/vr/libpdx/private/pdx/client_channel.h
@@ -18,6 +18,8 @@
virtual uint32_t GetIpcTag() const = 0;
virtual int event_fd() const = 0;
+ virtual Status<int> GetEventMask(int events) = 0;
+
virtual LocalChannelHandle& GetChannelHandle() = 0;
virtual void* AllocateTransactionState() = 0;
virtual void FreeTransactionState(void* state) = 0;
diff --git a/libs/vr/libpdx/private/pdx/mock_client_channel.h b/libs/vr/libpdx/private/pdx/mock_client_channel.h
index 7780ee3..561c939 100644
--- a/libs/vr/libpdx/private/pdx/mock_client_channel.h
+++ b/libs/vr/libpdx/private/pdx/mock_client_channel.h
@@ -11,6 +11,7 @@
public:
MOCK_CONST_METHOD0(GetIpcTag, uint32_t());
MOCK_CONST_METHOD0(event_fd, int());
+ MOCK_METHOD1(GetEventMask, Status<int>(int));
MOCK_METHOD0(GetChannelHandle, LocalChannelHandle&());
MOCK_METHOD0(AllocateTransactionState, void*());
MOCK_METHOD1(FreeTransactionState, void(void* state));
diff --git a/libs/vr/libpdx_uds/Android.bp b/libs/vr/libpdx_uds/Android.bp
index 9c4a3b0..09eeaa0 100644
--- a/libs/vr/libpdx_uds/Android.bp
+++ b/libs/vr/libpdx_uds/Android.bp
@@ -5,10 +5,13 @@
"-Wall",
"-Wextra",
"-Werror",
+ "-DLOG_TAG=\"libpdx_uds\"",
+ "-DTRACE=0",
],
export_include_dirs: ["private"],
local_include_dirs: ["private"],
srcs: [
+ "channel_event_set.cpp",
"channel_manager.cpp",
"client_channel_factory.cpp",
"client_channel.cpp",
diff --git a/libs/vr/libpdx_uds/channel_event_set.cpp b/libs/vr/libpdx_uds/channel_event_set.cpp
new file mode 100644
index 0000000..f8baeab
--- /dev/null
+++ b/libs/vr/libpdx_uds/channel_event_set.cpp
@@ -0,0 +1,115 @@
+#include "private/uds/channel_event_set.h"
+
+#include <log/log.h>
+
+#include <uds/ipc_helper.h>
+
+namespace android {
+namespace pdx {
+namespace uds {
+
+ChannelEventSet::ChannelEventSet() {
+ const int flags = EFD_CLOEXEC | EFD_NONBLOCK;
+ LocalHandle epoll_fd, event_fd;
+
+ if (!SetupHandle(epoll_create(1), &epoll_fd, "epoll") ||
+ !SetupHandle(eventfd(0, flags), &event_fd, "event")) {
+ return;
+ }
+
+ epoll_event event;
+ event.events = 0;
+ event.data.u32 = 0;
+ if (epoll_ctl(epoll_fd.Get(), EPOLL_CTL_ADD, event_fd.Get(), &event) < 0) {
+ const int error = errno;
+ ALOGE("ChannelEventSet::ChannelEventSet: Failed to add event_fd: %s",
+ strerror(error));
+ return;
+ }
+
+ epoll_fd_ = std::move(epoll_fd);
+ event_fd_ = std::move(event_fd);
+}
+
+Status<void> ChannelEventSet::AddDataFd(const LocalHandle& data_fd) {
+ epoll_event event;
+ event.events = EPOLLHUP | EPOLLRDHUP;
+ event.data.u32 = event.events;
+ if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, data_fd.Get(), &event) < 0) {
+ const int error = errno;
+ ALOGE("ChannelEventSet::ChannelEventSet: Failed to add event_fd: %s",
+ strerror(error));
+ return ErrorStatus{error};
+ } else {
+ return {};
+ }
+}
+
+int ChannelEventSet::ModifyEvents(int clear_mask, int set_mask) {
+ ALOGD_IF(TRACE, "ChannelEventSet::ModifyEvents: clear_mask=%x set_mask=%x",
+ clear_mask, set_mask);
+ const int old_bits = event_bits_;
+ const int new_bits = (event_bits_ & ~clear_mask) | set_mask;
+ event_bits_ = new_bits;
+
+ // If anything changed clear the event and update the event mask.
+ if (old_bits != new_bits) {
+ eventfd_t value;
+ eventfd_read(event_fd_.Get(), &value);
+
+ epoll_event event;
+ event.events = POLLIN;
+ event.data.u32 = event_bits_;
+ if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_MOD, event_fd_.Get(), &event) <
+ 0) {
+ const int error = errno;
+ ALOGE("ChannelEventSet::AddEventHandle: Failed to update event: %s",
+ strerror(error));
+ return -error;
+ }
+ }
+
+ // If there are any bits set, re-trigger the eventfd.
+ if (new_bits)
+ eventfd_write(event_fd_.Get(), 1);
+
+ return 0;
+}
+
+Status<void> ChannelEventSet::SetupHandle(int fd, LocalHandle* handle,
+ const char* error_name) {
+ const int error = errno;
+ handle->Reset(fd);
+ if (!*handle) {
+ ALOGE("ChannelEventSet::SetupHandle: Failed to setup %s handle: %s",
+ error_name, strerror(error));
+ return ErrorStatus{error};
+ }
+ return {};
+}
+
+Status<int> ChannelEventReceiver::GetPendingEvents() const {
+ constexpr long kTimeoutMs = 0;
+ epoll_event event;
+ const int count =
+ RETRY_EINTR(epoll_wait(epoll_fd_.Get(), &event, 1, kTimeoutMs));
+
+ Status<int> status;
+ if (count < 0) {
+ status.SetError(errno);
+ ALOGE("ChannelEventReceiver::GetPendingEvents: Failed to get events: %s",
+ status.GetErrorMessage().c_str());
+ return status;
+ }
+
+ const int mask_out = event.data.u32;
+ ALOGD_IF(TRACE, "ChannelEventReceiver::GetPendingEvents: mask_out=%x",
+ mask_out);
+
+ status.SetValue(mask_out);
+ return status;
+}
+
+} // namespace uds
+} // namespace pdx
+} // namespace android
diff --git a/libs/vr/libpdx_uds/channel_manager.cpp b/libs/vr/libpdx_uds/channel_manager.cpp
index 387625c..afc0a4f 100644
--- a/libs/vr/libpdx_uds/channel_manager.cpp
+++ b/libs/vr/libpdx_uds/channel_manager.cpp
@@ -33,11 +33,11 @@
return LocalChannelHandle(nullptr, -1);
}
-int ChannelManager::GetEventFd(int32_t handle) {
+ChannelManager::ChannelData* ChannelManager::GetChannelData(int32_t handle) {
std::lock_guard<std::mutex> autolock(mutex_);
auto channel = channels_.find(handle);
- return channel != channels_.end() ? channel->second.event_fd.Get() : -1;
-};
+ return channel != channels_.end() ? &channel->second : nullptr;
+}
} // namespace uds
} // namespace pdx
diff --git a/libs/vr/libpdx_uds/client_channel.cpp b/libs/vr/libpdx_uds/client_channel.cpp
index 3394759..4cbdb94 100644
--- a/libs/vr/libpdx_uds/client_channel.cpp
+++ b/libs/vr/libpdx_uds/client_channel.cpp
@@ -50,12 +50,17 @@
ChannelReference PushChannelHandle(BorrowedChannelHandle handle) {
if (!handle)
return handle.value();
- ChannelInfo<BorrowedHandle> channel_info;
- channel_info.data_fd.Reset(handle.value());
- channel_info.event_fd.Reset(
- ChannelManager::Get().GetEventFd(handle.value()));
- request.channels.push_back(std::move(channel_info));
- return request.channels.size() - 1;
+
+ if (auto* channel_data =
+ ChannelManager::Get().GetChannelData(handle.value())) {
+ ChannelInfo<BorrowedHandle> channel_info;
+ channel_info.data_fd.Reset(handle.value());
+ channel_info.event_fd = channel_data->event_receiver.event_fd();
+ request.channels.push_back(std::move(channel_info));
+ return request.channels.size() - 1;
+ } else {
+ return -1;
+ }
}
RequestHeader<BorrowedHandle> request;
@@ -127,27 +132,7 @@
ClientChannel::ClientChannel(LocalChannelHandle channel_handle)
: channel_handle_{std::move(channel_handle)} {
- int data_fd = channel_handle_.value();
- int event_fd =
- channel_handle_ ? ChannelManager::Get().GetEventFd(data_fd) : -1;
-
- if (event_fd >= 0) {
- epoll_fd_.Reset(epoll_create(1));
- if (epoll_fd_) {
- epoll_event data_ev;
- data_ev.events = EPOLLHUP;
- data_ev.data.fd = data_fd;
-
- epoll_event event_ev;
- event_ev.events = EPOLLIN;
- event_ev.data.fd = event_fd;
- if (epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, data_fd, &data_ev) < 0 ||
- epoll_ctl(epoll_fd_.Get(), EPOLL_CTL_ADD, event_fd, &event_ev) < 0) {
- ALOGE("Failed to add fd to epoll fd because: %s\n", strerror(errno));
- epoll_fd_.Close();
- }
- }
- }
+ channel_data_ = ChannelManager::Get().GetChannelData(channel_handle_.value());
}
std::unique_ptr<pdx::ClientChannel> ClientChannel::Create(
diff --git a/libs/vr/libpdx_uds/ipc_helper.cpp b/libs/vr/libpdx_uds/ipc_helper.cpp
index dca23ef..ee7299e 100644
--- a/libs/vr/libpdx_uds/ipc_helper.cpp
+++ b/libs/vr/libpdx_uds/ipc_helper.cpp
@@ -134,7 +134,9 @@
RETRY_EINTR(recv(socket_fd, &preamble, sizeof(preamble), MSG_WAITALL));
if (ret < 0)
return ErrorStatus(errno);
- if (ret != sizeof(preamble) || preamble.magic != kMagicPreamble)
+ else if (ret == 0)
+ return ErrorStatus(ESHUTDOWN);
+ else if (ret != sizeof(preamble) || preamble.magic != kMagicPreamble)
return ErrorStatus(EIO);
buffer_.resize(preamble.data_size);
@@ -157,7 +159,9 @@
ret = RETRY_EINTR(recvmsg(socket_fd, &msg, MSG_WAITALL));
if (ret < 0)
return ErrorStatus(errno);
- if (static_cast<uint32_t>(ret) != preamble.data_size)
+ else if (ret == 0)
+ return ErrorStatus(ESHUTDOWN);
+ else if (static_cast<uint32_t>(ret) != preamble.data_size)
return ErrorStatus(EIO);
bool cred_available = false;
@@ -239,7 +243,9 @@
ssize_t size_read = RETRY_EINTR(recv(socket_fd, data, size, MSG_WAITALL));
if (size_read < 0)
return ErrorStatus(errno);
- if (static_cast<size_t>(size_read) != size)
+ else if (size_read == 0)
+ return ErrorStatus(ESHUTDOWN);
+ else if (static_cast<size_t>(size_read) != size)
return ErrorStatus(EIO);
return {};
}
@@ -251,7 +257,9 @@
ssize_t size_read = RETRY_EINTR(recvmsg(socket_fd, &msg, MSG_WAITALL));
if (size_read < 0)
return ErrorStatus(errno);
- if (static_cast<size_t>(size_read) != CountVectorSize(data, count))
+ else if (size_read == 0)
+ return ErrorStatus(ESHUTDOWN);
+ else if (static_cast<size_t>(size_read) != CountVectorSize(data, count))
return ErrorStatus(EIO);
return {};
}
diff --git a/libs/vr/libpdx_uds/private/uds/channel_event_set.h b/libs/vr/libpdx_uds/private/uds/channel_event_set.h
new file mode 100644
index 0000000..1f464d5
--- /dev/null
+++ b/libs/vr/libpdx_uds/private/uds/channel_event_set.h
@@ -0,0 +1,62 @@
+#ifndef ANDROID_PDX_UDS_CHANNEL_EVENT_SET_H_
+#define ANDROID_PDX_UDS_CHANNEL_EVENT_SET_H_
+
+#include <errno.h>
+#include <poll.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
+
+#include <pdx/file_handle.h>
+#include <pdx/status.h>
+
+namespace android {
+namespace pdx {
+namespace uds {
+
+class ChannelEventSet {
+ public:
+ ChannelEventSet();
+ ChannelEventSet(ChannelEventSet&&) = default;
+ ChannelEventSet& operator=(ChannelEventSet&&) = default;
+
+ BorrowedHandle event_fd() const { return epoll_fd_.Borrow(); }
+
+ explicit operator bool() const { return !!epoll_fd_ && !!event_fd_; }
+
+ Status<void> AddDataFd(const LocalHandle& data_fd);
+ int ModifyEvents(int clear_mask, int set_mask);
+
+ private:
+ LocalHandle epoll_fd_;
+ LocalHandle event_fd_;
+ uint32_t event_bits_ = 0;
+
+ static Status<void> SetupHandle(int fd, LocalHandle* handle,
+ const char* error_name);
+
+ ChannelEventSet(const ChannelEventSet&) = delete;
+ void operator=(const ChannelEventSet&) = delete;
+};
+
+class ChannelEventReceiver {
+ public:
+ ChannelEventReceiver() = default;
+ ChannelEventReceiver(LocalHandle epoll_fd) : epoll_fd_{std::move(epoll_fd)} {}
+ ChannelEventReceiver(ChannelEventReceiver&&) = default;
+ ChannelEventReceiver& operator=(ChannelEventReceiver&&) = default;
+
+ BorrowedHandle event_fd() const { return epoll_fd_.Borrow(); }
+ Status<int> GetPendingEvents() const;
+
+ private:
+ LocalHandle epoll_fd_;
+
+ ChannelEventReceiver(const ChannelEventReceiver&) = delete;
+ void operator=(const ChannelEventReceiver&) = delete;
+};
+
+} // namespace uds
+} // namespace pdx
+} // namespace android
+
+#endif // ANDROID_PDX_UDS_CHANNEL_EVENT_SET_H_
diff --git a/libs/vr/libpdx_uds/private/uds/channel_manager.h b/libs/vr/libpdx_uds/private/uds/channel_manager.h
index e8d3f6e..2aca414 100644
--- a/libs/vr/libpdx_uds/private/uds/channel_manager.h
+++ b/libs/vr/libpdx_uds/private/uds/channel_manager.h
@@ -6,6 +6,7 @@
#include <pdx/channel_handle.h>
#include <pdx/file_handle.h>
+#include <uds/channel_event_set.h>
namespace android {
namespace pdx {
@@ -16,13 +17,14 @@
static ChannelManager& Get();
LocalChannelHandle CreateHandle(LocalHandle data_fd, LocalHandle event_fd);
- int GetEventFd(int32_t handle);
-
- private:
struct ChannelData {
LocalHandle data_fd;
- LocalHandle event_fd;
+ ChannelEventReceiver event_receiver;
};
+
+ ChannelData* GetChannelData(int32_t handle);
+
+ private:
ChannelManager() = default;
void CloseHandle(int32_t handle) override;
diff --git a/libs/vr/libpdx_uds/private/uds/client_channel.h b/libs/vr/libpdx_uds/private/uds/client_channel.h
index 12a40e7..45f6473 100644
--- a/libs/vr/libpdx_uds/private/uds/client_channel.h
+++ b/libs/vr/libpdx_uds/private/uds/client_channel.h
@@ -3,6 +3,7 @@
#include <pdx/client_channel.h>
+#include <uds/channel_event_set.h>
#include <uds/channel_manager.h>
#include <uds/service_endpoint.h>
@@ -18,7 +19,17 @@
LocalChannelHandle channel_handle);
uint32_t GetIpcTag() const override { return Endpoint::kIpcTag; }
- int event_fd() const override { return epoll_fd_.Get(); }
+
+ int event_fd() const override {
+ return channel_data_ ? channel_data_->event_receiver.event_fd().Get() : -1;
+ }
+ Status<int> GetEventMask(int /*events*/) override {
+ if (channel_data_)
+ return channel_data_->event_receiver.GetPendingEvents();
+ else
+ return ErrorStatus(EINVAL);
+ }
+
LocalChannelHandle& GetChannelHandle() override { return channel_handle_; }
void* AllocateTransactionState() override;
void FreeTransactionState(void* state) override;
@@ -61,7 +72,7 @@
const iovec* receive_vector, size_t receive_count);
LocalChannelHandle channel_handle_;
- LocalHandle epoll_fd_;
+ ChannelManager::ChannelData* channel_data_;
};
} // namespace uds
diff --git a/libs/vr/libpdx_uds/private/uds/service_endpoint.h b/libs/vr/libpdx_uds/private/uds/service_endpoint.h
index 0f69400..3ec8519 100644
--- a/libs/vr/libpdx_uds/private/uds/service_endpoint.h
+++ b/libs/vr/libpdx_uds/private/uds/service_endpoint.h
@@ -11,6 +11,7 @@
#include <pdx/service.h>
#include <pdx/service_endpoint.h>
+#include <uds/channel_event_set.h>
#include <uds/service_dispatcher.h>
namespace android {
@@ -95,8 +96,7 @@
private:
struct ChannelData {
LocalHandle data_fd;
- LocalHandle event_fd;
- uint32_t event_mask{0};
+ ChannelEventSet event_set;
Channel* channel_state{nullptr};
};
@@ -110,6 +110,8 @@
return next_message_id_.fetch_add(1, std::memory_order_relaxed);
}
+ void BuildCloseMessage(int channel_id, Message* message);
+
Status<void> AcceptConnection(Message* message);
Status<void> ReceiveMessageForChannel(int channel_id, Message* message);
Status<void> OnNewChannel(LocalHandle channel_fd);
@@ -130,9 +132,6 @@
mutable std::mutex channel_mutex_;
std::map<int, ChannelData> channels_;
- mutable std::mutex service_mutex_;
- std::condition_variable condition_;
-
Service* service_{nullptr};
std::atomic<uint32_t> next_message_id_;
};
diff --git a/libs/vr/libpdx_uds/service_dispatcher.cpp b/libs/vr/libpdx_uds/service_dispatcher.cpp
index 8a40158..fa98f26 100644
--- a/libs/vr/libpdx_uds/service_dispatcher.cpp
+++ b/libs/vr/libpdx_uds/service_dispatcher.cpp
@@ -1,4 +1,3 @@
-#define LOG_TAG "ServiceDispatcher"
#include "uds/service_dispatcher.h"
#include <errno.h>
@@ -9,8 +8,6 @@
#include "pdx/service.h"
#include "uds/service_endpoint.h"
-#define TRACE 0
-
static const int kMaxEventsPerLoop = 128;
namespace android {
diff --git a/libs/vr/libpdx_uds/service_endpoint.cpp b/libs/vr/libpdx_uds/service_endpoint.cpp
index b6021e8..7bf753d 100644
--- a/libs/vr/libpdx_uds/service_endpoint.cpp
+++ b/libs/vr/libpdx_uds/service_endpoint.cpp
@@ -61,12 +61,17 @@
ChannelReference PushChannelHandle(BorrowedChannelHandle handle) {
if (!handle)
return handle.value();
- ChannelInfo<BorrowedHandle> channel_info;
- channel_info.data_fd.Reset(handle.value());
- channel_info.event_fd.Reset(
- ChannelManager::Get().GetEventFd(handle.value()));
- response.channels.push_back(std::move(channel_info));
- return response.channels.size() - 1;
+
+ if (auto* channel_data =
+ ChannelManager::Get().GetChannelData(handle.value())) {
+ ChannelInfo<BorrowedHandle> channel_info;
+ channel_info.data_fd.Reset(handle.value());
+ channel_info.event_fd = channel_data->event_receiver.event_fd();
+ response.channels.push_back(std::move(channel_info));
+ return response.channels.size() - 1;
+ } else {
+ return -1;
+ }
}
ChannelReference PushChannelHandle(BorrowedHandle data_fd,
@@ -156,8 +161,6 @@
return;
}
- // Use "this" as a unique pointer to distinguish the event fd from all
- // the other entries that point to instances of Service.
epoll_event socket_event;
socket_event.events = EPOLLIN | EPOLLRDHUP | EPOLLONESHOT;
socket_event.data.fd = fd.Get();
@@ -244,9 +247,9 @@
return ErrorStatus(errno);
}
ChannelData channel_data;
- int channel_id = channel_fd.Get();
+ const int channel_id = channel_fd.Get();
+ channel_data.event_set.AddDataFd(channel_fd);
channel_data.data_fd = std::move(channel_fd);
- channel_data.event_fd.Reset(eventfd(0, 0));
channel_data.channel_state = channel_state;
auto pair = channels_.emplace(channel_id, std::move(channel_data));
return &pair.first->second;
@@ -272,6 +275,8 @@
}
int Endpoint::CloseChannelLocked(int channel_id) {
+ ALOGD_IF(TRACE, "Endpoint::CloseChannelLocked: channel_id=%d", channel_id);
+
auto channel_data = channels_.find(channel_id);
if (channel_data == channels_.end())
return -EINVAL;
@@ -293,31 +298,15 @@
int Endpoint::ModifyChannelEvents(int channel_id, int clear_mask,
int set_mask) {
std::lock_guard<std::mutex> autolock(channel_mutex_);
- auto channel_data = channels_.find(channel_id);
- if (channel_data == channels_.end())
- return -EINVAL;
- int old_mask = channel_data->second.event_mask;
- int new_mask = (old_mask & ~clear_mask) | set_mask;
- // EPOLLHUP shares the same bitmask with POLLHUP.
- if ((new_mask & POLLHUP) && !(old_mask & POLLHUP)) {
- CloseChannelLocked(channel_id);
+ auto search = channels_.find(channel_id);
+ if (search != channels_.end()) {
+ auto& channel_data = search->second;
+ channel_data.event_set.ModifyEvents(clear_mask, set_mask);
return 0;
}
- // EPOLLIN shares the same bitmask with POLLIN and EPOLLPRI shares the same
- // bitmask with POLLPRI
- eventfd_t value = 1;
- if (((new_mask & POLLIN) && !(old_mask & POLLIN)) ||
- ((new_mask & POLLPRI) && !(old_mask & POLLPRI))) {
- eventfd_write(channel_data->second.event_fd.Get(), value);
- } else if ((!(new_mask & POLLIN) && (old_mask & POLLIN)) ||
- (!(new_mask & POLLPRI) && (old_mask & POLLPRI))) {
- eventfd_read(channel_data->second.event_fd.Get(), &value);
- }
-
- channel_data->second.event_mask = new_mask;
- return 0;
+ return -EINVAL;
}
Status<RemoteChannelHandle> Endpoint::PushChannel(Message* message,
@@ -355,7 +344,8 @@
auto* state = static_cast<MessageState*>(message->GetState());
ChannelReference ref = state->PushChannelHandle(
- remote_socket.Borrow(), channel_data.get()->event_fd.Borrow());
+ remote_socket.Borrow(),
+ channel_data.get()->event_set.event_fd().Borrow());
state->sockets_to_close.push_back(std::move(remote_socket));
return RemoteChannelHandle{ref};
}
@@ -391,8 +381,9 @@
int Endpoint::GetChannelEventFd(int channel_id) {
std::lock_guard<std::mutex> autolock(channel_mutex_);
auto channel_data = channels_.find(channel_id);
- return (channel_data != channels_.end()) ? channel_data->second.event_fd.Get()
- : -1;
+ return (channel_data != channels_.end())
+ ? channel_data->second.event_set.event_fd().Get()
+ : -1;
}
Status<void> Endpoint::ReceiveMessageForChannel(int channel_id,
@@ -400,9 +391,15 @@
RequestHeader<LocalHandle> request;
auto status = ReceiveData(channel_id, &request);
if (!status) {
- CloseChannel(channel_id);
- return status;
+ if (status.error() == ESHUTDOWN) {
+ BuildCloseMessage(channel_id, message);
+ return {};
+ } else {
+ CloseChannel(channel_id);
+ return status;
+ }
}
+
MessageInfo info;
info.pid = request.cred.pid;
info.tid = -1;
@@ -435,19 +432,42 @@
if (status && request.is_impulse)
status = ReenableEpollEvent(channel_id);
- if (!status)
- CloseChannel(channel_id);
+ if (!status) {
+ if (status.error() == ESHUTDOWN) {
+ BuildCloseMessage(channel_id, message);
+ return {};
+ } else {
+ CloseChannel(channel_id);
+ return status;
+ }
+ }
return status;
}
-int Endpoint::MessageReceive(Message* message) {
- {
- std::unique_lock<std::mutex> lock(service_mutex_);
- condition_.wait(lock, [this] { return service_ != nullptr; });
- }
+void Endpoint::BuildCloseMessage(int channel_id, Message* message) {
+ ALOGD_IF(TRACE, "Endpoint::BuildCloseMessage: channel_id=%d", channel_id);
+ MessageInfo info;
+ info.pid = -1;
+ info.tid = -1;
+ info.cid = channel_id;
+ info.mid = GetNextAvailableMessageId();
+ info.euid = -1;
+ info.egid = -1;
+ info.op = opcodes::CHANNEL_CLOSE;
+ info.flags = 0;
+ info.service = service_;
+ info.channel = GetChannelState(channel_id);
+ info.send_len = 0;
+ info.recv_len = 0;
+ info.fd_count = 0;
+ *message = Message{info};
+}
- // One event at a time.
+int Endpoint::MessageReceive(Message* message) {
+ // Receive at most one event from the epoll set. This should prevent multiple
+ // dispatch threads from attempting to handle messages on the same socket at
+ // the same time.
epoll_event event;
int count = RETRY_EINTR(
epoll_wait(epoll_fd_.Get(), &event, 1, is_blocking_ ? -1 : 0));
@@ -472,22 +492,8 @@
}
int channel_id = event.data.fd;
- if (event.events & EPOLLRDHUP) {
- MessageInfo info;
- info.pid = -1;
- info.tid = -1;
- info.cid = channel_id;
- info.mid = GetNextAvailableMessageId();
- info.euid = -1;
- info.egid = -1;
- info.op = opcodes::CHANNEL_CLOSE;
- info.flags = 0;
- info.service = service_;
- info.channel = GetChannelState(channel_id);
- info.send_len = 0;
- info.recv_len = 0;
- info.fd_count = 0;
- *message = Message{info};
+ if (event.events & (EPOLLRDHUP | EPOLLHUP)) {
+ BuildCloseMessage(channel_id, message);
return 0;
}
@@ -498,18 +504,19 @@
}
int Endpoint::MessageReply(Message* message, int return_code) {
- int channel_socket = GetChannelSocketFd(message->GetChannelId());
+ const int channel_id = message->GetChannelId();
+ const int channel_socket = GetChannelSocketFd(channel_id);
if (channel_socket < 0)
return -EBADF;
auto* state = static_cast<MessageState*>(message->GetState());
switch (message->GetOp()) {
case opcodes::CHANNEL_CLOSE:
- return CloseChannel(channel_socket);
+ return CloseChannel(channel_id);
case opcodes::CHANNEL_OPEN:
if (return_code < 0)
- return CloseChannel(channel_socket);
+ return CloseChannel(channel_id);
// Reply with the event fd.
return_code = state->PushFileHandle(
BorrowedHandle{GetChannelEventFd(channel_socket)});