Add shared memory based buffer metadata

This CLs reduces BufferHub CPU consumption by adding asynchronous
state transition so that out-of-process VR composition can run on 2016
pixel devices smoothly. In addition, this CL addresses a couple corner
cases in the existing bufferhub logic, which fixes various blackscreen
issues.

1/ Tracks buffer transition states (gained, posted, acquired, released)
   from the client side via atomic shared memory and adds
   PostAsync/AcquireAsync/ReleaseAsync/GainAsync with metadata  and
   fence support.
2/ Adds dequeue order guarantee for buffers enqueued with
   dvrWriteBufferQueuePostBuffer.
3/ Synchronous BuffeHub operations are still supported.
4/ Bump up the bufferhubd's soft limit of open file descriptor.
5/ Handle orphaned consumer in acquired state. This is a corner case
   that consumer process goes aways (most likely due to a crash) leaving
   buffer stuck in acquired state with inconsistent buffer state.
6/ Fixes a race condition for released buffer to be Gain'ed and
   Acquire'd when a new consumer is created in released state.
7/ Improve silent consumer queue efficiency: Silent queues no longer
   import buffers or receive signals about new buffers and they are
   limited to only spawning other consumers and notifications about
   producers hanging up.
8/ Modify PDX/UDS channel event signaling to work around epoll
   behavior. PDX UDS uses a combination of an eventfd and an epoll set
   to simulate the original PDX transport channel events. An odd
   behavior discovered in the kernel implementation of epoll was found
   that causes the epoll fd to "unsignal" itself whenever epoll_wait()
   is called on it, regardless of whether it should still be
   pending. This breaks the edge triggerd behavior in nested epoll sets
   that channel events depend on. Since this is unlikely to ever be
   fixed in the kernel we work around the behavior by using the epoll
   set only as a logical OR of two eventfds and never calling
   epoll_wait() on it. When polling is required we use regluar poll()
   with the eventfds and data fd to avoid the bad behavior in
   epoll_wait().
9/ Keep reading data after PDX hangup signal. UDS will signal hangup
   when the other end of the socket closes. However, data could still be
   in the kerenl buffer and should be consumed. Fix an issue where the
   service misses an impulse sent right before the socket is closed.

Bug: 65455724
Bug: 65458354
Bug: 65458312
Bug: 64027135
Bug: 67424527
Test: libpdx_uds_tests
      bufferhub_tests
      buffer_hub_queue-test
      buffer_hub_queue_producer-test
      dvr_api-test

Change-Id: Id07db1f206ccf4e06f7ee3c671193334408971ca
diff --git a/services/vr/bufferhubd/Android.mk b/services/vr/bufferhubd/Android.mk
index 97f0332..28cf53d 100644
--- a/services/vr/bufferhubd/Android.mk
+++ b/services/vr/bufferhubd/Android.mk
@@ -22,6 +22,9 @@
     consumer_queue_channel.cpp \
     producer_queue_channel.cpp \
 
+headerLibraries := \
+	libdvr_headers
+
 staticLibraries := \
 	libperformance \
 	libpdx_default_transport \
@@ -41,6 +44,7 @@
 LOCAL_CFLAGS := -DLOG_TAG=\"bufferhubd\"
 LOCAL_CFLAGS += -DTRACE=0
 LOCAL_CFLAGS += -DATRACE_TAG=ATRACE_TAG_GRAPHICS
+LOCAL_HEADER_LIBRARIES := $(headerLibraries)
 LOCAL_STATIC_LIBRARIES := $(staticLibraries)
 LOCAL_SHARED_LIBRARIES := $(sharedLibraries)
 LOCAL_MODULE := bufferhubd
diff --git a/services/vr/bufferhubd/buffer_hub.cpp b/services/vr/bufferhubd/buffer_hub.cpp
index 26843c9..cdb1f91 100644
--- a/services/vr/bufferhubd/buffer_hub.cpp
+++ b/services/vr/bufferhubd/buffer_hub.cpp
@@ -20,8 +20,8 @@
 using android::pdx::ErrorStatus;
 using android::pdx::Message;
 using android::pdx::Status;
-using android::pdx::rpc::DispatchRemoteMethod;
 using android::pdx::default_transport::Endpoint;
+using android::pdx::rpc::DispatchRemoteMethod;
 
 namespace android {
 namespace dvr {
@@ -53,7 +53,15 @@
   stream << " ";
   stream << std::setw(6) << "Format";
   stream << " ";
-  stream << std::setw(11) << "Usage";
+  stream << std::setw(10) << "Usage";
+  stream << " ";
+  stream << std::setw(9) << "Pending";
+  stream << " ";
+  stream << std::setw(18) << "State";
+  stream << " ";
+  stream << std::setw(18) << "Signaled";
+  stream << " ";
+  stream << std::setw(10) << "Index";
   stream << " ";
   stream << "Name";
   stream << std::endl;
@@ -83,46 +91,15 @@
       stream << std::setw(8) << info.usage;
       stream << std::dec << std::setfill(' ');
       stream << " ";
-      stream << info.name;
-      stream << std::endl;
-    }
-  }
-
-  stream << "Active Consumer Buffers:\n";
-  stream << std::right;
-  stream << std::setw(6) << "Id";
-  stream << " ";
-  stream << std::setw(14) << "Geometry";
-  stream << " ";
-  stream << "Name";
-  stream << std::endl;
-
-  for (const auto& channel : channels) {
-    if (channel->channel_type() == BufferHubChannel::kConsumerType) {
-      BufferHubChannel::BufferInfo info = channel->GetBufferInfo();
-
-      stream << std::right;
-      stream << std::setw(6) << info.id;
+      stream << std::setw(9) << info.pending_count;
       stream << " ";
-
-      if (info.consumer_count == 0) {
-        // consumer_count is tracked by producer. When it's zero, producer must
-        // have already hung up and the consumer is orphaned.
-        stream << std::setw(14) << "Orphaned.";
-        stream << (" channel_id=" + std::to_string(channel->channel_id()));
-        stream << std::endl;
-        continue;
-      }
-
-      if (info.format == HAL_PIXEL_FORMAT_BLOB) {
-        std::string size = std::to_string(info.width) + " B";
-        stream << std::setw(14) << size;
-      } else {
-        std::string dimensions = std::to_string(info.width) + "x" +
-                                 std::to_string(info.height) + "x" +
-                                 std::to_string(info.layer_count);
-        stream << std::setw(14) << dimensions;
-      }
+      stream << "0x" << std::hex << std::setfill('0');
+      stream << std::setw(16) << info.state;
+      stream << " ";
+      stream << "0x" << std::setw(16) << info.signaled_mask;
+      stream << std::dec << std::setfill(' ');
+      stream << " ";
+      stream << std::setw(8) << info.index;
       stream << " ";
       stream << info.name;
       stream << std::endl;
@@ -184,6 +161,32 @@
     }
   }
 
