Merge "Create consumer state mask(s) immediately after producer allocate new buffer."
diff --git a/services/vr/bufferhubd/consumer_queue_channel.cpp b/services/vr/bufferhubd/consumer_queue_channel.cpp
index 74b549d..5d7d4e9 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.cpp
+++ b/services/vr/bufferhubd/consumer_queue_channel.cpp
@@ -80,27 +80,35 @@
 }
 
 void ConsumerQueueChannel::RegisterNewBuffer(
-    const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
-  ALOGD_IF(TRACE,
-           "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d "
-           "slot=%zu silent=%d",
-           buffer_id(), producer_channel->buffer_id(), slot, silent_);
+    const std::shared_ptr<ProducerChannel>& producer_channel,
+    size_t producer_slot) {
+  ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d",
+           __FUNCTION__, buffer_id(), producer_channel->buffer_id(),
+           producer_slot, silent_);
   // Only register buffers if the queue is not silent.
-  if (!silent_) {
-    pending_buffer_slots_.emplace(producer_channel, slot);
-
-    // Signal the client that there is new buffer available.
-    SignalAvailable();
+  if (silent_) {
+    return;
   }
+
+  auto status = producer_channel->CreateConsumerStateMask();
+  if (!status.ok()) {
+    ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__,
+          status.GetErrorMessage().c_str());
+    return;
+  }
+  uint64_t consumer_state_mask = status.get();
+
+  pending_buffer_slots_.emplace(producer_channel, producer_slot,
+                                consumer_state_mask);
+  // Signal the client that there is new buffer available.
+  SignalAvailable();
 }
 
 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
-  ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
-  ALOGD_IF(TRACE,
-           "ConsumerQueueChannel::OnConsumerQueueImportBuffers: "
-           "pending_buffer_slots=%zu",
+  ATRACE_NAME(__FUNCTION__);
+  ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__,
            pending_buffer_slots_.size());
 
   // Indicate this is a silent queue that will not import buffers.
@@ -108,30 +116,30 @@
     return ErrorStatus(EBADR);
 
   while (!pending_buffer_slots_.empty()) {
-    auto producer_channel = pending_buffer_slots_.front().first.lock();
-    size_t producer_slot = pending_buffer_slots_.front().second;
+    auto producer_channel =
+        pending_buffer_slots_.front().producer_channel.lock();
+    size_t producer_slot = pending_buffer_slots_.front().producer_slot;
+    uint64_t consumer_state_mask =
+        pending_buffer_slots_.front().consumer_state_mask;
     pending_buffer_slots_.pop();
 
     // It's possible that the producer channel has expired. When this occurs,
     // ignore the producer channel.
     if (producer_channel == nullptr) {
-      ALOGW(
-          "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
-          "channel has already been expired.");
+      ALOGW("%s: producer channel has already been expired.", __FUNCTION__);
       continue;
     }
 
-    auto status = producer_channel->CreateConsumer(message);
+    auto status =
+        producer_channel->CreateConsumer(message, consumer_state_mask);
 
     // If no buffers are imported successfully, clear available and return an
     // error. Otherwise, return all consumer handles already imported
     // successfully, but keep available bits on, so that the client can retry
     // importing remaining consumer buffers.
     if (!status) {
-      ALOGE(
-          "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
-          "consumer: %s",
-          status.GetErrorMessage().c_str());
+      ALOGE("%s: Failed create consumer: %s", __FUNCTION__,
+            status.GetErrorMessage().c_str());
       if (buffer_handles.empty()) {
         ClearAvailable();
         return status.error_status();
diff --git a/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h b/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h
index 8f35437..3a81b03 100644
--- a/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h
+++ b/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h
@@ -3,8 +3,8 @@
 
 #include <queue>
 
-#include <private/dvr/bufferhub_rpc.h>
 #include <private/dvr/buffer_hub.h>
+#include <private/dvr/bufferhub_rpc.h>
 #include <private/dvr/consumer_channel.h>
 #include <private/dvr/producer_queue_channel.h>
 
@@ -28,7 +28,8 @@
   // Called by ProdcuerQueueChannel to notify consumer queue that a new
   // buffer has been allocated.
   void RegisterNewBuffer(
-      const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot);
+      const std::shared_ptr<ProducerChannel>& producer_channel,
+      size_t producer_slot);
 
   // Called after clients been signaled by service that new buffer has been
   // allocated. Clients uses kOpConsumerQueueImportBuffers to import new
@@ -40,14 +41,29 @@
   void OnProducerClosed();
 
  private:
+  // Data structure to store relavant info of a newly allocated producer buffer
+  // so that consumer channel and buffer can be created later.
+  struct PendingBuffer {
+    PendingBuffer(std::shared_ptr<ProducerChannel> channel, size_t slot,
+                  uint64_t mask) {
+      producer_channel = channel;
+      producer_slot = slot;
+      consumer_state_mask = mask;
+    }
+    PendingBuffer() = delete;
+
+    std::weak_ptr<ProducerChannel> producer_channel;
+    size_t producer_slot;
+    uint64_t consumer_state_mask;
+  };
+
   std::shared_ptr<ProducerQueueChannel> GetProducer() const;
 
   // Pointer to the producer channel.
   std::weak_ptr<Channel> producer_;
 
   // Tracks newly allocated buffer producers along with it's slot number.
-  std::queue<std::pair<std::weak_ptr<ProducerChannel>, size_t>>
-      pending_buffer_slots_;
+  std::queue<PendingBuffer> pending_buffer_slots_;
 
   // Tracks how many buffers have this queue imported.
   size_t capacity_;
diff --git a/services/vr/bufferhubd/include/private/dvr/producer_channel.h b/services/vr/bufferhubd/include/private/dvr/producer_channel.h
index 5868b09..3ad9c70 100644
--- a/services/vr/bufferhubd/include/private/dvr/producer_channel.h
+++ b/services/vr/bufferhubd/include/private/dvr/producer_channel.h
@@ -53,7 +53,9 @@
 
   BufferDescription<BorrowedHandle> GetBuffer(uint64_t client_state_mask);
 
-  pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message);
+  pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message,
+                                                  uint64_t consumer_state_mask);
+  pdx::Status<uint64_t> CreateConsumerStateMask();
   pdx::Status<RemoteChannelHandle> OnNewConsumer(Message& message);
 
   pdx::Status<LocalFence> OnConsumerAcquire(Message& message);
