blob: 0ed522fcb56b7fc92b4dc06cc99d2f3fb88e937f [file] [log] [blame]
// Copyright 2021, The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15// Can be removed when instrumentations are added to keystore.
16#![allow(dead_code)]
17
18//! This module implements a watchdog thread.
19
20use std::{
21 cmp::min,
22 collections::HashMap,
23 sync::Arc,
24 sync::{Condvar, Mutex, MutexGuard},
25 thread,
26};
27use std::{
28 marker::PhantomData,
29 time::{Duration, Instant},
30};
31
32/// Represents a Watchdog record. It can be created with `Watchdog::watch` or
33/// `Watchdog::watch_with`. It disarms the record when dropped.
34pub struct WatchPoint {
35 id: &'static str,
36 wd: Arc<Watchdog>,
37 not_send: PhantomData<*mut ()>, // WatchPoint must not be Send.
38}
39
40impl Drop for WatchPoint {
41 fn drop(&mut self) {
42 self.wd.disarm(self.id)
43 }
44}
45
/// Tracks whether a watchdog thread is currently considered alive.
#[derive(Debug, PartialEq, Eq)]
enum State {
    /// No watchdog thread is active: either none was spawned yet or the last one
    /// marked itself as terminated after idling.
    NotRunning,
    /// A watchdog thread has been spawned and has not marked itself as terminated.
    Running,
}
51
/// Key under which a record is stored: the arming thread combined with the
/// watch point's id string.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct Index {
    /// Id of the thread that armed the record.
    tid: thread::ThreadId,
    /// Id string supplied when the watch point was created.
    id: &'static str,
}
57
/// Bookkeeping data for one armed watch point.
struct Record {
    /// When the record was armed.
    started: Instant,
    /// Point in time after which the record counts as overdue.
    deadline: Instant,
    /// Optional callback whose returned string is appended to the report line
    /// logged for this record while it is overdue.
    callback: Option<Box<dyn Fn() -> String + Send + 'static>>,
}
63
64struct WatchdogState {
65 state: State,
66 thread: Option<thread::JoinHandle<()>>,
67 timeout: Duration,
68 records: HashMap<Index, Record>,
69 has_overdue: bool,
70}
71
72impl WatchdogState {
73 fn update_overdue_and_find_next_timeout(&mut self) -> Option<Duration> {
74 let now = Instant::now();
75 let mut next_timeout: Option<Duration> = None;
76 self.has_overdue = false;
77 for (_, r) in self.records.iter() {
78 let timeout = r.deadline.saturating_duration_since(now);
79 if timeout == Duration::new(0, 0) {
80 self.has_overdue = true;
81 continue;
82 }
83 next_timeout = match next_timeout {
84 Some(nt) => {
85 if timeout < nt {
86 Some(timeout)
87 } else {
88 Some(nt)
89 }
90 }
91 None => Some(timeout),
92 };
93 }
94 next_timeout
95 }
96
97 fn log_report(&self) -> bool {
98 if !self.has_overdue {
99 return false;
100 }
101 log::warn!("Keystore Watchdog report:");
102 log::warn!("Overdue records:");
103 let now = Instant::now();
104 for (i, r) in self.records.iter() {
105 if r.deadline.saturating_duration_since(now) == Duration::new(0, 0) {
106 match &r.callback {
107 Some(cb) => {
108 log::warn!(
109 "{:?} {} Pending: {:?} Overdue {:?}: {}",
110 i.tid,
111 i.id,
112 r.started.elapsed(),
113 r.deadline.elapsed(),
114 (cb)()
115 );
116 }
117 None => {
118 log::warn!(
119 "{:?} {} Pending: {:?} Overdue {:?}",
120 i.tid,
121 i.id,
122 r.started.elapsed(),
123 r.deadline.elapsed()
124 );
125 }
126 }
127 }
128 }
129 true
130 }
131
132 fn disarm(&mut self, index: Index) {
133 self.records.remove(&index);
134 }
135
136 fn arm(&mut self, index: Index, record: Record) {
137 if self.records.insert(index.clone(), record).is_some() {
138 log::warn!("Recursive watchdog record at \"{:?}\" replaces previous record.", index);
139 }
140 }
141}
142
143/// Watchdog spawns a thread that logs records of all overdue watch points when a deadline
144/// is missed and at least every second as long as overdue watch points exist.
145/// The thread terminates when idle for a given period of time.
146pub struct Watchdog {
147 state: Arc<(Condvar, Mutex<WatchdogState>)>,
148}
149
150impl Watchdog {
151 /// If we have overdue records, we want to be noisy about it and log a report
152 /// at least every `NOISY_REPORT_TIMEOUT` interval.
153 const NOISY_REPORT_TIMEOUT: Duration = Duration::from_secs(1);
154
155 /// Construct a [`Watchdog`]. When `timeout` has elapsed since the watchdog thread became
156 /// idle, i.e., there are no more active or overdue watch points, the watchdog thread
157 /// terminates.
158 pub fn new(timeout: Duration) -> Arc<Self> {
159 Arc::new(Self {
160 state: Arc::new((
161 Condvar::new(),
162 Mutex::new(WatchdogState {
163 state: State::NotRunning,
164 thread: None,
165 timeout,
166 records: HashMap::new(),
167 has_overdue: false,
168 }),
169 )),
170 })
171 }
172
173 fn watch_with_optional(
174 wd: &Arc<Self>,
175 callback: Option<Box<dyn Fn() -> String + Send + 'static>>,
176 id: &'static str,
177 timeout: Duration,
178 ) -> Option<WatchPoint> {
179 let deadline = Instant::now().checked_add(timeout);
180 if deadline.is_none() {
181 log::warn!("Deadline computation failed for WatchPoint \"{}\"", id);
182 log::warn!("WatchPoint not armed.");
183 return None;
184 }
185 wd.arm(callback, id, deadline.unwrap());
186 Some(WatchPoint { id, wd: wd.clone(), not_send: Default::default() })
187 }
188
189 /// Create a new watch point. If the WatchPoint is not dropped before the timeout
190 /// expires, a report is logged at least every second, which includes the id string
191 /// and whatever string the callback returns.
192 pub fn watch_with(
193 wd: &Arc<Self>,
194 id: &'static str,
195 timeout: Duration,
196 callback: impl Fn() -> String + Send + 'static,
197 ) -> Option<WatchPoint> {
198 Self::watch_with_optional(wd, Some(Box::new(callback)), id, timeout)
199 }
200
201 /// Like `watch_with`, but without a callback.
202 pub fn watch(wd: &Arc<Self>, id: &'static str, timeout: Duration) -> Option<WatchPoint> {
203 Self::watch_with_optional(wd, None, id, timeout)
204 }
205
206 fn arm(
207 &self,
208 callback: Option<Box<dyn Fn() -> String + Send + 'static>>,
209 id: &'static str,
210 deadline: Instant,
211 ) {
212 let tid = thread::current().id();
213 let index = Index { tid, id };
214 let record = Record { started: Instant::now(), deadline, callback };
215
216 let (ref condvar, ref state) = *self.state;
217
218 let mut state = state.lock().unwrap();
219 state.arm(index, record);
220
221 if state.state != State::Running {
222 self.spawn_thread(&mut state);
223 }
224 drop(state);
225 condvar.notify_all();
226 }
227
228 fn disarm(&self, id: &'static str) {
229 let tid = thread::current().id();
230 let index = Index { tid, id };
231 let (_, ref state) = *self.state;
232
233 let mut state = state.lock().unwrap();
234 state.disarm(index);
235 // There is no need to notify condvar. There is no action required for the
236 // watchdog thread before the next deadline.
237 }
238
239 fn spawn_thread(&self, state: &mut MutexGuard<WatchdogState>) {
240 if let Some(t) = state.thread.take() {
241 t.join().expect("Watchdog thread panicked.");
242 }
243
244 let cloned_state = self.state.clone();
245
246 state.thread = Some(thread::spawn(move || {
247 let (ref condvar, ref state) = *cloned_state;
248
249 let mut state = state.lock().unwrap();
250
251 loop {
252 let next_timeout = state.update_overdue_and_find_next_timeout();
253 let has_overdue = state.log_report();
254 let (next_timeout, idle) = match (has_overdue, next_timeout) {
255 (true, Some(next_timeout)) => {
256 (min(next_timeout, Self::NOISY_REPORT_TIMEOUT), false)
257 }
258 (false, Some(next_timeout)) => (next_timeout, false),
259 (true, None) => (Self::NOISY_REPORT_TIMEOUT, false),
260 (false, None) => (state.timeout, true),
261 };
262
263 let (s, timeout) = condvar.wait_timeout(state, next_timeout).unwrap();
264 state = s;
265
266 if idle && timeout.timed_out() && state.records.is_empty() {
267 state.state = State::NotRunning;
268 break;
269 }
270 }
Janis Danisevskis2ee014b2021-05-05 14:29:08 -0700271 log::info!("Watchdog thread idle -> terminating. Have a great day.");
Janis Danisevskis7e13aa02021-05-04 14:34:41 -0700272 }));
273 state.state = State::Running;
274 }
275}
276
#[cfg(test)]
mod tests {

