Make metric producers thread-safe.
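
GaugeMetricProducer now guards its shared state with a reader-writer
lock (mRWMutex, a std::shared_timed_mutex): mutating paths
(startNewProtoOutputStream, SerializeBuckets, onMatchedLogEventInternal,
flushIfNeeded) take an exclusive std::lock_guard, while read-only paths
(hitGuardRail, byteSize) take a std::shared_lock. Serialization of past
buckets moves out of onDumpReport() into a separate lockable
SerializeBuckets(), and the previous per-producer AutoMutex is removed.

A minimal, self-contained sketch of the locking pattern (illustrative
only, not part of this change; the std::map stands in for the real
sliced-bucket type):

    #include <cstdint>
    #include <map>
    #include <mutex>
    #include <shared_mutex>
    #include <string>

    class Producer {
    public:
        void update(const std::string& key, int64_t gauge) {
            // Writer: exclusive lock while mutating shared state.
            std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
            mBucket[key] = gauge;
        }
        size_t size() const {
            // Reader: shared lock; concurrent readers are allowed.
            std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
            return mBucket.size();
        }
    private:
        mutable std::shared_timed_mutex mRWMutex;
        std::map<std::string, int64_t> mBucket;
    };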

Test: unit tests passed
Change-Id: Ie47404e8649b63ee8ac32e40189a47f6cb7a9def
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 1791654..ed4c760 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -102,6 +102,8 @@
 }
 
 void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) {
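+    // Writer: mProto is replaced here, so take the exclusive lock.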
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mProto = std::make_unique<ProtoOutputStream>();
     mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,14 +113,9 @@
 void GaugeMetricProducer::finish() {
 }
 
-std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
-    VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
-
-    // Dump current bucket if it's stale.
-    // If current bucket is still on-going, don't force dump current bucket.
-    // In finish(), We can force dump current bucket.
-    flushIfNeeded(time(nullptr) * NS_PER_SEC);
-
+void GaugeMetricProducer::SerializeBuckets() {
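+    // Writer: drains mPastBuckets into mProto under the exclusive lock.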
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
         auto it = mDimensionKeyMap.find(hashableKey);
@@ -166,50 +163,69 @@
     mProto->end(mProtoToken);
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                   (long long)mCurrentBucketStartTimeNs);
+    mPastBuckets.clear();
+}
+
+std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
+    VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
+
+    // Dump the current bucket if it is stale. If it is still ongoing, do
+    // not force-dump it here; finish() can force-dump the current
+    // bucket.
+    flushIfNeeded(time(nullptr) * NS_PER_SEC);
+
+    SerializeBuckets();
 
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
 
     startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
-    mPastBuckets.clear();
-
     return buffer;
 
     // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
 void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
-    AutoMutex _l(mLock);
     VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
-    flushIfNeeded(eventTime);
-    mCondition = conditionMet;
 
-    // Push mode. No need to proactively pull the gauge data.
-    if (mPullTagId == -1) {
-        return;
-    }
-    if (!mCondition) {
-        return;
-    }
-    // Already have gauge metric for the current bucket, do not do it again.
-    if (mCurrentSlicedBucket->size() > 0) {
-        return;
-    }
+    // flushIfNeeded acquires the write lock internally and is thread-safe.
+    flushIfNeeded(eventTime);
+
     vector<std::shared_ptr<LogEvent>> allData;
-    if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
-        ALOGE("Stats puller failed for tag: %d", mPullTagId);
-        return;
+    // The following block updates the condition and re-pulls the gauge.
+    // TODO(yanglu): make it a separate lockable function.
+    {
+        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
+        mCondition = conditionMet;
+
+        // Push mode. No need to proactively pull the gauge data.
+        if (mPullTagId == -1) {
+            return;
+        }
+        if (!mCondition) {
+            return;
+        }
+        // Already have gauge metric for the current bucket, do not do it again.
+        if (mCurrentSlicedBucket->size() > 0) {
+            return;
+        }
+        if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
+            ALOGE("Stats puller failed for tag: %d", mPullTagId);
+            return;
+        }
     }
+
+    // onMatchedLogEventInternal acquires the write lock and is thread-safe.
     for (const auto& data : allData) {
         onMatchedLogEvent(0, *data, false /*scheduledPull*/);
     }
-    flushIfNeeded(eventTime);
 }
 
 void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
 }
 
-int64_t GaugeMetricProducer::getGauge(const LogEvent& event) {
+int64_t GaugeMetricProducer::getGauge(const LogEvent& event) const {
     status_t err = NO_ERROR;
     int64_t val = event.GetLong(mMetric.gauge_field(), &err);
     if (err == NO_ERROR) {
@@ -221,13 +237,14 @@
 }
 
 void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
-    AutoMutex mutex(mLock);
+    // onMatchedLogEventInternal acquires the write lock and is thread-safe.
     for (const auto& data : allData) {
         onMatchedLogEvent(0, *data, true /*scheduledPull*/);
     }
 }
 
 bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
         return false;
     }
@@ -251,32 +268,34 @@
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const map<string, HashableDimensionKey>& conditionKey, bool condition,
         const LogEvent& event, bool scheduledPull) {
+    uint64_t eventTimeNs = event.GetTimestampNs();
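+    // flushIfNeeded acquires the write lock itself, so call it before locking.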
+    flushIfNeeded(eventTimeNs);
+
     if (condition == false) {
         return;
     }
-    uint64_t eventTimeNs = event.GetTimestampNs();
+    const int64_t gauge = getGauge(event);
+    if (gauge < 0) {
+        return;
+    }
+    if (hitGuardRail(eventKey)) {
+        return;
+    }
+
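+    // Writer: the remainder of this function mutates mCurrentSlicedBucket.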
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (eventTimeNs < mCurrentBucketStartTimeNs) {
         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
              (long long)mCurrentBucketStartTimeNs);
         return;
     }
 
-    // When the event happens in a new bucket, flush the old buckets.
-    if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) {
-        flushIfNeeded(eventTimeNs);
-    }
-
     // For gauge metric, we just simply use the first gauge in the given bucket.
     if (!mCurrentSlicedBucket->empty()) {
         return;
     }
-    const long gauge = getGauge(event);
-    if (gauge >= 0) {
-        if (hitGuardRail(eventKey)) {
-            return;
-        }
-        (*mCurrentSlicedBucket)[eventKey] = gauge;
-    }
+    (*mCurrentSlicedBucket)[eventKey] = gauge;
     for (auto& tracker : mAnomalyTrackers) {
         tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge);
     }
@@ -288,6 +307,8 @@
 // if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
 // the GaugeMetricProducer while holding the lock.
 void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
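+    // Writer: callers must not already hold mRWMutex (it is not reentrant).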
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
         return;
     }
@@ -321,6 +342,8 @@
 }
 
 size_t GaugeMetricProducer::byteSize() const {
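+    // Reader: byteSize() only reads mPastBuckets, so a shared lock suffices.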
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     size_t totalSize = 0;
     for (const auto& pair : mPastBuckets) {
         totalSize += pair.second.size() * kBucketSize;