blob: 8a273dc7f2b9c5d7dc48fd319a6a3f93bdbe902a [file] [log] [blame]
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#pragma once
18
19#include <sched.h>
20
21#include <condition_variable>
22#include <mutex>
23#include <thread>
24
25#include <android-base/thread_annotations.h>
26
27template <typename Impl>
28class StreamWorker {
29 enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED, ERROR };
30
31 public:
32 StreamWorker() = default;
33 ~StreamWorker() { stop(); }
34 bool start() {
35 mWorker = std::thread(&StreamWorker::workerThread, this);
36 std::unique_lock<std::mutex> lock(mWorkerLock);
37 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
38 mWorkerCv.wait(lock, [&]() {
39 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
40 return mWorkerState != WorkerState::STOPPED;
41 });
42 return mWorkerState == WorkerState::RUNNING;
43 }
44 void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
45 void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
46 bool hasError() {
47 std::lock_guard<std::mutex> lock(mWorkerLock);
48 return mWorkerState == WorkerState::ERROR;
49 }
50 void stop() {
51 {
52 std::lock_guard<std::mutex> lock(mWorkerLock);
53 if (mWorkerState == WorkerState::STOPPED) return;
54 mWorkerState = WorkerState::STOPPED;
55 }
56 if (mWorker.joinable()) {
57 mWorker.join();
58 }
59 }
60 bool waitForAtLeastOneCycle() {
61 WorkerState newState;
62 switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
63 if (newState != WorkerState::PAUSED) return false;
64 switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
65 return newState == WorkerState::RUNNING;
66 }
67
68 // Methods that need to be provided by subclasses:
69 //
70 // Called once at the beginning of the thread loop. Must return
71 // 'true' to enter the thread loop, otherwise the thread loop
72 // exits and the worker switches into the 'error' state.
73 // bool workerInit();
74 //
75 // Called for each thread loop unless the thread is in 'paused' state.
76 // Must return 'true' to continue running, otherwise the thread loop
77 // exits and the worker switches into the 'error' state.
78 // bool workerCycle();
79
80 private:
81 void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
82 WorkerState* finalState = nullptr) {
83 std::unique_lock<std::mutex> lock(mWorkerLock);
84 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
85 if (mWorkerState != oldState) {
86 if (finalState) *finalState = mWorkerState;
87 return;
88 }
89 mWorkerState = newState;
90 mWorkerCv.wait(lock, [&]() {
91 android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
92 return mWorkerState != newState;
93 });
94 if (finalState) *finalState = mWorkerState;
95 }
96 void workerThread() {
97 bool success = static_cast<Impl*>(this)->workerInit();
98 {
99 std::lock_guard<std::mutex> lock(mWorkerLock);
100 mWorkerState = success ? WorkerState::RUNNING : WorkerState::ERROR;
101 }
102 mWorkerCv.notify_one();
103 if (!success) return;
104
105 for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
106 bool needToNotify = false;
107 if (state != WorkerState::PAUSED ? static_cast<Impl*>(this)->workerCycle()
108 : (sched_yield(), true)) {
109 //
110 // Pause and resume are synchronous. One worker cycle must complete
111 // before the worker indicates a state change. This is how 'mWorkerState' and
112 // 'state' interact:
113 //
114 // mWorkerState == RUNNING
115 // client sets mWorkerState := PAUSE_REQUESTED
116 // last workerCycle gets executed, state := mWorkerState := PAUSED by us
117 // (or the workers enters the 'error' state if workerCycle fails)
118 // client gets notified about state change in any case
119 // thread is doing a busy wait while 'state == PAUSED'
120 // client sets mWorkerState := RESUME_REQUESTED
121 // state := mWorkerState (RESUME_REQUESTED)
122 // mWorkerState := RUNNING, but we don't notify the client yet
123 // first workerCycle gets executed, the code below triggers a client notification
124 // (or if workerCycle fails, worker enters 'error' state and also notifies)
125 // state := mWorkerState (RUNNING)
126 if (state == WorkerState::RESUME_REQUESTED) {
127 needToNotify = true;
128 }
129 std::lock_guard<std::mutex> lock(mWorkerLock);
130 state = mWorkerState;
131 if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
132 state = mWorkerState = WorkerState::PAUSED;
133 needToNotify = true;
134 } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
135 mWorkerState = WorkerState::RUNNING;
136 }
137 } else {
138 std::lock_guard<std::mutex> lock(mWorkerLock);
139 if (state == WorkerState::RESUME_REQUESTED ||
140 mWorkerState == WorkerState::PAUSE_REQUESTED) {
141 needToNotify = true;
142 }
143 mWorkerState = WorkerState::ERROR;
144 state = WorkerState::STOPPED;
145 }
146 if (needToNotify) {
147 mWorkerCv.notify_one();
148 }
149 }
150 }
151
152 std::thread mWorker;
153 std::mutex mWorkerLock;
154 std::condition_variable mWorkerCv;
155 WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
156};