// Copyright 2020, The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! This module implements the handling of async tasks.
//! The worker thread has a high priority and a low priority queue. Adding a job to either
//! will cause one thread to be spawned if none exists. As a compromise between performance
//! and resource consumption, the thread will linger for about 30 seconds after it has
//! processed all tasks before it terminates.
//! Note that low priority tasks are processed only when the high priority queue is empty.
21
22use std::time::Duration;
23use std::{
24 collections::VecDeque,
25 sync::Arc,
26 sync::{Condvar, Mutex, MutexGuard},
27 thread,
28};
29
/// Lifecycle marker for the worker thread, stored in the shared task state.
#[derive(Debug, Eq, PartialEq)]
enum State {
    /// No worker is servicing the queues. This is the initial state and the
    /// state a worker records right before it terminates.
    Exiting,
    /// A worker thread has been spawned and has not yet announced its exit.
    Running,
}
35
36struct AsyncTaskState {
37 state: State,
38 thread: Option<thread::JoinHandle<()>>,
39 hi_prio_req: VecDeque<Box<dyn FnOnce() + Send>>,
40 lo_prio_req: VecDeque<Box<dyn FnOnce() + Send>>,
41}
42
43/// AsyncTask spawns one worker thread on demand to process jobs inserted into
44/// a low and a high priority work queue.
45pub struct AsyncTask {
46 state: Arc<(Condvar, Mutex<AsyncTaskState>)>,
47}
48
49impl Default for AsyncTask {
50 fn default() -> Self {
51 Self {
52 state: Arc::new((
53 Condvar::new(),
54 Mutex::new(AsyncTaskState {
55 state: State::Exiting,
56 thread: None,
57 hi_prio_req: VecDeque::new(),
58 lo_prio_req: VecDeque::new(),
59 }),
60 )),
61 }
62 }
63}
64
65impl AsyncTask {
66 /// Adds a job to the high priority queue. High priority jobs are completed before
67 /// low priority jobs and can also overtake low priority jobs. But they cannot
68 /// preempt them.
69 pub fn queue_hi<F>(&self, f: F)
70 where
71 F: FnOnce() + Send + 'static,
72 {
73 self.queue(f, true)
74 }
75
76 /// Adds a job to the low priority queue. Low priority jobs are completed after
77 /// high priority. And they are not executed as long as high priority jobs are
78 /// present. Jobs always run to completion and are never preempted by high
79 /// priority jobs.
80 pub fn queue_lo<F>(&self, f: F)
81 where
82 F: FnOnce() + Send + 'static,
83 {
84 self.queue(f, false)
85 }
86
87 fn queue<F>(&self, f: F, hi_prio: bool)
88 where
89 F: FnOnce() + Send + 'static,
90 {
91 let (ref condvar, ref state) = *self.state;
92 let mut state = state.lock().unwrap();
93 if hi_prio {
94 state.hi_prio_req.push_back(Box::new(f));
95 } else {
96 state.lo_prio_req.push_back(Box::new(f));
97 }
98
99 if state.state != State::Running {
100 self.spawn_thread(&mut state);
101 }
102 drop(state);
103 condvar.notify_all();
104 }
105
106 fn spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>) {
107 if let Some(t) = state.thread.take() {
108 t.join().expect("AsyncTask panicked.");
109 }
110
111 let cloned_state = self.state.clone();
112
113 state.thread = Some(thread::spawn(move || {
114 let (ref condvar, ref state) = *cloned_state;
115 loop {
116 if let Some(f) = {
117 let (mut state, timeout) = condvar
118 .wait_timeout_while(
119 state.lock().unwrap(),
120 Duration::from_secs(30),
121 |state| state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty(),
122 )
123 .unwrap();
124 match (
125 state.hi_prio_req.pop_front(),
126 state.lo_prio_req.is_empty(),
127 timeout.timed_out(),
128 ) {
129 (Some(f), _, _) => Some(f),
130 (None, false, _) => state.lo_prio_req.pop_front(),
131 (None, true, true) => {
132 state.state = State::Exiting;
133 break;
134 }
135 (None, true, false) => None,
136 }
137 } {
138 f()
139 }
140 }
141 }));
142 state.state = State::Running;
143 }
144}