Keystore 2.0: add entropy feeder on idle

In AsyncTask, if both the high and low priority job queues are empty
then run any registered idle callbacks once, passing them the shelf
to play with. After this, the idle callbacks will only be called again
when some other job is queued.

Register an idle callback that feeds entropy to all known KeyMint
devices, provided that sufficient time (30s) has elapsed since the
last time entropy was fed.
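
For illustration, registering such a callback could look roughly like
the following sketch. This is not code from this change: KEYMINT_DEVICES
and feed_entropy are placeholder names, and std::time::{Duration,
Instant} are assumed to be in scope.

    // Feed entropy on idle, at most once per 30s. The shelf keeps the
    // time of the last feeding across invocations (and across worker
    // thread restarts).
    async_task.add_idle(|shelf| {
        let last_fed = shelf.get_mut::<Option<Instant>>();
        if last_fed.map_or(true, |t| t.elapsed() >= Duration::from_secs(30)) {
            for device in KEYMINT_DEVICES.iter() {
                feed_entropy(device);
            }
            *last_fed = Some(Instant::now());
        }
    });
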
Bug: 171703867
Test: keystore2_test, subset of CtsKeystoreTestCases with extra logging
Change-Id: Ic21cd1906ee24bb6c050ce17b104d8000c6aed14

diff --git a/keystore2/src/async_task.rs b/keystore2/src/async_task.rs
index 2b36f1f..20a7458 100644
--- a/keystore2/src/async_task.rs
+++ b/keystore2/src/async_task.rs
@@ -89,8 +89,10 @@
struct AsyncTaskState {
state: State,
thread: Option<thread::JoinHandle<()>>,
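+    /// How long the worker thread lingers with no queued jobs before exiting.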
+ timeout: Duration,
hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
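+    /// Callbacks to run whenever the worker finds both queues empty; each is
+    /// wrapped in an [`Arc`] so it can be invoked after the lock is dropped.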
+ idle_fns: Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>,
/// The store allows tasks to store state across invocations. It is passed to each invocation
/// of each task. Tasks need to cooperate on the ids they use for storing state.
shelf: Option<Shelf>,
@@ -107,25 +109,32 @@
impl Default for AsyncTask {
fn default() -> Self {
+ Self::new(Duration::from_secs(30))
+ }
+}
+
+impl AsyncTask {
+    /// Construct an [`AsyncTask`] with a specific timeout value. The worker
+    /// thread shuts down (returning the shelf to the shared state) after this
+    /// period elapses with no queued jobs.
+ pub fn new(timeout: Duration) -> Self {
Self {
state: Arc::new((
Condvar::new(),
Mutex::new(AsyncTaskState {
state: State::Exiting,
thread: None,
+ timeout,
hi_prio_req: VecDeque::new(),
lo_prio_req: VecDeque::new(),
+ idle_fns: Vec::new(),
shelf: None,
}),
)),
}
}
-}
-impl AsyncTask {
- /// Adds a job to the high priority queue. High priority jobs are completed before
- /// low priority jobs and can also overtake low priority jobs. But they cannot
- /// preempt them.
+    /// Adds a one-off job to the high priority queue. High priority jobs are
+    /// completed before low priority jobs and can overtake queued low
+    /// priority jobs, but they never preempt a low priority job that has
+    /// already started.
pub fn queue_hi<F>(&self, f: F)
where
F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
@@ -133,10 +142,10 @@
self.queue(f, true)
}
- /// Adds a job to the low priority queue. Low priority jobs are completed after
- /// high priority. And they are not executed as long as high priority jobs are
- /// present. Jobs always run to completion and are never preempted by high
- /// priority jobs.
+    /// Adds a one-off job to the low priority queue. Low priority jobs are
+    /// completed after high priority jobs and are not executed while any high
+    /// priority jobs are queued. Jobs always run to completion and are never
+    /// preempted by high priority jobs.
pub fn queue_lo<F>(&self, f: F)
where
F: FnOnce(&mut Shelf) + Send + 'static,
@@ -144,6 +153,17 @@
self.queue(f, false)
}
+ /// Adds an idle callback. This will be invoked whenever the worker becomes
+ /// idle (all high and low priority jobs have been performed).
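+    ///
+    /// An illustrative sketch (the idle counter stored on the shelf is made
+    /// up for this example):
+    ///
+    /// ```ignore
+    /// async_task.add_idle(|shelf| {
+    ///     // Count how many times the worker has gone idle.
+    ///     *shelf.get_mut::<u32>() += 1;
+    /// });
+    /// ```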
+ pub fn add_idle<F>(&self, f: F)
+ where
+ F: Fn(&mut Shelf) + Send + Sync + 'static,
+ {
+ let (ref _condvar, ref state) = *self.state;
+ let mut state = state.lock().unwrap();
+ state.idle_fns.push(Arc::new(f));
+ }
+
fn queue<F>(&self, f: F, hi_prio: bool)
where
F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
@@ -169,39 +189,66 @@
}
let cloned_state = self.state.clone();
+ let timeout_period = state.timeout;
state.thread = Some(thread::spawn(move || {
let (ref condvar, ref state) = *cloned_state;
+
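+            // The worker decides on one of these actions while holding the
+            // lock, but performs it only after the lock has been dropped, so
+            // that jobs and idle callbacks can themselves queue new work
+            // without deadlocking.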
+ enum Action {
+ QueuedFn(Box<dyn FnOnce(&mut Shelf) + Send>),
+ IdleFns(Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>),
+            }
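+            // Set once the idle callbacks have run; reset by every executed
+            // job so that the callbacks fire again on the next transition to
+            // idle.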
+ let mut done_idle = false;
+
// When the worker starts, it takes the shelf and puts it on the stack.
let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default();
loop {
- if let Some(f) = {
- let (mut state, timeout) = condvar
- .wait_timeout_while(
- state.lock().unwrap(),
- Duration::from_secs(30),
- |state| state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty(),
- )
- .unwrap();
- match (
- state.hi_prio_req.pop_front(),
- state.lo_prio_req.is_empty(),
- timeout.timed_out(),
- ) {
- (Some(f), _, _) => Some(f),
- (None, false, _) => state.lo_prio_req.pop_front(),
- (None, true, true) => {
- // When the worker exits it puts the shelf back into the shared
- // state for the next worker to use. So state is preserved not
- // only across invocations but also across worker thread shut down.
- state.shelf = Some(shelf);
- state.state = State::Exiting;
- break;
+ if let Some(action) = {
+ let state = state.lock().unwrap();
+ if !done_idle && state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() {
+ // No jobs queued so invoke the idle callbacks.
+ Some(Action::IdleFns(state.idle_fns.clone()))
+ } else {
+ // Wait for either a queued job to arrive or a timeout.
+ let (mut state, timeout) = condvar
+ .wait_timeout_while(state, timeout_period, |state| {
+ state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty()
+ })
+ .unwrap();
+ match (
+ state.hi_prio_req.pop_front(),
+ state.lo_prio_req.is_empty(),
+ timeout.timed_out(),
+ ) {
+ (Some(f), _, _) => Some(Action::QueuedFn(f)),
+                        (None, false, _) => state.lo_prio_req.pop_front().map(Action::QueuedFn),
+ (None, true, true) => {
+ // When the worker exits it puts the shelf back into the shared
+ // state for the next worker to use. So state is preserved not
+ // only across invocations but also across worker thread shut down.
+ state.shelf = Some(shelf);
+ state.state = State::Exiting;
+ break;
+ }
+ (None, true, false) => None,
}
- (None, true, false) => None,
}
} {
- f(&mut shelf)
+ // Now that the lock has been dropped, perform the action.
+ match action {
+ Action::QueuedFn(f) => {
+ f(&mut shelf);
+ done_idle = false;
+ }
+ Action::IdleFns(idle_fns) => {
+ for idle_fn in idle_fns {
+ idle_fn(&mut shelf);
+ }
+ done_idle = true;
+ }
+ }
}
}
}));
@@ -212,7 +259,11 @@
#[cfg(test)]
mod tests {
use super::{AsyncTask, Shelf};
- use std::sync::mpsc::channel;
+ use std::sync::{
+ mpsc::{channel, sync_channel, RecvTimeoutError},
+ Arc,
+ };
+ use std::time::Duration;
#[test]
fn test_shelf() {
@@ -306,6 +357,21 @@
}
#[test]
+ fn test_async_task_chain() {
+ let at = Arc::new(AsyncTask::default());
+ let (sender, receiver) = channel();
+ // Queue up a job that will queue up another job. This confirms
+ // that the job is not invoked with any internal AsyncTask locks held.
+ let at_clone = at.clone();
+ at.queue_hi(move |_shelf| {
+ at_clone.queue_lo(move |_shelf| {
+ sender.send(()).unwrap();
+ });
+ });
+ receiver.recv().unwrap();
+ }
+
+ #[test]
#[should_panic]
fn test_async_task_panic() {
let at = AsyncTask::default();
@@ -319,4 +385,147 @@
});
done_receiver.recv().unwrap();
}
+
+ #[test]
+ fn test_async_task_idle() {
+ let at = AsyncTask::new(Duration::from_secs(3));
+        // Use a SyncSender: the idle callback must be Send + Sync, and
+        // mpsc::Sender is not Sync.
+        let (idle_done_sender, idle_done_receiver) = sync_channel::<()>(3);
+ at.add_idle(move |_shelf| {
+ idle_done_sender.send(()).unwrap();
+ });
+
+ // Queue up some high-priority and low-priority jobs that take time.
+ for _i in 0..3 {
+ at.queue_lo(|_shelf| {
+ std::thread::sleep(Duration::from_millis(500));
+ });
+ at.queue_hi(|_shelf| {
+ std::thread::sleep(Duration::from_millis(500));
+ });
+ }
+ // Final low-priority job.
+ let (done_sender, done_receiver) = channel();
+ at.queue_lo(move |_shelf| {
+ done_sender.send(()).unwrap();
+ });
+
+ // Nothing happens until the last job completes.
+ assert_eq!(
+ idle_done_receiver.recv_timeout(Duration::from_secs(1)),
+ Err(RecvTimeoutError::Timeout)
+ );
+ done_receiver.recv().unwrap();
+ idle_done_receiver.recv_timeout(Duration::from_millis(1)).unwrap();
+
+ // Idle callback not executed again even if we wait for a while.
+ assert_eq!(
+ idle_done_receiver.recv_timeout(Duration::from_secs(3)),
+ Err(RecvTimeoutError::Timeout)
+ );
+
+ // However, if more work is done then there's another chance to go idle.
+ let (done_sender, done_receiver) = channel();
+ at.queue_hi(move |_shelf| {
+ std::thread::sleep(Duration::from_millis(500));
+ done_sender.send(()).unwrap();
+ });
+ // Idle callback not immediately executed, because the high priority
+ // job is taking a while.
+ assert_eq!(
+ idle_done_receiver.recv_timeout(Duration::from_millis(1)),
+ Err(RecvTimeoutError::Timeout)
+ );
+ done_receiver.recv().unwrap();
+ idle_done_receiver.recv_timeout(Duration::from_millis(1)).unwrap();
+ }
+
+ #[test]
+ fn test_async_task_multiple_idle() {
+ let at = AsyncTask::new(Duration::from_secs(3));
+ let (idle_sender, idle_receiver) = sync_channel::<i32>(5);
+ // Queue a high priority job to start things off
+ at.queue_hi(|_shelf| {
+ std::thread::sleep(Duration::from_millis(500));
+ });
+
+ // Multiple idle callbacks.
+ for i in 0..3 {
+ let idle_sender = idle_sender.clone();
+ at.add_idle(move |_shelf| {
+ idle_sender.send(i).unwrap();
+ });
+ }
+
+ // Nothing happens immediately.
+ assert_eq!(
+ idle_receiver.recv_timeout(Duration::from_millis(1)),
+ Err(RecvTimeoutError::Timeout)
+ );
+ // Wait for a moment and the idle jobs should have run.
+ std::thread::sleep(Duration::from_secs(1));
+
+ let mut results = Vec::new();
+ while let Ok(i) = idle_receiver.recv_timeout(Duration::from_millis(1)) {
+ results.push(i);
+ }
+ assert_eq!(results, [0, 1, 2]);
+ }
+
+ #[test]
+ fn test_async_task_idle_queues_job() {
+ let at = Arc::new(AsyncTask::new(Duration::from_secs(1)));
+ let at_clone = at.clone();
+ let (idle_sender, idle_receiver) = sync_channel::<i32>(100);
+ // Add an idle callback that queues a low-priority job.
+ at.add_idle(move |shelf| {
+ at_clone.queue_lo(|_shelf| {
+ // Slow things down so the channel doesn't fill up.
+ std::thread::sleep(Duration::from_millis(50));
+ });
+ let i = shelf.get_mut::<i32>();
+ idle_sender.send(*i).unwrap();
+ *i += 1;
+ });
+
+ // Nothing happens immediately.
+ assert_eq!(
+ idle_receiver.recv_timeout(Duration::from_millis(1500)),
+ Err(RecvTimeoutError::Timeout)
+ );
+
+ // Once we queue a normal job, things start.
+ at.queue_hi(|_shelf| {});
+ assert_eq!(0, idle_receiver.recv_timeout(Duration::from_millis(200)).unwrap());
+
+ // The idle callback queues a job, and completion of that job
+ // means the task is going idle again...so the idle callback will
+ // be called repeatedly.
+ assert_eq!(1, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
+ assert_eq!(2, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
+ assert_eq!(3, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
+ }
+
+ #[test]
+ #[should_panic]
+ fn test_async_task_idle_panic() {
+ let at = AsyncTask::new(Duration::from_secs(1));
+ let (idle_sender, idle_receiver) = sync_channel::<()>(3);
+ // Add an idle callback that panics.
+ at.add_idle(move |_shelf| {
+ idle_sender.send(()).unwrap();
+ panic!("Panic from idle callback");
+ });
+ // Queue a job to trigger idleness and ensuing panic.
+ at.queue_hi(|_shelf| {});
+ idle_receiver.recv().unwrap();
+
+ // Queue another job afterwards to ensure that the async thread gets joined
+ // and the panic detected.
+ let (done_sender, done_receiver) = channel();
+ at.queue_hi(move |_shelf| {
+ done_sender.send(()).unwrap();
+ });
+ done_receiver.recv().unwrap();
+ }
}