Add support for consumer queue initial import and hangup.

- Add support for consumer queues to import buffers that are created
  before the consumer queue is created, making multi-consumer queue
  patterns possible. This is essential for VrFlinger operation.
- Add support for notifying consumer queues when the producer queue
  hangs up.
- Correct the epoll event loop to check for hangups even when buffers
  are available.
- Add method to retrieve the event fd from a queue.
- Add trace logging and minor cleanup.
- Improve bufferhubd dump state output.

Bug: 36401174
Test: build; bufferhub tests pass.
Change-Id: Idd6f38a3341c048192062734e288d11de48bc4d4
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
index 5b85069..433db42 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
@@ -87,6 +87,14 @@
     return nullptr;
 }
 
+std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
+  if (auto status = CreateConsumerQueueHandle())
+    return std::unique_ptr<ConsumerQueue>(
+        new ConsumerQueue(status.take(), true));
+  else
+    return nullptr;
+}
+
 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
   auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
   if (!status) {
@@ -103,12 +111,18 @@
 bool BufferHubQueue::WaitForBuffers(int timeout) {
   std::array<epoll_event, kMaxEvents> events;
 
-  while (count() == 0) {
-    int ret = epoll_fd_.Wait(events.data(), events.size(), timeout);
+  // Loop at least once to check for hangups.
+  do {
+    ALOGD_IF(TRACE, "BufferHubQueue::WaitForBuffers: count=%zu capacity=%zu",
+             count(), capacity());
+
+    // If there is already a buffer then just check for hangup without waiting.
+    const int ret = epoll_fd_.Wait(events.data(), events.size(),
+                                   count() == 0 ? timeout : 0);
 
     if (ret == 0) {
       ALOGD_IF(TRACE, "Wait on epoll returns nothing before timeout.");
-      return false;
+      return count() != 0;
     }
 
     if (ret < 0 && ret != -EINTR) {
@@ -136,9 +150,9 @@
               index);
       }
     }
-  }
+  } while (count() == 0 && capacity() > 0 && !hung_up());
 
-  return true;
+  return count() != 0;
 }
 
 void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {
@@ -203,6 +217,9 @@
       ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
             buffer_status.GetErrorMessage().c_str());
     }
+  } else if (events & EPOLLHUP) {
+    ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
+    hung_up_ = true;
   } else {
     ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%d", events);
   }
@@ -260,7 +277,7 @@
   return 0;
 }
 
-void BufferHubQueue::Enqueue(std::shared_ptr<BufferHubBuffer> buf,
+void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
                              size_t slot) {
   if (count() == capacity_) {
     ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
@@ -272,8 +289,7 @@
   // the limitation of the RingBuffer we are using. Would be better to refactor
   // that.
   BufferInfo buffer_info(slot, meta_size_);
-  // Swap buffer into vector.
-  std::swap(buffer_info.buffer, buf);
+  buffer_info.buffer = buf;
   // Swap metadata loaded during onBufferReady into vector.
   std::swap(buffer_info.metadata, meta_buffer_tmp_);
 
@@ -286,7 +302,7 @@
                                                          LocalHandle* fence) {
   ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);
 
-  if (count() == 0 && !WaitForBuffers(timeout))
+  if (!WaitForBuffers(timeout))
     return nullptr;
 
   std::shared_ptr<BufferHubBuffer> buf;
@@ -366,9 +382,8 @@
       InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
           width, height, format, usage, slice_count, kBufferCount);
   if (!status) {
-    ALOGE(
-        "ProducerQueue::AllocateBuffer failed to create producer buffer "
-        "through BufferHub.");
+    ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
+          status.GetErrorMessage().c_str());
     return -status.error();
   }
 
@@ -428,14 +443,14 @@
   return std::static_pointer_cast<BufferProducer>(buf);
 }
 
-int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                  LocalHandle* release_fence) {
   auto buffer = std::static_pointer_cast<BufferProducer>(buf);
   return buffer->Gain(release_fence);
 }
 
-ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
-    : BufferHubQueue(std::move(handle)) {
+ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
+    : BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) {
   auto status = ImportQueue();
   if (!status) {
     ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
@@ -443,8 +458,14 @@
     Close(-status.error());
   }
 
-  // TODO(b/34387835) Import buffers in case the ProducerQueue we are
-  // based on was not empty.
+  auto import_status = ImportBuffers();
+  if (import_status) {
+    ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
+          import_status.get());
+  } else {
+    ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
+          import_status.GetErrorMessage().c_str());
+  }
 }
 
 Status<size_t> ConsumerQueue::ImportBuffers() {
@@ -457,6 +478,7 @@
     return ErrorStatus(status.error());
   }
 
+  int ret;
   int last_error = 0;
   int imported_buffers = 0;
 
