bufferstreams: Add basic implementations of core BS traits
For BufferSubscriptions, this change provides a generic implementation
that tracks requests and cancellations.
For BufferPublishers and BufferSubscribers, we provide test
implementations that let a user manually control the flow of events
between the two objects.
The traits themselves have also been updated to be more generic--instead
of forcing Arc/Weak pointers for shared objects, we now use generic
owned types for a BufferPublisher's BufferSubscriber and a
BufferSubscriber's BufferSubscription.
To make it possible to hold onto a handle to a BufferSubscriber while a
BufferPublisher owns it, we provide a generic implementation of
BufferSubscriber for any Arc<BS: BufferSubscriber> that delegates to the
underlying subscriber.
Bug: 296449936, 296100790
Test: atest libbufferstreams-internal_test
Change-Id: Ibbf925d2dfb85f606baa3dc1f9722440af4f862c
diff --git a/libs/bufferstreams/rust/src/lib.rs b/libs/bufferstreams/rust/src/lib.rs
index 7bd8e38..87f3104 100644
--- a/libs/bufferstreams/rust/src/lib.rs
+++ b/libs/bufferstreams/rust/src/lib.rs
@@ -14,12 +14,14 @@
//! libbufferstreams: Reactive Streams for Graphics Buffers
+pub mod publishers;
mod stream_config;
+pub mod subscribers;
+pub mod subscriptions;
pub use stream_config::*;
use nativewindow::*;
-use std::sync::{Arc, Weak};
use std::time::Instant;
/// This function will print Hello World.
@@ -35,30 +37,30 @@
///
/// BufferPublishers are required to adhere to the following, based on the
/// reactive streams specification:
-/// * The total number of on_next´s signalled by a Publisher to a Subscriber
+/// * The total number of on_next´s signalled by a Publisher to a Subscriber
/// MUST be less than or equal to the total number of elements requested by that
/// Subscriber´s Subscription at all times.
-/// * A Publisher MAY signal fewer on_next than requested and terminate the
+/// * A Publisher MAY signal fewer on_next than requested and terminate the
/// Subscription by calling on_complete or on_error.
-/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
+/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
/// MUST be signaled serially.
-/// * If a Publisher fails it MUST signal an on_error.
-/// * If a Publisher terminates successfully (finite stream) it MUST signal an
+/// * If a Publisher fails it MUST signal an on_error.
+/// * If a Publisher terminates successfully (finite stream) it MUST signal an
/// on_complete.
-/// * If a Publisher signals either on_error or on_complete on a Subscriber,
+/// * If a Publisher signals either on_error or on_complete on a Subscriber,
/// that Subscriber’s Subscription MUST be considered cancelled.
-/// * Once a terminal state has been signaled (on_error, on_complete) it is
+/// * Once a terminal state has been signaled (on_error, on_complete) it is
/// REQUIRED that no further signals occur.
-/// * If a Subscription is cancelled its Subscriber MUST eventually stop being
+/// * If a Subscription is cancelled its Subscriber MUST eventually stop being
/// signaled.
-/// * A Publisher MAY support multiple Subscribers and decides whether each
+/// * A Publisher MAY support multiple Subscribers and decides whether each
/// Subscription is unicast or multicast.
pub trait BufferPublisher {
/// Returns the StreamConfig of buffers that publisher creates.
fn get_publisher_stream_config(&self) -> StreamConfig;
/// This function will create the subscription between the publisher and
/// the subscriber.
- fn subscribe(&self, subscriber: Weak<dyn BufferSubscriber>);
+ fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static);
}
/// BufferSubscribers can subscribe to BufferPublishers. They can request Frames
@@ -67,37 +69,37 @@
///
/// BufferSubcribers are required to adhere to the following, based on the
/// reactive streams specification:
-/// * The total number of on_next´s signalled by a Publisher to a Subscriber
+/// * The total number of on_next´s signalled by a Publisher to a Subscriber
/// MUST be less than or equal to the total number of elements requested by that
/// Subscriber´s Subscription at all times.
-/// * A Publisher MAY signal fewer on_next than requested and terminate the
+/// * A Publisher MAY signal fewer on_next than requested and terminate the
/// Subscription by calling on_complete or on_error.
-/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
+/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
/// MUST be signaled serially.
-/// * If a Publisher fails it MUST signal an on_error.
-/// * If a Publisher terminates successfully (finite stream) it MUST signal an
+/// * If a Publisher fails it MUST signal an on_error.
+/// * If a Publisher terminates successfully (finite stream) it MUST signal an
/// on_complete.
-/// * If a Publisher signals either on_error or on_complete on a Subscriber,
+/// * If a Publisher signals either on_error or on_complete on a Subscriber,
/// that Subscriber’s Subscription MUST be considered cancelled.
-/// * Once a terminal state has been signaled (on_error, on_complete) it is
+/// * Once a terminal state has been signaled (on_error, on_complete) it is
/// REQUIRED that no further signals occur.
-/// * If a Subscription is cancelled its Subscriber MUST eventually stop being
+/// * If a Subscription is cancelled its Subscriber MUST eventually stop being
/// signaled.
-/// * Publisher.subscribe MAY be called as many times as wanted but MUST be
+/// * Publisher.subscribe MAY be called as many times as wanted but MUST be
/// with a different Subscriber each time.
-/// * A Publisher MAY support multiple Subscribers and decides whether each
+/// * A Publisher MAY support multiple Subscribers and decides whether each
/// Subscription is unicast or multicast.
pub trait BufferSubscriber {
/// The StreamConfig of buffers that this subscriber expects.
fn get_subscriber_stream_config(&self) -> StreamConfig;
/// This function will be called at the beginning of the subscription.
- fn on_subscribe(&self, subscription: Arc<dyn BufferSubscription>);
+ fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>);
/// This function will be called for buffer that comes in.
- fn on_next(&self, frame: Frame);
+ fn on_next(&mut self, frame: Frame);
/// This function will be called in case of an error.
- fn on_error(&self, error: BufferError);
+ fn on_error(&mut self, error: BufferError);
/// This function will be called on finite streams when done.
- fn on_complete(&self);
+ fn on_complete(&mut self);
}
/// BufferSubscriptions serve as the bridge between BufferPublishers and
@@ -108,50 +110,51 @@
///
/// BufferSubcriptions are required to adhere to the following, based on the
/// reactive streams specification:
-/// * Subscription.request and Subscription.cancel MUST only be called inside
+/// * Subscription.request and Subscription.cancel MUST only be called inside
/// of its Subscriber context.
-/// * The Subscription MUST allow the Subscriber to call Subscription.request
+/// * The Subscription MUST allow the Subscriber to call Subscription.request
/// synchronously from within on_next or on_subscribe.
-/// * Subscription.request MUST place an upper bound on possible synchronous
+/// * Subscription.request MUST place an upper bound on possible synchronous
/// recursion between Publisher and Subscriber.
-/// * Subscription.request SHOULD respect the responsivity of its caller by
+/// * Subscription.request SHOULD respect the responsivity of its caller by
/// returning in a timely manner.
-/// * Subscription.cancel MUST respect the responsivity of its caller by
+/// * Subscription.cancel MUST respect the responsivity of its caller by
/// returning in a timely manner, MUST be idempotent and MUST be thread-safe.
-/// * After the Subscription is cancelled, additional
+/// * After the Subscription is cancelled, additional
/// Subscription.request(n: u64) MUST be NOPs.
-/// * After the Subscription is cancelled, additional Subscription.cancel()
+/// * After the Subscription is cancelled, additional Subscription.cancel()
/// MUST be NOPs.
-/// * While the Subscription is not cancelled, Subscription.request(n: u64)
+/// * While the Subscription is not cancelled, Subscription.request(n: u64)
/// MUST register the given number of additional elements to be produced to the
/// respective subscriber.
-/// * While the Subscription is not cancelled, Subscription.request(n: u64)
+/// * While the Subscription is not cancelled, Subscription.request(n: u64)
/// MUST signal on_error if the argument is <= 0. The cause message SHOULD
/// explain that non-positive request signals are illegal.
-/// * While the Subscription is not cancelled, Subscription.request(n: u64)
+/// * While the Subscription is not cancelled, Subscription.request(n: u64)
/// MAY synchronously call on_next on this (or other) subscriber(s).
-/// * While the Subscription is not cancelled, Subscription.request(n: u64)
+/// * While the Subscription is not cancelled, Subscription.request(n: u64)
/// MAY synchronously call on_complete or on_error on this (or other)
/// subscriber(s).
-/// * While the Subscription is not cancelled, Subscription.cancel() MUST
+/// * While the Subscription is not cancelled, Subscription.cancel() MUST
/// request the Publisher to eventually stop signaling its Subscriber. The
/// operation is NOT REQUIRED to affect the Subscription immediately.
-/// * While the Subscription is not cancelled, Subscription.cancel() MUST
+/// * While the Subscription is not cancelled, Subscription.cancel() MUST
/// request the Publisher to eventually drop any references to the corresponding
/// subscriber.
-/// * While the Subscription is not cancelled, calling Subscription.cancel MAY
+/// * While the Subscription is not cancelled, calling Subscription.cancel MAY
/// cause the Publisher, if stateful, to transition into the shut-down state if
/// no other Subscription exists at this point.
-/// * Calling Subscription.cancel MUST return normally.
-/// * Calling Subscription.request MUST return normally.
+/// * Calling Subscription.cancel MUST return normally.
+/// * Calling Subscription.request MUST return normally.
pub trait BufferSubscription {
/// request
fn request(&self, n: u64);
/// cancel
fn cancel(&self);
}
+
/// Type used to describe errors produced by subscriptions.
-type BufferError = Box<dyn std::error::Error + Send + Sync + 'static>;
+pub type BufferError = anyhow::Error;
/// Struct used to contain the buffer.
pub struct Frame {
@@ -162,3 +165,88 @@
/// A fence used for reading/writing safely.
pub fence: i32,
}
+
+#[cfg(test)]
+mod test {
+ #![allow(warnings, unused)]
+ use super::*;
+
+ use anyhow::anyhow;
+ use std::borrow::BorrowMut;
+ use std::error::Error;
+ use std::ops::Add;
+ use std::sync::Arc;
+ use std::time::Duration;
+
+ use crate::publishers::testing::*;
+ use crate::subscribers::{testing::*, SharedSubscriber};
+
+ const STREAM_CONFIG: StreamConfig = StreamConfig {
+ width: 1,
+ height: 1,
+ layers: 1,
+ format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
+ usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
+ stride: 0,
+ };
+
+ fn make_frame() -> Frame {
+ Frame {
+ buffer: STREAM_CONFIG
+ .create_hardware_buffer()
+ .expect("Unable to create hardware buffer for test"),
+ present_time: Instant::now() + Duration::from_secs(1),
+ fence: 0,
+ }
+ }
+
+ #[test]
+ fn test_test_implementations_next() {
+ let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
+ let mut publisher = TestPublisher::new(STREAM_CONFIG);
+
+ publisher.subscribe(subscriber.clone());
+ assert!(subscriber.map_inner(|s| s.has_subscription()));
+ assert!(publisher.has_subscriber());
+
+ publisher.send_frame(make_frame());
+ let events = subscriber.map_inner_mut(|s| s.take_events());
+ assert!(!matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
+
+ subscriber.map_inner(|s| s.request(1));
+ assert_eq!(publisher.pending_requests(), 1);
+
+ publisher.send_frame(make_frame());
+ let events = subscriber.map_inner_mut(|s| s.take_events());
+ assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
+ assert_eq!(publisher.pending_requests(), 0);
+ }
+
+ #[test]
+ fn test_test_implementations_complete() {
+ let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
+ let mut publisher = TestPublisher::new(STREAM_CONFIG);
+
+ publisher.subscribe(subscriber.clone());
+ assert!(subscriber.map_inner(|s| s.has_subscription()));
+ assert!(publisher.has_subscriber());
+
+ publisher.send_complete();
+ let events = subscriber.map_inner_mut(|s| s.take_events());
+ assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Complete));
+ }
+
+ #[test]
+ fn test_test_implementations_error() {
+ let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
+ let mut publisher = TestPublisher::new(STREAM_CONFIG);
+
+ publisher.subscribe(subscriber.clone());
+ assert!(subscriber.map_inner(|s| s.has_subscription()));
+ assert!(publisher.has_subscriber());
+
+ publisher.send_error(anyhow!("error"));
+ let events = subscriber.map_inner_mut(|s| s.take_events());
+ assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Error(_)));
+ }
+}