/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <pthread.h>
#include <sched.h>
#include <sys/resource.h>

#include <atomic>
#include <cerrno>
#include <condition_variable>
#include <cstring>
#include <mutex>
#include <string>
#include <thread>

#include <android-base/thread_annotations.h>
#include <system/thread_defs.h>

32template <typename Impl>
33class StreamWorker {
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000034 enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
Mikhail Naganov614e4b52022-06-30 21:05:11 +000035
36 public:
37 StreamWorker() = default;
38 ~StreamWorker() { stop(); }
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000039 // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
40 // The nice number is used with the default scheduler. For threads that
41 // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
42 // it is recommended to implement an appropriate configuration sequence within `workerInit`.
43 bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
44 mThreadName = name;
45 mThreadPriority = priority;
Mikhail Naganov614e4b52022-06-30 21:05:11 +000046 mWorker = std::thread(&StreamWorker::workerThread, this);
47 std::unique_lock<std::mutex> lock(mWorkerLock);
48 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
49 mWorkerCv.wait(lock, [&]() {
50 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000051 return mWorkerState == WorkerState::RUNNING || !mError.empty();
Mikhail Naganov614e4b52022-06-30 21:05:11 +000052 });
Mikhail Naganov0c174e92022-07-21 00:21:08 +000053 mWorkerStateChangeRequest = false;
Mikhail Naganov614e4b52022-06-30 21:05:11 +000054 return mWorkerState == WorkerState::RUNNING;
55 }
56 void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
57 void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
58 bool hasError() {
59 std::lock_guard<std::mutex> lock(mWorkerLock);
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000060 return !mError.empty();
61 }
62 std::string getError() {
63 std::lock_guard<std::mutex> lock(mWorkerLock);
64 return mError;
Mikhail Naganov614e4b52022-06-30 21:05:11 +000065 }
66 void stop() {
67 {
68 std::lock_guard<std::mutex> lock(mWorkerLock);
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000069 if (mError.empty()) {
70 if (mWorkerState == WorkerState::STOPPED) return;
71 mWorkerState = WorkerState::STOPPED;
72 mWorkerStateChangeRequest = true;
73 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +000074 }
75 if (mWorker.joinable()) {
76 mWorker.join();
77 }
78 }
79 bool waitForAtLeastOneCycle() {
80 WorkerState newState;
81 switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
82 if (newState != WorkerState::PAUSED) return false;
83 switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
84 return newState == WorkerState::RUNNING;
85 }
Mikhail Naganov0c174e92022-07-21 00:21:08 +000086 // Only used by unit tests.
87 void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
88 lock ? mWorkerLock.lock() : mWorkerLock.unlock();
89 }
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000090 std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); }
Mikhail Naganov614e4b52022-06-30 21:05:11 +000091
92 // Methods that need to be provided by subclasses:
93 //
94 // Called once at the beginning of the thread loop. Must return
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +000095 // an empty string to enter the thread loop, otherwise the thread loop
96 // exits and the worker switches into the 'error' state, setting
97 // the error to the returned value.
98 // std::string workerInit();
Mikhail Naganov614e4b52022-06-30 21:05:11 +000099 //
100 // Called for each thread loop unless the thread is in 'paused' state.
101 // Must return 'true' to continue running, otherwise the thread loop
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000102 // exits and the worker switches into the 'error' state with a generic
103 // error message. It is recommended that the subclass reports any
104 // problems via logging facilities.
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000105 // bool workerCycle();
106
107 private:
108 void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
109 WorkerState* finalState = nullptr) {
110 std::unique_lock<std::mutex> lock(mWorkerLock);
111 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
112 if (mWorkerState != oldState) {
113 if (finalState) *finalState = mWorkerState;
114 return;
115 }
116 mWorkerState = newState;
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000117 mWorkerStateChangeRequest = true;
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000118 mWorkerCv.wait(lock, [&]() {
119 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
120 return mWorkerState != newState;
121 });
122 if (finalState) *finalState = mWorkerState;
123 }
124 void workerThread() {
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000125 std::string error = static_cast<Impl*>(this)->workerInit();
126 if (error.empty() && !mThreadName.empty()) {
127 std::string compliantName(mThreadName.substr(0, 15));
128 if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str());
129 errCode != 0) {
130 error.append("Failed to set thread name: ").append(strerror(errCode));
131 }
132 }
133 if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
134 if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
135 int errCode = errno;
136 error.append("Failed to set thread priority: ").append(strerror(errCode));
137 }
138 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000139 {
140 std::lock_guard<std::mutex> lock(mWorkerLock);
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000141 mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
142 mError = error;
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000143 }
144 mWorkerCv.notify_one();
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000145 if (!error.empty()) return;
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000146
147 for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
148 bool needToNotify = false;
149 if (state != WorkerState::PAUSED ? static_cast<Impl*>(this)->workerCycle()
150 : (sched_yield(), true)) {
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000151 {
152 // See https://developer.android.com/training/articles/smp#nonracing
153 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
154 if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
155 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000156 //
157 // Pause and resume are synchronous. One worker cycle must complete
158 // before the worker indicates a state change. This is how 'mWorkerState' and
159 // 'state' interact:
160 //
161 // mWorkerState == RUNNING
162 // client sets mWorkerState := PAUSE_REQUESTED
163 // last workerCycle gets executed, state := mWorkerState := PAUSED by us
164 // (or the workers enters the 'error' state if workerCycle fails)
165 // client gets notified about state change in any case
166 // thread is doing a busy wait while 'state == PAUSED'
167 // client sets mWorkerState := RESUME_REQUESTED
168 // state := mWorkerState (RESUME_REQUESTED)
169 // mWorkerState := RUNNING, but we don't notify the client yet
170 // first workerCycle gets executed, the code below triggers a client notification
171 // (or if workerCycle fails, worker enters 'error' state and also notifies)
172 // state := mWorkerState (RUNNING)
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000173 std::lock_guard<std::mutex> lock(mWorkerLock);
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000174 if (state == WorkerState::RESUME_REQUESTED) {
175 needToNotify = true;
176 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000177 state = mWorkerState;
178 if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
179 state = mWorkerState = WorkerState::PAUSED;
180 needToNotify = true;
181 } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
182 mWorkerState = WorkerState::RUNNING;
183 }
184 } else {
185 std::lock_guard<std::mutex> lock(mWorkerLock);
186 if (state == WorkerState::RESUME_REQUESTED ||
187 mWorkerState == WorkerState::PAUSE_REQUESTED) {
188 needToNotify = true;
189 }
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000190 state = mWorkerState = WorkerState::STOPPED;
191 mError = "workerCycle failed";
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000192 }
193 if (needToNotify) {
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000194 {
195 std::lock_guard<std::mutex> lock(mWorkerLock);
196 mWorkerStateChangeRequest = false;
197 }
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000198 mWorkerCv.notify_one();
199 }
200 }
201 }
202
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000203 std::string mThreadName;
204 int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000205 std::thread mWorker;
206 std::mutex mWorkerLock;
207 std::condition_variable mWorkerCv;
208 WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
Mikhail Naganov48e2e8f2022-07-22 00:07:03 +0000209 std::string mError GUARDED_BY(mWorkerLock);
Mikhail Naganov0c174e92022-07-21 00:21:08 +0000210 // The atomic lock-free variable is used to prevent priority inversions
211 // that can occur when a high priority worker tries to acquire the lock
212 // which has been taken by a lower priority control thread which in its turn
213 // got preempted. To prevent a PI under normal operating conditions, that is,
214 // when there are no errors or state changes, the worker does not attempt
215 // taking `mWorkerLock` unless `mWorkerStateChangeRequest` is set.
216 // To make sure that updates to `mWorkerState` and `mWorkerStateChangeRequest`
217 // are serialized, they are always made under a lock.
218 static_assert(std::atomic<bool>::is_always_lock_free);
219 std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
Mikhail Naganov614e4b52022-06-30 21:05:11 +0000220};