Merge "Create consumer state mask(s) immediately after producer allocate new buffer."
diff --git a/services/vr/bufferhubd/consumer_queue_channel.cpp b/services/vr/bufferhubd/consumer_queue_channel.cpp
index 74b549d..5d7d4e9 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.cpp
+++ b/services/vr/bufferhubd/consumer_queue_channel.cpp
@@ -80,27 +80,35 @@
}
void ConsumerQueueChannel::RegisterNewBuffer(
- const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
- ALOGD_IF(TRACE,
- "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d "
- "slot=%zu silent=%d",
- buffer_id(), producer_channel->buffer_id(), slot, silent_);
+ const std::shared_ptr<ProducerChannel>& producer_channel,
+ size_t producer_slot) {
+ ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d",
+ __FUNCTION__, buffer_id(), producer_channel->buffer_id(),
+ producer_slot, silent_);
// Only register buffers if the queue is not silent.
- if (!silent_) {
- pending_buffer_slots_.emplace(producer_channel, slot);
-
- // Signal the client that there is new buffer available.
- SignalAvailable();
+ if (silent_) {
+ return;
}
+
+ auto status = producer_channel->CreateConsumerStateMask();
+ if (!status.ok()) {
+ ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__,
+ status.GetErrorMessage().c_str());
+ return;
+ }
+ uint64_t consumer_state_mask = status.get();
+
+ pending_buffer_slots_.emplace(producer_channel, producer_slot,
+ consumer_state_mask);
+ // Signal the client that there is new buffer available.
+ SignalAvailable();
}
Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
- ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
- ALOGD_IF(TRACE,
- "ConsumerQueueChannel::OnConsumerQueueImportBuffers: "
- "pending_buffer_slots=%zu",
+ ATRACE_NAME(__FUNCTION__);
+ ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__,
pending_buffer_slots_.size());
// Indicate this is a silent queue that will not import buffers.
@@ -108,30 +116,30 @@
return ErrorStatus(EBADR);
while (!pending_buffer_slots_.empty()) {
- auto producer_channel = pending_buffer_slots_.front().first.lock();
- size_t producer_slot = pending_buffer_slots_.front().second;
+ auto producer_channel =
+ pending_buffer_slots_.front().producer_channel.lock();
+ size_t producer_slot = pending_buffer_slots_.front().producer_slot;
+ uint64_t consumer_state_mask =
+ pending_buffer_slots_.front().consumer_state_mask;
pending_buffer_slots_.pop();
// It's possible that the producer channel has expired. When this occurs,
// ignore the producer channel.
if (producer_channel == nullptr) {
- ALOGW(
- "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
- "channel has already been expired.");
+ ALOGW("%s: producer channel has already been expired.", __FUNCTION__);
continue;
}
- auto status = producer_channel->CreateConsumer(message);
+ auto status =
+ producer_channel->CreateConsumer(message, consumer_state_mask);
// If no buffers are imported successfully, clear available and return an
// error. Otherwise, return all consumer handles already imported
// successfully, but keep available bits on, so that the client can retry
// importing remaining consumer buffers.
if (!status) {
- ALOGE(
- "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
- "consumer: %s",
- status.GetErrorMessage().c_str());
+ ALOGE("%s: Failed create consumer: %s", __FUNCTION__,
+ status.GetErrorMessage().c_str());
if (buffer_handles.empty()) {
ClearAvailable();
return status.error_status();
diff --git a/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h b/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h
index 8f35437..3a81b03 100644
--- a/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h
+++ b/services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h
@@ -3,8 +3,8 @@
#include <queue>
-#include <private/dvr/bufferhub_rpc.h>
#include <private/dvr/buffer_hub.h>
+#include <private/dvr/bufferhub_rpc.h>
#include <private/dvr/consumer_channel.h>
#include <private/dvr/producer_queue_channel.h>
@@ -28,7 +28,8 @@
// Called by ProdcuerQueueChannel to notify consumer queue that a new
// buffer has been allocated.
void RegisterNewBuffer(
- const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot);
+ const std::shared_ptr<ProducerChannel>& producer_channel,
+ size_t producer_slot);
// Called after clients been signaled by service that new buffer has been
// allocated. Clients uses kOpConsumerQueueImportBuffers to import new
@@ -40,14 +41,29 @@
void OnProducerClosed();
private:
+ // Data structure to store relavant info of a newly allocated producer buffer
+ // so that consumer channel and buffer can be created later.
+ struct PendingBuffer {
+ PendingBuffer(std::shared_ptr<ProducerChannel> channel, size_t slot,
+ uint64_t mask) {
+ producer_channel = channel;
+ producer_slot = slot;
+ consumer_state_mask = mask;
+ }
+ PendingBuffer() = delete;
+
+ std::weak_ptr<ProducerChannel> producer_channel;
+ size_t producer_slot;
+ uint64_t consumer_state_mask;
+ };
+
std::shared_ptr<ProducerQueueChannel> GetProducer() const;
// Pointer to the producer channel.
std::weak_ptr<Channel> producer_;
// Tracks newly allocated buffer producers along with it's slot number.
- std::queue<std::pair<std::weak_ptr<ProducerChannel>, size_t>>
- pending_buffer_slots_;
+ std::queue<PendingBuffer> pending_buffer_slots_;
// Tracks how many buffers have this queue imported.
size_t capacity_;
diff --git a/services/vr/bufferhubd/include/private/dvr/producer_channel.h b/services/vr/bufferhubd/include/private/dvr/producer_channel.h
index 5868b09..3ad9c70 100644
--- a/services/vr/bufferhubd/include/private/dvr/producer_channel.h
+++ b/services/vr/bufferhubd/include/private/dvr/producer_channel.h
@@ -53,7 +53,9 @@
BufferDescription<BorrowedHandle> GetBuffer(uint64_t client_state_mask);
- pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message);
+ pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message,
+ uint64_t consumer_state_mask);
+ pdx::Status<uint64_t> CreateConsumerStateMask();
pdx::Status<RemoteChannelHandle> OnNewConsumer(Message& message);
pdx::Status<LocalFence> OnConsumerAcquire(Message& message);
@@ -93,7 +95,7 @@
LocalFence post_fence_;
LocalFence returned_fence_;
size_t user_metadata_size_; // size of user requested buffer buffer size.
- size_t metadata_buf_size_; // size of the ion buffer that holds metadata.
+ size_t metadata_buf_size_; // size of the ion buffer that holds metadata.
pdx::LocalHandle acquire_fence_fd_;
pdx::LocalHandle release_fence_fd_;
@@ -111,6 +113,10 @@
pdx::Status<void> OnProducerPost(Message& message, LocalFence acquire_fence);
pdx::Status<LocalFence> OnProducerGain(Message& message);
+ // Remove consumer from atomics in shared memory based on consumer_state_mask.
+ // This function is used for clean up for failures in CreateConsumer method.
+ void RemoveConsumerClientMask(uint64_t consumer_state_mask);
+
ProducerChannel(const ProducerChannel&) = delete;
void operator=(const ProducerChannel&) = delete;
};
diff --git a/services/vr/bufferhubd/producer_channel.cpp b/services/vr/bufferhubd/producer_channel.cpp
index 397c0ae..c6e8ea9 100644
--- a/services/vr/bufferhubd/producer_channel.cpp
+++ b/services/vr/bufferhubd/producer_channel.cpp
@@ -248,21 +248,7 @@
return {GetBuffer(BufferHubDefs::kFirstClientBitMask)};
}
-Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) {
- ATRACE_NAME("ProducerChannel::CreateConsumer");
- ALOGD_IF(TRACE,
- "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d",
- buffer_id(), producer_owns_);
-
- int channel_id;
- auto status = message.PushChannel(0, nullptr, &channel_id);
- if (!status) {
- ALOGE(
- "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s",
- status.GetErrorMessage().c_str());
- return ErrorStatus(ENOMEM);
- }
-
+Status<uint64_t> ProducerChannel::CreateConsumerStateMask() {
// Try find the next consumer state bit which has not been claimed by any
// consumer yet.
// memory_order_acquire is chosen here because all writes in other threads
@@ -277,7 +263,6 @@
"consumers per producer: 63.");
return ErrorStatus(E2BIG);
}
-
uint64_t updated_active_clients_bit_mask =
current_active_clients_bit_mask | client_state_mask;
// Set the updated value only if the current value stays the same as what was
@@ -286,28 +271,71 @@
// thread, and the modification will be visible in other threads that acquire
// active_clients_bit_mask_. If the comparison fails, load the result of
// all writes from all threads to updated_active_clients_bit_mask.
- if (!active_clients_bit_mask_->compare_exchange_weak(
- current_active_clients_bit_mask, updated_active_clients_bit_mask,
- std::memory_order_acq_rel, std::memory_order_acquire)) {
+ // Keep on finding the next available slient state mask until succeed or out
+ // of memory.
+ while (!active_clients_bit_mask_->compare_exchange_weak(
+ current_active_clients_bit_mask, updated_active_clients_bit_mask,
+ std::memory_order_acq_rel, std::memory_order_acquire)) {
ALOGE("Current active clients bit mask is changed to %" PRIx64
- ", which was expected to be %" PRIx64 ".",
+ ", which was expected to be %" PRIx64
+ ". Trying to generate a new client state mask to resolve race "
+ "condition.",
updated_active_clients_bit_mask, current_active_clients_bit_mask);
- return ErrorStatus(EBUSY);
+ client_state_mask = BufferHubDefs::FindNextAvailableClientStateMask(
+ current_active_clients_bit_mask | orphaned_consumer_bit_mask_);
+ if (client_state_mask == 0ULL) {
+ ALOGE(
+ "ProducerChannel::CreateConsumer: reached the maximum mumber of "
+ "consumers per producer: 63.");
+ return ErrorStatus(E2BIG);
+ }
+ updated_active_clients_bit_mask =
+ current_active_clients_bit_mask | client_state_mask;
}
- auto consumer =
- std::make_shared<ConsumerChannel>(service(), buffer_id(), channel_id,
- client_state_mask, shared_from_this());
+ return {client_state_mask};
+}
+
+void ProducerChannel::RemoveConsumerClientMask(uint64_t consumer_state_mask) {
+ // Clear up the buffer state and fence state in case there is already
+ // something there due to possible race condition between producer post and
+ // consumer failed to create channel.
+ buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
+ fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
+
+ // Restore the consumer state bit and make it visible in other threads that
+ // acquire the active_clients_bit_mask_.
+ active_clients_bit_mask_->fetch_and(~consumer_state_mask,
+ std::memory_order_release);
+}
+
+Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(
+ Message& message, uint64_t consumer_state_mask) {
+ ATRACE_NAME("ProducerChannel::CreateConsumer");
+ ALOGD_IF(TRACE,
+ "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d",
+ buffer_id(), producer_owns_);
+
+ int channel_id;
+ auto status = message.PushChannel(0, nullptr, &channel_id);
+ if (!status) {
+ ALOGE(
+ "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s",
+ status.GetErrorMessage().c_str());
+ RemoveConsumerClientMask(consumer_state_mask);
+ return ErrorStatus(ENOMEM);
+ }
+
+ auto consumer = std::make_shared<ConsumerChannel>(
+ service(), buffer_id(), channel_id, consumer_state_mask,
+ shared_from_this());
const auto channel_status = service()->SetChannel(channel_id, consumer);
if (!channel_status) {
ALOGE(
"ProducerChannel::CreateConsumer: failed to set new consumer channel: "
"%s",
channel_status.GetErrorMessage().c_str());
- // Restore the consumer state bit and make it visible in other threads that
- // acquire the active_clients_bit_mask_.
- active_clients_bit_mask_->fetch_and(~client_state_mask,
- std::memory_order_release);
+ RemoveConsumerClientMask(consumer_state_mask);
return ErrorStatus(ENOMEM);
}
@@ -327,7 +355,11 @@
Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) {
ATRACE_NAME("ProducerChannel::OnNewConsumer");
ALOGD_IF(TRACE, "ProducerChannel::OnNewConsumer: buffer_id=%d", buffer_id());
- return CreateConsumer(message);
+ auto status = CreateConsumerStateMask();
+ if (!status.ok()) {
+ return status.error_status();
+ }
+ return CreateConsumer(message, /*consumer_state_mask=*/status.get());
}
Status<void> ProducerChannel::OnProducerPost(Message&,