Keystore 2.0: Add shelf to async tasks.

The Shelf allows async tasks to store state across invocations.

Test: N/A
Change-Id: Ic4cbf7973e0c812479da87431372ed210e9411cf
diff --git a/keystore2/src/async_task.rs b/keystore2/src/async_task.rs
index 6edd760..24431d2 100644
--- a/keystore2/src/async_task.rs
+++ b/keystore2/src/async_task.rs
@@ -19,9 +19,9 @@
 //! processed all tasks before it terminates.
 //! Note that low priority tasks are processed only when the high priority queue is empty.
 
-use std::time::Duration;
+use std::{any::Any, any::TypeId, time::Duration};
 use std::{
-    collections::VecDeque,
+    collections::{HashMap, VecDeque},
     sync::Arc,
     sync::{Condvar, Mutex, MutexGuard},
     thread,
@@ -33,11 +33,54 @@
     Running,
 }
 
+/// The Shelf allows async tasks to store state across invocations.
+/// Note: Store elves at your own peril ;-).
+#[derive(Debug, Default)]
+pub struct Shelf(HashMap<TypeId, Box<dyn Any + Send>>);
+
+impl Shelf {
+    /// Get a reference to the shelved data of type T. Returns Some if the data exists.
+    pub fn get_downcast_ref<T: Any + Send>(&self) -> Option<&T> {
+        self.0.get(&TypeId::of::<T>()).and_then(|v| v.downcast_ref::<T>())
+    }
+
+    /// Get a mutable reference to the shelved data of type T. Returns Some if a T was previously
+    /// inserted using put or get_mut.
+    pub fn get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T> {
+        self.0.get_mut(&TypeId::of::<T>()).and_then(|v| v.downcast_mut::<T>())
+    }
+
+    /// Removes the entry of the given type and returns the stored data if it existed.
+    pub fn remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T> {
+        self.0.remove(&TypeId::of::<T>()).and_then(|v| v.downcast::<T>().ok().map(|b| *b))
+    }
+
+    /// Puts data `v` on the shelf. If there already was an entry of type T it is returned.
+    pub fn put<T: Any + Send>(&mut self, v: T) -> Option<T> {
+        self.0
+            .insert(TypeId::of::<T>(), Box::new(v) as Box<dyn Any + Send>)
+            .and_then(|v| v.downcast::<T>().ok().map(|b| *b))
+    }
+
+    /// Gets a mutable reference to the entry of the given type and default creates it if necessary.
+    /// The type must implement Default.
+    pub fn get_mut<T: Any + Send + Default>(&mut self) -> &mut T {
+        self.0
+            .entry(TypeId::of::<T>())
+            .or_insert_with(|| Box::new(T::default()) as Box<dyn Any + Send>)
+            .downcast_mut::<T>()
+            .unwrap()
+    }
+}
+
 struct AsyncTaskState {
     state: State,
     thread: Option<thread::JoinHandle<()>>,
-    hi_prio_req: VecDeque<Box<dyn FnOnce() + Send>>,
-    lo_prio_req: VecDeque<Box<dyn FnOnce() + Send>>,
+    hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
+    lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
+    /// The shelf allows tasks to store state across invocations. It is passed to each invocation
+    /// of each task. Tasks need to cooperate on the types they use for storing state.
+    shelf: Option<Shelf>,
 }
 
 /// AsyncTask spawns one worker thread on demand to process jobs inserted into
@@ -56,6 +99,7 @@
                     thread: None,
                     hi_prio_req: VecDeque::new(),
                     lo_prio_req: VecDeque::new(),
+                    shelf: None,
                 }),
             )),
         }
@@ -68,7 +112,7 @@
     /// preempt them.
     pub fn queue_hi<F>(&self, f: F)
     where
-        F: FnOnce() + Send + 'static,
+        F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
     {
         self.queue(f, true)
     }
@@ -79,14 +123,14 @@
     /// priority jobs.
     pub fn queue_lo<F>(&self, f: F)
     where
-        F: FnOnce() + Send + 'static,
+        F: FnOnce(&mut Shelf) + Send + 'static,
     {
         self.queue(f, false)
     }
 
     fn queue<F>(&self, f: F, hi_prio: bool)
     where
-        F: FnOnce() + Send + 'static,
+        F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
     {
         let (ref condvar, ref state) = *self.state;
         let mut state = state.lock().unwrap();
@@ -112,6 +156,8 @@
 
         state.thread = Some(thread::spawn(move || {
             let (ref condvar, ref state) = *cloned_state;
+            // When the worker starts, it takes the shelf and puts it on the stack.
+            let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default();
             loop {
                 if let Some(f) = {
                     let (mut state, timeout) = condvar
@@ -129,13 +175,17 @@
                         (Some(f), _, _) => Some(f),
                         (None, false, _) => state.lo_prio_req.pop_front(),
                         (None, true, true) => {
+                            // When the worker exits it puts the shelf back into the shared
+                            // state for the next worker to use. So state is preserved not
+                            // only across invocations but also across worker thread shutdown.
+                            state.shelf = Some(shelf);
                             state.state = State::Exiting;
                             break;
                         }
                         (None, true, false) => None,
                     }
                 } {
-                    f()
+                    f(&mut shelf)
                 }
             }
         }));