blob: 5964281c9dd18c507266984c04ce8f2af7c6a270 [file] [log] [blame]
Carlos Martinez Romero43d9afc2023-07-07 22:58:51 +00001// Copyright (C) 2023 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! libbufferstreams: Reactive Streams for Graphics Buffers
16
James Shargo77e3c542023-09-12 11:58:07 -040017pub mod publishers;
James Shargo83baba52023-09-11 20:17:40 -040018mod stream_config;
James Shargo77e3c542023-09-12 11:58:07 -040019pub mod subscribers;
20pub mod subscriptions;
James Shargo83baba52023-09-11 20:17:40 -040021
22pub use stream_config::*;
23
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000024use nativewindow::*;
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000025use std::time::Instant;
26
/// Prints a fixed greeting to standard output and reports success.
///
/// The symbol is exported unmangled with the C ABI.
///
/// Returns `true` unconditionally.
#[no_mangle]
pub extern "C" fn hello() -> bool {
    const GREETING: &str = "Hello world.";
    println!("{}", GREETING);
    true
}
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +000033
/// BufferPublishers provide buffers to BufferSubscribers. Depending on the
/// particular object in question, these could be allocated locally or provided
/// over IPC.
///
/// BufferPublishers are required to adhere to the following, based on the
/// reactive streams specification:
/// * The total number of on_next's signalled by a Publisher to a Subscriber
///   MUST be less than or equal to the total number of elements requested by
///   that Subscriber's Subscription at all times.
/// * A Publisher MAY signal fewer on_next than requested and terminate the
///   Subscription by calling on_complete or on_error.
/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
///   MUST be signaled serially.
/// * If a Publisher fails it MUST signal an on_error.
/// * If a Publisher terminates successfully (finite stream) it MUST signal an
///   on_complete.
/// * If a Publisher signals either on_error or on_complete on a Subscriber,
///   that Subscriber's Subscription MUST be considered cancelled.
/// * Once a terminal state has been signaled (on_error, on_complete) it is
///   REQUIRED that no further signals occur.
/// * If a Subscription is cancelled its Subscriber MUST eventually stop being
///   signaled.
/// * A Publisher MAY support multiple Subscribers and decides whether each
///   Subscription is unicast or multicast.
pub trait BufferPublisher {
    /// Returns the StreamConfig of the buffers that this publisher creates.
    fn get_publisher_stream_config(&self) -> StreamConfig;
    /// Creates the subscription between this publisher and `subscriber`.
    ///
    /// The subscriber is taken by value and must be `'static`.
    fn subscribe(&mut self, subscriber: impl BufferSubscriber + 'static);
}
65
/// BufferSubscribers can subscribe to BufferPublishers. They can request Frames
/// via the BufferSubscription they get from the publisher, then receive Frames
/// via on_next.
///
/// BufferSubscribers are required to adhere to the following, based on the
/// reactive streams specification:
/// * The total number of on_next's signalled by a Publisher to a Subscriber
///   MUST be less than or equal to the total number of elements requested by
///   that Subscriber's Subscription at all times.
/// * A Publisher MAY signal fewer on_next than requested and terminate the
///   Subscription by calling on_complete or on_error.
/// * on_subscribe, on_next, on_error and on_complete signaled to a Subscriber
///   MUST be signaled serially.
/// * If a Publisher fails it MUST signal an on_error.
/// * If a Publisher terminates successfully (finite stream) it MUST signal an
///   on_complete.
/// * If a Publisher signals either on_error or on_complete on a Subscriber,
///   that Subscriber's Subscription MUST be considered cancelled.
/// * Once a terminal state has been signaled (on_error, on_complete) it is
///   REQUIRED that no further signals occur.
/// * If a Subscription is cancelled its Subscriber MUST eventually stop being
///   signaled.
/// * Publisher.subscribe MAY be called as many times as wanted but MUST be
///   with a different Subscriber each time.
/// * A Publisher MAY support multiple Subscribers and decides whether each
///   Subscription is unicast or multicast.
pub trait BufferSubscriber {
    /// Returns the StreamConfig of the buffers that this subscriber expects.
    fn get_subscriber_stream_config(&self) -> StreamConfig;
    /// Called at the beginning of the subscription, handing the subscriber
    /// the boxed BufferSubscription for it.
    fn on_subscribe(&mut self, subscription: Box<dyn BufferSubscription>);
    /// Called for each buffer that comes in, delivered as a Frame.
    fn on_next(&mut self, frame: Frame);
    /// Called in case of an error, with the error that occurred.
    fn on_error(&mut self, error: BufferError);
    /// Called on finite streams when the stream is done.
    fn on_complete(&mut self);
}
104
/// BufferSubscriptions serve as the bridge between BufferPublishers and
/// BufferSubscribers. BufferSubscribers receive a BufferSubscription when they
/// subscribe to a BufferPublisher via on_subscribe.
/// This object is to be used by the BufferSubscriber to cancel its subscription
/// or request more buffers.
///
/// BufferSubscriptions are required to adhere to the following, based on the
/// reactive streams specification:
/// * Subscription.request and Subscription.cancel MUST only be called inside
///   of its Subscriber context.
/// * The Subscription MUST allow the Subscriber to call Subscription.request
///   synchronously from within on_next or on_subscribe.
/// * Subscription.request MUST place an upper bound on possible synchronous
///   recursion between Publisher and Subscriber.
/// * Subscription.request SHOULD respect the responsivity of its caller by
///   returning in a timely manner.
/// * Subscription.cancel MUST respect the responsivity of its caller by
///   returning in a timely manner, MUST be idempotent and MUST be thread-safe.
/// * After the Subscription is cancelled, additional
///   Subscription.request(n: u64) MUST be NOPs.
/// * After the Subscription is cancelled, additional Subscription.cancel()
///   MUST be NOPs.
/// * While the Subscription is not cancelled, Subscription.request(n: u64)
///   MUST register the given number of additional elements to be produced to
///   the respective subscriber.
/// * While the Subscription is not cancelled, Subscription.request(n: u64)
///   MUST signal on_error if the argument is 0 (n is unsigned, so 0 is the
///   only non-positive value). The cause message SHOULD explain that
///   non-positive request signals are illegal.
/// * While the Subscription is not cancelled, Subscription.request(n: u64)
///   MAY synchronously call on_next on this (or other) subscriber(s).
/// * While the Subscription is not cancelled, Subscription.request(n: u64)
///   MAY synchronously call on_complete or on_error on this (or other)
///   subscriber(s).
/// * While the Subscription is not cancelled, Subscription.cancel() MUST
///   request the Publisher to eventually stop signaling its Subscriber. The
///   operation is NOT REQUIRED to affect the Subscription immediately.
/// * While the Subscription is not cancelled, Subscription.cancel() MUST
///   request the Publisher to eventually drop any references to the
///   corresponding subscriber.
/// * While the Subscription is not cancelled, calling Subscription.cancel MAY
///   cause the Publisher, if stateful, to transition into the shut-down state
///   if no other Subscription exists at this point.
/// * Calling Subscription.cancel MUST return normally.
/// * Calling Subscription.request MUST return normally.
pub trait BufferSubscription {
    /// Requests `n` additional buffers for the subscriber; the rules listed on
    /// this trait describe the required behavior.
    fn request(&self, n: u64);
    /// Cancels the subscription; the rules listed on this trait describe the
    /// required behavior.
    fn cancel(&self);
}
James Shargo77e3c542023-09-12 11:58:07 -0400155
/// Type used to describe errors produced by subscriptions.
///
/// This is an alias for `anyhow::Error`.
pub type BufferError = anyhow::Error;
Carlos Martinez Romeroa4004692023-07-20 21:34:15 +0000158
/// Struct used to contain the buffer.
///
/// A Frame groups a hardware buffer with the time it was dispatched and a
/// fence value.
pub struct Frame {
    /// A handle to the C buffer interface.
    pub buffer: HardwareBuffer,
    /// The time at which the buffer was dispatched.
    pub present_time: Instant,
    /// A fence used for reading/writing safely.
    // NOTE(review): presumably a sync fence file descriptor — confirm with the
    // code that produces and consumes Frames.
    pub fence: i32,
}
James Shargo77e3c542023-09-12 11:58:07 -0400168
#[cfg(test)]
mod test {
    // NOTE(review): this blanket allow also silences any unused-import
    // warnings for the `use` lines below; consider narrowing it.
    #![allow(warnings, unused)]
    use super::*;

