Sensor Batching Bug fixes.
i) Fix SensorService dropping events: increase SOCKET_BUFFER_SIZE in the BitTube ctor.
ii) Call flush before every activate.
iii) Emulate flush for older devices. Add a trivial flush complete event when flush is called.
Bug: 10641596
Change-Id: I30d0f3948e830457143f16e157b6ad81908687ce
diff --git a/services/sensorservice/SensorService.cpp b/services/sensorservice/SensorService.cpp
index cb99531..b26e572 100644
--- a/services/sensorservice/SensorService.cpp
+++ b/services/sensorservice/SensorService.cpp
@@ -148,6 +148,21 @@
// debugging sensor list
mUserSensorListDebug = mSensorList;
+ mSocketBufferSize = SOCKET_BUFFER_SIZE_NON_BATCHED;
+ FILE *fp = fopen("/proc/sys/net/core/wmem_max", "r");
+ char line[128];
+ if (fp != NULL && fgets(line, sizeof(line), fp) != NULL) {
+ line[sizeof(line) - 1] = '\0';
+ sscanf(line, "%u", &mSocketBufferSize);
+ if (mSocketBufferSize > MAX_SOCKET_BUFFER_SIZE_BATCHED) {
+ mSocketBufferSize = MAX_SOCKET_BUFFER_SIZE_BATCHED;
+ }
+ }
+ ALOGD("Max socket buffer size %u", mSocketBufferSize);
+ if (fp) {
+ fclose(fp);
+ }
+
run("SensorService", PRIORITY_URGENT_DISPLAY);
mInitCheck = NO_ERROR;
}
@@ -255,7 +270,6 @@
SensorFusion::getInstance().dump(result);
SensorDevice::getInstance().dump(result);
- result.appendFormat("%d active connections\n", mActiveConnections.size());
result.append("Active sensors:\n");
for (size_t i=0 ; i<mActiveSensors.size() ; i++) {
int handle = mActiveSensors.keyAt(i);
@@ -264,6 +278,17 @@
handle,
mActiveSensors.valueAt(i)->getNumConnections());
}
+
+ result.appendFormat("%u Max Socket Buffer size\n", mSocketBufferSize);
+ result.appendFormat("%d active connections\n", mActiveConnections.size());
+
+ for (size_t i=0 ; i < mActiveConnections.size() ; i++) {
+ sp<SensorEventConnection> connection(mActiveConnections[i].promote());
+ if (connection != 0) {
+ result.appendFormat("Connection Number: %d \n", i);
+ connection->dump(result);
+ }
+ }
}
write(fd, result.string(), result.size());
return NO_ERROR;
@@ -404,7 +429,6 @@
sensors_event_t const * buffer, size_t count)
{
Mutex::Autolock _l(mLock);
-
// record the last event for each sensor
int32_t prev = buffer[0].sensor;
for (size_t i=1 ; i<count ; i++) {
@@ -565,6 +589,15 @@
status_t err = sensor->batch(connection.get(), handle, reservedFlags, samplingPeriodNs,
maxBatchReportLatencyNs);
+ if (err == NO_ERROR) {
+ connection->setFirstFlushPending(handle, true);
+ status_t err_flush = sensor->flush(connection.get(), handle);
+ // Flush may return error if the sensor is not activated or the underlying h/w sensor does
+ // not support flush.
+ if (err_flush != NO_ERROR) {
+ connection->setFirstFlushPending(handle, false);
+ }
+ }
if (err == NO_ERROR) {
ALOGD_IF(DEBUG_CONNECTIONS, "Calling activate on %d", handle);
@@ -649,6 +682,21 @@
if (sensor == NULL) {
return BAD_VALUE;
}
+ SensorDevice& dev(SensorDevice::getInstance());
+
+ if (dev.getHalDeviceVersion() < SENSORS_DEVICE_API_VERSION_1_1) {
+ // For older devices increment pending flush count, which will send a trivial flush complete
+ // event for all the connections which are registered for updates on this sensor.
+ const SortedVector< wp<SensorEventConnection> > activeConnections(
+ getActiveConnections());
+ for (size_t i=0 ; i<activeConnections.size() ; i++) {
+ sp<SensorEventConnection> connection(activeConnections[i].promote());
+ if (connection != 0) {
+ connection->incrementPendingFlushCount(handle);
+ }
+ }
+ return NO_ERROR;
+ }
return sensor->flush(connection.get(), handle);
}
// ---------------------------------------------------------------------------
@@ -683,8 +731,15 @@
SensorService::SensorEventConnection::SensorEventConnection(
const sp<SensorService>& service, uid_t uid)
- : mService(service), mChannel(new BitTube()), mUid(uid)
+ : mService(service), mUid(uid)
{
+ const SensorDevice& device(SensorDevice::getInstance());
+ if (device.getHalDeviceVersion() >= SENSORS_DEVICE_API_VERSION_1_1) {
+ // Use the larger socket buffer (capped at 1MB) for batching capabilities.
+ mChannel = new BitTube(service->mSocketBufferSize);
+ } else {
+ mChannel = new BitTube(SOCKET_BUFFER_SIZE_NON_BATCHED);
+ }
}
SensorService::SensorEventConnection::~SensorEventConnection()
@@ -697,10 +752,22 @@
{
}
+void SensorService::SensorEventConnection::dump(String8& result) {
+ Mutex::Autolock _l(mConnectionLock);
+ for (size_t i = 0; i < mSensorInfo.size(); ++i) {
+ const FlushInfo& flushInfo = mSensorInfo.valueAt(i);
+ result.appendFormat("\t %s | status: %s | pending flush events %d\n",
+ mService->getSensorName(mSensorInfo.keyAt(i)).string(),
+ flushInfo.mFirstFlushPending ? "First flush pending" :
+ "active",
+ flushInfo.mPendingFlushEventsToSend);
+ }
+}
+
bool SensorService::SensorEventConnection::addSensor(int32_t handle) {
Mutex::Autolock _l(mConnectionLock);
- if (mSensorInfo.indexOf(handle) < 0) {
- mSensorInfo.add(handle);
+ if (mSensorInfo.indexOfKey(handle) < 0) {
+ mSensorInfo.add(handle, FlushInfo());
return true;
}
return false;
@@ -708,7 +775,7 @@
bool SensorService::SensorEventConnection::removeSensor(int32_t handle) {
Mutex::Autolock _l(mConnectionLock);
- if (mSensorInfo.remove(handle) >= 0) {
+ if (mSensorInfo.removeItem(handle) >= 0) {
return true;
}
return false;
@@ -716,7 +783,7 @@
bool SensorService::SensorEventConnection::hasSensor(int32_t handle) const {
Mutex::Autolock _l(mConnectionLock);
- return mSensorInfo.indexOf(handle) >= 0;
+ return mSensorInfo.indexOfKey(handle) >= 0;
}
bool SensorService::SensorEventConnection::hasAnySensor() const {
@@ -724,12 +791,32 @@
return mSensorInfo.size() ? true : false;
}
+void SensorService::SensorEventConnection::setFirstFlushPending(int32_t handle,
+ bool value) {
+ Mutex::Autolock _l(mConnectionLock);
+ ssize_t index = mSensorInfo.indexOfKey(handle);
+ if (index >= 0) {
+ FlushInfo& flushInfo = mSensorInfo.editValueAt(index);
+ flushInfo.mFirstFlushPending = value;
+ }
+}
+
+void SensorService::SensorEventConnection::incrementPendingFlushCount(int32_t handle) {
+ Mutex::Autolock _l(mConnectionLock);
+ ssize_t index = mSensorInfo.indexOfKey(handle);
+ if (index >= 0) {
+ FlushInfo& flushInfo = mSensorInfo.editValueAt(index);
+ flushInfo.mPendingFlushEventsToSend++;
+ }
+}
+
status_t SensorService::SensorEventConnection::sendEvents(
sensors_event_t const* buffer, size_t numEvents,
sensors_event_t* scratch)
{
// filter out events not for this connection
size_t count = 0;
+
if (scratch) {
Mutex::Autolock _l(mConnectionLock);
size_t i=0;
@@ -742,10 +829,18 @@
// filtered correctly. buffer[i].sensor is zero for meta_data events.
curr = buffer[i].meta_data.sensor;
}
- if (mSensorInfo.indexOf(curr) >= 0) {
+ ssize_t index = mSensorInfo.indexOfKey(curr);
+ if (index >= 0 && mSensorInfo[index].mFirstFlushPending == true &&
+ buffer[i].type == SENSOR_TYPE_META_DATA) {
+ // This is the first flush before activate is called. Events can now be sent for
+ // this sensor on this connection.
+ ALOGD_IF(DEBUG_CONNECTIONS, "First flush event for sensor==%d ",
+ buffer[i].meta_data.sensor);
+ mSensorInfo.editValueAt(index).mFirstFlushPending = false;
+ }
+ if (index >= 0 && mSensorInfo[index].mFirstFlushPending == false) {
do {
- scratch[count] = buffer[i];
- ++count; ++i;
+ scratch[count++] = buffer[i++];
} while ((i<numEvents) && ((buffer[i].sensor == curr) ||
(buffer[i].type == SENSOR_TYPE_META_DATA &&
buffer[i].meta_data.sensor == curr)));
@@ -758,19 +853,62 @@
count = numEvents;
}
+ // Send pending flush events (if any) before sending events from the cache.
+ {
+ ASensorEvent flushCompleteEvent;
+ flushCompleteEvent.type = SENSOR_TYPE_META_DATA;
+ flushCompleteEvent.sensor = 0;
+ Mutex::Autolock _l(mConnectionLock);
+ // Loop through all the sensors for this connection and check if there are any pending
+ // flush complete events to be sent.
+ for (size_t i = 0; i < mSensorInfo.size(); ++i) {
+ FlushInfo& flushInfo = mSensorInfo.editValueAt(i);
+ while (flushInfo.mPendingFlushEventsToSend > 0) {
+ flushCompleteEvent.meta_data.sensor = mSensorInfo.keyAt(i);
+ ssize_t size = SensorEventQueue::write(mChannel, &flushCompleteEvent, 1);
+ if (size < 0) {
+ // ALOGW("dropping %d events on the floor", count);
+ countFlushCompleteEvents(scratch, count);
+ return size;
+ }
+ ALOGD_IF(DEBUG_CONNECTIONS, "sent dropped flush complete event==%d ",
+ flushCompleteEvent.meta_data.sensor);
+ flushInfo.mPendingFlushEventsToSend--;
+ }
+ }
+ }
+
// NOTE: ASensorEvent and sensors_event_t are the same type
ssize_t size = SensorEventQueue::write(mChannel,
reinterpret_cast<ASensorEvent const*>(scratch), count);
if (size == -EAGAIN) {
// the destination doesn't accept events anymore, it's probably
// full. For now, we just drop the events on the floor.
- //ALOGW("dropping %d events on the floor", count);
+ // ALOGW("dropping %d events on the floor", count);
+ countFlushCompleteEvents(scratch, count);
return size;
}
return size < 0 ? status_t(size) : status_t(NO_ERROR);
}
+void SensorService::SensorEventConnection::countFlushCompleteEvents(
+ sensors_event_t* scratch, const int numEventsDropped) {
+ ALOGD_IF(DEBUG_CONNECTIONS, "dropping %d events ", numEventsDropped);
+ Mutex::Autolock _l(mConnectionLock);
+ // Count flushComplete events in the events that are about to be dropped. These will be sent
+ // separately before the next batch of events.
+ for (int j = 0; j < numEventsDropped; ++j) {
+ if (scratch[j].type == SENSOR_TYPE_META_DATA) {
+ FlushInfo& flushInfo = mSensorInfo.editValueFor(scratch[j].meta_data.sensor);
+ flushInfo.mPendingFlushEventsToSend++;
+ ALOGD_IF(DEBUG_CONNECTIONS, "increment pendingFlushCount %d",
+ flushInfo.mPendingFlushEventsToSend);
+ }
+ }
+ return;
+}
+
sp<BitTube> SensorService::SensorEventConnection::getSensorChannel() const
{
return mChannel;
@@ -799,6 +937,7 @@
status_t SensorService::SensorEventConnection::flushSensor(int handle) {
return mService->flushSensor(this, handle);
}
+
// ---------------------------------------------------------------------------
}; // namespace android
diff --git a/services/sensorservice/SensorService.h b/services/sensorservice/SensorService.h
index 6267dd1..2311bff 100644
--- a/services/sensorservice/SensorService.h
+++ b/services/sensorservice/SensorService.h
@@ -38,6 +38,9 @@
// ---------------------------------------------------------------------------
#define DEBUG_CONNECTIONS false
+// Max size is 1 MB which is enough to accept a batch of about 10k events.
+#define MAX_SOCKET_BUFFER_SIZE_BATCHED 1024 * 1024
+#define SOCKET_BUFFER_SIZE_NON_BATCHED 4 * 1024
struct sensors_poll_device_t;
struct sensors_module_t;
@@ -77,14 +80,27 @@
nsecs_t maxBatchReportLatencyNs, int reservedFlags);
virtual status_t setEventRate(int handle, nsecs_t samplingPeriodNs);
virtual status_t flushSensor(int handle);
+ // Count the number of flush complete events which are about to be dropped in the buffer.
+ // Increment mPendingFlushEventsToSend in mSensorInfo. These flush complete events will be
+ // sent separately before the next batch of events.
+ void countFlushCompleteEvents(sensors_event_t* scratch, int numEventsDropped);
sp<SensorService> const mService;
- sp<BitTube> const mChannel;
+ sp<BitTube> mChannel;
uid_t mUid;
mutable Mutex mConnectionLock;
- // protected by SensorService::mLock
- SortedVector<int> mSensorInfo;
+ struct FlushInfo {
+ // The number of flush complete events dropped for this sensor is stored here.
+ // They are sent separately before the next batch of events.
+ int mPendingFlushEventsToSend;
+ // Every activate is preceded by a flush. Only after the first flush complete is
+ // received, the events for the sensor are sent on that *connection*.
+ bool mFirstFlushPending;
+ FlushInfo() : mPendingFlushEventsToSend(0), mFirstFlushPending(false) {}
+ };
+ // protected by mConnectionLock. Key for this vector is the sensor handle.
+ KeyedVector<int, FlushInfo> mSensorInfo;
public:
SensorEventConnection(const sp<SensorService>& service, uid_t uid);
@@ -95,6 +111,9 @@
bool hasAnySensor() const;
bool addSensor(int32_t handle);
bool removeSensor(int32_t handle);
+ void setFirstFlushPending(int32_t handle, bool value);
+ void incrementPendingFlushCount(int32_t handle);
+ void dump(String8& result);
uid_t getUid() const { return mUid; }
};
@@ -130,6 +149,7 @@
DefaultKeyedVector<int, SensorInterface*> mSensorMap;
Vector<SensorInterface *> mVirtualSensorList;
status_t mInitCheck;
+ size_t mSocketBufferSize;
// protected by mLock
mutable Mutex mLock;