audio: Prevent priority inversions in aidl StreamWorker

Avoid taking the worker lock in high priority worker threads
unless a state change has actually been requested.

Bug: 205884982
Test: atest libaudioaidlcommon_test --iterations
Merged-In: I8cc0f5cb58752b7b7d413a9f4e46093c39445892
Change-Id: I8cc0f5cb58752b7b7d413a9f4e46093c39445892
(cherry picked from commit d989a4b66926a6d5fc089f70399f156cd5d3e5a3)
diff --git a/audio/aidl/common/include/StreamWorker.h b/audio/aidl/common/include/StreamWorker.h
index 8a273dc..74e99df 100644
--- a/audio/aidl/common/include/StreamWorker.h
+++ b/audio/aidl/common/include/StreamWorker.h
@@ -18,6 +18,7 @@
 
 #include <sched.h>
 
+#include <atomic>
 #include <condition_variable>
 #include <mutex>
 #include <thread>
@@ -39,6 +40,7 @@
             android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
             return mWorkerState != WorkerState::STOPPED;
         });
+        mWorkerStateChangeRequest = false;
         return mWorkerState == WorkerState::RUNNING;
     }
     void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
@@ -52,6 +54,7 @@
             std::lock_guard<std::mutex> lock(mWorkerLock);
             if (mWorkerState == WorkerState::STOPPED) return;
             mWorkerState = WorkerState::STOPPED;
+            mWorkerStateChangeRequest = true;
         }
         if (mWorker.joinable()) {
             mWorker.join();
@@ -64,6 +67,10 @@
         switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
         return newState == WorkerState::RUNNING;
     }
+    // Only used by unit tests: locks `mWorkerLock` if `lock` is true, unlocks it otherwise.
+    void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
+        lock ? mWorkerLock.lock() : mWorkerLock.unlock();
+    }
 
     // Methods that need to be provided by subclasses:
     //
@@ -87,6 +94,7 @@
             return;
         }
         mWorkerState = newState;
+        mWorkerStateChangeRequest = true;
         mWorkerCv.wait(lock, [&]() {
             android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
             return mWorkerState != newState;
@@ -106,6 +114,11 @@
             bool needToNotify = false;
             if (state != WorkerState::PAUSED ? static_cast<Impl*>(this)->workerCycle()
                                              : (sched_yield(), true)) {
+                {
+                    // See https://developer.android.com/training/articles/smp#nonracing
+                    android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
+                    if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
+                }
                 //
                 // Pause and resume are synchronous. One worker cycle must complete
                 // before the worker indicates a state change. This is how 'mWorkerState' and
@@ -123,10 +136,10 @@
                 // first workerCycle gets executed, the code below triggers a client notification
                 //   (or if workerCycle fails, worker enters 'error' state and also notifies)
                 // state := mWorkerState (RUNNING)
+                std::lock_guard<std::mutex> lock(mWorkerLock);
                 if (state == WorkerState::RESUME_REQUESTED) {
                     needToNotify = true;
                 }
-                std::lock_guard<std::mutex> lock(mWorkerLock);
                 state = mWorkerState;
                 if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
                     state = mWorkerState = WorkerState::PAUSED;
@@ -144,6 +157,10 @@
                 state = WorkerState::STOPPED;
             }
             if (needToNotify) {
+                {
+                    std::lock_guard<std::mutex> lock(mWorkerLock);
+                    mWorkerStateChangeRequest = false;
+                }
                 mWorkerCv.notify_one();
             }
         }
@@ -153,4 +170,14 @@
     std::mutex mWorkerLock;
     std::condition_variable mWorkerCv;
     WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
+    // The atomic lock-free flag is used to prevent priority inversions, which
+    // can occur when the high priority worker tries to acquire a lock that is
+    // held by a lower priority control thread which has itself been preempted.
+    // To prevent a priority inversion under normal operating conditions, that
+    // is, when there are no errors or state changes, the worker does not attempt
+    // to take `mWorkerLock` unless `mWorkerStateChangeRequest` is set.
+    // To make sure that updates to `mWorkerState` and `mWorkerStateChangeRequest`
+    // are serialized, they are always made while holding the lock.
+    static_assert(std::atomic<bool>::is_always_lock_free);
+    std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
 };
diff --git a/audio/aidl/common/tests/streamworker_tests.cpp b/audio/aidl/common/tests/streamworker_tests.cpp
index bb354e5..c9d3dbd 100644
--- a/audio/aidl/common/tests/streamworker_tests.cpp
+++ b/audio/aidl/common/tests/streamworker_tests.cpp
@@ -207,4 +207,16 @@
     EXPECT_FALSE(worker.waitForAtLeastOneCycle());
 }
 
+TEST_P(StreamWorkerTest, MutexDoesNotBlockWorker) {  // Worker must cycle while mutex is held.
+    ASSERT_TRUE(worker.start());
+    const size_t workerCyclesBefore = worker.getWorkerCycles();
+    worker.testLockUnlockMutex(true);  // Take the worker's mutex from the test thread.
+    while (worker.getWorkerCycles() == workerCyclesBefore) {  // Wait for the count to change.
+        usleep(kWorkerIdleCheckTime);
+    }
+    worker.testLockUnlockMutex(false);
+    EXPECT_TRUE(worker.waitForAtLeastOneCycle());  // State round-trip must succeed once unlocked.
+    EXPECT_FALSE(worker.hasError());
+}
+
 INSTANTIATE_TEST_SUITE_P(StreamWorker, StreamWorkerTest, testing::Bool());