MultiHal multithreaded polling

Change-Id: I3ebe380169eed1c8deeca2860d1788be6c14837e
diff --git a/modules/sensors/SensorEventQueue.cpp b/modules/sensors/SensorEventQueue.cpp
index c139944..00013de 100644
--- a/modules/sensors/SensorEventQueue.cpp
+++ b/modules/sensors/SensorEventQueue.cpp
@@ -17,55 +17,28 @@
 #include <hardware/sensors.h>
 #include <algorithm>
 #include <pthread.h>
-
 #include <linux/input.h>
-#include <cutils/atomic.h>
 #include <cutils/log.h>
 
 #include "SensorEventQueue.h"
 
 SensorEventQueue::SensorEventQueue(int capacity) {
     mCapacity = capacity;
+
     mStart = 0;
     mSize = 0;
     mData = new sensors_event_t[mCapacity];
-    pthread_cond_init(&mDataAvailableCondition, NULL);
     pthread_cond_init(&mSpaceAvailableCondition, NULL);
-    pthread_mutex_init(&mMutex, NULL);
 }
 
 SensorEventQueue::~SensorEventQueue() {
     delete[] mData;
     mData = NULL;
-    pthread_cond_destroy(&mDataAvailableCondition);
     pthread_cond_destroy(&mSpaceAvailableCondition);
-    pthread_mutex_destroy(&mMutex);
-}
-
-void SensorEventQueue::lock() {
-    pthread_mutex_lock(&mMutex);
-}
-
-void SensorEventQueue::unlock() {
-    pthread_mutex_unlock(&mMutex);
-}
-
-void SensorEventQueue::waitForSpaceAndLock() {
-    lock();
-    while (mSize >= mCapacity) {
-        pthread_cond_wait(&mSpaceAvailableCondition, &mMutex);
-    }
-}
-
-void SensorEventQueue::waitForDataAndLock() {
-    lock();
-    while (mSize <= 0) {
-        pthread_cond_wait(&mDataAvailableCondition, &mMutex);
-    }
 }
 
 int SensorEventQueue::getWritableRegion(int requestedLength, sensors_event_t** out) {
-    if (mSize >= mCapacity || requestedLength <= 0) {
+    if (mSize == mCapacity || requestedLength <= 0) {
         *out = NULL;
         return 0;
     }
@@ -88,9 +61,6 @@
 
 void SensorEventQueue::markAsWritten(int count) {
     mSize += count;
-    if (mSize) {
-        pthread_cond_broadcast(&mDataAvailableCondition);
-    }
 }
 
 int SensorEventQueue::getSize() {
@@ -98,13 +68,21 @@
 }
 
 sensors_event_t* SensorEventQueue::peek() {
-    if (mSize <= 0) return NULL;
+    if (mSize == 0) return NULL;
     return &mData[mStart];
 }
 
 void SensorEventQueue::dequeue() {
-    if (mSize <= 0) return;
+    if (mSize == 0) return;
+    if (mSize == mCapacity) {
+        pthread_cond_broadcast(&mSpaceAvailableCondition);
+    }
     mSize--;
     mStart = (mStart + 1) % mCapacity;
-    pthread_cond_broadcast(&mSpaceAvailableCondition);
+}
+
+void SensorEventQueue::waitForSpace(pthread_mutex_t* mutex) {
+    while (mSize == mCapacity) {
+        pthread_cond_wait(&mSpaceAvailableCondition, mutex);
+    }
 }
diff --git a/modules/sensors/SensorEventQueue.h b/modules/sensors/SensorEventQueue.h
index fd833fa..969d018 100644
--- a/modules/sensors/SensorEventQueue.h
+++ b/modules/sensors/SensorEventQueue.h
@@ -35,17 +35,11 @@
     int mStart; // start of readable region
     int mSize; // number of readable items
     sensors_event_t* mData;
-    pthread_cond_t mDataAvailableCondition;
     pthread_cond_t mSpaceAvailableCondition;
-    pthread_mutex_t mMutex;
 
 public:
     SensorEventQueue(int capacity);
     ~SensorEventQueue();
-    void lock();
-    void unlock();
-    void waitForSpaceAndLock();
-    void waitForDataAndLock();
 
     // Returns length of region, between zero and min(capacity, requestedLength). If there is any
     // writable space, it will return a region of at least one. Because it must return
@@ -73,6 +67,9 @@
     // This will decrease the size by one, freeing up the oldest readable event's slot for writing.
     // Only call while holding the lock.
     void dequeue();
+
+    // Blocks until space is available. No-op if there is already space.
+    void waitForSpace(pthread_mutex_t* mutex);
 };
 
 #endif // SENSOREVENTQUEUE_H_
diff --git a/modules/sensors/multihal.cpp b/modules/sensors/multihal.cpp
index 45099b5..52588c7 100644
--- a/modules/sensors/multihal.cpp
+++ b/modules/sensors/multihal.cpp
@@ -30,6 +30,7 @@
 
 #include <stdio.h>
 #include <dlfcn.h>
+#include <SensorEventQueue.h>
 
 // comment out to disable debug-level logging
 #define LOG_NDEBUG 0
@@ -41,6 +42,13 @@
 static pthread_mutex_t init_modules_mutex = PTHREAD_MUTEX_INITIALIZER;
 static pthread_mutex_t init_sensors_mutex = PTHREAD_MUTEX_INITIALIZER;
 
+// This mutex is shared by all queues
+static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+// Used to pause the multihal poll(). Broadcasted by sub-polling tasks if waiting_for_data.
+static pthread_cond_t data_available_cond = PTHREAD_COND_INITIALIZER;
+bool waiting_for_data = false;
+
 /*
  * Vector of sub modules, whose indexes are referred to ni this file as module_index.
  */
@@ -65,7 +73,7 @@
         return localHandle < that.localHandle;
     }
 
