Only create ProtoOutputStream when onGetData() is called.

The exception is EventMetricProducer. Each EventMetricProducer will still have a ProtoOutputStream
Because LogEvent comes as a fixed 4K, it's more memory efficient to have an 8k ProtoOutputStream for
storing the events.

Also removed finish() api in MetricProducer, which was intended to use with Dropbox.

Test: statsd_test & dogfood app
Bug: 70393808
Change-Id: I2efe4ecc76a88060a9aa5eb49d1fa6ea60bc5da8
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index 2df0c90..3b5c236 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -111,9 +111,7 @@
     unique_ptr<MetricsManager> newMetricsManager = std::make_unique<MetricsManager>(key, config);
 
     auto it = mMetricsManagers.find(key);
-    if (it != mMetricsManagers.end()) {
-        it->second->finish();
-    } else if (mMetricsManagers.size() > StatsdStats::kMaxConfigCount) {
+    if (it == mMetricsManagers.end() && mMetricsManagers.size() > StatsdStats::kMaxConfigCount) {
         ALOGE("Can't accept more configs!");
         return;
     }
@@ -167,11 +165,7 @@
 
     // First, fill in ConfigMetricsReport using current data on memory, which
     // starts from filling in StatsLogReport's.
-    for (auto& m : it->second->onDumpReport()) {
-        // Add each vector of StatsLogReport into a repeated field.
-        proto.write(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_METRICS,
-                    reinterpret_cast<char*>(m.get()->data()), m.get()->size());
-    }
+    it->second->onDumpReport(&proto);
 
     // Fill in UidMap.
     auto uidMap = mUidMap->getOutput(key);
@@ -205,7 +199,6 @@
 void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
     auto it = mMetricsManagers.find(key);
     if (it != mMetricsManagers.end()) {
-        it->second->finish();
         mMetricsManagers.erase(it);
         mUidMap->OnConfigRemoved(key);
     }
@@ -231,8 +224,9 @@
     mLastByteSizeTimes[key] = timestampNs;
     if (totalBytes >
         StatsdStats::kMaxMetricsBytesPerConfig) {  // Too late. We need to start clearing data.
-        // We ignore the return value so we force each metric producer to clear its contents.
-        metricsManager.onDumpReport();
+        // TODO(b/70571383): By 12/15/2017 add API to drop data directly
+        ProtoOutputStream proto;
+        metricsManager.onDumpReport(&proto);
         StatsdStats::getInstance().noteDataDropped(key);
         VLOG("StatsD had to toss out metrics for %s", key.ToString().c_str());
     } else if (totalBytes > .9 * StatsdStats::kMaxMetricsBytesPerConfig) {
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index 36ec6b9..fc12013 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -83,8 +83,6 @@
         mConditionSliced = true;
     }
 
-    startNewProtoOutputStreamLocked(mStartTimeNs);
-
     VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
 }
@@ -93,27 +91,18 @@
     VLOG("~CountMetricProducer() called");
 }
 
-void CountMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
-    mProto = std::make_unique<ProtoOutputStream>();
-    mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
-    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS);
-}
-
-void CountMetricProducer::finish() {
-}
-
 void CountMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
 }
 
-std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReportLocked() {
-    long long endTime = time(nullptr) * NS_PER_SEC;
+void CountMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+                                             ProtoOutputStream* protoOutput) {
+    flushIfNeededLocked(dumpTimeNs);
 
-    // Dump current bucket if it's stale.
-    // If current bucket is still on-going, don't force dump current bucket.
-    // In finish(), We can force dump current bucket.
-    flushIfNeededLocked(endTime);
+    protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+    long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_COUNT_METRICS);
+
     VLOG("metric %s dump report now...", mMetric.name().c_str());
 
     for (const auto& counter : mPastBuckets) {
@@ -125,52 +114,46 @@
             continue;
         }
         long long wrapperToken =