+  stream << std::endl;
+  stream << "Orphaned Consumer Buffers:\n";
+  stream << std::right;
+  stream << std::setw(6) << "Id";
+  stream << " ";
+  stream << std::setw(14) << "Geometry";
+  stream << " ";
+  stream << "Name";
+  stream << std::endl;
+
+  for (const auto& channel : channels) {
+    BufferHubChannel::BufferInfo info = channel->GetBufferInfo();
+    // consumer_count is tracked by producer. When it's zero, producer must have
+    // already hung up and the consumer is orphaned.
+    if (channel->channel_type() == BufferHubChannel::kConsumerType &&
+        info.consumer_count == 0) {
+      stream << std::right;
+      stream << std::setw(6) << info.id;
+      stream << " ";
+
+      stream << std::setw(14) << "Orphaned.";
+      stream << (" channel_id=" + std::to_string(channel->channel_id()));
+      stream << std::endl;
+    }
+  }
+
   return stream.str();
 }
 
@@ -444,6 +447,7 @@
            "BufferHubChannel::SignalAvailable: channel_id=%d buffer_id=%d",
            channel_id(), buffer_id());
   if (!IsDetached()) {
+    signaled_ = true;
     const auto status = service_->ModifyChannelEvents(channel_id_, 0, POLLIN);
     ALOGE_IF(!status,
              "BufferHubChannel::SignalAvailable: failed to signal availability "
@@ -460,6 +464,7 @@
            "BufferHubChannel::ClearAvailable: channel_id=%d buffer_id=%d",
            channel_id(), buffer_id());
   if (!IsDetached()) {
+    signaled_ = false;
     const auto status = service_->ModifyChannelEvents(channel_id_, POLLIN, 0);
     ALOGE_IF(!status,
              "BufferHubChannel::ClearAvailable: failed to clear availability "
diff --git a/services/vr/bufferhubd/buffer_hub.h b/services/vr/bufferhubd/buffer_hub.h
index b0df11f..270ac95 100644
--- a/services/vr/bufferhubd/buffer_hub.h
+++ b/services/vr/bufferhubd/buffer_hub.h
@@ -53,6 +53,10 @@
     uint32_t layer_count = 0;
     uint32_t format = 0;
     uint64_t usage = 0;
+    size_t pending_count = 0;
+    uint64_t state = 0;
+    uint64_t signaled_mask = 0;
+    uint64_t index = 0;
     std::string name;
 
     // Data filed for producer queue.
@@ -60,7 +64,9 @@
     UsagePolicy usage_policy{0, 0, 0, 0};
 
     BufferInfo(int id, size_t consumer_count, uint32_t width, uint32_t height,
-               uint32_t layer_count, uint32_t format, uint64_t usage, const std::string& name)
+               uint32_t layer_count, uint32_t format, uint64_t usage,
+               size_t pending_count, uint64_t state, uint64_t signaled_mask,
+               uint64_t index, const std::string& name)
         : id(id),
           type(kProducerType),
           consumer_count(consumer_count),
@@ -69,6 +75,10 @@
           layer_count(layer_count),
           format(format),
           usage(usage),
+          pending_count(pending_count),
+          state(state),
+          signaled_mask(signaled_mask),
+          index(index),
           name(name) {}
 
     BufferInfo(int id, size_t consumer_count, size_t capacity,
@@ -101,6 +111,8 @@
   int channel_id() const { return channel_id_; }
   bool IsDetached() const { return channel_id_ == kDetachedId; }
 
+  bool signaled() const { return signaled_; }
+
   void Detach() {
     if (channel_type_ == kProducerType)
       channel_id_ = kDetachedId;
@@ -124,6 +136,8 @@
   // buffer if it is detached and re-attached to another channel.
   int channel_id_;
 
+  bool signaled_;
+
   ChannelType channel_type_;
 
   BufferHubChannel(const BufferHubChannel&) = delete;
diff --git a/services/vr/bufferhubd/bufferhubd.cpp b/services/vr/bufferhubd/bufferhubd.cpp
index 1613821..b27f218 100644
--- a/services/vr/bufferhubd/bufferhubd.cpp
+++ b/services/vr/bufferhubd/bufferhubd.cpp
@@ -2,6 +2,7 @@
 #include <unistd.h>
 
 #include <log/log.h>
+#include <sys/resource.h>
 
 #include <dvr/performance_client_api.h>
 #include <pdx/service_dispatcher.h>
@@ -16,6 +17,23 @@
   // We need to be able to create endpoints with full perms.
   umask(0000);
 
+  // Bump up the soft limit of open fd to the hard limit.
+  struct rlimit64 rlim;
+  ret = getrlimit64(RLIMIT_NOFILE, &rlim);
+  LOG_ALWAYS_FATAL_IF(ret != 0, "Failed to get nofile limit.");
+
+  ALOGI("Current nofile limit is %llu/%llu.", rlim.rlim_cur, rlim.rlim_max);
+  rlim.rlim_cur = rlim.rlim_max;
+  ret = setrlimit64(RLIMIT_NOFILE, &rlim);
+  ALOGE_IF(ret < 0, "Failed to set nofile limit, error=%s", strerror(errno));
+
+  rlim.rlim_cur = -1;
+  rlim.rlim_max = -1;
+  if (getrlimit64(RLIMIT_NOFILE, &rlim) < 0)
+    ALOGE("Failed to get nofile limit.");
+  else
+    ALOGI("New nofile limit is %llu/%llu.", rlim.rlim_cur, rlim.rlim_max);
+
   dispatcher = android::pdx::ServiceDispatcher::Create();
   CHECK_ERROR(!dispatcher, error, "Failed to create service dispatcher\n");
 
diff --git a/services/vr/bufferhubd/consumer_channel.cpp b/services/vr/bufferhubd/consumer_channel.cpp
index ac6896a..a6d2dbb 100644
--- a/services/vr/bufferhubd/consumer_channel.cpp
+++ b/services/vr/bufferhubd/consumer_channel.cpp
@@ -19,9 +19,10 @@
 namespace dvr {
 
 ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id,
-                                 int channel_id,
+                                 int channel_id, uint64_t consumer_state_bit,
                                  const std::shared_ptr<Channel> producer)
     : BufferHubChannel(service, buffer_id, channel_id, kConsumerType),
+      consumer_state_bit_(consumer_state_bit),
       producer_(producer) {
   GetProducer()->AddConsumer(this);
 }
@@ -32,8 +33,6 @@
            channel_id(), buffer_id());
 
   if (auto producer = GetProducer()) {
-    if (!released_)  // Producer is waiting for our Release.
-      producer->OnConsumerIgnored();
     producer->RemoveConsumer(this);
   }
 }
@@ -43,6 +42,8 @@
   if (auto producer = GetProducer()) {
     // If producer has not hung up, copy most buffer info from the producer.
     info = producer->GetBufferInfo();
+  } else {
+    info.signaled_mask = consumer_state_bit();
   }
   info.id = buffer_id();
   return info;
@@ -55,6 +56,9 @@
 void ConsumerChannel::HandleImpulse(Message& message) {
   ATRACE_NAME("ConsumerChannel::HandleImpulse");
   switch (message.GetOp()) {
+    case BufferHubRPC::ConsumerAcquire::Opcode:
+      OnConsumerAcquire(message);
+      break;
     case BufferHubRPC::ConsumerRelease::Opcode:
       OnConsumerRelease(message, {});
       break;
@@ -70,7 +74,7 @@
   switch (message.GetOp()) {
     case BufferHubRPC::GetBuffer::Opcode:
       DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
-          *producer, &ProducerChannel::OnGetBuffer, message);
+          *this, &ConsumerChannel::OnGetBuffer, message);
       return true;
 
     case BufferHubRPC::NewConsumer::Opcode:
@@ -98,9 +102,18 @@
   }
 }
 
-Status<std::pair<BorrowedFence, ConsumerChannel::MetaData>>
-ConsumerChannel::OnConsumerAcquire(Message& message,
-                                   std::size_t metadata_size) {
+Status<BufferDescription<BorrowedHandle>> ConsumerChannel::OnGetBuffer(
+    Message& /*message*/) {
+  ATRACE_NAME("ConsumerChannel::OnGetBuffer");
+  ALOGD_IF(TRACE, "ConsumerChannel::OnGetBuffer: buffer=%d", buffer_id());
+  if (auto producer = GetProducer()) {
+    return {producer->GetBuffer(consumer_state_bit_)};
+  } else {
+    return ErrorStatus(EPIPE);
+  }
+}
+
+Status<LocalFence> ConsumerChannel::OnConsumerAcquire(Message& message) {
   ATRACE_NAME("ConsumerChannel::OnConsumerAcquire");
   auto producer = GetProducer();
   if (!producer)
@@ -114,7 +127,7 @@
         producer->buffer_id());
     return ErrorStatus(EBUSY);
   } else {
-    auto status = producer->OnConsumerAcquire(message, metadata_size);
+    auto status = producer->OnConsumerAcquire(message);
     if (status) {
       ClearAvailable();
       acquired_ = true;
diff --git a/services/vr/bufferhubd/consumer_channel.h b/services/vr/bufferhubd/consumer_channel.h
index 208a002..55cf969 100644
--- a/services/vr/bufferhubd/consumer_channel.h
+++ b/services/vr/bufferhubd/consumer_channel.h
@@ -12,32 +12,35 @@
 // Consumer channels are attached to a Producer channel
 class ConsumerChannel : public BufferHubChannel {
  public:
+  using BorrowedHandle = pdx::BorrowedHandle;
   using Channel = pdx::Channel;
   using Message = pdx::Message;
 
   ConsumerChannel(BufferHubService* service, int buffer_id, int channel_id,
+                  uint64_t consumer_state_bit,
                   const std::shared_ptr<Channel> producer);
   ~ConsumerChannel() override;
 
   bool HandleMessage(Message& message) override;
   void HandleImpulse(Message& message) override;
 
+  uint64_t consumer_state_bit() const { return consumer_state_bit_; }
   BufferInfo GetBufferInfo() const override;
 
   bool OnProducerPosted();
   void OnProducerClosed();
 
  private:
-  using MetaData = pdx::rpc::BufferWrapper<std::uint8_t*>;
-
   std::shared_ptr<ProducerChannel> GetProducer() const;
 
-  pdx::Status<std::pair<BorrowedFence, MetaData>> OnConsumerAcquire(
-      Message& message, std::size_t metadata_size);
+  pdx::Status<BufferDescription<BorrowedHandle>> OnGetBuffer(Message& message);
+
+  pdx::Status<LocalFence> OnConsumerAcquire(Message& message);
   pdx::Status<void> OnConsumerRelease(Message& message,
                                       LocalFence release_fence);
   pdx::Status<void> OnConsumerSetIgnore(Message& message, bool ignore);
 
+  uint64_t consumer_state_bit_{0};
   bool acquired_{false};
   bool released_{true};
   bool ignored_{false};  // True if we are ignoring events.
diff --git a/services/vr/bufferhubd/consumer_queue_channel.cpp b/services/vr/bufferhubd/consumer_queue_channel.cpp
index f447e00..4d43001 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.cpp
+++ b/services/vr/bufferhubd/consumer_queue_channel.cpp
@@ -15,10 +15,11 @@
 
 ConsumerQueueChannel::ConsumerQueueChannel(
     BufferHubService* service, int buffer_id, int channel_id,
-    const std::shared_ptr<Channel>& producer)
+    const std::shared_ptr<Channel>& producer, bool silent)
     : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
       producer_(producer),
-      capacity_(0) {
+      capacity_(0),
+      silent_(silent) {
   GetProducer()->AddConsumer(this);
 }
 
@@ -83,23 +84,30 @@
 void ConsumerQueueChannel::RegisterNewBuffer(
     const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
   ALOGD_IF(TRACE,
-           "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu",
-           producer_channel->buffer_id(), slot);
-  pending_buffer_slots_.emplace(producer_channel, slot);
+           "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d "
+           "slot=%zu silent=%d",
+           buffer_id(), producer_channel->buffer_id(), slot, silent_);
+  // Only register buffers if the queue is not silent.
+  if (!silent_) {
+    pending_buffer_slots_.emplace(producer_channel, slot);
 
-  // Signal the client that there is new buffer available throught POLLIN.
-  SignalAvailable();
+    // Signal the client that there is new buffer available.
+    SignalAvailable();
+  }
 }
 
 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
   ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
-  ALOGD_IF(
-      TRACE,
-      "ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to "
-      "import: %zu",
-      pending_buffer_slots_.size());
+  ALOGD_IF(TRACE,
+           "ConsumerQueueChannel::OnConsumerQueueImportBuffers: "
+           "pending_buffer_slots=%zu",
+           pending_buffer_slots_.size());
+
+  // Indicate this is a silent queue that will not import buffers.
+  if (silent_)
+    return ErrorStatus(EBADR);
 
   while (!pending_buffer_slots_.empty()) {
     auto producer_channel = pending_buffer_slots_.front().first.lock();
diff --git a/services/vr/bufferhubd/consumer_queue_channel.h b/services/vr/bufferhubd/consumer_queue_channel.h
index aa3f531..8437c4c 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.h
+++ b/services/vr/bufferhubd/consumer_queue_channel.h
@@ -19,7 +19,7 @@
   using RemoteChannelHandle = pdx::RemoteChannelHandle;
 
   ConsumerQueueChannel(BufferHubService* service, int buffer_id, int channel_id,
-                       const std::shared_ptr<Channel>& producer);
+                       const std::shared_ptr<Channel>& producer, bool silent);
   ~ConsumerQueueChannel() override;
 
   bool HandleMessage(Message& message) override;
@@ -54,6 +54,10 @@
   // Tracks how many buffers have this queue imported.
   size_t capacity_;
 
+  // A silent queue does not signal or export buffers. It is only used to spawn
+  // another consumer queue.
+  bool silent_;
+
   ConsumerQueueChannel(const ConsumerQueueChannel&) = delete;
   void operator=(const ConsumerQueueChannel&) = delete;
 };
diff --git a/services/vr/bufferhubd/producer_channel.cpp b/services/vr/bufferhubd/producer_channel.cpp
index b2db795..716db5e 100644
--- a/services/vr/bufferhubd/producer_channel.cpp
+++ b/services/vr/bufferhubd/producer_channel.cpp
@@ -2,6 +2,8 @@
 
 #include <log/log.h>
 #include <sync/sync.h>
+#include <sys/epoll.h>
+#include <sys/eventfd.h>
 #include <sys/poll.h>
 #include <utils/Trace.h>
 
@@ -24,24 +26,88 @@
 namespace android {
 namespace dvr {
 
+namespace {
+
+static inline uint64_t FindNextClearedBit(uint64_t bits) {
+  return ~bits - (~bits & (~bits - 1));
+}
+
+}  // namespace
+
 ProducerChannel::ProducerChannel(BufferHubService* service, int channel_id,
                                  uint32_t width, uint32_t height,
                                  uint32_t layer_count, uint32_t format,
-                                 uint64_t usage, size_t meta_size_bytes,
+                                 uint64_t usage, size_t user_metadata_size,
                                  int* error)
     : BufferHubChannel(service, channel_id, channel_id, kProducerType),
       pending_consumers_(0),
       producer_owns_(true),
-      meta_size_bytes_(meta_size_bytes),
-      meta_(meta_size_bytes ? new uint8_t[meta_size_bytes] : nullptr) {
-  const int ret = buffer_.Alloc(width, height, layer_count, format, usage);
-  if (ret < 0) {
+      user_metadata_size_(user_metadata_size),
+      metadata_buf_size_(BufferHubDefs::kMetadataHeaderSize +
+                         user_metadata_size) {
+  if (int ret = buffer_.Alloc(width, height, layer_count, format, usage)) {
     ALOGE("ProducerChannel::ProducerChannel: Failed to allocate buffer: %s",
           strerror(-ret));
     *error = ret;
     return;
   }
 
+  if (int ret = metadata_buffer_.Alloc(metadata_buf_size_, /*height=*/1,
+                                       /*layer_count=*/1,
+                                       BufferHubDefs::kMetadataFormat,
+                                       BufferHubDefs::kMetadataUsage)) {
+    ALOGE("ProducerChannel::ProducerChannel: Failed to allocate metadata: %s",
+          strerror(-ret));
+    *error = ret;
+    return;
+  }
+
+  void* metadata_ptr = nullptr;
+  if (int ret = metadata_buffer_.Lock(BufferHubDefs::kMetadataUsage, /*x=*/0,
+                                      /*y=*/0, metadata_buf_size_,
+                                      /*height=*/1, &metadata_ptr)) {
+    ALOGE("ProducerChannel::ProducerChannel: Failed to lock metadata.");
+    *error = -ret;
+    return;
+  }
+  metadata_header_ =
+      reinterpret_cast<BufferHubDefs::MetadataHeader*>(metadata_ptr);
+
+  // Using placement new here to reuse shared memory instead of new allocation
+  // and also initialize the value to zero.
+  buffer_state_ =
+      new (&metadata_header_->buffer_state) std::atomic<uint64_t>(0);
+  fence_state_ =
+      new (&metadata_header_->fence_state) std::atomic<uint64_t>(0);
+
+  acquire_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
+  release_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
+  if (!acquire_fence_fd_ || !release_fence_fd_) {
+    ALOGE("ProducerChannel::ProducerChannel: Failed to create shared fences.");
+    *error = -EIO;
+    return;
+  }
+
+  dummy_fence_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
+  if (!dummy_fence_fd_) {
+    ALOGE("ProducerChannel::ProducerChannel: Failed to create dummy fences.");
+    *error = -EIO;
+    return;
+  }
+
+  epoll_event event;
+  event.events = 0;
+  event.data.u64 = 0ULL;
+  if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_ADD, dummy_fence_fd_.Get(),
+                &event) < 0) {
+    ALOGE(
+        "ProducerChannel::ProducerChannel: Failed to modify the shared "
+        "release fence to include the dummy fence: %s",
+        strerror(errno));
+    *error = -EIO;
+    return;
+  }
+
   // Success.
   *error = 0;
 }
@@ -49,11 +115,11 @@
 Status<std::shared_ptr<ProducerChannel>> ProducerChannel::Create(
     BufferHubService* service, int channel_id, uint32_t width, uint32_t height,
     uint32_t layer_count, uint32_t format, uint64_t usage,
-    size_t meta_size_bytes) {
+    size_t user_metadata_size) {
   int error;
   std::shared_ptr<ProducerChannel> producer(
       new ProducerChannel(service, channel_id, width, height, layer_count,
-                          format, usage, meta_size_bytes, &error));
+                          format, usage, user_metadata_size, &error));
   if (error < 0)
     return ErrorStatus(-error);
   else
@@ -62,16 +128,24 @@
 
 ProducerChannel::~ProducerChannel() {
   ALOGD_IF(TRACE,
-           "ProducerChannel::~ProducerChannel: channel_id=%d buffer_id=%d",
-           channel_id(), buffer_id());
+           "ProducerChannel::~ProducerChannel: channel_id=%d buffer_id=%d "
+           "state=%" PRIx64 ".",
+           channel_id(), buffer_id(), buffer_state_->load());
   for (auto consumer : consumer_channels_)
     consumer->OnProducerClosed();
 }
 
 BufferHubChannel::BufferInfo ProducerChannel::GetBufferInfo() const {
+  // Derive the mask of signaled buffers in this producer / consumer set.
+  uint64_t signaled_mask = signaled() ? BufferHubDefs::kProducerStateBit : 0;
+  for (const ConsumerChannel* consumer : consumer_channels_) {
+    signaled_mask |= consumer->signaled() ? consumer->consumer_state_bit() : 0;
+  }
+
   return BufferInfo(buffer_id(), consumer_channels_.size(), buffer_.width(),
                     buffer_.height(), buffer_.layer_count(), buffer_.format(),
-                    buffer_.usage(), name_);
+                    buffer_.usage(), pending_consumers_, buffer_state_->load(),
+                    signaled_mask, metadata_header_->queue_index, name_);
 }
 
 void ProducerChannel::HandleImpulse(Message& message) {
@@ -80,6 +154,9 @@
     case BufferHubRPC::ProducerGain::Opcode:
       OnProducerGain(message);
       break;
+    case BufferHubRPC::ProducerPost::Opcode:
+      OnProducerPost(message, {});
+      break;
   }
 }
 
