MH2 | Implement pending writes thread

Spin up a background thread inside the hal proxy that is responsible for
writing events to the event fmq if a previous write failed on the normal
postEvents thread. Create several new unit tests that help test the new
functionality.

Bug: 136511617
Test: Unit tests passing.
Change-Id: Ic35c9736fc0402297ab50072c195f66c9feb887d
diff --git a/sensors/2.0/multihal/HalProxy.cpp b/sensors/2.0/multihal/HalProxy.cpp
index 81d1b64..ccd6e66 100644
--- a/sensors/2.0/multihal/HalProxy.cpp
+++ b/sensors/2.0/multihal/HalProxy.cpp
@@ -49,8 +49,15 @@
 }
 
 HalProxy::~HalProxy() {
-    // TODO: Join any running threads and clean up FMQs and any other allocated
-    // state.
+    {
+        std::lock_guard<std::mutex> lockGuard(mEventQueueWriteMutex);
+        mPendingWritesRun = false;
+        mEventQueueWriteCV.notify_one();
+    }
+    if (mPendingWritesThread.joinable()) {
+        mPendingWritesThread.join();
+    }
+    // TODO: Cleanup wakeup thread once it is implemented
 }
 
 Return<void> HalProxy::getSensorsList(getSensorsList_cb _hidl_cb) {
@@ -120,7 +127,8 @@
         result = Result::BAD_VALUE;
     }
 
-    // TODO: start threads to read wake locks and process events from sub HALs.
+    mPendingWritesThread = std::thread(startPendingWritesThread, this);
+    // TODO: start threads to read wake locks.
 
     for (size_t i = 0; i < mSubHalList.size(); i++) {
         auto subHal = mSubHalList[i];
@@ -257,21 +265,66 @@
     initializeSensorList();
 }
 
