Only create ProtoOutputStream when onGetData() is called.
The exception is EventMetricProducer. Each EventMetricProducer will still have a ProtoOutputStream.
Because LogEvent comes as a fixed 4K, it's more memory-efficient to have an 8K ProtoOutputStream for
storing the events.
Also removed the finish() API in MetricProducer, which was intended for use with Dropbox.
Test: statsd_test & dogfood app
Bug: 70393808
Change-Id: I2efe4ecc76a88060a9aa5eb49d1fa6ea60bc5da8
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 1f6bd58b..55d84e0 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -91,8 +91,6 @@
metric.bucket().bucket_size_millis());
}
- startNewProtoOutputStreamLocked(mStartTimeNs);
-
VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
}
@@ -104,23 +102,15 @@
}
}
-void GaugeMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
- mProto = std::make_unique<ProtoOutputStream>();
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
- mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
-}
-
-void GaugeMetricProducer::finish() {
-}
-
-std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReportLocked() {
+void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+ ProtoOutputStream* protoOutput) {
VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeededLocked(time(nullptr) * NS_PER_SEC);
+ flushIfNeededLocked(dumpTimeNs);
+
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+ long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
@@ -132,51 +122,45 @@
VLOG(" dimension key %s", hashableKey.c_str());
long long wrapperToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+ protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
// First fill dimension (KeyValuePairs).
for (const auto& kv : it->second) {
- long long dimensionToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
- mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+ long long dimensionToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
+ protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
if (kv.has_value_str()) {
- mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+ protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
} else if (kv.has_value_int()) {
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
} else if (kv.has_value_bool()) {
- mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+ protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
} else if (kv.has_value_float()) {
- mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+ protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
}
- mProto->end(dimensionToken);
+ protoOutput->end(dimensionToken);
}
// Then fill bucket_info (GaugeBucketInfo).
for (const auto& bucket : pair.second) {
- long long bucketInfoToken =
- mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
- (long long)bucket.mBucketStartNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
- (long long)bucket.mBucketEndNs);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge);
- mProto->end(bucketInfoToken);
+ long long bucketInfoToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+ (long long)bucket.mBucketStartNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+ (long long)bucket.mBucketEndNs);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge);
+ protoOutput->end(bucketInfoToken);
VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
(long long)bucket.mBucketEndNs, (long long)bucket.mGauge);
}
- mProto->end(wrapperToken);
+ protoOutput->end(wrapperToken);
}
- mProto->end(mProtoToken);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
- (long long)mCurrentBucketStartTimeNs);
+ protoOutput->end(protoToken);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
- std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
-
- startNewProtoOutputStreamLocked(time(nullptr) * NS_PER_SEC);
mPastBuckets.clear();
-
- return buffer;
-
+ mStartTimeNs = mCurrentBucketStartTimeNs;
// TODO: Clear mDimensionKeyMap once the report is dumped.
}