Keystore 2.0: Add shelf to async tasks.
The Shelf allows async tasks to store state across invocations.
Test: N/A
Change-Id: Ic4cbf7973e0c812479da87431372ed210e9411cf
diff --git a/keystore2/src/async_task.rs b/keystore2/src/async_task.rs
index 6edd760..24431d2 100644
--- a/keystore2/src/async_task.rs
+++ b/keystore2/src/async_task.rs
@@ -19,9 +19,9 @@
//! processed all tasks before it terminates.
//! Note that low priority tasks are processed only when the high priority queue is empty.
-use std::time::Duration;
+use std::{any::Any, any::TypeId, time::Duration};
use std::{
- collections::VecDeque,
+ collections::{HashMap, VecDeque},
sync::Arc,
sync::{Condvar, Mutex, MutexGuard},
thread,
@@ -33,11 +33,54 @@
Running,
}
+/// The Shelf allows async tasks to store state across invocations, one value per type.
+/// Note: Store elves at your own peril ;-).
+#[derive(Debug, Default)]
+pub struct Shelf(HashMap<TypeId, Box<dyn Any + Send>>);
+
+impl Shelf {
+ /// Get a reference to the shelved data of type T. Returns None if no T is on the shelf.
+ pub fn get_downcast_ref<T: Any + Send>(&self) -> Option<&T> {
+ self.0.get(&TypeId::of::<T>()).and_then(|v| v.downcast_ref::<T>())
+ }
+
+ /// Get a mutable reference to the shelved data of type T. Returns Some if a T was inserted
+ /// using put or get_mut and has not been removed since.
+ pub fn get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T> {
+ self.0.get_mut(&TypeId::of::<T>()).and_then(|v| v.downcast_mut::<T>())
+ }
+
+ /// Removes the entry of the given type and returns the stored data by value if it existed.
+ pub fn remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T> {
+ self.0.remove(&TypeId::of::<T>()).and_then(|v| v.downcast::<T>().ok().map(|b| *b))
+ }
+
+ /// Puts data `v` on the shelf, replacing and returning any previous entry of type T.
+ pub fn put<T: Any + Send>(&mut self, v: T) -> Option<T> {
+ self.0
+ .insert(TypeId::of::<T>(), Box::new(v) as Box<dyn Any + Send>)
+ .and_then(|v| v.downcast::<T>().ok().map(|b| *b))
+ }
+
+ /// Gets a mutable reference to the entry of type T, default-creating it if it is missing.
+ /// T must implement Default. The unwrap cannot fail: entries are keyed by their own TypeId.
+ pub fn get_mut<T: Any + Send + Default>(&mut self) -> &mut T {
+ self.0
+ .entry(TypeId::of::<T>())
+ .or_insert_with(|| Box::new(T::default()) as Box<dyn Any + Send>)
+ .downcast_mut::<T>()
+ .unwrap()
+ }
+}
+
struct AsyncTaskState {
state: State,
thread: Option<thread::JoinHandle<()>>,
- hi_prio_req: VecDeque<Box<dyn FnOnce() + Send>>,
- lo_prio_req: VecDeque<Box<dyn FnOnce() + Send>>,
+ hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
+ lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
+ /// The shelf allows tasks to store state across invocations. It is passed to each invocation
+ /// of each task. Tasks need to cooperate on the types they use as keys for storing state.
+ shelf: Option<Shelf>,
}
/// AsyncTask spawns one worker thread on demand to process jobs inserted into
@@ -56,6 +99,7 @@
thread: None,
hi_prio_req: VecDeque::new(),
lo_prio_req: VecDeque::new(),
+ shelf: None,
}),
)),
}
@@ -68,7 +112,7 @@
/// preempt them.
pub fn queue_hi<F>(&self, f: F)
where
- F: FnOnce() + Send + 'static,
+ F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
{
self.queue(f, true)
}
@@ -79,14 +123,14 @@
/// priority jobs.
pub fn queue_lo<F>(&self, f: F)
where
- F: FnOnce() + Send + 'static,
+ F: FnOnce(&mut Shelf) + Send + 'static,
{
self.queue(f, false)
}
fn queue<F>(&self, f: F, hi_prio: bool)
where
- F: FnOnce() + Send + 'static,
+ F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
{
let (ref condvar, ref state) = *self.state;
let mut state = state.lock().unwrap();
@@ -112,6 +156,8 @@
state.thread = Some(thread::spawn(move || {
let (ref condvar, ref state) = *cloned_state;
+ // When the worker starts, it takes the shelf and puts it on the stack.
+ let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default();
loop {
if let Some(f) = {
let (mut state, timeout) = condvar
@@ -129,13 +175,17 @@
(Some(f), _, _) => Some(f),
(None, false, _) => state.lo_prio_req.pop_front(),
(None, true, true) => {
+ // When the worker exits it puts the shelf back into the shared
+ // state for the next worker to use. So state is preserved not
+ // only across invocations but also across worker thread shutdown.
+ state.shelf = Some(shelf);
state.state = State::Exiting;
break;
}
(None, true, false) => None,
}
} {
- f()
+ f(&mut shelf)
}
}
}));