audio: Implement setting name and priority in StreamWorker

Audio threads need to be able to set their priority.
Also, traditionally these worker threads set a custom
thread name. Implement this functionality in StreamWorker.

Since initialization steps can fail, implement simple
error reporting via a string field `mError`. The state
of the string field replaces the dedicated `ERROR` worker
state.

Bug: 205884982
Test: atest libaudioaidlcommon_test --iterations
Merged-In: Ie9ab94922d47f277a4993a90b478a2fa76657923
Change-Id: Ie9ab94922d47f277a4993a90b478a2fa76657923
(cherry picked from commit e9e0f7c0f54a013088f4235f182c6f07ab7a4d9f)
diff --git a/audio/aidl/common/Android.bp b/audio/aidl/common/Android.bp
index 6a1c4a4..37da9d6 100644
--- a/audio/aidl/common/Android.bp
+++ b/audio/aidl/common/Android.bp
@@ -30,9 +30,11 @@
     export_include_dirs: ["include"],
     header_libs: [
         "libbase_headers",
+        "libsystem_headers",
     ],
     export_header_lib_headers: [
         "libbase_headers",
+        "libsystem_headers",
     ],
 }
 
diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h
index 74e99df..7764904 100644
--- a/audio/aidl/common/include/StreamWorker.h
+++ b/audio/aidl/common/include/StreamWorker.h
@@ -16,29 +16,39 @@
 
 #pragma once
 
+#include <pthread.h>
 #include <sched.h>
+#include <sys/resource.h>
 
 #include <atomic>
 #include <condition_variable>
 #include <mutex>
+#include <string>
 #include <thread>
 
 #include <android-base/thread_annotations.h>
+#include <system/thread_defs.h>
 
 template <typename Impl>
 class StreamWorker {
-    enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED, ERROR };
+    enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
 
   public:
     StreamWorker() = default;
     ~StreamWorker() { stop(); }
-    bool start() {
+    // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
+    // The nice number is used with the default scheduler. For threads that
+    // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
+    // it is recommended to implement an appropriate configuration sequence within `workerInit`.
+    bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
+        mThreadName = name;
+        mThreadPriority = priority;
         mWorker = std::thread(&StreamWorker::workerThread, this);
         std::unique_lock<std::mutex> lock(mWorkerLock);
         android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
         mWorkerCv.wait(lock, [&]() {
             android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
-            return mWorkerState != WorkerState::STOPPED;
+            return mWorkerState == WorkerState::RUNNING || !mError.empty();
         });
         mWorkerStateChangeRequest = false;
         return mWorkerState == WorkerState::RUNNING;
@@ -47,14 +57,20 @@
     void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
     bool hasError() {
         std::lock_guard<std::mutex> lock(mWorkerLock);
-        return mWorkerState == WorkerState::ERROR;
+        return !mError.empty();
+    }
+    std::string getError() {
+        std::lock_guard<std::mutex> lock(mWorkerLock);
+        return mError;
     }
     void stop() {
         {
             std::lock_guard<std::mutex> lock(mWorkerLock);
-            if (mWorkerState == WorkerState::STOPPED) return;
-            mWorkerState = WorkerState::STOPPED;
-            mWorkerStateChangeRequest = true;
+            if (mError.empty()) {
+                if (mWorkerState == WorkerState::STOPPED) return;
+                mWorkerState = WorkerState::STOPPED;
+                mWorkerStateChangeRequest = true;
+            }
         }
         if (mWorker.joinable()) {
             mWorker.join();
@@ -71,17 +87,21 @@
     void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
         lock ? mWorkerLock.lock() : mWorkerLock.unlock();
     }
+    std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); }
 
     // Methods that need to be provided by subclasses:
     //
     // Called once at the beginning of the thread loop. Must return
-    // 'true' to enter the thread loop, otherwise the thread loop
-    // exits and the worker switches into the 'error' state.
-    // bool workerInit();
+    // an empty string to enter the thread loop, otherwise the thread loop
+    // exits and the worker switches into the 'error' state, setting
+    // the error to the returned value.
+    // std::string workerInit();
     //
     // Called for each thread loop unless the thread is in 'paused' state.
     // Must return 'true' to continue running, otherwise the thread loop
-    // exits and the worker switches into the 'error' state.
+    // exits and the worker switches into the 'error' state with a generic
+    // error message. It is recommended that the subclass reports any
+    // problems via logging facilities.
     // bool workerCycle();
 
   private:
@@ -102,13 +122,27 @@
         if (finalState) *finalState = mWorkerState;
     }
     void workerThread() {
-        bool success = static_cast<Impl*>(this)->workerInit();
+        std::string error = static_cast<Impl*>(this)->workerInit();
+        if (error.empty() && !mThreadName.empty()) {
+            std::string compliantName(mThreadName.substr(0, 15));
+            if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str());
+                errCode != 0) {
+                error.append("Failed to set thread name: ").append(strerror(errCode));
+            }
+        }
+        if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
+            if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
+                int errCode = errno;
+                error.append("Failed to set thread priority: ").append(strerror(errCode));
+            }
+        }
         {
             std::lock_guard<std::mutex> lock(mWorkerLock);
-            mWorkerState = success ? WorkerState::RUNNING : WorkerState::ERROR;
+            mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
+            mError = error;
         }
         mWorkerCv.notify_one();
-        if (!success) return;
+        if (!error.empty()) return;
 
         for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
             bool needToNotify = false;
@@ -153,8 +187,8 @@
                     mWorkerState == WorkerState::PAUSE_REQUESTED) {
                     needToNotify = true;
                 }
-                mWorkerState = WorkerState::ERROR;
-                state = WorkerState::STOPPED;
+                state = mWorkerState = WorkerState::STOPPED;
+                mError = "workerCycle failed";
             }
             if (needToNotify) {
                 {
@@ -166,10 +200,13 @@
         }
     }
 
+    std::string mThreadName;
+    int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
     std::thread mWorker;
     std::mutex mWorkerLock;
     std::condition_variable mWorkerCv;
     WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
+    std::string mError GUARDED_BY(mWorkerLock);
     // The atomic lock-free variable is used to prevent priority inversions
     // that can occur when a high priority worker tries to acquire the lock
     // which has been taken by a lower priority control thread which in its turn
diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp
index c9d3dbd..9fb1a8e 100644
--- a/audio/aidl/common/tests/streamworker_tests.cpp
+++ b/audio/aidl/common/tests/streamworker_tests.cpp
@@ -14,8 +14,10 @@
  * limitations under the License.
  */
 
+#include <pthread.h>
 #include <sched.h>
 #include <unistd.h>
+
 #include <atomic>
 
 #include <StreamWorker.h>
@@ -34,6 +36,7 @@
     explicit TestWorker(TestStream* stream) : mStream(stream) {}
 
     size_t getWorkerCycles() const { return mWorkerCycles; }
+    int getPriority() const { return mPriority; }
     bool hasWorkerCycleCalled() const { return mWorkerCycles != 0; }
     bool hasNoWorkerCycleCalled(useconds_t usec) {
         const size_t cyclesBefore = mWorkerCycles;
@@ -41,8 +44,9 @@
         return mWorkerCycles == cyclesBefore;
     }
 
-    bool workerInit() { return mStream; }
+    std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
     bool workerCycle() {
+        mPriority = getpriority(PRIO_PROCESS, 0);
         do {
             mWorkerCycles++;
         } while (mWorkerCycles == 0);
@@ -52,6 +56,7 @@
   private:
     TestStream* const mStream;
     std::atomic<size_t> mWorkerCycles = 0;
+    std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
 };
 
 // The parameter specifies whether an extra call to 'stop' is made at the end.
@@ -219,4 +224,19 @@
     EXPECT_FALSE(worker.hasError());
 }
 
+TEST_P(StreamWorkerTest, ThreadName) {
+    const std::string workerName = "TestWorker";
+    ASSERT_TRUE(worker.start(workerName)) << worker.getError();
+    char nameBuf[128];
+    ASSERT_EQ(0, pthread_getname_np(worker.testGetThreadNativeHandle(), nameBuf, sizeof(nameBuf)));
+    EXPECT_EQ(workerName, nameBuf);
+}
+
+TEST_P(StreamWorkerTest, ThreadPriority) {
+    const int priority = ANDROID_PRIORITY_LOWEST;
+    ASSERT_TRUE(worker.start("", priority)) << worker.getError();
+    worker.waitForAtLeastOneCycle();
+    EXPECT_EQ(priority, worker.getPriority());
+}
+
 INSTANTIATE_TEST_SUITE_P(StreamWorker, StreamWorkerTest, testing::Bool());