    use anyhow::anyhow;
    use std::borrow::BorrowMut;
    use std::error::Error;
    use std::ops::Add;
    use std::sync::Arc;
    use std::time::Duration;

    use crate::publishers::testing::*;
    use crate::subscribers::{testing::*, SharedSubscriber};

    /// Stream configuration shared by every test in this module: 1x1, one
    /// layer, R8G8B8A8_UNORM, CPU_READ_OFTEN usage, stride 0.
    const STREAM_CONFIG: StreamConfig = StreamConfig {
        width: 1,
        height: 1,
        layers: 1,
        format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
        usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
        stride: 0,
    };

    /// Builds a Frame around a freshly created hardware buffer, with a present
    /// time one second from now and a fence value of 0.
    ///
    /// Panics if the hardware buffer cannot be created.
    fn make_frame() -> Frame {
        let buffer = STREAM_CONFIG
            .create_hardware_buffer()
            .expect("Unable to create hardware buffer for test");
        let present_time = Instant::now() + Duration::from_secs(1);
        Frame { buffer, present_time, fence: 0 }
    }

    /// A frame sent before any request must not show up as a `Next` event;
    /// after requesting one buffer, the following frame does, and the
    /// publisher's pending request count returns to zero.
    #[test]
    fn test_test_implementations_next() {
        let sink = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
        let mut source = TestPublisher::new(STREAM_CONFIG);

        source.subscribe(sink.clone());
        let subscribed = sink.map_inner(|inner| inner.has_subscription());
        assert!(subscribed);
        assert!(source.has_subscriber());

        // Nothing has been requested yet, so this frame must not be delivered.
        source.send_frame(make_frame());
        let seen = sink.map_inner_mut(|inner| inner.take_events());
        let delivered = matches!(seen.last().unwrap(), TestingSubscriberEvent::Next(_));
        assert!(!delivered);

        sink.map_inner(|inner| inner.request(1));
        assert_eq!(source.pending_requests(), 1);

        // With one outstanding request, the frame is delivered and consumes it.
        source.send_frame(make_frame());
        let seen = sink.map_inner_mut(|inner| inner.take_events());
        let delivered = matches!(seen.last().unwrap(), TestingSubscriberEvent::Next(_));
        assert!(delivered);
        assert_eq!(source.pending_requests(), 0);
    }