-                mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
 
         // First fill dimension (KeyValuePairs).
         for (const auto& kv : it->second) {
-            long long dimensionToken =
-                    mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
-            mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+            long long dimensionToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
+            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
             if (kv.has_value_str()) {
-                mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+                protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
             } else if (kv.has_value_int()) {
-                mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
             } else if (kv.has_value_bool()) {
-                mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+                protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
             } else if (kv.has_value_float()) {
-                mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+                protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
             }
-            mProto->end(dimensionToken);
+            protoOutput->end(dimensionToken);
         }
 
         // Then fill bucket_info (CountBucketInfo).
         for (const auto& bucket : counter.second) {
-            long long bucketInfoToken =
-                    mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
-                          (long long)bucket.mBucketStartNs);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
-                          (long long)bucket.mBucketEndNs);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)bucket.mCount);
-            mProto->end(bucketInfoToken);
+            long long bucketInfoToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+                               (long long)bucket.mBucketStartNs);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+                               (long long)bucket.mBucketEndNs);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_COUNT, (long long)bucket.mCount);
+            protoOutput->end(bucketInfoToken);
             VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
                  (long long)bucket.mBucketEndNs, (long long)bucket.mCount);
         }
-        mProto->end(wrapperToken);
+        protoOutput->end(wrapperToken);
     }
 
-    mProto->end(mProtoToken);
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
-                  (long long)mCurrentBucketStartTimeNs);
+    protoOutput->end(protoToken);
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
 
-    VLOG("metric %s dump report now...", mMetric.name().c_str());
-    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
-
-    startNewProtoOutputStreamLocked(endTime);
     mPastBuckets.clear();
-
-    return buffer;
+    mStartTimeNs = mCurrentBucketStartTimeNs;
 
     // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h
index 800a2b9..21bd9d6 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.h
+++ b/cmds/statsd/src/metrics/CountMetricProducer.h
@@ -48,8 +48,6 @@
 
     virtual ~CountMetricProducer();
 
-    void finish() override;
-
     // TODO: Implement this later.
     virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version)
             override{};
@@ -63,8 +61,8 @@
             const LogEvent& event, bool scheduledPull) override;
 
 private:
-    // TODO: Pass a timestamp as a parameter in onDumpReport.
-    std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
+    void onDumpReportLocked(const uint64_t dumpTimeNs,
+                            android::util::ProtoOutputStream* protoOutput) override;
 
     // Internal interface to handle condition change.
     void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
@@ -78,9 +76,6 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const uint64_t& newEventTime);
 
-    // Util function to init/reset the proto output stream.
-    void startNewProtoOutputStreamLocked(long long timestamp);
-
     const CountMetric mMetric;
 
     // TODO: Add a lock to mPastBuckets.
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index cedea30..9920f65 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -93,8 +93,6 @@
         mConditionSliced = true;
     }
 
-    startNewProtoOutputStreamLocked(mStartTimeNs);
-
     VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
 }
@@ -114,13 +112,6 @@
     return new AnomalyTracker(alert, mConfigKey);
 }
 
-void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
-    mProto = std::make_unique<ProtoOutputStream>();
-    mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
-    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS);
-}
-
 unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
         const HashableDimensionKey& eventKey) const {
     switch (mMetric.aggregation_type()) {
@@ -135,11 +126,6 @@
     }
 }
 
-void DurationMetricProducer::finish() {
-    // TODO: write the StatsLogReport to dropbox using
-    // DropboxWriter.
-}
-
 void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
     flushIfNeededLocked(eventTime);
@@ -161,13 +147,14 @@
     }
 }
 
-std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked() {
-    long long endTime = time(nullptr) * NS_PER_SEC;
+void DurationMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+                                                ProtoOutputStream* protoOutput) {
+    flushIfNeededLocked(dumpTimeNs);
 
-    // Dump current bucket if it's stale.
-    // If current bucket is still on-going, don't force dump current bucket.
-    // In finish(), We can force dump current bucket.
-    flushIfNeededLocked(endTime);
+    protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+    long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_DURATION_METRICS);
+
     VLOG("metric %s dump report now...", mMetric.name().c_str());
 
     for (const auto& pair : mPastBuckets) {
@@ -180,49 +167,46 @@
         }
 
         long long wrapperToken =
