Expose acquire_fence though ConsumerQueue::Dequeue

The current compositor implementation will be refactored to use
BufferHubQueue instead of ad-hoc BufferHub operations. We need this to
expose release_fence to compositor so that it can wait for buffers
to become avaiable by checking fence properly.

Bug: 36033302
Bug: 36148608
Test: Built and ran buffer_hub_queue-test
Change-Id: I75cfcb02e06a4b9e7e89b89690ca2d92ee09a678
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
index bad9503..a8077b9 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
@@ -13,9 +13,6 @@
 #include <pdx/file_handle.h>
 #include <private/dvr/bufferhub_rpc.h>
 
-using android::pdx::LocalHandle;
-using android::pdx::LocalChannelHandle;
-
 namespace android {
 namespace dvr {
 
@@ -28,6 +25,7 @@
       buffers_(BufferHubQueue::kMaxQueueCapacity),
       epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
       available_buffers_(BufferHubQueue::kMaxQueueCapacity),
+      fences_(BufferHubQueue::kMaxQueueCapacity),
       capacity_(0) {
   Initialize();
 }
@@ -41,6 +39,7 @@
       buffers_(BufferHubQueue::kMaxQueueCapacity),
       epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
       available_buffers_(BufferHubQueue::kMaxQueueCapacity),
+      fences_(BufferHubQueue::kMaxQueueCapacity),
       capacity_(0) {
   Initialize();
 }
@@ -134,7 +133,7 @@
 
   int events = status.get();
   if (events & EPOLLIN) {
-    int ret = OnBufferReady(buffer);
+    int ret = OnBufferReady(buffer, &fences_[slot]);
     if (ret < 0) {
       ALOGE("Failed to set buffer ready: %s", strerror(-ret));
       return;
@@ -254,7 +253,8 @@
 
 std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout,
                                                          size_t* slot,
-                                                         void* meta) {
+                                                         void* meta,
+                                                         LocalHandle* fence) {
   ALOGD("Dequeue: count=%zu, timeout=%d", count(), timeout);
 
   if (count() == 0 && !WaitForBuffers(timeout))
@@ -263,6 +263,8 @@
   std::shared_ptr<BufferHubBuffer> buf;
   BufferInfo& buffer_info = available_buffers_.Front();
 
+  *fence = std::move(fences_[buffer_info.slot]);
+
   // Report current pos as the output slot.
   std::swap(buffer_info.slot, *slot);
   // Swap buffer from vector to be returned later.
@@ -373,15 +375,21 @@
   return BufferHubQueue::DetachBuffer(slot);
 }
 
-std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(int timeout,
-                                                       size_t* slot) {
-  auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr);
+std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(
+    int timeout, size_t* slot, LocalHandle* release_fence) {
+  if (slot == nullptr || release_fence == nullptr) {
+    ALOGE("invalid parameter, slot=%p, release_fence=%p", slot, release_fence);
+    return nullptr;
+  }
+
+  auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
   return std::static_pointer_cast<BufferProducer>(buf);
 }
 
-int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) {
+int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+                                 LocalHandle* release_fence) {
   auto buffer = std::static_pointer_cast<BufferProducer>(buf);
-  return buffer->GainAsync();
+  return buffer->Gain(release_fence);
 }
 
 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, size_t meta_size)
@@ -431,9 +439,9 @@
   return BufferHubQueue::AddBuffer(buf, slot);
 }
 
