BufferHubQueueProducer reset buffers on disconnect

When Surface.cpp disconnects a IGraphicBufferProducer, it clears up all
the buffer cache it holds. Thus our implementation should do the same.

New tests:
1/ BufferHubQueueProducerTest.ConnectDisconnectReconnect: tests
   IGraphicBufferProducer to behave proper after disconnect and
   re-connect.
2/ BufferHubQueueTest.TestFreeAllBuffers: tests BufferHubQueue handles
   FreeAllBuffers() properly under different producer/consumer state.

Bug: 64402829
Test: Pause and resume GVR video player demo and observer no more fails
around IGraphicBufferProducer::requestBuffer and Surface::dequeueBuffer.
Change-Id: Ia61e3f991248135cecf22039045f6960226bda42
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
index bfb9a55..f9f87ff 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
@@ -417,6 +417,28 @@
   on_buffer_removed_ = callback;
 }
 
+pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
+  // Clear all available buffers.
+  available_buffers_.Clear();
+
+  pdx::Status<void> last_error;  // No error.
+  // Clear all buffers this producer queue is tracking.
+  for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
+    if (buffers_[slot] != nullptr) {
+      auto status = RemoveBuffer(slot);
+      if (!status) {
+        ALOGE(
+            "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
+            "slot=%d.",
+            slot);
+        last_error = status.error_status();
+      }
+    }
+  }
+
+  return last_error;
+}
+
 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
     : BASE(std::move(handle)) {
   auto status = ImportQueue();
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
index 6613add..62e2923 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp
@@ -206,11 +206,11 @@
   // It's either in free state (if the buffer has never been used before) or
   // in queued state (if the buffer has been dequeued and queued back to
   // BufferHubQueue).
-  // TODO(jwcai) Clean this up, make mBufferState compatible with BufferHub's
-  // model.
-  LOG_ALWAYS_FATAL_IF((!buffers_[slot].mBufferState.isFree() &&
-                       !buffers_[slot].mBufferState.isQueued()),
-                      "dequeueBuffer: slot %zu is not free or queued.", slot);
+  LOG_ALWAYS_FATAL_IF(
+      (!buffers_[slot].mBufferState.isFree() &&
+       !buffers_[slot].mBufferState.isQueued()),
+      "dequeueBuffer: slot %zu is not free or queued, actual state: %s.", slot,
+      buffers_[slot].mBufferState.string());
 
   buffers_[slot].mBufferState.freeQueued();
   buffers_[slot].mBufferState.dequeue();
@@ -514,6 +514,7 @@
     return BAD_VALUE;
   }
 
+  FreeAllBuffers();
   connected_api_ = kNoConnectedApi;
   return NO_ERROR;
 }
@@ -647,5 +648,31 @@
   return NO_ERROR;
 }
 
+status_t BufferHubQueueProducer::FreeAllBuffers() {
+  for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
+    // Reset in memory objects related the the buffer.
+    buffers_[slot].mGraphicBuffer = nullptr;
+    buffers_[slot].mBufferState.reset();
+    buffers_[slot].mRequestBufferCalled = false;
+    buffers_[slot].mBufferProducer = nullptr;
+    buffers_[slot].mFence = Fence::NO_FENCE;
+  }
+
+  auto status = queue_->FreeAllBuffers();
+  if (!status) {
+    ALOGE(
+        "BufferHubQueueProducer::FreeAllBuffers: Failed to free all buffers on "
+        "the queue: %s",
+        status.GetErrorMessage().c_str());
+  }
+
+  if (queue_->capacity() != 0 || queue_->count() != 0) {
+    LOG_ALWAYS_FATAL(
+        "BufferHubQueueProducer::FreeAllBuffers: Not all buffers are freed.");
+  }
+
+  return NO_ERROR;
+}
+
 }  // namespace dvr
 }  // namespace android
diff --git a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
index d57c7af..3e93788 100644
--- a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
+++ b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
@@ -122,6 +122,10 @@
   // to deregister a buffer for epoll and internal bookkeeping.
   virtual pdx::Status<void> RemoveBuffer(size_t slot);
 
+  // Free all buffers that belongs to this queue. Can only be called from
+  // producer side.
+  virtual pdx::Status<void> FreeAllBuffers();
+
   // Dequeue a buffer from the free queue, blocking until one is available. The
   // timeout argument specifies the number of milliseconds that |Dequeue()| will
   // block. Specifying a timeout of -1 causes Dequeue() to block indefinitely,
