Add support for consumer queue initial import and hangup.
- Add support for consumer queues to import buffers that were allocated
before the consumer queue itself was created, making multi-consumer
queue patterns possible. This is essential for VrFlinger operation.
- Add support for notifying consumer queues when the producer queue
hangs up.
- Correct the epoll event loop to check for hangups even when buffers
are available.
- Add method to retrieve the event fd from a queue.
- Add trace logging and minor cleanup.
- Improve bufferhubd dump state output.
Bug: 36401174
Test: build; bufferhub tests pass.
Change-Id: Idd6f38a3341c048192062734e288d11de48bc4d4
diff --git a/services/vr/bufferhubd/buffer_hub.cpp b/services/vr/bufferhubd/buffer_hub.cpp
index 8093b6b..debcc73 100644
--- a/services/vr/bufferhubd/buffer_hub.cpp
+++ b/services/vr/bufferhubd/buffer_hub.cpp
@@ -132,7 +132,7 @@
stream << std::endl;
stream << "Active Producer Queues:\n";
stream << std::right << std::setw(6) << "Id";
- stream << std::right << std::setw(12) << " Allocated";
+ stream << std::right << std::setw(12) << " Capacity";
stream << std::right << std::setw(12) << " Consumers";
stream << " UsageSetMask";
stream << " UsageClearMask";
diff --git a/services/vr/bufferhubd/consumer_queue_channel.cpp b/services/vr/bufferhubd/consumer_queue_channel.cpp
index 7422751..f447e00 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.cpp
+++ b/services/vr/bufferhubd/consumer_queue_channel.cpp
@@ -8,6 +8,7 @@
using android::pdx::RemoteChannelHandle;
using android::pdx::Status;
using android::pdx::rpc::DispatchRemoteMethod;
+using android::pdx::rpc::RemoteMethodError;
namespace android {
namespace dvr {
@@ -33,8 +34,10 @@
bool ConsumerQueueChannel::HandleMessage(Message& message) {
ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
auto producer = GetProducer();
- if (!producer)
- REPLY_ERROR_RETURN(message, EPIPE, true);
+ if (!producer) {
+ RemoteMethodError(message, EPIPE);
+ return true;
+ }
switch (message.GetOp()) {
case BufferHubRPC::CreateConsumerQueue::Opcode:
@@ -79,6 +82,9 @@
void ConsumerQueueChannel::RegisterNewBuffer(
const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
+ ALOGD_IF(TRACE,
+ "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu",
+ producer_channel->buffer_id(), slot);
pending_buffer_slots_.emplace(producer_channel, slot);
// Signal the client that there is new buffer available throught POLLIN.
@@ -89,7 +95,8 @@
ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
- ALOGD(
+ ALOGD_IF(
+ TRACE,
"ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to "
"import: %zu",
pending_buffer_slots_.size());
@@ -134,5 +141,12 @@
return {std::move(buffer_handles)};
}
+void ConsumerQueueChannel::OnProducerClosed() {
+ ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
+ buffer_id());
+ producer_.reset();
+ Hangup();
+}
+
} // namespace dvr
} // namespace android
diff --git a/services/vr/bufferhubd/consumer_queue_channel.h b/services/vr/bufferhubd/consumer_queue_channel.h
index e1005e4..aa3f531 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.h
+++ b/services/vr/bufferhubd/consumer_queue_channel.h
@@ -39,6 +39,8 @@
pdx::Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnConsumerQueueImportBuffers(Message& message);
+ void OnProducerClosed();
+
private:
std::shared_ptr<ProducerQueueChannel> GetProducer() const;
diff --git a/services/vr/bufferhubd/producer_channel.cpp b/services/vr/bufferhubd/producer_channel.cpp
index e452d04..398aa12 100644
--- a/services/vr/bufferhubd/producer_channel.cpp
+++ b/services/vr/bufferhubd/producer_channel.cpp
@@ -199,10 +199,15 @@
return ErrorStatus(EBUSY);
}
- if (meta_size_bytes_ != metadata.size())
+ if (meta_size_bytes_ != metadata.size()) {
+ ALOGD_IF(TRACE,
+ "ProducerChannel::OnProducerPost: Expected meta_size_bytes=%zu "
+ "got size=%zu",
+ meta_size_bytes_, metadata.size());
return ErrorStatus(EINVAL);
- std::copy(metadata.begin(), metadata.end(), meta_.get());
+ }
+ std::copy(metadata.begin(), metadata.end(), meta_.get());
post_fence_ = std::move(acquire_fence);
producer_owns_ = false;
diff --git a/services/vr/bufferhubd/producer_queue_channel.cpp b/services/vr/bufferhubd/producer_queue_channel.cpp
index bd8eed8..843277e 100644
--- a/services/vr/bufferhubd/producer_queue_channel.cpp
+++ b/services/vr/bufferhubd/producer_queue_channel.cpp
@@ -26,7 +26,12 @@
*error = 0;
}
-ProducerQueueChannel::~ProducerQueueChannel() {}
+ProducerQueueChannel::~ProducerQueueChannel() {
+ ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
+ buffer_id());
+ for (auto* consumer : consumer_channels_)
+ consumer->OnProducerClosed();
+}
/* static */
Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
@@ -108,13 +113,21 @@
return ErrorStatus(ENOMEM);
}
- const auto channel_status = service()->SetChannel(
- channel_id, std::make_shared<ConsumerQueueChannel>(
- service(), buffer_id(), channel_id, shared_from_this()));
+ auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
+ service(), buffer_id(), channel_id, shared_from_this());
+
+ // Register the existing buffers with the new consumer queue.
+ for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
+ if (auto buffer = buffers_[slot].lock())
+ consumer_queue_channel->RegisterNewBuffer(buffer, slot);
+ }
+
+ const auto channel_status =
+ service()->SetChannel(channel_id, consumer_queue_channel);
if (!channel_status) {
ALOGE(
- "ProducerQueueChannel::OnCreateConsumerQueue: failed to set new "
- "consumer channel: %s",
+ "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
+ "%s",
channel_status.GetErrorMessage().c_str());
return ErrorStatus(ENOMEM);
}
@@ -254,7 +267,7 @@
capacity_++;
// Notify each consumer channel about the new buffer.
- for (auto consumer_channel : consumer_channels_) {
+ for (auto* consumer_channel : consumer_channels_) {
ALOGD(
"ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
"buffer, buffer_id=%d",