-    bool operator=(const FullHandle &that) const {
+    bool operator==(const FullHandle &that) const {
         return moduleIndex == that.moduleIndex && localHandle == that.localHandle;
     }
 };
@@ -75,13 +83,12 @@
 int next_global_handle = 1;
 
 static int assign_global_handle(int module_index, int local_handle) {
-    ALOGD("assign_global_handle %d %d", module_index, local_handle);
     int global_handle = next_global_handle++;
-    FullHandle *full_handle = new FullHandle();
-    full_handle->moduleIndex = module_index;
-    full_handle->localHandle = local_handle;
-    full_to_global[*full_handle] = global_handle;
-    global_to_full[global_handle] = *full_handle;
+    FullHandle full_handle;
+    full_handle.moduleIndex = module_index;
+    full_handle.localHandle = local_handle;
+    full_to_global[full_handle] = global_handle;
+    global_to_full[global_handle] = full_handle;
     return global_handle;
 }
 
@@ -90,12 +97,53 @@
 }
 
 static int get_module_index(int global_handle) {
-    ALOGD("get_module_index %d", global_handle);
+    ALOGD("get_module_index for global_handle %d", global_handle);
     FullHandle f = global_to_full[global_handle];
     ALOGD("FullHandle moduleIndex %d, localHandle %d", f.moduleIndex, f.localHandle);
     return f.moduleIndex;
 }
 
