blob: 45f0274e5821650b16b5a13138f66eb86ad7602b [file] [log] [blame]
Janis Danisevskis93927dd2020-12-23 12:23:08 -08001// Copyright 2020, The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module implements the handling of async tasks.
16//! The worker thread has a high priority and a low priority queue. Adding a job to either
17//! will cause one thread to be spawned if none exists. As a compromise between performance
//! and resource consumption, the thread will linger for a configurable idle timeout (30 seconds
//! for `AsyncTask::default()`) after it has processed all tasks before it terminates.
20//! Note that low priority tasks are processed only when the high priority queue is empty.
21
Janis Danisevskis796db6a2021-05-06 10:53:31 -070022use crate::utils::watchdog as wd;
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -080023use std::{any::Any, any::TypeId, time::Duration};
Janis Danisevskis93927dd2020-12-23 12:23:08 -080024use std::{
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -080025 collections::{HashMap, VecDeque},
Janis Danisevskis93927dd2020-12-23 12:23:08 -080026 sync::Arc,
27 sync::{Condvar, Mutex, MutexGuard},
28 thread,
29};
30
/// Lifecycle of the worker thread as recorded in the shared task state.
#[derive(Debug)]
#[derive(PartialEq, Eq)]
enum State {
    /// No worker has been started, or the last worker decided to exit; queuing a job
    /// will spawn a new worker.
    Exiting,
    /// A worker thread has been spawned and will pick up queued jobs.
    Running,
}
36
/// The Shelf allows async tasks to store state across invocations.
///
/// It holds at most one value per concrete type: entries are keyed by the [`TypeId`] of the
/// stored value, so every accessor is generic over the type being stored or retrieved.
/// Note: Store elves at your own peril ;-).
#[derive(Debug, Default)]
pub struct Shelf(HashMap<TypeId, Box<dyn Any + Send>>);

impl Shelf {
    /// Get a reference to the shelved data of type T. Returns Some if the data exists.
    pub fn get_downcast_ref<T: Any + Send>(&self) -> Option<&T> {
        self.0.get(&TypeId::of::<T>()).and_then(|v| v.downcast_ref::<T>())
    }

    /// Get a mutable reference to the shelved data of type T. Returns Some if a T was
    /// previously inserted using put, get_mut, or get_or_put_with (and not removed since),
    /// and None otherwise.
    pub fn get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T> {
        self.0.get_mut(&TypeId::of::<T>()).and_then(|v| v.downcast_mut::<T>())
    }

    /// Remove the entry of type T and return the stored data if it existed.
    ///
    /// Despite its name, the value is returned by ownership, not by reference.
    pub fn remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T> {
        self.0.remove(&TypeId::of::<T>()).and_then(Self::unbox::<T>)
    }

    /// Puts data `v` on the shelf. If there already was an entry of type T it is returned.
    pub fn put<T: Any + Send>(&mut self, v: T) -> Option<T> {
        self.0
            .insert(TypeId::of::<T>(), Box::new(v) as Box<dyn Any + Send>)
            .and_then(Self::unbox::<T>)
    }

    /// Gets a mutable reference to the entry of the given type and default creates it if necessary.
    /// The type must implement Default.
    pub fn get_mut<T: Any + Send + Default>(&mut self) -> &mut T {
        self.get_or_put_with(T::default)
    }

    /// Gets a mutable reference to the entry of the given type or creates it using the init
    /// function. Init is not executed if the entry already existed.
    pub fn get_or_put_with<T: Any + Send, F>(&mut self, init: F) -> &mut T
    where
        F: FnOnce() -> T,
    {
        self.0
            .entry(TypeId::of::<T>())
            .or_insert_with(|| Box::new(init()) as Box<dyn Any + Send>)
            .downcast_mut::<T>()
            // Entries are only ever stored under the TypeId of their own value.
            .expect("Shelf entry does not match the type it is keyed by")
    }

