// Copyright 2020, The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! This module implements the handling of async tasks.
//! The worker thread has a high priority and a low priority queue. Adding a job to either
//! queue spawns a worker thread if none exists. As a compromise between performance and
//! resource consumption, the thread lingers for the configured timeout (30 seconds by
//! default) after it has processed all tasks, and then terminates.
//! Note that low priority tasks are processed only when the high priority queue is empty.
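//!
//! A minimal usage sketch (illustrative only; it assumes this module is exported by the
//! enclosing crate as `keystore2::async_task`):
//!
//! ```
//! use keystore2::async_task::AsyncTask;
//! use std::sync::mpsc::channel;
//!
//! let task = AsyncTask::default();
//! let (sender, receiver) = channel();
//! // The closure runs on the worker thread; use a channel to wait for its result.
//! task.queue_hi(move |_shelf| {
//!     sender.send("done").unwrap();
//! });
//! assert_eq!(receiver.recv().unwrap(), "done");
//! ```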

use std::{any::Any, any::TypeId, time::Duration};
use std::{
    collections::{HashMap, VecDeque},
    sync::Arc,
    sync::{Condvar, Mutex, MutexGuard},
    thread,
};

#[derive(Debug, PartialEq, Eq)]
enum State {
    Exiting,
    Running,
}

/// The Shelf allows async tasks to store state across invocations.
/// Note: Store elves at your own peril ;-).
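///
/// A minimal usage sketch (illustrative only; it assumes this type is reachable as
/// `keystore2::async_task::Shelf`):
///
/// ```
/// use keystore2::async_task::Shelf;
///
/// let mut shelf = Shelf::default();
/// // The first value of a given type goes into an empty slot.
/// assert_eq!(shelf.put(42u64), None);
/// // Putting a second value of the same type returns the previous one.
/// assert_eq!(shelf.put(43u64), Some(42u64));
/// *shelf.get_mut::<u64>() += 1;
/// assert_eq!(shelf.get_downcast_ref::<u64>(), Some(&44u64));
/// assert_eq!(shelf.remove_downcast_ref::<u64>(), Some(44u64));
/// ```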
#[derive(Debug, Default)]
pub struct Shelf(HashMap<TypeId, Box<dyn Any + Send>>);

impl Shelf {
    /// Get a reference to the shelved data of type T. Returns Some if the data exists.
    pub fn get_downcast_ref<T: Any + Send>(&self) -> Option<&T> {
        self.0.get(&TypeId::of::<T>()).and_then(|v| v.downcast_ref::<T>())
    }

    /// Get a mutable reference to the shelved data of type T. Returns Some if a value of
    /// type T was previously inserted using put, get_mut, or get_or_put_with.
    pub fn get_downcast_mut<T: Any + Send>(&mut self) -> Option<&mut T> {
        self.0.get_mut(&TypeId::of::<T>()).and_then(|v| v.downcast_mut::<T>())
    }

    /// Removes the entry of the given type and returns the stored data if it existed.
    pub fn remove_downcast_ref<T: Any + Send>(&mut self) -> Option<T> {
        self.0.remove(&TypeId::of::<T>()).and_then(|v| v.downcast::<T>().ok().map(|b| *b))
    }

    /// Puts data `v` on the shelf. If there already was an entry of type T it is returned.
    pub fn put<T: Any + Send>(&mut self, v: T) -> Option<T> {
        self.0
            .insert(TypeId::of::<T>(), Box::new(v) as Box<dyn Any + Send>)
            .and_then(|v| v.downcast::<T>().ok().map(|b| *b))
    }

    /// Gets a mutable reference to the entry of the given type, creating it with its
    /// default value if necessary. The type must implement Default.
    pub fn get_mut<T: Any + Send + Default>(&mut self) -> &mut T {
        self.0
            .entry(TypeId::of::<T>())
            .or_insert_with(|| Box::new(T::default()) as Box<dyn Any + Send>)
            .downcast_mut::<T>()
            .unwrap()
    }

    /// Gets a mutable reference to the entry of the given type or creates it using the init
    /// function. Init is not executed if the entry already exists.
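    ///
    /// A brief sketch (illustrative only; the path assumes the crate layout described above):
    ///
    /// ```
    /// use keystore2::async_task::Shelf;
    ///
    /// let mut shelf = Shelf::default();
    /// let v = shelf.get_or_put_with(|| vec![1u32, 2, 3]);
    /// v.push(4);
    /// assert_eq!(shelf.get_downcast_ref::<Vec<u32>>(), Some(&vec![1u32, 2, 3, 4]));
    /// ```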
    pub fn get_or_put_with<T: Any + Send, F>(&mut self, init: F) -> &mut T
    where
        F: FnOnce() -> T,
    {
        self.0
            .entry(TypeId::of::<T>())
            .or_insert_with(|| Box::new(init()) as Box<dyn Any + Send>)
            .downcast_mut::<T>()
            .unwrap()
    }
}

struct AsyncTaskState {
    state: State,
    thread: Option<thread::JoinHandle<()>>,
    timeout: Duration,
    hi_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
    lo_prio_req: VecDeque<Box<dyn FnOnce(&mut Shelf) + Send>>,
    idle_fns: Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>,
    /// The shelf allows tasks to store state across invocations. It is passed to each
    /// invocation of each task. Tasks need to cooperate on the types they use for storing
    /// state.
    shelf: Option<Shelf>,
}

/// AsyncTask spawns one worker thread on demand to process jobs inserted into
/// a low and a high priority work queue. The queues are processed FIFO, and the low
/// priority queue is processed only if the high priority queue is empty.
/// Note: Because there is only one worker thread at a time for a given AsyncTask instance,
/// all scheduled requests are guaranteed to be serialized with respect to one another.
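///
/// A minimal sketch of queueing work (illustrative only; the path assumes the crate layout
/// described in the module docs):
///
/// ```
/// use keystore2::async_task::AsyncTask;
/// use std::sync::mpsc::channel;
///
/// let task = AsyncTask::default();
/// let (sender, receiver) = channel();
/// task.queue_lo(move |shelf| {
///     // State stored on the shelf survives across jobs on the same AsyncTask.
///     let counter = shelf.get_mut::<u32>();
///     *counter += 1;
///     sender.send(*counter).unwrap();
/// });
/// assert_eq!(receiver.recv().unwrap(), 1);
/// ```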
pub struct AsyncTask {
    state: Arc<(Condvar, Mutex<AsyncTaskState>)>,
}

impl Default for AsyncTask {
    fn default() -> Self {
        Self::new(Duration::from_secs(30))
    }
}

impl AsyncTask {
    /// Construct an [`AsyncTask`] with a specific timeout value.
    pub fn new(timeout: Duration) -> Self {
        Self {
            state: Arc::new((
                Condvar::new(),
                Mutex::new(AsyncTaskState {
                    state: State::Exiting,
                    thread: None,
                    timeout,
                    hi_prio_req: VecDeque::new(),
                    lo_prio_req: VecDeque::new(),
                    idle_fns: Vec::new(),
                    shelf: None,
                }),
            )),
        }
    }

    /// Adds a one-off job to the high priority queue. High priority jobs are
    /// completed before low priority jobs and can overtake low priority jobs that are
    /// still queued, but they cannot preempt a low priority job that is already running.
    pub fn queue_hi<F>(&self, f: F)
    where
        F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
    {
        self.queue(f, true)
    }

    /// Adds a one-off job to the low priority queue. Low priority jobs are executed
    /// only when no high priority jobs are queued, but once started they always run
    /// to completion and are never preempted by high priority jobs.
    pub fn queue_lo<F>(&self, f: F)
    where
        F: FnOnce(&mut Shelf) + Send + 'static,
    {
        self.queue(f, false)
    }

    /// Adds an idle callback. This will be invoked whenever the worker becomes
    /// idle (all high and low priority jobs have been performed).
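    ///
    /// A brief sketch (illustrative only; the path assumes the crate layout described in
    /// the module docs):
    ///
    /// ```
    /// use keystore2::async_task::AsyncTask;
    /// use std::sync::mpsc::sync_channel;
    ///
    /// let task = AsyncTask::default();
    /// // The callback must be Fn + Send + Sync, so use a SyncSender to report idleness.
    /// let (idle_sender, idle_receiver) = sync_channel::<()>(1);
    /// task.add_idle(move |_shelf| {
    ///     let _ = idle_sender.try_send(());
    /// });
    /// // Queue a job; once it completes the worker goes idle and the callback runs.
    /// task.queue_hi(|_shelf| {});
    /// idle_receiver.recv().unwrap();
    /// ```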
    pub fn add_idle<F>(&self, f: F)
    where
        F: Fn(&mut Shelf) + Send + Sync + 'static,
    {
        let (ref _condvar, ref state) = *self.state;
        let mut state = state.lock().unwrap();
        state.idle_fns.push(Arc::new(f));
    }

    fn queue<F>(&self, f: F, hi_prio: bool)
    where
        F: for<'r> FnOnce(&'r mut Shelf) + Send + 'static,
    {
        let (ref condvar, ref state) = *self.state;
        let mut state = state.lock().unwrap();
        if hi_prio {
            state.hi_prio_req.push_back(Box::new(f));
        } else {
            state.lo_prio_req.push_back(Box::new(f));
        }

        if state.state != State::Running {
            self.spawn_thread(&mut state);
        }
        drop(state);
        condvar.notify_all();
    }

    fn spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>) {
        if let Some(t) = state.thread.take() {
            t.join().expect("AsyncTask panicked.");
        }

        let cloned_state = self.state.clone();
        let timeout_period = state.timeout;

        state.thread = Some(thread::spawn(move || {
            let (ref condvar, ref state) = *cloned_state;

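            // Each loop iteration computes one Action while holding the state lock and
            // then performs it after the lock has been dropped, so queued jobs can
            // themselves queue further work without deadlocking.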
            enum Action {
                QueuedFn(Box<dyn FnOnce(&mut Shelf) + Send>),
                IdleFns(Vec<Arc<dyn Fn(&mut Shelf) + Send + Sync>>),
            }
            let mut done_idle = false;

            // When the worker starts, it takes the shelf and puts it on the stack.
            let mut shelf = state.lock().unwrap().shelf.take().unwrap_or_default();
            loop {
                if let Some(action) = {
                    let state = state.lock().unwrap();
                    if !done_idle && state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty() {
                        // No jobs queued so invoke the idle callbacks.
                        Some(Action::IdleFns(state.idle_fns.clone()))
                    } else {
                        // Wait for either a queued job to arrive or a timeout.
                        let (mut state, timeout) = condvar
                            .wait_timeout_while(state, timeout_period, |state| {
                                state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty()
                            })
                            .unwrap();
                        match (
                            state.hi_prio_req.pop_front(),
                            state.lo_prio_req.is_empty(),
                            timeout.timed_out(),
                        ) {
                            (Some(f), _, _) => Some(Action::QueuedFn(f)),
                            (None, false, _) => {
                                state.lo_prio_req.pop_front().map(|f| Action::QueuedFn(f))
                            }
                            (None, true, true) => {
                                // When the worker exits it puts the shelf back into the shared
                                // state for the next worker to use. So state is preserved not
                                // only across invocations but also across worker thread shut down.
                                state.shelf = Some(shelf);
                                state.state = State::Exiting;
                                break;
                            }
                            (None, true, false) => None,
                        }
                    }
                } {
                    // Now that the lock has been dropped, perform the action.
                    match action {
                        Action::QueuedFn(f) => {
                            f(&mut shelf);
                            done_idle = false;
                        }
                        Action::IdleFns(idle_fns) => {
                            for idle_fn in idle_fns {
                                idle_fn(&mut shelf);
                            }
                            done_idle = true;
                        }
                    }
                }
            }
        }));
        state.state = State::Running;
    }
}

#[cfg(test)]
mod tests {
    use super::{AsyncTask, Shelf};
    use std::sync::{
        mpsc::{channel, sync_channel, RecvTimeoutError},
        Arc,
    };
    use std::time::Duration;

    #[test]
    fn test_shelf() {
        let mut shelf = Shelf::default();

        let s = "A string".to_string();
        assert_eq!(shelf.put(s), None);

        let s2 = "Another string".to_string();
        assert_eq!(shelf.put(s2), Some("A string".to_string()));

        // Put something of a different type on the shelf.
        #[derive(Debug, PartialEq, Eq)]
        struct Elf {
            pub name: String,
        }
        let e1 = Elf { name: "Glorfindel".to_string() };
        assert_eq!(shelf.put(e1), None);

        // The String value is still on the shelf.
        let s3 = shelf.get_downcast_ref::<String>().unwrap();
        assert_eq!(s3, "Another string");

        // As is the Elf.
        {
            let e2 = shelf.get_downcast_mut::<Elf>().unwrap();
            assert_eq!(e2.name, "Glorfindel");
            e2.name = "Celeborn".to_string();
        }

        // Take the Elf off the shelf.
        let e3 = shelf.remove_downcast_ref::<Elf>().unwrap();
        assert_eq!(e3.name, "Celeborn");

        assert_eq!(shelf.remove_downcast_ref::<Elf>(), None);

        // No u64 value has been put on the shelf, so getting one gives the default value.
        {
            let i = shelf.get_mut::<u64>();
            assert_eq!(*i, 0);
            *i = 42;
        }
        let i2 = shelf.get_downcast_ref::<u64>().unwrap();
        assert_eq!(*i2, 42);

        // No i32 value has ever been seen near the shelf.
        assert_eq!(shelf.get_downcast_ref::<i32>(), None);
        assert_eq!(shelf.get_downcast_mut::<i32>(), None);
        assert_eq!(shelf.remove_downcast_ref::<i32>(), None);
    }

    #[test]
    fn test_async_task() {
        let at = AsyncTask::default();

        // First queue up a job that blocks until we release it, to avoid
        // unpredictable synchronization.
        let (start_sender, start_receiver) = channel();
        at.queue_hi(move |shelf| {
            start_receiver.recv().unwrap();
            // Put a trace vector on the shelf
            shelf.put(Vec::<String>::new());
        });

        // Queue up some high-priority and low-priority jobs.
        for i in 0..3 {
            let j = i;
            at.queue_lo(move |shelf| {
                let trace = shelf.get_mut::<Vec<String>>();
                trace.push(format!("L{}", j));
            });
            let j = i;
            at.queue_hi(move |shelf| {
                let trace = shelf.get_mut::<Vec<String>>();
                trace.push(format!("H{}", j));
            });
        }

        // Finally queue up a low priority job that emits the trace.
        let (trace_sender, trace_receiver) = channel();
        at.queue_lo(move |shelf| {
            let trace = shelf.get_downcast_ref::<Vec<String>>().unwrap();
            trace_sender.send(trace.clone()).unwrap();
        });

        // Ready, set, go.
        start_sender.send(()).unwrap();
        let trace = trace_receiver.recv().unwrap();

        assert_eq!(trace, vec!["H0", "H1", "H2", "L0", "L1", "L2"]);
    }

    #[test]
    fn test_async_task_chain() {
        let at = Arc::new(AsyncTask::default());
        let (sender, receiver) = channel();
        // Queue up a job that will queue up another job. This confirms
        // that the job is not invoked with any internal AsyncTask locks held.
        let at_clone = at.clone();
        at.queue_hi(move |_shelf| {
            at_clone.queue_lo(move |_shelf| {
                sender.send(()).unwrap();
            });
        });
        receiver.recv().unwrap();
    }

    #[test]
    #[should_panic]
    fn test_async_task_panic() {
        let at = AsyncTask::default();
        at.queue_hi(|_shelf| {
            panic!("Panic from queued job");
        });
        // Queue another job afterwards to ensure that the async thread gets joined.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            done_sender.send(()).unwrap();
        });
        done_receiver.recv().unwrap();
    }

    #[test]
    fn test_async_task_idle() {
        let at = AsyncTask::new(Duration::from_secs(3));
        // Need a SyncSender as it is Send+Sync.
        let (idle_done_sender, idle_done_receiver) = sync_channel::<()>(3);
        at.add_idle(move |_shelf| {
            idle_done_sender.send(()).unwrap();
        });

        // Queue up some high-priority and low-priority jobs that take time.
        for _i in 0..3 {
            at.queue_lo(|_shelf| {
                std::thread::sleep(Duration::from_millis(500));
            });
            at.queue_hi(|_shelf| {
                std::thread::sleep(Duration::from_millis(500));
            });
        }
        // Final low-priority job.
        let (done_sender, done_receiver) = channel();
        at.queue_lo(move |_shelf| {
            done_sender.send(()).unwrap();
        });

        // Nothing happens until the last job completes.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_secs(1)),
            Err(RecvTimeoutError::Timeout)
        );
        done_receiver.recv().unwrap();
        idle_done_receiver.recv_timeout(Duration::from_millis(1)).unwrap();

        // Idle callback not executed again even if we wait for a while.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_secs(3)),
            Err(RecvTimeoutError::Timeout)
        );

        // However, if more work is done then there's another chance to go idle.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            std::thread::sleep(Duration::from_millis(500));
            done_sender.send(()).unwrap();
        });
        // Idle callback not immediately executed, because the high priority
        // job is taking a while.
        assert_eq!(
            idle_done_receiver.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        done_receiver.recv().unwrap();
        idle_done_receiver.recv_timeout(Duration::from_millis(1)).unwrap();
    }

    #[test]
    fn test_async_task_multiple_idle() {
        let at = AsyncTask::new(Duration::from_secs(3));
        let (idle_sender, idle_receiver) = sync_channel::<i32>(5);
        // Queue a high priority job to start things off.
        at.queue_hi(|_shelf| {
            std::thread::sleep(Duration::from_millis(500));
        });

        // Multiple idle callbacks.
        for i in 0..3 {
            let idle_sender = idle_sender.clone();
            at.add_idle(move |_shelf| {
                idle_sender.send(i).unwrap();
            });
        }

        // Nothing happens immediately.
        assert_eq!(
            idle_receiver.recv_timeout(Duration::from_millis(1)),
            Err(RecvTimeoutError::Timeout)
        );
        // Wait for a moment and the idle jobs should have run.
        std::thread::sleep(Duration::from_secs(1));

        let mut results = Vec::new();
        while let Ok(i) = idle_receiver.recv_timeout(Duration::from_millis(1)) {
            results.push(i);
        }
        assert_eq!(results, [0, 1, 2]);
    }

    #[test]
    fn test_async_task_idle_queues_job() {
        let at = Arc::new(AsyncTask::new(Duration::from_secs(1)));
        let at_clone = at.clone();
        let (idle_sender, idle_receiver) = sync_channel::<i32>(100);
        // Add an idle callback that queues a low-priority job.
        at.add_idle(move |shelf| {
            at_clone.queue_lo(|_shelf| {
                // Slow things down so the channel doesn't fill up.
                std::thread::sleep(Duration::from_millis(50));
            });
            let i = shelf.get_mut::<i32>();
            idle_sender.send(*i).unwrap();
            *i += 1;
        });

        // Nothing happens immediately.
        assert_eq!(
            idle_receiver.recv_timeout(Duration::from_millis(1500)),
            Err(RecvTimeoutError::Timeout)
        );

        // Once we queue a normal job, things start.
        at.queue_hi(|_shelf| {});
        assert_eq!(0, idle_receiver.recv_timeout(Duration::from_millis(200)).unwrap());

        // The idle callback queues a job, and completion of that job
        // means the task is going idle again...so the idle callback will
        // be called repeatedly.
        assert_eq!(1, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
        assert_eq!(2, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
        assert_eq!(3, idle_receiver.recv_timeout(Duration::from_millis(100)).unwrap());
    }

    #[test]
    #[should_panic]
    fn test_async_task_idle_panic() {
        let at = AsyncTask::new(Duration::from_secs(1));
        let (idle_sender, idle_receiver) = sync_channel::<()>(3);
        // Add an idle callback that panics.
        at.add_idle(move |_shelf| {
            idle_sender.send(()).unwrap();
            panic!("Panic from idle callback");
        });
        // Queue a job to trigger idleness and ensuing panic.
        at.queue_hi(|_shelf| {});
        idle_receiver.recv().unwrap();

        // Queue another job afterwards to ensure that the async thread gets joined
        // and the panic detected.
        let (done_sender, done_receiver) = channel();
        at.queue_hi(move |_shelf| {
            done_sender.send(()).unwrap();
        });
        done_receiver.recv().unwrap();
    }
}