MH2 | Implement pending writes thread
Spin up a background thread inside the hal proxy that is responsible for
writing events to the event fmq if a previous write failed on the normal
postEvents thread. Create several new unit tests that help test the new
functionality.
Bug: 136511617
Test: Unit tests passing.
Change-Id: Ic35c9736fc0402297ab50072c195f66c9feb887d
diff --git a/sensors/2.0/multihal/HalProxy.cpp b/sensors/2.0/multihal/HalProxy.cpp
index 81d1b64..ccd6e66 100644
--- a/sensors/2.0/multihal/HalProxy.cpp
+++ b/sensors/2.0/multihal/HalProxy.cpp
@@ -49,8 +49,15 @@
}
HalProxy::~HalProxy() {
- // TODO: Join any running threads and clean up FMQs and any other allocated
- // state.
+ {
+ std::lock_guard<std::mutex> lockGuard(mEventQueueWriteMutex);
+ mPendingWritesRun = false;
+ mEventQueueWriteCV.notify_one();
+ }
+ if (mPendingWritesThread.joinable()) {
+ mPendingWritesThread.join();
+ }
+ // TODO: Cleanup wakeup thread once it is implemented
}
Return<void> HalProxy::getSensorsList(getSensorsList_cb _hidl_cb) {
@@ -120,7 +127,8 @@
result = Result::BAD_VALUE;
}
- // TODO: start threads to read wake locks and process events from sub HALs.
+ mPendingWritesThread = std::thread(startPendingWritesThread, this);
+ // TODO: start threads to read wake locks.
for (size_t i = 0; i < mSubHalList.size(); i++) {
auto subHal = mSubHalList[i];
@@ -257,21 +265,66 @@
initializeSensorList();
}
+void HalProxy::startPendingWritesThread(HalProxy* halProxy) {
+ halProxy->handlePendingWrites();
+}
+
+void HalProxy::handlePendingWrites() {
+ // TODO: Find a way to optimize locking strategy maybe using two mutexes instead of one.
+ std::unique_lock<std::mutex> lock(mEventQueueWriteMutex);
+ while (mPendingWritesRun) {
+ mEventQueueWriteCV.wait(
+ lock, [&] { return !mPendingWriteEventsQueue.empty() || !mPendingWritesRun; });
+ if (!mPendingWriteEventsQueue.empty() && mPendingWritesRun) {
+ std::vector<Event>& pendingWriteEvents = mPendingWriteEventsQueue.front();
+ size_t eventQueueSize = mEventQueue->getQuantumCount();
+ size_t numToWrite = std::min(pendingWriteEvents.size(), eventQueueSize);
+ lock.unlock();
+ // TODO: Find a way to interrup writeBlocking if the thread should exit
+ // so we don't have to wait for timeout on framework restarts.
+ if (!mEventQueue->writeBlocking(
+ pendingWriteEvents.data(), numToWrite,
+ static_cast<uint32_t>(EventQueueFlagBits::EVENTS_READ),
+ static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS),
+ kWakelockTimeoutNs, mEventQueueFlag)) {
+ ALOGE("Dropping %zu events after blockingWrite failed.", numToWrite);
+ } else {
+ mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
+ }
+ lock.lock();
+ if (pendingWriteEvents.size() > eventQueueSize) {
+ // TODO: Check if this erase operation is too inefficient. It will copy all the
+ // events ahead of it down to fill gap off array at front after the erase.
+ pendingWriteEvents.erase(pendingWriteEvents.begin(),
+ pendingWriteEvents.begin() + eventQueueSize);
+ } else {
+ mPendingWriteEventsQueue.pop();
+ }
+ }
+ }
+}
+
void HalProxy::postEventsToMessageQueue(const std::vector<Event>& events) {
- std::lock_guard<std::mutex> lock(mEventQueueMutex);
- size_t numToWrite = std::min(events.size(), mEventQueue->availableToWrite());
- if (numToWrite > 0) {
- if (mEventQueue->write(events.data(), numToWrite)) {
- // TODO: While loop if mEventQueue->avaiableToWrite > 0 to possibly fit in more writes
- // immediately
- mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
- } else {
- numToWrite = 0;
+ size_t numToWrite = 0;
+ std::lock_guard<std::mutex> lock(mEventQueueWriteMutex);
+ if (mPendingWriteEventsQueue.empty()) {
+ numToWrite = std::min(events.size(), mEventQueue->availableToWrite());
+ if (numToWrite > 0) {
+ if (mEventQueue->write(events.data(), numToWrite)) {
+ // TODO: While loop if mEventQueue->avaiableToWrite > 0 to possibly fit in more
+ // writes immediately
+ mEventQueueFlag->wake(static_cast<uint32_t>(EventQueueFlagBits::READ_AND_PROCESS));
+ } else {
+ numToWrite = 0;
+ }
}
}
if (numToWrite < events.size()) {
- // TODO: Post from events[numToWrite -> end] to background events queue
- // Signal background thread
+ // TODO: Bound the mPendingWriteEventsQueue so that we do not trigger OOMs if framework
+ // stalls
+ mPendingWriteEventsQueue.push(
+ std::vector<Event>(events.begin() + numToWrite, events.end()));
+ mEventQueueWriteCV.notify_one();
}
}