-                mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
 
         // First fill dimension (KeyValuePairs).
         for (const auto& kv : it->second) {
-            long long dimensionToken =
-                    mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
-            mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+            long long dimensionToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
+            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
             if (kv.has_value_str()) {
-                mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+                protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
             } else if (kv.has_value_int()) {
-                mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
             } else if (kv.has_value_bool()) {
-                mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+                protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
             } else if (kv.has_value_float()) {
-                mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+                protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
             }
-            mProto->end(dimensionToken);
+            protoOutput->end(dimensionToken);
         }
 
         // Then fill bucket_info (DurationBucketInfo).
         for (const auto& bucket : pair.second) {
-            long long bucketInfoToken =
-                    mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
-                          (long long)bucket.mBucketStartNs);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
-                          (long long)bucket.mBucketEndNs);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration);
-            mProto->end(bucketInfoToken);
+            long long bucketInfoToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+                               (long long)bucket.mBucketStartNs);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+                               (long long)bucket.mBucketEndNs);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DURATION, (long long)bucket.mDuration);
+            protoOutput->end(bucketInfoToken);
             VLOG("\t bucket [%lld - %lld] duration: %lld", (long long)bucket.mBucketStartNs,
                  (long long)bucket.mBucketEndNs, (long long)bucket.mDuration);
         }
 
-        mProto->end(wrapperToken);
+        protoOutput->end(wrapperToken);
     }
 
-    mProto->end(mProtoToken);
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
-                  (long long)mCurrentBucketStartTimeNs);
-    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
-    startNewProtoOutputStreamLocked(endTime);
+    protoOutput->end(protoToken);
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
     mPastBuckets.clear();
-    return buffer;
+    mStartTimeNs = mCurrentBucketStartTimeNs;
 }
 
 void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) {
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h
index 4bf9d1c..e509af4 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.h
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.h
@@ -47,8 +47,6 @@
 
     virtual sp<AnomalyTracker> createAnomalyTracker(const Alert &alert) override;
 
-    void finish() override;
-
     // TODO: Implement this later.
     virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version)
             override{};
@@ -62,8 +60,8 @@
             const LogEvent& event, bool scheduledPull) override;
 
 private:
-    // TODO: Pass a timestamp as a parameter in onDumpReport.
-    std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
+    void onDumpReportLocked(const uint64_t dumpTimeNs,
+                            android::util::ProtoOutputStream* protoOutput) override;
 
     // Internal interface to handle condition change.
     void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
@@ -77,9 +75,6 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const uint64_t& eventTime);
 
-    // Util function to init/reset the proto output stream.
-    void startNewProtoOutputStreamLocked(long long timestamp);
-
     const DurationMetric mMetric;
 
     // Index of the SimpleAtomMatcher which defines the start.
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 8bdc9e3..217aff0 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -62,7 +62,7 @@
         mConditionSliced = true;
     }
 
-    startNewProtoOutputStreamLocked(mStartTimeNs);
+    startNewProtoOutputStreamLocked();
 
     VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -72,33 +72,45 @@
     VLOG("~EventMetricProducer() called");
 }
 
-void EventMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
+void EventMetricProducer::startNewProtoOutputStreamLocked() {
     mProto = std::make_unique<ProtoOutputStream>();
-    // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData,
-    // and StatsEvent.
-    mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
-    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_EVENT_METRICS);
-}
-
-void EventMetricProducer::finish() {
 }
 
 void EventMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
 }
 
