libbufferstreams: Add the BufferPoolPublisher.
The BufferPoolPublisher submits buffers from a pool over to a
subscriber.
Pair: jshargo
Bug: 296450854, 296101127
Test: atest libbufferstreams-internal_test
Change-Id: Ic473677c9c71b0505c3fcd2b4fb7d0fdf3d7d01b
diff --git a/libs/bufferstreams/rust/src/publishers/buffer_pool_publisher.rs b/libs/bufferstreams/rust/src/publishers/buffer_pool_publisher.rs
new file mode 100644
index 0000000..846105d
--- /dev/null
+++ b/libs/bufferstreams/rust/src/publishers/buffer_pool_publisher.rs
@@ -0,0 +1,112 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//!
+
+use std::time::Instant;
+
+use crate::{
+ buffers::BufferPool, subscriptions::SharedBufferSubscription, BufferPublisher,
+ BufferSubscriber, Frame, StreamConfig,
+};
+
+/// The [BufferPoolPublisher] submits buffers from a pool over to the subscriber.
+pub struct BufferPoolPublisher {
+ stream_config: StreamConfig,
+ buffer_pool: BufferPool,
+ subscription: SharedBufferSubscription,
+ subscriber: Option<Box<dyn BufferSubscriber>>,
+}
+
+impl BufferPoolPublisher {
+ /// The [BufferPoolPublisher] needs to initialize a [BufferPool], the [BufferPool] will create
+ /// all buffers at initialization using the stream_config.
+ pub fn new(stream_config: StreamConfig, size: usize) -> Option<Self> {
+ BufferPool::new(size, stream_config).map(|buffer_pool| Self {
+ stream_config,
+ buffer_pool,
+ subscription: SharedBufferSubscription::new(),
+ subscriber: None,
+ })
+ }
+
+ /// If the [SharedBufferSubscription] is ready for a [Frame], a buffer will be requested from
+ /// [BufferPool] and sent over to the [BufferSubscriber].
+ pub fn send_next_frame(&mut self, present_time: Instant) -> bool {
+ if let Some(subscriber) = self.subscriber.as_mut() {
+ if self.subscription.take_request() {
+ if let Some(buffer) = self.buffer_pool.next_buffer() {
+ let frame = Frame { buffer, present_time, fence: 0 };
+
+ subscriber.on_next(frame);
+ return true;
+ }
+ }
+ }
+ false
+ }
+}
+
+impl BufferPublisher for BufferPoolPublisher {
+ fn get_publisher_stream_config(&self) -> StreamConfig {
+ self.stream_config
+ }
+
+ fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static) {
+ assert!(self.subscriber.is_none());
+
+ self.subscriber = Some(Box::new(subscriber));
+ self.subscriber.as_mut().unwrap().on_subscribe(self.subscription.clone_for_subscriber());
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use nativewindow::{AHardwareBuffer_Format, AHardwareBuffer_UsageFlags};
+
+ use super::*;
+
+ use crate::{
+ subscribers::{
+ testing::{TestSubscriber, TestingSubscriberEvent},
+ SharedSubscriber,
+ },
+ StreamConfig,
+ };
+
+ const STREAM_CONFIG: StreamConfig = StreamConfig {
+ width: 1,
+ height: 1,
+ layers: 1,
+ format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
+ usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
+ stride: 0,
+ };
+
+ #[test]
+ fn test_send_next_frame() {
+ let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
+
+ let mut buffer_pool_publisher = BufferPoolPublisher::new(STREAM_CONFIG, 1).unwrap();
+ buffer_pool_publisher.subscribe(subscriber.clone());
+
+ subscriber.map_inner(|s| s.request(1));
+
+ assert!(buffer_pool_publisher.send_next_frame(Instant::now()));
+
+ let events = subscriber.map_inner_mut(|s| s.take_events());
+ assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
+ assert_eq!(buffer_pool_publisher.subscription.pending_requests(), 0);
+ }
+}
diff --git a/libs/bufferstreams/rust/src/publishers/mod.rs b/libs/bufferstreams/rust/src/publishers/mod.rs
index 2fd518e..8ed3ba0 100644
--- a/libs/bufferstreams/rust/src/publishers/mod.rs
+++ b/libs/bufferstreams/rust/src/publishers/mod.rs
@@ -14,4 +14,7 @@
//! This module provides [BufferSubscriber] implementations and helpers.
+mod buffer_pool_publisher;
pub mod testing;
+
+pub use buffer_pool_publisher::*;