Migrate all remaining MetricProducers to use ProtoOutputStream

Test: statsd, statsd_test
Change-Id: I1087e1c1ffb372ca288dfc575cb7a372b11ce8c5
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index 09132bf..c0a0d98 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -23,6 +23,12 @@
 #include <limits.h>
 #include <stdlib.h>
 
+using android::util::FIELD_TYPE_BOOL;
+using android::util::FIELD_TYPE_FLOAT;
+using android::util::FIELD_TYPE_INT32;
+using android::util::FIELD_TYPE_INT64;
+using android::util::FIELD_TYPE_MESSAGE;
+using android::util::ProtoOutputStream;
 using std::string;
 using std::unordered_map;
 using std::vector;
@@ -31,6 +37,27 @@
 namespace os {
 namespace statsd {
 
+// Field ids for StatsLogReport; each is OR'ed with a FIELD_TYPE_* when written to mProto.
+const int FIELD_ID_METRIC_ID = 1;
+const int FIELD_ID_START_REPORT_NANOS = 2;
+const int FIELD_ID_END_REPORT_NANOS = 3;
+const int FIELD_ID_DURATION_METRICS = 6;
+// Field ids for DurationMetricDataWrapper.
+const int FIELD_ID_DATA = 1;
+// Field ids for DurationMetricData.
+const int FIELD_ID_DIMENSION = 1;
+const int FIELD_ID_BUCKET_INFO = 2;
+// Field ids for KeyValuePair (onDumpReport writes at most one FIELD_ID_VALUE_* per pair).
+const int FIELD_ID_KEY = 1;
+const int FIELD_ID_VALUE_STR = 2;
+const int FIELD_ID_VALUE_INT = 3;
+const int FIELD_ID_VALUE_BOOL = 4;
+const int FIELD_ID_VALUE_FLOAT = 5;
+// Field ids for DurationBucketInfo.
+const int FIELD_ID_START_BUCKET_NANOS = 1;
+const int FIELD_ID_END_BUCKET_NANOS = 2;
+const int FIELD_ID_DURATION = 3;
+
 DurationMetricProducer::DurationMetricProducer(const DurationMetric& metric,
                                                const int conditionIndex, const size_t startIndex,
                                                const size_t stopIndex, const size_t stopAllIndex,
@@ -61,6 +88,8 @@
         mConditionSliced = true;
     }
 
+    startNewProtoOutputStream(mStartTimeNs);
+
     VLOG("metric %lld created. bucket size %lld start_time: %lld", metric.metric_id(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
 }
@@ -69,8 +98,15 @@
     VLOG("~DurationMetric() called");
 }
 
+void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
+    mProto = std::make_unique<ProtoOutputStream>();  // fresh stream replaces the old mProto
+    mProto->write(FIELD_TYPE_INT32 | FIELD_ID_METRIC_ID, mMetric.metric_id());  // report header
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
+    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS);  // ended on dump
+}
+
 unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
-        vector<DurationBucketInfo>& bucket) {
+        vector<DurationBucket>& bucket) {
     switch (mMetric.type()) {
         case DurationMetric_AggregationType_DURATION_SUM:
             return make_unique<OringDurationTracker>(mWizard, mConditionTrackerIndex,
@@ -124,29 +160,69 @@
 }
 
 StatsLogReport DurationMetricProducer::onDumpReport() {
-    VLOG("metric %lld dump report now...", mMetric.metric_id());
-    StatsLogReport report;
-    report.set_metric_id(mMetric.metric_id());
-    report.set_start_report_nanos(mStartTimeNs);
+    long long endTime = time(nullptr) * NS_PER_SEC;
+
     // Dump current bucket if it's stale.
     // If current bucket is still on-going, don't force dump current bucket.
     // In finish(), We can force dump current bucket.
-    flushIfNeeded(time(nullptr) * NS_PER_SEC);
-    report.set_end_report_nanos(mCurrentBucketStartTimeNs);
+    flushIfNeeded(endTime);
+    VLOG("metric %lld dump report now...", mMetric.metric_id());
 
-    StatsLogReport_DurationMetricDataWrapper* wrapper = report.mutable_duration_metrics();
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
+        VLOG("  dimension key %s", hashableKey.c_str());
         auto it = mDimensionKeyMap.find(hashableKey);
         if (it == mDimensionKeyMap.end()) {
             ALOGW("Dimension key %s not found?!?! skip...", hashableKey.c_str());
             continue;
         }
-        VLOG("  dimension key %s", hashableKey.c_str());
-        addDurationBucketsToReport(*wrapper, it->second, pair.second);
+        long long wrapperToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DATA);
+
+        // First fill dimension (KeyValuePairs).
+        for (const auto& kv : it->second) {
+            long long dimensionToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DIMENSION);
+            mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+            if (kv.has_value_str()) {
+                mProto->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_STR, kv.value_str());
+            } else if (kv.has_value_int()) {
+                mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+            } else if (kv.has_value_bool()) {
+                mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+            } else if (kv.has_value_float()) {
+                mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+            }
+            mProto->end(dimensionToken);
+        }
+
+        // Then fill bucket_info (DurationBucketInfo).
+        for (const auto& bucket : pair.second) {
+            long long bucketInfoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_BUCKET_INFO);
+            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+                          (long long)bucket.mBucketStartNs);
+            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+                          (long long)bucket.mBucketEndNs);
+            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration);
+            mProto->end(bucketInfoToken);
+            VLOG("\t bucket [%lld - %lld] duration: %lld", (long long)bucket.mBucketStartNs,
+                 (long long)bucket.mBucketEndNs, (long long)bucket.mDuration);
+        }
+
+        mProto->end(wrapperToken);
     }
-    return report;
-};
+
+    mProto->end(mProtoToken);
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
+                  (long long)mCurrentBucketStartTimeNs);
+
+    std::unique_ptr<uint8_t[]> buffer = serializeProto();
+
+    startNewProtoOutputStream(endTime);
+    mPastBuckets.clear();
+
+    // TODO: Once we migrate all MetricProducers to use ProtoOutputStream, we should return this:
+    // return std::move(buffer);
+    return StatsLogReport();
+}
 
 void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
     if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
@@ -188,17 +264,17 @@
 
     if (matcherIndex == mStartIndex) {
         it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
-
     } else if (matcherIndex == mStopIndex) {
         it->second->noteStop(atomKey, event.GetTimestampNs());
     }
 }
 
 size_t DurationMetricProducer::byteSize() {
-    // TODO: return actual proto size when ProtoOutputStream is ready for use for
-    // DurationMetricsProducer.
-    //    return mProto->size();
-    return 0;
+    // Estimated footprint of the finished buckets: kBucketSize for every entry in mPastBuckets.
+    // The contents of mProto are not measured here.
+    size_t estimatedBytes = 0;
+    for (const auto& entry : mPastBuckets) estimatedBytes += entry.second.size() * kBucketSize;
+    return estimatedBytes;
 }
 
 }  // namespace statsd