audio: Allow stopping a StreamWorker from the looping thread

Enhance the return type of the 'workerCycle' to allow it
exiting without inducing an error on the controller side.

Also, put StreamWorker into a namespace.

Bug: 205884982
Test: atest libaudioaidlcommon_test --iterations
Merged-In: I3b27028b10f80f27985040cae8f8b0e6ab63ddad
Change-Id: I3b27028b10f80f27985040cae8f8b0e6ab63ddad
(cherry picked from commit 5021df71c713a01e619088f741ac88334fb0c25b)
diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h
index 7764904..03685fc 100644
--- a/audio/aidl/common/include/StreamWorker.h
+++ b/audio/aidl/common/include/StreamWorker.h
@@ -29,11 +29,15 @@
 #include <android-base/thread_annotations.h>
 #include <system/thread_defs.h>
 
+namespace android::hardware::audio::common {
+
 template <typename Impl>
 class StreamWorker {
     enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
 
   public:
+    enum class WorkerStatus { ABORT, CONTINUE, EXIT };
+
     StreamWorker() = default;
     ~StreamWorker() { stop(); }
     // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
@@ -66,8 +70,7 @@
     void stop() {
         {
             std::lock_guard<std::mutex> lock(mWorkerLock);
-            if (mError.empty()) {
-                if (mWorkerState == WorkerState::STOPPED) return;
+            if (mWorkerState != WorkerState::STOPPED) {
                 mWorkerState = WorkerState::STOPPED;
                 mWorkerStateChangeRequest = true;
             }
@@ -91,18 +94,22 @@
 
     // Methods that need to be provided by subclasses:
     //
-    // Called once at the beginning of the thread loop. Must return
-    // an empty string to enter the thread loop, otherwise the thread loop
-    // exits and the worker switches into the 'error' state, setting
-    // the error to the returned value.
+    // /* Called once at the beginning of the thread loop. Must return
+    //  * an empty string to enter the thread loop, otherwise the thread loop
+    //  * exits and the worker switches into the 'error' state, setting
+    //  * the error to the returned value.
+    //  */
     // std::string workerInit();
     //
-    // Called for each thread loop unless the thread is in 'paused' state.
-    // Must return 'true' to continue running, otherwise the thread loop
-    // exits and the worker switches into the 'error' state with a generic
-    // error message. It is recommended that the subclass reports any
-    // problems via logging facilities.
-    // bool workerCycle();
+    // /* Called for each thread loop unless the thread is in 'paused' state.
+    //  * Must return 'CONTINUE' to continue running, otherwise the thread loop
+    //  * exits. If the result from worker cycle is 'ABORT' then the worker switches
+    //  * into the 'error' state with a generic error message. It is recommended that
+    //  * the subclass reports any problems via logging facilities. Returning the 'EXIT'
+    //  * status is equivalent to calling 'stop()' method. This is just a way of
+    //  * of stopping the worker by its own initiative.
+    //  */
+    // WorkerStatus workerCycle();
 
   private:
     void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
@@ -146,8 +153,10 @@
 
         for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
             bool needToNotify = false;
-            if (state != WorkerState::PAUSED ? static_cast<Impl*>(this)->workerCycle()
-                                             : (sched_yield(), true)) {
+            if (WorkerStatus status = state != WorkerState::PAUSED
+                                              ? static_cast<Impl*>(this)->workerCycle()
+                                              : (sched_yield(), WorkerStatus::CONTINUE);
+                status == WorkerStatus::CONTINUE) {
                 {
                     // See https://developer.android.com/training/articles/smp#nonracing
                     android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
@@ -188,7 +197,9 @@
                     needToNotify = true;
                 }
                 state = mWorkerState = WorkerState::STOPPED;
-                mError = "workerCycle failed";
+                if (status == WorkerStatus::ABORT) {
+                    mError = "workerCycle aborted";
+                }
             }
             if (needToNotify) {
                 {
@@ -218,3 +229,5 @@
     static_assert(std::atomic<bool>::is_always_lock_free);
     std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
 };
+
+}  // namespace android::hardware::audio::common
diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp
index 9fb1a8e..df81c69 100644
--- a/audio/aidl/common/tests/streamworker_tests.cpp
+++ b/audio/aidl/common/tests/streamworker_tests.cpp
@@ -26,14 +26,18 @@
 #define LOG_TAG "StreamWorker_Test"
 #include <log/log.h>
 
-struct TestStream {
-    std::atomic<bool> error = false;
-};
+using android::hardware::audio::common::StreamWorker;
 
 class TestWorker : public StreamWorker<TestWorker> {
   public:
+    struct Stream {
+        void setErrorStatus() { status = WorkerStatus::ABORT; }
+        void setStopStatus() { status = WorkerStatus::EXIT; }
+        std::atomic<WorkerStatus> status = WorkerStatus::CONTINUE;
+    };
+
     // Use nullptr to test error reporting from the worker thread.
-    explicit TestWorker(TestStream* stream) : mStream(stream) {}
+    explicit TestWorker(Stream* stream) : mStream(stream) {}
 
     size_t getWorkerCycles() const { return mWorkerCycles; }
     int getPriority() const { return mPriority; }
@@ -45,16 +49,16 @@
     }
 
     std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
-    bool workerCycle() {
+    WorkerStatus workerCycle() {
         mPriority = getpriority(PRIO_PROCESS, 0);
         do {
             mWorkerCycles++;
         } while (mWorkerCycles == 0);
-        return !mStream->error;
+        return mStream->status;
     }
 
   private:
-    TestStream* const mStream;
+    Stream* const mStream;
     std::atomic<size_t> mWorkerCycles = 0;
     std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
 };
@@ -70,7 +74,8 @@
     }
 
   protected:
-    StreamWorkerInvalidTest(TestStream* stream) : testing::TestWithParam<bool>(), worker(stream) {}
+    StreamWorkerInvalidTest(TestWorker::Stream* stream)
+        : testing::TestWithParam<bool>(), worker(stream) {}
     TestWorker worker;
 };
 
@@ -118,7 +123,7 @@
     StreamWorkerTest() : StreamWorkerInvalidTest(&stream) {}
 
   protected:
-    TestStream stream;
+    TestWorker::Stream stream;
 };
 
 static constexpr unsigned kWorkerIdleCheckTime = 50 * 1000;
@@ -130,21 +135,47 @@
 
 TEST_P(StreamWorkerTest, Start) {
     ASSERT_TRUE(worker.start());
+    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
+    EXPECT_FALSE(worker.hasError());
+}
+
+TEST_P(StreamWorkerTest, StartStop) {
+    ASSERT_TRUE(worker.start());
+    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
+    EXPECT_FALSE(worker.hasError());
+    worker.stop();
+    EXPECT_FALSE(worker.hasError());
+}
+
+TEST_P(StreamWorkerTest, WorkerExit) {
+    ASSERT_TRUE(worker.start());
+    stream.setStopStatus();
     worker.waitForAtLeastOneCycle();
     EXPECT_FALSE(worker.hasError());
+    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
 }
 
 TEST_P(StreamWorkerTest, WorkerError) {
     ASSERT_TRUE(worker.start());
-    stream.error = true;
+    stream.setErrorStatus();
     worker.waitForAtLeastOneCycle();
     EXPECT_TRUE(worker.hasError());
     EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
 }
 
+TEST_P(StreamWorkerTest, StopAfterError) {
+    ASSERT_TRUE(worker.start());
+    stream.setErrorStatus();
+    worker.waitForAtLeastOneCycle();
+    EXPECT_TRUE(worker.hasError());
+    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
+    worker.stop();
+    EXPECT_TRUE(worker.hasError());
+}
+
 TEST_P(StreamWorkerTest, PauseResume) {
     ASSERT_TRUE(worker.start());
-    worker.waitForAtLeastOneCycle();
+    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
     EXPECT_FALSE(worker.hasError());
     worker.pause();
     EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
@@ -158,7 +189,7 @@
 
 TEST_P(StreamWorkerTest, StopPaused) {
     ASSERT_TRUE(worker.start());
-    worker.waitForAtLeastOneCycle();
+    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
     EXPECT_FALSE(worker.hasError());
     worker.pause();
     worker.stop();
@@ -167,7 +198,7 @@
 
 TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) {
     ASSERT_TRUE(worker.start());
-    stream.error = true;
+    stream.setErrorStatus();
     worker.waitForAtLeastOneCycle();
     EXPECT_TRUE(worker.hasError());
     worker.pause();
@@ -177,7 +208,7 @@
 
 TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) {
     ASSERT_TRUE(worker.start());
-    stream.error = true;
+    stream.setErrorStatus();
     worker.waitForAtLeastOneCycle();
     EXPECT_TRUE(worker.hasError());
     worker.resume();
@@ -187,11 +218,11 @@
 
 TEST_P(StreamWorkerTest, WorkerErrorOnResume) {
     ASSERT_TRUE(worker.start());
-    worker.waitForAtLeastOneCycle();
+    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
     EXPECT_FALSE(worker.hasError());
     worker.pause();
     EXPECT_FALSE(worker.hasError());
-    stream.error = true;
+    stream.setErrorStatus();
     EXPECT_FALSE(worker.hasError());
     worker.resume();
     worker.waitForAtLeastOneCycle();
@@ -208,7 +239,7 @@
 
 TEST_P(StreamWorkerTest, WaitForAtLeastOneCycleError) {
     ASSERT_TRUE(worker.start());
-    stream.error = true;
+    stream.setErrorStatus();
     EXPECT_FALSE(worker.waitForAtLeastOneCycle());
 }
 
@@ -220,7 +251,7 @@
         usleep(kWorkerIdleCheckTime);
     }
     worker.testLockUnlockMutex(false);
-    worker.waitForAtLeastOneCycle();
+    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
     EXPECT_FALSE(worker.hasError());
 }
 
@@ -235,7 +266,7 @@
 TEST_P(StreamWorkerTest, ThreadPriority) {
     const int priority = ANDROID_PRIORITY_LOWEST;
     ASSERT_TRUE(worker.start("", priority)) << worker.getError();
-    worker.waitForAtLeastOneCycle();
+    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
     EXPECT_EQ(priority, worker.getPriority());
 }