blob: 6548445fd803c281ebd8740c465dc4d59c28d965 [file] [log] [blame]
Janis Danisevskis93927dd2020-12-23 12:23:08 -08001// Copyright 2020, The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! This module implements the handling of async tasks.
//! The worker thread has a high priority and a low priority queue. Adding a job to either
//! will cause one thread to be spawned if none exists. As a compromise between performance
//! and resource consumption, the thread will linger for a configurable timeout (30 seconds
//! by default) after it has processed all tasks before it terminates.
//! Note that low priority tasks are processed only when the high priority queue is empty.
21
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -080022use std::{any::Any, any::TypeId, time::Duration};
Janis Danisevskis93927dd2020-12-23 12:23:08 -080023use std::{
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -080024 collections::{HashMap, VecDeque},
Janis Danisevskis93927dd2020-12-23 12:23:08 -080025 sync::Arc,
26 sync::{Condvar, Mutex, MutexGuard},
27 thread,
28};
29
/// Whether a worker thread is currently responsible for servicing the job queues.
#[derive(Debug, Eq, PartialEq)]
enum State {
    /// No worker is servicing the queues. This is the initial state, and the state a
    /// worker sets right before it terminates after timing out.
    Exiting,
    /// A worker thread has been spawned to service the queues.
    Running,
}
35
/// The Shelf lets async tasks keep state from one invocation to the next.
/// Values are keyed by their concrete type, so the shelf holds at most one value per type.
/// Note: Store elves at your own peril ;-).
#[derive(Debug, Default)]
pub struct Shelf(HashMap<TypeId, Box<dyn Any + Send>>);

impl Shelf {
    /// Returns a shared reference to the shelved value of type `T`, or `None` if no `T`
    /// is currently on the shelf.
    pub fn get_downcast_ref<T: Any + Send>(&self) -> Option<&T> {
        let boxed = self.0.get(&TypeId::of::<T>())?;
        boxed.downcast_ref::<T>()
    }

    /// Returns a mutable reference to the shelved value of type `T`. This is `Some` if a
    /// `T` was previously stored using `put`, `get_mut`, or `get_or_put_with` (and not
    /// removed since), and `None` otherwise.
    pub fn get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T> {
        let boxed = self.0.get_mut(&TypeId::of::<T>())?;
        boxed.downcast_mut::<T>()
    }

    /// Removes the entry of type `T` from the shelf and returns the stored value by value,
    /// or `None` if there was no such entry.
    pub fn remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T> {
        let boxed = self.0.remove(&TypeId::of::<T>())?;
        match boxed.downcast::<T>() {
            Ok(value) => Some(*value),
            Err(_) => None,
        }
    }

    /// Puts `v` on the shelf. If a value of type `T` was already shelved, it is replaced
    /// and the previous value is returned.
    pub fn put<T: Any + Send>(&mut self, v: T) -> Option<T> {
        let fresh: Box<dyn Any + Send> = Box::new(v);
        let displaced = self.0.insert(TypeId::of::<T>(), fresh)?;
        displaced.downcast::<T>().ok().map(|boxed| *boxed)
    }

    /// Returns a mutable reference to the shelved value of type `T`, first storing
    /// `T::default()` if the shelf holds no `T` yet.
    pub fn get_mut<T: Any + Send + Default>(&mut self) -> &mut T {
        self.get_or_put_with(T::default)
    }