+void HalProxy::startPendingWritesThread(HalProxy* halProxy) {
+    halProxy->handlePendingWrites();
+}
+
+void HalProxy::handlePendingWrites() {
+    // TODO: Find a way to optimize locking strategy maybe using two mutexes instead of one.
+    std::unique_lock<std::mutex> lock(mEventQueueWriteMutex);
+    while (mPendingWritesRun) {
+        mEventQueueWriteCV.wait(
+                lock, [&] { return !mPendingWriteEventsQueue.empty() || !mPendingWritesRun; });
+        if (!mPendingWriteEventsQueue.empty() && mPendingWritesRun) {
+            std::vector<Event>& pendingWriteEvents = mPendingWriteEventsQueue.front();
+            size_t eventQueueSize = mEventQueue->getQuantumCount();
+            size_t numToWrite = std::min(pendingWriteEvents.size(), eventQueueSize);
+            lock.unlock();
+            // TODO: Find a way to interrupt writeBlocking if the thread should exit
+            // so we don't have to wait for timeout on framework restarts.
+            if (!mEventQueue->writeBlocking(
+                        pendingWriteEvents.data(), numToWrite,
+                        static_cast<uint32_t>(EventQueueFlagBits::EVENTS_READ),
+                        static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS),
+                        kWakelockTimeoutNs, mEventQueueFlag)) {
+                ALOGE("Dropping %zu events after blockingWrite failed.", numToWrite);
+            } else {
+                mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
+            }
+            lock.lock();
+            if (pendingWriteEvents.size() > eventQueueSize) {
+                // TODO: Check if this erase operation is too inefficient. It will copy all the
+                // remaining events down to fill the gap at the front of the array.
+                pendingWriteEvents.erase(pendingWriteEvents.begin(),
+                                         pendingWriteEvents.begin() + eventQueueSize);
+            } else {
+                mPendingWriteEventsQueue.pop();
+            }
+        }
+    }
+}
+
 void HalProxy::postEventsToMessageQueue(const std::vector<Event>& events) {
-    std::lock_guard<std::mutex> lock(mEventQueueMutex);
-    size_t numToWrite = std::min(events.size(), mEventQueue->availableToWrite());
-    if (numToWrite > 0) {
-        if (mEventQueue->write(events.data(), numToWrite)) {
-            // TODO: While loop if mEventQueue->avaiableToWrite > 0 to possibly fit in more writes
-            // immediately
-            mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
-        } else {
-            numToWrite = 0;
+    size_t numToWrite = 0;
+    std::lock_guard<std::mutex> lock(mEventQueueWriteMutex);
+    if (mPendingWriteEventsQueue.empty()) {
+        numToWrite = std::min(events.size(), mEventQueue->availableToWrite());
+        if (numToWrite > 0) {
+            if (mEventQueue->write(events.data(), numToWrite)) {
+                // TODO: While loop if mEventQueue->availableToWrite > 0 to possibly fit in more
+                // writes immediately
+                mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
+            } else {
+                numToWrite = 0;
+            }
         }
     }
     if (numToWrite < events.size()) {
-        // TODO: Post from events[numToWrite -> end] to background events queue
-        // Signal background thread
+        // TODO: Bound the mPendingWriteEventsQueue so that we do not trigger OOMs if framework
+        // stalls
+        mPendingWriteEventsQueue.push(
+                std::vector<Event>(events.begin() + numToWrite, events.end()));
+        mEventQueueWriteCV.notify_one();
     }
 }
 
diff --git a/sensors/2.0/multihal/include/HalProxy.h b/sensors/2.0/multihal/include/HalProxy.h
index bdcc1ff..ae4b2c5 100644
--- a/sensors/2.0/multihal/include/HalProxy.h
+++ b/sensors/2.0/multihal/include/HalProxy.h
@@ -24,7 +24,12 @@
 #include <hidl/MQDescriptor.h>
 #include <hidl/Status.h>
 
+#include <atomic>
+#include <condition_variable>
 #include <map>
+#include <mutex>
+#include <queue>
+#include <thread>
 
 namespace android {
 namespace hardware {
@@ -159,6 +164,7 @@
      */
     std::vector<ISensorsSubHal*> mSubHalList;
 
+    //! The list of subhal callbacks for each subhal where the indices correlate with mSubHalList
     std::vector<const sp<IHalProxyCallback>> mSubHalCallbacks;
 
     /**
@@ -179,6 +185,9 @@
     //! The mutex for the event queue.
     std::mutex mEventQueueMutex;
 
+    //! The timeout for each pending write on background thread for events.
+    static const int64_t kWakelockTimeoutNs = 5 * INT64_C(1000000000) /* 5 seconds */;
+
     //! The scoped wakelock ref count.
     size_t mWakelockRefCount = 0;
 
@@ -188,6 +197,21 @@
     //! The bit mask used to get the subhal index from a sensor handle.
     static constexpr uint32_t kSensorHandleSubHalIndexMask = 0xFF000000;
 
+    //! The events that were not able to be written to fmq right away
+    std::queue<std::vector<Event>> mPendingWriteEventsQueue;
+
+    //! The mutex protecting writing to the fmq and the pending events queue
+    std::mutex mEventQueueWriteMutex;
+
+    //! The condition variable waiting on pending write events to stack up
+    std::condition_variable mEventQueueWriteCV;
+
+    //! The thread object that handles pending writes
+    std::thread mPendingWritesThread;
+
+    //! The bool indicating whether to end the pending writes background thread or not
+    bool mPendingWritesRun = true;
+
     /**
      * Initialize the list of SubHal objects in mSubHalList by reading from dynamic libraries
      * listed in a config file.
@@ -211,6 +235,16 @@
     void initializeSubHalCallbacksAndSensorList();
 
     /**
+     * Starts the thread that handles pending writes to event fmq.
+     *
+     * @param halProxy The HalProxy object pointer.
+     */
+    static void startPendingWritesThread(HalProxy* halProxy);
+
+    //! Handles the pending writes on events to eventqueue.
+    void handlePendingWrites();
+
+    /**
      * Clear direct channel flags if the HalProxy has already chosen a subhal as its direct channel
      * subhal. Set the directChannelSubHal pointer to the subHal passed in if this is the first
      * direct channel enabled sensor seen.
diff --git a/sensors/2.0/multihal/tests/HalProxy_test.cpp b/sensors/2.0/multihal/tests/HalProxy_test.cpp
index 4b1a15e..61fb14c 100644
--- a/sensors/2.0/multihal/tests/HalProxy_test.cpp
+++ b/sensors/2.0/multihal/tests/HalProxy_test.cpp
@@ -22,11 +22,10 @@
 #include "ScopedWakelock.h"
 #include "SensorsSubHal.h"
 
+#include <chrono>
+#include <thread>
 #include <vector>
 
-#undef LOG_TAG
-#define LOG_TAG "HalProxy_test"
-
 namespace {
 
 using ::android::hardware::hidl_vec;
@@ -98,7 +97,7 @@
  * Construct and return a HIDL Event type thats sensorHandle refers to a proximity sensor
  *    which is a wakeup type sensor.
  *
- * @ return A proximity event.
+ * @return A proximity event.
  */
 Event makeProximityEvent();
 
@@ -106,10 +105,30 @@
  * Construct and return a HIDL Event type thats sensorHandle refers to a proximity sensor
  *    which is a wakeup type sensor.
  *
- * @ return A proximity event.
+ * @return A proximity event.
  */
 Event makeAccelerometerEvent();
 
+/**
+ * Make a certain number of proximity type events with the sensorHandle field set to
+ * the proper number for AllSensorsSubHal subhal type.
+ *
+ * @param numEvents The number of events to make.
+ *
+ * @return The created list of events.
+ */
+std::vector<Event> makeMultipleProximityEvents(size_t numEvents);
+
+/**
+ * Make a certain number of accelerometer type events with the sensorHandle field set to
+ * the proper number for AllSensorsSubHal subhal type.
+ *
+ * @param numEvents The number of events to make.
+ *
+ * @return The created list of events.
+ */
+std::vector<Event> makeMultipleAccelerometerEvents(size_t numEvents);
+
 // Tests follow
 TEST(HalProxyTest, GetSensorsListOneSubHalTest) {
     AllSensorsSubHal subHal;
@@ -232,10 +251,7 @@
     ::android::sp<ISensorsCallback> callback = new SensorsCallback();
     proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
 
-    std::vector<Event> events;
-    for (size_t i = 0; i < kNumEvents; i++) {
-        events.push_back(makeAccelerometerEvent());
-    }
+    std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
     subHal.postEvents(events, false /* wakeup */);
 
     EXPECT_EQ(eventQueue->availableToRead(), kNumEvents);
@@ -272,15 +288,114 @@
     ::android::sp<ISensorsCallback> callback = new SensorsCallback();
     proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
 
-    std::vector<Event> events;
-    for (size_t i = 0; i < kNumEvents; i++) {
-        events.push_back(makeProximityEvent());
-    }
+    std::vector<Event> events = makeMultipleProximityEvents(kNumEvents);
     subHal.postEvents(events, true /* wakeup */);
 
     EXPECT_EQ(eventQueue->availableToRead(), kNumEvents);
 }
 
+TEST(HalProxyTest, PostEventsMultipleSubhals) {
+    constexpr size_t kQueueSize = 5;
+    constexpr size_t kNumEvents = 2;
+    AllSensorsSubHal subHal1, subHal2;
+    std::vector<ISensorsSubHal*> subHals{&subHal1, &subHal2};
+    HalProxy proxy(subHals);
+    std::unique_ptr<EventMessageQueue> eventQueue =
+            std::make_unique<EventMessageQueue>(kQueueSize, true);
+    std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
+            std::make_unique<WakeupMessageQueue>(kQueueSize, true);
+    ::android::sp<ISensorsCallback> callback = new SensorsCallback();
+    proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
+
+    std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
+    subHal1.postEvents(events, false /* wakeup */);
+
+    EXPECT_EQ(eventQueue->availableToRead(), kNumEvents);
+
+    subHal2.postEvents(events, false /* wakeup */);
+
+    EXPECT_EQ(eventQueue->availableToRead(), kNumEvents * 2);
+}
+
+TEST(HalProxyTest, PostEventsDelayedWrite) {
+    constexpr size_t kQueueSize = 5;
+    constexpr size_t kNumEvents = 6;
+    AllSensorsSubHal subHal1, subHal2;
+    std::vector<ISensorsSubHal*> subHals{&subHal1, &subHal2};
+    HalProxy proxy(subHals);
+    std::unique_ptr<EventMessageQueue> eventQueue =
+            std::make_unique<EventMessageQueue>(kQueueSize, true);
+    std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
+            std::make_unique<WakeupMessageQueue>(kQueueSize, true);
+    ::android::sp<ISensorsCallback> callback = new SensorsCallback();
+    proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
+
+    std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
+    subHal1.postEvents(events, false /* wakeup */);
+
+    EXPECT_EQ(eventQueue->availableToRead(), kQueueSize);
+
+    Event eventOut;
+    // Read 1 event out of the queue with readBlocking, timing out after half a second
+    EXPECT_TRUE(eventQueue->readBlocking(&eventOut, 1, 500000000));
+
+    // Sleep for a half second so that blocking write has time to complete in background thread
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+
+    // proxy background thread should have written last event when it saw space
+    EXPECT_EQ(eventQueue->availableToRead(), kQueueSize);
+}
+
+TEST(HalProxyTest, PostEventsMultipleSubhalsThreaded) {
+    constexpr size_t kQueueSize = 5;
+    constexpr size_t kNumEvents = 2;
+    AllSensorsSubHal subHal1, subHal2;
+    std::vector<ISensorsSubHal*> subHals{&subHal1, &subHal2};
+    HalProxy proxy(subHals);
+    std::unique_ptr<EventMessageQueue> eventQueue =
+            std::make_unique<EventMessageQueue>(kQueueSize, true);
+    std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
+            std::make_unique<WakeupMessageQueue>(kQueueSize, true);
+    ::android::sp<ISensorsCallback> callback = new SensorsCallback();
+    proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
+
+    std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
+
+    std::thread t1(&AllSensorsSubHal::postEvents, &subHal1, events, false);
+    std::thread t2(&AllSensorsSubHal::postEvents, &subHal2, events, false);
+
+    t1.join();
+    t2.join();
+
+    EXPECT_EQ(eventQueue->availableToRead(), kNumEvents * 2);
+}
+
+TEST(HalProxyTest, DestructingWithEventsPendingOnBackgroundThreadTest) {
+    constexpr size_t kQueueSize = 5;
+    constexpr size_t kNumEvents = 6;
+    AllSensorsSubHal subHal;
+    std::vector<ISensorsSubHal*> subHals{&subHal};
+
+    std::unique_ptr<EventMessageQueue> eventQueue =
+            std::make_unique<EventMessageQueue>(kQueueSize, true);
+    std::unique_ptr<WakeupMessageQueue> wakeLockQueue =
+            std::make_unique<WakeupMessageQueue>(kQueueSize, true);
+    ::android::sp<ISensorsCallback> callback = new SensorsCallback();
+    HalProxy proxy(subHals);
+    proxy.initialize(*eventQueue->getDesc(), *wakeLockQueue->getDesc(), callback);
+
+    std::vector<Event> events = makeMultipleAccelerometerEvents(kNumEvents);
+    subHal.postEvents(events, false /* wakeup */);
+
+    // Sleep for a half second so that background thread has time to attempt its blocking write
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+
+    // Should see a 5 second wait for blocking write timeout here
+
+    // Should be one event left on pending writes queue here and proxy will destruct
+    // If this TEST completes then it was a success, if it hangs we will see a crash
+}
+
 // Helper implementations follow
 void testSensorsListFromProxyAndSubHal(const std::vector<SensorInfo>& proxySensorsList,
                                        const std::vector<SensorInfo>& subHalSensorsList) {
@@ -332,4 +447,20 @@
     return event;
 }
 
+std::vector<Event> makeMultipleProximityEvents(size_t numEvents) {
+    std::vector<Event> events;
+    for (size_t i = 0; i < numEvents; i++) {
+        events.push_back(makeProximityEvent());
+    }
+    return events;
+}
+
+std::vector<Event> makeMultipleAccelerometerEvents(size_t numEvents) {
+    std::vector<Event> events;
+    for (size_t i = 0; i < numEvents; i++) {
+        events.push_back(makeAccelerometerEvent());
+    }
+    return events;
+}
+
 }  // namespace