MultiHal multithreaded polling
Change-Id: I3ebe380169eed1c8deeca2860d1788be6c14837e
diff --git a/modules/sensors/multihal.cpp b/modules/sensors/multihal.cpp
index 45099b5..52588c7 100644
--- a/modules/sensors/multihal.cpp
+++ b/modules/sensors/multihal.cpp
@@ -30,6 +30,7 @@
#include <stdio.h>
#include <dlfcn.h>
+#include <SensorEventQueue.h>
// comment out to disable debug-level logging
#define LOG_NDEBUG 0
@@ -41,6 +42,13 @@
static pthread_mutex_t init_modules_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t init_sensors_mutex = PTHREAD_MUTEX_INITIALIZER;
+// This mutex is shared by all queues
+static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+// Used to pause the multihal poll(). Broadcast by the sub-polling tasks when waiting_for_data is set.
+static pthread_cond_t data_available_cond = PTHREAD_COND_INITIALIZER;
+bool waiting_for_data = false;
+
/*
* Vector of sub modules, whose indexes are referred to ni this file as module_index.
*/
@@ -65,7 +73,7 @@
return localHandle < that.localHandle;
}
- bool operator=(const FullHandle &that) const {
+ bool operator==(const FullHandle &that) const {
return moduleIndex == that.moduleIndex && localHandle == that.localHandle;
}
};
@@ -75,13 +83,12 @@
int next_global_handle = 1;
static int assign_global_handle(int module_index, int local_handle) {
- ALOGD("assign_global_handle %d %d", module_index, local_handle);
int global_handle = next_global_handle++;
- FullHandle *full_handle = new FullHandle();
- full_handle->moduleIndex = module_index;
- full_handle->localHandle = local_handle;
- full_to_global[*full_handle] = global_handle;
- global_to_full[global_handle] = *full_handle;
+ FullHandle full_handle; // local value; replaces the old heap object, which was never deleted
+ full_handle.moduleIndex = module_index;
+ full_handle.localHandle = local_handle;
+ full_to_global[full_handle] = global_handle; // forward mapping: (module, local handle) -> global handle
+ global_to_full[global_handle] = full_handle; // reverse mapping: global handle -> (module, local handle)
return global_handle;
}
@@ -90,12 +97,53 @@
}
static int get_module_index(int global_handle) {
- ALOGD("get_module_index %d", global_handle);
+ ALOGD("get_module_index for global_handle %d", global_handle); // NOTE(review): the lookup that follows does not reject unknown handles - confirm callers only pass assigned ones
FullHandle f = global_to_full[global_handle];
ALOGD("FullHandle moduleIndex %d, localHandle %d", f.moduleIndex, f.localHandle);
return f.moduleIndex;
}
+static const int SENSOR_EVENT_QUEUE_CAPACITY = 20;
+
+struct TaskContext { // argument bundle handed to one writerTask thread through pthread_create()
+ sensors_poll_device_t* device; // sub device whose poll() the thread calls
+ SensorEventQueue* queue; // queue that the thread fills with that device's events
+};
+
+/*
+ * Thread entry point for one sub device. Loops forever: asks the device's
+ * queue for a writable region, fills it with a blocking device->poll(), then
+ * marks the new events as written and wakes the multihal poll() if it is
+ * waiting for data.
+ *
+ * ptr is the TaskContext* naming the device to poll and the queue to fill.
+ * queue_mutex is held only around queue operations, never across the
+ * blocking poll. The loop has no exit, so the trailing return is unreachable.
+ */
+void *writerTask(void* ptr) {
+    ALOGD("writerTask STARTS");
+    TaskContext* ctx = (TaskContext*)ptr;
+    sensors_poll_device_t* device = ctx->device;
+    SensorEventQueue* queue = ctx->queue;
+    sensors_event_t* buffer;
+    int eventsPolled;
+    while (1) {
+        ALOGD("writerTask before lock 1");
+        pthread_mutex_lock(&queue_mutex);
+        ALOGD("writerTask before waitForSpace");
+        queue->waitForSpace(&queue_mutex);
+        ALOGD("writerTask after waitForSpace");
+        int bufferSize = queue->getWritableRegion(SENSOR_EVENT_QUEUE_CAPACITY, &buffer);
+        // Do blocking poll outside of lock
+        pthread_mutex_unlock(&queue_mutex);
+
+        ALOGD("writerTask before poll() - bufferSize = %d", bufferSize);
+        eventsPolled = device->poll(device, buffer, bufferSize);
+        ALOGD("writerTask poll() got %d events.", eventsPolled);
+        if (eventsPolled <= 0) {
+            // Nothing was written into the region. A negative value is an error
+            // code from the sub device, not a count, so it must never be handed
+            // to markAsWritten().
+            if (eventsPolled < 0) {
+                ALOGE("writerTask poll() failed with error %d", eventsPolled);
+            }
+            continue;
+        }
+
+        ALOGD("writerTask before lock 2");
+        pthread_mutex_lock(&queue_mutex);
+        queue->markAsWritten(eventsPolled);
+        ALOGD("writerTask wrote %d events", eventsPolled);
+        if (waiting_for_data) {
+            // The reader only sleeps after setting this flag under queue_mutex,
+            // so checking it here avoids needless broadcasts.
+            ALOGD("writerTask - broadcast data_available_cond");
+            pthread_cond_broadcast(&data_available_cond);
+        }
+        pthread_mutex_unlock(&queue_mutex);
+    }
+    // never actually returns
+    return NULL;
+}
/*
* Cache of all sensors, with original handles replaced by global handles.
@@ -124,15 +172,31 @@
int close();
std::vector<hw_device_t*> sub_hw_devices;
+ std::vector<SensorEventQueue*> queues; // one queue per sub device, pushed in step with sub_hw_devices
+ std::vector<pthread_t> threads; // handles of the writer threads started by addSubHwDevice()
+ int nextReadIndex; // index into queues at which poll() resumes its round-robin scan
sensors_poll_device_t* get_v0_device_by_handle(int global_handle);
sensors_poll_device_1_t* get_v1_device_by_handle(int global_handle);
int get_device_version_by_handle(int global_handle);
+
+ // Copies *src into *dest, then rewrites the local sensor handle in *dest to its global handle.
+ void copy_event_remap_handle(sensors_event_t* dest, sensors_event_t* src, int sub_index);
};
+/*
+ * Registers one sub device: records it, creates its event queue, and starts
+ * the writer thread that polls the device into that queue.
+ *
+ * If the thread cannot be created the failure is logged and the queue is left
+ * in place (it simply never receives events), so queues stays index-aligned
+ * with sub_hw_devices.
+ */
void sensors_poll_context_t::addSubHwDevice(struct hw_device_t* sub_hw_device) {
ALOGD("addSubHwDevice");
this->sub_hw_devices.push_back(sub_hw_device);
+
+    // poll() passes the queue index as the module index when remapping handles,
+    // so every sub device must get a queue at the same index.
+    SensorEventQueue *queue = new SensorEventQueue(SENSOR_EVENT_QUEUE_CAPACITY);
+    this->queues.push_back(queue);
+
+    // Ownership passes to the writer thread, which runs forever.
+    TaskContext* taskContext = new TaskContext();
+    taskContext->device = (sensors_poll_device_t*) sub_hw_device;
+    taskContext->queue = queue;
+
+    pthread_t writerThread;
+    int err = pthread_create(&writerThread, NULL, writerTask, taskContext);
+    if (err != 0) {
+        // writerThread is indeterminate on failure, so it must not be recorded,
+        // and no thread exists to take ownership of taskContext.
+        ALOGE("addSubHwDevice: pthread_create failed with error %d", err);
+        delete taskContext;
+        return;
+    }
+    this->threads.push_back(writerThread);
}
sensors_poll_device_t* sensors_poll_context_t::get_v0_device_by_handle(int handle) {
@@ -168,34 +232,60 @@
return retval;
}
-int sensors_poll_context_t::poll(sensors_event_t *data, int count) {
- ALOGD("poll");
-
- // This only gets the first device. Parallel polling of multiple devices is coming soon.
- int sub_index = 0;
- sensors_poll_device_t* v0 = (sensors_poll_device_t*) this->sub_hw_devices[sub_index];
-
- ALOGD("poll's blocking read begins...");
- int retval = v0->poll(v0, data, count);
- ALOGD("...poll's blocking read ends");
- ALOGD("rewriting %d sensor handles...", retval);
- // A normal event's "sensor" field is a local handles. Convert it to a global handle.
+void sensors_poll_context_t::copy_event_remap_handle(sensors_event_t* dest, sensors_event_t* src,
+ int sub_index) { // sub_index becomes the moduleIndex used for the handle lookup
+ memcpy(dest, src, sizeof(struct sensors_event_t)); // copy first; only the copy in *dest is rewritten
+ // A normal event's "sensor" field is a local handle. Convert it to a global handle.
// A meta-data event must have its sensor set to 0, but it has a nested event
// with a local handle that needs to be converted to a global handle.
FullHandle full_handle;
full_handle.moduleIndex = sub_index;
- for (int i = 0; i < retval; i++) {
- sensors_event_t *event = &data[i];
- // If it's a metadata event, rewrite the inner payload, not the sensor field.
- if (event->type == SENSOR_TYPE_META_DATA) {
- full_handle.localHandle = event->meta_data.sensor;
- event->meta_data.sensor = full_to_global[full_handle];
- } else {
- full_handle.localHandle = event->sensor;
- event->sensor = full_to_global[full_handle];
+ // If it's a metadata event, rewrite the inner payload, not the sensor field.
+ if (dest->type == SENSOR_TYPE_META_DATA) {
+ full_handle.localHandle = dest->meta_data.sensor;
+ dest->meta_data.sensor = full_to_global[full_handle]; // NOTE(review): no check that the handle was ever assigned - confirm
+ } else {
+ full_handle.localHandle = dest->sensor;
+ dest->sensor = full_to_global[full_handle]; // NOTE(review): same unchecked lookup as the metadata branch
+ }
+}
+
+int sensors_poll_context_t::poll(sensors_event_t *data, int maxReads) { // blocks until at least one event is copied into data; returns the count
+ ALOGD("poll");
+ int empties = 0; // consecutive empty queues seen in the current scan
+ int queueCount = (int)this->queues.size();
+ int eventsRead = 0;
+
+ pthread_mutex_lock(&queue_mutex); // held for the whole scan; released only while inside pthread_cond_wait()
+ while (eventsRead == 0) { // NOTE(review): with maxReads <= 0 nothing can be read, so this loop never exits
+ while (empties < queueCount && eventsRead < maxReads) { // stop after a full lap of empty queues or once data is full
+ SensorEventQueue* queue = this->queues.at(this->nextReadIndex);
+ ALOGD("queue size: %d", queue->getSize());
+ sensors_event_t* event = queue->peek();
+ if (event == NULL) {
+ empties++;
+ } else {
+ empties = 0;
+ this->copy_event_remap_handle(&data[eventsRead++], event, nextReadIndex); // the queue index doubles as the sub module index
+ queue->dequeue();
+ }
+ this->nextReadIndex = (this->nextReadIndex + 1) % queueCount; // round-robin: at most one event per queue per visit
+ }
+ if (eventsRead == 0) {
+ // The queues have been scanned and none contain data.
+ // Wait for any of them to signal that there's data.
+ ALOGD("poll stopping to wait for data");
+ waiting_for_data = true; // tells the writer tasks that a broadcast is needed
+ pthread_cond_wait(&data_available_cond, &queue_mutex);
+ waiting_for_data = false;
+ empties = 0; // rescan every queue after waking
+ ALOGD("poll done waiting for data");
}
}
- return retval;
+ pthread_mutex_unlock(&queue_mutex);
+ ALOGD("...poll's blocking read ends. Returning %d events.", eventsRead);
+
+ return eventsRead;
}
int sensors_poll_context_t::batch(int handle, int flags, int64_t period_ns, int64_t timeout) {
@@ -436,9 +526,13 @@
static int module__get_sensors_list(struct sensors_module_t* module,
struct sensor_t const** list) {
- ALOGD("module__get_sensors_list");
+ ALOGD("module__get_sensors_list start");
lazy_init_sensors_list();
*list = global_sensors_list;
+ ALOGD("global_sensors_count: %d", global_sensors_count);
+ for (int i = 0; i < global_sensors_count; i++) { // debug only: log the type of every sensor in the returned list
+ ALOGD("sensor type: %d", global_sensors_list[i].type);
+ }
return global_sensors_count;
}
@@ -480,6 +574,8 @@
dev->proxy_device.batch = device__batch;
dev->proxy_device.flush = device__flush;
+ dev->nextReadIndex = 0;
+
// Open() the subhal modules. Remember their devices in a vector parallel to sub_hw_modules.
for (std::vector<hw_module_t*>::iterator it = sub_hw_modules->begin();
it != sub_hw_modules->end(); it++) {