    /// Returns a mutable reference to the shelved value of type `T`, first storing the
    /// result of `init` if the shelf holds no `T` yet. `init` is only called when the
    /// entry is missing.
    pub fn get_or_put_with<T: Any + Send, F>(&mut self, init: F) -> &mut T
    where
        F: FnOnce() -> T,
    {
        let slot = self
            .0
            .entry(TypeId::of::<T>())
            .or_insert_with(|| Box::new(init()) as Box<dyn Any + Send>);
        // Entries are keyed by the TypeId of the value they hold, so the slot for
        // `TypeId::of::<T>()` always contains a `T`.
        slot.downcast_mut::<T>().unwrap()
    }
}
88
Janis Danisevskis93927dd2020-12-23 12:23:08 -080089struct AsyncTaskState {
90 state: State,
91 thread: Option<thread::JoinHandle<()>>,
David Drysdale0e45a612021-02-25 17:24:36 +000092 timeout: Duration,
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -080093 hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
94 lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
David Drysdale0e45a612021-02-25 17:24:36 +000095 idle_fns: Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>,
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -080096 /// The store allows tasks to store state across invocations. It is passed to each invocation
97 /// of each task. Tasks need to cooperate on the ids they use for storing state.
98 shelf: Option<Shelf>,
Janis Danisevskis93927dd2020-12-23 12:23:08 -080099}
100
101/// AsyncTask spawns one worker thread on demand to process jobs inserted into
Janis Danisevskis7e8b4622021-02-13 10:01:59 -0800102/// a low and a high priority work queue. The queues are processed FIFO, and low
103/// priority queue is processed if the high priority queue is empty.
104/// Note: Because there is only one worker thread at a time for a given AsyncTask instance,
105/// all scheduled requests are guaranteed to be serialized with respect to one another.
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800106pub struct AsyncTask {
107 state: Arc<(Condvar, Mutex<AsyncTaskState>)>,
108}
109
110impl Default for AsyncTask {
111 fn default() -> Self {
David Drysdale0e45a612021-02-25 17:24:36 +0000112 Self::new(Duration::from_secs(30))
113 }
114}
115
116impl AsyncTask {
117 /// Construct an [`AsyncTask`] with a specific timeout value.
118 pub fn new(timeout: Duration) -> Self {
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800119 Self {
120 state: Arc::new((
121 Condvar::new(),
122 Mutex::new(AsyncTaskState {
123 state: State::Exiting,
124 thread: None,
David Drysdale0e45a612021-02-25 17:24:36 +0000125 timeout,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800126 hi_prio_req: VecDeque::new(),
127 lo_prio_req: VecDeque::new(),
David Drysdale0e45a612021-02-25 17:24:36 +0000128 idle_fns: Vec::new(),
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -0800129 shelf: None,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800130 }),
131 )),
132 }
133 }
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800134
David Drysdale0e45a612021-02-25 17:24:36 +0000135 /// Adds a one-off job to the high priority queue. High priority jobs are
136 /// completed before low priority jobs and can also overtake low priority
137 /// jobs. But they cannot preempt them.
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800138 pub fn queue_hi<F>(&self, f: F)
139 where
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -0800140 F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800141 {
142 self.queue(f, true)
143 }
144
David Drysdale0e45a612021-02-25 17:24:36 +0000145 /// Adds a one-off job to the low priority queue. Low priority jobs are
146 /// completed after high priority. And they are not executed as long as high
147 /// priority jobs are present. Jobs always run to completion and are never
148 /// preempted by high priority jobs.
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800149 pub fn queue_lo<F>(&self, f: F)
150 where
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -0800151 F: FnOnce(&mut Shelf) + Send + 'static,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800152 {
153 self.queue(f, false)
154 }
155
David Drysdale0e45a612021-02-25 17:24:36 +0000156 /// Adds an idle callback. This will be invoked whenever the worker becomes
157 /// idle (all high and low priority jobs have been performed).
158 pub fn add_idle<F>(&self, f: F)
159 where
160 F: Fn(&mut Shelf) + Send + Sync + 'static,
161 {
162 let (ref _condvar, ref state) = *self.state;
163 let mut state = state.lock().unwrap();
164 state.idle_fns.push(Arc::new(f));
165 }
166
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800167 fn queue<F>(&self, f: F, hi_prio: bool)
168 where
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -0800169 F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800170 {
171 let (ref condvar, ref state) = *self.state;
172 let mut state = state.lock().unwrap();
Janis Danisevskis796db6a2021-05-06 10:53:31 -0700173
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800174 if hi_prio {
175 state.hi_prio_req.push_back(Box::new(f));
176 } else {
177 state.lo_prio_req.push_back(Box::new(f));
178 }
179
180 if state.state != State::Running {
181 self.spawn_thread(&mut state);
182 }
183 drop(state);
184 condvar.notify_all();
185 }
186
187 fn spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>) {
188 if let Some(t) = state.thread.take() {
189 t.join().expect("AsyncTask panicked.");
190 }
191
192 let cloned_state = self.state.clone();
David Drysdale0e45a612021-02-25 17:24:36 +0000193 let timeout_period = state.timeout;
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800194
195 state.thread = Some(thread::spawn(move || {
196 let (ref condvar, ref state) = *cloned_state;
David Drysdale0e45a612021-02-25 17:24:36 +0000197
198 enum Action {
199 QueuedFn(Box<dyn FnOnce(&mut Shelf) + Send>),
200 IdleFns(Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>),
Jeff Vander Stoep46bbc612021-04-09 08:55:21 +0200201 }
David Drysdale0e45a612021-02-25 17:24:36 +0000202 let mut done_idle = false;
203
Janis Danisevskis40f0e6b2021-02-10 15:48:44 -0800204 // When the worker starts, it takes the shelf and puts it on the stack.
205 let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default();
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800206 loop {
David Drysdale0e45a612021-02-25 17:24:36 +0000207 if let Some(action) = {
208 let state = state.lock().unwrap();
209 if !done_idle && state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() {
210 // No jobs queued so invoke the idle callbacks.
211 Some(Action::IdleFns(state.idle_fns.clone()))
212 } else {
213 // Wait for either a queued job to arrive or a timeout.
214 let (mut state, timeout) = condvar
215 .wait_timeout_while(state, timeout_period, |state| {
216 state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty()
217 })
218 .unwrap();
219 match (
220 state.hi_prio_req.pop_front(),
221 state.lo_prio_req.is_empty(),
222 timeout.timed_out(),
223 ) {
224 (Some(f), _, _) => Some(Action::QueuedFn(f)),
225 (None, false, _) => {
226 state.lo_prio_req.pop_front().map(|f| Action::QueuedFn(f))
227 }
228 (None, true, true) => {
229 // When the worker exits it puts the shelf back into the shared
230 // state for the next worker to use. So state is preserved not
231 // only across invocations but also across worker thread shut down.
232 state.shelf = Some(shelf);
233 state.state = State::Exiting;
234 break;
235 }
236 (None, true, false) => None,
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800237 }
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800238 }
239 } {
David Drysdale0e45a612021-02-25 17:24:36 +0000240 // Now that the lock has been dropped, perform the action.
241 match action {
242 Action::QueuedFn(f) => {
243 f(&mut shelf);
244 done_idle = false;
245 }
246 Action::IdleFns(idle_fns) => {
247 for idle_fn in idle_fns {
248 idle_fn(&mut shelf);
249 }
250 done_idle = true;
251 }
252 }
Janis Danisevskis93927dd2020-12-23 12:23:08 -0800253 }
254 }
255 }));
256 state.state = State::Running;
257 }
258}
David Drysdale2d3e5012021-02-23 12:30:27 +0000259
#[cfg(test)]
mod tests {
    use super::{AsyncTask, Shelf};
    use std::fmt::Debug;
    use std::sync::{
        mpsc::{channel, sync_channel, Receiver, RecvTimeoutError},
        Arc,
    };
    use std::time::Duration;

