bufferstreams: Add basic implementations of core BS traits

For BufferSubscriptions, this change provides a generic implementation
that tracks requests and cancellations.

For BufferPublishers and BufferSubscribers, we provide test
implementations that let a user manually control the flow of events
between the two objects.

The traits themselves have also been updated to be more generic--instead
of forcing Arc/Weak pointers for shared objects, we now use generic
owned types for a BufferPublisher's BufferSubscriber and a
BufferSubscriber's BufferSubscription.

To make it possible to hold onto a handle to a BufferSubscriber while a
BufferPublisher owns it, we provide a generic implementation of
BufferSubscriber for any Arc<BS: BufferSubscriber> that delegates to the
underlying subscriber.

Bug: 296449936, 296100790
Test: atest libbufferstreams-internal_test
Change-Id: Ibbf925d2dfb85f606baa3dc1f9722440af4f862c
diff --git a/libs/bufferstreams/rust/src/lib.rs b/libs/bufferstreams/rust/src/lib.rs
index 7bd8e38..87f3104 100644
--- a/libs/bufferstreams/rust/src/lib.rs
+++ b/libs/bufferstreams/rust/src/lib.rs
@@ -14,12 +14,14 @@
 
 //! libbufferstreams: Reactive Streams for Graphics Buffers
 
+pub mod publishers;
 mod stream_config;
+pub mod subscribers;
+pub mod subscriptions;
 
 pub use stream_config::*;
 
 use nativewindow::*;
-use std::sync::{Arc, Weak};
 use std::time::Instant;
 
 /// This function will print Hello World.
@@ -35,30 +37,30 @@
 ///
 /// BufferPublishers are required to adhere to the following, based on the
 /// reactive streams specification:
-/// *   The total number of on_next´s signalled by a Publisher to a Subscriber
+/// * The total number of on_next´s signalled by a Publisher to a Subscriber
 /// MUST be less than or equal to the total number of elements requested by that
 /// Subscriber´s Subscription at all times.
-/// *   A Publisher MAY signal fewer on_next than requested and terminate the
+/// * A Publisher MAY signal fewer on_next than requested and terminate the
 /// Subscription by calling on_complete or on_error.
-/// *   on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
+/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
 /// MUST be signaled serially.
-/// *   If a Publisher fails it MUST signal an on_error.
-/// *   If a Publisher terminates successfully (finite stream) it MUST signal an
+/// * If a Publisher fails it MUST signal an on_error.
+/// * If a Publisher terminates successfully (finite stream) it MUST signal an
 /// on_complete.
-/// *   If a Publisher signals either on_error or on_complete on a Subscriber,
+/// * If a Publisher signals either on_error or on_complete on a Subscriber,
 /// that Subscriber’s Subscription MUST be considered cancelled.
-/// *   Once a terminal state has been signaled (on_error, on_complete) it is
+/// * Once a terminal state has been signaled (on_error, on_complete) it is
 /// REQUIRED that no further signals occur.
-/// *   If a Subscription is cancelled its Subscriber MUST eventually stop being
+/// * If a Subscription is cancelled its Subscriber MUST eventually stop being
 ///  signaled.
-/// *  A Publisher MAY support multiple Subscribers and decides whether each
+/// * A Publisher MAY support multiple Subscribers and decides whether each
 /// Subscription is unicast or multicast.
 pub trait BufferPublisher {
     /// Returns the StreamConfig of buffers that publisher creates.
     fn get_publisher_stream_config(&self) -> StreamConfig;
     /// This function will create the subscription between the publisher and
     /// the subscriber.
-    fn subscribe(&self, subscriber: Weak<dyn BufferSubscriber>);
+    fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static);
 }
 
 /// BufferSubscribers can subscribe to BufferPublishers. They can request Frames
@@ -67,37 +69,37 @@
 ///
 /// BufferSubcribers are required to adhere to the following, based on the
 /// reactive streams specification:
-/// *   The total number of on_next´s signalled by a Publisher to a Subscriber
+/// * The total number of on_next´s signalled by a Publisher to a Subscriber
 /// MUST be less than or equal to the total number of elements requested by that
 /// Subscriber´s Subscription at all times.
-/// *   A Publisher MAY signal fewer on_next than requested and terminate the
+/// * A Publisher MAY signal fewer on_next than requested and terminate the
 /// Subscription by calling on_complete or on_error.
