Migrate all remaining MetricProducers to use ProtoOutputStream
Test: statsd, statsd_test
Change-Id: I1087e1c1ffb372ca288dfc575cb7a372b11ce8c5
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index 09132bf..c0a0d98 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -23,6 +23,12 @@
#include <limits.h>
#include <stdlib.h>
+using android::util::FIELD_TYPE_BOOL;
+using android::util::FIELD_TYPE_FLOAT;
+using android::util::FIELD_TYPE_INT32;
+using android::util::FIELD_TYPE_INT64;
+using android::util::FIELD_TYPE_MESSAGE;
+using android::util::ProtoOutputStream;
using std::string;
using std::unordered_map;
using std::vector;
@@ -31,6 +37,27 @@
namespace os {
namespace statsd {
+// Field ids for StatsLogReport; each id is OR-ed with a FIELD_TYPE_* constant when written.
+const int FIELD_ID_METRIC_ID = 1;
+const int FIELD_ID_START_REPORT_NANOS = 2;
+const int FIELD_ID_END_REPORT_NANOS = 3;
+const int FIELD_ID_DURATION_METRICS = 6;
+// Field ids for DurationMetricDataWrapper (the message opened under FIELD_ID_DURATION_METRICS).
+const int FIELD_ID_DATA = 1;
+// Field ids for DurationMetricData (one entry per dimension key).
+const int FIELD_ID_DIMENSION = 1;
+const int FIELD_ID_BUCKET_INFO = 2;
+// Field ids for KeyValuePair (one dimension entry; only one of the value fields is written).
+const int FIELD_ID_KEY = 1;
+const int FIELD_ID_VALUE_STR = 2;
+const int FIELD_ID_VALUE_INT = 3;
+const int FIELD_ID_VALUE_BOOL = 4;
+const int FIELD_ID_VALUE_FLOAT = 5;
+// Field ids for DurationBucketInfo (one finished bucket: start, end, duration).
+const int FIELD_ID_START_BUCKET_NANOS = 1;
+const int FIELD_ID_END_BUCKET_NANOS = 2;
+const int FIELD_ID_DURATION = 3;
+
DurationMetricProducer::DurationMetricProducer(const DurationMetric& metric,
const int conditionIndex, const size_t startIndex,
const size_t stopIndex, const size_t stopAllIndex,
@@ -61,6 +88,8 @@
mConditionSliced = true;
}
+ startNewProtoOutputStream(mStartTimeNs);
+
VLOG("metric %lld created. bucket size %lld start_time: %lld", metric.metric_id(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
}
@@ -69,8 +98,15 @@
VLOG("~DurationMetric() called");
}
+void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {  // Replaces mProto with a fresh stream and writes the report header fields.
+ mProto = std::make_unique<ProtoOutputStream>();  // Drops whatever the previous stream held.
+ mProto->write(FIELD_TYPE_INT32 | FIELD_ID_METRIC_ID, mMetric.metric_id());
+ mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
+ mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS);  // Left open here; onDumpReport() closes it with mProto->end(mProtoToken).
+}
+
unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
- vector<DurationBucketInfo>& bucket) {
+ vector<DurationBucket>& bucket) {
switch (mMetric.type()) {
case DurationMetric_AggregationType_DURATION_SUM:
return make_unique<OringDurationTracker>(mWizard, mConditionTrackerIndex,
@@ -124,29 +160,69 @@
}
+// Flushes a stale current bucket, writes every past bucket (grouped by dimension key) into
+// mProto, serializes the stream, then starts a new stream and clears mPastBuckets.
+// The serialized buffer is not returned yet (see the TODO at the end); callers currently
+// receive an empty StatsLogReport.
StatsLogReport DurationMetricProducer::onDumpReport() {
- VLOG("metric %lld dump report now...", mMetric.metric_id());
- StatsLogReport report;
- report.set_metric_id(mMetric.metric_id());
- report.set_start_report_nanos(mStartTimeNs);
+ long long endTime = time(nullptr) * NS_PER_SEC;
+
// Dump current bucket if it's stale.
// If current bucket is still on-going, don't force dump current bucket.
// In finish(), We can force dump current bucket.
- flushIfNeeded(time(nullptr) * NS_PER_SEC);
- report.set_end_report_nanos(mCurrentBucketStartTimeNs);
+ flushIfNeeded(endTime);
+ VLOG("metric %lld dump report now...", mMetric.metric_id());
- StatsLogReport_DurationMetricDataWrapper* wrapper = report.mutable_duration_metrics();
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
+ VLOG(" dimension key %s", hashableKey.c_str());
auto it = mDimensionKeyMap.find(hashableKey);
if (it == mDimensionKeyMap.end()) {
ALOGW("Dimension key %s not found?!?! skip...", hashableKey.c_str());
continue;
}
- VLOG(" dimension key %s", hashableKey.c_str());
- addDurationBucketsToReport(*wrapper, it->second, pair.second);
+ long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DATA);
+
+ // First fill dimension (KeyValuePairs).
+ for (const auto& kv : it->second) {
+ long long dimensionToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION);
+ mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+ if (kv.has_value_str()) {
+ // value_str() is a string, so it must carry the string field type; tagging it
+ // as FIELD_TYPE_INT32 does not match the value being written.
+ mProto->write(android::util::FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+ } else if (kv.has_value_int()) {
+ mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+ } else if (kv.has_value_bool()) {
+ mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+ } else if (kv.has_value_float()) {
+ mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+ }
+ mProto->end(dimensionToken);
+ }
+
+ // Then fill bucket_info (DurationBucketInfo).
+ for (const auto& bucket : pair.second) {
+ long long bucketInfoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_BUCKET_INFO);
+ mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+ (long long)bucket.mBucketStartNs);
+ mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+ (long long)bucket.mBucketEndNs);
+ mProto->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration);
+ mProto->end(bucketInfoToken);
+ VLOG("\t bucket [%lld - %lld] duration: %lld", (long long)bucket.mBucketStartNs,
+ (long long)bucket.mBucketEndNs, (long long)bucket.mDuration);
+ }
+
+ mProto->end(wrapperToken);
}
- return report;
-};
+
+ // Close the duration_metrics message opened in startNewProtoOutputStream(), then append
+ // the report end time as a top-level field.
+ mProto->end(mProtoToken);
+ mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
+ (long long)mCurrentBucketStartTimeNs);
+
+ std::unique_ptr<uint8_t[]> buffer = serializeProto();
+
+ // The next report starts where this one ended; the dumped buckets are no longer needed.
+ startNewProtoOutputStream(endTime);
+ mPastBuckets.clear();
+
+ // TODO: Once we migrate all MetricProducers to use ProtoOutputStream, we should return this:
+ // return std::move(buffer);
+ return StatsLogReport();
+}
void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
@@ -188,17 +264,17 @@
if (matcherIndex == mStartIndex) {
it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
-
} else if (matcherIndex == mStopIndex) {
it->second->noteStop(atomKey, event.GetTimestampNs());
}
}
size_t DurationMetricProducer::byteSize() {
- // TODO: return actual proto size when ProtoOutputStream is ready for use for
- // DurationMetricsProducer.
- // return mProto->size();
- return 0;
+ size_t totalSize = 0;  // Estimate built from the stored past buckets, not from mProto's size.
+ for (const auto& pair : mPastBuckets) {
+ totalSize += pair.second.size() * kBucketSize;  // kBucketSize is declared outside this hunk; presumably a per-bucket byte estimate - confirm.
+ }
+ return totalSize;
}
} // namespace statsd