-std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReportLocked() {
-    long long endTime = time(nullptr) * NS_PER_SEC;
-    mProto->end(mProtoToken);
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
+std::unique_ptr<std::vector<uint8_t>> serializeProtoLocked(ProtoOutputStream& protoOutput) {
+    size_t bufferSize = protoOutput.size();
+
+    std::unique_ptr<std::vector<uint8_t>> buffer(new std::vector<uint8_t>(bufferSize));
+
+    size_t pos = 0;
+    auto it = protoOutput.data();
+    while (it.readBuffer() != NULL) {
+        size_t toRead = it.currentToRead();
+        std::memcpy(&((*buffer)[pos]), it.readBuffer(), toRead);
+        pos += toRead;
+        it.rp()->move(toRead);
+    }
+
+    return buffer;
+}
+
+void EventMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+                                             ProtoOutputStream* protoOutput) {
+    protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
 
     size_t bufferSize = mProto->size();
     VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
-    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
+    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked(*mProto);
 
-    startNewProtoOutputStreamLocked(endTime);
+    protoOutput->write(FIELD_TYPE_MESSAGE | FIELD_ID_EVENT_METRICS,
+                       reinterpret_cast<char*>(buffer.get()->data()), buffer.get()->size());
 
-    return buffer;
+    startNewProtoOutputStreamLocked();
+    mStartTimeNs = dumpTimeNs;
 }
 
 void EventMetricProducer::onConditionChangedLocked(const bool conditionMet,
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.h b/cmds/statsd/src/metrics/EventMetricProducer.h
index da3b3ca..75ccf47 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.h
+++ b/cmds/statsd/src/metrics/EventMetricProducer.h
@@ -40,8 +40,6 @@
 
     virtual ~EventMetricProducer();
 
-    void finish() override;
-
     // TODO: Implement this later.
     virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version)
             override{};
@@ -49,7 +47,7 @@
     virtual void notifyAppRemoved(const string& apk, const int uid) override{};
 
 protected:
-    void startNewProtoOutputStreamLocked(long long timestamp);
+    void startNewProtoOutputStreamLocked();
 
 private:
     void onMatchedLogEventInternalLocked(
@@ -57,8 +55,8 @@
             const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
             const LogEvent& event, bool scheduledPull) override;
 
-    // TODO: Pass a timestamp as a parameter in onDumpReport.
-    std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
+    void onDumpReportLocked(const uint64_t dumpTimeNs,
+                            android::util::ProtoOutputStream* protoOutput) override;
 
     // Internal interface to handle condition change.
     void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
@@ -70,6 +68,10 @@
     size_t byteSizeLocked() const override;
 
     const EventMetric mMetric;
+
+    // Maps to a EventMetricDataWrapper. Storing atom events in ProtoOutputStream
+    // is more space efficient than storing LogEvent.
+    std::unique_ptr<android::util::ProtoOutputStream> mProto;
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 1f6bd58b..55d84e0 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -91,8 +91,6 @@
                                              metric.bucket().bucket_size_millis());
     }
 
-    startNewProtoOutputStreamLocked(mStartTimeNs);
-
     VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
 }
@@ -104,23 +102,15 @@
     }
 }
 
-void GaugeMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
-    mProto = std::make_unique<ProtoOutputStream>();
-    mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
-    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
-}
-
-void GaugeMetricProducer::finish() {
-}
-
-std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReportLocked() {
+void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+                                             ProtoOutputStream* protoOutput) {
     VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
 
-    // Dump current bucket if it's stale.
-    // If current bucket is still on-going, don't force dump current bucket.
-    // In finish(), We can force dump current bucket.
-    flushIfNeededLocked(time(nullptr) * NS_PER_SEC);
+    flushIfNeededLocked(dumpTimeNs);
+
+    protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+    long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
 
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
@@ -132,51 +122,45 @@
 
         VLOG("  dimension key %s", hashableKey.c_str());
         long long wrapperToken =
-                mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
 
         // First fill dimension (KeyValuePairs).
         for (const auto& kv : it->second) {
-            long long dimensionToken =
-                    mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
-            mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+            long long dimensionToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
+            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
             if (kv.has_value_str()) {
-                mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+                protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
             } else if (kv.has_value_int()) {
-                mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
             } else if (kv.has_value_bool()) {
-                mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+                protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
             } else if (kv.has_value_float()) {
-                mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+                protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
             }
-            mProto->end(dimensionToken);
+            protoOutput->end(dimensionToken);
         }
 
         // Then fill bucket_info (GaugeBucketInfo).
         for (const auto& bucket : pair.second) {
-            long long bucketInfoToken =
-                    mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
-                          (long long)bucket.mBucketStartNs);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
-                          (long long)bucket.mBucketEndNs);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge);
-            mProto->end(bucketInfoToken);
+            long long bucketInfoToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+                               (long long)bucket.mBucketStartNs);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+                               (long long)bucket.mBucketEndNs);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge);
+            protoOutput->end(bucketInfoToken);
             VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
                  (long long)bucket.mBucketEndNs, (long long)bucket.mGauge);
         }