@@ -121,16 +198,26 @@
   }
 }
 
-Status<NativeBufferHandle<BorrowedHandle>> ProducerChannel::OnGetBuffer(
+BufferDescription<BorrowedHandle> ProducerChannel::GetBuffer(
+    uint64_t buffer_state_bit) {
+  return {
+      buffer_,          metadata_buffer_,           buffer_id(),
+      buffer_state_bit, acquire_fence_fd_.Borrow(), release_fence_fd_.Borrow()};
+}
+
+Status<BufferDescription<BorrowedHandle>> ProducerChannel::OnGetBuffer(
     Message& /*message*/) {
   ATRACE_NAME("ProducerChannel::OnGetBuffer");
-  ALOGD_IF(TRACE, "ProducerChannel::OnGetBuffer: buffer=%d", buffer_id());
-  return {NativeBufferHandle<BorrowedHandle>(buffer_, buffer_id())};
+  ALOGD_IF(TRACE, "ProducerChannel::OnGetBuffer: buffer=%d, state=%" PRIx64 ".",
+           buffer_id(), buffer_state_->load());
+  return {GetBuffer(BufferHubDefs::kProducerStateBit)};
 }
 
 Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) {
   ATRACE_NAME("ProducerChannel::CreateConsumer");
-  ALOGD_IF(TRACE, "ProducerChannel::CreateConsumer: buffer_id=%d", buffer_id());
+  ALOGD_IF(TRACE,
+           "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d",
+           buffer_id(), producer_owns_);
 
   int channel_id;
   auto status = message.PushChannel(0, nullptr, &channel_id);
@@ -141,8 +228,21 @@
     return ErrorStatus(ENOMEM);
   }
 
