audio: Fix the lifetime of the StreamWorker's logic part am: 0b9c5feed1 am: 4b279d6a32 am: 231ca12ce8 am: 63989892e3

Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/2199345

Change-Id: I90e8237e5427fd114a4c661437971906cff0219d
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/audio/aidl/common/Android.bp b/audio/aidl/common/Android.bp
index 37da9d6..f2d8fc2 100644
--- a/audio/aidl/common/Android.bp
+++ b/audio/aidl/common/Android.bp
@@ -23,7 +23,7 @@
     default_applicable_licenses: ["hardware_interfaces_license"],
 }
 
-cc_library_headers {
+cc_library {
     name: "libaudioaidlcommon",
     host_supported: true,
     vendor_available: true,
@@ -36,13 +36,16 @@
         "libbase_headers",
         "libsystem_headers",
     ],
+    srcs: [
+        "StreamWorker.cpp",
+    ],
 }
 
 cc_test {
     name: "libaudioaidlcommon_test",
     host_supported: true,
     vendor_available: true,
-    header_libs: [
+    static_libs: [
         "libaudioaidlcommon",
     ],
     shared_libs: [
diff --git a/audio/aidl/common/StreamWorker.cpp b/audio/aidl/common/StreamWorker.cpp
new file mode 100644
index 0000000..9bca760
--- /dev/null
+++ b/audio/aidl/common/StreamWorker.cpp
@@ -0,0 +1,160 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <pthread.h>
+#include <sched.h>
+#include <sys/resource.h>
+
+#include "include/StreamWorker.h"
+
+namespace android::hardware::audio::common::internal {
+
+bool ThreadController::start(const std::string& name, int priority) {
+    mThreadName = name;  // Applied by workerThread() via pthread_setname_np.
+    mThreadPriority = priority;  // Applied by workerThread() via setpriority.
+    mWorker = std::thread(&ThreadController::workerThread, this);
+    std::unique_lock<std::mutex> lock(mWorkerLock);
+    android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+    mWorkerCv.wait(lock, [&]() {  // Block until the worker reports RUNNING or an init error.
+        android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+        return mWorkerState == WorkerState::RUNNING || !mError.empty();
+    });
+    mWorkerStateChangeRequest = false;
+    return mWorkerState == WorkerState::RUNNING;  // 'false': see getError() for the reason.
+}
+
+void ThreadController::stop() {
+    {
+        std::lock_guard<std::mutex> lock(mWorkerLock);
+        if (mWorkerState != WorkerState::STOPPED) {
+            mWorkerState = WorkerState::STOPPED;
+            mWorkerStateChangeRequest = true;  // Makes the worker loop re-read 'mWorkerState'.
+        }
+    }  // Release the lock before joining: the worker takes it to observe the request.
+    if (mWorker.joinable()) {
+        mWorker.join();
+    }
+}
+
+bool ThreadController::waitForAtLeastOneCycle() {
+    WorkerState newState;  // Always assigned by switchWorkerStateSync() below.
+    switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
+    if (newState != WorkerState::PAUSED) return false;  // Was not running, or stopped meanwhile.
+    switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
+    return newState == WorkerState::RUNNING;  // Resume is acknowledged only after a cycle ran.
+}
+
+void ThreadController::switchWorkerStateSync(WorkerState oldState, WorkerState newState,
+                                             WorkerState* finalState) {
+    std::unique_lock<std::mutex> lock(mWorkerLock);
+    android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+    if (mWorkerState != oldState) {  // Not in the expected state: report it, request nothing.
+        if (finalState) *finalState = mWorkerState;
+        return;
+    }
+    mWorkerState = newState;
+    mWorkerStateChangeRequest = true;  // Tells the worker loop to look at 'mWorkerState'.
+    mWorkerCv.wait(lock, [&]() {  // Block until the worker thread moves on from 'newState'.
+        android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+        return mWorkerState != newState;
+    });
+    if (finalState) *finalState = mWorkerState;
+}
+
+void ThreadController::workerThread() {
+    using Status = StreamLogic::Status;
+
+    std::string error = mLogic->init();  // A non-empty result aborts the start-up.
+    if (error.empty() && !mThreadName.empty()) {
+        std::string compliantName(mThreadName.substr(0, 15));  // Linux limit: 16 bytes with NUL.
+        if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); errCode != 0) {
+            error.append("Failed to set thread name: ").append(strerror(errCode));
+        }
+    }
+    if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
+        if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
+            int errCode = errno;  // Capture before any other call can overwrite it.
+            error.append("Failed to set thread priority: ").append(strerror(errCode));
+        }
+    }
+    {
+        std::lock_guard<std::mutex> lock(mWorkerLock);
+        mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
+        mError = error;
+    }
+    mWorkerCv.notify_one();  // Wakes up the thread blocked in start().
+    if (!error.empty()) return;
+
+    for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
+        bool needToNotify = false;
+        if (Status status = state != WorkerState::PAUSED ? mLogic->cycle()
+                                                         : (sched_yield(), Status::CONTINUE);
+            status == Status::CONTINUE) {
+            {
+                // See https://developer.android.com/training/articles/smp#nonracing
+                android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+                if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
+            }
+            //
+            // Pause and resume are synchronous. One worker cycle must complete
+            // before the worker indicates a state change. This is how 'mWorkerState' and
+            // 'state' interact:
+            //
+            // mWorkerState == RUNNING
+            // client sets mWorkerState := PAUSE_REQUESTED
+            // last 'cycle' gets executed, state := mWorkerState := PAUSED by us
+            //   (or the worker enters the 'error' state if 'cycle' fails)
+            // client gets notified about state change in any case
+            // thread is doing a busy wait while 'state == PAUSED'
+            // client sets mWorkerState := RESUME_REQUESTED
+            // state := mWorkerState (RESUME_REQUESTED)
+            // mWorkerState := RUNNING, but we don't notify the client yet
+            // first 'cycle' gets executed, the code below triggers a client notification
+            //   (or if 'cycle' fails, worker enters 'error' state and also notifies)
+            // state := mWorkerState (RUNNING)
+            std::lock_guard<std::mutex> lock(mWorkerLock);
+            if (state == WorkerState::RESUME_REQUESTED) {
+                needToNotify = true;
+            }
+            state = mWorkerState;
+            if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
+                state = mWorkerState = WorkerState::PAUSED;
+                needToNotify = true;
+            } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
+                mWorkerState = WorkerState::RUNNING;
+            }
+        } else {
+            std::lock_guard<std::mutex> lock(mWorkerLock);
+            if (state == WorkerState::RESUME_REQUESTED ||
+                mWorkerState == WorkerState::PAUSE_REQUESTED) {
+                needToNotify = true;
+            }
+            state = mWorkerState = WorkerState::STOPPED;
+            if (status == Status::ABORT) {
+                mError = "Received ABORT from the logic cycle";
+            }
+        }
+        if (needToNotify) {
+            {
+                std::lock_guard<std::mutex> lock(mWorkerLock);
+                mWorkerStateChangeRequest = false;
+            }
+            mWorkerCv.notify_one();  // Wakes up the client blocked in switchWorkerStateSync().
+        }
+    }
+}
+
+}  // namespace android::hardware::audio::common::internal
diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h
index 03685fc..6260eca 100644
--- a/audio/aidl/common/include/StreamWorker.h
+++ b/audio/aidl/common/include/StreamWorker.h
@@ -16,10 +16,6 @@
 
 #pragma once
 
