Support StatsD sending broadcasts.
StatsD will send a broadcast when we're 90% of the way to our
allocated memory limit for the configuration. If the memory usage
goes over the limit, we just lose all the data for this config.
Also modifies the adb shell commands to facilitate debugging of the
broadcasts.
Test: Manually tested on marlin-eng with custom gmscore code.
Change-Id: I517a15bd4c959aa221802f84a51f13141a725102
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index b764ce5..abd2a35 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -50,8 +50,8 @@
const int FIELD_ID_NAME = 2;
StatsLogProcessor::StatsLogProcessor(const sp<UidMap>& uidMap,
- const std::function<void(const vector<uint8_t>&)>& pushLog)
- : mUidMap(uidMap), mPushLog(pushLog) {
+ const std::function<void(const ConfigKey&)>& sendBroadcast)
+ : mUidMap(uidMap), mSendBroadcast(sendBroadcast) {
}
StatsLogProcessor::~StatsLogProcessor() {
@@ -102,12 +102,27 @@
}
}
-vector<uint8_t> StatsLogProcessor::onDumpReport(const ConfigKey& key) {
+size_t StatsLogProcessor::GetMetricsSize(const ConfigKey& key) {
auto it = mMetricsManagers.find(key);
if (it == mMetricsManagers.end()) {
ALOGW("Config source %s does not exist", key.ToString().c_str());
- return vector<uint8_t>();
+ return 0;
}
+ return it->second->byteSize();
+}
+
+void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outData) {
+ auto it = mMetricsManagers.find(key);
+ if (it == mMetricsManagers.end()) {
+ ALOGW("Config source %s does not exist", key.ToString().c_str());
+ return;
+ }
+
+ // This allows another broadcast to be sent within the rate-limit period if we get close to
+ // filling the buffer again soon.
+ mBroadcastTimesMutex.lock();
+ mLastBroadcastTimes.erase(key);
+ mBroadcastTimesMutex.unlock();
ProtoOutputStream proto;
@@ -131,17 +146,18 @@
uidMap.SerializeToArray(&uidMapBuffer[0], uidMapSize);
proto.write(FIELD_TYPE_MESSAGE | FIELD_ID_UID_MAP, uidMapBuffer, uidMapSize);
- vector<uint8_t> buffer(proto.size());
- size_t pos = 0;
- auto iter = proto.data();
- while (iter.readBuffer() != NULL) {
- size_t toRead = iter.currentToRead();
- std::memcpy(&buffer[pos], iter.readBuffer(), toRead);
- pos += toRead;
- iter.rp()->move(toRead);
+ if (outData != nullptr) {
+ outData->clear();
+ outData->resize(proto.size());
+ size_t pos = 0;
+ auto iter = proto.data();
+ while (iter.readBuffer() != NULL) {
+ size_t toRead = iter.currentToRead();
+ std::memcpy(&((*outData)[pos]), iter.readBuffer(), toRead);
+ pos += toRead;
+ iter.rp()->move(toRead);
+ }
}
-
- return buffer;
}
void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
@@ -151,42 +167,34 @@
mMetricsManagers.erase(it);
mUidMap->OnConfigRemoved(key);
}
- auto flushTime = mLastFlushTimes.find(key);
- if (flushTime != mLastFlushTimes.end()) {
- mLastFlushTimes.erase(flushTime);
- }
+
+ std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);
+ mLastBroadcastTimes.erase(key);
}
void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs,
const ConfigKey& key,
const unique_ptr<MetricsManager>& metricsManager) {
- auto lastFlushNs = mLastFlushTimes.find(key);
- if (lastFlushNs != mLastFlushTimes.end()) {
- if (timestampNs - lastFlushNs->second < kMinFlushPeriod) {
- return;
- }
- }
+ std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);
size_t totalBytes = metricsManager->byteSize();
- if (totalBytes > kMaxSerializedBytes) {
- flush();
- mLastFlushTimes[key] = std::move(timestampNs);
+ if (totalBytes > .9 * kMaxSerializedBytes) { // Send broadcast so that receivers can pull data.
+ auto lastFlushNs = mLastBroadcastTimes.find(key);
+ if (lastFlushNs != mLastBroadcastTimes.end()) {
+ if (timestampNs - lastFlushNs->second < kMinBroadcastPeriod) {
+ return;
+ }
+ }
+ mLastBroadcastTimes[key] = timestampNs;
+ ALOGD("StatsD requesting broadcast for %s", key.ToString().c_str());
+ mSendBroadcast(key);
+ } else if (totalBytes > kMaxSerializedBytes) { // Too late. We need to start clearing data.
+ // We ignore the return value so we force each metric producer to clear its contents.
+ metricsManager->onDumpReport();
+ ALOGD("StatsD had to toss out metrics for %s", key.ToString().c_str());
}
}
-void StatsLogProcessor::flush() {
- // TODO: Take ConfigKey as an argument and flush metrics related to the
- // ConfigKey. Also, create a wrapper that holds a repeated field of
- // StatsLogReport's.
- /*
- StatsLogReport logReport;
- const int numBytes = logReport.ByteSize();
- vector<uint8_t> logReportBuffer(numBytes);
- logReport.SerializeToArray(&logReportBuffer[0], numBytes);
- mPushLog(logReportBuffer);
- */
-}
-
} // namespace statsd
} // namespace os
} // namespace android