    /// Asserts that nothing arrives on `rx` within `wait`.
    fn assert_no_message<T: Debug + PartialEq>(rx: &Receiver<T>, wait: Duration) {
        assert_eq!(rx.recv_timeout(wait), Err(RecvTimeoutError::Timeout));
    }

    /// Exercises every `Shelf` accessor with several distinct value types.
    #[test]
    fn test_shelf() {
        let mut shelf = Shelf::default();

        // The first String finds its slot empty; the second one displaces the first.
        assert_eq!(shelf.put("A string".to_string()), None);
        assert_eq!(shelf.put("Another string".to_string()), Some("A string".to_string()));

        // A value of a different type gets a slot of its own.
        #[derive(Debug, PartialEq, Eq)]
        struct Elf {
            pub name: String,
        }
        assert_eq!(shelf.put(Elf { name: "Glorfindel".to_string() }), None);

        // Shelving the Elf left the String alone.
        let shelved_string = shelf.get_downcast_ref::<String>().unwrap();
        assert_eq!(shelved_string, "Another string");

        // The Elf can be found and modified in place.
        {
            let elf = shelf.get_downcast_mut::<Elf>().unwrap();
            assert_eq!(elf.name, "Glorfindel");
            elf.name = "Celeborn".to_string();
        }

        // Removing the Elf hands back the modified value, exactly once.
        let removed_elf = shelf.remove_downcast_ref::<Elf>().unwrap();
        assert_eq!(removed_elf.name, "Celeborn");
        assert_eq!(shelf.remove_downcast_ref::<Elf>(), None);

        // Asking for a u64 that was never shelved creates the default value.
        {
            let number = shelf.get_mut::<u64>();
            assert_eq!(*number, 0);
            *number = 42;
        }
        let shelved_number = shelf.get_downcast_ref::<u64>().unwrap();
        assert_eq!(*shelved_number, 42);

        // An i32 has never been anywhere near the shelf.
        assert_eq!(shelf.get_downcast_ref::<i32>(), None);
        assert_eq!(shelf.get_downcast_mut::<i32>(), None);
        assert_eq!(shelf.remove_downcast_ref::<i32>(), None);
    }