-  auto consumer = std::make_shared<ConsumerChannel>(
-      service(), buffer_id(), channel_id, shared_from_this());
+  // Try find the next consumer state bit which has not been claimed by any
+  // consumer yet.
+  uint64_t consumer_state_bit = FindNextClearedBit(
+      active_consumer_bit_mask_ | orphaned_consumer_bit_mask_ |
+      BufferHubDefs::kProducerStateBit);
+  if (consumer_state_bit == 0ULL) {
+    ALOGE(
+        "ProducerChannel::CreateConsumer: reached the maximum mumber of "
+        "consumers per producer: 63.");
+    return ErrorStatus(E2BIG);
+  }
+
+  auto consumer =
+      std::make_shared<ConsumerChannel>(service(), buffer_id(), channel_id,
+                                        consumer_state_bit, shared_from_this());
   const auto channel_status = service()->SetChannel(channel_id, consumer);
   if (!channel_status) {
     ALOGE(
@@ -152,12 +252,14 @@
     return ErrorStatus(ENOMEM);
   }
 
-  if (!producer_owns_) {
+  if (!producer_owns_ &&
+      !BufferHubDefs::IsBufferReleased(buffer_state_->load())) {
     // Signal the new consumer when adding it to a posted producer.
     if (consumer->OnProducerPosted())
       pending_consumers_++;
   }
 
+  active_consumer_bit_mask_ |= consumer_state_bit;
   return {status.take()};
 }
 
@@ -168,8 +270,7 @@
 }
 
 Status<void> ProducerChannel::OnProducerPost(
-    Message&, LocalFence acquire_fence,
-    BufferWrapper<std::vector<std::uint8_t>> metadata) {
+    Message&, LocalFence acquire_fence) {
   ATRACE_NAME("ProducerChannel::OnProducerPost");
   ALOGD_IF(TRACE, "ProducerChannel::OnProducerPost: buffer_id=%d", buffer_id());
   if (!producer_owns_) {
@@ -177,27 +278,45 @@
     return ErrorStatus(EBUSY);
   }
 
-  if (meta_size_bytes_ != metadata.size()) {
-    ALOGD_IF(TRACE,
-             "ProducerChannel::OnProducerPost: Expected meta_size_bytes=%zu "
-             "got size=%zu",
-             meta_size_bytes_, metadata.size());
-    return ErrorStatus(EINVAL);
+  epoll_event event;
+  event.events = 0;
+  event.data.u64 = 0ULL;
+  int ret = epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD,
+                      dummy_fence_fd_.Get(), &event);
+  ALOGE_IF(ret < 0,
+      "ProducerChannel::OnProducerPost: Failed to modify the shared "
+      "release fence to include the dummy fence: %s",
+      strerror(errno));
+
+  eventfd_t dummy_fence_count = 0ULL;
+  if (eventfd_read(dummy_fence_fd_.Get(), &dummy_fence_count) < 0) {
+    const int error = errno;
+    if (error != EAGAIN) {
+      ALOGE(
+          "ProducerChannel::ProducerChannel: Failed to read dummy fence, "
+          "error: %s",
+          strerror(error));
+      return ErrorStatus(error);
+    }
   }
 
-  std::copy(metadata.begin(), metadata.end(), meta_.get());
+  ALOGW_IF(dummy_fence_count > 0,
+           "ProducerChannel::ProducerChannel: %" PRIu64
+           " dummy fence(s) was signaled during last release/gain cycle "
+           "buffer_id=%d.",
+           dummy_fence_count, buffer_id());
+
   post_fence_ = std::move(acquire_fence);
   producer_owns_ = false;
 
-  // Signal any interested consumers. If there are none, automatically release
-  // the buffer.
+  // Signal any interested consumers. If there are none, the buffer will stay
+  // in posted state until a consumer comes online. This behavior guarantees
+  // that no frame is silently dropped.
   pending_consumers_ = 0;
   for (auto consumer : consumer_channels_) {
     if (consumer->OnProducerPosted())
       pending_consumers_++;
   }
-  if (pending_consumers_ == 0)
-    SignalAvailable();
   ALOGD_IF(TRACE, "ProducerChannel::OnProducerPost: %d pending consumers",
            pending_consumers_);
 
@@ -214,8 +333,13 @@
   }
 
   // There are still pending consumers, return busy.