@@ -297,6 +301,11 @@
   // Remove producer buffer from the queue.
   pdx::Status<void> RemoveBuffer(size_t slot) override;
 
+  // Free all buffers on this producer queue.
+  pdx::Status<void> FreeAllBuffers() override {
+    return BufferHubQueue::FreeAllBuffers();
+  }
+
   // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode,
   // and caller should call Post() once it's done writing to release the buffer
   // to the consumer side.
diff --git a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_producer.h b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_producer.h
index d33a831..5c8761d 100644
--- a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_producer.h
+++ b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_producer.h
@@ -132,6 +132,10 @@
   // Remove a buffer via BufferHubRPC.
   status_t RemoveBuffer(size_t slot);
 
+  // Free all buffers which are owned by the prodcuer. Note that if graphic
+  // buffers are acquired by the consumer, we can't .
+  status_t FreeAllBuffers();
+
   // Concreate implementation backed by BufferHubBuffer.
   std::shared_ptr<ProducerQueue> queue_;
 
diff --git a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp
index e0a4052..7581a06 100644
--- a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp
+++ b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp
@@ -570,6 +570,103 @@
   EXPECT_EQ(consumer_queue_->is_async(), kIsAsync);
 }
 
+TEST_F(BufferHubQueueTest, TestFreeAllBuffers) {
+  constexpr size_t kBufferCount = 2;
+
+#define CHECK_NO_BUFFER_THEN_ALLOCATE(num_buffers)  \
+  EXPECT_EQ(consumer_queue_->count(), 0U);          \
+  EXPECT_EQ(consumer_queue_->capacity(), 0U);       \
+  EXPECT_EQ(producer_queue_->count(), 0U);          \
+  EXPECT_EQ(producer_queue_->capacity(), 0U);       \
+  for (size_t i = 0; i < num_buffers; i++) {        \
+    AllocateBuffer();                               \
+  }                                                 \
+  EXPECT_EQ(producer_queue_->count(), num_buffers); \
+  EXPECT_EQ(producer_queue_->capacity(), num_buffers);
+
+  size_t slot;
+  uint64_t seq;
+  LocalHandle fence;
+  pdx::Status<void> status;
+  pdx::Status<std::shared_ptr<BufferConsumer>> consumer_status;
+  pdx::Status<std::shared_ptr<BufferProducer>> producer_status;
+  std::shared_ptr<BufferConsumer> consumer_buffer;
+  std::shared_ptr<BufferProducer> producer_buffer;
+
+  ASSERT_TRUE(CreateQueues(config_builder_.SetMetadata<uint64_t>().Build(),
+                           UsagePolicy{}));
+
+  // Free all buffers when buffers are avaible for dequeue.
+  CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
+  status = producer_queue_->FreeAllBuffers();
+  EXPECT_TRUE(status.ok());
+
+  // Free all buffers when one buffer is dequeued.
+  CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
+  producer_status = producer_queue_->Dequeue(0, &slot, &fence);
+  ASSERT_TRUE(producer_status.ok());
+  status = producer_queue_->FreeAllBuffers();
+  EXPECT_TRUE(status.ok());
+
+  // Free all buffers when all buffers are dequeued.
+  CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
+  for (size_t i = 0; i < kBufferCount; i++) {
+    producer_status = producer_queue_->Dequeue(0, &slot, &fence);
+    ASSERT_TRUE(producer_status.ok());
+  }
+  status = producer_queue_->FreeAllBuffers();
+  EXPECT_TRUE(status.ok());
+
+  // Free all buffers when one buffer is posted.
+  CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
+  producer_status = producer_queue_->Dequeue(0, &slot, &fence);
+  ASSERT_TRUE(producer_status.ok());
+  producer_buffer = producer_status.take();
+  ASSERT_NE(nullptr, producer_buffer);
+  ASSERT_EQ(0, producer_buffer->Post(fence, &seq, sizeof(seq)));
+  status = producer_queue_->FreeAllBuffers();
+  EXPECT_TRUE(status.ok());
+
+  // Free all buffers when all buffers are posted.
+  CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
+  for (size_t i = 0; i < kBufferCount; i++) {
+    producer_status = producer_queue_->Dequeue(0, &slot, &fence);
+    ASSERT_TRUE(producer_status.ok());
+    producer_buffer = producer_status.take();
+    ASSERT_NE(nullptr, producer_buffer);
+    ASSERT_EQ(0, producer_buffer->Post(fence, &seq, sizeof(seq)));
+  }
+  status = producer_queue_->FreeAllBuffers();
+  EXPECT_TRUE(status.ok());
+
+  // Free all buffers when all buffers are acquired.
+  CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
+  for (size_t i = 0; i < kBufferCount; i++) {
+    producer_status = producer_queue_->Dequeue(0, &slot, &fence);
+    ASSERT_TRUE(producer_status.ok());
+    producer_buffer = producer_status.take();
+    ASSERT_NE(nullptr, producer_buffer);
+    ASSERT_EQ(0, producer_buffer->Post(fence, &seq, sizeof(seq)));
+    consumer_status = consumer_queue_->Dequeue(0, &slot, &seq, &fence);
+    ASSERT_TRUE(consumer_status.ok());
+  }
+
+  status = producer_queue_->FreeAllBuffers();
+  EXPECT_TRUE(status.ok());
+
+  // In addition to FreeAllBuffers() from the queue, it is also required to
+  // delete all references to the ProducerBuffer (i.e. the PDX client).
+  producer_buffer = nullptr;
+
+  // Crank consumer queue events to pickup EPOLLHUP events on the queue.
+  consumer_queue_->HandleQueueEvents();
+
+  // One last check.
+  CHECK_NO_BUFFER_THEN_ALLOCATE(kBufferCount);
+
+#undef CHECK_NO_BUFFER_THEN_ALLOCATE
+}
+
 }  // namespace
 
 }  // namespace dvr