-/// *   on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
+/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
 /// MUST be signaled serially.
-/// *   If a Publisher fails it MUST signal an on_error.
-/// *   If a Publisher terminates successfully (finite stream) it MUST signal an
+/// * If a Publisher fails it MUST signal an on_error.
+/// * If a Publisher terminates successfully (finite stream) it MUST signal an
 /// on_complete.
-/// *   If a Publisher signals either on_error or on_complete on a Subscriber,
+/// * If a Publisher signals either on_error or on_complete on a Subscriber,
 /// that Subscriber’s Subscription MUST be considered cancelled.
-/// *   Once a terminal state has been signaled (on_error, on_complete) it is
+/// * Once a terminal state has been signaled (on_error, on_complete) it is
 /// REQUIRED that no further signals occur.
-/// *   If a Subscription is cancelled its Subscriber MUST eventually stop being
+/// * If a Subscription is cancelled its Subscriber MUST eventually stop being
 /// signaled.
-/// *   Publisher.subscribe MAY be called as many times as wanted but MUST be
+/// * Publisher.subscribe MAY be called as many times as wanted but MUST be
 /// with a different Subscriber each time.
-/// *   A Publisher MAY support multiple Subscribers and decides whether each
+/// * A Publisher MAY support multiple Subscribers and decides whether each
 /// Subscription is unicast or multicast.
 pub trait BufferSubscriber {
     /// The StreamConfig of buffers that this subscriber expects.
     fn get_subscriber_stream_config(&self) -> StreamConfig;
     /// This function will be called at the beginning of the subscription.
-    fn on_subscribe(&self, subscription: Arc<dyn BufferSubscription>);
+    fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>);
     /// This function will be called for buffer that comes in.
-    fn on_next(&self, frame: Frame);
+    fn on_next(&mut self, frame: Frame);
     /// This function will be called in case of an error.
-    fn on_error(&self, error: BufferError);
+    fn on_error(&mut self, error: BufferError);
     /// This function will be called on finite streams when done.
-    fn on_complete(&self);
+    fn on_complete(&mut self);
 }
 
 /// BufferSubscriptions serve as the bridge between BufferPublishers and
@@ -108,50 +110,51 @@
 ///
 /// BufferSubcriptions are required to adhere to the following, based on the
 /// reactive streams specification:
-/// *   Subscription.request and Subscription.cancel MUST only be called inside
+/// * Subscription.request and Subscription.cancel MUST only be called inside
 /// of its Subscriber context.
-/// *   The Subscription MUST allow the Subscriber to call Subscription.request
+/// * The Subscription MUST allow the Subscriber to call Subscription.request
 /// synchronously from within on_next or on_subscribe.
-/// *   Subscription.request MUST place an upper bound on possible synchronous
+/// * Subscription.request MUST place an upper bound on possible synchronous
 /// recursion between Publisher and Subscriber.
-/// *   Subscription.request SHOULD respect the responsivity of its caller by
+/// * Subscription.request SHOULD respect the responsivity of its caller by
 /// returning in a timely manner.
-/// *   Subscription.cancel MUST respect the responsivity of its caller by
+/// * Subscription.cancel MUST respect the responsivity of its caller by
 /// returning in a timely manner, MUST be idempotent and MUST be thread-safe.
-/// *   After the Subscription is cancelled, additional
+/// * After the Subscription is cancelled, additional
 /// Subscription.request(n: u64) MUST be NOPs.
-/// *   After the Subscription is cancelled, additional Subscription.cancel()
+/// * After the Subscription is cancelled, additional Subscription.cancel()
 /// MUST be NOPs.
-/// *   While the Subscription is not cancelled, Subscription.request(n: u64)
+/// * While the Subscription is not cancelled, Subscription.request(n: u64)
 /// MUST register the given number of additional elements to be produced to the
 /// respective subscriber.
-/// *   While the Subscription is not cancelled, Subscription.request(n: u64)
+/// * While the Subscription is not cancelled, Subscription.request(n: u64)
 /// MUST signal on_error if the argument is <= 0. The cause message SHOULD
 /// explain that non-positive request signals are illegal.
-/// *  While the Subscription is not cancelled, Subscription.request(n: u64)
+/// * While the Subscription is not cancelled, Subscription.request(n: u64)
 /// MAY synchronously call on_next on this (or other) subscriber(s).
