BufferHubQueue: ClearAvailable on consumer import
This fixes the issue that ConsumerQueue keeps receiving
|OnBufferAllocated| events when using UDS based libpdx. It also hardens
error handling during consumer imports.
Bug: 34690159
Test: Build, flash, buffer_hub_queue-test pases
Change-Id: I4f739a2869df851ed836f39122b3eba95697f3f6
diff --git a/libs/vr/libbufferhubqueue/Android.mk b/libs/vr/libbufferhubqueue/Android.mk
index 3ed7ff2..c70aaf4 100644
--- a/libs/vr/libbufferhubqueue/Android.mk
+++ b/libs/vr/libbufferhubqueue/Android.mk
@@ -40,6 +40,7 @@
include $(CLEAR_VARS)
LOCAL_SRC_FILES := $(sourceFiles)
LOCAL_C_INCLUDES := $(includeFiles)
+LOCAL_CFLAGS := -DLOG_TAG=\"libbufferhubqueue\"
LOCAL_EXPORT_C_INCLUDE_DIRS := $(includeFiles)
LOCAL_STATIC_LIBRARIES := $(staticLibraries)
LOCAL_SHARED_LIBRARIES := $(sharedLibraries)
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
index 0576b21..bad9503 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
@@ -1,5 +1,7 @@
#include "include/private/dvr/buffer_hub_queue_client.h"
+//#define LOG_NDEBUG 0
+
#include <inttypes.h>
#include <log/log.h>
#include <sys/epoll.h>
@@ -24,6 +26,7 @@
meta_size_(meta_size),
meta_buffer_tmp_(meta_size ? new uint8_t[meta_size] : nullptr),
buffers_(BufferHubQueue::kMaxQueueCapacity),
+ epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
available_buffers_(BufferHubQueue::kMaxQueueCapacity),
capacity_(0) {
Initialize();
@@ -36,6 +39,7 @@
meta_size_(meta_size),
meta_buffer_tmp_(meta_size ? new uint8_t[meta_size] : nullptr),
buffers_(BufferHubQueue::kMaxQueueCapacity),
+ epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
available_buffers_(BufferHubQueue::kMaxQueueCapacity),
capacity_(0) {
Initialize();
@@ -101,31 +105,12 @@
ALOGD("New BufferHubQueue event %d: index=%" PRId64, i, index);
- if (is_buffer_event_index(index) && (events[i].events & EPOLLIN)) {
- auto buffer = buffers_[index];
- ret = OnBufferReady(buffer);
- if (ret < 0) {
- ALOGE("Failed to set buffer ready: %s", strerror(-ret));
- continue;
- }
- Enqueue(buffer, index);
- } else if (is_buffer_event_index(index) &&
- (events[i].events & EPOLLHUP)) {
- // This maybe caused by producer replacing an exising buffer slot.
- // Currently the epoll FD is cleaned up when the replacement consumer
- // client is imported.
- ALOGW("Receives EPOLLHUP at slot: %" PRId64, index);
- } else if (is_queue_event_index(index) && (events[i].events & EPOLLIN)) {
- // Note that after buffer imports, if |count()| still returns 0, epoll
- // wait will be tried again to acquire the newly imported buffer.
- ret = OnBufferAllocated();
- if (ret < 0) {
- ALOGE("Failed to import buffer: %s", strerror(-ret));
- continue;
- }
+ if (is_buffer_event_index(index)) {
+ HandleBufferEvent(static_cast<size_t>(index), events[i]);
+ } else if (is_queue_event_index(index)) {
+ HandleQueueEvent(events[i]);
} else {
- ALOGW("Unknown event %d: u64=%" PRId64 ": events=%" PRIu32, i, index,
- events[i].events);
+ ALOGW("Unknown event index: %" PRId64, index);
}
}
}
@@ -133,6 +118,68 @@
return true;
}
+void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {
+ auto buffer = buffers_[slot];
+ if (!buffer) {
+ ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
+ return;
+ }
+
+ auto status = buffer->GetEventMask(event.events);
+ if (!status) {
+ ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
+ status.GetErrorMessage().c_str());
+ return;
+ }
+
+ int events = status.get();
+ if (events & EPOLLIN) {
+ int ret = OnBufferReady(buffer);
+ if (ret < 0) {
+ ALOGE("Failed to set buffer ready: %s", strerror(-ret));
+ return;
+ }
+ Enqueue(buffer, slot);
+ } else if (events & EPOLLHUP) {
+ // This might be caused by producer replacing an existing buffer slot, or
+ // when BufferHubQueue is shutting down. For the first case, currently the
+ // epoll FD is cleaned up when the replacement consumer client is imported,
+ // we shouldn't detach again if |epollhub_pending_[slot]| is set.
+ ALOGW(
+ "Receives EPOLLHUP at slot: %zu, buffer event fd: %d, EPOLLHUP "
+ "pending: %d",
+ slot, buffer->event_fd(), epollhup_pending_[slot]);
+ if (epollhup_pending_[slot]) {
+ epollhup_pending_[slot] = false;
+ } else {
+ DetachBuffer(slot);
+ }
+ } else {
+ ALOGW("Unknown event, slot=%zu, epoll events=%d", slot, events);
+ }
+}
+
+void BufferHubQueue::HandleQueueEvent(const epoll_event& event) {
+ auto status = GetEventMask(event.events);
+ if (!status) {
+ ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
+ status.GetErrorMessage().c_str());
+ return;
+ }
+
+ int events = status.get();
+ if (events & EPOLLIN) {
+ // Note that after buffer imports, if |count()| still returns 0, epoll
+ // wait will be tried again to acquire the newly imported buffer.
+ int ret = OnBufferAllocated();
+ if (ret < 0) {
+ ALOGE("Failed to import buffer: %s", strerror(-ret));
+ }
+ } else {
+ ALOGW("Unknown epoll events=%d", events);
+ }
+}
+
int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
size_t slot) {
if (is_full()) {
@@ -146,8 +193,9 @@
if (buffers_[slot] != nullptr) {
// Replace the buffer if the slot is preoccupied. This could happen when the
// producer side replaced the slot with a newly allocated buffer. Detach the
- // buffer and set up with the new one.
+ // buffer before setting up with the new one.
DetachBuffer(slot);
+ epollhup_pending_[slot] = true;
}
epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_consumer.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_consumer.cpp
index 1ea3994..02bca09 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_consumer.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_consumer.cpp
@@ -1,5 +1,7 @@
#include "include/private/dvr/buffer_hub_queue_consumer.h"
+//#define LOG_NDEBUG 0
+
namespace android {
namespace dvr {
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_core.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_core.cpp
index a108042..b013c85 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_core.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_core.cpp
@@ -1,5 +1,8 @@
#include "include/private/dvr/buffer_hub_queue_core.h"
+//#define LOG_NDEBUG 0
+#define LOG_TAG "BufferHubQueueCore"
+
#include <log/log.h>
namespace android {
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
index 752e8c4..7ddf49b 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
@@ -1,5 +1,7 @@
#include "include/private/dvr/buffer_hub_queue_producer.h"
+//#define LOG_NDEBUG 0
+
#include <inttypes.h>
#include <log/log.h>
diff --git a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
index 83e77d4..1f2830a 100644
--- a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
+++ b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
@@ -49,6 +49,14 @@
return buffers_[slot];
}
+ Status<int> GetEventMask(int events) {
+ if (auto* client_channel = GetChannel()) {
+ return client_channel->GetEventMask(events);
+ } else {
+ return pdx::ErrorStatus(EINVAL);
+ }
+ }
+
// Enqueue a buffer marks buffer to be available (|Gain|'ed for producer
// and |Acquire|'ed for consumer. This is only used for internal bookkeeping.
void Enqueue(std::shared_ptr<BufferHubBuffer> buf, size_t slot);
@@ -87,6 +95,9 @@
// Wait for buffers to be released and re-add them to the queue.
bool WaitForBuffers(int timeout);
+ void HandleBufferEvent(size_t slot, const epoll_event& event);
+ void HandleQueueEvent(const epoll_event& event);
+
virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) = 0;
// Called when a buffer is allocated remotely.
@@ -160,6 +171,30 @@
// |buffers_| tracks all |BufferHubBuffer|s created by this |BufferHubQueue|.
std::vector<std::shared_ptr<BufferHubBuffer>> buffers_;
+ // |epollhup_pending_| tracks whether a slot of |buffers_| get detached before
+ // its corresponding EPOLLHUP event got handled. This could happen as the
+ // following sequence:
+ // 1. Producer queue's client side allocates a new buffer (at slot 1).
+ // 2. Producer queue's client side replaces an existing buffer (at slot 0).
+ // This is implemented by first detaching the buffer and then allocating a
+ // new buffer.
+ // 3. During the same epoll_wait, Consumer queue's client side gets EPOLLIN
+ // event on the queue which indicates a new buffer is avaiable and the
+ // EPOLLHUP event for slot 0. Consumer handles these two events in order.
+ // 4. Consumer client calls BufferHubRPC::ConsumerQueueImportBuffers and both
+ // slot 0 and (the new) slot 1 buffer will be imported. During the import
+ // of the buffer at slot 1, consuemr client detaches the old buffer so that
+ // the new buffer can be registered. At the same time
+ // |epollhup_pending_[slot]| is marked to indicate that buffer at this slot
+ // was detached prior to EPOLLHUP event.
+ // 5. Consumer client continues to handle the EPOLLHUP. Since
+ // |epollhup_pending_[slot]| is marked as true, it can safely ignore the
+ // event without detaching the newly allocated buffer at slot 1.
+ //
+ // In normal situations where the previously described sequence doesn't
+ // happen, an EPOLLHUP event should trigger a regular buffer detach.
+ std::vector<bool> epollhup_pending_;
+
// |available_buffers_| uses |dvr::RingBuffer| to implementation queue
// sematics. When |Dequeue|, we pop the front element from
// |available_buffers_|, and that buffer's reference count will decrease by
@@ -225,7 +260,7 @@
// Returns Zero on success and negative error code when buffer allocation
// fails.
int AllocateBuffer(int width, int height, int format, int usage,
- size_t buffer_count, size_t* out_slot);
+ size_t slice_count, size_t* out_slot);
// Add a producer buffer to populate the queue. Once added, a producer buffer
// is available to use (i.e. in |Gain|'ed mode).