    /// Moves the value out of a shelf box. Callers only pass boxes stored under
    /// `TypeId::of::<T>()`, so the downcast succeeds; a mismatch would yield None.
    fn unbox<T: Any + Send>(b: Box<dyn Any + Send>) -> Option<T> {
        b.downcast::<T>().ok().map(|b| *b)
    }
}
89
Janis Danisevskis93927dd2020-12-23 12:23:08 -080090struct AsyncTaskState {
91 state: State,
92 thread: Option<thread::JoinHandle<()>>,
David Drysdale0e45a612021-02-25 17:24:36 +000093 timeout: Duration,
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -080094 hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
95 lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
David Drysdale0e45a612021-02-25 17:24:36 +000096 idle_fns: Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>,
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -080097 /// The store allows tasks to store state across invocations. It is passed to each invocation
98 /// of each task. Tasks need to cooperate on the ids they use for storing state.
99 shelf: Option<Shelf>,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800100}
101
102/// AsyncTask spawns one worker thread on demand to process jobs inserted into
Janis Danisevskis7e8b4622021-02-13 10:01:59 -0800103/// a low and a high priority work queue. The queues are processed FIFO, and low
104/// priority queue is processed if the high priority queue is empty.
105/// Note: Because there is only one worker thread at a time for a given AsyncTask instance,
106/// all scheduled requests are guaranteed to be serialized with respect to one another.
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800107pub struct AsyncTask {
108 state: Arc<(Condvar, Mutex<AsyncTaskState>)>,
109}
110
111impl Default for AsyncTask {
112 fn default() -> Self {
David Drysdale0e45a612021-02-25 17:24:36 +0000113 Self::new(Duration::from_secs(30))
114 }
115}
116
117impl AsyncTask {
118 /// Construct an [`AsyncTask`] with a specific timeout value.
119 pub fn new(timeout: Duration) -> Self {
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800120 Self {
121 state: Arc::new((
122 Condvar::new(),
123 Mutex::new(AsyncTaskState {
124 state: State::Exiting,
125 thread: None,
David Drysdale0e45a612021-02-25 17:24:36 +0000126 timeout,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800127 hi_prio_req: VecDeque::new(),
128 lo_prio_req: VecDeque::new(),
David Drysdale0e45a612021-02-25 17:24:36 +0000129 idle_fns: Vec::new(),
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -0800130 shelf: None,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800131 }),
132 )),
133 }
134 }
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800135
David Drysdale0e45a612021-02-25 17:24:36 +0000136 /// Adds a one-off job to the high priority queue. High priority jobs are
137 /// completed before low priority jobs and can also overtake low priority
138 /// jobs. But they cannot preempt them.
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800139 pub fn queue_hi<F>(&self, f: F)
140 where
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -0800141 F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800142 {
143 self.queue(f, true)
144 }
145
David Drysdale0e45a612021-02-25 17:24:36 +0000146 /// Adds a one-off job to the low priority queue. Low priority jobs are
147 /// completed after high priority. And they are not executed as long as high
148 /// priority jobs are present. Jobs always run to completion and are never
149 /// preempted by high priority jobs.
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800150 pub fn queue_lo<F>(&self, f: F)
151 where
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -0800152 F: FnOnce(&mut Shelf) + Send + 'static,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800153 {
154 self.queue(f, false)
155 }
156
David Drysdale0e45a612021-02-25 17:24:36 +0000157 /// Adds an idle callback. This will be invoked whenever the worker becomes
158 /// idle (all high and low priority jobs have been performed).
159 pub fn add_idle<F>(&self, f: F)
160 where
161 F: Fn(&mut Shelf) + Send + Sync + 'static,
162 {
163 let (ref _condvar, ref state) = *self.state;
164 let mut state = state.lock().unwrap();
165 state.idle_fns.push(Arc::new(f));
166 }
167
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800168 fn queue<F>(&self, f: F, hi_prio: bool)
169 where
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -0800170 F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800171 {
172 let (ref condvar, ref state) = *self.state;
173 let mut state = state.lock().unwrap();
Janis Danisevskis796db6a2021-05-06 10:53:31 -0700174
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800175 if hi_prio {
176 state.hi_prio_req.push_back(Box::new(f));
177 } else {
178 state.lo_prio_req.push_back(Box::new(f));
179 }
180
181 if state.state != State::Running {
182 self.spawn_thread(&mut state);
183 }
184 drop(state);
185 condvar.notify_all();
186 }
187
188 fn spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>) {
189 if let Some(t) = state.thread.take() {
190 t.join().expect("AsyncTask panicked.");
191 }
192
193 let cloned_state = self.state.clone();
David Drysdale0e45a612021-02-25 17:24:36 +0000194 let timeout_period = state.timeout;
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800195
196 state.thread = Some(thread::spawn(move || {
197 let (ref condvar, ref state) = *cloned_state;
David Drysdale0e45a612021-02-25 17:24:36 +0000198
199 enum Action {
200 QueuedFn(Box<dyn FnOnce(&mut Shelf) + Send>),
201 IdleFns(Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>),
Jeff Vander Stoep46bbc612021-04-09 08:55:21 +0200202 }
David Drysdale0e45a612021-02-25 17:24:36 +0000203 let mut done_idle = false;
204
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -0800205 // When the worker starts, it takes the shelf and puts it on the stack.
206 let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default();
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800207 loop {
David Drysdale0e45a612021-02-25 17:24:36 +0000208 if let Some(action) = {
209 let state = state.lock().unwrap();
210 if !done_idle && state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() {
211 // No jobs queued so invoke the idle callbacks.
212 Some(Action::IdleFns(state.idle_fns.clone()))
213 } else {
214 // Wait for either a queued job to arrive or a timeout.
215 let (mut state, timeout) = condvar
216 .wait_timeout_while(state, timeout_period, |state| {
217 state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty()
218 })
219 .unwrap();
220 match (
221 state.hi_prio_req.pop_front(),
222 state.lo_prio_req.is_empty(),
223 timeout.timed_out(),
224 ) {
225 (Some(f), _, _) => Some(Action::QueuedFn(f)),
226 (None, false, _) => {
227 state.lo_prio_req.pop_front().map(|f| Action::QueuedFn(f))
228 }
229 (None, true, true) => {
230 // When the worker exits it puts the shelf back into the shared
231 // state for the next worker to use. So state is preserved not
232 // only across invocations but also across worker thread shut down.
233 state.shelf = Some(shelf);
234 state.state = State::Exiting;
235 break;
236 }
237 (None, true, false) => None,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800238 }
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800239 }
240 } {
David Drysdale0e45a612021-02-25 17:24:36 +0000241 // Now that the lock has been dropped, perform the action.
242 match action {
243 Action::QueuedFn(f) => {
Janis Danisevskis796db6a2021-05-06 10:53:31 -0700244 let _wd = wd::watch_millis("async_task thread: calling queued fn", 500);
David Drysdale0e45a612021-02-25 17:24:36 +0000245 f(&mut shelf);
246 done_idle = false;
247 }
248 Action::IdleFns(idle_fns) => {
249 for idle_fn in idle_fns {
Janis Danisevskis796db6a2021-05-06 10:53:31 -0700250 let _wd =
251 wd::watch_millis("async_task thread: calling idle_fn", 500);
David Drysdale0e45a612021-02-25 17:24:36 +0000252 idle_fn(&mut shelf);
253 }
254 done_idle = true;
255 }
256 }
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800257 }
258 }
259 }));
260 state.state = State::Running;
261 }
262}
David Drysdale2d3e5012021-02-23 12:30:27 +0000263
// Unit tests for `Shelf` and `AsyncTask`. Several `AsyncTask` tests are timing based: they rely
// on job sleeps being long relative to short `recv_timeout`s, and on the task timeouts passed to
// `AsyncTask::new`.
#[cfg(test)]
mod tests {
    use super::{AsyncTask, Shelf};
    use std::sync::{
        mpsc::{channel, sync_channel, RecvTimeoutError},
        Arc,
    };
    use std::time::Duration;

