blob: be1525d41f676c42e41e71e18f245647fa15e329 [file] [log] [blame]
Carlos Martinez Romero43d9afc2023-07-07 22:58:51 +00001// Copyright (C) 2023 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! libbufferstreams: Reactive Streams for Graphics Buffers
16
Carlos Martinez Romero89aab242023-10-03 21:03:47 +000017pub mod buffers;
James Shargo77e3c542023-09-12 11:58:07 -040018pub mod publishers;
James Shargo83baba52023-09-11 20:17:40 -040019mod stream_config;
James Shargo77e3c542023-09-12 11:58:07 -040020pub mod subscribers;
21pub mod subscriptions;
James Shargo83baba52023-09-11 20:17:40 -040022
Carlos Martinez Romero89aab242023-10-03 21:03:47 +000023use buffers::Buffer;
James Shargo83baba52023-09-11 20:17:40 -040024pub use stream_config::*;
25
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000026use std::time::Instant;
27
Carlos Martinez Romero43d9afc2023-07-07 22:58:51 +000028/// This function will print Hello World.
29#[no_mangle]
30pub extern "C" fn hello() -> bool {
31 println!("Hello world.");
32 true
33}
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000034
35/// BufferPublishers provide buffers to BufferSusbscribers. Depending on the
36/// particular object in question, these could be allocated locally or provided
37/// over IPC.
38///
39/// BufferPublishers are required to adhere to the following, based on the
40/// reactive streams specification:
James Shargo77e3c542023-09-12 11:58:07 -040041/// * The total number of on_next´s signalled by a Publisher to a Subscriber
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000042/// MUST be less than or equal to the total number of elements requested by that
43/// Subscriber´s Subscription at all times.
James Shargo77e3c542023-09-12 11:58:07 -040044/// * A Publisher MAY signal fewer on_next than requested and terminate the
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000045/// Subscription by calling on_complete or on_error.
James Shargo77e3c542023-09-12 11:58:07 -040046/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000047/// MUST be signaled serially.
James Shargo77e3c542023-09-12 11:58:07 -040048/// * If a Publisher fails it MUST signal an on_error.
49/// * If a Publisher terminates successfully (finite stream) it MUST signal an
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000050/// on_complete.
James Shargo77e3c542023-09-12 11:58:07 -040051/// * If a Publisher signals either on_error or on_complete on a Subscriber,
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000052/// that Subscriber’s Subscription MUST be considered cancelled.
James Shargo77e3c542023-09-12 11:58:07 -040053/// * Once a terminal state has been signaled (on_error, on_complete) it is
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000054/// REQUIRED that no further signals occur.
James Shargo77e3c542023-09-12 11:58:07 -040055/// * If a Subscription is cancelled its Subscriber MUST eventually stop being
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000056/// signaled.
James Shargo77e3c542023-09-12 11:58:07 -040057/// * A Publisher MAY support multiple Subscribers and decides whether each
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000058/// Subscription is unicast or multicast.
59pub trait BufferPublisher {
James Shargo83baba52023-09-11 20:17:40 -040060 /// Returns the StreamConfig of buffers that publisher creates.
61 fn get_publisher_stream_config(&self) -> StreamConfig;
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000062 /// This function will create the subscription between the publisher and
63 /// the subscriber.
James Shargo77e3c542023-09-12 11:58:07 -040064 fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static);
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000065}
66
67/// BufferSubscribers can subscribe to BufferPublishers. They can request Frames
68/// via the BufferSubscription they get from the publisher, then receive Frames
69/// via on_next.
70///
71/// BufferSubcribers are required to adhere to the following, based on the
72/// reactive streams specification:
James Shargo77e3c542023-09-12 11:58:07 -040073/// * The total number of on_next´s signalled by a Publisher to a Subscriber
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000074/// MUST be less than or equal to the total number of elements requested by that
75/// Subscriber´s Subscription at all times.
James Shargo77e3c542023-09-12 11:58:07 -040076/// * A Publisher MAY signal fewer on_next than requested and terminate the
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000077/// Subscription by calling on_complete or on_error.
James Shargo77e3c542023-09-12 11:58:07 -040078/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000079/// MUST be signaled serially.
James Shargo77e3c542023-09-12 11:58:07 -040080/// * If a Publisher fails it MUST signal an on_error.
81/// * If a Publisher terminates successfully (finite stream) it MUST signal an
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000082/// on_complete.
James Shargo77e3c542023-09-12 11:58:07 -040083/// * If a Publisher signals either on_error or on_complete on a Subscriber,
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000084/// that Subscriber’s Subscription MUST be considered cancelled.
James Shargo77e3c542023-09-12 11:58:07 -040085/// * Once a terminal state has been signaled (on_error, on_complete) it is
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000086/// REQUIRED that no further signals occur.
James Shargo77e3c542023-09-12 11:58:07 -040087/// * If a Subscription is cancelled its Subscriber MUST eventually stop being
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000088/// signaled.
James Shargo77e3c542023-09-12 11:58:07 -040089/// * Publisher.subscribe MAY be called as many times as wanted but MUST be
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000090/// with a different Subscriber each time.
James Shargo77e3c542023-09-12 11:58:07 -040091/// * A Publisher MAY support multiple Subscribers and decides whether each
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000092/// Subscription is unicast or multicast.
93pub trait BufferSubscriber {
James Shargo83baba52023-09-11 20:17:40 -040094 /// The StreamConfig of buffers that this subscriber expects.
95 fn get_subscriber_stream_config(&self) -> StreamConfig;
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000096 /// This function will be called at the beginning of the subscription.
James Shargo77e3c542023-09-12 11:58:07 -040097 fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>);
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000098 /// This function will be called for buffer that comes in.
James Shargo77e3c542023-09-12 11:58:07 -040099 fn on_next(&mut self, frame: Frame);
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000100 /// This function will be called in case of an error.
James Shargo77e3c542023-09-12 11:58:07 -0400101 fn on_error(&mut self, error: BufferError);
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000102 /// This function will be called on finite streams when done.
James Shargo77e3c542023-09-12 11:58:07 -0400103 fn on_complete(&mut self);
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000104}
105
106/// BufferSubscriptions serve as the bridge between BufferPublishers and
107/// BufferSubscribers. BufferSubscribers receive a BufferSubscription when they
108/// subscribe to a BufferPublisher via on_subscribe.
109/// This object is to be used by the BufferSubscriber to cancel its subscription
110/// or request more buffers.
111///
112/// BufferSubcriptions are required to adhere to the following, based on the
113/// reactive streams specification:
James Shargo77e3c542023-09-12 11:58:07 -0400114/// * Subscription.request and Subscription.cancel MUST only be called inside
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000115/// of its Subscriber context.
James Shargo77e3c542023-09-12 11:58:07 -0400116/// * The Subscription MUST allow the Subscriber to call Subscription.request
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000117/// synchronously from within on_next or on_subscribe.
James Shargo77e3c542023-09-12 11:58:07 -0400118/// * Subscription.request MUST place an upper bound on possible synchronous
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000119/// recursion between Publisher and Subscriber.
James Shargo77e3c542023-09-12 11:58:07 -0400120/// * Subscription.request SHOULD respect the responsivity of its caller by
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000121/// returning in a timely manner.
James Shargo77e3c542023-09-12 11:58:07 -0400122/// * Subscription.cancel MUST respect the responsivity of its caller by
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000123/// returning in a timely manner, MUST be idempotent and MUST be thread-safe.
James Shargo77e3c542023-09-12 11:58:07 -0400124/// * After the Subscription is cancelled, additional
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000125/// Subscription.request(n: u64) MUST be NOPs.
James Shargo77e3c542023-09-12 11:58:07 -0400126/// * After the Subscription is cancelled, additional Subscription.cancel()
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000127/// MUST be NOPs.
James Shargo77e3c542023-09-12 11:58:07 -0400128/// * While the Subscription is not cancelled, Subscription.request(n: u64)
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000129/// MUST register the given number of additional elements to be produced to the
130/// respective subscriber.
James Shargo77e3c542023-09-12 11:58:07 -0400131/// * While the Subscription is not cancelled, Subscription.request(n: u64)
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000132/// MUST signal on_error if the argument is <= 0. The cause message SHOULD
133/// explain that non-positive request signals are illegal.
James Shargo77e3c542023-09-12 11:58:07 -0400134/// * While the Subscription is not cancelled, Subscription.request(n: u64)
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000135/// MAY synchronously call on_next on this (or other) subscriber(s).
James Shargo77e3c542023-09-12 11:58:07 -0400136/// * While the Subscription is not cancelled, Subscription.request(n: u64)
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000137/// MAY synchronously call on_complete or on_error on this (or other)
138/// subscriber(s).
James Shargo77e3c542023-09-12 11:58:07 -0400139/// * While the Subscription is not cancelled, Subscription.cancel() MUST
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000140/// request the Publisher to eventually stop signaling its Subscriber. The
141/// operation is NOT REQUIRED to affect the Subscription immediately.
James Shargo77e3c542023-09-12 11:58:07 -0400142/// * While the Subscription is not cancelled, Subscription.cancel() MUST
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000143/// request the Publisher to eventually drop any references to the corresponding
144/// subscriber.
James Shargo77e3c542023-09-12 11:58:07 -0400145/// * While the Subscription is not cancelled, calling Subscription.cancel MAY
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000146/// cause the Publisher, if stateful, to transition into the shut-down state if
147/// no other Subscription exists at this point.
James Shargo77e3c542023-09-12 11:58:07 -0400148/// * Calling Subscription.cancel MUST return normally.
149/// * Calling Subscription.request MUST return normally.
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000150pub trait BufferSubscription {
151 /// request
152 fn request(&self, n: u64);
153 /// cancel
154 fn cancel(&self);
155}
James Shargo77e3c542023-09-12 11:58:07 -0400156
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000157/// Type used to describe errors produced by subscriptions.
James Shargo77e3c542023-09-12 11:58:07 -0400158pub type BufferError = anyhow::Error;
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000159
160/// Struct used to contain the buffer.
161pub struct Frame {
Carlos Martinez Romero89aab242023-10-03 21:03:47 +0000162 /// A buffer to be used this frame.
163 pub buffer: Buffer,
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000164 /// The time at which the buffer was dispatched.
165 pub present_time: Instant,
166 /// A fence used for reading/writing safely.
167 pub fence: i32,
168}
James Shargo77e3c542023-09-12 11:58:07 -0400169
170#[cfg(test)]
171mod test {
172 #![allow(warnings, unused)]
173 use super::*;
174
175 use anyhow::anyhow;
Carlos Martinez Romero89aab242023-10-03 21:03:47 +0000176 use buffers::Buffer;
177 use nativewindow::{AHardwareBuffer_Format, AHardwareBuffer_UsageFlags};
James Shargo77e3c542023-09-12 11:58:07 -0400178 use std::borrow::BorrowMut;
179 use std::error::Error;
180 use std::ops::Add;
181 use std::sync::Arc;
182 use std::time::Duration;
183
184 use crate::publishers::testing::*;
185 use crate::subscribers::{testing::*, SharedSubscriber};
186
187 const STREAM_CONFIG: StreamConfig = StreamConfig {
188 width: 1,
189 height: 1,
190 layers: 1,
191 format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
192 usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
193 stride: 0,
194 };
195
196 fn make_frame() -> Frame {
197 Frame {
Carlos Martinez Romero89aab242023-10-03 21:03:47 +0000198 buffer: Buffer::new_unowned(
199 STREAM_CONFIG
200 .create_hardware_buffer()
201 .expect("Unable to create hardware buffer for test"),
202 ),
James Shargo77e3c542023-09-12 11:58:07 -0400203 present_time: Instant::now() + Duration::from_secs(1),
204 fence: 0,
205 }
206 }
207
208 #[test]
209 fn test_test_implementations_next() {
210 let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
211 let mut publisher = TestPublisher::new(STREAM_CONFIG);
212
213 publisher.subscribe(subscriber.clone());
214 assert!(subscriber.map_inner(|s| s.has_subscription()));
215 assert!(publisher.has_subscriber());
216
217 publisher.send_frame(make_frame());
218 let events = subscriber.map_inner_mut(|s| s.take_events());
219 assert!(!matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
220
221 subscriber.map_inner(|s| s.request(1));
222 assert_eq!(publisher.pending_requests(), 1);
223
224 publisher.send_frame(make_frame());
225 let events = subscriber.map_inner_mut(|s| s.take_events());
226 assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Next(_)));
227 assert_eq!(publisher.pending_requests(), 0);
228 }
229
230 #[test]
231 fn test_test_implementations_complete() {
232 let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
233 let mut publisher = TestPublisher::new(STREAM_CONFIG);
234
235 publisher.subscribe(subscriber.clone());
236 assert!(subscriber.map_inner(|s| s.has_subscription()));
237 assert!(publisher.has_subscriber());
238
239 publisher.send_complete();
240 let events = subscriber.map_inner_mut(|s| s.take_events());
241 assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Complete));
242 }
243
244 #[test]
245 fn test_test_implementations_error() {
246 let subscriber = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
247 let mut publisher = TestPublisher::new(STREAM_CONFIG);
248
249 publisher.subscribe(subscriber.clone());
250 assert!(subscriber.map_inner(|s| s.has_subscription()));
251 assert!(publisher.has_subscriber());
252
253 publisher.send_error(anyhow!("error"));
254 let events = subscriber.map_inner_mut(|s| s.take_events());
255 assert!(matches!(events.last().unwrap(), TestingSubscriberEvent::Error(_)));
256 }
257}