-#include <pthread.h>
-#include <sched.h>
-#include <sys/resource.h>
-
 #include <atomic>
 #include <condition_variable>
 #include <mutex>
@@ -31,32 +27,18 @@
 
 namespace android::hardware::audio::common {
 
-template <typename Impl>
-class StreamWorker {
+class StreamLogic;
+
+namespace internal {
+
+class ThreadController {
     enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
 
   public:
-    enum class WorkerStatus { ABORT, CONTINUE, EXIT };
+    explicit ThreadController(StreamLogic* logic) : mLogic(logic) {}
+    ~ThreadController() { stop(); }
 
-    StreamWorker() = default;
-    ~StreamWorker() { stop(); }
-    // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
-    // The nice number is used with the default scheduler. For threads that
-    // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
-    // it is recommended to implement an appropriate configuration sequence within `workerInit`.
-    bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
-        mThreadName = name;
-        mThreadPriority = priority;
-        mWorker = std::thread(&StreamWorker::workerThread, this);
-        std::unique_lock<std::mutex> lock(mWorkerLock);
-        android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
-        mWorkerCv.wait(lock, [&]() {
-            android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
-            return mWorkerState == WorkerState::RUNNING || !mError.empty();
-        });
-        mWorkerStateChangeRequest = false;
-        return mWorkerState == WorkerState::RUNNING;
-    }
+    bool start(const std::string& name, int priority);
     void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
     void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
     bool hasError() {
@@ -67,150 +49,21 @@
         std::lock_guard<std::mutex> lock(mWorkerLock);
         return mError;
     }
-    void stop() {
-        {
-            std::lock_guard<std::mutex> lock(mWorkerLock);
-            if (mWorkerState != WorkerState::STOPPED) {
-                mWorkerState = WorkerState::STOPPED;
-                mWorkerStateChangeRequest = true;
-            }
-        }
-        if (mWorker.joinable()) {
-            mWorker.join();
-        }
-    }
-    bool waitForAtLeastOneCycle() {
-        WorkerState newState;
-        switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
-        if (newState != WorkerState::PAUSED) return false;
-        switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
-        return newState == WorkerState::RUNNING;
-    }
+    void stop();
+    bool waitForAtLeastOneCycle();
+
     // Only used by unit tests.
-    void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
+    void lockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
         lock ? mWorkerLock.lock() : mWorkerLock.unlock();
     }
-    std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); }
-
-    // Methods that need to be provided by subclasses:
-    //
-    // /* Called once at the beginning of the thread loop. Must return
-    //  * an empty string to enter the thread loop, otherwise the thread loop
-    //  * exits and the worker switches into the 'error' state, setting
-    //  * the error to the returned value.
-    //  */
-    // std::string workerInit();
-    //
-    // /* Called for each thread loop unless the thread is in 'paused' state.
-    //  * Must return 'CONTINUE' to continue running, otherwise the thread loop
-    //  * exits. If the result from worker cycle is 'ABORT' then the worker switches
-    //  * into the 'error' state with a generic error message. It is recommended that
-    //  * the subclass reports any problems via logging facilities. Returning the 'EXIT'
-    //  * status is equivalent to calling 'stop()' method. This is just a way of
-    //  * of stopping the worker by its own initiative.
-    //  */
-    // WorkerStatus workerCycle();
+    std::thread::native_handle_type getThreadNativeHandle() { return mWorker.native_handle(); }
 
   private:
     void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
