Create consumer state mask(s) immediately after producer allocate new buffer.

No functionality change in the current use cases.

Create consumer state mask(s) for newly allocated buffer in producer
queue for all connected consumer channels immediately before signaling
the consumer channels that there is a new buffer allocated in the queue.

Previously, the consumer queue imports newly allocated buffer upon
dequeue. During importing the buffer, after the IPC, bufferhubd does
two things:
1. create consumer state mask
2. create consumer channel

This changelist moves the first step before the IPC, moving it right
after the buffer is allocated in the producer queue. This changelist
does not affect the second step.

Test: AHardwareBufferTest BufferHubBuffer_test BufferHubMetadata_test
buffer_hub_binder_service-test buffer_hub_queue_producer-test
libsensor_test vrflinger_test buffer_hub-test
buffer_hub_queue-test dvr_buffer_queue-test dvr_api-test
dvr_display-Test

Bug: 119112218

Change-Id: I313e7ea98fcb555e08560e4c7e9bf295e40e1f27
diff --git a/services/vr/bufferhubd/consumer_queue_channel.cpp b/services/vr/bufferhubd/consumer_queue_channel.cpp
index 74b549d..5d7d4e9 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.cpp
+++ b/services/vr/bufferhubd/consumer_queue_channel.cpp
@@ -80,27 +80,35 @@
 }
 
 void ConsumerQueueChannel::RegisterNewBuffer(
-    const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
-  ALOGD_IF(TRACE,
-           "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d "
-           "slot=%zu silent=%d",
-           buffer_id(), producer_channel->buffer_id(), slot, silent_);
+    const std::shared_ptr<ProducerChannel>& producer_channel,
+    size_t producer_slot) {
+  ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d",
+           __FUNCTION__, buffer_id(), producer_channel->buffer_id(),
+           producer_slot, silent_);
   // Only register buffers if the queue is not silent.
-  if (!silent_) {
-    pending_buffer_slots_.emplace(producer_channel, slot);
-
-    // Signal the client that there is new buffer available.
-    SignalAvailable();
+  if (silent_) {
+    return;
   }
+
+  auto status = producer_channel->CreateConsumerStateMask();
+  if (!status.ok()) {
+    ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__,
+          status.GetErrorMessage().c_str());
+    return;
+  }
+  uint64_t consumer_state_mask = status.get();
+
+  pending_buffer_slots_.emplace(producer_channel, producer_slot,
+                                consumer_state_mask);
+  // Signal the client that there is new buffer available.
+  SignalAvailable();
 }
 
 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
-  ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
-  ALOGD_IF(TRACE,
-           "ConsumerQueueChannel::OnConsumerQueueImportBuffers: "
-           "pending_buffer_slots=%zu",
+  ATRACE_NAME(__FUNCTION__);
+  ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__,
            pending_buffer_slots_.size());
 
   // Indicate this is a silent queue that will not import buffers.
