Use semaphore instead of condition variable

BackgroundExecutor uses a condition variable to synchronize processing
to a work queue, which means that there is possible mutex contention on
the main thread in rare scenarios.

Instead, we can make the main thread non-blocking with a thread-safe
queue, and use semaphores for signaling the background thread to wake
up. Using semaphores saves a small number of instructions, but overall
reduces contention

Bug: 232113929
Test: bouncy ball

Change-Id: Iced25e8349bdb2a70cac1ed681059a0b14258407
diff --git a/services/surfaceflinger/BackgroundExecutor.cpp b/services/surfaceflinger/BackgroundExecutor.cpp
index de8e6b3..a15de2b 100644
--- a/services/surfaceflinger/BackgroundExecutor.cpp
+++ b/services/surfaceflinger/BackgroundExecutor.cpp
@@ -19,6 +19,8 @@
 #define LOG_TAG "BackgroundExecutor"
 #define ATRACE_TAG ATRACE_TAG_GRAPHICS
 
+#include <utils/Log.h>
+
 #include "BackgroundExecutor.h"
 
 namespace android {
@@ -27,41 +29,49 @@
 
 BackgroundExecutor::BackgroundExecutor() : Singleton<BackgroundExecutor>() {
     mThread = std::thread([&]() {
-        bool done = false;
-        while (!done) {
-            std::vector<std::function<void()>> tasks;
-            {
-                std::unique_lock lock(mMutex);
-                android::base::ScopedLockAssertion assumeLock(mMutex);
-                mWorkAvailableCv.wait(lock,
-                                      [&]() REQUIRES(mMutex) { return mDone || !mTasks.empty(); });
-                tasks = std::move(mTasks);
-                mTasks.clear();
-                done = mDone;
-            } // unlock mMutex
+        LOG_ALWAYS_FATAL_IF(sem_init(&mSemaphore, 0, 0), "sem_init failed");
+        while (!mDone) {
+            LOG_ALWAYS_FATAL_IF(sem_wait(&mSemaphore), "sem_wait failed (%d)", errno);
 
-            for (auto& task : tasks) {
-                task();
+            ftl::SmallVector<Work*, 10> workItems;
+
+            Work* work = mWorks.pop();
+            while (work) {
+                workItems.push_back(work);
+                work = mWorks.pop();
+            }
+
+            // Sequence numbers are guaranteed to be in intended order, as we assume a single
+            // producer and single consumer.
+            std::stable_sort(workItems.begin(), workItems.end(), [](Work* left, Work* right) {
+                return left->sequence < right->sequence;
+            });
+            for (Work* work : workItems) {
+                for (auto& task : work->tasks) {
+                    task();
+                }
+                delete work;
             }
         }
     });
 }
 
 BackgroundExecutor::~BackgroundExecutor() {
-    {
-        std::scoped_lock lock(mMutex);
-        mDone = true;
-        mWorkAvailableCv.notify_all();
-    }
+    mDone = true;
+    LOG_ALWAYS_FATAL_IF(sem_post(&mSemaphore), "sem_post failed");
     if (mThread.joinable()) {
         mThread.join();
+        LOG_ALWAYS_FATAL_IF(sem_destroy(&mSemaphore), "sem_destroy failed");
     }
 }
 
-void BackgroundExecutor::execute(std::function<void()> task) {
-    std::scoped_lock lock(mMutex);
-    mTasks.emplace_back(std::move(task));
-    mWorkAvailableCv.notify_all();
+void BackgroundExecutor::sendCallbacks(Callbacks&& tasks) {
+    Work* work = new Work();
+    work->sequence = mSequence;
+    work->tasks = std::move(tasks);
+    mWorks.push(work);
+    mSequence++;
+    LOG_ALWAYS_FATAL_IF(sem_post(&mSemaphore), "sem_post failed");
 }
 
 } // namespace android
\ No newline at end of file
diff --git a/services/surfaceflinger/BackgroundExecutor.h b/services/surfaceflinger/BackgroundExecutor.h
index 6db7dda..eeaf3bd 100644
--- a/services/surfaceflinger/BackgroundExecutor.h
+++ b/services/surfaceflinger/BackgroundExecutor.h
@@ -16,9 +16,11 @@
 
 #pragma once
 
+#include <Tracing/LocklessStack.h>
 #include <android-base/thread_annotations.h>
+#include <ftl/small_vector.h>
+#include <semaphore.h>
 #include <utils/Singleton.h>
-#include <condition_variable>
 #include <mutex>
 #include <queue>
 #include <thread>
@@ -30,13 +32,26 @@
 public:
     BackgroundExecutor();
     ~BackgroundExecutor();
-    void execute(std::function<void()>);
+    using Callbacks = ftl::SmallVector<std::function<void()>, 10>;
+    // Queues callbacks onto a work queue to be executed by a background thread.
+    // Note that this is not thread-safe - a single producer is assumed.
+    void sendCallbacks(Callbacks&& tasks);
 
 private:
-    std::mutex mMutex;
-    std::condition_variable mWorkAvailableCv GUARDED_BY(mMutex);
-    bool mDone GUARDED_BY(mMutex) = false;
-    std::vector<std::function<void()>> mTasks GUARDED_BY(mMutex);
+    sem_t mSemaphore;
+    std::atomic_bool mDone = false;
+
+    // Sequence number for work items.
+    // Work items are batched by sequence number. Work items for earlier sequence numbers are
+    // executed first. Work items with the same sequence number are executed in the same order they
+    // were added to the stack (meaning the stack must reverse the order after popping from the
+    // queue)
+    int32_t mSequence = 0;
+    struct Work {
+        int32_t sequence = 0;
+        Callbacks tasks;
+    };
+    LocklessStack<Work> mWorks;
     std::thread mThread;
 };
 
diff --git a/services/surfaceflinger/SurfaceFlinger.cpp b/services/surfaceflinger/SurfaceFlinger.cpp
index d93281b..1495249 100644
--- a/services/surfaceflinger/SurfaceFlinger.cpp
+++ b/services/surfaceflinger/SurfaceFlinger.cpp
@@ -3226,12 +3226,12 @@
     if (!updateWindowInfo && mInputWindowCommands.empty()) {
         return;
     }
-    BackgroundExecutor::getInstance().execute([updateWindowInfo,
-                                               windowInfos = std::move(windowInfos),
-                                               displayInfos = std::move(displayInfos),
-                                               inputWindowCommands =
-                                                       std::move(mInputWindowCommands),
-                                               inputFlinger = mInputFlinger, this]() {
+    BackgroundExecutor::getInstance().sendCallbacks({[updateWindowInfo,
+                                                      windowInfos = std::move(windowInfos),
+                                                      displayInfos = std::move(displayInfos),
+                                                      inputWindowCommands =
+                                                              std::move(mInputWindowCommands),
+                                                      inputFlinger = mInputFlinger, this]() {
         ATRACE_NAME("BackgroundExecutor::updateInputFlinger");
         if (updateWindowInfo) {
             mWindowInfosListenerInvoker->windowInfosChanged(windowInfos, displayInfos,
@@ -3244,7 +3244,7 @@
         for (const auto& focusRequest : inputWindowCommands.focusRequests) {
             inputFlinger->setFocusedWindow(focusRequest);
         }
-    });
+    }});
 
     mInputWindowCommands.clear();
 }
diff --git a/services/surfaceflinger/TransactionCallbackInvoker.cpp b/services/surfaceflinger/TransactionCallbackInvoker.cpp
index e1f348f..9f4a6cd 100644
--- a/services/surfaceflinger/TransactionCallbackInvoker.cpp
+++ b/services/surfaceflinger/TransactionCallbackInvoker.cpp
@@ -184,6 +184,7 @@
 void TransactionCallbackInvoker::sendCallbacks(bool onCommitOnly) {
     // For each listener
     auto completedTransactionsItr = mCompletedTransactions.begin();
+    BackgroundExecutor::Callbacks callbacks;
     while (completedTransactionsItr != mCompletedTransactions.end()) {
         auto& [listener, transactionStatsDeque] = *completedTransactionsItr;
         ListenerStats listenerStats;
@@ -218,7 +219,7 @@
                 // keep it as an IBinder due to consistency reasons: if we
                 // interface_cast at the IPC boundary when reading a Parcel,
                 // we get pointers that compare unequal in the SF process.
-                BackgroundExecutor::getInstance().execute([stats = std::move(listenerStats)]() {
+                callbacks.emplace_back([stats = std::move(listenerStats)]() {
                     interface_cast<ITransactionCompletedListener>(stats.listener)
                             ->onTransactionCompleted(stats);
                 });
@@ -230,6 +231,8 @@
     if (mPresentFence) {
         mPresentFence.clear();
     }
+
+    BackgroundExecutor::getInstance().sendCallbacks(std::move(callbacks));
 }
 
 // -----------------------------------------------------------------------