-  if (pending_consumers_ > 0)
+  if (pending_consumers_ > 0) {
+    ALOGE(
+        "ProducerChannel::OnGain: Producer (id=%d) is gaining a buffer that "
+        "still has %d pending consumer(s).",
+        buffer_id(), pending_consumers_);
     return ErrorStatus(EBUSY);
+  }
 
   ClearAvailable();
   producer_owns_ = true;
@@ -223,9 +347,7 @@
   return {std::move(returned_fence_)};
 }
 
-Status<std::pair<BorrowedFence, BufferWrapper<std::uint8_t*>>>
-ProducerChannel::OnConsumerAcquire(Message& /*message*/,
-                                   std::size_t metadata_size) {
+Status<LocalFence> ProducerChannel::OnConsumerAcquire(Message& /*message*/) {
   ATRACE_NAME("ProducerChannel::OnConsumerAcquire");
   ALOGD_IF(TRACE, "ProducerChannel::OnConsumerAcquire: buffer_id=%d",
            buffer_id());
@@ -236,12 +358,7 @@
 
   // Return a borrowed fd to avoid unnecessary duplication of the underlying fd.
   // Serialization just needs to read the handle.
-  if (metadata_size == 0)
-    return {std::make_pair(post_fence_.borrow(),
-                           WrapBuffer<std::uint8_t>(nullptr, 0))};
-  else
-    return {std::make_pair(post_fence_.borrow(),
-                           WrapBuffer(meta_.get(), meta_size_bytes_))};
+  return {std::move(post_fence_)};
 }
 
 Status<void> ProducerChannel::OnConsumerRelease(Message&,
@@ -273,17 +390,75 @@
   }
 
   OnConsumerIgnored();
+  if (pending_consumers_ == 0) {
+    // Clear the producer bit atomically to transit into released state. This
+    // has to done by BufferHub as it requries synchronization among all
+    // consumers.
+    BufferHubDefs::ModifyBufferState(buffer_state_,
+                                     BufferHubDefs::kProducerStateBit, 0ULL);
+    ALOGD_IF(TRACE,
+             "ProducerChannel::OnConsumerRelease: releasing last consumer: "
+             "buffer_id=%d state=%" PRIx64 ".",
+             buffer_id(), buffer_state_->load());
+
+    if (orphaned_consumer_bit_mask_) {
+      ALOGW(
+          "ProducerChannel::OnConsumerRelease: orphaned buffer detected "
+          "during the this acquire/release cycle: id=%d orphaned=0x%" PRIx64
+          " queue_index=%" PRIu64 ".",
+          buffer_id(), orphaned_consumer_bit_mask_,
+          metadata_header_->queue_index);
+      orphaned_consumer_bit_mask_ = 0;
+    }
+
+    SignalAvailable();
+  }
+
+  ALOGE_IF(pending_consumers_ &&
+               BufferHubDefs::IsBufferReleased(buffer_state_->load()),
+           "ProducerChannel::OnConsumerRelease: buffer state inconsistent: "
+           "pending_consumers=%d, buffer buffer is in releaed state.",
+           pending_consumers_);
   return {};
 }
 
 void ProducerChannel::OnConsumerIgnored() {
-  if (!--pending_consumers_)
-    SignalAvailable();
+  if (pending_consumers_ == 0) {
+    ALOGE("ProducerChannel::OnConsumerIgnored: no pending consumer.");
+    return;
+  }
+
+  --pending_consumers_;
   ALOGD_IF(TRACE,
            "ProducerChannel::OnConsumerIgnored: buffer_id=%d %d consumers left",
            buffer_id(), pending_consumers_);
 }
 