@@ -108,30 +116,30 @@
     return ErrorStatus(EBADR);
 
   while (!pending_buffer_slots_.empty()) {
-    auto producer_channel = pending_buffer_slots_.front().first.lock();
-    size_t producer_slot = pending_buffer_slots_.front().second;
+    auto producer_channel =
+        pending_buffer_slots_.front().producer_channel.lock();
+    size_t producer_slot = pending_buffer_slots_.front().producer_slot;
+    uint64_t consumer_state_mask =
+        pending_buffer_slots_.front().consumer_state_mask;
     pending_buffer_slots_.pop();
 
     // It's possible that the producer channel has expired. When this occurs,
     // ignore the producer channel.
     if (producer_channel == nullptr) {
-      ALOGW(
-          "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
-          "channel has already been expired.");
+      ALOGW("%s: producer channel has already been expired.", __FUNCTION__);
       continue;
     }
 
-    auto status = producer_channel->CreateConsumer(message);
+    auto status =
+        producer_channel->CreateConsumer(message, consumer_state_mask);
 
     // If no buffers are imported successfully, clear available and return an
     // error. Otherwise, return all consumer handles already imported
     // successfully, but keep available bits on, so that the client can retry
     // importing remaining consumer buffers.
     if (!status) {
-      ALOGE(
-          "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
-          "consumer: %s",
-          status.GetErrorMessage().c_str());
+      ALOGE("%s: Failed create consumer: %s", __FUNCTION__,
+            status.GetErrorMessage().c_str());
       if (buffer_handles.empty()) {
         ClearAvailable();
         return status.error_status();
diff --git a/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h b/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h
index 8f35437..3a81b03 100644
--- a/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h
+++ b/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h
@@ -3,8 +3,8 @@
 
 #include <queue>
 
-#include <private/dvr/bufferhub_rpc.h>
 #include <private/dvr/buffer_hub.h>
+#include <private/dvr/bufferhub_rpc.h>
 #include <private/dvr/consumer_channel.h>
 #include <private/dvr/producer_queue_channel.h>
 
@@ -28,7 +28,8 @@
   // Called by ProdcuerQueueChannel to notify consumer queue that a new
   // buffer has been allocated.
   void RegisterNewBuffer(
-      const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot);
+      const std::shared_ptr<ProducerChannel>& producer_channel,
+      size_t producer_slot);
 
   // Called after clients been signaled by service that new buffer has been
   // allocated. Clients uses kOpConsumerQueueImportBuffers to import new
@@ -40,14 +41,29 @@
   void OnProducerClosed();
 
  private:
+  // Data structure to store relavant info of a newly allocated producer buffer
+  // so that consumer channel and buffer can be created later.
+  struct PendingBuffer {
+    PendingBuffer(std::shared_ptr<ProducerChannel> channel, size_t slot,
+                  uint64_t mask) {
+      producer_channel = channel;
+      producer_slot = slot;
+      consumer_state_mask = mask;
+    }
+    PendingBuffer() = delete;
+
+    std::weak_ptr<ProducerChannel> producer_channel;
+    size_t producer_slot;
+    uint64_t consumer_state_mask;
+  };
+
   std::shared_ptr<ProducerQueueChannel> GetProducer() const;
 
   // Pointer to the producer channel.
   std::weak_ptr<Channel> producer_;
 
   // Tracks newly allocated buffer producers along with it's slot number.
-  std::queue<std::pair<std::weak_ptr<ProducerChannel>, size_t>>
-      pending_buffer_slots_;
+  std::queue<PendingBuffer> pending_buffer_slots_;
 
   // Tracks how many buffers have this queue imported.
   size_t capacity_;
diff --git a/services/vr/bufferhubd/include/private/dvr/producer_channel.h b/services/vr/bufferhubd/include/private/dvr/producer_channel.h
index 5868b09..3ad9c70 100644
--- a/services/vr/bufferhubd/include/private/dvr/producer_channel.h
+++ b/services/vr/bufferhubd/include/private/dvr/producer_channel.h
@@ -53,7 +53,9 @@
 
   BufferDescription<BorrowedHandle> GetBuffer(uint64_t client_state_mask);
 
-  pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message);
+  pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message,
+                                                  uint64_t consumer_state_mask);
+  pdx::Status<uint64_t> CreateConsumerStateMask();
   pdx::Status<RemoteChannelHandle> OnNewConsumer(Message& message);
 
   pdx::Status<LocalFence> OnConsumerAcquire(Message& message);
@@ -93,7 +95,7 @@
   LocalFence post_fence_;
   LocalFence returned_fence_;
   size_t user_metadata_size_;  // size of user requested buffer buffer size.
-  size_t metadata_buf_size_;  // size of the ion buffer that holds metadata.
+  size_t metadata_buf_size_;   // size of the ion buffer that holds metadata.
 
   pdx::LocalHandle acquire_fence_fd_;
   pdx::LocalHandle release_fence_fd_;
@@ -111,6 +113,10 @@
   pdx::Status<void> OnProducerPost(Message& message, LocalFence acquire_fence);
   pdx::Status<LocalFence> OnProducerGain(Message& message);
 
+  // Remove consumer from atomics in shared memory based on consumer_state_mask.
+  // This function is used for clean up for failures in CreateConsumer method.
+  void RemoveConsumerClientMask(uint64_t consumer_state_mask);
+
   ProducerChannel(const ProducerChannel&) = delete;
   void operator=(const ProducerChannel&) = delete;
 };
diff --git a/services/vr/bufferhubd/producer_channel.cpp b/services/vr/bufferhubd/producer_channel.cpp
index 397c0ae..c6e8ea9 100644
--- a/services/vr/bufferhubd/producer_channel.cpp
+++ b/services/vr/bufferhubd/producer_channel.cpp
@@ -248,21 +248,7 @@
   return {GetBuffer(BufferHubDefs::kFirstClientBitMask)};
 }
 
-Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) {
-  ATRACE_NAME("ProducerChannel::CreateConsumer");
-  ALOGD_IF(TRACE,
-           "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d",
-           buffer_id(), producer_owns_);
-
-  int channel_id;
-  auto status = message.PushChannel(0, nullptr, &channel_id);
-  if (!status) {
-    ALOGE(
-        "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s",
-        status.GetErrorMessage().c_str());
-    return ErrorStatus(ENOMEM);
-  }
-
+Status<uint64_t> ProducerChannel::CreateConsumerStateMask() {
   // Try find the next consumer state bit which has not been claimed by any
   // consumer yet.
   // memory_order_acquire is chosen here because all writes in other threads
@@ -277,7 +263,6 @@
         "consumers per producer: 63.");
     return ErrorStatus(E2BIG);
   }
