Make metric producers thread-safe.
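
Each metric producer now guards its mutable state with a std::shared_timed_mutex
(mRWMutex): paths that mutate state (onMatchedLogEvent, onConditionChanged,
flushIfNeeded, bucket serialization) take an exclusive lock, while read-only
paths such as byteSize() and hitGuardRail() take a shared lock.
std::shared_timed_mutex is used because std::shared_mutex is not yet available
in libc++. The mutex is not recursive, so helpers that lock internally (e.g.
flushIfNeeded, serializeProto) are called before, not while, holding the lock.

A minimal sketch of the reader/writer pattern applied here (illustrative only;
the Counter class and its bump()/total() members are made-up names, not part of
this change):

    #include <mutex>
    #include <shared_mutex>

    class Counter {
    public:
        void bump() {
            // Writers take the mutex exclusively.
            std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
            mValue++;
        }

        int total() const {
            // Readers share the mutex with other readers.
            std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
            return mValue;
        }

    private:
        // mutable so const readers like total() can still lock it.
        mutable std::shared_timed_mutex mRWMutex;
        int mValue = 0;
    };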

Test: unit test passed
Change-Id: Ie47404e8649b63ee8ac32e40189a47f6cb7a9def
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index ce60eb9..4eb121c 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -96,6 +96,8 @@
 }
 
 void CountMetricProducer::startNewProtoOutputStream(long long startTime) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
     mProto = std::make_unique<ProtoOutputStream>();
     mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -109,13 +111,8 @@
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
 }
 
-std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
-    long long endTime = time(nullptr) * NS_PER_SEC;
-
-    // Dump current bucket if it's stale.
-    // If current bucket is still on-going, don't force dump current bucket.
-    // In finish(), We can force dump current bucket.
-    flushIfNeeded(endTime);
+void CountMetricProducer::serializeBuckets() {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     VLOG("metric %s dump report now...", mMetric.name().c_str());
 
     for (const auto& counter : mPastBuckets) {
@@ -161,28 +158,40 @@
         }
         mProto->end(wrapperToken);
     }
-
     mProto->end(mProtoToken);
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                   (long long)mCurrentBucketStartTimeNs);
 
+    mPastBuckets.clear();
+    // TODO: Clear mDimensionKeyMap once the report is dumped.
+}
+
+std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
+    long long endTime = time(nullptr) * NS_PER_SEC;
     VLOG("metric %s dump report now...", mMetric.name().c_str());
+    // Dump the current bucket if it's stale.
+    // If the current bucket is still ongoing, don't force-dump it here;
+    // finish() can force-dump the current bucket.
+    flushIfNeeded(endTime);
+
+    // TODO(yanglu): merge these three functions into one to avoid taking the lock three times.
+    serializeBuckets();
+
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
 
     startNewProtoOutputStream(endTime);
-    mPastBuckets.clear();
 
     return buffer;
-
-    // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
 void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
     VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mCondition = conditionMet;
 }
 
 bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) {
         return false;
     }
@@ -210,38 +219,40 @@
 
     flushIfNeeded(eventTimeNs);
 
-    if (condition == false) {
+    // ===========GuardRail==============
+    if (hitGuardRail(eventKey)) {
         return;
     }
 
-    auto it = mCurrentSlicedCounter->find(eventKey);
-
-    if (it == mCurrentSlicedCounter->end()) {
-        // ===========GuardRail==============
-        if (hitGuardRail(eventKey)) {
+    // TODO(yanglu): move the following logic to a separate function to make it lockable.
+    {
+        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+        if (condition == false) {
             return;
         }
 
-        // create a counter for the new key
-        (*mCurrentSlicedCounter)[eventKey] = 1;
-    } else {
-        // increment the existing value
-        auto& count = it->second;
-        count++;
+        auto it = mCurrentSlicedCounter->find(eventKey);
+        if (it == mCurrentSlicedCounter->end()) {
+            // create a counter for the new key
+            mCurrentSlicedCounter->insert({eventKey, 1});
+        } else {
+            // increment the existing value
+            auto& count = it->second;
+            count++;
+        }
+        const int64_t& count = mCurrentSlicedCounter->find(eventKey)->second;
+        for (auto& tracker : mAnomalyTrackers) {
+            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, count);
+        }
+        VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(count));
     }
-
-    for (auto& tracker : mAnomalyTrackers) {
-        tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
-                                         mCurrentSlicedCounter->find(eventKey)->second);
-    }
-
-    VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(),
-         (long long)(*mCurrentSlicedCounter)[eventKey]);
 }
 
 // When a new matched event comes in, we check if event falls into the current
 // bucket. If not, flush the old counter to past buckets and initialize the new bucket.
 void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
     if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
         return;
     }
@@ -275,6 +286,7 @@
 // greater than actual data size as it contains each dimension of
 // CountMetricData is  duplicated.
 size_t CountMetricProducer::byteSize() const {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     size_t totalSize = 0;
     for (const auto& pair : mPastBuckets) {
         totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h
index f78a199..164dfb2 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.h
+++ b/cmds/statsd/src/metrics/CountMetricProducer.h
@@ -75,6 +75,8 @@
     void startNewProtoOutputStream(long long timestamp) override;
 
 private:
+    void serializeBuckets();
+
     const CountMetric mMetric;
 
     // TODO: Add a lock to mPastBuckets.
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index a0374c0..3b49b9a 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -104,6 +104,7 @@
 }
 
 void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mProto = std::make_unique<ProtoOutputStream>();
     mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,7 +112,7 @@
 }
 
 unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
-        const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) {
+        const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const {
     switch (mMetric.aggregation_type()) {
         case DurationMetric_AggregationType_SUM:
             return make_unique<OringDurationTracker>(
@@ -130,6 +131,7 @@
 }
 
 void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
     flushIfNeeded(eventTime);
+    // flushIfNeeded locks mRWMutex itself, so take the write lock only afterwards.
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     // Now for each of the on-going event, check if the condition has changed for them.
@@ -139,6 +141,7 @@
 }
 
 void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
     VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
-    mCondition = conditionMet;
     flushIfNeeded(eventTime);
+    // flushIfNeeded locks mRWMutex itself, so take the write lock only afterwards.
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+    mCondition = conditionMet;
@@ -149,15 +152,8 @@
     }
 }
 
-std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
-    long long endTime = time(nullptr) * NS_PER_SEC;
-
-    // Dump current bucket if it's stale.
-    // If current bucket is still on-going, don't force dump current bucket.
-    // In finish(), We can force dump current bucket.
-    flushIfNeeded(endTime);
-    VLOG("metric %s dump report now...", mMetric.name().c_str());
-
+void DurationMetricProducer::SerializeBuckets() {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
         VLOG("  dimension key %s", hashableKey.c_str());
@@ -214,13 +210,29 @@
     mProto->end(mProtoToken);
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                   (long long)mCurrentBucketStartTimeNs);
+}
+
+std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
+    VLOG("metric %s dump report now...", mMetric.name().c_str());
+
+    long long endTime = time(nullptr) * NS_PER_SEC;
+
+    // Dump the current bucket if it's stale.
+    // If the current bucket is still ongoing, don't force-dump it here;
+    // finish() can force-dump the current bucket.
+    flushIfNeeded(endTime);
+
+    SerializeBuckets();
+
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
+
     startNewProtoOutputStream(endTime);
     // TODO: Properly clear the old buckets.
     return buffer;
 }
 
 void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
         return;
     }
@@ -240,6 +252,7 @@
 }
 
 bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     // the key is not new, we are good.
     if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) {
         return false;
@@ -265,32 +278,37 @@
         const LogEvent& event, bool scheduledPull) {
     flushIfNeeded(event.GetTimestampNs());
 
-    if (matcherIndex == mStopAllIndex) {
-        for (auto& pair : mCurrentSlicedDuration) {
-            pair.second->noteStopAll(event.GetTimestampNs());
-        }
-        return;
-    }
-
-    HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
-
-    if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
-        if (hitGuardRail(eventKey)) {
+    // ===========GuardRail==============
+    // hitGuardRail takes a shared lock on mRWMutex itself, so check it before
+    // acquiring the write lock below (the mutex is not recursive).
+    if (matcherIndex != mStopAllIndex && hitGuardRail(eventKey)) {
+        return;
+    }
+
+    // TODO(yanglu): move the following logic to a separate function to make it lockable.
+    {
+        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+        if (matcherIndex == mStopAllIndex) {
+            for (auto& pair : mCurrentSlicedDuration) {
+                pair.second->noteStopAll(event.GetTimestampNs());
+            }
             return;
         }
-        mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]);
-    }
 
-    auto it = mCurrentSlicedDuration.find(eventKey);
+        HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
 
-    if (matcherIndex == mStartIndex) {
-        it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
-    } else if (matcherIndex == mStopIndex) {
-        it->second->noteStop(atomKey, event.GetTimestampNs(), false);
+        if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
+            mCurrentSlicedDuration[eventKey] =
+                    createDurationTracker(eventKey, mPastBuckets[eventKey]);
+        }
+        auto it = mCurrentSlicedDuration.find(eventKey);
+
+        if (matcherIndex == mStartIndex) {
+            it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
+        } else if (matcherIndex == mStopIndex) {
+            it->second->noteStop(atomKey, event.GetTimestampNs(), false);
+        }
     }
 }
 
 size_t DurationMetricProducer::byteSize() const {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     size_t totalSize = 0;
     for (const auto& pair : mPastBuckets) {
         totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h
index 5b5373e..68fff48 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.h
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.h
@@ -71,6 +71,8 @@
     void startNewProtoOutputStream(long long timestamp) override;
 
 private:
+    void SerializeBuckets();
+
     const DurationMetric mMetric;
 
     // Index of the SimpleLogEntryMatcher which defines the start.
@@ -96,8 +98,8 @@
     std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>>
             mCurrentSlicedDuration;
 
-    std::unique_ptr<DurationTracker> createDurationTracker(const HashableDimensionKey& eventKey,
-                                                           std::vector<DurationBucket>& bucket);
+    std::unique_ptr<DurationTracker> createDurationTracker(
+            const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const;
     bool hitGuardRail(const HashableDimensionKey& newKey);
 
     static const size_t kBucketSize = sizeof(DurationBucket{});
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 95a18f7..53f112a 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -73,6 +73,7 @@
 }
 
 void EventMetricProducer::startNewProtoOutputStream(long long startTime) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mProto = std::make_unique<ProtoOutputStream>();
     // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData,
     // and StatsEvent.
@@ -89,11 +90,16 @@
 
 std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() {
     long long endTime = time(nullptr) * NS_PER_SEC;
-    mProto->end(mProtoToken);
-    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
+    // TODO(yanglu): move this section into a util function.
+    {
+        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+        mProto->end(mProtoToken);
+        mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
 
-    size_t bufferSize = mProto->size();
-    VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
+        size_t bufferSize = mProto->size();
+        VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
+    }
+
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
 
     startNewProtoOutputStream(endTime);
@@ -103,6 +109,7 @@
 
 void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
     VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mCondition = conditionMet;
 }
 
@@ -110,6 +117,7 @@
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
         const LogEvent& event, bool scheduledPull) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (!condition) {
         return;
     }
@@ -124,6 +132,7 @@
 }
 
 size_t EventMetricProducer::byteSize() const {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     return mProto->bytesWritten();
 }
 
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 1791654..ed4c760 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -102,6 +102,7 @@
 }
 
 void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mProto = std::make_unique<ProtoOutputStream>();
     mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,14 +112,8 @@
 void GaugeMetricProducer::finish() {
 }
 
-std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
-    VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
-
-    // Dump current bucket if it's stale.
-    // If current bucket is still on-going, don't force dump current bucket.
-    // In finish(), We can force dump current bucket.
-    flushIfNeeded(time(nullptr) * NS_PER_SEC);
-
+void GaugeMetricProducer::SerializeBuckets() {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
         auto it = mDimensionKeyMap.find(hashableKey);
@@ -166,50 +161,69 @@
     mProto->end(mProtoToken);
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                   (long long)mCurrentBucketStartTimeNs);
+    mPastBuckets.clear();
+}
+
+std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
+    VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
+
+    // Dump the current bucket if it's stale.
+    // If the current bucket is still ongoing, don't force-dump it here;
+    // finish() can force-dump the current bucket.
+    flushIfNeeded(time(nullptr) * NS_PER_SEC);
+
+    SerializeBuckets();
 
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
 
     startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
-    mPastBuckets.clear();
-
     return buffer;
 
     // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
 void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
-    AutoMutex _l(mLock);
     VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
-    flushIfNeeded(eventTime);
-    mCondition = conditionMet;
 
-    // Push mode. No need to proactively pull the gauge data.
-    if (mPullTagId == -1) {
-        return;
-    }
-    if (!mCondition) {
-        return;
-    }
-    // Already have gauge metric for the current bucket, do not do it again.
-    if (mCurrentSlicedBucket->size() > 0) {
-        return;
-    }
+    // flushIfNeeded holds the write lock and is thread-safe.
+    flushIfNeeded(eventTime);
+
     vector<std::shared_ptr<LogEvent>> allData;
-    if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
-        ALOGE("Stats puller failed for tag: %d", mPullTagId);
-        return;
+    // The following section updates the condition and re-pulls the gauge.
+    // TODO(yanglu): make it a separate lockable function.
+    {
+        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
+        mCondition = conditionMet;
+
+        // Push mode. No need to proactively pull the gauge data.
+        if (mPullTagId == -1) {
+            return;
+        }
+        if (!mCondition) {
+            return;
+        }
+        // Already have gauge metric for the current bucket, do not do it again.
+        if (mCurrentSlicedBucket->size() > 0) {
+            return;
+        }
+        if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
+            ALOGE("Stats puller failed for tag: %d", mPullTagId);
+            return;
+        }
     }
+
+    // onMatchedLogEvent locks mRWMutex internally and is thread-safe.
     for (const auto& data : allData) {
         onMatchedLogEvent(0, *data, false /*scheduledPull*/);
     }
-    flushIfNeeded(eventTime);
 }
 
 void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
 }
 
-int64_t GaugeMetricProducer::getGauge(const LogEvent& event) {
+int64_t GaugeMetricProducer::getGauge(const LogEvent& event) const {
     status_t err = NO_ERROR;
     int64_t val = event.GetLong(mMetric.gauge_field(), &err);
     if (err == NO_ERROR) {
@@ -221,13 +235,14 @@
 }
 
 void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
-    AutoMutex mutex(mLock);
+    // onMatchedLogEvent locks mRWMutex internally and is thread-safe.
     for (const auto& data : allData) {
         onMatchedLogEvent(0, *data, true /*scheduledPull*/);
     }
 }
 
 bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
         return false;
     }
@@ -251,32 +266,32 @@
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const map<string, HashableDimensionKey>& conditionKey, bool condition,
         const LogEvent& event, bool scheduledPull) {
+    uint64_t eventTimeNs = event.GetTimestampNs();
+    flushIfNeeded(eventTimeNs);
+
     if (condition == false) {
         return;
     }
-    uint64_t eventTimeNs = event.GetTimestampNs();
+    const long gauge = getGauge(event);
+    if (gauge < 0) {
+        return;
+    }
+    if (hitGuardRail(eventKey)) {
+        return;
+    }
+
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (eventTimeNs < mCurrentBucketStartTimeNs) {
         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
              (long long)mCurrentBucketStartTimeNs);
         return;
     }
 
-    // When the event happens in a new bucket, flush the old buckets.
-    if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) {
-        flushIfNeeded(eventTimeNs);
-    }
-
     // For gauge metric, we just simply use the first gauge in the given bucket.
     if (!mCurrentSlicedBucket->empty()) {
         return;
     }
-    const long gauge = getGauge(event);
-    if (gauge >= 0) {
-        if (hitGuardRail(eventKey)) {
-            return;
-        }
-        (*mCurrentSlicedBucket)[eventKey] = gauge;
-    }
+    (*mCurrentSlicedBucket)[eventKey] = gauge;
     for (auto& tracker : mAnomalyTrackers) {
         tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge);
     }
@@ -288,6 +303,7 @@
 // if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
 // the GaugeMetricProducer while holding the lock.
 void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
         return;
     }
@@ -321,6 +337,7 @@
 }
 
 size_t GaugeMetricProducer::byteSize() const {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     size_t totalSize = 0;
     for (const auto& pair : mPastBuckets) {
         totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index f344303..8df6111 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -81,6 +81,8 @@
     void startNewProtoOutputStream(long long timestamp) override;
 
 private:
+    void SerializeBuckets();
+
     // The default bucket size for gauge metric is 1 second.
     static const uint64_t kDefaultGaugemBucketSizeNs = 1000 * 1000 * 1000;
     const GaugeMetric mMetric;
@@ -89,8 +91,6 @@
     // tagId for pulled data. -1 if this is not pulled
     const int mPullTagId;
 
-    Mutex mLock;
-
     // Save the past buckets and we can clear when the StatsLogReport is dumped.
     // TODO: Add a lock to mPastBuckets.
     std::unordered_map<HashableDimensionKey, std::vector<GaugeBucket>> mPastBuckets;
@@ -98,7 +98,7 @@
     // The current bucket.
     std::shared_ptr<DimToValMap> mCurrentSlicedBucket = std::make_shared<DimToValMap>();
 
-    int64_t getGauge(const LogEvent& event);
+    int64_t getGauge(const LogEvent& event) const;
 
     bool hitGuardRail(const HashableDimensionKey& newKey);
 
diff --git a/cmds/statsd/src/metrics/MetricProducer.cpp b/cmds/statsd/src/metrics/MetricProducer.cpp
index 62fb632..7542a94 100644
--- a/cmds/statsd/src/metrics/MetricProducer.cpp
+++ b/cmds/statsd/src/metrics/MetricProducer.cpp
@@ -23,6 +23,7 @@
 
 void MetricProducer::onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event,
                                        bool scheduledPull) {
+    std::unique_lock<std::shared_timed_mutex> writeLock(mRWMutex);
     uint64_t eventTimeNs = event.GetTimestampNs();
     // this is old event, maybe statsd restarted?
     if (eventTimeNs < mStartTimeNs) {
@@ -59,12 +60,16 @@
     } else {
         condition = mCondition;
     }
+    // Unlock as onMatchedLogEventInternal is thread-safe.
+    writeLock.unlock();
 
     onMatchedLogEventInternal(matcherIndex, eventKey, conditionKeys, condition, event,
                               scheduledPull);
 }
 
 std::unique_ptr<std::vector<uint8_t>> MetricProducer::serializeProto() {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
     size_t bufferSize = mProto->size();
 
     std::unique_ptr<std::vector<uint8_t>> buffer(new std::vector<uint8_t>(bufferSize));
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index b22ff6f..27343ad 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -17,6 +17,8 @@
 #ifndef METRIC_PRODUCER_H
 #define METRIC_PRODUCER_H
 
+#include <shared_mutex>
+
 #include "anomaly/AnomalyTracker.h"
 #include "condition/ConditionWizard.h"
 #include "config/ConfigKey.h"
@@ -137,6 +139,10 @@
 
     long long mProtoToken;
 
+    // Read/Write mutex to make the producer thread-safe.
+    // TODO(yanglu): replace with std::shared_mutex when available in libc++.
+    mutable std::shared_timed_mutex mRWMutex;
+
     virtual void startNewProtoOutputStream(long long timestamp) = 0;
 
     std::unique_ptr<std::vector<uint8_t>> serializeProto();
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 66c8419..eed7841 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -121,6 +121,7 @@
 }
 
 void ValueMetricProducer::startNewProtoOutputStream(long long startTime) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mProto = std::make_unique<ProtoOutputStream>();
     mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -136,9 +137,8 @@
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
 }
 
-std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
-    VLOG("metric %s dump report now...", mMetric.name().c_str());
-
+void ValueMetricProducer::SerializeBuckets() {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
         VLOG("  dimension key %s", hashableKey.c_str());
@@ -185,47 +185,63 @@
     mProto->end(mProtoToken);
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                   (long long)mCurrentBucketStartTimeNs);
+    mPastBuckets.clear();
+}
 
+std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
     VLOG("metric %s dump report now...", mMetric.name().c_str());
+
+    SerializeBuckets();
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
 
     startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
-    mPastBuckets.clear();
 
     return buffer;
-
     // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
 void ValueMetricProducer::onConditionChanged(const bool condition, const uint64_t eventTime) {
-    AutoMutex _l(mLock);
-    mCondition = condition;
+    vector<shared_ptr<LogEvent>> allData;
 
-    if (mPullTagId != -1) {
+    // TODO(yanglu): move the following logic to a separate function to make it lockable.
+    {
+        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+        mCondition = condition;
+
+        if (mPullTagId == -1) {
+            return;
+        }
+
         if (mCondition == true) {
             mStatsPullerManager->RegisterReceiver(mPullTagId, this,
                                                   mMetric.bucket().bucket_size_millis());
         } else if (mCondition == false) {
             mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
         }
-
-        vector<shared_ptr<LogEvent>> allData;
-        if (mStatsPullerManager->Pull(mPullTagId, &allData)) {
-            if (allData.size() == 0) {
-                return;
-            }
-            for (const auto& data : allData) {
-                onMatchedLogEvent(0, *data, false);
-            }
-            flushIfNeeded(eventTime);
+        if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
+            return;
         }
+    }
+
+    if (allData.size() == 0) {
         return;
     }
+
+    // onMatchedLogEvent locks mRWMutex internally and is thread-safe.
+    for (const auto& data : allData) {
+        onMatchedLogEvent(0, *data, false);
+    }
+    // flushIfNeeded holds the write lock and is thread-safe.
+    flushIfNeeded(eventTime);
+}
+
+bool ValueMetricProducer::IsConditionMet() const {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
+    return mCondition == true || !mMetric.has_condition();
 }
 
 void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
-    AutoMutex _l(mLock);
-    if (mCondition == true || !mMetric.has_condition()) {
+    if (IsConditionMet()) {
         if (allData.size() == 0) {
             return;
         }
@@ -242,6 +258,7 @@
 }
 
 bool ValueMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     // ===========GuardRail==============
     // 1. Report the tuple count if the tuple count > soft limit
     if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
@@ -262,58 +279,75 @@
     return false;
 }
 
-void ValueMetricProducer::onMatchedLogEventInternal(
-        const size_t matcherIndex, const HashableDimensionKey& eventKey,
-        const map<string, HashableDimensionKey>& conditionKey, bool condition,
-        const LogEvent& event, bool scheduledPull) {
-    uint64_t eventTimeNs = event.GetTimestampNs();
+void ValueMetricProducer::onMatchedLogEventInternal_pull(const uint64_t& eventTimeNs,
+                                                         const HashableDimensionKey& eventKey,
+                                                         const long& value, bool scheduledPull) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
     if (eventTimeNs < mCurrentBucketStartTimeNs) {
         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
              (long long)mCurrentBucketStartTimeNs);
         return;
     }
-
-    if (hitGuardRail(eventKey)) {
-        return;
-    }
     Interval& interval = mCurrentSlicedBucket[eventKey];
-
-    long value = get_value(event);
-
-    if (mPullTagId != -1) {
-        if (scheduledPull) {
-            // scheduled pull always sets beginning of current bucket and end
-            // of next bucket
-            if (interval.raw.size() > 0) {
-                interval.raw.back().second = value;
-            } else {
-                interval.raw.push_back(make_pair(value, value));
-            }
-            Interval& nextInterval = mNextSlicedBucket[eventKey];
-            if (nextInterval.raw.size() == 0) {
-                nextInterval.raw.push_back(make_pair(value, 0));
-            } else {
-                nextInterval.raw.front().first = value;
-            }
+    if (scheduledPull) {
+        // scheduled pull always sets beginning of current bucket and end
+        // of next bucket
+        if (interval.raw.size() > 0) {
+            interval.raw.back().second = value;
         } else {
-            if (mCondition == true) {
-                interval.raw.push_back(make_pair(value, 0));
-            } else {
-                if (interval.raw.size() != 0) {
-                    interval.raw.back().second = value;
-                } else {
-                    interval.tainted = true;
-                    VLOG("Data on condition true missing!");
-                }
-            }
+            interval.raw.push_back(make_pair(value, value));
+        }
+        Interval& nextInterval = mNextSlicedBucket[eventKey];
+        if (nextInterval.raw.size() == 0) {
+            nextInterval.raw.push_back(make_pair(value, 0));
+        } else {
+            nextInterval.raw.front().first = value;
         }
     } else {
-        flushIfNeeded(eventTimeNs);
-        interval.raw.push_back(make_pair(value, 0));
+        if (mCondition == true) {
+            interval.raw.push_back(make_pair(value, 0));
+        } else {
+            if (interval.raw.size() != 0) {
+                interval.raw.back().second = value;
+            } else {
+                interval.tainted = true;
+                VLOG("Data on condition true missing!");
+            }
+        }
     }
 }
 
-long ValueMetricProducer::get_value(const LogEvent& event) {
+void ValueMetricProducer::onMatchedLogEventInternal_push(const uint64_t& eventTimeNs,
+                                                         const HashableDimensionKey& eventKey,
+                                                         const long& value) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+    if (eventTimeNs < mCurrentBucketStartTimeNs) {
+        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
+             (long long)mCurrentBucketStartTimeNs);
+        return;
+    }
+    mCurrentSlicedBucket[eventKey].raw.push_back(make_pair(value, 0));
+}
+
+void ValueMetricProducer::onMatchedLogEventInternal(
+        const size_t matcherIndex, const HashableDimensionKey& eventKey,
+        const map<string, HashableDimensionKey>& conditionKey, bool condition,
+        const LogEvent& event, bool scheduledPull) {
+    uint64_t eventTimeNs = event.GetTimestampNs();
+    long value = get_value(event);
+    if (hitGuardRail(eventKey)) {
+        return;
+    }
+    if (mPullTagId != -1) {
+        onMatchedLogEventInternal_pull(eventTimeNs, eventKey, value, scheduledPull);
+    } else {
+        flushIfNeeded(eventTimeNs);
+        onMatchedLogEventInternal_push(eventTimeNs, eventKey, value);
+    }
+}
+
+long ValueMetricProducer::get_value(const LogEvent& event) const {
     status_t err = NO_ERROR;
     long val = event.GetLong(mMetric.value_field(), &err);
     if (err == NO_ERROR) {
@@ -325,6 +359,7 @@
 }
 
 void ValueMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
+    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTimeNs) {
         VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
              (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
@@ -373,6 +408,7 @@
 }
 
 size_t ValueMetricProducer::byteSize() const {
+    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     size_t totalSize = 0;
     for (const auto& pair : mPastBuckets) {
         totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index a024bd8..e87e9da 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -72,6 +72,16 @@
     void startNewProtoOutputStream(long long timestamp) override;
 
 private:
+    void onMatchedLogEventInternal_pull(const uint64_t& eventTimeNs,
+                                        const HashableDimensionKey& eventKey, const long& value,
+                                        bool scheduledPull);
+    void onMatchedLogEventInternal_push(const uint64_t& eventTimeNs,
+                                        const HashableDimensionKey& eventKey, const long& value);
+
+    void SerializeBuckets();
+
+    bool IsConditionMet() const;
+
     const ValueMetric mMetric;
 
     std::shared_ptr<StatsPullerManager> mStatsPullerManager;
@@ -82,8 +92,6 @@
                         const int pullTagId, const uint64_t startTimeNs,
                         std::shared_ptr<StatsPullerManager> statsPullerManager);
 
-    Mutex mLock;
-
     // tagId for pulled data. -1 if this is not pulled
     const int mPullTagId;
 
@@ -102,7 +110,7 @@
     // TODO: Add a lock to mPastBuckets.
     std::unordered_map<HashableDimensionKey, std::vector<ValueBucket>> mPastBuckets;
 
-    long get_value(const LogEvent& event);
+    long get_value(const LogEvent& event) const;
 
     bool hitGuardRail(const HashableDimensionKey& newKey);