BufferHubQueue: ClearAvailable on consumer import

This fixes the issue that ConsumerQueue keeps receiving
|OnBufferAllocated| events when using UDS based libpdx. It also hardens
error handling during consumer imports.

Bug: 34690159
Test: Build, flash, buffer_hub_queue-test passes
Change-Id: I4f739a2869df851ed836f39122b3eba95697f3f6
diff --git a/libs/vr/libbufferhubqueue/Android.mk b/libs/vr/libbufferhubqueue/Android.mk
index 3ed7ff2..c70aaf4 100644
--- a/libs/vr/libbufferhubqueue/Android.mk
+++ b/libs/vr/libbufferhubqueue/Android.mk
@@ -40,6 +40,7 @@
 include $(CLEAR_VARS)
 LOCAL_SRC_FILES := $(sourceFiles)
 LOCAL_C_INCLUDES := $(includeFiles)
+LOCAL_CFLAGS := -DLOG_TAG=\"libbufferhubqueue\"
 LOCAL_EXPORT_C_INCLUDE_DIRS := $(includeFiles)
 LOCAL_STATIC_LIBRARIES := $(staticLibraries)
 LOCAL_SHARED_LIBRARIES := $(sharedLibraries)
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
index 0576b21..bad9503 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
@@ -1,5 +1,7 @@
 #include "include/private/dvr/buffer_hub_queue_client.h"
 
+//#define LOG_NDEBUG 0
+
 #include <inttypes.h>
 #include <log/log.h>
 #include <sys/epoll.h>
@@ -24,6 +26,7 @@
       meta_size_(meta_size),
       meta_buffer_tmp_(meta_size ? new uint8_t[meta_size] : nullptr),
       buffers_(BufferHubQueue::kMaxQueueCapacity),
+      epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
       available_buffers_(BufferHubQueue::kMaxQueueCapacity),
       capacity_(0) {
   Initialize();
@@ -36,6 +39,7 @@
       meta_size_(meta_size),
       meta_buffer_tmp_(meta_size ? new uint8_t[meta_size] : nullptr),
       buffers_(BufferHubQueue::kMaxQueueCapacity),
+      epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
       available_buffers_(BufferHubQueue::kMaxQueueCapacity),
       capacity_(0) {
   Initialize();
@@ -101,31 +105,12 @@
 
       ALOGD("New BufferHubQueue event %d: index=%" PRId64, i, index);
 
-      if (is_buffer_event_index(index) && (events[i].events & EPOLLIN)) {
-        auto buffer = buffers_[index];
-        ret = OnBufferReady(buffer);
-        if (ret < 0) {
-          ALOGE("Failed to set buffer ready: %s", strerror(-ret));
-          continue;
-        }
-        Enqueue(buffer, index);
-      } else if (is_buffer_event_index(index) &&
-                 (events[i].events & EPOLLHUP)) {
-        // This maybe caused by producer replacing an exising buffer slot.
-        // Currently the epoll FD is cleaned up when the replacement consumer
-        // client is imported.
-        ALOGW("Receives EPOLLHUP at slot: %" PRId64, index);
-      } else if (is_queue_event_index(index) && (events[i].events & EPOLLIN)) {
-        // Note that after buffer imports, if |count()| still returns 0, epoll
-        // wait will be tried again to acquire the newly imported buffer.
-        ret = OnBufferAllocated();
-        if (ret < 0) {
-          ALOGE("Failed to import buffer: %s", strerror(-ret));
-          continue;
-        }
+      if (is_buffer_event_index(index)) {
+        HandleBufferEvent(static_cast<size_t>(index), events[i]);
+      } else if (is_queue_event_index(index)) {
+        HandleQueueEvent(events[i]);
       } else {
-        ALOGW("Unknown event %d: u64=%" PRId64 ": events=%" PRIu32, i, index,
-              events[i].events);
+        ALOGW("Unknown event index: %" PRId64, index);
       }
     }
   }
@@ -133,6 +118,68 @@
   return true;
 }
 