-                               WorkerState* finalState = nullptr) {
-        std::unique_lock<std::mutex> lock(mWorkerLock);
-        android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
-        if (mWorkerState != oldState) {
-            if (finalState) *finalState = mWorkerState;
-            return;
-        }
-        mWorkerState = newState;
-        mWorkerStateChangeRequest = true;
-        mWorkerCv.wait(lock, [&]() {
-            android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
-            return mWorkerState != newState;
-        });
-        if (finalState) *finalState = mWorkerState;
-    }
-    void workerThread() {
-        std::string error = static_cast<Impl*>(this)->workerInit();
-        if (error.empty() && !mThreadName.empty()) {
-            std::string compliantName(mThreadName.substr(0, 15));
-            if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str());
-                errCode != 0) {
-                error.append("Failed to set thread name: ").append(strerror(errCode));
-            }
-        }
-        if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
-            if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
-                int errCode = errno;
-                error.append("Failed to set thread priority: ").append(strerror(errCode));
-            }
-        }
-        {
-            std::lock_guard<std::mutex> lock(mWorkerLock);
-            mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
-            mError = error;
-        }
-        mWorkerCv.notify_one();
-        if (!error.empty()) return;
+                               WorkerState* finalState = nullptr);
+    void workerThread();
 
-        for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
-            bool needToNotify = false;
-            if (WorkerStatus status = state != WorkerState::PAUSED
-                                              ? static_cast<Impl*>(this)->workerCycle()
-                                              : (sched_yield(), WorkerStatus::CONTINUE);
-                status == WorkerStatus::CONTINUE) {
-                {
-                    // See https://developer.android.com/training/articles/smp#nonracing
-                    android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
-                    if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
-                }
-                //
-                // Pause and resume are synchronous. One worker cycle must complete
-                // before the worker indicates a state change. This is how 'mWorkerState' and
-                // 'state' interact:
-                //
-                // mWorkerState == RUNNING
-                // client sets mWorkerState := PAUSE_REQUESTED
-                // last workerCycle gets executed, state := mWorkerState := PAUSED by us
-                //   (or the workers enters the 'error' state if workerCycle fails)
-                // client gets notified about state change in any case
-                // thread is doing a busy wait while 'state == PAUSED'
-                // client sets mWorkerState := RESUME_REQUESTED
-                // state := mWorkerState (RESUME_REQUESTED)
-                // mWorkerState := RUNNING, but we don't notify the client yet
-                // first workerCycle gets executed, the code below triggers a client notification
-                //   (or if workerCycle fails, worker enters 'error' state and also notifies)
-                // state := mWorkerState (RUNNING)
-                std::lock_guard<std::mutex> lock(mWorkerLock);
-                if (state == WorkerState::RESUME_REQUESTED) {
-                    needToNotify = true;
-                }
-                state = mWorkerState;
-                if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
-                    state = mWorkerState = WorkerState::PAUSED;
-                    needToNotify = true;
-                } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
-                    mWorkerState = WorkerState::RUNNING;
-                }
-            } else {
-                std::lock_guard<std::mutex> lock(mWorkerLock);
-                if (state == WorkerState::RESUME_REQUESTED ||
-                    mWorkerState == WorkerState::PAUSE_REQUESTED) {
-                    needToNotify = true;
-                }
-                state = mWorkerState = WorkerState::STOPPED;
-                if (status == WorkerStatus::ABORT) {
-                    mError = "workerCycle aborted";
-                }
-            }
-            if (needToNotify) {
-                {
-                    std::lock_guard<std::mutex> lock(mWorkerLock);
-                    mWorkerStateChangeRequest = false;
-                }
-                mWorkerCv.notify_one();
-            }
-        }
-    }
-
+    StreamLogic* const mLogic;
     std::string mThreadName;
     int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
     std::thread mWorker;