-        mProto->end(wrapperToken);
+        protoOutput->end(wrapperToken);
     }
-    mProto->end(mProtoToken);
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
-                  (long long)mCurrentBucketStartTimeNs);
+    protoOutput->end(protoToken);
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
 
-    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
-
-    startNewProtoOutputStreamLocked(time(nullptr) * NS_PER_SEC);
     mPastBuckets.clear();
-
-    return buffer;
-
+    mStartTimeNs = mCurrentBucketStartTimeNs;
     // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 36705b1..e4bda02 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -56,8 +56,6 @@
     // Handles when the pulled data arrives.
     void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
 
-    void finish() override;
-
     // TODO: Implement this later.
     virtual void notifyAppUpgrade(const string& apk, const int uid, const int64_t version)
             override{};
@@ -71,8 +69,8 @@
             const LogEvent& event, bool scheduledPull) override;
 
 private:
-    // TODO: Pass a timestamp as a parameter in onDumpReport.
-    std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
+    void onDumpReportLocked(const uint64_t dumpTimeNs,
+                            android::util::ProtoOutputStream* protoOutput) override;
 
     // Internal interface to handle condition change.
     void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
@@ -86,9 +84,6 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const uint64_t& eventTime);
 
-    // Util function to init/reset the proto output stream.
-    void startNewProtoOutputStreamLocked(long long timestamp);
-
     // The default bucket size for gauge metric is 1 second.
     static const uint64_t kDefaultGaugemBucketSizeNs = 1000 * 1000 * 1000;
 
diff --git a/cmds/statsd/src/metrics/MetricProducer.cpp b/cmds/statsd/src/metrics/MetricProducer.cpp
index 7e78a85..5a0a7c7f 100644
--- a/cmds/statsd/src/metrics/MetricProducer.cpp
+++ b/cmds/statsd/src/metrics/MetricProducer.cpp
@@ -64,23 +64,6 @@
                                     scheduledPull);
 }
 
-std::unique_ptr<std::vector<uint8_t>> MetricProducer::serializeProtoLocked() {
-    size_t bufferSize = mProto->size();
-
-    std::unique_ptr<std::vector<uint8_t>> buffer(new std::vector<uint8_t>(bufferSize));
-
-    size_t pos = 0;
-    auto it = mProto->data();
-    while (it.readBuffer() != NULL) {
-        size_t toRead = it.currentToRead();
-        std::memcpy(&((*buffer)[pos]), it.readBuffer(), toRead);
-        pos += toRead;
-        it.rp()->move(toRead);
-    }
-
-    return buffer;
-}
-
 }  // namespace statsd
 }  // namespace os
 }  // namespace android
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index adeb3cd..ef2ef29 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -74,16 +74,10 @@
         return mConditionSliced;
     };
 
-    // This is called when the metric collecting is done, e.g., when there is a new configuration
-    // coming. MetricProducer should do the clean up, and dump existing data to dropbox.
-    virtual void finish() = 0;
-
-    // TODO: Pass a timestamp as a parameter in onDumpReport and update all its
-    // implementations.
-    // onDumpReport returns the proto-serialized output and clears the previously stored contents.
-    std::unique_ptr<std::vector<uint8_t>> onDumpReport() {
+    // Output the metrics data to [protoOutput]. All metrics reports end with the same timestamp.
+    void onDumpReport(const uint64_t dumpTimeNs, android::util::ProtoOutputStream* protoOutput) {
         std::lock_guard<std::mutex> lock(mMutex);
-        return onDumpReportLocked();
+        return onDumpReportLocked(dumpTimeNs, protoOutput);
     }
 
     // Returns the memory in bytes currently used to store this metric's data. Does not change
@@ -110,12 +104,14 @@
 protected:
     virtual void onConditionChangedLocked(const bool condition, const uint64_t eventTime) = 0;
     virtual void onSlicedConditionMayChangeLocked(const uint64_t eventTime) = 0;
-    virtual std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() = 0;
+    virtual void onDumpReportLocked(const uint64_t dumpTimeNs,
+                                    android::util::ProtoOutputStream* protoOutput) = 0;
     virtual size_t byteSizeLocked() const = 0;
 
     const ConfigKey mConfigKey;
 
-    const uint64_t mStartTimeNs;
+    // The start time for the current in memory metrics data.
+    uint64_t mStartTimeNs;
 
     uint64_t mCurrentBucketStartTimeNs;
 
@@ -165,15 +161,7 @@
     void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event,
                                  bool scheduledPull);
 