    // Exercises put / get / get_mut / remove across several value types.
    #[test]
    fn test_shelf() {
        let mut shelf = Shelf::default();

        let s = "A string".to_string();
        assert_eq!(shelf.put(s), None);

        // Putting a second String replaces the first and hands the old one back.
        let s2 = "Another string".to_string();
        assert_eq!(shelf.put(s2), Some("A string".to_string()));

        // Put something of a different type on the shelf.
        #[derive(Debug, PartialEq, Eq)]
        struct Elf {
            pub name: String,
        }
        let e1 = Elf { name: "Glorfindel".to_string() };
        assert_eq!(shelf.put(e1), None);

        // The String value is still on the shelf.
        let s3 = shelf.get_downcast_ref::<String>().unwrap();
        assert_eq!(s3, "Another string");

        // As is the Elf.
        {
            let e2 = shelf.get_downcast_mut::<Elf>().unwrap();
            assert_eq!(e2.name, "Glorfindel");
            e2.name = "Celeborn".to_string();
        }

        // Take the Elf off the shelf.
        let e3 = shelf.remove_downcast_ref::<Elf>().unwrap();
        assert_eq!(e3.name, "Celeborn");

        assert_eq!(shelf.remove_downcast_ref::<Elf>(), None);

        // No u64 value has been put on the shelf, so getting one gives the default value.
        {
            let i = shelf.get_mut::<u64>();
            assert_eq!(*i, 0);
            *i = 42;
        }
        let i2 = shelf.get_downcast_ref::<u64>().unwrap();
        assert_eq!(*i2, 42);

        // No i32 value has ever been seen near the shelf.
        assert_eq!(shelf.get_downcast_ref::<i32>(), None);
        assert_eq!(shelf.get_downcast_mut::<i32>(), None);
        assert_eq!(shelf.remove_downcast_ref::<i32>(), None);
    }