    use super::*;
    use std::sync::atomic;
    use std::thread;
    use std::time::Duration;

    /// Arms a watch point with a 100ms deadline and checks that its callback is invoked
    /// once after the deadline has passed and again one report interval later, that
    /// reporting stops once the watch point is dropped, and that the watchdog thread
    /// has terminated after the idle timeout.
    #[test]
    fn test_watchdog() {
        android_logger::init_once(
            android_logger::Config::default()
                .with_tag("keystore2_watchdog_tests")
                .with_min_level(log::Level::Debug),
        );

        let idle_timeout = Watchdog::NOISY_REPORT_TIMEOUT.checked_mul(3).unwrap();
        let wd = Watchdog::new(idle_timeout);
        let hit_count = Arc::new(atomic::AtomicU8::new(0));
        let hits = || hit_count.load(atomic::Ordering::Relaxed);

        // The callback counts how often the watchdog asked for a report string.
        let reporter = {
            let counter = Arc::clone(&hit_count);
            move || format!("hit_count: {}", counter.fetch_add(1, atomic::Ordering::Relaxed))
        };
        let wp = Watchdog::watch_with(&wd, "test_watchdog", Duration::from_millis(100), reporter);
        assert_eq!(0, hits());

        // The deadline has passed by now: expect exactly one report.
        thread::sleep(Duration::from_millis(500));
        assert_eq!(1, hits());

        // One report interval later: expect a second report.
        thread::sleep(Watchdog::NOISY_REPORT_TIMEOUT);
        assert_eq!(2, hits());

        // After disarming, no further reports may arrive, and the thread must have
        // terminated once the sleep exceeded the idle timeout.
        drop(wp);
        thread::sleep(Watchdog::NOISY_REPORT_TIMEOUT.checked_mul(4).unwrap());
        assert_eq!(2, hits());
        let shared = wd.state.1.lock().unwrap();
        assert_eq!(shared.state, State::NotRunning);
    }
}