@@ -230,4 +83,71 @@
     std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
 };
 
+}  // namespace internal
+
+class StreamLogic {
+  public:
+    friend class internal::ThreadController;  // Invokes the protected 'init' and 'cycle'.
+
+    virtual ~StreamLogic() = default;
+
+  protected:
+    enum class Status { ABORT, CONTINUE, EXIT };
+
+    /* Called once on the worker thread before it enters the thread loop. Must
+     * return an empty string to enter the thread loop, otherwise the thread loop
+     * is not entered and the worker switches into the 'error' state, setting
+     * the error to the returned value.
+     */
+    virtual std::string init() = 0;
+
+    /* Called for each thread loop unless the thread is in 'paused' state.
+     * Must return 'CONTINUE' to continue running, otherwise the thread loop
+     * exits. If the result from worker cycle is 'ABORT' then the worker switches
+     * into the 'error' state with a generic error message. It is recommended that
+     * the subclass reports any problems via logging facilities. Returning the 'EXIT'
+     * status is equivalent to calling the 'stop()' method. This is just a way
+     * of stopping the worker on its own initiative.
+     */
+    virtual Status cycle() = 0;
+};
+
+template <class LogicImpl>  // 'LogicImpl' must derive from 'StreamLogic' (see 'mThread(this)').
+class StreamWorker : public LogicImpl {
+  public:
+    template <class... Args>
+    explicit StreamWorker(Args&&... args) : LogicImpl(std::forward<Args>(args)...), mThread(this) {}
+
+    // Methods of LogicImpl are available via inheritance.
+    // Forwarded methods of ThreadController follow.
+
+    // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
+    // The nice number is used with the default scheduler. For threads that
+    // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
+    // it is recommended to implement an appropriate configuration sequence within
+    // 'LogicImpl::init' (the 'StreamLogic::init' override), which runs on the worker thread.
+    bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
+        return mThread.start(name, priority);
+    }
+    void pause() { mThread.pause(); }
+    void resume() { mThread.resume(); }
+    bool hasError() { return mThread.hasError(); }
+    std::string getError() { return mThread.getError(); }
+    void stop() { return mThread.stop(); }
+    bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); }
+
+    // Only used by unit tests.
+    void testLockUnlockMutex(bool lock) { mThread.lockUnlockMutex(lock); }
+    std::thread::native_handle_type testGetThreadNativeHandle() {
+        return mThread.getThreadNativeHandle();
+    }
+
+  private:
+    // The ThreadController member gets destroyed before the LogicImpl base class.
+    // After the controller has been destroyed, it is guaranteed that
+    // the thread was joined, thus the 'cycle' method of LogicImpl
+    // will not be called anymore, and it is safe to destroy LogicImpl.
+    internal::ThreadController mThread;
+};
+
 }  // namespace android::hardware::audio::common
diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp
index df81c69..e3e484d 100644
--- a/audio/aidl/common/tests/streamworker_tests.cpp
+++ b/audio/aidl/common/tests/streamworker_tests.cpp
@@ -16,6 +16,7 @@
 
 #include <pthread.h>
 #include <sched.h>
+#include <sys/resource.h>
 #include <unistd.h>
 
 #include <atomic>
@@ -26,18 +27,19 @@
 #define LOG_TAG "StreamWorker_Test"
 #include <log/log.h>
 
+using android::hardware::audio::common::StreamLogic;
 using android::hardware::audio::common::StreamWorker;
 
-class TestWorker : public StreamWorker<TestWorker> {
+class TestWorkerLogic : public StreamLogic {
   public:
     struct Stream {
-        void setErrorStatus() { status = WorkerStatus::ABORT; }
-        void setStopStatus() { status = WorkerStatus::EXIT; }
-        std::atomic<WorkerStatus> status = WorkerStatus::CONTINUE;
+        void setErrorStatus() { status = Status::ABORT; }
+        void setStopStatus() { status = Status::EXIT; }
+        std::atomic<Status> status = Status::CONTINUE;
     };
 
     // Use nullptr to test error reporting from the worker thread.
-    explicit TestWorker(Stream* stream) : mStream(stream) {}
+    explicit TestWorkerLogic(Stream* stream) : mStream(stream) {}
 
     size_t getWorkerCycles() const { return mWorkerCycles; }
     int getPriority() const { return mPriority; }
@@ -48,8 +50,10 @@
         return mWorkerCycles == cyclesBefore;
     }
 
-    std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
-    WorkerStatus workerCycle() {
+  protected:
+    // StreamLogic implementation
+    std::string init() override { return mStream != nullptr ? "" : "Expected error"; }
+    Status cycle() override {
         mPriority = getpriority(PRIO_PROCESS, 0);
         do {
             mWorkerCycles++;
@@ -62,6 +66,7 @@
     std::atomic<size_t> mWorkerCycles = 0;
     std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
 };
+using TestWorker = StreamWorker<TestWorkerLogic>;
 
 // The parameter specifies whether an extra call to 'stop' is made at the end.
 class StreamWorkerInvalidTest : public testing::TestWithParam<bool> {
diff --git a/audio/aidl/default/Android.bp b/audio/aidl/default/Android.bp
index 027d928..07b1097 100644
--- a/audio/aidl/default/Android.bp
+++ b/audio/aidl/default/Android.bp
@@ -11,6 +11,7 @@
     name: "libaudioserviceexampleimpl",
     vendor: true,
     shared_libs: [
+        "libaudioaidlcommon",
         "libbase",
         "libbinder_ndk",
         "libstagefright_foundation",
diff --git a/audio/aidl/vts/Android.bp b/audio/aidl/vts/Android.bp
index 75ff37f..1d0ec7c 100644
--- a/audio/aidl/vts/Android.bp
+++ b/audio/aidl/vts/Android.bp
@@ -26,6 +26,7 @@
         "android.hardware.common-V2-ndk",
         "android.hardware.common.fmq-V1-ndk",
         "android.media.audio.common.types-V1-ndk",
+        "libaudioaidlcommon",
     ],
     test_suites: [
         "general-tests",