    // Checks ordering: every high priority job runs before any low priority job, and each
    // queue runs in FIFO order.
    #[test]
    fn test_async_task() {
        let at = AsyncTask::default();

        // First queue up a job that blocks until we release it, to avoid
        // unpredictable synchronization.
        let (start_sender, start_receiver) = channel();
        at.queue_hi(move |shelf| {
            start_receiver.recv().unwrap();
            // Put a trace vector on the shelf
            shelf.put(Vec::<String>::new());
        });

        // Queue up some high-priority and low-priority jobs.
        for i in 0..3 {
            let j = i;
            at.queue_lo(move |shelf| {
                let trace = shelf.get_mut::<Vec<String>>();
                trace.push(format!("L{}", j));
            });
            let j = i;
            at.queue_hi(move |shelf| {
                let trace = shelf.get_mut::<Vec<String>>();
                trace.push(format!("H{}", j));
            });
        }

        // Finally queue up a low priority job that emits the trace.
        let (trace_sender, trace_receiver) = channel();
        at.queue_lo(move |shelf| {
            let trace = shelf.get_downcast_ref::<Vec<String>>().unwrap();
            trace_sender.send(trace.clone()).unwrap();
        });

        // Ready, set, go.
        start_sender.send(()).unwrap();
        let trace = trace_receiver.recv().unwrap();

        assert_eq!(trace, vec!["H0", "H1", "H2", "L0", "L1", "L2"]);
    }

    #[test]
    fn test_async_task_chain() {
        let at = Arc::new(AsyncTask::default());
        let (sender, receiver) = channel();
        // Queue up a job that will queue up another job. This confirms
        // that the job is not invoked with any internal AsyncTask locks held.
        let at_clone = at.clone();
        at.queue_hi(move |_shelf| {
            at_clone.queue_lo(move |_shelf| {
                sender.send(()).unwrap();
            });
        });
        receiver.recv().unwrap();
    }