@@ -468,7 +490,24 @@
 
     std::unique_ptr<BufferConsumer> buffer_consumer =
         BufferConsumer::Import(std::move(buffer_handle_slot.first));
-    int ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
+
+    // Setup ignore state before adding buffer to the queue.
+    if (ignore_on_import_) {
+      ALOGD_IF(TRACE,
+               "ConsumerQueue::ImportBuffers: Setting buffer to ignored state: "
+               "buffer_id=%d",
+               buffer_consumer->id());
+      ret = buffer_consumer->SetIgnore(true);
+      if (ret < 0) {
+        ALOGE(
+            "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
+            "imported buffer buffer_id=%d: %s",
+            buffer_consumer->id(), strerror(-ret));
+        last_error = ret;
+      }
+    }
+
+    ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
     if (ret < 0) {
       ALOGE("ConsumerQueue::ImportBuffers failed to add buffer, ret: %s",
             strerror(-ret));
@@ -502,7 +541,7 @@
     return nullptr;
   }
 
-  if (slot == nullptr || meta == nullptr || acquire_fence == nullptr) {
+  if (slot == nullptr || acquire_fence == nullptr) {
     ALOGE(
         "ConsumerQueue::Dequeue: Invalid parameter, slot=%p, meta=%p, "
         "acquire_fence=%p",
@@ -514,7 +553,7 @@
   return std::static_pointer_cast<BufferConsumer>(buf);
 }
 
-int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                  LocalHandle* acquire_fence) {
   auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
   return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
diff --git a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
index 1eafe76..2357bd9 100644
--- a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
+++ b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
@@ -33,6 +33,11 @@
   // a new consumer queue client or nullptr on failure.
   std::unique_ptr<ConsumerQueue> CreateConsumerQueue();
 
+  // Create a new consumer queue that is attached to the producer. This queue
+  // sets each of its imported consumer buffers to the ignored state to avoid
+  // participation in lifecycle events.
+  std::unique_ptr<ConsumerQueue> CreateSilentConsumerQueue();
+
   // Return the default buffer width of this buffer queue.
   size_t default_width() const { return default_width_; }
 
@@ -71,9 +76,19 @@
     }
   }
 
+  // Returns an fd that signals pending queue events using
+  // EPOLLIN/POLLIN/readible. Either HandleQueueEvents or WaitForBuffers may be
+  // called to handle pending queue events.
+  int queue_fd() const { return epoll_fd_.Get(); }
+
+  // Handles any pending events, returning available buffers to the queue and
+  // reaping disconnected buffers. Returns true if successful, false if an error
+  // occurred.
+  bool HandleQueueEvents() { return WaitForBuffers(0); }
+
   // Enqueue a buffer marks buffer to be available (|Gain|'ed for producer
   // and |Acquire|'ed for consumer. This is only used for internal bookkeeping.
-  void Enqueue(std::shared_ptr<BufferHubBuffer> buf, size_t slot);
+  void Enqueue(const std::shared_ptr<BufferHubBuffer>& buf, size_t slot);
 
   // |BufferHubQueue| will keep track of at most this value of buffers.
   static constexpr size_t kMaxQueueCapacity =
@@ -88,6 +103,7 @@
   static constexpr int kNoTimeOut = -1;
 
   int id() const { return id_; }
+  bool hung_up() const { return hung_up_; }
 
  protected:
   BufferHubQueue(LocalChannelHandle channel);
@@ -121,7 +137,7 @@
   void HandleBufferEvent(size_t slot, const epoll_event& event);
   void HandleQueueEvent(const epoll_event& event);
 
-  virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+  virtual int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                             LocalHandle* fence) = 0;
 
   // Called when a buffer is allocated remotely.
@@ -248,6 +264,12 @@
   // Epoll fd used to wait for BufferHub events.
   EpollFileDescriptor epoll_fd_;
 
+  // Flag indicating that the other side hung up. For ProducerQueues this
+  // triggers when BufferHub dies or explicitly closes the queue channel. For
+  // ConsumerQueues this can either mean the same or that the ProducerQueue on
+  // the other end hung up.
+  bool hung_up_{false};
+
   // Global id for the queue that is consistent across processes.
   int id_;
 
@@ -261,6 +283,9 @@
   static std::unique_ptr<ProducerQueue> Create() {
     return BASE::Create(sizeof(Meta));
   }
+  static std::unique_ptr<ProducerQueue> Create(size_t meta_size_bytes) {
+    return BASE::Create(meta_size_bytes);
+  }
 
   // Usage bits in |usage_set_mask| will be automatically masked on. Usage bits
   // in |usage_clear_mask| will be automatically masked off. Note that
