Implement a logic to rate limitted flush statsd with by checking the
size of each MetricsProducer's. The implementation of byteSize() method
is still TBD as it depends on migration to ProtoOutputStream.

Test: statsd, statsd_test
Change-Id: I966606044d7cb814dabe94192bacecad91f28177
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index 56d4e4d..cdaca1b 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -46,6 +46,7 @@
     // pass the event to metrics managers.
     for (auto& pair : mMetricsManagers) {
         pair.second->onLogEvent(msg);
+        flushIfNecessary(msg.GetTimestampNs(), pair.first, pair.second);
     }
 }
 
@@ -84,41 +85,40 @@
         it->second->finish();
         mMetricsManagers.erase(it);
     }
+    auto flushTime = mLastFlushTimes.find(key);
+    if (flushTime != mLastFlushTimes.end()) {
+        mLastFlushTimes.erase(flushTime);
+    }
 }
 
-void StatsLogProcessor::addEventMetricData(const EventMetricData& eventMetricData) {
-    // TODO: Replace this code when MetricsManager.onDumpReport() is ready to
-    // get a list of byte arrays.
-    flushIfNecessary(eventMetricData);
-    const int numBytes = eventMetricData.ByteSize();
-    char buffer[numBytes];
-    eventMetricData.SerializeToArray(&buffer[0], numBytes);
-    string bufferString(buffer, numBytes);
-    mEvents.push_back(bufferString);
-    mBufferSize += eventMetricData.ByteSize();
-}
+void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs,
+                                         const ConfigKey& key,
+                                         const unique_ptr<MetricsManager>& metricsManager) {
+    auto lastFlushNs = mLastFlushTimes.find(key);
+    if (lastFlushNs != mLastFlushTimes.end()) {
+        if (timestampNs - lastFlushNs->second < kMinFlushPeriod) {
+            return;
+        }
+    }
 
-void StatsLogProcessor::flushIfNecessary(const EventMetricData& eventMetricData) {
-    if (eventMetricData.ByteSize() + mBufferSize > kMaxSerializedBytes) {
-      flush();
+    size_t totalBytes = metricsManager->byteSize();
+    if (totalBytes > kMaxSerializedBytes) {
+        flush();
+        mLastFlushTimes[key] = std::move(timestampNs);
     }
 }
 
 void StatsLogProcessor::flush() {
+    // TODO: Take ConfigKey as an argument and flush metrics related to the
+    // ConfigKey. Also, create a wrapper that holds a repeated field of
+    // StatsLogReport's.
+    /*
     StatsLogReport logReport;
-    for (string eventBuffer : mEvents) {
-        EventMetricData eventFromBuffer;
-        eventFromBuffer.ParseFromString(eventBuffer);
-        EventMetricData* newEntry = logReport.mutable_event_metrics()->add_data();
-        newEntry->CopyFrom(eventFromBuffer);
-    }
-
     const int numBytes = logReport.ByteSize();
     vector<uint8_t> logReportBuffer(numBytes);
     logReport.SerializeToArray(&logReportBuffer[0], numBytes);
     mPushLog(logReportBuffer);
-    mEvents.clear();
-    mBufferSize = 0;
+    */
 }
 
 }  // namespace statsd