-    std::unique_ptr<android::util::ProtoOutputStream> mProto;
-
-    long long mProtoToken;
-
-    // Read/Write mutex to make the producer thread-safe.
-    // TODO(yanglu): replace with std::shared_mutex when available in libc++.
     mutable std::mutex mMutex;
-
-    std::unique_ptr<std::vector<uint8_t>> serializeProtoLocked();
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/metrics/MetricsManager.cpp b/cmds/statsd/src/metrics/MetricsManager.cpp
index 9fdc6fa..0510fff 100644
--- a/cmds/statsd/src/metrics/MetricsManager.cpp
+++ b/cmds/statsd/src/metrics/MetricsManager.cpp
@@ -27,6 +27,11 @@
 #include "stats_util.h"
 
 #include <log/logprint.h>
+
+using android::util::FIELD_COUNT_REPEATED;
+using android::util::FIELD_TYPE_MESSAGE;
+using android::util::ProtoOutputStream;
+
 using std::make_unique;
 using std::set;
 using std::string;
@@ -37,6 +42,8 @@
 namespace os {
 namespace statsd {
 
+const int FIELD_ID_METRICS = 1;
+
 MetricsManager::MetricsManager(const ConfigKey& key, const StatsdConfig& config) : mConfigKey(key) {
     mConfigValid =
             initStatsdConfig(key, config, mTagIds, mAllAtomMatchers, mAllConditionTrackers,
@@ -65,21 +72,17 @@
     return mConfigValid;
 }
 
-void MetricsManager::finish() {
-    for (auto& metricProducer : mAllMetricProducers) {
-        metricProducer->finish();
-    }
-}
-
-vector<std::unique_ptr<vector<uint8_t>>> MetricsManager::onDumpReport() {
+void MetricsManager::onDumpReport(ProtoOutputStream* protoOutput) {
     VLOG("=========================Metric Reports Start==========================");
+    uint64_t dumpTimeStampNs = time(nullptr) * NS_PER_SEC;
     // one StatsLogReport per MetricProduer
-    vector<std::unique_ptr<vector<uint8_t>>> reportList;
     for (auto& metric : mAllMetricProducers) {
-        reportList.push_back(metric->onDumpReport());
+        long long token =
+                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_METRICS);
+        metric->onDumpReport(dumpTimeStampNs, protoOutput);
+        protoOutput->end(token);
     }
     VLOG("=========================Metric Reports End==========================");
-    return reportList;
 }
 
 // Consume the stats log if it's interesting to this metric.
diff --git a/cmds/statsd/src/metrics/MetricsManager.h b/cmds/statsd/src/metrics/MetricsManager.h
index 86c4733..34ea667 100644
--- a/cmds/statsd/src/metrics/MetricsManager.h
+++ b/cmds/statsd/src/metrics/MetricsManager.h
@@ -43,16 +43,13 @@
 
     void onLogEvent(const LogEvent& event);
 
-    // Called when everything should wrap up. We are about to finish (e.g., new config comes).
-    void finish();
-
     void onAnomalyAlarmFired(const uint64_t timestampNs,
                          unordered_set<sp<const AnomalyAlarm>, SpHash<AnomalyAlarm>>& anomalySet);
 
     void setAnomalyMonitor(const sp<AnomalyMonitor>& anomalyMonitor);
 
     // Config source owner can call onDumpReport() to get all the metrics collected.
-    virtual std::vector<std::unique_ptr<std::vector<uint8_t>>> onDumpReport();
+    virtual void onDumpReport(android::util::ProtoOutputStream* protoOutput);
 
     // Computes the total byte size of all metrics managed by a single config source.
     // Does not change the state.
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 977aa88..721d02c 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -97,9 +97,6 @@
         mStatsPullerManager->RegisterReceiver(mPullTagId, this,
                                               metric.bucket().bucket_size_millis());
     }
-
-    startNewProtoOutputStreamLocked(mStartTimeNs);
-
     VLOG("value metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
          (long long)mBucketSizeNs, (long long)mStartTimeNs);
 }
@@ -120,24 +117,17 @@
     }
 }
 
