audio: Fix the lifetime of the StreamWorker's logic part am: 0b9c5feed1 am: 4b279d6a32 am: 231ca12ce8 am: 63989892e3
Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/2199345
Change-Id: I90e8237e5427fd114a4c661437971906cff0219d
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/audio/aidl/common/Android.bp b/audio/aidl/common/Android.bp
index 37da9d6..f2d8fc2 100644
--- a/audio/aidl/common/Android.bp
+++ b/audio/aidl/common/Android.bp
@@ -23,7 +23,7 @@
default_applicable_licenses: ["hardware_interfaces_license"],
}
-cc_library_headers {
+cc_library {
name: "libaudioaidlcommon",
host_supported: true,
vendor_available: true,
@@ -36,13 +36,16 @@
"libbase_headers",
"libsystem_headers",
],
+ srcs: [
+ "StreamWorker.cpp",
+ ],
}
cc_test {
name: "libaudioaidlcommon_test",
host_supported: true,
vendor_available: true,
- header_libs: [
+ static_libs: [
"libaudioaidlcommon",
],
shared_libs: [
diff --git a/audio/aidl/common/StreamWorker.cpp b/audio/aidl/common/StreamWorker.cpp
new file mode 100644
index 0000000..9bca760
--- /dev/null
+++ b/audio/aidl/common/StreamWorker.cpp
@@ -0,0 +1,160 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <pthread.h>
+#include <sched.h>
+#include <sys/resource.h>
+
+#include "include/StreamWorker.h"
+
+namespace android::hardware::audio::common::internal {
+
+bool ThreadController::start(const std::string& name, int priority) {  // Launches workerThread(), blocks until it reports its startup outcome; true iff RUNNING.
+ mThreadName = name;  // stored before the thread exists; workerThread() reads it
+ mThreadPriority = priority;  // workerThread() applies it unless it equals ANDROID_PRIORITY_DEFAULT
+ mWorker = std::thread(&ThreadController::workerThread, this);  // NOTE(review): assigning to a still-joinable std::thread terminates — confirm start() is never repeated without stop()
+ std::unique_lock<std::mutex> lock(mWorkerLock);
+ android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+ mWorkerCv.wait(lock, [&]() {  // sleeps until workerThread() publishes RUNNING or an error
+ android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+ return mWorkerState == WorkerState::RUNNING || !mError.empty();  // NOTE(review): STOPPED with empty mError (cycle() returned EXIT before this ran) satisfies neither term — confirm this cannot hang
+ });
+ mWorkerStateChangeRequest = false;  // no state change has been requested yet
+ return mWorkerState == WorkerState::RUNNING;
+}
+
+void ThreadController::stop() {  // Requests the worker loop to exit, then joins the thread; repeated calls are harmless.
+ {
+ std::lock_guard<std::mutex> lock(mWorkerLock);
+ if (mWorkerState != WorkerState::STOPPED) {  // already STOPPED: nothing to request
+ mWorkerState = WorkerState::STOPPED;
+ mWorkerStateChangeRequest = true;  // makes the worker loop leave its lock-free path and re-read mWorkerState
+ }
+ }
+ if (mWorker.joinable()) {  // false when no thread was started or it has been joined already
+ mWorker.join();  // done outside the lock so the worker can still acquire mWorkerLock while exiting
+ }
+}
+
+bool ThreadController::waitForAtLeastOneCycle() {  // Pauses and resumes the worker; resume is acknowledged only after a cycle() call, so true means a cycle ran.
+ WorkerState newState;  // always assigned by switchWorkerStateSync() below
+ switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
+ if (newState != WorkerState::PAUSED) return false;  // worker was not RUNNING, or it stopped instead of pausing
+ switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
+ return newState == WorkerState::RUNNING;  // false if the worker stopped instead of resuming
+}
+
+void ThreadController::switchWorkerStateSync(WorkerState oldState, WorkerState newState,
+ WorkerState* finalState) {  // Requests 'newState' if currently in 'oldState', waits for the worker to react; 'finalState' (optional) receives the result.
+ std::unique_lock<std::mutex> lock(mWorkerLock);
+ android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+ if (mWorkerState != oldState) {  // precondition not met: make no request, only report the current state
+ if (finalState) *finalState = mWorkerState;
+ return;
+ }
+ mWorkerState = newState;
+ mWorkerStateChangeRequest = true;  // tells the worker loop to take the locked path and inspect mWorkerState
+ mWorkerCv.wait(lock, [&]() {  // sleeps until notified with mWorkerState no longer equal to the requested value
+ android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+ return mWorkerState != newState;
+ });
+ if (finalState) *finalState = mWorkerState;
+}
+
+void ThreadController::workerThread() {  // Thread body: init(), thread name and priority setup, then cycle() in a loop until STOPPED.
+ using Status = StreamLogic::Status;
+
+ std::string error = mLogic->init();  // a non-empty result becomes the startup error
+ if (error.empty() && !mThreadName.empty()) {
+ std::string compliantName(mThreadName.substr(0, 15));  // Linux thread names are limited to 16 bytes including the terminator
+ if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); errCode != 0) {
+ error.append("Failed to set thread name: ").append(strerror(errCode));
+ }
+ }
+ if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
+ if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
+ int errCode = errno;  // captured right away, before another call can overwrite errno
+ error.append("Failed to set thread priority: ").append(strerror(errCode));
+ }
+ }
+ {
+ std::lock_guard<std::mutex> lock(mWorkerLock);
+ mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;  // publish the startup outcome
+ mError = error;
+ }
+ mWorkerCv.notify_one();  // wakes start(), which waits on mWorkerCv for this outcome
+ if (!error.empty()) return;
+
+ for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {  // 'state' is this thread's private view of mWorkerState
+ bool needToNotify = false;
+ if (Status status = state != WorkerState::PAUSED ? mLogic->cycle()
+ : (sched_yield(), Status::CONTINUE);  // while PAUSED: yield instead of calling cycle()
+ status == Status::CONTINUE) {
+ {
+ // Lock-free fast path, see https://developer.android.com/training/articles/smp#nonracing
+ android::base::ScopedLockAssertion lock_assertion(mWorkerLock);  // no mutex is acquired in this scope
+ if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
+ }
+ //
+ // Pause and resume are synchronous. One worker cycle must complete
+ // before the worker indicates a state change. This is how 'mWorkerState' and
+ // 'state' interact:
+ //
+ // mWorkerState == RUNNING
+ // client sets mWorkerState := PAUSE_REQUESTED
+ // last cycle() gets executed, state := mWorkerState := PAUSED by us
+ // (or the worker enters the 'error' state if cycle() fails)
+ // client gets notified about state change in any case
+ // thread is doing a busy wait while 'state == PAUSED'
+ // client sets mWorkerState := RESUME_REQUESTED
+ // state := mWorkerState (RESUME_REQUESTED)
+ // mWorkerState := RUNNING, but we don't notify the client yet
+ // first cycle() gets executed, the code below triggers a client notification
+ // (or if cycle() fails, worker enters 'error' state and also notifies)
+ // state := mWorkerState (RUNNING)
+ std::lock_guard<std::mutex> lock(mWorkerLock);
+ if (state == WorkerState::RESUME_REQUESTED) {  // the first cycle() after a resume request has just completed
+ needToNotify = true;
+ }
+ state = mWorkerState;
+ if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
+ state = mWorkerState = WorkerState::PAUSED;
+ needToNotify = true;
+ } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
+ mWorkerState = WorkerState::RUNNING;  // 'state' keeps RESUME_REQUESTED so the next iteration notifies
+ }
+ } else {  // cycle() returned EXIT or ABORT: the loop ends
+ std::lock_guard<std::mutex> lock(mWorkerLock);
+ if (state == WorkerState::RESUME_REQUESTED ||
+ mWorkerState == WorkerState::PAUSE_REQUESTED) {  // a client is blocked waiting for a pause or resume to finish
+ needToNotify = true;
+ }
+ state = mWorkerState = WorkerState::STOPPED;
+ if (status == Status::ABORT) {  // EXIT leaves mError untouched
+ mError = "Received ABORT from the logic cycle";
+ }
+ }
+ if (needToNotify) {
+ {
+ std::lock_guard<std::mutex> lock(mWorkerLock);
+ mWorkerStateChangeRequest = false;  // request handled; the loop may use the lock-free path again
+ }
+ mWorkerCv.notify_one();
+ }
+ }
+}
+
+} // namespace android::hardware::audio::common::internal
diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h
index 03685fc..6260eca 100644
--- a/audio/aidl/common/include/StreamWorker.h
+++ b/audio/aidl/common/include/StreamWorker.h
@@ -16,10 +16,6 @@
#pragma once
-#include <pthread.h>
-#include <sched.h>
-#include <sys/resource.h>
-
#include <atomic>
#include <condition_variable>
#include <mutex>
@@ -31,32 +27,18 @@
namespace android::hardware::audio::common {
-template <typename Impl>
-class StreamWorker {
+class StreamLogic;
+
+namespace internal {
+
+class ThreadController {
enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
public:
- enum class WorkerStatus { ABORT, CONTINUE, EXIT };
+ explicit ThreadController(StreamLogic* logic) : mLogic(logic) {}
+ ~ThreadController() { stop(); }
- StreamWorker() = default;
- ~StreamWorker() { stop(); }
- // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
- // The nice number is used with the default scheduler. For threads that
- // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
- // it is recommended to implement an appropriate configuration sequence within `workerInit`.
- bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
- mThreadName = name;
- mThreadPriority = priority;
- mWorker = std::thread(&StreamWorker::workerThread, this);
- std::unique_lock<std::mutex> lock(mWorkerLock);
- android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
- mWorkerCv.wait(lock, [&]() {
- android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
- return mWorkerState == WorkerState::RUNNING || !mError.empty();
- });
- mWorkerStateChangeRequest = false;
- return mWorkerState == WorkerState::RUNNING;
- }
+ bool start(const std::string& name, int priority);
void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
bool hasError() {
@@ -67,150 +49,21 @@
std::lock_guard<std::mutex> lock(mWorkerLock);
return mError;
}
- void stop() {
- {
- std::lock_guard<std::mutex> lock(mWorkerLock);
- if (mWorkerState != WorkerState::STOPPED) {
- mWorkerState = WorkerState::STOPPED;
- mWorkerStateChangeRequest = true;
- }
- }
- if (mWorker.joinable()) {
- mWorker.join();
- }
- }
- bool waitForAtLeastOneCycle() {
- WorkerState newState;
- switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
- if (newState != WorkerState::PAUSED) return false;
- switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
- return newState == WorkerState::RUNNING;
- }
+ void stop();
+ bool waitForAtLeastOneCycle();
+
// Only used by unit tests.
- void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
+ void lockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
lock ? mWorkerLock.lock() : mWorkerLock.unlock();
}
- std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); }
-
- // Methods that need to be provided by subclasses:
- //
- // /* Called once at the beginning of the thread loop. Must return
- // * an empty string to enter the thread loop, otherwise the thread loop
- // * exits and the worker switches into the 'error' state, setting
- // * the error to the returned value.
- // */
- // std::string workerInit();
- //
- // /* Called for each thread loop unless the thread is in 'paused' state.
- // * Must return 'CONTINUE' to continue running, otherwise the thread loop
- // * exits. If the result from worker cycle is 'ABORT' then the worker switches
- // * into the 'error' state with a generic error message. It is recommended that
- // * the subclass reports any problems via logging facilities. Returning the 'EXIT'
- // * status is equivalent to calling 'stop()' method. This is just a way of
- // * of stopping the worker by its own initiative.
- // */
- // WorkerStatus workerCycle();
+ std::thread::native_handle_type getThreadNativeHandle() { return mWorker.native_handle(); }
private:
void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
- WorkerState* finalState = nullptr) {
- std::unique_lock<std::mutex> lock(mWorkerLock);
- android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
- if (mWorkerState != oldState) {
- if (finalState) *finalState = mWorkerState;
- return;
- }
- mWorkerState = newState;
- mWorkerStateChangeRequest = true;
- mWorkerCv.wait(lock, [&]() {
- android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
- return mWorkerState != newState;
- });
- if (finalState) *finalState = mWorkerState;
- }
- void workerThread() {
- std::string error = static_cast<Impl*>(this)->workerInit();
- if (error.empty() && !mThreadName.empty()) {
- std::string compliantName(mThreadName.substr(0, 15));
- if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str());
- errCode != 0) {
- error.append("Failed to set thread name: ").append(strerror(errCode));
- }
- }
- if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
- if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
- int errCode = errno;
- error.append("Failed to set thread priority: ").append(strerror(errCode));
- }
- }
- {
- std::lock_guard<std::mutex> lock(mWorkerLock);
- mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
- mError = error;
- }
- mWorkerCv.notify_one();
- if (!error.empty()) return;
+ WorkerState* finalState = nullptr);
+ void workerThread();
- for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
- bool needToNotify = false;
- if (WorkerStatus status = state != WorkerState::PAUSED
- ? static_cast<Impl*>(this)->workerCycle()
- : (sched_yield(), WorkerStatus::CONTINUE);
- status == WorkerStatus::CONTINUE) {
- {
- // See https://developer.android.com/training/articles/smp#nonracing
- android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
- if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
- }
- //
- // Pause and resume are synchronous. One worker cycle must complete
- // before the worker indicates a state change. This is how 'mWorkerState' and
- // 'state' interact:
- //
- // mWorkerState == RUNNING
- // client sets mWorkerState := PAUSE_REQUESTED
- // last workerCycle gets executed, state := mWorkerState := PAUSED by us
- // (or the workers enters the 'error' state if workerCycle fails)
- // client gets notified about state change in any case
- // thread is doing a busy wait while 'state == PAUSED'
- // client sets mWorkerState := RESUME_REQUESTED
- // state := mWorkerState (RESUME_REQUESTED)
- // mWorkerState := RUNNING, but we don't notify the client yet
- // first workerCycle gets executed, the code below triggers a client notification
- // (or if workerCycle fails, worker enters 'error' state and also notifies)
- // state := mWorkerState (RUNNING)
- std::lock_guard<std::mutex> lock(mWorkerLock);
- if (state == WorkerState::RESUME_REQUESTED) {
- needToNotify = true;
- }
- state = mWorkerState;
- if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
- state = mWorkerState = WorkerState::PAUSED;
- needToNotify = true;
- } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
- mWorkerState = WorkerState::RUNNING;
- }
- } else {
- std::lock_guard<std::mutex> lock(mWorkerLock);
- if (state == WorkerState::RESUME_REQUESTED ||
- mWorkerState == WorkerState::PAUSE_REQUESTED) {
- needToNotify = true;
- }
- state = mWorkerState = WorkerState::STOPPED;
- if (status == WorkerStatus::ABORT) {
- mError = "workerCycle aborted";
- }
- }
- if (needToNotify) {
- {
- std::lock_guard<std::mutex> lock(mWorkerLock);
- mWorkerStateChangeRequest = false;
- }
- mWorkerCv.notify_one();
- }
- }
- }
-
+ StreamLogic* const mLogic;
std::string mThreadName;
int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
std::thread mWorker;
@@ -230,4 +83,71 @@
std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
};
+} // namespace internal
+
+class StreamLogic {  // Interface of the work that internal::ThreadController runs on its worker thread.
+ public:
+ friend class internal::ThreadController;  // the controller calls the protected init() and cycle()
+
+ virtual ~StreamLogic() = default;
+
+ protected:
+ enum class Status { ABORT, CONTINUE, EXIT };
+
+ /* Called once at the beginning of the thread loop. Must return
+ * an empty string to enter the thread loop, otherwise the thread loop
+ * exits and the worker switches into the 'error' state, setting
+ * the error to the returned value.
+ */
+ virtual std::string init() = 0;
+
+ /* Called for each thread loop unless the thread is in 'paused' state.
+ * Must return 'CONTINUE' to continue running, otherwise the thread loop
+ * exits. If the result from worker cycle is 'ABORT' then the worker switches
+ * into the 'error' state with a generic error message. It is recommended that
+ * the subclass reports any problems via logging facilities. Returning the 'EXIT'
+ * status is equivalent to calling the 'stop()' method. This is just a way
+ * of stopping the worker on its own initiative.
+ */
+ virtual Status cycle() = 0;
+};
+
+template <class LogicImpl>
+class StreamWorker : public LogicImpl {  // Bundles a StreamLogic implementation with the ThreadController that drives it.
+ public:
+ template <class... Args>  // all constructor arguments are forwarded to LogicImpl
+ explicit StreamWorker(Args&&... args) : LogicImpl(std::forward<Args>(args)...), mThread(this) {}
+
+ // Methods of LogicImpl are available via inheritance.
+ // Forwarded methods of ThreadController follow.
+
+ // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
+ // The nice number is used with the default scheduler. For threads that
+ // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
+ // it is recommended to implement an appropriate configuration sequence within
+ // 'LogicImpl' or 'StreamLogic::init'.
+ bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
+ return mThread.start(name, priority);
+ }
+ void pause() { mThread.pause(); }  // returns immediately unless the worker is RUNNING
+ void resume() { mThread.resume(); }  // returns immediately unless the worker is PAUSED
+ bool hasError() { return mThread.hasError(); }
+ std::string getError() { return mThread.getError(); }
+ void stop() { return mThread.stop(); }  // also performed by the ThreadController destructor
+ bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); }
+
+ // Only used by unit tests.
+ void testLockUnlockMutex(bool lock) { mThread.lockUnlockMutex(lock); }
+ std::thread::native_handle_type testGetThreadNativeHandle() {
+ return mThread.getThreadNativeHandle();
+ }
+
+ private:
+ // The ThreadController gets destroyed before LogicImpl (members go before bases).
+ // After the controller has been destroyed, it is guaranteed that
+ // the thread was joined, thus the 'cycle' method of LogicImpl
+ // will not be called anymore, and it is safe to destroy LogicImpl.
+ internal::ThreadController mThread;
+};
+
} // namespace android::hardware::audio::common
diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp
index df81c69..e3e484d 100644
--- a/audio/aidl/common/tests/streamworker_tests.cpp
+++ b/audio/aidl/common/tests/streamworker_tests.cpp
@@ -16,6 +16,7 @@
#include <pthread.h>
#include <sched.h>
+#include <sys/resource.h>
#include <unistd.h>
#include <atomic>
@@ -26,18 +27,19 @@
#define LOG_TAG "StreamWorker_Test"
#include <log/log.h>
+using android::hardware::audio::common::StreamLogic;
using android::hardware::audio::common::StreamWorker;
-class TestWorker : public StreamWorker<TestWorker> {
+class TestWorkerLogic : public StreamLogic {
public:
struct Stream {
- void setErrorStatus() { status = WorkerStatus::ABORT; }
- void setStopStatus() { status = WorkerStatus::EXIT; }
- std::atomic<WorkerStatus> status = WorkerStatus::CONTINUE;
+ void setErrorStatus() { status = Status::ABORT; }
+ void setStopStatus() { status = Status::EXIT; }
+ std::atomic<Status> status = Status::CONTINUE;
};
// Use nullptr to test error reporting from the worker thread.
- explicit TestWorker(Stream* stream) : mStream(stream) {}
+ explicit TestWorkerLogic(Stream* stream) : mStream(stream) {}
size_t getWorkerCycles() const { return mWorkerCycles; }
int getPriority() const { return mPriority; }
@@ -48,8 +50,10 @@
return mWorkerCycles == cyclesBefore;
}
- std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
- WorkerStatus workerCycle() {
+ protected:
+ // StreamLogic implementation
+ std::string init() override { return mStream != nullptr ? "" : "Expected error"; }
+ Status cycle() override {
mPriority = getpriority(PRIO_PROCESS, 0);
do {
mWorkerCycles++;
@@ -62,6 +66,7 @@
std::atomic<size_t> mWorkerCycles = 0;
std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
};
+using TestWorker = StreamWorker<TestWorkerLogic>;
// The parameter specifies whether an extra call to 'stop' is made at the end.
class StreamWorkerInvalidTest : public testing::TestWithParam<bool> {
diff --git a/audio/aidl/default/Android.bp b/audio/aidl/default/Android.bp
index 027d928..07b1097 100644
--- a/audio/aidl/default/Android.bp
+++ b/audio/aidl/default/Android.bp
@@ -11,6 +11,7 @@
name: "libaudioserviceexampleimpl",
vendor: true,
shared_libs: [
+ "libaudioaidlcommon",
"libbase",
"libbinder_ndk",
"libstagefright_foundation",
diff --git a/audio/aidl/vts/Android.bp b/audio/aidl/vts/Android.bp
index 75ff37f..1d0ec7c 100644
--- a/audio/aidl/vts/Android.bp
+++ b/audio/aidl/vts/Android.bp
@@ -26,6 +26,7 @@
"android.hardware.common-V2-ndk",
"android.hardware.common.fmq-V1-ndk",
"android.media.audio.common.types-V1-ndk",
+ "libaudioaidlcommon",
],
test_suites: [
"general-tests",