libbufferstreams: Add Buffer, BufferPool and BufferOwner.

This change adds supoprt for a simple buffer pool. Buffer pools can be
notified when a buffer is dropped, so the buffer can be provided by the
pool again.

We introduced the concept of a BufferOwner, which is generic and can be
implemented by a client for their own custom buffer pools.

Along the way we updated the Frame struct to use a Buffer instead of a
AHardwareBuffer.

Pair: jshargo
Bug: 296450854, 296101127
Test: atest libbufferstreams-internal_test
Change-Id: Ib7c1ba19f96d1deb3d329366aa9215ad89228f9e
diff --git a/libs/bufferstreams/rust/src/buffers/buffer.rs b/libs/bufferstreams/rust/src/buffers/buffer.rs
new file mode 100644
index 0000000..0a8516e
--- /dev/null
+++ b/libs/bufferstreams/rust/src/buffers/buffer.rs
@@ -0,0 +1,80 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Wrapper around the HardwareBuffer
+
+use nativewindow::*;
+
+use super::{buffer_owner::NoBufferOwner, BufferOwner};
+
+/// A wrapper for a hardware buffer.
+///
+/// This buffer may be associated with a buffer pool to which it will be returned to it when dropped.
+pub struct Buffer {
+    buffer_owner: Box<dyn BufferOwner>,
+    hardware_buffer: HardwareBuffer,
+}
+
+impl Buffer {
+    /// Create new buffer with a custom [BufferOwner].
+    pub fn new(buffer_owner: Box<dyn BufferOwner>, hardware_buffer: HardwareBuffer) -> Self {
+        Self { buffer_owner, hardware_buffer }
+    }
+
+    /// Create a new buffer with no association to any buffer pool.
+    pub fn new_unowned(hardware_buffer: HardwareBuffer) -> Self {
+        Self { buffer_owner: Box::new(NoBufferOwner), hardware_buffer }
+    }
+
+    /// Get the id of the underlying buffer.
+    pub fn id(&self) -> u64 {
+        self.hardware_buffer.id()
+    }
+
+    /// Get a reference to the underlying hardware buffer.
+    pub fn buffer(&self) -> &HardwareBuffer {
+        &self.hardware_buffer
+    }
+}
+
+impl Drop for Buffer {
+    fn drop(&mut self) {
+        self.buffer_owner.on_return(self);
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    use crate::StreamConfig;
+
+    const STREAM_CONFIG: StreamConfig = StreamConfig {
+        width: 1,
+        height: 1,
+        layers: 1,
+        format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
+        usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
+        stride: 0,
+    };
+
+    #[test]
+    fn test_get_buffer_id() {
+        let hardware_buffer = STREAM_CONFIG.create_hardware_buffer().unwrap();
+        let buffer_id = hardware_buffer.id();
+
+        let buffer = Buffer::new_unowned(hardware_buffer);
+        assert_eq!(buffer_id, buffer.id());
+    }
+}
diff --git a/libs/bufferstreams/rust/src/buffers/buffer_owner.rs b/libs/bufferstreams/rust/src/buffers/buffer_owner.rs
new file mode 100644
index 0000000..a4abb9d
--- /dev/null
+++ b/libs/bufferstreams/rust/src/buffers/buffer_owner.rs
@@ -0,0 +1,28 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use super::Buffer;
+
+/// Trait that represents an owner of a buffer that might need to handle events such as a buffer
+/// being dropped.
+pub trait BufferOwner {
+    /// Called when a buffer is dropped.
+    fn on_return(&self, buffer: &Buffer);
+}
+
+pub(super) struct NoBufferOwner;
+
+impl BufferOwner for NoBufferOwner {
+    fn on_return(&self, _buffer: &Buffer) {}
+}
diff --git a/libs/bufferstreams/rust/src/buffers/buffer_pool.rs b/libs/bufferstreams/rust/src/buffers/buffer_pool.rs
new file mode 100644
index 0000000..05804e2
--- /dev/null
+++ b/libs/bufferstreams/rust/src/buffers/buffer_pool.rs
@@ -0,0 +1,137 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! A Buffer Pool containing and managing HardwareBuffers
+
+use std::{
+    collections::HashMap,
+    sync::{Arc, Mutex, Weak},
+};
+
+use nativewindow::*;
+
+use crate::StreamConfig;
+
+use super::{Buffer, BufferOwner};
+
+pub(super) struct BufferPoolInner {
+    size: usize,
+    hardware_buffers: HashMap<u64, HardwareBuffer>,
+    available_buffers: Vec<u64>,
+}
+
+impl BufferPoolInner {
+    pub(super) fn return_buffer(&mut self, buffer_id: u64) {
+        assert!(self.hardware_buffers.contains_key(&buffer_id));
+        assert!(!self.available_buffers.contains(&buffer_id));
+
+        self.available_buffers.push(buffer_id);
+    }
+}
+
+struct BufferPoolOwner(Weak<Mutex<BufferPoolInner>>);
+
+impl BufferOwner for BufferPoolOwner {
+    fn on_return(&self, buffer: &Buffer) {
+        if let Some(locked_buffer_pool) = self.0.upgrade() {
+            let mut buffer_pool = locked_buffer_pool.lock().unwrap();
+
+            buffer_pool.return_buffer(buffer.id());
+        }
+    }
+}
+
+/// A thread-safe collection of buffers.
+///
+/// A buffer pool can be of arbitrary size. It creates and then holds references to all buffers
+/// associated with it.
+pub struct BufferPool(Arc<Mutex<BufferPoolInner>>);
+
+impl BufferPool {
+    /// Creates a new buffer pool of size pool_size. All buffers will be created according to
+    /// the stream config.
+    ///
+    /// This constructor creates all buffers at initialization.
+    pub fn new(pool_size: usize, stream_config: StreamConfig) -> Option<Self> {
+        let mut hardware_buffers = HashMap::new();
+        let mut available_buffers = Vec::new();
+        for _ in 0..pool_size {
+            if let Some(buffer) = stream_config.create_hardware_buffer() {
+                available_buffers.push(buffer.id());
+                hardware_buffers.insert(buffer.id(), buffer);
+            } else {
+                return None;
+            }
+        }
+        Some(Self(Arc::new(Mutex::new(BufferPoolInner {
+            size: pool_size,
+            hardware_buffers,
+            available_buffers,
+        }))))
+    }
+
+    /// Try to acquire the next available buffer in the buffer pool.
+    ///
+    /// If all buffers are in use it will return None.
+    pub fn next_buffer(&mut self) -> Option<Buffer> {
+        let mut inner = self.0.lock().unwrap();
+        if let Some(buffer_id) = inner.available_buffers.pop() {
+            Some(Buffer::new(
+                Box::new(BufferPoolOwner(Arc::downgrade(&self.0))),
+                inner.hardware_buffers[&buffer_id].clone(),
+            ))
+        } else {
+            None
+        }
+    }
+
+    /// Gets the size of the buffer pool.
+    pub fn size(&self) -> usize {
+        let inner = self.0.lock().unwrap();
+        inner.size
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    const STREAM_CONFIG: StreamConfig = StreamConfig {
+        width: 1,
+        height: 1,
+        layers: 1,
+        format: AHardwareBuffer_Format::AHARDWAREBUFFER_FORMAT_R8G8B8A8_UNORM,
+        usage: AHardwareBuffer_UsageFlags::AHARDWAREBUFFER_USAGE_CPU_READ_OFTEN,
+        stride: 0,
+    };
+
+    #[test]
+    fn buffer_pool_next_buffer() {
+        let mut buffer_pool = BufferPool::new(1, STREAM_CONFIG).unwrap();
+        let next_buffer = buffer_pool.next_buffer();
+
+        assert!(next_buffer.is_some());
+        assert!(buffer_pool.next_buffer().is_none());
+    }
+
+    #[test]
+    fn drop_buffer_returns_to_pool() {
+        let mut buffer_pool = BufferPool::new(1, STREAM_CONFIG).unwrap();
+        let next_buffer = buffer_pool.next_buffer();
+
+        assert!(next_buffer.is_some());
+        drop(next_buffer);
+        assert!(buffer_pool.next_buffer().is_some());
+    }
+}
diff --git a/libs/bufferstreams/rust/src/buffers/mod.rs b/libs/bufferstreams/rust/src/buffers/mod.rs
new file mode 100644
index 0000000..83360d6
--- /dev/null
+++ b/libs/bufferstreams/rust/src/buffers/mod.rs
@@ -0,0 +1,23 @@
+// Copyright (C) 2023 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Module containing Buffers and BufferPools
+
+mod buffer;
+mod buffer_owner;
+mod buffer_pool;
+
+pub use buffer::*;
+pub use buffer_owner::*;
+pub use buffer_pool::*;
diff --git a/libs/bufferstreams/rust/src/lib.rs b/libs/bufferstreams/rust/src/lib.rs
index 5964281..be1525d 100644
--- a/libs/bufferstreams/rust/src/lib.rs
+++ b/libs/bufferstreams/rust/src/lib.rs
@@ -14,14 +14,15 @@
 
 //! libbufferstreams: Reactive Streams for Graphics Buffers
 
+pub mod buffers;
 pub mod publishers;
 mod stream_config;
 pub mod subscribers;
 pub mod subscriptions;
 
+use buffers::Buffer;
 pub use stream_config::*;
 
-use nativewindow::*;
 use std::time::Instant;
 
 /// This function will print Hello World.
@@ -158,8 +159,8 @@
 
 /// Struct used to contain the buffer.
 pub struct Frame {
-    /// A handle to the C buffer interface.
-    pub buffer: HardwareBuffer,
+    /// A buffer to be used this frame.
+    pub buffer: Buffer,
     /// The time at which the buffer was dispatched.
     pub present_time: Instant,
     /// A fence used for reading/writing safely.
@@ -172,6 +173,8 @@
     use super::*;
 
     use anyhow::anyhow;
+    use buffers::Buffer;
+    use nativewindow::{AHardwareBuffer_Format, AHardwareBuffer_UsageFlags};
     use std::borrow::BorrowMut;
     use std::error::Error;
     use std::ops::Add;
@@ -192,9 +195,11 @@
 
     fn make_frame() -> Frame {
         Frame {
-            buffer: STREAM_CONFIG
-                .create_hardware_buffer()
-                .expect("Unable to create hardware buffer for test"),
+            buffer: Buffer::new_unowned(
+                STREAM_CONFIG
+                    .create_hardware_buffer()
+                    .expect("Unable to create hardware buffer for test"),
+            ),
             present_time: Instant::now() + Duration::from_secs(1),
             fence: 0,
         }