    /// Completing the stream is observed by the subscriber as a `Complete`
    /// event.
    #[test]
    fn test_test_implementations_complete() {
        let sink = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
        let mut source = TestPublisher::new(STREAM_CONFIG);

        source.subscribe(sink.clone());
        let subscribed = sink.map_inner(|inner| inner.has_subscription());
        assert!(subscribed);
        assert!(source.has_subscriber());

        source.send_complete();
        let seen = sink.map_inner_mut(|inner| inner.take_events());
        let completed = matches!(seen.last().unwrap(), TestingSubscriberEvent::Complete);
        assert!(completed);
    }

    /// An error sent by the publisher is observed by the subscriber as an
    /// `Error` event.
    #[test]
    fn test_test_implementations_error() {
        let sink = SharedSubscriber::new(TestSubscriber::new(STREAM_CONFIG));
        let mut source = TestPublisher::new(STREAM_CONFIG);

        source.subscribe(sink.clone());
        let subscribed = sink.map_inner(|inner| inner.has_subscription());
        assert!(subscribed);
        assert!(source.has_subscriber());

        source.send_error(anyhow!("error"));
        let seen = sink.map_inner_mut(|inner| inner.take_events());
        let failed = matches!(seen.last().unwrap(), TestingSubscriberEvent::Error(_));
        assert!(failed);
    }
}