diff --git a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue_producer-test.cpp b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue_producer-test.cpp
index 8f55125..28cd63a 100644
--- a/libs/vr/libbufferhubqueue/tests/buffer_hub_queue_producer-test.cpp
+++ b/libs/vr/libbufferhubqueue/tests/buffer_hub_queue_producer-test.cpp
@@ -508,6 +508,44 @@
   ASSERT_EQ(NO_INIT, mProducer->cancelBuffer(slot, Fence::NO_FENCE));
 }
 
+TEST_F(BufferHubQueueProducerTest, ConnectDisconnectReconnect) {
+  int slot = -1;
+  sp<GraphicBuffer> buffer;
+  IGraphicBufferProducer::QueueBufferInput input = CreateBufferInput();
+  IGraphicBufferProducer::QueueBufferOutput output;
+
+  EXPECT_NO_FATAL_FAILURE(ConnectProducer());
+
+  constexpr int maxDequeuedBuffers = 1;
+  int minUndequeuedBuffers;
+  EXPECT_EQ(NO_ERROR, mProducer->query(NATIVE_WINDOW_MIN_UNDEQUEUED_BUFFERS,
+                                       &minUndequeuedBuffers));
+  EXPECT_EQ(NO_ERROR, mProducer->setAsyncMode(false));
+  EXPECT_EQ(NO_ERROR, mProducer->setMaxDequeuedBufferCount(maxDequeuedBuffers));
+
+  int maxCapacity = maxDequeuedBuffers + minUndequeuedBuffers;
+
+  // Dequeue, request, and queue all buffers.
+  for (int i = 0; i < maxCapacity; i++) {
+    EXPECT_NO_FATAL_FAILURE(DequeueBuffer(&slot));
+    EXPECT_EQ(NO_ERROR, mProducer->requestBuffer(slot, &buffer));
+    EXPECT_EQ(NO_ERROR, mProducer->queueBuffer(slot, input, &output));
+  }
+
+  // Disconnect then reconnect.
+  EXPECT_EQ(NO_ERROR, mProducer->disconnect(kTestApi));
+  EXPECT_NO_FATAL_FAILURE(ConnectProducer());
+
+  // Dequeue, request, and queue all buffers.
+  for (int i = 0; i < maxCapacity; i++) {
+    EXPECT_NO_FATAL_FAILURE(DequeueBuffer(&slot));
+    EXPECT_EQ(NO_ERROR, mProducer->requestBuffer(slot, &buffer));
+    EXPECT_EQ(NO_ERROR, mProducer->queueBuffer(slot, input, &output));
+  }
+
+  EXPECT_EQ(NO_ERROR, mProducer->disconnect(kTestApi));
+}
+
 }  // namespace
 
 }  // namespace dvr