@@ -331,7 +356,7 @@
                 uint64_t usage_clear_mask, uint64_t usage_deny_set_mask,
                 uint64_t usage_deny_clear_mask);
 
-  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+  int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                     LocalHandle* release_fence) override;
 };
 
@@ -339,16 +364,22 @@
  public:
   // Get a buffer consumer. Note that the method doesn't check whether the
   // buffer slot has a valid buffer that has been imported already. When no
-  // buffer has been imported before it returns |nullptr|; otherwise it returns
-  // a shared pointer to a |BufferConsumer|.
+  // buffer has been imported before it returns nullptr; otherwise returns a
+  // shared pointer to a BufferConsumer.
   std::shared_ptr<BufferConsumer> GetBuffer(size_t slot) const {
     return std::static_pointer_cast<BufferConsumer>(
         BufferHubQueue::GetBuffer(slot));
   }
 
-  // Import a |ConsumerQueue| from a channel handle.
-  static std::unique_ptr<ConsumerQueue> Import(LocalChannelHandle handle) {
-    return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle)));
+  // Import a ConsumerQueue from a channel handle. |ignore_on_import| controls
+  // whether or not buffers are set to be ignored when imported. This may be
+  // used to avoid participation in the buffer lifecycle by a consumer queue
+  // that is only used to spawn other consumer queues, such as in an
+  // intermediate service.
+  static std::unique_ptr<ConsumerQueue> Import(LocalChannelHandle handle,
+                                               bool ignore_on_import = false) {
+    return std::unique_ptr<ConsumerQueue>(
+        new ConsumerQueue(std::move(handle), ignore_on_import));
   }
 
   // Import newly created buffers from the service side.
@@ -366,6 +397,10 @@
                                           LocalHandle* acquire_fence) {
     return Dequeue(timeout, slot, meta, sizeof(*meta), acquire_fence);
   }
+  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot,
+                                          LocalHandle* acquire_fence) {
+    return Dequeue(timeout, slot, nullptr, 0, acquire_fence);
+  }
 
   std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta,
                                           size_t meta_size,
@@ -374,7 +409,7 @@
  private:
   friend BufferHubQueue;
 
-  ConsumerQueue(LocalChannelHandle handle);
+  ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import = false);
 
   // Add a consumer buffer to populate the queue. Once added, a consumer buffer
   // is NOT available to use until the producer side |Post| it. |WaitForBuffers|
@@ -382,10 +417,14 @@
   // consumer.
   int AddBuffer(const std::shared_ptr<BufferConsumer>& buf, size_t slot);
 
-  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+  int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                     LocalHandle* acquire_fence) override;
 
   Status<void> OnBufferAllocated() override;
+
+  // Flag indicating that imported (consumer) buffers should be ignored when
+  // imported to avoid participating in the buffer ownership flow.
+  bool ignore_on_import_;
 };
 
 }  // namespace dvr
diff --git a/libs/vr/libdvrcommon/include/private/dvr/epoll_file_descriptor.h b/libs/vr/libdvrcommon/include/private/dvr/epoll_file_descriptor.h
index 91e12c5..099a409 100644
--- a/libs/vr/libdvrcommon/include/private/dvr/epoll_file_descriptor.h
+++ b/libs/vr/libdvrcommon/include/private/dvr/epoll_file_descriptor.h
@@ -52,6 +52,8 @@
       return ret;
   }
 
+  int Get() const { return fd_.get(); }
+
  private:
   base::unique_fd fd_;
 };
diff --git a/services/vr/bufferhubd/buffer_hub.cpp b/services/vr/bufferhubd/buffer_hub.cpp
index 8093b6b..debcc73 100644
--- a/services/vr/bufferhubd/buffer_hub.cpp
+++ b/services/vr/bufferhubd/buffer_hub.cpp
@@ -132,7 +132,7 @@
   stream << std::endl;
   stream << "Active Producer Queues:\n";
   stream << std::right << std::setw(6) << "Id";
-  stream << std::right << std::setw(12) << " Allocated";
+  stream << std::right << std::setw(12) << " Capacity";
   stream << std::right << std::setw(12) << " Consumers";
   stream << " UsageSetMask";
   stream << " UsageClearMask";
diff --git a/services/vr/bufferhubd/consumer_queue_channel.cpp b/services/vr/bufferhubd/consumer_queue_channel.cpp
index 7422751..f447e00 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.cpp
+++ b/services/vr/bufferhubd/consumer_queue_channel.cpp
@@ -8,6 +8,7 @@
 using android::pdx::RemoteChannelHandle;
 using android::pdx::Status;
 using android::pdx::rpc::DispatchRemoteMethod;
