Add support for consumer queue initial import and hangup.
- Add support for consumer queues to import buffers that are created
before the consumer queue is created, making multi-consumer queue
patterns possible. This is essential for VrFlinger operation.
- Add support for notifying consumer queues when the producer queue
hangs up.
- Correct the epoll event loop to check for hangups even when buffers
are available.
- Add method to retrieve the event fd from a queue.
- Add trace logging and minor cleanup.
- Improve bufferhubd dump state output.
Bug: 36401174
Test: build; bufferhub tests pass.
Change-Id: Idd6f38a3341c048192062734e288d11de48bc4d4
diff --git a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
index 5b85069..433db42 100644
--- a/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
+++ b/libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp
@@ -87,6 +87,14 @@
return nullptr;
}
+std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
+ if (auto status = CreateConsumerQueueHandle())
+ return std::unique_ptr<ConsumerQueue>(
+ new ConsumerQueue(status.take(), true));
+ else
+ return nullptr;
+}
+
Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
if (!status) {
@@ -103,12 +111,18 @@
bool BufferHubQueue::WaitForBuffers(int timeout) {
std::array<epoll_event, kMaxEvents> events;
- while (count() == 0) {
- int ret = epoll_fd_.Wait(events.data(), events.size(), timeout);
+ // Loop at least once to check for hangups.
+ do {
+ ALOGD_IF(TRACE, "BufferHubQueue::WaitForBuffers: count=%zu capacity=%zu",
+ count(), capacity());
+
+ // If there is already a buffer then just check for hangup without waiting.
+ const int ret = epoll_fd_.Wait(events.data(), events.size(),
+ count() == 0 ? timeout : 0);
if (ret == 0) {
ALOGD_IF(TRACE, "Wait on epoll returns nothing before timeout.");
- return false;
+ return count() != 0;
}
if (ret < 0 && ret != -EINTR) {
@@ -136,9 +150,9 @@
index);
}
}
- }
+ } while (count() == 0 && capacity() > 0 && !hung_up());
- return true;
+ return count() != 0;
}
void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {
@@ -203,6 +217,9 @@
ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
buffer_status.GetErrorMessage().c_str());
}
+ } else if (events & EPOLLHUP) {
+ ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
+ hung_up_ = true;
} else {
ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%d", events);
}
@@ -260,7 +277,7 @@
return 0;
}
-void BufferHubQueue::Enqueue(std::shared_ptr<BufferHubBuffer> buf,
+void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
size_t slot) {
if (count() == capacity_) {
ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
@@ -272,8 +289,7 @@
// the limitation of the RingBuffer we are using. Would be better to refactor
// that.
BufferInfo buffer_info(slot, meta_size_);
- // Swap buffer into vector.
- std::swap(buffer_info.buffer, buf);
+ buffer_info.buffer = buf;
// Swap metadata loaded during onBufferReady into vector.
std::swap(buffer_info.metadata, meta_buffer_tmp_);
@@ -286,7 +302,7 @@
LocalHandle* fence) {
ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);
- if (count() == 0 && !WaitForBuffers(timeout))
+ if (!WaitForBuffers(timeout))
return nullptr;
std::shared_ptr<BufferHubBuffer> buf;
@@ -366,9 +382,8 @@
InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
width, height, format, usage, slice_count, kBufferCount);
if (!status) {
- ALOGE(
- "ProducerQueue::AllocateBuffer failed to create producer buffer "
- "through BufferHub.");
+ ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
+ status.GetErrorMessage().c_str());
return -status.error();
}
@@ -428,14 +443,14 @@
return std::static_pointer_cast<BufferProducer>(buf);
}
-int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
LocalHandle* release_fence) {
auto buffer = std::static_pointer_cast<BufferProducer>(buf);
return buffer->Gain(release_fence);
}
-ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
- : BufferHubQueue(std::move(handle)) {
+ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
+ : BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) {
auto status = ImportQueue();
if (!status) {
ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
@@ -443,8 +458,14 @@
Close(-status.error());
}
- // TODO(b/34387835) Import buffers in case the ProducerQueue we are
- // based on was not empty.
+ auto import_status = ImportBuffers();
+ if (import_status) {
+ ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
+ import_status.get());
+ } else {
+ ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
+ import_status.GetErrorMessage().c_str());
+ }
}
Status<size_t> ConsumerQueue::ImportBuffers() {
@@ -457,6 +478,7 @@
return ErrorStatus(status.error());
}
+ int ret;
int last_error = 0;
int imported_buffers = 0;
@@ -468,7 +490,24 @@
std::unique_ptr<BufferConsumer> buffer_consumer =
BufferConsumer::Import(std::move(buffer_handle_slot.first));
- int ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
+
+ // Setup ignore state before adding buffer to the queue.
+ if (ignore_on_import_) {
+ ALOGD_IF(TRACE,
+ "ConsumerQueue::ImportBuffers: Setting buffer to ignored state: "
+ "buffer_id=%d",
+ buffer_consumer->id());
+ ret = buffer_consumer->SetIgnore(true);
+ if (ret < 0) {
+ ALOGE(
+ "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
+ "imported buffer buffer_id=%d: %s",
+ buffer_consumer->id(), strerror(-ret));
+ last_error = ret;
+ }
+ }
+
+ ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
if (ret < 0) {
ALOGE("ConsumerQueue::ImportBuffers failed to add buffer, ret: %s",
strerror(-ret));
@@ -502,7 +541,7 @@
return nullptr;
}
- if (slot == nullptr || meta == nullptr || acquire_fence == nullptr) {
+ if (slot == nullptr || acquire_fence == nullptr) {
ALOGE(
"ConsumerQueue::Dequeue: Invalid parameter, slot=%p, meta=%p, "
"acquire_fence=%p",
@@ -514,7 +553,7 @@
return std::static_pointer_cast<BufferConsumer>(buf);
}
-int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
LocalHandle* acquire_fence) {
auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
diff --git a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
index 1eafe76..2357bd9 100644
--- a/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
+++ b/libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h
@@ -33,6 +33,11 @@
// a new consumer queue client or nullptr on failure.
std::unique_ptr<ConsumerQueue> CreateConsumerQueue();
+ // Create a new consumer queue that is attached to the producer. This queue
+ // sets each of its imported consumer buffers to the ignored state to avoid
+ // participation in lifecycle events.
+ std::unique_ptr<ConsumerQueue> CreateSilentConsumerQueue();
+
// Return the default buffer width of this buffer queue.
size_t default_width() const { return default_width_; }
@@ -71,9 +76,19 @@
}
}
+ // Returns an fd that signals pending queue events using
+ // EPOLLIN/POLLIN/readible. Either HandleQueueEvents or WaitForBuffers may be
+ // called to handle pending queue events.
+ int queue_fd() const { return epoll_fd_.Get(); }
+
+ // Handles any pending events, returning available buffers to the queue and
+ // reaping disconnected buffers. Returns true if successful, false if an error
+ // occurred.
+ bool HandleQueueEvents() { return WaitForBuffers(0); }
+
// Enqueue a buffer marks buffer to be available (|Gain|'ed for producer
// and |Acquire|'ed for consumer. This is only used for internal bookkeeping.
- void Enqueue(std::shared_ptr<BufferHubBuffer> buf, size_t slot);
+ void Enqueue(const std::shared_ptr<BufferHubBuffer>& buf, size_t slot);
// |BufferHubQueue| will keep track of at most this value of buffers.
static constexpr size_t kMaxQueueCapacity =
@@ -88,6 +103,7 @@
static constexpr int kNoTimeOut = -1;
int id() const { return id_; }
+ bool hung_up() const { return hung_up_; }
protected:
BufferHubQueue(LocalChannelHandle channel);
@@ -121,7 +137,7 @@
void HandleBufferEvent(size_t slot, const epoll_event& event);
void HandleQueueEvent(const epoll_event& event);
- virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+ virtual int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
LocalHandle* fence) = 0;
// Called when a buffer is allocated remotely.
@@ -248,6 +264,12 @@
// Epoll fd used to wait for BufferHub events.
EpollFileDescriptor epoll_fd_;
+ // Flag indicating that the other side hung up. For ProducerQueues this
+ // triggers when BufferHub dies or explicitly closes the queue channel. For
+ // ConsumerQueues this can either mean the same or that the ProducerQueue on
+ // the other end hung up.
+ bool hung_up_{false};
+
// Global id for the queue that is consistent across processes.
int id_;
@@ -261,6 +283,9 @@
static std::unique_ptr<ProducerQueue> Create() {
return BASE::Create(sizeof(Meta));
}
+ static std::unique_ptr<ProducerQueue> Create(size_t meta_size_bytes) {
+ return BASE::Create(meta_size_bytes);
+ }
// Usage bits in |usage_set_mask| will be automatically masked on. Usage bits
// in |usage_clear_mask| will be automatically masked off. Note that
@@ -331,7 +356,7 @@
uint64_t usage_clear_mask, uint64_t usage_deny_set_mask,
uint64_t usage_deny_clear_mask);
- int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+ int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
LocalHandle* release_fence) override;
};
@@ -339,16 +364,22 @@
public:
// Get a buffer consumer. Note that the method doesn't check whether the
// buffer slot has a valid buffer that has been imported already. When no
- // buffer has been imported before it returns |nullptr|; otherwise it returns
- // a shared pointer to a |BufferConsumer|.
+ // buffer has been imported before it returns nullptr; otherwise returns a
+ // shared pointer to a BufferConsumer.
std::shared_ptr<BufferConsumer> GetBuffer(size_t slot) const {
return std::static_pointer_cast<BufferConsumer>(
BufferHubQueue::GetBuffer(slot));
}
- // Import a |ConsumerQueue| from a channel handle.
- static std::unique_ptr<ConsumerQueue> Import(LocalChannelHandle handle) {
- return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle)));
+ // Import a ConsumerQueue from a channel handle. |ignore_on_import| controls
+ // whether or not buffers are set to be ignored when imported. This may be
+ // used to avoid participation in the buffer lifecycle by a consumer queue
+ // that is only used to spawn other consumer queues, such as in an
+ // intermediate service.
+ static std::unique_ptr<ConsumerQueue> Import(LocalChannelHandle handle,
+ bool ignore_on_import = false) {
+ return std::unique_ptr<ConsumerQueue>(
+ new ConsumerQueue(std::move(handle), ignore_on_import));
}
// Import newly created buffers from the service side.
@@ -366,6 +397,10 @@
LocalHandle* acquire_fence) {
return Dequeue(timeout, slot, meta, sizeof(*meta), acquire_fence);
}
+ std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot,
+ LocalHandle* acquire_fence) {
+ return Dequeue(timeout, slot, nullptr, 0, acquire_fence);
+ }
std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta,
size_t meta_size,
@@ -374,7 +409,7 @@
private:
friend BufferHubQueue;
- ConsumerQueue(LocalChannelHandle handle);
+ ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import = false);
// Add a consumer buffer to populate the queue. Once added, a consumer buffer
// is NOT available to use until the producer side |Post| it. |WaitForBuffers|
@@ -382,10 +417,14 @@
// consumer.
int AddBuffer(const std::shared_ptr<BufferConsumer>& buf, size_t slot);
- int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
+ int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
LocalHandle* acquire_fence) override;
Status<void> OnBufferAllocated() override;
+
+ // Flag indicating that imported (consumer) buffers should be ignored when
+ // imported to avoid participating in the buffer ownership flow.
+ bool ignore_on_import_;
};
} // namespace dvr
diff --git a/libs/vr/libdvrcommon/include/private/dvr/epoll_file_descriptor.h b/libs/vr/libdvrcommon/include/private/dvr/epoll_file_descriptor.h
index 91e12c5..099a409 100644
--- a/libs/vr/libdvrcommon/include/private/dvr/epoll_file_descriptor.h
+++ b/libs/vr/libdvrcommon/include/private/dvr/epoll_file_descriptor.h
@@ -52,6 +52,8 @@
return ret;
}
+ int Get() const { return fd_.get(); }
+
private:
base::unique_fd fd_;
};
diff --git a/services/vr/bufferhubd/buffer_hub.cpp b/services/vr/bufferhubd/buffer_hub.cpp
index 8093b6b..debcc73 100644
--- a/services/vr/bufferhubd/buffer_hub.cpp
+++ b/services/vr/bufferhubd/buffer_hub.cpp
@@ -132,7 +132,7 @@
stream << std::endl;
stream << "Active Producer Queues:\n";
stream << std::right << std::setw(6) << "Id";
- stream << std::right << std::setw(12) << " Allocated";
+ stream << std::right << std::setw(12) << " Capacity";
stream << std::right << std::setw(12) << " Consumers";
stream << " UsageSetMask";
stream << " UsageClearMask";
diff --git a/services/vr/bufferhubd/consumer_queue_channel.cpp b/services/vr/bufferhubd/consumer_queue_channel.cpp
index 7422751..f447e00 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.cpp
+++ b/services/vr/bufferhubd/consumer_queue_channel.cpp
@@ -8,6 +8,7 @@
using android::pdx::RemoteChannelHandle;
using android::pdx::Status;
using android::pdx::rpc::DispatchRemoteMethod;
+using android::pdx::rpc::RemoteMethodError;
namespace android {
namespace dvr {
@@ -33,8 +34,10 @@
bool ConsumerQueueChannel::HandleMessage(Message& message) {
ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
auto producer = GetProducer();
- if (!producer)
- REPLY_ERROR_RETURN(message, EPIPE, true);
+ if (!producer) {
+ RemoteMethodError(message, EPIPE);
+ return true;
+ }
switch (message.GetOp()) {
case BufferHubRPC::CreateConsumerQueue::Opcode:
@@ -79,6 +82,9 @@
void ConsumerQueueChannel::RegisterNewBuffer(
const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
+ ALOGD_IF(TRACE,
+ "ConsumerQueueChannel::RegisterNewBuffer: buffer_id=%d slot=%zu",
+ producer_channel->buffer_id(), slot);
pending_buffer_slots_.emplace(producer_channel, slot);
// Signal the client that there is new buffer available throught POLLIN.
@@ -89,7 +95,8 @@
ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
- ALOGD(
+ ALOGD_IF(
+ TRACE,
"ConsumerQueueChannel::OnConsumerQueueImportBuffers number of buffers to "
"import: %zu",
pending_buffer_slots_.size());
@@ -134,5 +141,12 @@
return {std::move(buffer_handles)};
}
+void ConsumerQueueChannel::OnProducerClosed() {
+ ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
+ buffer_id());
+ producer_.reset();
+ Hangup();
+}
+
} // namespace dvr
} // namespace android
diff --git a/services/vr/bufferhubd/consumer_queue_channel.h b/services/vr/bufferhubd/consumer_queue_channel.h
index e1005e4..aa3f531 100644
--- a/services/vr/bufferhubd/consumer_queue_channel.h
+++ b/services/vr/bufferhubd/consumer_queue_channel.h
@@ -39,6 +39,8 @@
pdx::Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
OnConsumerQueueImportBuffers(Message& message);
+ void OnProducerClosed();
+
private:
std::shared_ptr<ProducerQueueChannel> GetProducer() const;
diff --git a/services/vr/bufferhubd/producer_channel.cpp b/services/vr/bufferhubd/producer_channel.cpp
index e452d04..398aa12 100644
--- a/services/vr/bufferhubd/producer_channel.cpp
+++ b/services/vr/bufferhubd/producer_channel.cpp
@@ -199,10 +199,15 @@
return ErrorStatus(EBUSY);
}
- if (meta_size_bytes_ != metadata.size())
+ if (meta_size_bytes_ != metadata.size()) {
+ ALOGD_IF(TRACE,
+ "ProducerChannel::OnProducerPost: Expected meta_size_bytes=%zu "
+ "got size=%zu",
+ meta_size_bytes_, metadata.size());
return ErrorStatus(EINVAL);
- std::copy(metadata.begin(), metadata.end(), meta_.get());
+ }
+ std::copy(metadata.begin(), metadata.end(), meta_.get());
post_fence_ = std::move(acquire_fence);
producer_owns_ = false;
diff --git a/services/vr/bufferhubd/producer_queue_channel.cpp b/services/vr/bufferhubd/producer_queue_channel.cpp
index bd8eed8..843277e 100644
--- a/services/vr/bufferhubd/producer_queue_channel.cpp
+++ b/services/vr/bufferhubd/producer_queue_channel.cpp
@@ -26,7 +26,12 @@
*error = 0;
}
-ProducerQueueChannel::~ProducerQueueChannel() {}
+ProducerQueueChannel::~ProducerQueueChannel() {
+ ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
+ buffer_id());
+ for (auto* consumer : consumer_channels_)
+ consumer->OnProducerClosed();
+}
/* static */
Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
@@ -108,13 +113,21 @@
return ErrorStatus(ENOMEM);
}
- const auto channel_status = service()->SetChannel(
- channel_id, std::make_shared<ConsumerQueueChannel>(
- service(), buffer_id(), channel_id, shared_from_this()));
+ auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
+ service(), buffer_id(), channel_id, shared_from_this());
+
+ // Register the existing buffers with the new consumer queue.
+ for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
+ if (auto buffer = buffers_[slot].lock())
+ consumer_queue_channel->RegisterNewBuffer(buffer, slot);
+ }
+
+ const auto channel_status =
+ service()->SetChannel(channel_id, consumer_queue_channel);
if (!channel_status) {
ALOGE(
- "ProducerQueueChannel::OnCreateConsumerQueue: failed to set new "
- "consumer channel: %s",
+ "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
+ "%s",
channel_status.GetErrorMessage().c_str());
return ErrorStatus(ENOMEM);
}
@@ -254,7 +267,7 @@
capacity_++;
// Notify each consumer channel about the new buffer.
- for (auto consumer_channel : consumer_channels_) {
+ for (auto* consumer_channel : consumer_channels_) {
ALOGD(
"ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
"buffer, buffer_id=%d",