-
   uint64_t updated_active_clients_bit_mask =
       current_active_clients_bit_mask | client_state_mask;
   // Set the updated value only if the current value stays the same as what was
@@ -286,28 +271,71 @@
   // thread, and the modification will be visible in other threads that acquire
   // active_clients_bit_mask_. If the comparison fails, load the result of
   // all writes from all threads to updated_active_clients_bit_mask.
-  if (!active_clients_bit_mask_->compare_exchange_weak(
-          current_active_clients_bit_mask, updated_active_clients_bit_mask,
-          std::memory_order_acq_rel, std::memory_order_acquire)) {
+  // Keep on finding the next available slient state mask until succeed or out
+  // of memory.
+  while (!active_clients_bit_mask_->compare_exchange_weak(
+      current_active_clients_bit_mask, updated_active_clients_bit_mask,
+      std::memory_order_acq_rel, std::memory_order_acquire)) {
     ALOGE("Current active clients bit mask is changed to %" PRIx64
-          ", which was expected to be %" PRIx64 ".",
+          ", which was expected to be %" PRIx64
+          ". Trying to generate a new client state mask to resolve race "
+          "condition.",
           updated_active_clients_bit_mask, current_active_clients_bit_mask);
-    return ErrorStatus(EBUSY);
+    client_state_mask = BufferHubDefs::FindNextAvailableClientStateMask(
+        current_active_clients_bit_mask | orphaned_consumer_bit_mask_);
+    if (client_state_mask == 0ULL) {
+      ALOGE(
+          "ProducerChannel::CreateConsumer: reached the maximum mumber of "
+          "consumers per producer: 63.");
+      return ErrorStatus(E2BIG);
+    }
+    updated_active_clients_bit_mask =
+        current_active_clients_bit_mask | client_state_mask;
   }
 
-  auto consumer =
-      std::make_shared<ConsumerChannel>(service(), buffer_id(), channel_id,
-                                        client_state_mask, shared_from_this());
+  return {client_state_mask};
+}
+
+void ProducerChannel::RemoveConsumerClientMask(uint64_t consumer_state_mask) {
+  // Clear up the buffer state and fence state in case there is already
+  // something there due to possible race condition between producer post and
+  // consumer failed to create channel.
+  buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
+  fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
+
+  // Restore the consumer state bit and make it visible in other threads that
+  // acquire the active_clients_bit_mask_.
+  active_clients_bit_mask_->fetch_and(~consumer_state_mask,
+                                      std::memory_order_release);
+}
+
+Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(
+    Message& message, uint64_t consumer_state_mask) {
+  ATRACE_NAME("ProducerChannel::CreateConsumer");
+  ALOGD_IF(TRACE,
+           "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d",
+           buffer_id(), producer_owns_);
+
+  int channel_id;
+  auto status = message.PushChannel(0, nullptr, &channel_id);
+  if (!status) {
+    ALOGE(
+        "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s",
+        status.GetErrorMessage().c_str());
+    RemoveConsumerClientMask(consumer_state_mask);
+    return ErrorStatus(ENOMEM);
+  }
+
+  auto consumer = std::make_shared<ConsumerChannel>(
+      service(), buffer_id(), channel_id, consumer_state_mask,
+      shared_from_this());
   const auto channel_status = service()->SetChannel(channel_id, consumer);
   if (!channel_status) {
     ALOGE(
         "ProducerChannel::CreateConsumer: failed to set new consumer channel: "
         "%s",
         channel_status.GetErrorMessage().c_str());
-    // Restore the consumer state bit and make it visible in other threads that
-    // acquire the active_clients_bit_mask_.
-    active_clients_bit_mask_->fetch_and(~client_state_mask,
-                                        std::memory_order_release);
+    RemoveConsumerClientMask(consumer_state_mask);
     return ErrorStatus(ENOMEM);
   }
 
@@ -327,7 +355,11 @@
 Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) {
   ATRACE_NAME("ProducerChannel::OnNewConsumer");
   ALOGD_IF(TRACE, "ProducerChannel::OnNewConsumer: buffer_id=%d", buffer_id());
-  return CreateConsumer(message);
+  auto status = CreateConsumerStateMask();
+  if (!status.ok()) {
+    return status.error_status();
+  }
+  return CreateConsumer(message, /*consumer_state_mask=*/status.get());
 }
 
 Status<void> ProducerChannel::OnProducerPost(Message&,