-std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(int timeout,
-                                                       size_t* slot, void* meta,
-                                                       size_t meta_size) {
+std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(
+    int timeout, size_t* slot, void* meta, size_t meta_size,
+    LocalHandle* acquire_fence) {
   if (meta_size != meta_size_) {
     ALOGE(
         "metadata size (%zu) for the dequeuing buffer does not match metadata "
@@ -441,14 +449,21 @@
         meta_size, meta_size_);
     return nullptr;
   }
-  auto buf = BufferHubQueue::Dequeue(timeout, slot, meta);
+
+  if (slot == nullptr || meta == nullptr || acquire_fence == nullptr) {
+    ALOGE("invalid parameter, slot=%p, meta=%p, acquire_fence=%p", slot, meta,
+          acquire_fence);
+    return nullptr;
+  }
+
+  auto buf = BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
   return std::static_pointer_cast<BufferConsumer>(buf);
 }
 
-int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) {
+int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+                                 LocalHandle* acquire_fence) {
   auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
-  LocalHandle fence;
-  return buffer->Acquire(&fence, meta_buffer_tmp_.get(), meta_size_);
+  return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
 }
 
 int ConsumerQueue::OnBufferAllocated() {
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
index 7ddf49b..0deb764 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
@@ -83,8 +83,9 @@
   std::shared_ptr<BufferProducer> buffer_producer;
 
   for (size_t retry = 0; retry < BufferHubQueue::kMaxQueueCapacity; retry++) {
+    LocalHandle fence;
     buffer_producer =
-        core_->producer_->Dequeue(core_->dequeue_timeout_ms_, &slot);
+        core_->producer_->Dequeue(core_->dequeue_timeout_ms_, &slot, &fence);
     if (!buffer_producer)
       return NO_MEMORY;
 
diff --git a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
index 1f2830a..feaf3d7 100644
--- a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
+++ b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
@@ -20,6 +20,7 @@
 // automatically re-requeued when released by the remote side.
 class BufferHubQueue : public pdx::Client {
  public:
+  using LocalHandle = pdx::LocalHandle;
   using LocalChannelHandle = pdx::LocalChannelHandle;
   template <typename T>
   using Status = pdx::Status<T>;
@@ -91,14 +92,15 @@
   // while specifying a timeout equal to zero cause |Dequeue()| to return
   // immediately, even if no buffers are available.
   std::shared_ptr<BufferHubBuffer> Dequeue(int timeout, size_t* slot,
-                                           void* meta);
+                                           void* meta, LocalHandle* fence);
 
   // Wait for buffers to be released and re-add them to the queue.
   bool WaitForBuffers(int timeout);
   void HandleBufferEvent(size_t slot, const epoll_event& event);
   void HandleQueueEvent(const epoll_event& event);
 
-  virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) = 0;
+  virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+                            LocalHandle* fence) = 0;
 
   // Called when a buffer is allocated remotely.
   virtual int OnBufferAllocated() = 0;
@@ -202,6 +204,10 @@
   // prevent the buffer from being deleted.
   RingBuffer<BufferInfo> available_buffers_;
 
+  // Fences (acquire fence for consumer and release fence for consumer) , one
+  // for each buffer slot.
+  std::vector<LocalHandle> fences_;
+
   // Keep track with how many buffers have been added into the queue.
   size_t capacity_;
 
@@ -274,7 +280,8 @@
   // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode,
   // and caller should call Post() once it's done writing to release the buffer
   // to the consumer side.
-  std::shared_ptr<BufferProducer> Dequeue(int timeout, size_t* slot);
+  std::shared_ptr<BufferProducer> Dequeue(int timeout, size_t* slot,
+                                          LocalHandle* release_fence);
 
  private:
   friend BASE;
@@ -287,7 +294,8 @@
   ProducerQueue(size_t meta_size, int usage_set_mask, int usage_clear_mask,
                 int usage_deny_set_mask, int usage_deny_clear_mask);
 
-  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) override;
+  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+                    LocalHandle* release_fence) override;
 
   // Producer buffer is always allocated from the client (i.e. local) side.
   int OnBufferAllocated() override { return 0; }
@@ -316,14 +324,11 @@
   // Dequeue() is done with the corect metadata type and size with those used
   // when the buffer is orignally created.
   template <typename Meta>
-  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot,
-                                          Meta* meta) {
-    return Dequeue(timeout, slot, meta, sizeof(*meta));
+  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, Meta* meta,
+                                          LocalHandle* acquire_fence) {
+    return Dequeue(timeout, slot, meta, sizeof(*meta), acquire_fence);
   }
 
-  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta,
-                                          size_t meta_size);
-
  private:
   friend BASE;
 