    // A panic inside a queued job is expected to surface on the test thread.
    // NOTE(review): this relies on the panic reaching the test thread. If both jobs are queued
    // before the worker panics, confirm `done_receiver.recv()` cannot block forever (the second
    // job would still be queued after the worker thread has died).
    #[test]
    #[should_panic]
    fn test_async_task_panic() {
        let at = AsyncTask::default();
        at.queue_hi(|_shelf| {
            panic!("Panic from queued job");
        });
        // Queue another job afterwards to ensure that the async thread gets joined.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            done_sender.send(()).unwrap();
        });
        done_receiver.recv().unwrap();
    }

    // Idle callbacks run once after the queues drain, and again only after new work arrives.
    #[test]
    fn test_async_task_idle() {
        let at = AsyncTask::new(Duration::from_secs(3));
        // Need a SyncSender as it is Send+Sync.
        let (idle_done_sender, idle_done_receiver) = sync_channel::<()>(3);
        at.add_idle(move |_shelf| {
            idle_done_sender.send(()).unwrap();
        });

        // Queue up some high-priority and low-priority jobs that take time.
        for _i in 0..3 {
            at.queue_lo(|_shelf| {
                std::thread::sleep(Duration::from_millis(500));
            });
            at.queue_hi(|_shelf| {
                std::thread::sleep(Duration::from_millis(500));
            });
        }
        // Final low-priority job.
        let (done_sender, done_receiver) = channel();
        at.queue_lo(move |_shelf| {
            done_sender.send(()).unwrap();
        });

        // Nothing happens until the last job completes.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_secs(1)),
            Err(RecvTimeoutError::Timeout)
        );
        done_receiver.recv().unwrap();
        idle_done_receiver.recv_timeout(Duration::from_millis(1)).unwrap();

        // Idle callback not executed again even if we wait for a while.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_secs(3)),
            Err(RecvTimeoutError::Timeout)
        );

        // However, if more work is done then there's another chance to go idle.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            std::thread::sleep(Duration::from_millis(500));
            done_sender.send(()).unwrap();
        });
        // Idle callback not immediately executed, because the high priority
        // job is taking a while.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        done_receiver.recv().unwrap();
        idle_done_receiver.recv_timeout(Duration::from_millis(1)).unwrap();
    }

    // All registered idle callbacks run, in registration order.
    #[test]
    fn test_async_task_multiple_idle() {
        let at = AsyncTask::new(Duration::from_secs(3));
        let (idle_sender, idle_receiver) = sync_channel::<i32>(5);
        // Queue a high priority job to start things off
        at.queue_hi(|_shelf| {
            std::thread::sleep(Duration::from_millis(500));
        });

        // Multiple idle callbacks.
        for i in 0..3 {
            let idle_sender = idle_sender.clone();
            at.add_idle(move |_shelf| {
                idle_sender.send(i).unwrap();
            });
        }

        // Nothing happens immediately.
        assert_eq!(
            idle_receiver.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        // Wait for a moment and the idle jobs should have run.
        std::thread::sleep(Duration::from_secs(1));

        let mut results = Vec::new();
        while let Ok(i) = idle_receiver.recv_timeout(Duration::from_millis(1)) {
            results.push(i);
        }
        assert_eq!(results, [0, 1, 2]);
    }

    // An idle callback may queue new work; each completed job makes the worker idle again.
    #[test]
    fn test_async_task_idle_queues_job() {
        let at = Arc::new(AsyncTask::new(Duration::from_secs(1)));
        let at_clone = at.clone();
        let (idle_sender, idle_receiver) = sync_channel::<i32>(100);
        // Add an idle callback that queues a low-priority job.
        at.add_idle(move |shelf| {
            at_clone.queue_lo(|_shelf| {
                // Slow things down so the channel doesn't fill up.
                std::thread::sleep(Duration::from_millis(50));
            });
            // The counter lives on the shelf, so it persists across idle invocations.
            let i = shelf.get_mut::<i32>();
            idle_sender.send(*i).unwrap();
            *i += 1;
        });

        // Nothing happens immediately.
        assert_eq!(
            idle_receiver.recv_timeout(Duration::from_millis(1500)),
            Err(RecvTimeoutError::Timeout)
        );

        // Once we queue a normal job, things start.
        at.queue_hi(|_shelf| {});
        assert_eq!(0, idle_receiver.recv_timeout(Duration::from_millis(200)).unwrap());

        // The idle callback queues a job, and completion of that job
        // means the task is going idle again...so the idle callback will
        // be called repeatedly.
        assert_eq!(1, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
        assert_eq!(2, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
        assert_eq!(3, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
    }

    // A panic inside an idle callback is expected to surface on the test thread.
    // NOTE(review): as in `test_async_task_panic`, confirm the final `recv()` cannot block
    // forever if the job is queued before the worker has finished dying.
    #[test]
    #[should_panic]
    fn test_async_task_idle_panic() {
        let at = AsyncTask::new(Duration::from_secs(1));
        let (idle_sender, idle_receiver) = sync_channel::<()>(3);
        // Add an idle callback that panics.
        at.add_idle(move |_shelf| {
            idle_sender.send(()).unwrap();
            panic!("Panic from idle callback");
        });
        // Queue a job to trigger idleness and ensuing panic.
        at.queue_hi(|_shelf| {});
        idle_receiver.recv().unwrap();

        // Queue another job afterwards to ensure that the async thread gets joined
        // and the panic detected.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            done_sender.send(()).unwrap();
        });
        done_receiver.recv().unwrap();
    }
}