-void ValueMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
-    mProto = std::make_unique<ProtoOutputStream>();
-    mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
-    mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
-}
-
-void ValueMetricProducer::finish() {
-    // TODO: write the StatsLogReport to dropbox using
-    // DropboxWriter.
-}
-
 void ValueMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
 }
 
-std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReportLocked() {
+void ValueMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
+                                             ProtoOutputStream* protoOutput) {
     VLOG("metric %s dump report now...", mMetric.name().c_str());
+    flushIfNeededLocked(dumpTimeNs);
+    protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, (long long)mStartTimeNs);
+    long long protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
 
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
@@ -148,52 +138,46 @@
             continue;
         }
         long long wrapperToken =
-                mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
+                protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
 
         // First fill dimension (KeyValuePairs).
         for (const auto& kv : it->second) {
-            long long dimensionToken =
-                    mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
-            mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
+            long long dimensionToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
+            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
             if (kv.has_value_str()) {
-                mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
+                protoOutput->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
             } else if (kv.has_value_int()) {
-                mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
+                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
             } else if (kv.has_value_bool()) {
-                mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
+                protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
             } else if (kv.has_value_float()) {
-                mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
+                protoOutput->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
             }
-            mProto->end(dimensionToken);
+            protoOutput->end(dimensionToken);
         }
 
         // Then fill bucket_info (ValueBucketInfo).
         for (const auto& bucket : pair.second) {
-            long long bucketInfoToken =
-                    mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
-                          (long long)bucket.mBucketStartNs);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
-                          (long long)bucket.mBucketEndNs);
-            mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue);
-            mProto->end(bucketInfoToken);
+            long long bucketInfoToken = protoOutput->start(
+                    FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
+                               (long long)bucket.mBucketStartNs);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
+                               (long long)bucket.mBucketEndNs);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue);
+            protoOutput->end(bucketInfoToken);
             VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
                  (long long)bucket.mBucketEndNs, (long long)bucket.mValue);
         }
-        mProto->end(wrapperToken);
+        protoOutput->end(wrapperToken);
     }
-    mProto->end(mProtoToken);
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
-                  (long long)mCurrentBucketStartTimeNs);
+    protoOutput->end(protoToken);
+    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)dumpTimeNs);
 
     VLOG("metric %s dump report now...", mMetric.name().c_str());
-    std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
-
-    startNewProtoOutputStreamLocked(time(nullptr) * NS_PER_SEC);
     mPastBuckets.clear();
-
-    return buffer;
-
+    mStartTimeNs = mCurrentBucketStartTimeNs;
     // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index a2efd3f..8d60ff6 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -44,8 +44,6 @@
 
     virtual ~ValueMetricProducer();
 
-    void finish() override;
-
     void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
 
     // TODO: Implement this later.
@@ -61,8 +59,8 @@
             const LogEvent& event, bool scheduledPull) override;
 
 private:
-    // TODO: Pass a timestamp as a parameter in onDumpReport.
-    std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
+    void onDumpReportLocked(const uint64_t dumpTimeNs,
+                            android::util::ProtoOutputStream* protoOutput) override;
 
     // Internal interface to handle condition change.
     void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
@@ -76,9 +74,6 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const uint64_t& eventTime);
 
-    // Util function to init/reset the proto output stream.
-    void startNewProtoOutputStreamLocked(long long timestamp);
-
     const ValueMetric mMetric;
 
     std::shared_ptr<StatsPullerManager> mStatsPullerManager;