+static const int SENSOR_EVENT_QUEUE_CAPACITY = 20;
+
+struct TaskContext {
+  sensors_poll_device_t* device;
+  SensorEventQueue* queue;
+};
+
+void *writerTask(void* ptr) {
+    ALOGD("writerTask STARTS");
+    TaskContext* ctx = (TaskContext*)ptr;
+    sensors_poll_device_t* device = ctx->device;
+    SensorEventQueue* queue = ctx->queue;
+    sensors_event_t* buffer;
+    int eventsPolled;
+    while (1) {
+        ALOGD("writerTask before lock 1");
+        pthread_mutex_lock(&queue_mutex);
+        ALOGD("writerTask before waitForSpace");
+        queue->waitForSpace(&queue_mutex);
+        ALOGD("writerTask after waitForSpace");
+        int bufferSize = queue->getWritableRegion(SENSOR_EVENT_QUEUE_CAPACITY, &buffer);
+        // Do blocking poll outside of lock
+        pthread_mutex_unlock(&queue_mutex);
+
+        ALOGD("writerTask before poll() - bufferSize = %d", bufferSize);
+        eventsPolled = device->poll(device, buffer, bufferSize);
+        ALOGD("writerTask poll() got %d events.", eventsPolled);
+
+        ALOGD("writerTask before lock 2");
+        pthread_mutex_lock(&queue_mutex);
+        queue->markAsWritten(eventsPolled);
+        ALOGD("writerTask wrote %d events", eventsPolled);
+        if (waiting_for_data) {
+            ALOGD("writerTask - broadcast data_available_cond");
+            pthread_cond_broadcast(&data_available_cond);
+        }
+        pthread_mutex_unlock(&queue_mutex);
+    }
+    // never actually returns
+    return NULL;
+}
 
 /*
  * Cache of all sensors, with original handles replaced by global handles.
@@ -124,15 +172,31 @@
     int close();
 
     std::vector<hw_device_t*> sub_hw_devices;
+    std::vector<SensorEventQueue*> queues;
+    std::vector<pthread_t> threads;
+    int nextReadIndex;
 
     sensors_poll_device_t* get_v0_device_by_handle(int global_handle);
     sensors_poll_device_1_t* get_v1_device_by_handle(int global_handle);
     int get_device_version_by_handle(int global_handle);
+
+    void copy_event_remap_handle(sensors_event_t* src, sensors_event_t* dest, int sub_index);
 };
 
 void sensors_poll_context_t::addSubHwDevice(struct hw_device_t* sub_hw_device) {
     ALOGD("addSubHwDevice");
     this->sub_hw_devices.push_back(sub_hw_device);
+
+    SensorEventQueue *queue = new SensorEventQueue(SENSOR_EVENT_QUEUE_CAPACITY);
+    this->queues.push_back(queue);
+
+    TaskContext* taskContext = new TaskContext();
+    taskContext->device = (sensors_poll_device_t*) sub_hw_device;
+    taskContext->queue = queue;
+
+    pthread_t writerThread;
+    pthread_create(&writerThread, NULL, writerTask, taskContext);
+    this->threads.push_back(writerThread);
 }
 
 sensors_poll_device_t* sensors_poll_context_t::get_v0_device_by_handle(int handle) {
@@ -168,34 +232,60 @@
     return retval;
 }
 
-int sensors_poll_context_t::poll(sensors_event_t *data, int count) {
-    ALOGD("poll");
-
-    // This only gets the first device. Parallel polling of multiple devices is coming soon.
-    int sub_index = 0;
-    sensors_poll_device_t* v0 = (sensors_poll_device_t*) this->sub_hw_devices[sub_index];
-
-    ALOGD("poll's blocking read begins...");
-    int retval = v0->poll(v0, data, count);
-    ALOGD("...poll's blocking read ends");
-    ALOGD("rewriting %d sensor handles...", retval);
-    // A normal event's "sensor" field is a local handles. Convert it to a global handle.
+void sensors_poll_context_t::copy_event_remap_handle(sensors_event_t* dest, sensors_event_t* src,
+        int sub_index) {
+    memcpy(dest, src, sizeof(struct sensors_event_t));
+    // A normal event's "sensor" field is a local handle. Convert it to a global handle.
     // A meta-data event must have its sensor set to 0, but it has a nested event
     // with a local handle that needs to be converted to a global handle.
     FullHandle full_handle;
     full_handle.moduleIndex = sub_index;
-    for (int i = 0; i < retval; i++) {
-        sensors_event_t *event = &data[i];
-        // If it's a metadata event, rewrite the inner payload, not the sensor field.
-        if (event->type == SENSOR_TYPE_META_DATA) {
-            full_handle.localHandle = event->meta_data.sensor;
-            event->meta_data.sensor = full_to_global[full_handle];
-        } else {
-            full_handle.localHandle = event->sensor;
-            event->sensor = full_to_global[full_handle];
+    // If it's a metadata event, rewrite the inner payload, not the sensor field.
+    if (dest->type == SENSOR_TYPE_META_DATA) {
+        full_handle.localHandle = dest->meta_data.sensor;
+        dest->meta_data.sensor = full_to_global[full_handle];
+    } else {
+        full_handle.localHandle = dest->sensor;
+        dest->sensor = full_to_global[full_handle];
+    }
+}
+
+int sensors_poll_context_t::poll(sensors_event_t *data, int maxReads) {
+    ALOGD("poll");
+    int empties = 0;
+    int queueCount = (int)this->queues.size();
+    int eventsRead = 0;
+
+    pthread_mutex_lock(&queue_mutex);
+    while (eventsRead == 0) {
+        while (empties < queueCount && eventsRead < maxReads) {
+            SensorEventQueue* queue = this->queues.at(this->nextReadIndex);
+            ALOGD("queue size: %d", queue->getSize());
+            sensors_event_t* event = queue->peek();
+            if (event == NULL) {
+                empties++;
+            } else {
+                empties = 0;
+                this->copy_event_remap_handle(&data[eventsRead++], event, nextReadIndex);
+                queue->dequeue();
+            }
+            this->nextReadIndex = (this->nextReadIndex + 1) % queueCount;
+        }
+        if (eventsRead == 0) {
+            // The queues have been scanned and none contain data.
+            // Wait for any of them to signal that there's data.
+            ALOGD("poll stopping to wait for data");
+            waiting_for_data = true;
+            pthread_cond_wait(&data_available_cond, &queue_mutex);
+            waiting_for_data = false;
+            empties = 0;
+            ALOGD("poll done waiting for data");
         }
     }
-    return retval;
+    pthread_mutex_unlock(&queue_mutex);
+    ALOGD("...poll's blocking read ends. Returning %d events.", eventsRead);
+
+    return eventsRead;
 }
 
 int sensors_poll_context_t::batch(int handle, int flags, int64_t period_ns, int64_t timeout) {
@@ -436,9 +526,13 @@
 
 static int module__get_sensors_list(struct sensors_module_t* module,
         struct sensor_t const** list) {
-    ALOGD("module__get_sensors_list");
+    ALOGD("module__get_sensors_list start");
     lazy_init_sensors_list();
     *list = global_sensors_list;
+    ALOGD("global_sensors_count: %d", global_sensors_count);
+    for (int i = 0; i < global_sensors_count; i++) {
+        ALOGD("sensor type: %d", global_sensors_list[i].type);
+    }
     return global_sensors_count;
 }
 
@@ -480,6 +574,8 @@
     dev->proxy_device.batch = device__batch;
     dev->proxy_device.flush = device__flush;
 
+    dev->nextReadIndex = 0;
+
     // Open() the subhal modules. Remember their devices in a vector parallel to sub_hw_modules.
     for (std::vector<hw_module_t*>::iterator it = sub_hw_modules->begin();
             it != sub_hw_modules->end(); it++) {
diff --git a/modules/sensors/tests/SensorEventQueue_test.cpp b/modules/sensors/tests/SensorEventQueue_test.cpp
index 3b89964..cbe4377 100644
--- a/modules/sensors/tests/SensorEventQueue_test.cpp
+++ b/modules/sensors/tests/SensorEventQueue_test.cpp
@@ -2,6 +2,8 @@
 #include <stdlib.h>
 #include <hardware/sensors.h>
 #include <pthread.h>
+#include <cutils/atomic.h>
+
 #include "SensorEventQueue.cpp"
 
 // Unit tests for the SensorEventQueue.
@@ -78,93 +80,9 @@
     return true;
 }
 
-static const int TTOQ_EVENT_COUNT = 10000;
-
-struct TaskContext {
-  bool success;
-  SensorEventQueue* queue;
-};
-
-void* writerTask(void* ptr) {
-    printf("writerTask starts\n");
-    TaskContext* ctx = (TaskContext*)ptr;
-    SensorEventQueue* queue = ctx->queue;
-    int totalWrites = 0;
-    sensors_event_t* buffer;
-    while (totalWrites < TTOQ_EVENT_COUNT) {
-        queue->waitForSpaceAndLock();
-        int writableSize = queue->getWritableRegion(rand() % 10 + 1, &buffer);
-        queue->unlock();
-        for (int i = 0; i < writableSize; i++) {
-            // serialize the events
-            buffer[i].timestamp = totalWrites++;
-        }
-        queue->lock();
-        queue->markAsWritten(writableSize);
-        queue->unlock();
-    }
-    printf("writerTask ends normally\n");
-    return NULL;
-}
-
-void* readerTask(void* ptr) {
-    printf("readerTask starts\n");
-    TaskContext* ctx = (TaskContext*)ptr;
-    SensorEventQueue* queue = ctx->queue;
-    int totalReads = 0;
-    while (totalReads < TTOQ_EVENT_COUNT) {
-        queue->waitForDataAndLock();
-        int maxReads = rand() % 20 + 1;
-        int reads = 0;
-        while (queue->getSize() && reads < maxReads) {
-            sensors_event_t* event = queue->peek();
-            if (totalReads != event->timestamp) {
-                printf("FAILURE: readerTask expected timestamp %d; actual was %d\n",
-                        totalReads, (int)(event->timestamp));
-                ctx->success = false;
-                return NULL;
-            }
-            queue->dequeue();
-            totalReads++;
-            reads++;
-        }
-        queue->unlock();
-    }
-    printf("readerTask ends normally\n");
-    return NULL;
-}
-
-
-// Create a short queue, and write and read a ton of data through it.
-// Write serial timestamps into the events, and expect to read them in the right order.
-bool testTwoThreadsOneQueue() {
-    printf("TEST testTwoThreadsOneQueue\n");
-    SensorEventQueue* queue = new SensorEventQueue(100);
-
-    TaskContext readerCtx;
-    readerCtx.success = true;
-    readerCtx.queue = queue;
-
-    TaskContext writerCtx;
-    writerCtx.success = true;
-    writerCtx.queue = queue;
-
-    pthread_t writer, reader;
-    pthread_create(&reader, NULL, readerTask, &readerCtx);
-    pthread_create(&writer, NULL, writerTask, &writerCtx);
-
-    pthread_join(writer, NULL);
-    pthread_join(reader, NULL);
-
-    printf("testTwoThreadsOneQueue done\n");
-    return readerCtx.success && writerCtx.success;
-}
-
-
 int main(int argc, char **argv) {
     if (testSimpleWriteSizeCounts() &&
-            testWrappingWriteSizeCounts() &&
-            testTwoThreadsOneQueue()) {
+            testWrappingWriteSizeCounts()) {
         printf("ALL PASSED\n");
     } else {
         printf("SOMETHING FAILED\n");