blob: 74e99df4cd5c22c7d6a3412293f37e00e00c1c67 [file] [log] [blame]
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#pragma once
18
19#include <sched.h>
20
Mikhail Naganov0c174e92022-07-21 00:21:08 +000021#include <atomic>
Mikhail Naganov614e4b52022-06-30 21:05:11 +000022#include <condition_variable>
23#include <mutex>
24#include <thread>
25
26#include <android-base/thread_annotations.h>
27
28template <typename Impl>
29class StreamWorker {
30 enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED, ERROR };
31
32 public:
33 StreamWorker() = default;
34 ~StreamWorker() { stop(); }
35 bool start() {
36 mWorker = std::thread(&StreamWorker::workerThread, this);
37 std::unique_lock<std::mutex> lock(mWorkerLock);
38 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
39 mWorkerCv.wait(lock, [&]() {
40 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
41 return mWorkerState != WorkerState::STOPPED;
42 });
Mikhail Naganov0c174e92022-07-21 00:21:08 +000043 mWorkerStateChangeRequest = false;
Mikhail Naganov614e4b52022-06-30 21:05:11 +000044 return mWorkerState == WorkerState::RUNNING;
45 }
46 void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
47 void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
48 bool hasError() {
49 std::lock_guard<std::mutex> lock(mWorkerLock);
50 return mWorkerState == WorkerState::ERROR;
51 }
52 void stop() {
53 {
54 std::lock_guard<std::mutex> lock(mWorkerLock);
55 if (mWorkerState == WorkerState::STOPPED) return;
56 mWorkerState = WorkerState::STOPPED;
Mikhail Naganov0c174e92022-07-21 00:21:08 +000057 mWorkerStateChangeRequest = true;
Mikhail Naganov614e4b52022-06-30 21:05:11 +000058 }
59 if (mWorker.joinable()) {
60 mWorker.join();
61 }
62 }
63 bool waitForAtLeastOneCycle() {
64 WorkerState newState;
65 switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
66 if (newState != WorkerState::PAUSED) return false;
67 switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
68 return newState == WorkerState::RUNNING;
69 }
Mikhail Naganov0c174e92022-07-21 00:21:08 +000070 // Only used by unit tests.
71 void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
72 lock ? mWorkerLock.lock() : mWorkerLock.unlock();
73 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +000074
75 // Methods that need to be provided by subclasses:
76 //
77 // Called once at the beginning of the thread loop. Must return
78 // 'true' to enter the thread loop, otherwise the thread loop
79 // exits and the worker switches into the 'error' state.
80 // bool workerInit();
81 //
82 // Called for each thread loop unless the thread is in 'paused' state.
83 // Must return 'true' to continue running, otherwise the thread loop
84 // exits and the worker switches into the 'error' state.
85 // bool workerCycle();
86
87 private:
88 void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
89 WorkerState* finalState = nullptr) {
90 std::unique_lock<std::mutex> lock(mWorkerLock);
91 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
92 if (mWorkerState != oldState) {
93 if (finalState) *finalState = mWorkerState;
94 return;
95 }
96 mWorkerState = newState;
Mikhail Naganov0c174e92022-07-21 00:21:08 +000097 mWorkerStateChangeRequest = true;
Mikhail Naganov614e4b52022-06-30 21:05:11 +000098 mWorkerCv.wait(lock, [&]() {
99 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
100 return mWorkerState != newState;
101 });
102 if (finalState) *finalState = mWorkerState;
103 }
104 void workerThread() {
105 bool success = static_cast<Impl*>(this)->workerInit();
106 {
107 std::lock_guard<std::mutex> lock(mWorkerLock);
108 mWorkerState = success ? WorkerState::RUNNING : WorkerState::ERROR;
109 }
110 mWorkerCv.notify_one();
111 if (!success) return;
112
113 for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
114 bool needToNotify = false;
115 if (state != WorkerState::PAUSED ? static_cast<Impl*>(this)->workerCycle()
116 : (sched_yield(), true)) {
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000117 {
118 // See https://developer.android.com/training/articles/smp#nonracing
119 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
120 if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
121 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000122 //
123 // Pause and resume are synchronous. One worker cycle must complete
124 // before the worker indicates a state change. This is how 'mWorkerState' and
125 // 'state' interact:
126 //
127 // mWorkerState == RUNNING
128 // client sets mWorkerState := PAUSE_REQUESTED
129 // last workerCycle gets executed, state := mWorkerState := PAUSED by us
130 // (or the workers enters the 'error' state if workerCycle fails)
131 // client gets notified about state change in any case
132 // thread is doing a busy wait while 'state == PAUSED'
133 // client sets mWorkerState := RESUME_REQUESTED
134 // state := mWorkerState (RESUME_REQUESTED)
135 // mWorkerState := RUNNING, but we don't notify the client yet
136 // first workerCycle gets executed, the code below triggers a client notification
137 // (or if workerCycle fails, worker enters 'error' state and also notifies)
138 // state := mWorkerState (RUNNING)
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000139 std::lock_guard<std::mutex> lock(mWorkerLock);
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000140 if (state == WorkerState::RESUME_REQUESTED) {
141 needToNotify = true;
142 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000143 state = mWorkerState;
144 if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
145 state = mWorkerState = WorkerState::PAUSED;
146 needToNotify = true;
147 } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
148 mWorkerState = WorkerState::RUNNING;
149 }
150 } else {
151 std::lock_guard<std::mutex> lock(mWorkerLock);
152 if (state == WorkerState::RESUME_REQUESTED ||
153 mWorkerState == WorkerState::PAUSE_REQUESTED) {
154 needToNotify = true;
155 }
156 mWorkerState = WorkerState::ERROR;
157 state = WorkerState::STOPPED;
158 }
159 if (needToNotify) {
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000160 {
161 std::lock_guard<std::mutex> lock(mWorkerLock);
162 mWorkerStateChangeRequest = false;
163 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000164 mWorkerCv.notify_one();
165 }
166 }
167 }
168
169 std::thread mWorker;
170 std::mutex mWorkerLock;
171 std::condition_variable mWorkerCv;
172 WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000173 // The atomic lock-free variable is used to prevent priority inversions
174 // that can occur when a high priority worker tries to acquire the lock
175 // which has been taken by a lower priority control thread which in its turn
176 // got preempted. To prevent a PI under normal operating conditions, that is,
177 // when there are no errors or state changes, the worker does not attempt
178 // taking `mWorkerLock` unless `mWorkerStateChangeRequest` is set.
179 // To make sure that updates to `mWorkerState` and `mWorkerStateChangeRequest`
180 // are serialized, they are always made under a lock.
181 static_assert(std::atomic<bool>::is_always_lock_free);
182 std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000183};