Add support for consumer queue initial import and hangup.

- Add support for consumer queues to import buffers that were created
  before the consumer queue itself was created, making multi-consumer
  queue patterns possible. This is essential for VrFlinger operation.
- Add support for notifying consumer queues when the producer queue
  hangs up.
- Correct the epoll event loop to check for hangups even when buffers
  are available.
- Add a method to retrieve the event fd from a queue.
- Add trace logging and minor cleanup.
- Improve bufferhubd dump state output.

Bug: 36401174
Test: build; bufferhub tests pass.
Change-Id: Idd6f38a3341c048192062734e288d11de48bc4d4
diff --git a/services/vr/bufferhubd/buffer_hub.cpp b/services/vr/bufferhubd/buffer_hub.cpp
index 8093b6b..debcc73 100644
--- a/services/vr/bufferhubd/buffer_hub.cpp
+++ b/services/vr/bufferhubd/buffer_hub.cpp
@@ -132,7 +132,7 @@
   stream << std::endl;
   stream << "Active Producer Queues:\n";
   stream << std::right << std::setw(6) << "Id";
-  stream << std::right << std::setw(12) << " Allocated";
+  stream << std::right << std::setw(12) << " Capacity";
   stream << std::right << std::setw(12) << " Consumers";
   stream << " UsageSetMask";
   stream << " UsageClearMask";
diff --git a/services/vr/bufferhubd/consumer_queue_channel.cpp b/services/vr/bufferhubd/consumer_queue_channel.cpp
index 7422751..f447e00 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.cpp
+++ b/services/vr/bufferhubd/consumer_queue_channel.cpp
@@ -8,6 +8,7 @@
 using android::pdx::RemoteChannelHandle;
 using android::pdx::Status;
 using android::pdx::rpc::DispatchRemoteMethod;
+using android::pdx::rpc::RemoteMethodError;
 
 namespace android {
 namespace dvr {
@@ -33,8 +34,10 @@
 bool ConsumerQueueChannel::HandleMessage(Message& message) {
   ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
   auto producer = GetProducer();
-  if (!producer)
-    REPLY_ERROR_RETURN(message, EPIPE, true);
+  if (!producer) {
+    RemoteMethodError(message, EPIPE);
+    return true;
+  }
 
   switch (message.GetOp()) {
     case BufferHubRPC::CreateConsumerQueue::Opcode:
@@ -79,6 +82,9 @@
 
 void ConsumerQueueChannel::RegisterNewBuffer(
     const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
+  ALOGD_IF(TRACE,
+           "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu",
+           producer_channel->buffer_id(), slot);
   pending_buffer_slots_.emplace(producer_channel, slot);
 
   // Signal the client that there is new buffer available throught POLLIN.
@@ -89,7 +95,8 @@
 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
   ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
-  ALOGD(
+  ALOGD_IF(
+      TRACE,
       "ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to "
       "import: %zu",
       pending_buffer_slots_.size());
@@ -134,5 +141,12 @@
   return {std::move(buffer_handles)};
 }
 
+void ConsumerQueueChannel::OnProducerClosed() {
+  ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
+           buffer_id());
+  producer_.reset();
+  Hangup();
+}
+
 }  // namespace dvr
 }  // namespace android
diff --git a/services/vr/bufferhubd/consumer_queue_channel.h b/services/vr/bufferhubd/consumer_queue_channel.h
index e1005e4..aa3f531 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.h
+++ b/services/vr/bufferhubd/consumer_queue_channel.h
@@ -39,6 +39,8 @@
   pdx::Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
   OnConsumerQueueImportBuffers(Message& message);
 
+  void OnProducerClosed();
+
  private:
   std::shared_ptr<ProducerQueueChannel> GetProducer() const;
 
diff --git a/services/vr/bufferhubd/producer_channel.cpp b/services/vr/bufferhubd/producer_channel.cpp
index e452d04..398aa12 100644
--- a/services/vr/bufferhubd/producer_channel.cpp
+++ b/services/vr/bufferhubd/producer_channel.cpp
@@ -199,10 +199,15 @@
     return ErrorStatus(EBUSY);
   }
 
-  if (meta_size_bytes_ != metadata.size())
+  if (meta_size_bytes_ != metadata.size()) {
+    ALOGD_IF(TRACE,
+             "ProducerChannel::OnProducerPost: Expected meta_size_bytes=%zu "
+             "got size=%zu",
+             meta_size_bytes_, metadata.size());
     return ErrorStatus(EINVAL);
-  std::copy(metadata.begin(), metadata.end(), meta_.get());
+  }
 
+  std::copy(metadata.begin(), metadata.end(), meta_.get());
   post_fence_ = std::move(acquire_fence);
   producer_owns_ = false;
 
diff --git a/services/vr/bufferhubd/producer_queue_channel.cpp b/services/vr/bufferhubd/producer_queue_channel.cpp
index bd8eed8..843277e 100644
--- a/services/vr/bufferhubd/producer_queue_channel.cpp
+++ b/services/vr/bufferhubd/producer_queue_channel.cpp
@@ -26,7 +26,12 @@
   *error = 0;
 }
 
-ProducerQueueChannel::~ProducerQueueChannel() {}
+ProducerQueueChannel::~ProducerQueueChannel() {
+  ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
+           buffer_id());
+  for (auto* consumer : consumer_channels_)
+    consumer->OnProducerClosed();
+}
 
 /* static */
 Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
@@ -108,13 +113,21 @@
     return ErrorStatus(ENOMEM);
   }
 
-  const auto channel_status = service()->SetChannel(
-      channel_id, std::make_shared<ConsumerQueueChannel>(
-                      service(), buffer_id(), channel_id, shared_from_this()));
+  auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
+      service(), buffer_id(), channel_id, shared_from_this());
+
+  // Register the existing buffers with the new consumer queue.
+  for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
+    if (auto buffer = buffers_[slot].lock())
+      consumer_queue_channel->RegisterNewBuffer(buffer, slot);
+  }
+
+  const auto channel_status =
+      service()->SetChannel(channel_id, consumer_queue_channel);
   if (!channel_status) {
     ALOGE(
-        "ProducerQueueChannel::OnCreateConsumerQueue: failed to set new "
-        "consumer channel: %s",
+        "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
+        "%s",
         channel_status.GetErrorMessage().c_str());
     return ErrorStatus(ENOMEM);
   }
@@ -254,7 +267,7 @@
   capacity_++;
 
   // Notify each consumer channel about the new buffer.
-  for (auto consumer_channel : consumer_channels_) {
+  for (auto* consumer_channel : consumer_channels_) {
     ALOGD(
         "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
         "buffer, buffer_id=%d",