@@ -335,9 +340,14 @@
   // consumer.
   int AddBuffer(const std::shared_ptr<BufferConsumer>& buf, size_t slot);
 
-  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) override;
+  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+                    LocalHandle* acquire_fence) override;
 
   int OnBufferAllocated() override;
+
+  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta,
+                                          size_t meta_size,
+                                          LocalHandle* acquire_fence);
 };
 
 }  // namespace dvr
diff --git a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp
index 841554e..811543d 100644
--- a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp
+++ b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp
@@ -59,12 +59,13 @@
   // But dequeue multiple times.
   for (size_t i = 0; i < nb_dequeue_times; i++) {
     size_t slot;
-    auto p1 = producer_queue_->Dequeue(0, &slot);
+    LocalHandle fence;
+    auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
     ASSERT_NE(nullptr, p1);
     size_t mi = i;
     ASSERT_EQ(p1->Post(LocalHandle(), &mi, sizeof(mi)), 0);
     size_t mo;
-    auto c1 = consumer_queue_->Dequeue(100, &slot, &mo);
+    auto c1 = consumer_queue_->Dequeue(100, &slot, &mo, &fence);
     ASSERT_NE(nullptr, c1);
     ASSERT_EQ(mi, mo);
     c1->Release(LocalHandle());
@@ -91,19 +92,21 @@
     ASSERT_EQ(consumer_queue_->capacity(), i);
     // Dequeue returns nullptr since no buffer is ready to consumer, but
     // this implicitly triggers buffer import and bump up |capacity|.
-    auto consumer_null = consumer_queue_->Dequeue(0, &slot, &seq);
+    LocalHandle fence;
+    auto consumer_null = consumer_queue_->Dequeue(0, &slot, &seq, &fence);
     ASSERT_EQ(nullptr, consumer_null);
     ASSERT_EQ(consumer_queue_->capacity(), i + 1);
   }
 
   for (size_t i = 0; i < nb_buffer; i++) {
+    LocalHandle fence;
     // First time, there is no buffer available to dequeue.
-    auto buffer_null = consumer_queue_->Dequeue(0, &slot, &seq);
+    auto buffer_null = consumer_queue_->Dequeue(0, &slot, &seq, &fence);
     ASSERT_EQ(nullptr, buffer_null);
 
     // Make sure Producer buffer is Post()'ed so that it's ready to Accquire
     // in the consumer's Dequeue() function.
-    auto producer = producer_queue_->Dequeue(0, &slot);
+    auto producer = producer_queue_->Dequeue(0, &slot, &fence);
     ASSERT_NE(nullptr, producer);
 
     uint64_t seq_in = static_cast<uint64_t>(i);
@@ -111,7 +114,7 @@
 
     // Second time, the just |Post()|'ed buffer should be dequeued.
     uint64_t seq_out = 0;
-    auto consumer = consumer_queue_->Dequeue(0, &slot, &seq_out);
+    auto consumer = consumer_queue_->Dequeue(0, &slot, &seq_out, &fence);
     ASSERT_NE(nullptr, consumer);
     ASSERT_EQ(seq_in, seq_out);
   }
@@ -132,11 +135,12 @@
 
   for (auto mi : ms) {
     size_t slot;
-    auto p1 = producer_queue_->Dequeue(0, &slot);
+    LocalHandle fence;
+    auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
     ASSERT_NE(nullptr, p1);
     ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0);
     TestMetadata mo;
-    auto c1 = consumer_queue_->Dequeue(0, &slot, &mo);
+    auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
     ASSERT_EQ(mi.a, mo.a);
     ASSERT_EQ(mi.b, mo.b);
     ASSERT_EQ(mi.c, mo.c);
@@ -150,13 +154,14 @@
 
   int64_t mi = 3;
   size_t slot;
-  auto p1 = producer_queue_->Dequeue(0, &slot);
+  LocalHandle fence;
+  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
   ASSERT_NE(nullptr, p1);
   ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0);
 
   int32_t mo;
   // Acquire a buffer with mismatched metadata is not OK.
-  auto c1 = consumer_queue_->Dequeue(0, &slot, &mo);
+  auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
   ASSERT_EQ(nullptr, c1);
 }
 
