MH2 | Implement wakelock processing thread

Bug: 136511617
Test: Unit tests cannot be written for this change. Integration testing
will be performed later.

Change-Id: I651b5fde77c6017b3270413de896a947d95ff5f8
diff --git a/sensors/2.0/multihal/HalProxy.cpp b/sensors/2.0/multihal/HalProxy.cpp
index 5aa3b3d..d667218 100644
--- a/sensors/2.0/multihal/HalProxy.cpp
+++ b/sensors/2.0/multihal/HalProxy.cpp
@@ -24,6 +24,7 @@
 
 #include <dlfcn.h>
 
+#include <cinttypes>
 #include <fstream>
 #include <functional>
 #include <thread>
@@ -35,6 +36,9 @@
 namespace implementation {
 
 using ::android::hardware::sensors::V2_0::EventQueueFlagBits;
+using ::android::hardware::sensors::V2_0::WakeLockQueueFlagBits;
+using ::android::hardware::sensors::V2_0::implementation::getTimeNow;
+using ::android::hardware::sensors::V2_0::implementation::kWakelockTimeoutNs;
 
 typedef ISensorsSubHal*(SensorsHalGetSubHalFunc)(uint32_t*);
 
@@ -53,23 +57,31 @@
 HalProxy::HalProxy() {
     const char* kMultiHalConfigFile = "/vendor/etc/sensors/hals.conf";
     initializeSubHalListFromConfigFile(kMultiHalConfigFile);
-    initializeSubHalCallbacksAndSensorList();
+    init();
 }
 
 HalProxy::HalProxy(std::vector<ISensorsSubHal*>& subHalList) : mSubHalList(subHalList) {
-    initializeSubHalCallbacksAndSensorList();
+    init();
 }
 
 HalProxy::~HalProxy() {
+    mThreadsRun.store(false);
+    {
+        // Notify under the CV's mutex so a waiter that has already tested its predicate but has
+        // not yet blocked cannot miss the wakeup and stall the join() below.
+        std::lock_guard<std::recursive_mutex> lockGuard(mWakelockMutex);
+        mWakelockCV.notify_one();
+    }
     {
         std::lock_guard<std::mutex> lockGuard(mEventQueueWriteMutex);
-        mPendingWritesRun = false;
         mEventQueueWriteCV.notify_one();
     }
     if (mPendingWritesThread.joinable()) {
         mPendingWritesThread.join();
     }
-    // TODO: Cleanup wakeup thread once it is implemented
+    if (mWakelockThread.joinable()) {
+        mWakelockThread.join();
+    }
 }
 
 Return<void> HalProxy::getSensorsList(getSensorsList_cb _hidl_cb) {
@@ -140,7 +144,7 @@
     }
 
     mPendingWritesThread = std::thread(startPendingWritesThread, this);
-    // TODO: start threads to read wake locks.
+    mWakelockThread = std::thread(startWakelockThread, this);
 
     for (size_t i = 0; i < mSubHalList.size(); i++) {
         auto subHal = mSubHalList[i];
@@ -322,7 +326,7 @@
     }
 }
 
-void HalProxy::initializeSubHalCallbacksAndSensorList() {
+void HalProxy::init() {  // shared constructor setup: sub-HAL callbacks, then the sensor list
     initializeSubHalCallbacks();
     initializeSensorList();
 }
@@ -334,11 +338,12 @@
 void HalProxy::handlePendingWrites() {
     // TODO: Find a way to optimize locking strategy maybe using two mutexes instead of one.
     std::unique_lock<std::mutex> lock(mEventQueueWriteMutex);
-    while (mPendingWritesRun) {
+    while (mThreadsRun.load()) {
         mEventQueueWriteCV.wait(
-                lock, [&] { return !mPendingWriteEventsQueue.empty() || !mPendingWritesRun; });
-        if (!mPendingWriteEventsQueue.empty() && mPendingWritesRun) {
-            std::vector<Event>& pendingWriteEvents = mPendingWriteEventsQueue.front();
+                lock, [&] { return !mPendingWriteEventsQueue.empty() || !mThreadsRun.load(); });
+        if (mThreadsRun.load()) {
+            std::vector<Event>& pendingWriteEvents = mPendingWriteEventsQueue.front().first;
+            size_t numWakeupEvents = mPendingWriteEventsQueue.front().second;
             size_t eventQueueSize = mEventQueue->getQuantumCount();
             size_t numToWrite = std::min(pendingWriteEvents.size(), eventQueueSize);
             lock.unlock();
@@ -348,10 +353,16 @@
                         pendingWriteEvents.data(), numToWrite,
                         static_cast<uint32_t>(EventQueueFlagBits::EVENTS_READ),
                         static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS),
-                        kWakelockTimeoutNs, mEventQueueFlag)) {
+                        kPendingWriteTimeoutNs, mEventQueueFlag)) {
                 ALOGE("Dropping %zu events after blockingWrite failed.", numToWrite);
-            } else {
-                mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
+                if (numWakeupEvents > 0) {
+                    if (pendingWriteEvents.size() > eventQueueSize) {
+                        decrementRefCountAndMaybeReleaseWakelock(
+                                countNumWakeupEvents(pendingWriteEvents, eventQueueSize));
+                    } else {
+                        decrementRefCountAndMaybeReleaseWakelock(numWakeupEvents);
+                    }
+                }
             }
             lock.lock();
             if (pendingWriteEvents.size() > eventQueueSize) {
@@ -366,9 +377,60 @@
     }
 }
 