-/// *  While the Subscription is not cancelled, Subscription.request(n: u64)
+/// * While the Subscription is not cancelled, Subscription.request(n: u64)
 /// MAY synchronously call on_complete or on_error on this (or other)
 /// subscriber(s).
-/// *  While the Subscription is not cancelled, Subscription.cancel() MUST
+/// * While the Subscription is not cancelled, Subscription.cancel() MUST
 /// request the Publisher to eventually stop signaling its Subscriber. The
 /// operation is NOT REQUIRED to affect the Subscription immediately.
-/// *  While the Subscription is not cancelled, Subscription.cancel() MUST
+/// * While the Subscription is not cancelled, Subscription.cancel() MUST
 /// request the Publisher to eventually drop any references to the corresponding
 /// subscriber.
-/// *  While the Subscription is not cancelled, calling Subscription.cancel MAY
+/// * While the Subscription is not cancelled, calling Subscription.cancel MAY
 /// cause the Publisher, if stateful, to transition into the shut-down state if
 /// no other Subscription exists at this point.
-/// *  Calling Subscription.cancel MUST return normally.
-/// *  Calling Subscription.request MUST return normally.
+/// * Calling Subscription.cancel MUST return normally.
+/// * Calling Subscription.request MUST return normally.
 pub trait BufferSubscription {
     /// request
     fn request(&self, n: u64);
     /// cancel
     fn cancel(&self);
 }
+
 /// Type used to describe errors produced by subscriptions.
-type BufferError = Box<dyn std::error::Error + Send + Sync + 'static>;
+pub type BufferError = anyhow::Error;
 
 /// Struct used to contain the buffer.
 pub struct Frame {
@@ -162,3 +165,88 @@
     /// A fence used for reading/writing safely.
     pub fence: i32,
 }
+
+#[cfg(test)]
+mod test {
+    #![allow(warnings, unused)]
+    use super::*;
+
+    use anyhow::anyhow;
+    use std::borrow::BorrowMut;
+    use std::error::Error;
+    use std::ops::Add;
+    use std::sync::Arc;
+    use std::time::Duration;
+
+    use crate::publishers::testing::*;
+    use crate::subscribers::{testing::*, SharedSubscriber};
+
+    const STREAM_CONFIG: StreamConfig = StreamConfig {
+        width: 1,
+        height: 1,
+        layers: 1,
+        format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
+        usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
+        stride: 0,
+    };
+
+    fn make_frame() -> Frame {
+        Frame {
+            buffer: STREAM_CONFIG
+                .create_hardware_buffer()
+                .expect("Unable to create hardware buffer for test"),
+            present_time: Instant::now() + Duration::from_secs(1),
+            fence: 0,
+        }
+    }
+
+    #[test]
+    fn test_test_implementations_next() {
+        let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
+        let mut publisher = TestPublisher::new(STREAM_CONFIG);
+
+        publisher.subscribe(subscriber.clone());
+        assert!(subscriber.map_inner(|s| s.has_subscription()));
+        assert!(publisher.has_subscriber());
+
+        publisher.send_frame(make_frame());
+        let events = subscriber.map_inner_mut(|s| s.take_events());
+        assert!(!matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
+
+        subscriber.map_inner(|s| s.request(1));
+        assert_eq!(publisher.pending_requests(), 1);
+
+        publisher.send_frame(make_frame());
+        let events = subscriber.map_inner_mut(|s| s.take_events());
+        assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
+        assert_eq!(publisher.pending_requests(), 0);
+    }
+
+    #[test]
+    fn test_test_implementations_complete() {
+        let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
+        let mut publisher = TestPublisher::new(STREAM_CONFIG);
+
+        publisher.subscribe(subscriber.clone());
+        assert!(subscriber.map_inner(|s| s.has_subscription()));
+        assert!(publisher.has_subscriber());
+
+        publisher.send_complete();
+        let events = subscriber.map_inner_mut(|s| s.take_events());
+        assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Complete));
+    }
+
+    #[test]
+    fn test_test_implementations_error() {
+        let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
+        let mut publisher = TestPublisher::new(STREAM_CONFIG);
+
+        publisher.subscribe(subscriber.clone());
+        assert!(subscriber.map_inner(|s| s.has_subscription()));
+        assert!(publisher.has_subscriber());
+
+        publisher.send_error(anyhow!("error"));
+        let events = subscriber.map_inner_mut(|s| s.take_events());
+        assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Error(_)));
+    }
+}