audio: Fix the lifetime of the StreamWorker's logic part
Fix the mistake of making StreamWorker to inherit
from the part which provides actual thread logic (Impl).
The lifetime of the logic object must be longer
than the lifetime of the StreamWorker's thread.
Otherwise, the thread could still have running while
the logic has already been destroyed (consider
the order of destructors in C++ class inheritance).
With this fix, the StreamWorker class does not have
to be a template anymore, thus reorganize the code
to move big methods into a .cpp file.
Bug: 205884982
Test: atest libaudioaidlcommon_test --iterations
Merged-In: I5bc2c8fd9d78a0fbc9fddab67456cc5214584045
Change-Id: I5bc2c8fd9d78a0fbc9fddab67456cc5214584045
(cherry picked from commmit 84024eccee71607c13bf13d7de80947efca4de50)
Change-Id: I70958f437657b574cda6480c3216a0b1ea252433
diff --git a/audio/aidl/common/Android.bp b/audio/aidl/common/Android.bp
index 37da9d6..f2d8fc2 100644
--- a/audio/aidl/common/Android.bp
+++ b/audio/aidl/common/Android.bp
@@ -23,7 +23,7 @@
default_applicable_licenses: ["hardware_interfaces_license"],
}
-cc_library_headers {
+cc_library {
name: "libaudioaidlcommon",
host_supported: true,
vendor_available: true,
@@ -36,13 +36,16 @@
"libbase_headers",
"libsystem_headers",
],
+ srcs: [
+ "StreamWorker.cpp",
+ ],
}
cc_test {
name: "libaudioaidlcommon_test",
host_supported: true,
vendor_available: true,
- header_libs: [
+ static_libs: [
"libaudioaidlcommon",
],
shared_libs: [
diff --git a/audio/aidl/common/StreamWorker.cpp b/audio/aidl/common/StreamWorker.cpp
new file mode 100644
index 0000000..9bca760
--- /dev/null
+++ b/audio/aidl/common/StreamWorker.cpp
@@ -0,0 +1,160 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <pthread.h>
+#include <sched.h>
+#include <sys/resource.h>
+
+#include "include/StreamWorker.h"
+
+namespace android::hardware::audio::common::internal {
+
+bool ThreadController::start(const std::string& name, int priority) {
+ mThreadName = name;
+ mThreadPriority = priority;
+ mWorker = std::thread(&ThreadController::workerThread, this);
+ std::unique_lock<std::mutex> lock(mWorkerLock);
+ android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+ mWorkerCv.wait(lock, [&]() {
+ android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+ return mWorkerState == WorkerState::RUNNING || !mError.empty();
+ });
+ mWorkerStateChangeRequest = false;
+ return mWorkerState == WorkerState::RUNNING;
+}
+
+void ThreadController::stop() {
+ {
+ std::lock_guard<std::mutex> lock(mWorkerLock);
+ if (mWorkerState != WorkerState::STOPPED) {
+ mWorkerState = WorkerState::STOPPED;
+ mWorkerStateChangeRequest = true;
+ }
+ }
+ if (mWorker.joinable()) {
+ mWorker.join();
+ }
+}
+
+bool ThreadController::waitForAtLeastOneCycle() {
+ WorkerState newState;
+ switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
+ if (newState != WorkerState::PAUSED) return false;
+ switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
+ return newState == WorkerState::RUNNING;
+}
+
+void ThreadController::switchWorkerStateSync(WorkerState oldState, WorkerState newState,
+ WorkerState* finalState) {
+ std::unique_lock<std::mutex> lock(mWorkerLock);
+ android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+ if (mWorkerState != oldState) {
+ if (finalState) *finalState = mWorkerState;
+ return;
+ }
+ mWorkerState = newState;
+ mWorkerStateChangeRequest = true;
+ mWorkerCv.wait(lock, [&]() {
+ android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+ return mWorkerState != newState;
+ });
+ if (finalState) *finalState = mWorkerState;
+}
+
+void ThreadController::workerThread() {
+ using Status = StreamLogic::Status;
+
+ std::string error = mLogic->init();
+ if (error.empty() && !mThreadName.empty()) {
+ std::string compliantName(mThreadName.substr(0, 15));
+ if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); errCode != 0) {
+ error.append("Failed to set thread name: ").append(strerror(errCode));
+ }
+ }
+ if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
+ if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
+ int errCode = errno;
+ error.append("Failed to set thread priority: ").append(strerror(errCode));
+ }
+ }
+ {
+ std::lock_guard<std::mutex> lock(mWorkerLock);
+ mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
+ mError = error;
+ }
+ mWorkerCv.notify_one();
+ if (!error.empty()) return;
+
+ for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
+ bool needToNotify = false;
+ if (Status status = state != WorkerState::PAUSED ? mLogic->cycle()
+ : (sched_yield(), Status::CONTINUE);
+ status == Status::CONTINUE) {
+ {
+ // See https://developer.android.com/training/articles/smp#nonracing
+ android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+ if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
+ }
+ //
+ // Pause and resume are synchronous. One worker cycle must complete
+ // before the worker indicates a state change. This is how 'mWorkerState' and
+ // 'state' interact:
+ //
+ // mWorkerState == RUNNING
+ // client sets mWorkerState := PAUSE_REQUESTED
+ // last workerCycle gets executed, state := mWorkerState := PAUSED by us
+ // (or the workers enters the 'error' state if workerCycle fails)
+ // client gets notified about state change in any case
+ // thread is doing a busy wait while 'state == PAUSED'
+ // client sets mWorkerState := RESUME_REQUESTED
+ // state := mWorkerState (RESUME_REQUESTED)
+ // mWorkerState := RUNNING, but we don't notify the client yet
+ // first workerCycle gets executed, the code below triggers a client notification
+ // (or if workerCycle fails, worker enters 'error' state and also notifies)
+ // state := mWorkerState (RUNNING)
+ std::lock_guard<std::mutex> lock(mWorkerLock);
+ if (state == WorkerState::RESUME_REQUESTED) {
+ needToNotify = true;
+ }
+ state = mWorkerState;
+ if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
+ state = mWorkerState = WorkerState::PAUSED;
+ needToNotify = true;
+ } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
+ mWorkerState = WorkerState::RUNNING;
+ }
+ } else {
+ std::lock_guard<std::mutex> lock(mWorkerLock);
+ if (state == WorkerState::RESUME_REQUESTED ||
+ mWorkerState == WorkerState::PAUSE_REQUESTED) {
+ needToNotify = true;
+ }
+ state = mWorkerState = WorkerState::STOPPED;
+ if (status == Status::ABORT) {
+ mError = "Received ABORT from the logic cycle";
+ }
+ }
+ if (needToNotify) {
+ {
+ std::lock_guard<std::mutex> lock(mWorkerLock);
+ mWorkerStateChangeRequest = false;
+ }
+ mWorkerCv.notify_one();
+ }
+ }
+}
+
+} // namespace android::hardware::audio::common::internal
diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h
index 03685fc..6260eca 100644
--- a/audio/aidl/common/include/StreamWorker.h
+++ b/audio/aidl/common/include/StreamWorker.h
@@ -16,10 +16,6 @@
#pragma once
-#include <pthread.h>
-#include <sched.h>
-#include <sys/resource.h>
-
#include <atomic>
#include <condition_variable>
#include <mutex>
@@ -31,32 +27,18 @@
namespace android::hardware::audio::common {
-template <typename Impl>
-class StreamWorker {
+class StreamLogic;
+
+namespace internal {
+
+class ThreadController {
enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
public:
- enum class WorkerStatus { ABORT, CONTINUE, EXIT };
+ explicit ThreadController(StreamLogic* logic) : mLogic(logic) {}
+ ~ThreadController() { stop(); }
- StreamWorker() = default;
- ~StreamWorker() { stop(); }
- // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
- // The nice number is used with the default scheduler. For threads that
- // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
- // it is recommended to implement an appropriate configuration sequence within `workerInit`.
- bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
- mThreadName = name;
- mThreadPriority = priority;
- mWorker = std::thread(&StreamWorker::workerThread, this);
- std::unique_lock<std::mutex> lock(mWorkerLock);
- android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
- mWorkerCv.wait(lock, [&]() {
- android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
- return mWorkerState == WorkerState::RUNNING || !mError.empty();
- });
- mWorkerStateChangeRequest = false;
- return mWorkerState == WorkerState::RUNNING;
- }
+ bool start(const std::string& name, int priority);
void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
bool hasError() {
@@ -67,150 +49,21 @@
std::lock_guard<std::mutex> lock(mWorkerLock);
return mError;
}
- void stop() {
- {
- std::lock_guard<std::mutex> lock(mWorkerLock);
- if (mWorkerState != WorkerState::STOPPED) {
- mWorkerState = WorkerState::STOPPED;
- mWorkerStateChangeRequest = true;
- }
- }
- if (mWorker.joinable()) {
- mWorker.join();
- }
- }
- bool waitForAtLeastOneCycle() {
- WorkerState newState;
- switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
- if (newState != WorkerState::PAUSED) return false;
- switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
- return newState == WorkerState::RUNNING;
- }
+ void stop();
+ bool waitForAtLeastOneCycle();
+
// Only used by unit tests.
- void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
+ void lockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
lock ? mWorkerLock.lock() : mWorkerLock.unlock();
}
- std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); }
-
- // Methods that need to be provided by subclasses:
- //
- // /* Called once at the beginning of the thread loop. Must return
- // * an empty string to enter the thread loop, otherwise the thread loop
- // * exits and the worker switches into the 'error' state, setting
- // * the error to the returned value.
- // */
- // std::string workerInit();
- //
- // /* Called for each thread loop unless the thread is in 'paused' state.
- // * Must return 'CONTINUE' to continue running, otherwise the thread loop
- // * exits. If the result from worker cycle is 'ABORT' then the worker switches
- // * into the 'error' state with a generic error message. It is recommended that
- // * the subclass reports any problems via logging facilities. Returning the 'EXIT'
- // * status is equivalent to calling 'stop()' method. This is just a way of
- // * of stopping the worker by its own initiative.
- // */
- // WorkerStatus workerCycle();
+ std::thread::native_handle_type getThreadNativeHandle() { return mWorker.native_handle(); }
private:
void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
- WorkerState* finalState = nullptr) {
- std::unique_lock<std::mutex> lock(mWorkerLock);
- android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
- if (mWorkerState != oldState) {
- if (finalState) *finalState = mWorkerState;
- return;
- }
- mWorkerState = newState;
- mWorkerStateChangeRequest = true;
- mWorkerCv.wait(lock, [&]() {
- android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
- return mWorkerState != newState;
- });
- if (finalState) *finalState = mWorkerState;
- }
- void workerThread() {
- std::string error = static_cast<Impl*>(this)->workerInit();
- if (error.empty() && !mThreadName.empty()) {
- std::string compliantName(mThreadName.substr(0, 15));
- if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str());
- errCode != 0) {
- error.append("Failed to set thread name: ").append(strerror(errCode));
- }
- }
- if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
- if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
- int errCode = errno;
- error.append("Failed to set thread priority: ").append(strerror(errCode));
- }
- }
- {
- std::lock_guard<std::mutex> lock(mWorkerLock);
- mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
- mError = error;
- }
- mWorkerCv.notify_one();
- if (!error.empty()) return;
+ WorkerState* finalState = nullptr);
+ void workerThread();
- for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
- bool needToNotify = false;
- if (WorkerStatus status = state != WorkerState::PAUSED
- ? static_cast<Impl*>(this)->workerCycle()
- : (sched_yield(), WorkerStatus::CONTINUE);
- status == WorkerStatus::CONTINUE) {
- {
- // See https://developer.android.com/training/articles/smp#nonracing
- android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
- if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
- }
- //
- // Pause and resume are synchronous. One worker cycle must complete
- // before the worker indicates a state change. This is how 'mWorkerState' and
- // 'state' interact:
- //
- // mWorkerState == RUNNING
- // client sets mWorkerState := PAUSE_REQUESTED
- // last workerCycle gets executed, state := mWorkerState := PAUSED by us
- // (or the workers enters the 'error' state if workerCycle fails)
- // client gets notified about state change in any case
- // thread is doing a busy wait while 'state == PAUSED'
- // client sets mWorkerState := RESUME_REQUESTED
- // state := mWorkerState (RESUME_REQUESTED)
- // mWorkerState := RUNNING, but we don't notify the client yet
- // first workerCycle gets executed, the code below triggers a client notification
- // (or if workerCycle fails, worker enters 'error' state and also notifies)
- // state := mWorkerState (RUNNING)
- std::lock_guard<std::mutex> lock(mWorkerLock);
- if (state == WorkerState::RESUME_REQUESTED) {
- needToNotify = true;
- }
- state = mWorkerState;
- if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
- state = mWorkerState = WorkerState::PAUSED;
- needToNotify = true;
- } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
- mWorkerState = WorkerState::RUNNING;
- }
- } else {
- std::lock_guard<std::mutex> lock(mWorkerLock);
- if (state == WorkerState::RESUME_REQUESTED ||
- mWorkerState == WorkerState::PAUSE_REQUESTED) {
- needToNotify = true;
- }
- state = mWorkerState = WorkerState::STOPPED;
- if (status == WorkerStatus::ABORT) {
- mError = "workerCycle aborted";
- }
- }
- if (needToNotify) {
- {
- std::lock_guard<std::mutex> lock(mWorkerLock);
- mWorkerStateChangeRequest = false;
- }
- mWorkerCv.notify_one();
- }
- }
- }
-
+ StreamLogic* const mLogic;
std::string mThreadName;
int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
std::thread mWorker;
@@ -230,4 +83,71 @@
std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
};
+} // namespace internal
+
+class StreamLogic {
+ public:
+ friend class internal::ThreadController;
+
+ virtual ~StreamLogic() = default;
+
+ protected:
+ enum class Status { ABORT, CONTINUE, EXIT };
+
+ /* Called once at the beginning of the thread loop. Must return
+ * an empty string to enter the thread loop, otherwise the thread loop
+ * exits and the worker switches into the 'error' state, setting
+ * the error to the returned value.
+ */
+ virtual std::string init() = 0;
+
+ /* Called for each thread loop unless the thread is in 'paused' state.
+ * Must return 'CONTINUE' to continue running, otherwise the thread loop
+ * exits. If the result from worker cycle is 'ABORT' then the worker switches
+ * into the 'error' state with a generic error message. It is recommended that
+ * the subclass reports any problems via logging facilities. Returning the 'EXIT'
+ * status is equivalent to calling 'stop()' method. This is just a way of
+ * of stopping the worker by its own initiative.
+ */
+ virtual Status cycle() = 0;
+};
+
+template <class LogicImpl>
+class StreamWorker : public LogicImpl {
+ public:
+ template <class... Args>
+ explicit StreamWorker(Args&&... args) : LogicImpl(std::forward<Args>(args)...), mThread(this) {}
+
+ // Methods of LogicImpl are available via inheritance.
+ // Forwarded methods of ThreadController follow.
+
+ // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
+ // The nice number is used with the default scheduler. For threads that
+ // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
+ // it is recommended to implement an appropriate configuration sequence within
+ // 'LogicImpl' or 'StreamLogic::init'.
+ bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
+ return mThread.start(name, priority);
+ }
+ void pause() { mThread.pause(); }
+ void resume() { mThread.resume(); }
+ bool hasError() { return mThread.hasError(); }
+ std::string getError() { return mThread.getError(); }
+ void stop() { return mThread.stop(); }
+ bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); }
+
+ // Only used by unit tests.
+ void testLockUnlockMutex(bool lock) { mThread.lockUnlockMutex(lock); }
+ std::thread::native_handle_type testGetThreadNativeHandle() {
+ return mThread.getThreadNativeHandle();
+ }
+
+ private:
+ // The ThreadController gets destroyed before LogicImpl.
+ // After the controller has been destroyed, it is guaranteed that
+ // the thread was joined, thus the 'cycle' method of LogicImpl
+ // will not be called anymore, and it is safe to destroy LogicImpl.
+ internal::ThreadController mThread;
+};
+
} // namespace android::hardware::audio::common
diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp
index df81c69..e3e484d 100644
--- a/audio/aidl/common/tests/streamworker_tests.cpp
+++ b/audio/aidl/common/tests/streamworker_tests.cpp
@@ -16,6 +16,7 @@
#include <pthread.h>
#include <sched.h>
+#include <sys/resource.h>
#include <unistd.h>
#include <atomic>
@@ -26,18 +27,19 @@
#define LOG_TAG "StreamWorker_Test"
#include <log/log.h>
+using android::hardware::audio::common::StreamLogic;
using android::hardware::audio::common::StreamWorker;
-class TestWorker : public StreamWorker<TestWorker> {
+class TestWorkerLogic : public StreamLogic {
public:
struct Stream {
- void setErrorStatus() { status = WorkerStatus::ABORT; }
- void setStopStatus() { status = WorkerStatus::EXIT; }
- std::atomic<WorkerStatus> status = WorkerStatus::CONTINUE;
+ void setErrorStatus() { status = Status::ABORT; }
+ void setStopStatus() { status = Status::EXIT; }
+ std::atomic<Status> status = Status::CONTINUE;
};
// Use nullptr to test error reporting from the worker thread.
- explicit TestWorker(Stream* stream) : mStream(stream) {}
+ explicit TestWorkerLogic(Stream* stream) : mStream(stream) {}
size_t getWorkerCycles() const { return mWorkerCycles; }
int getPriority() const { return mPriority; }
@@ -48,8 +50,10 @@
return mWorkerCycles == cyclesBefore;
}
- std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
- WorkerStatus workerCycle() {
+ protected:
+ // StreamLogic implementation
+ std::string init() override { return mStream != nullptr ? "" : "Expected error"; }
+ Status cycle() override {
mPriority = getpriority(PRIO_PROCESS, 0);
do {
mWorkerCycles++;
@@ -62,6 +66,7 @@
std::atomic<size_t> mWorkerCycles = 0;
std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
};
+using TestWorker = StreamWorker<TestWorkerLogic>;
// The parameter specifies whether an extra call to 'stop' is made at the end.
class StreamWorkerInvalidTest : public testing::TestWithParam<bool> {
diff --git a/audio/aidl/default/Android.bp b/audio/aidl/default/Android.bp
index 027d928..07b1097 100644
--- a/audio/aidl/default/Android.bp
+++ b/audio/aidl/default/Android.bp
@@ -11,6 +11,7 @@
name: "libaudioserviceexampleimpl",
vendor: true,
shared_libs: [
+ "libaudioaidlcommon",
"libbase",
"libbinder_ndk",
"libstagefright_foundation",
diff --git a/audio/aidl/vts/Android.bp b/audio/aidl/vts/Android.bp
index 75ff37f..1d0ec7c 100644
--- a/audio/aidl/vts/Android.bp
+++ b/audio/aidl/vts/Android.bp
@@ -26,6 +26,7 @@
"android.hardware.common-V2-ndk",
"android.hardware.common.fmq-V1-ndk",
"android.media.audio.common.types-V1-ndk",
+ "libaudioaidlcommon",
],
test_suites: [
"general-tests",