    /// High priority jobs run before low priority ones; each queue is FIFO.
    #[test]
    fn test_async_task() {
        let task = AsyncTask::default();

        // Park the worker in a first job until everything else is queued, so that the
        // resulting order does not depend on thread scheduling.
        let (go_tx, go_rx) = channel();
        task.queue_hi(move |shelf| {
            go_rx.recv().unwrap();
            // Put a trace vector on the shelf
            shelf.put(Vec::<String>::new());
        });

        // Interleave low and high priority jobs that record their execution.
        for i in 0..3 {
            task.queue_lo(move |shelf| {
                shelf.get_mut::<Vec<String>>().push(format!("L{}", i));
            });
            task.queue_hi(move |shelf| {
                shelf.get_mut::<Vec<String>>().push(format!("H{}", i));
            });
        }

        // The last low priority job reports the recorded trace.
        let (trace_tx, trace_rx) = channel();
        task.queue_lo(move |shelf| {
            let recorded = shelf.get_downcast_ref::<Vec<String>>().unwrap();
            trace_tx.send(recorded.clone()).unwrap();
        });

        // Release the worker and collect the result.
        go_tx.send(()).unwrap();
        let trace = trace_rx.recv().unwrap();

        assert_eq!(trace, vec!["H0", "H1", "H2", "L0", "L1", "L2"]);
    }

    /// A job may queue another job, i.e. jobs run without internal AsyncTask locks held.
    #[test]
    fn test_async_task_chain() {
        let task = Arc::new(AsyncTask::default());
        let (done_tx, done_rx) = channel();
        let task_for_job = Arc::clone(&task);
        task.queue_hi(move |_shelf| {
            task_for_job.queue_lo(move |_shelf| {
                done_tx.send(()).unwrap();
            });
        });
        done_rx.recv().unwrap();
    }

    #[test]
    #[should_panic]
    fn test_async_task_panic() {
        let task = AsyncTask::default();
        task.queue_hi(|_shelf| {
            panic!("Panic from queued job");
        });
        // Queue another job afterwards to ensure that the async thread gets joined.
        let (done_tx, done_rx) = channel();
        task.queue_hi(move |_shelf| {
            done_tx.send(()).unwrap();
        });
        done_rx.recv().unwrap();
    }