+void ProducerChannel::OnConsumerOrphaned(ConsumerChannel* channel) {
+  // Ignore the orphaned consumer.
+  OnConsumerIgnored();
+
+  const uint64_t consumer_state_bit = channel->consumer_state_bit();
+  ALOGE_IF(orphaned_consumer_bit_mask_ & consumer_state_bit,
+           "ProducerChannel::OnConsumerOrphaned: Consumer "
+           "(consumer_state_bit=%" PRIx64 ") is already orphaned.",
+           consumer_state_bit);
+  orphaned_consumer_bit_mask_ |= consumer_state_bit;
+
+  // Atomically clear the fence state bit as an orphaned consumer will never
+  // signal a release fence. Also clear the buffer state as it won't be released
+  // as well.
+  fence_state_->fetch_and(~consumer_state_bit);
+  BufferHubDefs::ModifyBufferState(buffer_state_, consumer_state_bit, 0ULL);
+
+  ALOGW(
+      "ProducerChannel::OnConsumerOrphaned: detected new orphaned consumer "
+      "buffer_id=%d consumer_state_bit=%" PRIx64 " queue_index=%" PRIu64
+      " buffer_state=%" PRIx64 " fence_state=%" PRIx64 ".",
+      buffer_id(), consumer_state_bit, metadata_header_->queue_index,
+      buffer_state_->load(), fence_state_->load());
+}
+
 Status<void> ProducerChannel::OnProducerMakePersistent(Message& message,
                                                        const std::string& name,
                                                        int user_id,
@@ -335,6 +510,40 @@
 void ProducerChannel::RemoveConsumer(ConsumerChannel* channel) {
   consumer_channels_.erase(
       std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
+  active_consumer_bit_mask_ &= ~channel->consumer_state_bit();
+
+  const uint64_t buffer_state = buffer_state_->load();
+  if (BufferHubDefs::IsBufferPosted(buffer_state) ||
+      BufferHubDefs::IsBufferAcquired(buffer_state)) {
+    // The consumer client is being destoryed without releasing. This could
+    // happen in corner cases when the consumer crashes. Here we mark it
+    // orphaned before remove it from producer.
+    OnConsumerOrphaned(channel);
+  }
+
+  if (BufferHubDefs::IsBufferReleased(buffer_state) ||
+      BufferHubDefs::IsBufferGained(buffer_state)) {
+    // The consumer is being close while it is suppose to signal a release
+    // fence. Signal the dummy fence here.
+    if (fence_state_->load() & channel->consumer_state_bit()) {
+      epoll_event event;
+      event.events = EPOLLIN;
+      event.data.u64 = channel->consumer_state_bit();
+      if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD,
+                    dummy_fence_fd_.Get(), &event) < 0) {
+        ALOGE(
+            "ProducerChannel::RemoveConsumer: Failed to modify the shared "
+            "release fence to include the dummy fence: %s",
+            strerror(errno));
+        return;
+      }
+      ALOGW(
+          "ProducerChannel::RemoveConsumer: signal dummy release fence "
+          "buffer_id=%d",
+          buffer_id());
+      eventfd_write(dummy_fence_fd_.Get(), 1);
+    }
+  }
 }
 
 // Returns true if either the user or group ids match the owning ids or both