@@ -93,7 +95,7 @@
   LocalFence post_fence_;
   LocalFence returned_fence_;
   size_t user_metadata_size_;  // size of user requested buffer buffer size.
-  size_t metadata_buf_size_;  // size of the ion buffer that holds metadata.
+  size_t metadata_buf_size_;   // size of the ion buffer that holds metadata.
 
   pdx::LocalHandle acquire_fence_fd_;
   pdx::LocalHandle release_fence_fd_;
@@ -111,6 +113,10 @@
   pdx::Status<void> OnProducerPost(Message& message, LocalFence acquire_fence);
   pdx::Status<LocalFence> OnProducerGain(Message& message);
 
+  // Remove consumer from atomics in shared memory based on consumer_state_mask.
+  // This function is used for clean up for failures in CreateConsumer method.
+  void RemoveConsumerClientMask(uint64_t consumer_state_mask);
+
   ProducerChannel(const ProducerChannel&) = delete;
   void operator=(const ProducerChannel&) = delete;
 };
diff --git a/services/vr/bufferhubd/producer_channel.cpp b/services/vr/bufferhubd/producer_channel.cpp
index 397c0ae..c6e8ea9 100644
--- a/services/vr/bufferhubd/producer_channel.cpp
+++ b/services/vr/bufferhubd/producer_channel.cpp
@@ -248,21 +248,7 @@
   return {GetBuffer(BufferHubDefs::kFirstClientBitMask)};
 }
 
-Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) {
-  ATRACE_NAME("ProducerChannel::CreateConsumer");
-  ALOGD_IF(TRACE,
-           "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d",
-           buffer_id(), producer_owns_);
-
-  int channel_id;
-  auto status = message.PushChannel(0, nullptr, &channel_id);
-  if (!status) {
-    ALOGE(
-        "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s",
-        status.GetErrorMessage().c_str());
-    return ErrorStatus(ENOMEM);
-  }
-
+Status<uint64_t> ProducerChannel::CreateConsumerStateMask() {
   // Try find the next consumer state bit which has not been claimed by any
   // consumer yet.
   // memory_order_acquire is chosen here because all writes in other threads
@@ -277,7 +263,6 @@
         "consumers per producer: 63.");
     return ErrorStatus(E2BIG);
   }
