Only create ProtoOutputStream when onGetData() is called.
The exception is EventMetricProducer: each EventMetricProducer will still have a ProtoOutputStream.
Because a LogEvent comes as a fixed 4K, it's more memory-efficient to have an 8K ProtoOutputStream
for storing the events.
Also removed the finish() API from MetricProducer, which was intended for use with Dropbox.
Test: statsd_test & dogfood app
Bug: 70393808
Change-Id: I2efe4ecc76a88060a9aa5eb49d1fa6ea60bc5da8
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index 2df0c90..3b5c236 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -111,9 +111,7 @@
unique_ptr<MetricsManager> newMetricsManager = std::make_unique<MetricsManager>(key, config);
auto it = mMetricsManagers.find(key);
- if (it != mMetricsManagers.end()) {
- it->second->finish();
- } else if (mMetricsManagers.size() > StatsdStats::kMaxConfigCount) {
+ if (it == mMetricsManagers.end() && mMetricsManagers.size() > StatsdStats::kMaxConfigCount) {
ALOGE("Can't accept more configs!");
return;
}
@@ -167,11 +165,7 @@
// First, fill in ConfigMetricsReport using current data on memory, which
// starts from filling in StatsLogReport's.
- for (auto& m : it->second->onDumpReport()) {
- // Add each vector of StatsLogReport into a repeated field.
- proto.write(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_METRICS,
- reinterpret_cast<char*>(m.get()->data()), m.get()->size());
- }
+ it->second->onDumpReport(&proto);
// Fill in UidMap.
auto uidMap = mUidMap->getOutput(key);
@@ -205,7 +199,6 @@
void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
auto it = mMetricsManagers.find(key);
if (it != mMetricsManagers.end()) {
- it->second->finish();
mMetricsManagers.erase(it);
mUidMap->OnConfigRemoved(key);
}
@@ -231,8 +224,9 @@
mLastByteSizeTimes[key] = timestampNs;
if (totalBytes >
StatsdStats::kMaxMetricsBytesPerConfig) { // Too late. We need to start clearing data.
- // We ignore the return value so we force each metric producer to clear its contents.
- metricsManager.onDumpReport();
+ // TODO(b/70571383): By 12/15/2017 add API to drop data directly
+ ProtoOutputStream proto;
+ metricsManager.onDumpReport(&proto);
StatsdStats::getInstance().noteDataDropped(key);
VLOG("StatsD had to toss out metrics for %s", key.ToString().c_str());
} else if (totalBytes > .9 * StatsdStats::kMaxMetricsBytesPerConfig) {
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index 36ec6b9..fc12013 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -83,8 +83,6 @@
mConditionSliced = true;
}
- startNewProtoOutputStreamLocked(mStartTimeNs);
-
VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
}
@@ -93,27 +91,18 @@
VLOG("~CountMetricProducer() called");
}
-void CountMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
- mProto = std::make_unique<ProtoOutputStream>();
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
- mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS);
-}
-
-void CountMetricProducer::finish() {
-}
-
void CountMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
-std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReportLocked() {
- long long endTime = time(nullptr) * NS_PER_SEC;
+void CountMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+ ProtoOutputStream* protoOutput) {
+ flushIfNeededLocked(dumpTimeNs);
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeededLocked(endTime);
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+ long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS);
+
VLOG("metric %s dump report now...", mMetric.name().c_str());
for (const auto& counter : mPastBuckets) {
@@ -125,52 +114,46 @@
continue;
}
long long wrapperToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
// First fill dimension (KeyValuePairs).
for (const auto& kv : it->second) {
- long long dimensionToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
- mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+ long long dimensionToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
+ protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
if (kv.has_value_str()) {
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
} else if (kv.has_value_int()) {
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
} else if (kv.has_value_bool()) {
- mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+ protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
} else if (kv.has_value_float()) {
- mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+ protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
}
- mProto->end(dimensionToken);
+ protoOutput->end(dimensionToken);
}
// Then fill bucket_info (CountBucketInfo).
for (const auto& bucket : counter.second) {
- long long bucketInfoToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
- (long long)bucket.mBucketStartNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
- (long long)bucket.mBucketEndNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)bucket.mCount);
- mProto->end(bucketInfoToken);
+ long long bucketInfoToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+ (long long)bucket.mBucketStartNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+ (long long)bucket.mBucketEndNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)bucket.mCount);
+ protoOutput->end(bucketInfoToken);
VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
(long long)bucket.mBucketEndNs, (long long)bucket.mCount);
}
- mProto->end(wrapperToken);
+ protoOutput->end(wrapperToken);
}
- mProto->end(mProtoToken);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
- (long long)mCurrentBucketStartTimeNs);
+ protoOutput->end(protoToken);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
- VLOG("metric %s dump report now...", mMetric.name().c_str());
- std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
-
- startNewProtoOutputStreamLocked(endTime);
mPastBuckets.clear();
-
- return buffer;
+ mStartTimeNs = mCurrentBucketStartTimeNs;
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h
index 800a2b9..21bd9d6 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.h
+++ b/cmds/statsd/src/metrics/CountMetricProducer.h
@@ -48,8 +48,6 @@
virtual ~CountMetricProducer();
- void finish() override;
-
// TODO: Implement this later.
virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version)
override{};
@@ -63,8 +61,8 @@
const LogEvent& event, bool scheduledPull) override;
private:
- // TODO: Pass a timestamp as a parameter in onDumpReport.
- std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
+ void onDumpReportLocked(const uint64_t dumpTimeNs,
+ android::util::ProtoOutputStream* protoOutput) override;
// Internal interface to handle condition change.
void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
@@ -78,9 +76,6 @@
// Util function to flush the old packet.
void flushIfNeededLocked(const uint64_t& newEventTime);
- // Util function to init/reset the proto output stream.
- void startNewProtoOutputStreamLocked(long long timestamp);
-
const CountMetric mMetric;
// TODO: Add a lock to mPastBuckets.
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index cedea30..9920f65 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -93,8 +93,6 @@
mConditionSliced = true;
}
- startNewProtoOutputStreamLocked(mStartTimeNs);
-
VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
}
@@ -114,13 +112,6 @@
return new AnomalyTracker(alert, mConfigKey);
}
-void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
- mProto = std::make_unique<ProtoOutputStream>();
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
- mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS);
-}
-
unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
const HashableDimensionKey& eventKey) const {
switch (mMetric.aggregation_type()) {
@@ -135,11 +126,6 @@
}
}
-void DurationMetricProducer::finish() {
- // TODO: write the StatsLogReport to dropbox using
- // DropboxWriter.
-}
-
void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
flushIfNeededLocked(eventTime);
@@ -161,13 +147,14 @@
}
}
-std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked() {
- long long endTime = time(nullptr) * NS_PER_SEC;
+void DurationMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+ ProtoOutputStream* protoOutput) {
+ flushIfNeededLocked(dumpTimeNs);
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeededLocked(endTime);
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+ long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS);
+
VLOG("metric %s dump report now...", mMetric.name().c_str());
for (const auto& pair : mPastBuckets) {
@@ -180,49 +167,46 @@
}
long long wrapperToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
// First fill dimension (KeyValuePairs).
for (const auto& kv : it->second) {
- long long dimensionToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
- mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+ long long dimensionToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
+ protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
if (kv.has_value_str()) {
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
} else if (kv.has_value_int()) {
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
} else if (kv.has_value_bool()) {
- mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+ protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
} else if (kv.has_value_float()) {
- mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+ protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
}
- mProto->end(dimensionToken);
+ protoOutput->end(dimensionToken);
}
// Then fill bucket_info (DurationBucketInfo).
for (const auto& bucket : pair.second) {
- long long bucketInfoToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
- (long long)bucket.mBucketStartNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
- (long long)bucket.mBucketEndNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration);
- mProto->end(bucketInfoToken);
+ long long bucketInfoToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+ (long long)bucket.mBucketStartNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+ (long long)bucket.mBucketEndNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration);
+ protoOutput->end(bucketInfoToken);
VLOG("\t bucket [%lld - %lld] duration: %lld", (long long)bucket.mBucketStartNs,
(long long)bucket.mBucketEndNs, (long long)bucket.mDuration);
}
- mProto->end(wrapperToken);
+ protoOutput->end(wrapperToken);
}
- mProto->end(mProtoToken);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
- (long long)mCurrentBucketStartTimeNs);
- std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
- startNewProtoOutputStreamLocked(endTime);
+ protoOutput->end(protoToken);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
mPastBuckets.clear();
- return buffer;
+ mStartTimeNs = mCurrentBucketStartTimeNs;
}
void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) {
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h
index 4bf9d1c..e509af4 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.h
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.h
@@ -47,8 +47,6 @@
virtual sp<AnomalyTracker> createAnomalyTracker(const Alert &alert) override;
- void finish() override;
-
// TODO: Implement this later.
virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version)
override{};
@@ -62,8 +60,8 @@
const LogEvent& event, bool scheduledPull) override;
private:
- // TODO: Pass a timestamp as a parameter in onDumpReport.
- std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
+ void onDumpReportLocked(const uint64_t dumpTimeNs,
+ android::util::ProtoOutputStream* protoOutput) override;
// Internal interface to handle condition change.
void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
@@ -77,9 +75,6 @@
// Util function to flush the old packet.
void flushIfNeededLocked(const uint64_t& eventTime);
- // Util function to init/reset the proto output stream.
- void startNewProtoOutputStreamLocked(long long timestamp);
-
const DurationMetric mMetric;
// Index of the SimpleAtomMatcher which defines the start.
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 8bdc9e3..217aff0 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -62,7 +62,7 @@
mConditionSliced = true;
}
- startNewProtoOutputStreamLocked(mStartTimeNs);
+ startNewProtoOutputStreamLocked();
VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -72,33 +72,45 @@
VLOG("~EventMetricProducer() called");
}
-void EventMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
+void EventMetricProducer::startNewProtoOutputStreamLocked() {
mProto = std::make_unique<ProtoOutputStream>();
- // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData,
- // and StatsEvent.
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
- mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_EVENT_METRICS);
-}
-
-void EventMetricProducer::finish() {
}
void EventMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
}
-std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReportLocked() {
- long long endTime = time(nullptr) * NS_PER_SEC;
- mProto->end(mProtoToken);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
+std::unique_ptr<std::vector<uint8_t>> serializeProtoLocked(ProtoOutputStream& protoOutput) {
+ size_t bufferSize = protoOutput.size();
+
+ std::unique_ptr<std::vector<uint8_t>> buffer(new std::vector<uint8_t>(bufferSize));
+
+ size_t pos = 0;
+ auto it = protoOutput.data();
+ while (it.readBuffer() != NULL) {
+ size_t toRead = it.currentToRead();
+ std::memcpy(&((*buffer)[pos]), it.readBuffer(), toRead);
+ pos += toRead;
+ it.rp()->move(toRead);
+ }
+
+ return buffer;
+}
+
+void EventMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+ ProtoOutputStream* protoOutput) {
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
size_t bufferSize = mProto->size();
VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
- std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
+ std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(*mProto);
- startNewProtoOutputStreamLocked(endTime);
+ protoOutput->write(FIELD_TYPE_MESSAGE | FIELD_ID_EVENT_METRICS,
+ reinterpret_cast<char*>(buffer.get()->data()), buffer.get()->size());
- return buffer;
+ startNewProtoOutputStreamLocked();
+ mStartTimeNs = dumpTimeNs;
}
void EventMetricProducer::onConditionChangedLocked(const bool conditionMet,
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.h b/cmds/statsd/src/metrics/EventMetricProducer.h
index da3b3ca..75ccf47 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.h
+++ b/cmds/statsd/src/metrics/EventMetricProducer.h
@@ -40,8 +40,6 @@
virtual ~EventMetricProducer();
- void finish() override;
-
// TODO: Implement this later.
virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version)
override{};
@@ -49,7 +47,7 @@
virtual void notifyAppRemoved(const string& apk, const int uid) override{};
protected:
- void startNewProtoOutputStreamLocked(long long timestamp);
+ void startNewProtoOutputStreamLocked();
private:
void onMatchedLogEventInternalLocked(
@@ -57,8 +55,8 @@
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) override;
- // TODO: Pass a timestamp as a parameter in onDumpReport.
- std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
+ void onDumpReportLocked(const uint64_t dumpTimeNs,
+ android::util::ProtoOutputStream* protoOutput) override;
// Internal interface to handle condition change.
void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
@@ -70,6 +68,10 @@
size_t byteSizeLocked() const override;
const EventMetric mMetric;
+
+ // Maps to an EventMetricDataWrapper. Storing atom events in ProtoOutputStream
+ // is more space efficient than storing LogEvent.
+ std::unique_ptr<android::util::ProtoOutputStream> mProto;
};
} // namespace statsd
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 1f6bd58b..55d84e0 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -91,8 +91,6 @@
metric.bucket().bucket_size_millis());
}
- startNewProtoOutputStreamLocked(mStartTimeNs);
-
VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
}
@@ -104,23 +102,15 @@
}
}
-void GaugeMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
- mProto = std::make_unique<ProtoOutputStream>();
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
- mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
-}
-
-void GaugeMetricProducer::finish() {
-}
-
-std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReportLocked() {
+void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+ ProtoOutputStream* protoOutput) {
VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeededLocked(time(nullptr) * NS_PER_SEC);
+ flushIfNeededLocked(dumpTimeNs);
+
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+ long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
@@ -132,51 +122,45 @@
VLOG(" dimension key %s", hashableKey.c_str());
long long wrapperToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
// First fill dimension (KeyValuePairs).
for (const auto& kv : it->second) {
- long long dimensionToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
- mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+ long long dimensionToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
+ protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
if (kv.has_value_str()) {
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
} else if (kv.has_value_int()) {
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
} else if (kv.has_value_bool()) {
- mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+ protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
} else if (kv.has_value_float()) {
- mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+ protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
}
- mProto->end(dimensionToken);
+ protoOutput->end(dimensionToken);
}
// Then fill bucket_info (GaugeBucketInfo).
for (const auto& bucket : pair.second) {
- long long bucketInfoToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
- (long long)bucket.mBucketStartNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
- (long long)bucket.mBucketEndNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge);
- mProto->end(bucketInfoToken);
+ long long bucketInfoToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+ (long long)bucket.mBucketStartNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+ (long long)bucket.mBucketEndNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge);
+ protoOutput->end(bucketInfoToken);
VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
(long long)bucket.mBucketEndNs, (long long)bucket.mGauge);
}
- mProto->end(wrapperToken);
+ protoOutput->end(wrapperToken);
}
- mProto->end(mProtoToken);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
- (long long)mCurrentBucketStartTimeNs);
+ protoOutput->end(protoToken);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
- std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
-
- startNewProtoOutputStreamLocked(time(nullptr) * NS_PER_SEC);
mPastBuckets.clear();
-
- return buffer;
-
+ mStartTimeNs = mCurrentBucketStartTimeNs;
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 36705b1..e4bda02 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -56,8 +56,6 @@
// Handles when the pulled data arrives.
void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
- void finish() override;
-
// TODO: Implement this later.
virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version)
override{};
@@ -71,8 +69,8 @@
const LogEvent& event, bool scheduledPull) override;
private:
- // TODO: Pass a timestamp as a parameter in onDumpReport.
- std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
+ void onDumpReportLocked(const uint64_t dumpTimeNs,
+ android::util::ProtoOutputStream* protoOutput) override;
// Internal interface to handle condition change.
void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
@@ -86,9 +84,6 @@
// Util function to flush the old packet.
void flushIfNeededLocked(const uint64_t& eventTime);
- // Util function to init/reset the proto output stream.
- void startNewProtoOutputStreamLocked(long long timestamp);
-
// The default bucket size for gauge metric is 1 second.
static const uint64_t kDefaultGaugemBucketSizeNs = 1000 * 1000 * 1000;
diff --git a/cmds/statsd/src/metrics/MetricProducer.cpp b/cmds/statsd/src/metrics/MetricProducer.cpp
index 7e78a85..5a0a7c7f 100644
--- a/cmds/statsd/src/metrics/MetricProducer.cpp
+++ b/cmds/statsd/src/metrics/MetricProducer.cpp
@@ -64,23 +64,6 @@
scheduledPull);
}
-std::unique_ptr<std::vector<uint8_t>> MetricProducer::serializeProtoLocked() {
- size_t bufferSize = mProto->size();
-
- std::unique_ptr<std::vector<uint8_t>> buffer(new std::vector<uint8_t>(bufferSize));
-
- size_t pos = 0;
- auto it = mProto->data();
- while (it.readBuffer() != NULL) {
- size_t toRead = it.currentToRead();
- std::memcpy(&((*buffer)[pos]), it.readBuffer(), toRead);
- pos += toRead;
- it.rp()->move(toRead);
- }
-
- return buffer;
-}
-
} // namespace statsd
} // namespace os
} // namespace android
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index adeb3cd..ef2ef29 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -74,16 +74,10 @@
return mConditionSliced;
};
- // This is called when the metric collecting is done, e.g., when there is a new configuration
- // coming. MetricProducer should do the clean up, and dump existing data to dropbox.
- virtual void finish() = 0;
-
- // TODO: Pass a timestamp as a parameter in onDumpReport and update all its
- // implementations.
- // onDumpReport returns the proto-serialized output and clears the previously stored contents.
- std::unique_ptr<std::vector<uint8_t>> onDumpReport() {
+ // Output the metrics data to [protoOutput]. All metrics reports end with the same timestamp.
+ void onDumpReport(const uint64_t dumpTimeNs, android::util::ProtoOutputStream* protoOutput) {
std::lock_guard<std::mutex> lock(mMutex);
- return onDumpReportLocked();
+ return onDumpReportLocked(dumpTimeNs, protoOutput);
}
// Returns the memory in bytes currently used to store this metric's data. Does not change
@@ -110,12 +104,14 @@
protected:
virtual void onConditionChangedLocked(const bool condition, const uint64_t eventTime) = 0;
virtual void onSlicedConditionMayChangeLocked(const uint64_t eventTime) = 0;
- virtual std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() = 0;
+ virtual void onDumpReportLocked(const uint64_t dumpTimeNs,
+ android::util::ProtoOutputStream* protoOutput) = 0;
virtual size_t byteSizeLocked() const = 0;
const ConfigKey mConfigKey;
- const uint64_t mStartTimeNs;
+ // The start time for the current in-memory metrics data.
+ uint64_t mStartTimeNs;
uint64_t mCurrentBucketStartTimeNs;
@@ -165,15 +161,7 @@
void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event,
bool scheduledPull);
- std::unique_ptr<android::util::ProtoOutputStream> mProto;
-
- long long mProtoToken;
-
- // Read/Write mutex to make the producer thread-safe.
- // TODO(yanglu): replace with std::shared_mutex when available in libc++.
mutable std::mutex mMutex;
-
- std::unique_ptr<std::vector<uint8_t>> serializeProtoLocked();
};
} // namespace statsd
diff --git a/cmds/statsd/src/metrics/MetricsManager.cpp b/cmds/statsd/src/metrics/MetricsManager.cpp
index 9fdc6fa..0510fff 100644
--- a/cmds/statsd/src/metrics/MetricsManager.cpp
+++ b/cmds/statsd/src/metrics/MetricsManager.cpp
@@ -27,6 +27,11 @@
#include "stats_util.h"
#include <log/logprint.h>
+
+using android::util::FIELD_COUNT_REPEATED;
+using android::util::FIELD_TYPE_MESSAGE;
+using android::util::ProtoOutputStream;
+
using std::make_unique;
using std::set;
using std::string;
@@ -37,6 +42,8 @@
namespace os {
namespace statsd {
+const int FIELD_ID_METRICS = 1;
+
MetricsManager::MetricsManager(const ConfigKey& key, const StatsdConfig& config) : mConfigKey(key) {
mConfigValid =
initStatsdConfig(key, config, mTagIds, mAllAtomMatchers, mAllConditionTrackers,
@@ -65,21 +72,17 @@
return mConfigValid;
}
-void MetricsManager::finish() {
- for (auto& metricProducer : mAllMetricProducers) {
- metricProducer->finish();
- }
-}
-
-vector<std::unique_ptr<vector<uint8_t>>> MetricsManager::onDumpReport() {
+void MetricsManager::onDumpReport(ProtoOutputStream* protoOutput) {
VLOG("=========================Metric Reports Start==========================");
+ uint64_t dumpTimeStampNs = time(nullptr) * NS_PER_SEC;
// one StatsLogReport per MetricProduer
- vector<std::unique_ptr<vector<uint8_t>>> reportList;
for (auto& metric : mAllMetricProducers) {
- reportList.push_back(metric->onDumpReport());
+ long long token =
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_METRICS);
+ metric->onDumpReport(dumpTimeStampNs, protoOutput);
+ protoOutput->end(token);
}
VLOG("=========================Metric Reports End==========================");
- return reportList;
}
// Consume the stats log if it's interesting to this metric.
diff --git a/cmds/statsd/src/metrics/MetricsManager.h b/cmds/statsd/src/metrics/MetricsManager.h
index 86c4733..34ea667 100644
--- a/cmds/statsd/src/metrics/MetricsManager.h
+++ b/cmds/statsd/src/metrics/MetricsManager.h
@@ -43,16 +43,13 @@
void onLogEvent(const LogEvent& event);
- // Called when everything should wrap up. We are about to finish (e.g., new config comes).
- void finish();
-
void onAnomalyAlarmFired(const uint64_t timestampNs,
unordered_set<sp<const AnomalyAlarm>, SpHash<AnomalyAlarm>>& anomalySet);
void setAnomalyMonitor(const sp<AnomalyMonitor>& anomalyMonitor);
// Config source owner can call onDumpReport() to get all the metrics collected.
- virtual std::vector<std::unique_ptr<std::vector<uint8_t>>> onDumpReport();
+ virtual void onDumpReport(android::util::ProtoOutputStream* protoOutput);
// Computes the total byte size of all metrics managed by a single config source.
// Does not change the state.
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 977aa88..721d02c 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -97,9 +97,6 @@
mStatsPullerManager->RegisterReceiver(mPullTagId, this,
metric.bucket().bucket_size_millis());
}
-
- startNewProtoOutputStreamLocked(mStartTimeNs);
-
VLOG("value metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
}
@@ -120,24 +117,17 @@
}
}
-void ValueMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
- mProto = std::make_unique<ProtoOutputStream>();
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
- mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
-}
-
-void ValueMetricProducer::finish() {
- // TODO: write the StatsLogReport to dropbox using
- // DropboxWriter.
-}
-
void ValueMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
-std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReportLocked() {
+void ValueMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+ ProtoOutputStream* protoOutput) {
VLOG("metric %s dump report now...", mMetric.name().c_str());
+ flushIfNeededLocked(dumpTimeNs);
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+ long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
@@ -148,52 +138,46 @@
continue;
}
long long wrapperToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
// First fill dimension (KeyValuePairs).
for (const auto& kv : it->second) {
- long long dimensionToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
- mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+ long long dimensionToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
+ protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
if (kv.has_value_str()) {
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
} else if (kv.has_value_int()) {
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
} else if (kv.has_value_bool()) {
- mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+ protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
} else if (kv.has_value_float()) {
- mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+ protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
}
- mProto->end(dimensionToken);
+ protoOutput->end(dimensionToken);
}
// Then fill bucket_info (ValueBucketInfo).
for (const auto& bucket : pair.second) {
- long long bucketInfoToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
- (long long)bucket.mBucketStartNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
- (long long)bucket.mBucketEndNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue);
- mProto->end(bucketInfoToken);
+ long long bucketInfoToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+ (long long)bucket.mBucketStartNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+ (long long)bucket.mBucketEndNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue);
+ protoOutput->end(bucketInfoToken);
VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
(long long)bucket.mBucketEndNs, (long long)bucket.mValue);
}
- mProto->end(wrapperToken);
+ protoOutput->end(wrapperToken);
}
- mProto->end(mProtoToken);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
- (long long)mCurrentBucketStartTimeNs);
+ protoOutput->end(protoToken);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
VLOG("metric %s dump report now...", mMetric.name().c_str());
- std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
-
- startNewProtoOutputStreamLocked(time(nullptr) * NS_PER_SEC);
mPastBuckets.clear();
-
- return buffer;
-
+ mStartTimeNs = mCurrentBucketStartTimeNs;
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index a2efd3f..8d60ff6 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -44,8 +44,6 @@
virtual ~ValueMetricProducer();
- void finish() override;
-
void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
// TODO: Implement this later.
@@ -61,8 +59,8 @@
const LogEvent& event, bool scheduledPull) override;
private:
- // TODO: Pass a timestamp as a parameter in onDumpReport.
- std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
+ void onDumpReportLocked(const uint64_t dumpTimeNs,
+ android::util::ProtoOutputStream* protoOutput) override;
// Internal interface to handle condition change.
void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
@@ -76,9 +74,6 @@
// Util function to flush the old packet.
void flushIfNeededLocked(const uint64_t& eventTime);
- // Util function to init/reset the proto output stream.
- void startNewProtoOutputStreamLocked(long long timestamp);
-
const ValueMetric mMetric;
std::shared_ptr<StatsPullerManager> mStatsPullerManager;