@@ -165,12 +170,13 @@
   AllocateBuffer();
 
   size_t slot;
-  auto p1 = producer_queue_->Dequeue(0, &slot);
+  LocalHandle fence;
+  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
   ASSERT_NE(nullptr, p1);
 
   int64_t mo;
   producer_queue_->Enqueue(p1, slot);
-  auto c1 = consumer_queue_->Dequeue(0, &slot, &mo);
+  auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
   ASSERT_EQ(nullptr, c1);
 }
 
@@ -179,12 +185,13 @@
 
   size_t s1;
   AllocateBuffer();
-  auto p1 = producer_queue_->Dequeue(0, &s1);
+  LocalHandle fence;
+  auto p1 = producer_queue_->Dequeue(0, &s1, &fence);
   ASSERT_NE(nullptr, p1);
 
   // producer queue is exhausted
   size_t s2;
-  auto p2_null = producer_queue_->Dequeue(0, &s2);
+  auto p2_null = producer_queue_->Dequeue(0, &s2, &fence);
   ASSERT_EQ(nullptr, p2_null);
 
   // dynamically add buffer.
@@ -193,7 +200,7 @@
   ASSERT_EQ(producer_queue_->capacity(), 2U);
 
   // now we can dequeue again
-  auto p2 = producer_queue_->Dequeue(0, &s2);
+  auto p2 = producer_queue_->Dequeue(0, &s2, &fence);
   ASSERT_NE(nullptr, p2);
   ASSERT_EQ(producer_queue_->count(), 0U);
   // p1 and p2 should have different slot number
@@ -206,14 +213,14 @@
   int64_t seq = 1;
   ASSERT_EQ(p1->Post(LocalHandle(), seq), 0);
   size_t cs1, cs2;
-  auto c1 = consumer_queue_->Dequeue(0, &cs1, &seq);
+  auto c1 = consumer_queue_->Dequeue(0, &cs1, &seq, &fence);
   ASSERT_NE(nullptr, c1);
   ASSERT_EQ(consumer_queue_->count(), 0U);
   ASSERT_EQ(consumer_queue_->capacity(), 2U);
   ASSERT_EQ(cs1, s1);
 
   ASSERT_EQ(p2->Post(LocalHandle(), seq), 0);
-  auto c2 = consumer_queue_->Dequeue(0, &cs2, &seq);
+  auto c2 = consumer_queue_->Dequeue(0, &cs2, &seq, &fence);
   ASSERT_NE(nullptr, c2);
   ASSERT_EQ(cs2, s2);
 }
@@ -229,7 +236,8 @@
       kBufferSliceCount, &slot);
   ASSERT_EQ(ret, 0);
 
-  auto p1 = producer_queue_->Dequeue(0, &slot);
+  LocalHandle fence;
+  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
   ASSERT_EQ(p1->usage() & set_mask, set_mask);
 }
 
@@ -244,7 +252,8 @@
       kBufferSliceCount, &slot);
   ASSERT_EQ(ret, 0);
 
-  auto p1 = producer_queue_->Dequeue(0, &slot);
+  LocalHandle fence;
+  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
   ASSERT_EQ(p1->usage() & clear_mask, 0);
 }
 
diff --git a/libs/vr/libvrflinger/video_compositor.cpp b/libs/vr/libvrflinger/video_compositor.cpp
index 6b39a3c..411e3a3 100644
--- a/libs/vr/libvrflinger/video_compositor.cpp
+++ b/libs/vr/libvrflinger/video_compositor.cpp
@@ -79,7 +79,9 @@
     // queued in order from the producer side.
     // TODO(jwcai) Use |metadata.timestamp_ns| to schedule video frames
     // accurately.
-    auto buffer_consumer = consumer_queue_->Dequeue(0, &slot, &metadata);
+    LocalHandle acquire_fence;
+    auto buffer_consumer =
+        consumer_queue_->Dequeue(0, &slot, &metadata, &acquire_fence);
 
     if (buffer_consumer) {
       // Create a new texture if it hasn't been created yet, or the same slot