@@ -350,10 +559,12 @@
 // Returns true if the given parameters match the underlying buffer parameters.
 bool ProducerChannel::CheckParameters(uint32_t width, uint32_t height,
                                       uint32_t layer_count, uint32_t format,
-                                      uint64_t usage, size_t meta_size_bytes) {
-  return meta_size_bytes == meta_size_bytes_ && buffer_.width() == width &&
-         buffer_.height() == height && buffer_.layer_count() == layer_count &&
-         buffer_.format() == format && buffer_.usage() == usage;
+                                      uint64_t usage,
+                                      size_t user_metadata_size) {
+  return user_metadata_size == user_metadata_size_ &&
+         buffer_.width() == width && buffer_.height() == height &&
+         buffer_.layer_count() == layer_count && buffer_.format() == format &&
+         buffer_.usage() == usage;
 }
 
 }  // namespace dvr
diff --git a/services/vr/bufferhubd/producer_channel.h b/services/vr/bufferhubd/producer_channel.h
index 5ada478..e280f4d 100644
--- a/services/vr/bufferhubd/producer_channel.h
+++ b/services/vr/bufferhubd/producer_channel.h
@@ -33,7 +33,7 @@
   static pdx::Status<std::shared_ptr<ProducerChannel>> Create(
       BufferHubService* service, int channel_id, uint32_t width,
       uint32_t height, uint32_t layer_count, uint32_t format, uint64_t usage,
-      size_t meta_size_bytes);
+      size_t user_metadata_size);
 
   ~ProducerChannel() override;
 
@@ -42,24 +42,25 @@
 
   BufferInfo GetBufferInfo() const override;
 