+using android::pdx::rpc::RemoteMethodError;
 
 namespace android {
 namespace dvr {
@@ -33,8 +34,10 @@
 bool ConsumerQueueChannel::HandleMessage(Message& message) {
   ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
   auto producer = GetProducer();
-  if (!producer)
-    REPLY_ERROR_RETURN(message, EPIPE, true);
+  if (!producer) {
+    RemoteMethodError(message, EPIPE);
+    return true;
+  }
 
   switch (message.GetOp()) {
     case BufferHubRPC::CreateConsumerQueue::Opcode:
@@ -79,6 +82,9 @@
 
 void ConsumerQueueChannel::RegisterNewBuffer(
     const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
+  ALOGD_IF(TRACE,
+           "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu",
+           producer_channel->buffer_id(), slot);
   pending_buffer_slots_.emplace(producer_channel, slot);
 
   // Signal the client that there is new buffer available throught POLLIN.
@@ -89,7 +95,8 @@
 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
   ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
-  ALOGD(
+  ALOGD_IF(
+      TRACE,
       "ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to "
       "import: %zu",
       pending_buffer_slots_.size());
@@ -134,5 +141,12 @@
   return {std::move(buffer_handles)};
 }
 
+void ConsumerQueueChannel::OnProducerClosed() {
+  ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
+           buffer_id());
+  producer_.reset();
+  Hangup();
+}
+
 }  // namespace dvr
 }  // namespace android
diff --git a/services/vr/bufferhubd/consumer_queue_channel.h b/services/vr/bufferhubd/consumer_queue_channel.h
index e1005e4..aa3f531 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.h
+++ b/services/vr/bufferhubd/consumer_queue_channel.h
@@ -39,6 +39,8 @@
   pdx::Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
   OnConsumerQueueImportBuffers(Message& message);
 
+  void OnProducerClosed();
+
  private:
   std::shared_ptr<ProducerQueueChannel> GetProducer() const;
 
diff --git a/services/vr/bufferhubd/producer_channel.cpp b/services/vr/bufferhubd/producer_channel.cpp
index e452d04..398aa12 100644
--- a/services/vr/bufferhubd/producer_channel.cpp
+++ b/services/vr/bufferhubd/producer_channel.cpp
@@ -199,10 +199,15 @@
     return ErrorStatus(EBUSY);
   }
 
-  if (meta_size_bytes_ != metadata.size())
+  if (meta_size_bytes_ != metadata.size()) {
+    ALOGD_IF(TRACE,
+             "ProducerChannel::OnProducerPost: Expected meta_size_bytes=%zu "
+             "got size=%zu",
+             meta_size_bytes_, metadata.size());
     return ErrorStatus(EINVAL);
-  std::copy(metadata.begin(), metadata.end(), meta_.get());
+  }
 
+  std::copy(metadata.begin(), metadata.end(), meta_.get());
   post_fence_ = std::move(acquire_fence);
   producer_owns_ = false;
 
diff --git a/services/vr/bufferhubd/producer_queue_channel.cpp b/services/vr/bufferhubd/producer_queue_channel.cpp
index bd8eed8..843277e 100644
--- a/services/vr/bufferhubd/producer_queue_channel.cpp
+++ b/services/vr/bufferhubd/producer_queue_channel.cpp
@@ -26,7 +26,12 @@
   *error = 0;
 }
 
-ProducerQueueChannel::~ProducerQueueChannel() {}
+ProducerQueueChannel::~ProducerQueueChannel() {
+  ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
+           buffer_id());
+  for (auto* consumer : consumer_channels_)
+    consumer->OnProducerClosed();
+}
 
 /* static */
 Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
@@ -108,13 +113,21 @@
     return ErrorStatus(ENOMEM);
   }
 
-  const auto channel_status = service()->SetChannel(
-      channel_id, std::make_shared<ConsumerQueueChannel>(
-                      service(), buffer_id(), channel_id, shared_from_this()));
+  auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
+      service(), buffer_id(), channel_id, shared_from_this());
+
+  // Register the existing buffers with the new consumer queue.
+  for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
+    if (auto buffer = buffers_[slot].lock())
+      consumer_queue_channel->RegisterNewBuffer(buffer, slot);
+  }
+
+  const auto channel_status =
+      service()->SetChannel(channel_id, consumer_queue_channel);
   if (!channel_status) {
     ALOGE(
-        "ProducerQueueChannel::OnCreateConsumerQueue: failed to set new "
-        "consumer channel: %s",
+        "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
+        "%s",
         channel_status.GetErrorMessage().c_str());
     return ErrorStatus(ENOMEM);
   }
@@ -254,7 +267,7 @@
   capacity_++;
 
   // Notify each consumer channel about the new buffer.
-  for (auto consumer_channel : consumer_channels_) {
+  for (auto* consumer_channel : consumer_channels_) {
     ALOGD(
         "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
         "buffer, buffer_id=%d",