    /// The idle callback fires once each time the worker runs out of jobs.
    #[test]
    fn test_async_task_idle() {
        let task = AsyncTask::new(Duration::from_secs(3));
        // Need a SyncSender as it is Send+Sync.
        let (idle_tx, idle_rx) = sync_channel::<()>(3);
        task.add_idle(move |_shelf| {
            idle_tx.send(()).unwrap();
        });

        // Keep the worker busy with slow jobs on both queues.
        for _ in 0..3 {
            task.queue_lo(|_shelf| std::thread::sleep(Duration::from_millis(500)));
            task.queue_hi(|_shelf| std::thread::sleep(Duration::from_millis(500)));
        }
        // Final low-priority job.
        let (done_tx, done_rx) = channel();
        task.queue_lo(move |_shelf| {
            done_tx.send(()).unwrap();
        });

        // Nothing happens until the last job completes.
        assert_no_message(&idle_rx, Duration::from_secs(1));
        done_rx.recv().unwrap();
        // Now that the last low-priority job has completed, the idle task should
        // fire pretty much immediately.
        idle_rx.recv_timeout(Duration::from_millis(50)).unwrap();

        // Idle callback not executed again even if we wait for a while.
        assert_no_message(&idle_rx, Duration::from_secs(3));

        // However, if more work is done then there's another chance to go idle.
        let (done_tx, done_rx) = channel();
        task.queue_hi(move |_shelf| {
            std::thread::sleep(Duration::from_millis(500));
            done_tx.send(()).unwrap();
        });
        // Idle callback not immediately executed, because the high priority
        // job is taking a while.
        assert_no_message(&idle_rx, Duration::from_millis(1));
        done_rx.recv().unwrap();
        idle_rx.recv_timeout(Duration::from_millis(50)).unwrap();
    }

    /// Several idle callbacks run in the order in which they were registered.
    #[test]
    fn test_async_task_multiple_idle() {
        let task = AsyncTask::new(Duration::from_secs(3));
        let (idle_tx, idle_rx) = sync_channel::<i32>(5);
        // Queue a high priority job to start things off
        task.queue_hi(|_shelf| std::thread::sleep(Duration::from_millis(500)));

        // Multiple idle callbacks.
        for i in 0..3 {
            let idle_tx = idle_tx.clone();
            task.add_idle(move |_shelf| {
                idle_tx.send(i).unwrap();
            });
        }

        // Nothing happens immediately.
        assert_no_message(&idle_rx, Duration::from_millis(1));
        // Wait for a moment and the idle jobs should have run.
        std::thread::sleep(Duration::from_secs(1));

        // Drain whatever the callbacks reported.
        let results: Vec<i32> =
            std::iter::from_fn(|| idle_rx.recv_timeout(Duration::from_millis(1)).ok()).collect();
        assert_eq!(results, [0, 1, 2]);
    }

    /// An idle callback that queues work causes the worker to go idle over and over.
    #[test]
    fn test_async_task_idle_queues_job() {
        let task = Arc::new(AsyncTask::new(Duration::from_secs(1)));
        let task_for_idle = Arc::clone(&task);
        let (idle_tx, idle_rx) = sync_channel::<i32>(100);
        // Add an idle callback that queues a low-priority job.
        task.add_idle(move |shelf| {
            task_for_idle.queue_lo(|_shelf| {
                // Slow things down so the channel doesn't fill up.
                std::thread::sleep(Duration::from_millis(50));
            });
            let counter = shelf.get_mut::<i32>();
            idle_tx.send(*counter).unwrap();
            *counter += 1;
        });

        // Nothing happens immediately.
        assert_no_message(&idle_rx, Duration::from_millis(1500));

        // Once we queue a normal job, things start.
        task.queue_hi(|_shelf| {});
        assert_eq!(0, idle_rx.recv_timeout(Duration::from_millis(200)).unwrap());

        // The idle callback queues a job, and completion of that job
        // means the task is going idle again...so the idle callback will
        // be called repeatedly.
        for expected in 1..=3 {
            assert_eq!(expected, idle_rx.recv_timeout(Duration::from_millis(100)).unwrap());
        }
    }

    #[test]
    #[should_panic]
    fn test_async_task_idle_panic() {
        let task = AsyncTask::new(Duration::from_secs(1));
        let (idle_tx, idle_rx) = sync_channel::<()>(3);
        // Add an idle callback that panics.
        task.add_idle(move |_shelf| {
            idle_tx.send(()).unwrap();
            panic!("Panic from idle callback");
        });
        // Queue a job to trigger idleness and ensuing panic.
        task.queue_hi(|_shelf| {});
        idle_rx.recv().unwrap();

        // Queue another job afterwards to ensure that the async thread gets joined
        // and the panic detected.
        let (done_tx, done_rx) = channel();
        task.queue_hi(move |_shelf| {
            done_tx.send(()).unwrap();
        });
        done_rx.recv().unwrap();
    }
}