-  pdx::Status<NativeBufferHandle<BorrowedHandle>> OnGetBuffer(Message& message);
+  BufferDescription<BorrowedHandle> GetBuffer(uint64_t buffer_state_bit);
 
   pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message);
   pdx::Status<RemoteChannelHandle> OnNewConsumer(Message& message);
 
-  pdx::Status<std::pair<BorrowedFence, BufferWrapper<std::uint8_t*>>>
-  OnConsumerAcquire(Message& message, std::size_t metadata_size);
+  pdx::Status<LocalFence> OnConsumerAcquire(Message& message);
   pdx::Status<void> OnConsumerRelease(Message& message,
                                       LocalFence release_fence);
 
   void OnConsumerIgnored();
+  void OnConsumerOrphaned(ConsumerChannel* channel);
 
   void AddConsumer(ConsumerChannel* channel);
   void RemoveConsumer(ConsumerChannel* channel);
 
   bool CheckAccess(int euid, int egid);
   bool CheckParameters(uint32_t width, uint32_t height, uint32_t layer_count,
-                       uint32_t format, uint64_t usage, size_t meta_size_bytes);
+                       uint32_t format, uint64_t usage,
+                       size_t user_metadata_size);
 
   pdx::Status<void> OnProducerMakePersistent(Message& message,
                                              const std::string& name,
@@ -74,11 +75,28 @@
 
   IonBuffer buffer_;
 
+  // IonBuffer that is shared between bufferhubd, producer, and consumers.
+  IonBuffer metadata_buffer_;
+  BufferHubDefs::MetadataHeader* metadata_header_ = nullptr;
+  std::atomic<uint64_t>* buffer_state_ = nullptr;
+  std::atomic<uint64_t>* fence_state_ = nullptr;
+
+  // All active consumer bits. Valid bits are the lower 63 bits, while the
+  // highest bit is reserved for the producer and should not be set.
+  uint64_t active_consumer_bit_mask_{0ULL};
+  // All orphaned consumer bits. Valid bits are the lower 63 bits, while the
+  // highest bit is reserved for the producer and should not be set.
+  uint64_t orphaned_consumer_bit_mask_{0ULL};
+
   bool producer_owns_;
   LocalFence post_fence_;
   LocalFence returned_fence_;
-  size_t meta_size_bytes_;
-  std::unique_ptr<uint8_t[]> meta_;
+  size_t user_metadata_size_;  // size of user requested buffer buffer size.
+  size_t metadata_buf_size_;  // size of the ion buffer that holds metadata.
+
+  pdx::LocalHandle acquire_fence_fd_;
+  pdx::LocalHandle release_fence_fd_;
+  pdx::LocalHandle dummy_fence_fd_;
 
   static constexpr int kNoCheckId = -1;
   static constexpr int kUseCallerId = 0;
@@ -92,11 +110,10 @@
 
   ProducerChannel(BufferHubService* service, int channel, uint32_t width,
                   uint32_t height, uint32_t layer_count, uint32_t format,
-                  uint64_t usage, size_t meta_size_bytes, int* error);
+                  uint64_t usage, size_t user_metadata_size, int* error);
 
-  pdx::Status<void> OnProducerPost(
-      Message& message, LocalFence acquire_fence,
-      BufferWrapper<std::vector<std::uint8_t>> metadata);
+  pdx::Status<BufferDescription<BorrowedHandle>> OnGetBuffer(Message& message);
+  pdx::Status<void> OnProducerPost(Message& message, LocalFence acquire_fence);
   pdx::Status<LocalFence> OnProducerGain(Message& message);
 
   ProducerChannel(const ProducerChannel&) = delete;
diff --git a/services/vr/bufferhubd/producer_queue_channel.cpp b/services/vr/bufferhubd/producer_queue_channel.cpp
index b8bb728..c0c48c2 100644
--- a/services/vr/bufferhubd/producer_queue_channel.cpp
+++ b/services/vr/bufferhubd/producer_queue_channel.cpp
@@ -7,8 +7,8 @@
 
 using android::pdx::ErrorStatus;
 using android::pdx::Message;
-using android::pdx::Status;
 using android::pdx::RemoteChannelHandle;
+using android::pdx::Status;
 using android::pdx::rpc::DispatchRemoteMethod;
 
 namespace android {
@@ -96,10 +96,12 @@
 }
 
 Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue(
-    Message& message) {
+    Message& message, bool silent) {
   ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue");
-  ALOGD_IF(TRACE, "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d",
-           channel_id());
+  ALOGD_IF(
+      TRACE,
+      "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d slient=%d",
+      channel_id(), silent);
 
   int channel_id;
   auto status = message.PushChannel(0, nullptr, &channel_id);
@@ -112,7 +114,7 @@
   }
 
   auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
-      service(), buffer_id(), channel_id, shared_from_this());
+      service(), buffer_id(), channel_id, shared_from_this(), silent);
 
   // Register the existing buffers with the new consumer queue.
   for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
@@ -222,7 +224,7 @@
 
   auto producer_channel_status =
       ProducerChannel::Create(service(), buffer_id, width, height, layer_count,
-                              format, usage, config_.meta_size_bytes);
+                              format, usage, config_.user_metadata_size);
   if (!producer_channel_status) {
     ALOGE(
         "ProducerQueueChannel::AllocateBuffer: Failed to create producer "
diff --git a/services/vr/bufferhubd/producer_queue_channel.h b/services/vr/bufferhubd/producer_queue_channel.h
index fd519c5..e825f47 100644
--- a/services/vr/bufferhubd/producer_queue_channel.h
+++ b/services/vr/bufferhubd/producer_queue_channel.h
@@ -26,7 +26,7 @@
   // Returns a handle for the service channel, as well as the size of the
   // metadata associated with the queue.
   pdx::Status<pdx::RemoteChannelHandle> OnCreateConsumerQueue(
-      pdx::Message& message);
+      pdx::Message& message, bool silent);
 
   pdx::Status<QueueInfo> OnGetQueueInfo(pdx::Message& message);