Revert "Thread-safe metric producers."

This reverts commit 8de6939c494da838f6dbbda0631f66425dbbd25b.

Change-Id: Ieae841bfc5339b569f0fca909a6066de72806617
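
For context: the change being reverted had wrapped every metric producer in a reader/writer lock. A minimal sketch of the pattern the removed lines below used (mRWMutex, the write/read lock split, byteSize() as the shared reader); the counter member is invented for illustration:

```cpp
#include <cstddef>
#include <mutex>
#include <shared_mutex>

// Writers (event handlers) take an exclusive lock; readers (byteSize)
// take a shared lock so concurrent reads don't serialize.
class Producer {
public:
    void onEvent() {
        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
        ++mCount;
    }
    size_t byteSize() const {
        std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
        return mCount * sizeof(long);
    }

private:
    // shared_timed_mutex rather than shared_mutex: the removed TODO in
    // MetricProducer.h notes std::shared_mutex is not yet in libc++.
    mutable std::shared_timed_mutex mRWMutex;
    size_t mCount = 0;
};
```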
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index 4eb121c..ce60eb9 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -96,8 +96,6 @@
 }
 
 void CountMetricProducer::startNewProtoOutputStream(long long startTime) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-
     mProto = std::make_unique<ProtoOutputStream>();
     mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,8 +109,13 @@
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
 }
 
-void CountMetricProducer::serializeBuckets() {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
+    long long endTime = time(nullptr) * NS_PER_SEC;
+
+    // Dump the current bucket if it's stale.
+    // If the current bucket is still ongoing, don't force dump it.
+    // In finish(), we can force dump the current bucket.
+    flushIfNeeded(endTime);
     VLOG("metric %s dump report now...", mMetric.name().c_str());
 
     for (const auto& counter : mPastBuckets) {
@@ -158,40 +161,28 @@
         }
         mProto->end(wrapperToken);
     }
+
     mProto->end(mProtoToken);
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                   (long long)mCurrentBucketStartTimeNs);
 
-    mPastBuckets.clear();
-    // TODO: Clear mDimensionKeyMap once the report is dumped.
-}
-
-std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
-    long long endTime = time(nullptr) * NS_PER_SEC;
     VLOG("metric %s dump report now...", mMetric.name().c_str());
-    // Dump current bucket if it's stale.
-    // If current bucket is still on-going, don't force dump current bucket.
-    // In finish(), We can force dump current bucket.
-    flushIfNeeded(endTime);
-
-    // TODO(yanglu): merge these three functions to one to avoid three locks.
-    serializeBuckets();
-
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
 
     startNewProtoOutputStream(endTime);
+    mPastBuckets.clear();
 
     return buffer;
+
+    // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
 void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
     VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mCondition = conditionMet;
 }
 
 bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
-    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) {
         return false;
     }
@@ -219,40 +210,38 @@
 
     flushIfNeeded(eventTimeNs);
 
-    // ===========GuardRail==============
-    if (hitGuardRail(eventKey)) {
+    if (condition == false) {
         return;
     }
 
-    // TODO(yanglu): move the following logic to a seperate function to make it lockable.
-    {
-        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-        if (condition == false) {
+    auto it = mCurrentSlicedCounter->find(eventKey);
+
+    if (it == mCurrentSlicedCounter->end()) {
+        // ===========GuardRail==============
+        if (hitGuardRail(eventKey)) {
             return;
         }
 
-        auto it = mCurrentSlicedCounter->find(eventKey);
-        if (it == mCurrentSlicedCounter->end()) {
-            // create a counter for the new key
-            mCurrentSlicedCounter->insert({eventKey, 1});
-        } else {
-            // increment the existing value
-            auto& count = it->second;
-            count++;
-        }
-        const int64_t& count = mCurrentSlicedCounter->find(eventKey)->second;
-        for (auto& tracker : mAnomalyTrackers) {
-            tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, count);
-        }
-        VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(count));
+        // create a counter for the new key
+        (*mCurrentSlicedCounter)[eventKey] = 1;
+    } else {
+        // increment the existing value
+        auto& count = it->second;
+        count++;
     }
+
+    for (auto& tracker : mAnomalyTrackers) {
+        tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
+                                         mCurrentSlicedCounter->find(eventKey)->second);
+    }
+
+    VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(),
+         (long long)(*mCurrentSlicedCounter)[eventKey]);
 }
 
 // When a new matched event comes in, we check if the event falls into the current
 // bucket. If not, flush the old counter to past buckets and initialize the new bucket.
 void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-
     if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
         return;
     }
@@ -286,7 +275,6 @@
 // greater than the actual data size, as each dimension of
 // CountMetricData is duplicated.
 size_t CountMetricProducer::byteSize() const {
-    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     size_t totalSize = 0;
     for (const auto& pair : mPastBuckets) {
         totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h
index 164dfb2..f78a199 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.h
+++ b/cmds/statsd/src/metrics/CountMetricProducer.h
@@ -75,8 +75,6 @@
     void startNewProtoOutputStream(long long timestamp) override;
 
 private:
-    void serializeBuckets();
-
     const CountMetric mMetric;
 
     // TODO: Add a lock to mPastBuckets.
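
The comment restored in CountMetricProducer.cpp above ("When a new matched event comes in, we check if the event falls into the current bucket...") describes the bucket rolling that flushIfNeeded() performs. A standalone sketch of that logic, with CountBucket reduced to a plain count and an assumed one-minute bucket size:

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct BucketRoller {
    uint64_t mCurrentBucketStartTimeNs = 0;
    uint64_t mBucketSizeNs = 60ULL * 1000 * 1000 * 1000;  // assumed: 1 minute
    uint64_t mCurrentBucketNum = 0;
    std::map<std::string, int64_t> mCurrentSlicedCounter;
    std::map<std::string, std::vector<int64_t>> mPastBuckets;

    void flushIfNeeded(uint64_t eventTimeNs) {
        // The event still falls inside the current bucket: nothing to flush.
        if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
            return;
        }
        // Archive the per-dimension counts, then start a fresh bucket.
        for (const auto& slice : mCurrentSlicedCounter) {
            mPastBuckets[slice.first].push_back(slice.second);
        }
        mCurrentSlicedCounter.clear();
        // The event may be several buckets ahead; advance in whole buckets.
        uint64_t forward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
        mCurrentBucketStartTimeNs += forward * mBucketSizeNs;
        mCurrentBucketNum += forward;
    }
};
```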
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index 3b49b9a..a0374c0 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -104,7 +104,6 @@
 }
 
 void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mProto = std::make_unique<ProtoOutputStream>();
     mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -112,7 +111,7 @@
 }
 
 unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
-        const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const {
+        const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) {
     switch (mMetric.aggregation_type()) {
         case DurationMetric_AggregationType_SUM:
             return make_unique<OringDurationTracker>(
@@ -131,7 +130,6 @@
 }
 
 void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
     flushIfNeeded(eventTime);
     // Now for each of the on-going event, check if the condition has changed for them.
@@ -141,7 +139,6 @@
 }
 
 void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
     mCondition = conditionMet;
     flushIfNeeded(eventTime);
@@ -152,8 +149,15 @@
     }
 }
 
-void DurationMetricProducer::SerializeBuckets() {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
+    long long endTime = time(nullptr) * NS_PER_SEC;
+
+    // Dump the current bucket if it's stale.
+    // If the current bucket is still ongoing, don't force dump it.
+    // In finish(), we can force dump the current bucket.
+    flushIfNeeded(endTime);
+    VLOG("metric %s dump report now...", mMetric.name().c_str());
+
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
         VLOG("  dimension key %s", hashableKey.c_str());
@@ -210,29 +214,13 @@
     mProto->end(mProtoToken);
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                   (long long)mCurrentBucketStartTimeNs);
-}
-
-std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
-    VLOG("metric %s dump report now...", mMetric.name().c_str());
-
-    long long endTime = time(nullptr) * NS_PER_SEC;
-
-    // Dump current bucket if it's stale.
-    // If current bucket is still on-going, don't force dump current bucket.
-    // In finish(), We can force dump current bucket.
-    flushIfNeeded(endTime);
-
-    SerializeBuckets();
-
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
-
     startNewProtoOutputStream(endTime);
     // TODO: Properly clear the old buckets.
     return buffer;
 }
 
 void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
         return;
     }
@@ -252,7 +240,6 @@
 }
 
 bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
-    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     // the key is not new, we are good.
     if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) {
         return false;
@@ -278,37 +265,32 @@
         const LogEvent& event, bool scheduledPull) {
     flushIfNeeded(event.GetTimestampNs());
 
-    // TODO(yanglu): move the following logic to a seperate function to make it lockable.
-    {
-        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-        if (matcherIndex == mStopAllIndex) {
-            for (auto& pair : mCurrentSlicedDuration) {
-                pair.second->noteStopAll(event.GetTimestampNs());
-            }
+    if (matcherIndex == mStopAllIndex) {
+        for (auto& pair : mCurrentSlicedDuration) {
+            pair.second->noteStopAll(event.GetTimestampNs());
+        }
+        return;
+    }
+
+    HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
+
+    if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
+        if (hitGuardRail(eventKey)) {
             return;
         }
+        mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]);
+    }
 
-        HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
+    auto it = mCurrentSlicedDuration.find(eventKey);
 
-        if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
-            if (hitGuardRail(eventKey)) {
-                return;
-            }
-            mCurrentSlicedDuration[eventKey] =
-                    createDurationTracker(eventKey, mPastBuckets[eventKey]);
-        }
-        auto it = mCurrentSlicedDuration.find(eventKey);
-
-        if (matcherIndex == mStartIndex) {
-            it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
-        } else if (matcherIndex == mStopIndex) {
-            it->second->noteStop(atomKey, event.GetTimestampNs(), false);
-        }
+    if (matcherIndex == mStartIndex) {
+        it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
+    } else if (matcherIndex == mStopIndex) {
+        it->second->noteStop(atomKey, event.GetTimestampNs(), false);
     }
 }
 
 size_t DurationMetricProducer::byteSize() const {
-    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     size_t totalSize = 0;
     for (const auto& pair : mPastBuckets) {
         totalSize += pair.second.size() * kBucketSize;
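
createDurationTracker(), whose const qualifier this revert drops, is a small factory keyed on the metric's aggregation type. A stubbed sketch; only the SUM case and OringDurationTracker appear in this diff, the MAX_SPARSE name is assumed:

```cpp
#include <memory>

struct DurationTracker { virtual ~DurationTracker() = default; };
struct OringDurationTracker : DurationTracker {};  // SUM: or-s overlapping durations
struct MaxDurationTracker : DurationTracker {};    // MAX_SPARSE: longest single span

enum class AggregationType { SUM, MAX_SPARSE };

std::unique_ptr<DurationTracker> createDurationTracker(AggregationType type) {
    switch (type) {
        case AggregationType::SUM:
            return std::make_unique<OringDurationTracker>();
        case AggregationType::MAX_SPARSE:
            return std::make_unique<MaxDurationTracker>();
    }
    return nullptr;  // unreachable with a well-formed enum value
}
```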
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h
index 68fff48..5b5373e 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.h
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.h
@@ -71,8 +71,6 @@
     void startNewProtoOutputStream(long long timestamp) override;
 
 private:
-    void SerializeBuckets();
-
     const DurationMetric mMetric;
 
     // Index of the SimpleLogEntryMatcher which defines the start.
@@ -98,8 +96,8 @@
     std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>>
             mCurrentSlicedDuration;
 
-    std::unique_ptr<DurationTracker> createDurationTracker(
-            const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const;
+    std::unique_ptr<DurationTracker> createDurationTracker(const HashableDimensionKey& eventKey,
+                                                           std::vector<DurationBucket>& bucket);
     bool hitGuardRail(const HashableDimensionKey& newKey);
 
     static const size_t kBucketSize = sizeof(DurationBucket{});
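
Each producer guards its sliced map with the same hitGuardRail() pattern visible above: an existing dimension key always passes, and a new key is rejected once the map crosses a cap. A sketch with an illustrative limit (the real thresholds are not part of this diff):

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>

constexpr size_t kHardLimit = 300;  // illustrative cap, not statsd's value

bool hitGuardRail(const std::unordered_map<std::string, int64_t>& sliced,
                  const std::string& newKey) {
    // The key is not new, we are good.
    if (sliced.find(newKey) != sliced.end()) {
        return false;
    }
    // Refuse to track a brand-new key once the map is at the cap. (The real
    // producers also report the tuple count past a lower soft limit.)
    return sliced.size() + 1 > kHardLimit;
}
```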
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 53f112a..95a18f7 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -73,7 +73,6 @@
 }
 
 void EventMetricProducer::startNewProtoOutputStream(long long startTime) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mProto = std::make_unique<ProtoOutputStream>();
     // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData,
     // and StatsEvent.
@@ -90,16 +89,11 @@
 
 std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() {
     long long endTime = time(nullptr) * NS_PER_SEC;
-    // TODO(yanglu): make this section to an util function.
-    {
-        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-        mProto->end(mProtoToken);
-        mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
+    mProto->end(mProtoToken);
+    mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
 
-        size_t bufferSize = mProto->size();
-        VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
-    }
-
+    size_t bufferSize = mProto->size();
+    VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
 
     startNewProtoOutputStream(endTime);
@@ -109,7 +103,6 @@
 
 void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
     VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mCondition = conditionMet;
 }
 
@@ -117,7 +110,6 @@
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
         const LogEvent& event, bool scheduledPull) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (!condition) {
         return;
     }
@@ -132,7 +124,6 @@
 }
 
 size_t EventMetricProducer::byteSize() const {
-    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     return mProto->bytesWritten();
 }
 
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index ed4c760..1791654 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -102,7 +102,6 @@
 }
 
 void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mProto = std::make_unique<ProtoOutputStream>();
     mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -112,8 +111,14 @@
 void GaugeMetricProducer::finish() {
 }
 
-void GaugeMetricProducer::SerializeBuckets() {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
+    VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
+
+    // Dump the current bucket if it's stale.
+    // If the current bucket is still ongoing, don't force dump it.
+    // In finish(), we can force dump the current bucket.
+    flushIfNeeded(time(nullptr) * NS_PER_SEC);
+
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
         auto it = mDimensionKeyMap.find(hashableKey);
@@ -161,69 +166,50 @@
     mProto->end(mProtoToken);
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                   (long long)mCurrentBucketStartTimeNs);
-    mPastBuckets.clear();
-}
-
-std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
-    VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
-
-    // Dump current bucket if it's stale.
-    // If current bucket is still on-going, don't force dump current bucket.
-    // In finish(), We can force dump current bucket.
-    flushIfNeeded(time(nullptr) * NS_PER_SEC);
-
-    SerializeBuckets();
 
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
 
     startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
+    mPastBuckets.clear();
+
     return buffer;
 
     // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
 void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
+    AutoMutex _l(mLock);
     VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
-
-    // flushIfNeeded holds the write lock and is thread-safe.
     flushIfNeeded(eventTime);
+    mCondition = conditionMet;
 
-    vector<std::shared_ptr<LogEvent>> allData;
-    // The following section is to update the condition and re-pull the gauge.
-    // TODO(yanglu): make it a seperate lockable function.
-    {
-        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-
-        mCondition = conditionMet;
-
-        // Push mode. No need to proactively pull the gauge data.
-        if (mPullTagId == -1) {
-            return;
-        }
-        if (!mCondition) {
-            return;
-        }
-        // Already have gauge metric for the current bucket, do not do it again.
-        if (mCurrentSlicedBucket->size() > 0) {
-            return;
-        }
-        if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
-            ALOGE("Stats puller failed for tag: %d", mPullTagId);
-            return;
-        }
+    // Push mode. No need to proactively pull the gauge data.
+    if (mPullTagId == -1) {
+        return;
     }
-
-    // onMatchedLogEventInternal holds the write lock and is thread-safe.
+    if (!mCondition) {
+        return;
+    }
+    // Already have gauge metric for the current bucket, do not do it again.
+    if (mCurrentSlicedBucket->size() > 0) {
+        return;
+    }
+    vector<std::shared_ptr<LogEvent>> allData;
+    if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
+        ALOGE("Stats puller failed for tag: %d", mPullTagId);
+        return;
+    }
     for (const auto& data : allData) {
         onMatchedLogEvent(0, *data, false /*scheduledPull*/);
     }
+    flushIfNeeded(eventTime);
 }
 
 void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
 }
 
-int64_t GaugeMetricProducer::getGauge(const LogEvent& event) const {
+int64_t GaugeMetricProducer::getGauge(const LogEvent& event) {
     status_t err = NO_ERROR;
     int64_t val = event.GetLong(mMetric.gauge_field(), &err);
     if (err == NO_ERROR) {
@@ -235,14 +221,13 @@
 }
 
 void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
-    // onMatchedLogEventInternal holds the write lock and is thread-safe.
+    AutoMutex mutex(mLock);
     for (const auto& data : allData) {
         onMatchedLogEvent(0, *data, true /*scheduledPull*/);
     }
 }
 
 bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
-    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
         return false;
     }
@@ -266,32 +251,32 @@
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const map<string, HashableDimensionKey>& conditionKey, bool condition,
         const LogEvent& event, bool scheduledPull) {
-    uint64_t eventTimeNs = event.GetTimestampNs();
-    flushIfNeeded(eventTimeNs);
-
     if (condition == false) {
         return;
     }
-    const long gauge = getGauge(event);
-    if (gauge < 0) {
-        return;
-    }
-    if (hitGuardRail(eventKey)) {
-        return;
-    }
-
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+    uint64_t eventTimeNs = event.GetTimestampNs();
     if (eventTimeNs < mCurrentBucketStartTimeNs) {
         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
              (long long)mCurrentBucketStartTimeNs);
         return;
     }
 
+    // When the event happens in a new bucket, flush the old buckets.
+    if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) {
+        flushIfNeeded(eventTimeNs);
+    }
+
     // For gauge metric, we simply use the first gauge in the given bucket.
     if (!mCurrentSlicedBucket->empty()) {
         return;
     }
+    const long gauge = getGauge(event);
+    if (gauge >= 0) {
+        if (hitGuardRail(eventKey)) {
+            return;
+        }
-    (*mCurrentSlicedBucket)[eventKey] = gauge;
+        (*mCurrentSlicedBucket)[eventKey] = gauge;
+    }
     for (auto& tracker : mAnomalyTrackers) {
         tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge);
     }
@@ -303,7 +288,6 @@
 // if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
 // the GaugeMetricProducer while holding the lock.
 void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
         return;
     }
@@ -337,7 +321,6 @@
 }
 
 size_t GaugeMetricProducer::byteSize() const {
-    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     size_t totalSize = 0;
     for (const auto& pair : mPastBuckets) {
         totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 8df6111..f344303 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -81,8 +81,6 @@
     void startNewProtoOutputStream(long long timestamp) override;
 
 private:
-    void SerializeBuckets();
-
     // The default bucket size for gauge metric is 1 second.
     static const uint64_t kDefaultGaugemBucketSizeNs = 1000 * 1000 * 1000;
     const GaugeMetric mMetric;
@@ -91,6 +89,8 @@
     // tagId for pulled data. -1 if this is not pulled
     const int mPullTagId;
 
+    Mutex mLock;
+
     // Save the past buckets and we can clear when the StatsLogReport is dumped.
     // TODO: Add a lock to mPastBuckets.
     std::unordered_map<HashableDimensionKey, std::vector<GaugeBucket>> mPastBuckets;
@@ -98,7 +98,7 @@
     // The current bucket.
     std::shared_ptr<DimToValMap> mCurrentSlicedBucket = std::make_shared<DimToValMap>();
 
-    int64_t getGauge(const LogEvent& event) const;
+    int64_t getGauge(const LogEvent& event);
 
     bool hitGuardRail(const HashableDimensionKey& newKey);
 
diff --git a/cmds/statsd/src/metrics/MetricProducer.cpp b/cmds/statsd/src/metrics/MetricProducer.cpp
index 7542a94..62fb632 100644
--- a/cmds/statsd/src/metrics/MetricProducer.cpp
+++ b/cmds/statsd/src/metrics/MetricProducer.cpp
@@ -23,7 +23,6 @@
 
 void MetricProducer::onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event,
                                        bool scheduledPull) {
-    std::unique_lock<std::shared_timed_mutex> writeLock(mRWMutex);
     uint64_t eventTimeNs = event.GetTimestampNs();
     // this is old event, maybe statsd restarted?
     if (eventTimeNs < mStartTimeNs) {
@@ -60,16 +59,12 @@
     } else {
         condition = mCondition;
     }
-    // Unlock as onMatchedLogEventInternal is threadsafe.
-    writeLock.unlock();
 
     onMatchedLogEventInternal(matcherIndex, eventKey, conditionKeys, condition, event,
                               scheduledPull);
 }
 
 std::unique_ptr<std::vector<uint8_t>> MetricProducer::serializeProto() {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-
     size_t bufferSize = mProto->size();
 
     std::unique_ptr<std::vector<uint8_t>> buffer(new std::vector<uint8_t>(bufferSize));
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index 27343ad..b22ff6f 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -17,8 +17,6 @@
 #ifndef METRIC_PRODUCER_H
 #define METRIC_PRODUCER_H
 
-#include <shared_mutex>
-
 #include "anomaly/AnomalyTracker.h"
 #include "condition/ConditionWizard.h"
 #include "config/ConfigKey.h"
@@ -139,10 +137,6 @@
 
     long long mProtoToken;
 
-    // Read/Write mutex to make the producer thread-safe.
-    // TODO(yanglu): replace with std::shared_mutex when available in libc++.
-    mutable std::shared_timed_mutex mRWMutex;
-
     virtual void startNewProtoOutputStream(long long timestamp) = 0;
 
     std::unique_ptr<std::vector<uint8_t>> serializeProto();
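
MetricProducer declares the serializeProto()/startNewProtoOutputStream() pair that every onDumpReport() above leans on: flush a stale bucket, write the past buckets, snapshot the proto, reset the stream. A toy sketch of that dump-and-reset cycle, with ProtoOutputStream stubbed as a byte vector:

```cpp
#include <cstdint>
#include <memory>
#include <vector>

// ProtoOutputStream stubbed as a plain byte buffer for the sketch.
struct FakeProto { std::vector<uint8_t> bytes; };

struct ReportingProducer {
    std::unique_ptr<FakeProto> mProto = std::make_unique<FakeProto>();

    std::unique_ptr<std::vector<uint8_t>> onDumpReport(uint64_t endTimeNs) {
        flushIfNeeded(endTimeNs);              // roll a stale current bucket
        // ... write mPastBuckets into mProto here ...
        auto buffer = serializeProto();        // snapshot the finished report
        startNewProtoOutputStream(endTimeNs);  // reset for the next period
        return buffer;
    }

    void flushIfNeeded(uint64_t) { /* bucket rolling, as sketched earlier */ }
    void startNewProtoOutputStream(uint64_t) { mProto = std::make_unique<FakeProto>(); }
    std::unique_ptr<std::vector<uint8_t>> serializeProto() {
        return std::make_unique<std::vector<uint8_t>>(mProto->bytes);
    }
};
```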
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index eed7841..66c8419 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -121,7 +121,6 @@
 }
 
 void ValueMetricProducer::startNewProtoOutputStream(long long startTime) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     mProto = std::make_unique<ProtoOutputStream>();
     mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -137,8 +136,9 @@
     VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
 }
 
-void ValueMetricProducer::SerializeBuckets() {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
+    VLOG("metric %s dump report now...", mMetric.name().c_str());
+
     for (const auto& pair : mPastBuckets) {
         const HashableDimensionKey& hashableKey = pair.first;
         VLOG("  dimension key %s", hashableKey.c_str());
@@ -185,63 +185,47 @@
     mProto->end(mProtoToken);
     mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
                   (long long)mCurrentBucketStartTimeNs);
-    mPastBuckets.clear();
-}
 
-std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
     VLOG("metric %s dump report now...", mMetric.name().c_str());
-
-    SerializeBuckets();
     std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
 
     startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
+    mPastBuckets.clear();
 
     return buffer;
+
     // TODO: Clear mDimensionKeyMap once the report is dumped.
 }
 
 void ValueMetricProducer::onConditionChanged(const bool condition, const uint64_t eventTime) {
-    vector<shared_ptr<LogEvent>> allData;
+    AutoMutex _l(mLock);
+    mCondition = condition;
 
-    // TODO(yanglu): move the following logic to a seperate function to make it lockable.
-    {
-        std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-        mCondition = condition;
-
-        if (mPullTagId == -1) {
-            return;
-        }
-
+    if (mPullTagId != -1) {
         if (mCondition == true) {
             mStatsPullerManager->RegisterReceiver(mPullTagId, this,
                                                   mMetric.bucket().bucket_size_millis());
         } else if (mCondition == false) {
             mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
         }
-        if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
-            return;
-        }
-    }
 
-    if (allData.size() == 0) {
+        vector<shared_ptr<LogEvent>> allData;
+        if (mStatsPullerManager->Pull(mPullTagId, &allData)) {
+            if (allData.size() == 0) {
+                return;
+            }
+            for (const auto& data : allData) {
+                onMatchedLogEvent(0, *data, false);
+            }
+            flushIfNeeded(eventTime);
+        }
         return;
     }
-
-    // onMatchedLogEventInternal holds the write lock and is thread-safe.
-    for (const auto& data : allData) {
-        onMatchedLogEvent(0, *data, false);
-    }
-    // flushIfNeeded holds the write lock and is thread-safe.
-    flushIfNeeded(eventTime);
-}
-
-bool ValueMetricProducer::IsConditionMet() const {
-    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
-    return mCondition == true || !mMetric.has_condition();
 }
 
 void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
-    if (IsConditionMet()) {
+    AutoMutex _l(mLock);
+    if (mCondition == true || !mMetric.has_condition()) {
         if (allData.size() == 0) {
             return;
         }
@@ -258,7 +242,6 @@
 }
 
 bool ValueMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
-    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     // ===========GuardRail==============
     // 1. Report the tuple count if the tuple count > soft limit
     if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
@@ -279,75 +262,58 @@
     return false;
 }
 
-void ValueMetricProducer::onMatchedLogEventInternal_pull(const uint64_t& eventTimeNs,
-                                                         const HashableDimensionKey& eventKey,
-                                                         const long& value, bool scheduledPull) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-
-    if (eventTimeNs < mCurrentBucketStartTimeNs) {
-        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
-             (long long)mCurrentBucketStartTimeNs);
-        return;
-    }
-    Interval& interval = mCurrentSlicedBucket[eventKey];
-    if (scheduledPull) {
-        // scheduled pull always sets beginning of current bucket and end
-        // of next bucket
-        if (interval.raw.size() > 0) {
-            interval.raw.back().second = value;
-        } else {
-            interval.raw.push_back(make_pair(value, value));
-        }
-        Interval& nextInterval = mNextSlicedBucket[eventKey];
-        if (nextInterval.raw.size() == 0) {
-            nextInterval.raw.push_back(make_pair(value, 0));
-        } else {
-            nextInterval.raw.front().first = value;
-        }
-    } else {
-        if (mCondition == true) {
-            interval.raw.push_back(make_pair(value, 0));
-        } else {
-            if (interval.raw.size() != 0) {
-                interval.raw.back().second = value;
-            } else {
-                interval.tainted = true;
-                VLOG("Data on condition true missing!");
-            }
-        }
-    }
-}
-
-void ValueMetricProducer::onMatchedLogEventInternal_push(const uint64_t& eventTimeNs,
-                                                         const HashableDimensionKey& eventKey,
-                                                         const long& value) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-    if (eventTimeNs < mCurrentBucketStartTimeNs) {
-        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
-             (long long)mCurrentBucketStartTimeNs);
-        return;
-    }
-    mCurrentSlicedBucket[eventKey].raw.push_back(make_pair(value, 0));
-}
-
 void ValueMetricProducer::onMatchedLogEventInternal(
         const size_t matcherIndex, const HashableDimensionKey& eventKey,
         const map<string, HashableDimensionKey>& conditionKey, bool condition,
         const LogEvent& event, bool scheduledPull) {
     uint64_t eventTimeNs = event.GetTimestampNs();
-    long value = get_value(event);
+    if (eventTimeNs < mCurrentBucketStartTimeNs) {
+        VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
+             (long long)mCurrentBucketStartTimeNs);
+        return;
+    }
+
     if (hitGuardRail(eventKey)) {
         return;
     }
+    Interval& interval = mCurrentSlicedBucket[eventKey];
+
+    long value = get_value(event);
+
     if (mPullTagId != -1) {
-        onMatchedLogEventInternal_pull(eventTimeNs, eventKey, value, scheduledPull);
+        if (scheduledPull) {
+            // a scheduled pull always sets the end of the current bucket
+            // and the beginning of the next bucket
+            if (interval.raw.size() > 0) {
+                interval.raw.back().second = value;
+            } else {
+                interval.raw.push_back(make_pair(value, value));
+            }
+            Interval& nextInterval = mNextSlicedBucket[eventKey];
+            if (nextInterval.raw.size() == 0) {
+                nextInterval.raw.push_back(make_pair(value, 0));
+            } else {
+                nextInterval.raw.front().first = value;
+            }
+        } else {
+            if (mCondition == true) {
+                interval.raw.push_back(make_pair(value, 0));
+            } else {
+                if (interval.raw.size() != 0) {
+                    interval.raw.back().second = value;
+                } else {
+                    interval.tainted = true;
+                    VLOG("Data on condition true missing!");
+                }
+            }
+        }
     } else {
         flushIfNeeded(eventTimeNs);
-        onMatchedLogEventInternal_push(eventTimeNs, eventKey, value);
+        interval.raw.push_back(make_pair(value, 0));
     }
 }
 
-long ValueMetricProducer::get_value(const LogEvent& event) const {
+long ValueMetricProducer::get_value(const LogEvent& event) {
     status_t err = NO_ERROR;
     long val = event.GetLong(mMetric.value_field(), &err);
     if (err == NO_ERROR) {
@@ -359,7 +325,6 @@
 }
 
 void ValueMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
-    std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
     if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTimeNs) {
         VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
              (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
@@ -408,7 +373,6 @@
 }
 
 size_t ValueMetricProducer::byteSize() const {
-    std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
     size_t totalSize = 0;
     for (const auto& pair : mPastBuckets) {
         totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index e87e9da..a024bd8 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -72,16 +72,6 @@
     void startNewProtoOutputStream(long long timestamp) override;
 
 private:
-    void onMatchedLogEventInternal_pull(const uint64_t& eventTimeNs,
-                                        const HashableDimensionKey& eventKey, const long& value,
-                                        bool scheduledPull);
-    void onMatchedLogEventInternal_push(const uint64_t& eventTimeNs,
-                                        const HashableDimensionKey& eventKey, const long& value);
-
-    void SerializeBuckets();
-
-    bool IsConditionMet() const;
-
     const ValueMetric mMetric;
 
     std::shared_ptr<StatsPullerManager> mStatsPullerManager;
@@ -92,6 +82,8 @@
                         const int pullTagId, const uint64_t startTimeNs,
                         std::shared_ptr<StatsPullerManager> statsPullerManager);
 
+    Mutex mLock;
+
     // tagId for pulled data. -1 if this is not pulled
     const int mPullTagId;
 
@@ -110,7 +102,7 @@
     // TODO: Add a lock to mPastBuckets.
     std::unordered_map<HashableDimensionKey, std::vector<ValueBucket>> mPastBuckets;
 
-    long get_value(const LogEvent& event) const;
+    long get_value(const LogEvent& event);
 
     bool hitGuardRail(const HashableDimensionKey& newKey);
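
The subtlest code restored in ValueMetricProducer is the pulled-value interval bookkeeping in onMatchedLogEventInternal: a scheduled pull at a bucket boundary closes the open (start, end) pair in the current bucket and opens one in the next. A self-contained sketch of just that step, mirroring the restored branch:

```cpp
#include <cstdint>
#include <utility>
#include <vector>

struct Interval {
    std::vector<std::pair<int64_t, int64_t>> raw;  // (start, end) samples
    bool tainted = false;
};

// A scheduled pull sets the end of the current bucket's open pair and the
// beginning of the next bucket's, so each bucket's value is end - start.
void onScheduledPull(Interval& current, Interval& next, int64_t value) {
    if (!current.raw.empty()) {
        current.raw.back().second = value;   // close the current interval
    } else {
        current.raw.push_back({value, value});
    }
    if (next.raw.empty()) {
        next.raw.push_back({value, 0});      // open the next bucket's interval
    } else {
        next.raw.front().first = value;
    }
}
```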