+void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {
+  auto buffer = buffers_[slot];
+  if (!buffer) {
+    ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
+    return;
+  }
+
+  auto status = buffer->GetEventMask(event.events);
+  if (!status) {
+    ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
+          status.GetErrorMessage().c_str());
+    return;
+  }
+
+  int events = status.get();
+  if (events & EPOLLIN) {
+    int ret = OnBufferReady(buffer);
+    if (ret < 0) {
+      ALOGE("Failed to set buffer ready: %s", strerror(-ret));
+      return;
+    }
+    Enqueue(buffer, slot);
+  } else if (events & EPOLLHUP) {
+    // This might be caused by producer replacing an existing buffer slot, or
+    // when BufferHubQueue is shutting down. For the first case, currently the
+    // epoll FD is cleaned up when the replacement consumer client is imported,
+    // so we shouldn't detach again if |epollhup_pending_[slot]| is set.
+    ALOGW(
+        "Receives EPOLLHUP at slot: %zu, buffer event fd: %d, EPOLLHUP "
+        "pending: %d",
+        slot, buffer->event_fd(), epollhup_pending_[slot]);
+    if (epollhup_pending_[slot]) {
+      epollhup_pending_[slot] = false;
+    } else {
+      DetachBuffer(slot);
+    }
+  } else {
+    ALOGW("Unknown event, slot=%zu, epoll events=%d", slot, events);
+  }
+}
+
+void BufferHubQueue::HandleQueueEvent(const epoll_event& event) {
+  auto status = GetEventMask(event.events);
+  if (!status) {
+    ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
+          status.GetErrorMessage().c_str());
+    return;
+  }
+
+  int events = status.get();
+  if (events & EPOLLIN) {
+    // Note that after buffer imports, if |count()| still returns 0, epoll
+    // wait will be tried again to acquire the newly imported buffer.
+    int ret = OnBufferAllocated();
+    if (ret < 0) {
+      ALOGE("Failed to import buffer: %s", strerror(-ret));
+    }
+  } else {
+    ALOGW("Unknown epoll events=%d", events);
+  }
+}
+
 int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
                               size_t slot) {
   if (is_full()) {
@@ -146,8 +193,9 @@
   if (buffers_[slot] != nullptr) {
     // Replace the buffer if the slot is preoccupied. This could happen when the
     // producer side replaced the slot with a newly allocated buffer. Detach the
-    // buffer and set up with the new one.
+    // buffer before setting up with the new one.
     DetachBuffer(slot);
+    epollhup_pending_[slot] = true;
   }
 
   epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_consumer.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_consumer.cpp
index 1ea3994..02bca09 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_consumer.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_consumer.cpp
@@ -1,5 +1,7 @@
 #include "include/private/dvr/buffer_hub_queue_consumer.h"
 
+//#define LOG_NDEBUG 0
+
 namespace android {
 namespace dvr {
 
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_core.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_core.cpp
index a108042..b013c85 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_core.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_core.cpp
@@ -1,5 +1,8 @@
 #include "include/private/dvr/buffer_hub_queue_core.h"
 
+//#define LOG_NDEBUG 0
+#define LOG_TAG "BufferHubQueueCore"
+
 #include <log/log.h>
 
 namespace android {
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
index 752e8c4..7ddf49b 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
@@ -1,5 +1,7 @@
 #include "include/private/dvr/buffer_hub_queue_producer.h"
 
+//#define LOG_NDEBUG 0
+
 #include <inttypes.h>
 #include <log/log.h>
 
diff --git a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
index 83e77d4..1f2830a 100644
--- a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
+++ b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
@@ -49,6 +49,14 @@
     return buffers_[slot];
   }
 
+  Status<int> GetEventMask(int events) {
+    if (auto* client_channel = GetChannel()) {
+      return client_channel->GetEventMask(events);
+    } else {
+      return pdx::ErrorStatus(EINVAL);
+    }
+  }
+
   // Enqueue a buffer marks buffer to be available (|Gain|'ed for producer
   // and |Acquire|'ed for consumer. This is only used for internal bookkeeping.
   void Enqueue(std::shared_ptr<BufferHubBuffer> buf, size_t slot);
@@ -87,6 +95,9 @@
 
   // Wait for buffers to be released and re-add them to the queue.
   bool WaitForBuffers(int timeout);
+  void HandleBufferEvent(size_t slot, const epoll_event& event);
+  void HandleQueueEvent(const epoll_event& event);
+
   virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) = 0;
 
   // Called when a buffer is allocated remotely.
@@ -160,6 +171,30 @@
   // |buffers_| tracks all |BufferHubBuffer|s created by this |BufferHubQueue|.
   std::vector<std::shared_ptr<BufferHubBuffer>> buffers_;
 
+  // |epollhup_pending_| tracks whether a slot of |buffers_| gets detached
+  // before its corresponding EPOLLHUP event got handled. This can happen in
+  // the following sequence:
+  // 1. Producer queue's client side allocates a new buffer (at slot 1).
+  // 2. Producer queue's client side replaces an existing buffer (at slot 0).
+  //    This is implemented by first detaching the buffer and then allocating a
+  //    new buffer.
+  // 3. During the same epoll_wait, Consumer queue's client side gets EPOLLIN
+  //    event on the queue which indicates a new buffer is available and the
+  //    EPOLLHUP event for slot 0. Consumer handles these two events in order.
+  // 4. Consumer client calls BufferHubRPC::ConsumerQueueImportBuffers and both
+  //    slot 0 and (the new) slot 1 buffer will be imported. During the import
+  //    of the buffer at slot 0, consumer client detaches the old buffer so that
+  //    the new buffer can be registered. At the same time
+  //    |epollhup_pending_[slot]| is marked to indicate that buffer at this slot
+  //    was detached prior to EPOLLHUP event.
+  // 5. Consumer client continues to handle the EPOLLHUP. Since
+  //    |epollhup_pending_[slot]| is marked as true, it can safely ignore the
+  //    event without detaching the newly allocated buffer at slot 0.
+  //
+  // In normal situations where the previously described sequence doesn't
+  // happen, an EPOLLHUP event should trigger a regular buffer detach.
+  std::vector<bool> epollhup_pending_;
+
   // |available_buffers_| uses |dvr::RingBuffer| to implementation queue
   // sematics. When |Dequeue|, we pop the front element from
   // |available_buffers_|, and  that buffer's reference count will decrease by
@@ -225,7 +260,7 @@
   // Returns Zero on success and negative error code when buffer allocation
   // fails.
   int AllocateBuffer(int width, int height, int format, int usage,
-                     size_t buffer_count, size_t* out_slot);
+                     size_t slice_count, size_t* out_slot);
 
   // Add a producer buffer to populate the queue. Once added, a producer buffer
   // is available to use (i.e. in |Gain|'ed mode).