blob: 03685fcf29aa3e5869c6506f2ad42ec7bfac7450 [file] [log] [blame]
Mikhail Naganov614e4b52022-06-30 21:05:11 +00001/*
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000019#include <pthread.h>
Mikhail Naganov614e4b52022-06-30 21:05:11 +000020#include <sched.h>
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000021#include <sys/resource.h>
Mikhail Naganov614e4b52022-06-30 21:05:11 +000022
Mikhail Naganov0c174e92022-07-21 00:21:08 +000023#include <atomic>
Mikhail Naganov614e4b52022-06-30 21:05:11 +000024#include <condition_variable>
25#include <mutex>
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000026#include <string>
Mikhail Naganov614e4b52022-06-30 21:05:11 +000027#include <thread>
28
29#include <android-base/thread_annotations.h>
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000030#include <system/thread_defs.h>
Mikhail Naganov614e4b52022-06-30 21:05:11 +000031
Mikhail Naganov48d31152022-07-30 00:10:52 +000032namespace android::hardware::audio::common {
33
Mikhail Naganov614e4b52022-06-30 21:05:11 +000034template <typename Impl>
35class StreamWorker {
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000036 enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
Mikhail Naganov614e4b52022-06-30 21:05:11 +000037
38 public:
Mikhail Naganov48d31152022-07-30 00:10:52 +000039 enum class WorkerStatus { ABORT, CONTINUE, EXIT };
40
Mikhail Naganov614e4b52022-06-30 21:05:11 +000041 StreamWorker() = default;
42 ~StreamWorker() { stop(); }
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000043 // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
44 // The nice number is used with the default scheduler. For threads that
45 // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
46 // it is recommended to implement an appropriate configuration sequence within `workerInit`.
47 bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
48 mThreadName = name;
49 mThreadPriority = priority;
Mikhail Naganov614e4b52022-06-30 21:05:11 +000050 mWorker = std::thread(&StreamWorker::workerThread, this);
51 std::unique_lock<std::mutex> lock(mWorkerLock);
52 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
53 mWorkerCv.wait(lock, [&]() {
54 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000055 return mWorkerState == WorkerState::RUNNING || !mError.empty();
Mikhail Naganov614e4b52022-06-30 21:05:11 +000056 });
Mikhail Naganov0c174e92022-07-21 00:21:08 +000057 mWorkerStateChangeRequest = false;
Mikhail Naganov614e4b52022-06-30 21:05:11 +000058 return mWorkerState == WorkerState::RUNNING;
59 }
60 void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
61 void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
62 bool hasError() {
63 std::lock_guard<std::mutex> lock(mWorkerLock);
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000064 return !mError.empty();
65 }
66 std::string getError() {
67 std::lock_guard<std::mutex> lock(mWorkerLock);
68 return mError;
Mikhail Naganov614e4b52022-06-30 21:05:11 +000069 }
70 void stop() {
71 {
72 std::lock_guard<std::mutex> lock(mWorkerLock);
Mikhail Naganov48d31152022-07-30 00:10:52 +000073 if (mWorkerState != WorkerState::STOPPED) {
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000074 mWorkerState = WorkerState::STOPPED;
75 mWorkerStateChangeRequest = true;
76 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +000077 }
78 if (mWorker.joinable()) {
79 mWorker.join();
80 }
81 }
82 bool waitForAtLeastOneCycle() {
83 WorkerState newState;
84 switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
85 if (newState != WorkerState::PAUSED) return false;
86 switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
87 return newState == WorkerState::RUNNING;
88 }
Mikhail Naganov0c174e92022-07-21 00:21:08 +000089 // Only used by unit tests.
90 void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
91 lock ? mWorkerLock.lock() : mWorkerLock.unlock();
92 }
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000093 std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); }
Mikhail Naganov614e4b52022-06-30 21:05:11 +000094
95 // Methods that need to be provided by subclasses:
96 //
Mikhail Naganov48d31152022-07-30 00:10:52 +000097 // /* Called once at the beginning of the thread loop. Must return
98 // * an empty string to enter the thread loop, otherwise the thread loop
99 // * exits and the worker switches into the 'error' state, setting
100 // * the error to the returned value.
101 // */
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000102 // std::string workerInit();
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000103 //
Mikhail Naganov48d31152022-07-30 00:10:52 +0000104 // /* Called for each thread loop unless the thread is in 'paused' state.
105 // * Must return 'CONTINUE' to continue running, otherwise the thread loop
106 // * exits. If the result from worker cycle is 'ABORT' then the worker switches
107 // * into the 'error' state with a generic error message. It is recommended that
108 // * the subclass reports any problems via logging facilities. Returning the 'EXIT'
109 // * status is equivalent to calling 'stop()' method. This is just a way of
110 // * of stopping the worker by its own initiative.
111 // */
112 // WorkerStatus workerCycle();
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000113
114 private:
115 void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
116 WorkerState* finalState = nullptr) {
117 std::unique_lock<std::mutex> lock(mWorkerLock);
118 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
119 if (mWorkerState != oldState) {
120 if (finalState) *finalState = mWorkerState;
121 return;
122 }
123 mWorkerState = newState;
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000124 mWorkerStateChangeRequest = true;
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000125 mWorkerCv.wait(lock, [&]() {
126 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
127 return mWorkerState != newState;
128 });
129 if (finalState) *finalState = mWorkerState;
130 }
131 void workerThread() {
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000132 std::string error = static_cast<Impl*>(this)->workerInit();
133 if (error.empty() && !mThreadName.empty()) {
134 std::string compliantName(mThreadName.substr(0, 15));
135 if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str());
136 errCode != 0) {
137 error.append("Failed to set thread name: ").append(strerror(errCode));
138 }
139 }
140 if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
141 if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
142 int errCode = errno;
143 error.append("Failed to set thread priority: ").append(strerror(errCode));
144 }
145 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000146 {
147 std::lock_guard<std::mutex> lock(mWorkerLock);
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000148 mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
149 mError = error;
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000150 }
151 mWorkerCv.notify_one();
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000152 if (!error.empty()) return;
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000153
154 for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
155 bool needToNotify = false;
Mikhail Naganov48d31152022-07-30 00:10:52 +0000156 if (WorkerStatus status = state != WorkerState::PAUSED
157 ? static_cast<Impl*>(this)->workerCycle()
158 : (sched_yield(), WorkerStatus::CONTINUE);
159 status == WorkerStatus::CONTINUE) {
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000160 {
161 // See https://developer.android.com/training/articles/smp#nonracing
162 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
163 if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
164 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000165 //
166 // Pause and resume are synchronous. One worker cycle must complete
167 // before the worker indicates a state change. This is how 'mWorkerState' and
168 // 'state' interact:
169 //
170 // mWorkerState == RUNNING
171 // client sets mWorkerState := PAUSE_REQUESTED
172 // last workerCycle gets executed, state := mWorkerState := PAUSED by us
173 // (or the workers enters the 'error' state if workerCycle fails)
174 // client gets notified about state change in any case
175 // thread is doing a busy wait while 'state == PAUSED'
176 // client sets mWorkerState := RESUME_REQUESTED
177 // state := mWorkerState (RESUME_REQUESTED)
178 // mWorkerState := RUNNING, but we don't notify the client yet
179 // first workerCycle gets executed, the code below triggers a client notification
180 // (or if workerCycle fails, worker enters 'error' state and also notifies)
181 // state := mWorkerState (RUNNING)
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000182 std::lock_guard<std::mutex> lock(mWorkerLock);
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000183 if (state == WorkerState::RESUME_REQUESTED) {
184 needToNotify = true;
185 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000186 state = mWorkerState;
187 if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
188 state = mWorkerState = WorkerState::PAUSED;
189 needToNotify = true;
190 } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
191 mWorkerState = WorkerState::RUNNING;
192 }
193 } else {
194 std::lock_guard<std::mutex> lock(mWorkerLock);
195 if (state == WorkerState::RESUME_REQUESTED ||
196 mWorkerState == WorkerState::PAUSE_REQUESTED) {
197 needToNotify = true;
198 }
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000199 state = mWorkerState = WorkerState::STOPPED;
Mikhail Naganov48d31152022-07-30 00:10:52 +0000200 if (status == WorkerStatus::ABORT) {
201 mError = "workerCycle aborted";
202 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000203 }
204 if (needToNotify) {
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000205 {
206 std::lock_guard<std::mutex> lock(mWorkerLock);
207 mWorkerStateChangeRequest = false;
208 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000209 mWorkerCv.notify_one();
210 }
211 }
212 }
213
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000214 std::string mThreadName;
215 int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000216 std::thread mWorker;
217 std::mutex mWorkerLock;
218 std::condition_variable mWorkerCv;
219 WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000220 std::string mError GUARDED_BY(mWorkerLock);
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000221 // The atomic lock-free variable is used to prevent priority inversions
222 // that can occur when a high priority worker tries to acquire the lock
223 // which has been taken by a lower priority control thread which in its turn
224 // got preempted. To prevent a PI under normal operating conditions, that is,
225 // when there are no errors or state changes, the worker does not attempt
226 // taking `mWorkerLock` unless `mWorkerStateChangeRequest` is set.
227 // To make sure that updates to `mWorkerState` and `mWorkerStateChangeRequest`
228 // are serialized, they are always made under a lock.
229 static_assert(std::atomic<bool>::is_always_lock_free);
230 std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000231};
Mikhail Naganov48d31152022-07-30 00:10:52 +0000232
233} // namespace android::hardware::audio::common