-void HalProxy::postEventsToMessageQueue(const std::vector<Event>& events) {
+void HalProxy::startWakelockThread(HalProxy* halProxy) {  // std::thread entry point
+    halProxy->handleWakelocks();  // runs the wakelock processing loop on the given proxy
+}
+
+void HalProxy::handleWakelocks() {
+    // Wakelock thread body: expires or decrements the shared wakelock until mThreadsRun clears.
+    std::unique_lock<std::recursive_mutex> lock(mWakelockMutex);
+    while (mThreadsRun.load()) {
+        mWakelockCV.wait(lock, [&] { return mWakelockRefCount > 0 || !mThreadsRun.load(); });
+        if (!mThreadsRun.load()) break;
+        int64_t timeLeft;
+        if (sharedWakelockDidTimeout(&timeLeft)) {
+            resetSharedWakelock();
+            continue;
+        }
+        uint32_t numWakeLocksProcessed = 0;
+        lock.unlock();  // do not hold mWakelockMutex across the blocking queue read
+        bool success = mWakeLockQueue->readBlocking(
+                &numWakeLocksProcessed, 1, 0,
+                static_cast<uint32_t>(WakeLockQueueFlagBits::DATA_WRITTEN), timeLeft);
+        lock.lock();
+        if (success) decrementRefCountAndMaybeReleaseWakelock(numWakeLocksProcessed);
+    }
+    // Shutdown: decrementRefCountAndMaybeReleaseWakelock() is a no-op now, so release directly.
+    if (mWakelockRefCount > 0) release_wake_lock(kWakelockName);
+    mWakelockRefCount = 0;
+    mWakelockTimeoutResetTime = getTimeNow();
+}
+
+bool HalProxy::sharedWakelockDidTimeout(int64_t* timeLeft) {
+    // Time elapsed since the shared wakelock timeout window was last (re)started.
+    const int64_t duration = getTimeNow() - mWakelockTimeoutStartTime;
+    if (duration > kWakelockTimeoutNs) {
+        // Expired: report zero so the out-parameter is never left indeterminate.
+        *timeLeft = 0;
+        return true;
+    }
+    *timeLeft = kWakelockTimeoutNs - duration;  // budget left for the caller's blocking read
+    return false;
+}
+
+void HalProxy::resetSharedWakelock() {  // requests a decrement by the full refcount
+    std::lock_guard<std::recursive_mutex> lockGuard(mWakelockMutex);
+    decrementRefCountAndMaybeReleaseWakelock(mWakelockRefCount);  // subtract the entire count
+    mWakelockTimeoutResetTime = getTimeNow();  // decrements stamped before this are ignored
+}
+
+void HalProxy::postEventsToMessageQueue(const std::vector<Event>& events, size_t numWakeupEvents,
+                                        ScopedWakelock wakelock) {
     size_t numToWrite = 0;
     std::lock_guard<std::mutex> lock(mEventQueueWriteMutex);
+    if (wakelock.isLocked()) {
+        incrementRefCountAndMaybeAcquireWakelock(numWakeupEvents);
+    }
     if (mPendingWriteEventsQueue.empty()) {
         numToWrite = std::min(events.size(), mEventQueue->availableToWrite());
         if (numToWrite > 0) {
@@ -384,28 +446,37 @@
     if (numToWrite < events.size()) {
         // TODO: Bound the mPendingWriteEventsQueue so that we do not trigger OOMs if framework
         // stalls
-        mPendingWriteEventsQueue.push(
-                std::vector<Event>(events.begin() + numToWrite, events.end()));
+        std::vector<Event> eventsLeft(events.begin() + numToWrite, events.end());
+        mPendingWriteEventsQueue.push({std::move(eventsLeft), numWakeupEvents});
         mEventQueueWriteCV.notify_one();
     }
 }
 
-// TODO: Implement the wakelock timeout in these next two methods. Also pass in the subhal
-// index for better tracking.
-
-void HalProxy::incrementRefCountAndMaybeAcquireWakelock() {
-    std::lock_guard<std::mutex> lockGuard(mWakelockRefCountMutex);
+bool HalProxy::incrementRefCountAndMaybeAcquireWakelock(size_t delta,
+                                                        int64_t* timeoutStart /* = nullptr */) {
+    if (!mThreadsRun.load()) return false;  // refuse new references once threads are stopping
+    std::lock_guard<std::recursive_mutex> lockGuard(mWakelockMutex);
     if (mWakelockRefCount == 0) {
-        acquire_wake_lock(PARTIAL_WAKE_LOCK, kWakeLockName);
+        acquire_wake_lock(PARTIAL_WAKE_LOCK, kWakelockName);
+        mWakelockCV.notify_one();  // wake the wakelock thread waiting for a nonzero refcount
     }
-    mWakelockRefCount++;
+    mWakelockTimeoutStartTime = getTimeNow();  // every increment restarts the timeout window
+    mWakelockRefCount += delta;
+    if (timeoutStart != nullptr) {
+        *timeoutStart = mWakelockTimeoutStartTime;  // hand the start stamp back to the caller
+    }
+    return true;
 }
 
-void HalProxy::decrementRefCountAndMaybeReleaseWakelock() {
-    std::lock_guard<std::mutex> lockGuard(mWakelockRefCountMutex);
-    mWakelockRefCount--;
+void HalProxy::decrementRefCountAndMaybeReleaseWakelock(size_t delta,
+                                                        int64_t timeoutStart /* = -1 */) {
+    if (!mThreadsRun.load()) return;  // no-op once threads are stopping
+    std::lock_guard<std::recursive_mutex> lockGuard(mWakelockMutex);
+    if (timeoutStart == -1) timeoutStart = mWakelockTimeoutResetTime;  // -1: never stale
+    if (mWakelockRefCount == 0 || timeoutStart < mWakelockTimeoutResetTime) return;
+    mWakelockRefCount -= std::min(mWakelockRefCount, delta);  // clamp: count cannot underflow
     if (mWakelockRefCount == 0) {
-        release_wake_lock(kWakeLockName);
+        release_wake_lock(kWakelockName);
     }
 }
 
@@ -427,6 +498,17 @@
     return mSubHalList[static_cast<size_t>(sensorHandle >> 24)];
 }
 
+size_t HalProxy::countNumWakeupEvents(const std::vector<Event>& events, size_t n) {
+    // Counts WAKE_UP-flagged events in the first n entries; n is clamped to events.size().
+    size_t numWakeupEvents = 0;
+    for (size_t i = 0; i < std::min(n, events.size()); i++) {
+        const int32_t sensorHandle = events[i].sensorHandle;
+        const auto flags = mSensors[sensorHandle].flags;
+        numWakeupEvents += (flags & static_cast<uint32_t>(V1_0::SensorFlagBits::WAKE_UP)) ? 1 : 0;
+    }
+    return numWakeupEvents;
+}
+
 uint32_t HalProxy::clearSubHalIndex(uint32_t sensorHandle) {
     return sensorHandle & (~kSensorHandleSubHalIndexMask);
 }
@@ -436,7 +518,7 @@
 }
 
 void HalProxyCallback::postEvents(const std::vector<Event>& events, ScopedWakelock wakelock) {
-    (void)wakelock;
+    if (events.empty() || !mHalProxy->areThreadsRunning()) return;
     size_t numWakeupEvents;
     std::vector<Event> processedEvents = processEvents(events, &numWakeupEvents);
     if (numWakeupEvents > 0) {
@@ -450,8 +532,7 @@
                     " w/ index %zu.",
                     mSubHalIndex);
     }
-
-    mHalProxy->postEventsToMessageQueue(processedEvents);
+    mHalProxy->postEventsToMessageQueue(processedEvents, numWakeupEvents, std::move(wakelock));
 }
 
 ScopedWakelock HalProxyCallback::createScopedWakelock(bool lock) {
@@ -461,13 +542,13 @@
 
 std::vector<Event> HalProxyCallback::processEvents(const std::vector<Event>& events,
                                                    size_t* numWakeupEvents) const {
-    std::vector<Event> eventsOut;
     *numWakeupEvents = 0;
+    std::vector<Event> eventsOut;
     for (Event event : events) {
         event.sensorHandle = setSubHalIndex(event.sensorHandle, mSubHalIndex);
         eventsOut.push_back(event);
-        if ((mHalProxy->getSensorInfo(event.sensorHandle).flags & V1_0::SensorFlagBits::WAKE_UP) !=
-            0) {
+        const SensorInfo& sensor = mHalProxy->getSensorInfo(event.sensorHandle);
+        if ((sensor.flags & V1_0::SensorFlagBits::WAKE_UP) != 0) {
             (*numWakeupEvents)++;
         }
     }