-
   uint64_t updated_active_clients_bit_mask =
       current_active_clients_bit_mask | client_state_mask;
   // Set the updated value only if the current value stays the same as what was
@@ -286,28 +271,71 @@
   // thread, and the modification will be visible in other threads that acquire
   // active_clients_bit_mask_. If the comparison fails, load the result of
   // all writes from all threads to updated_active_clients_bit_mask.
-  if (!active_clients_bit_mask_->compare_exchange_weak(
-          current_active_clients_bit_mask, updated_active_clients_bit_mask,
-          std::memory_order_acq_rel, std::memory_order_acquire)) {
+  // Keep on finding the next available slient state mask until succeed or out
+  // of memory.
+  while (!active_clients_bit_mask_->compare_exchange_weak(
+      current_active_clients_bit_mask, updated_active_clients_bit_mask,
+      std::memory_order_acq_rel, std::memory_order_acquire)) {
     ALOGE("Current active clients bit mask is changed to %" PRIx64
-          ", which was expected to be %" PRIx64 ".",
+          ", which was expected to be %" PRIx64
+          ". Trying to generate a new client state mask to resolve race "
+          "condition.",
           updated_active_clients_bit_mask, current_active_clients_bit_mask);
-    return ErrorStatus(EBUSY);
+    client_state_mask = BufferHubDefs::FindNextAvailableClientStateMask(
+        current_active_clients_bit_mask | orphaned_consumer_bit_mask_);
+    if (client_state_mask == 0ULL) {
+      ALOGE(
+          "ProducerChannel::CreateConsumer: reached the maximum mumber of "
+          "consumers per producer: 63.");
+      return ErrorStatus(E2BIG);
+    }
+    updated_active_clients_bit_mask =
+        current_active_clients_bit_mask | client_state_mask;
   }
 
-  auto consumer =
-      std::make_shared<ConsumerChannel>(service(), buffer_id(), channel_id,
-                                        client_state_mask, shared_from_this());
+  return {client_state_mask};
+}
+
+void ProducerChannel::RemoveConsumerClientMask(uint64_t consumer_state_mask) {
+  // Clear up the buffer state and fence state in case there is already
+  // something there due to possible race condition between producer post and
+  // consumer failed to create channel.
+  buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
+  fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
+
+  // Restore the consumer state bit and make it visible in other threads that
+  // acquire the active_clients_bit_mask_.
+  active_clients_bit_mask_->fetch_and(~consumer_state_mask,
+                                      std::memory_order_release);
+}
+
+Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(
+    Message& message, uint64_t consumer_state_mask) {
+  ATRACE_NAME("ProducerChannel::CreateConsumer");
+  ALOGD_IF(TRACE,
+           "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d",
+           buffer_id(), producer_owns_);
+
+  int channel_id;
+  auto status = message.PushChannel(0, nullptr, &channel_id);
+  if (!status) {
+    ALOGE(
+        "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s",
+        status.GetErrorMessage().c_str());
+    RemoveConsumerClientMask(consumer_state_mask);
+    return ErrorStatus(ENOMEM);
+  }
+
+  auto consumer = std::make_shared<ConsumerChannel>(
+      service(), buffer_id(), channel_id, consumer_state_mask,
+      shared_from_this());
   const auto channel_status = service()->SetChannel(channel_id, consumer);
   if (!channel_status) {
     ALOGE(
         "ProducerChannel::CreateConsumer: failed to set new consumer channel: "
         "%s",
         channel_status.GetErrorMessage().c_str());
-    // Restore the consumer state bit and make it visible in other threads that
-    // acquire the active_clients_bit_mask_.
-    active_clients_bit_mask_->fetch_and(~client_state_mask,
-                                        std::memory_order_release);
+    RemoveConsumerClientMask(consumer_state_mask);
     return ErrorStatus(ENOMEM);
   }
 
@@ -327,7 +355,11 @@
 Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) {
   ATRACE_NAME("ProducerChannel::OnNewConsumer");
   ALOGD_IF(TRACE, "ProducerChannel::OnNewConsumer: buffer_id=%d", buffer_id());
-  return CreateConsumer(message);
+  auto status = CreateConsumerStateMask();
+  if (!status.ok()) {
+    return status.error_status();
+  }
+  return CreateConsumer(message, /*consumer_state_mask=*/status.get());
 }
 
 Status<void> ProducerChannel::OnProducerPost(Message&,