Revert "Thread-safe metric producers."
This reverts commit 8de6939c494da838f6dbbda0631f66425dbbd25b.
Change-Id: Ieae841bfc5339b569f0fca909a6066de72806617
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index 4eb121c..ce60eb9 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -96,8 +96,6 @@
}
void CountMetricProducer::startNewProtoOutputStream(long long startTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,8 +109,13 @@
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
-void CountMetricProducer::serializeBuckets() {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
+ long long endTime = time(nullptr) * NS_PER_SEC;
+
+ // Dump current bucket if it's stale.
+ // If current bucket is still on-going, don't force dump current bucket.
+ // In finish(), We can force dump current bucket.
+ flushIfNeeded(endTime);
VLOG("metric %s dump report now...", mMetric.name().c_str());
for (const auto& counter : mPastBuckets) {
@@ -158,40 +161,28 @@
}
mProto->end(wrapperToken);
}
+
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
- mPastBuckets.clear();
- // TODO: Clear mDimensionKeyMap once the report is dumped.
-}
-
-std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
- long long endTime = time(nullptr) * NS_PER_SEC;
VLOG("metric %s dump report now...", mMetric.name().c_str());
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeeded(endTime);
-
- // TODO(yanglu): merge these three functions to one to avoid three locks.
- serializeBuckets();
-
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
startNewProtoOutputStream(endTime);
+ mPastBuckets.clear();
return buffer;
+
+ // TODO: Clear mDimensionKeyMap once the report is dumped.
}
void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mCondition = conditionMet;
}
bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) {
return false;
}
@@ -219,40 +210,38 @@
flushIfNeeded(eventTimeNs);
- // ===========GuardRail==============
- if (hitGuardRail(eventKey)) {
+ if (condition == false) {
return;
}
- // TODO(yanglu): move the following logic to a seperate function to make it lockable.
- {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
- if (condition == false) {
+ auto it = mCurrentSlicedCounter->find(eventKey);
+
+ if (it == mCurrentSlicedCounter->end()) {
+ // ===========GuardRail==============
+ if (hitGuardRail(eventKey)) {
return;
}
- auto it = mCurrentSlicedCounter->find(eventKey);
- if (it == mCurrentSlicedCounter->end()) {
- // create a counter for the new key
- mCurrentSlicedCounter->insert({eventKey, 1});
- } else {
- // increment the existing value
- auto& count = it->second;
- count++;
- }
- const int64_t& count = mCurrentSlicedCounter->find(eventKey)->second;
- for (auto& tracker : mAnomalyTrackers) {
- tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, count);
- }
- VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(count));
+ // create a counter for the new key
+ (*mCurrentSlicedCounter)[eventKey] = 1;
+ } else {
+ // increment the existing value
+ auto& count = it->second;
+ count++;
}
+
+ for (auto& tracker : mAnomalyTrackers) {
+ tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
+ mCurrentSlicedCounter->find(eventKey)->second);
+ }
+
+ VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(),
+ (long long)(*mCurrentSlicedCounter)[eventKey]);
}
// When a new matched event comes in, we check if event falls into the current
// bucket. If not, flush the old counter to past buckets and initialize the new bucket.
void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-
if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
return;
}
@@ -286,7 +275,6 @@
// greater than actual data size as it contains each dimension of
// CountMetricData is duplicated.
size_t CountMetricProducer::byteSize() const {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h
index 164dfb2..f78a199 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.h
+++ b/cmds/statsd/src/metrics/CountMetricProducer.h
@@ -75,8 +75,6 @@
void startNewProtoOutputStream(long long timestamp) override;
private:
- void serializeBuckets();
-
const CountMetric mMetric;
// TODO: Add a lock to mPastBuckets.
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index 3b49b9a..a0374c0 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -104,7 +104,6 @@
}
void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -112,7 +111,7 @@
}
unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
- const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const {
+ const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) {
switch (mMetric.aggregation_type()) {
case DurationMetric_AggregationType_SUM:
return make_unique<OringDurationTracker>(
@@ -131,7 +130,6 @@
}
void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
flushIfNeeded(eventTime);
// Now for each of the on-going event, check if the condition has changed for them.
@@ -141,7 +139,6 @@
}
void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
mCondition = conditionMet;
flushIfNeeded(eventTime);
@@ -152,8 +149,15 @@
}
}
-void DurationMetricProducer::SerializeBuckets() {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
+ long long endTime = time(nullptr) * NS_PER_SEC;
+
+ // Dump current bucket if it's stale.
+ // If current bucket is still on-going, don't force dump current bucket.
+ // In finish(), We can force dump current bucket.
+ flushIfNeeded(endTime);
+ VLOG("metric %s dump report now...", mMetric.name().c_str());
+
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
VLOG(" dimension key %s", hashableKey.c_str());
@@ -210,29 +214,13 @@
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
-}
-
-std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
- VLOG("metric %s dump report now...", mMetric.name().c_str());
-
- long long endTime = time(nullptr) * NS_PER_SEC;
-
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeeded(endTime);
-
- SerializeBuckets();
-
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
-
startNewProtoOutputStream(endTime);
// TODO: Properly clear the old buckets.
return buffer;
}
void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
return;
}
@@ -252,7 +240,6 @@
}
bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
// the key is not new, we are good.
if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) {
return false;
@@ -278,37 +265,32 @@
const LogEvent& event, bool scheduledPull) {
flushIfNeeded(event.GetTimestampNs());
- // TODO(yanglu): move the following logic to a seperate function to make it lockable.
- {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
- if (matcherIndex == mStopAllIndex) {
- for (auto& pair : mCurrentSlicedDuration) {
- pair.second->noteStopAll(event.GetTimestampNs());
- }
+ if (matcherIndex == mStopAllIndex) {
+ for (auto& pair : mCurrentSlicedDuration) {
+ pair.second->noteStopAll(event.GetTimestampNs());
+ }
+ return;
+ }
+
+ HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
+
+ if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
+ if (hitGuardRail(eventKey)) {
return;
}
+ mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]);
+ }
- HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
+ auto it = mCurrentSlicedDuration.find(eventKey);
- if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
- if (hitGuardRail(eventKey)) {
- return;
- }
- mCurrentSlicedDuration[eventKey] =
- createDurationTracker(eventKey, mPastBuckets[eventKey]);
- }
- auto it = mCurrentSlicedDuration.find(eventKey);
-
- if (matcherIndex == mStartIndex) {
- it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
- } else if (matcherIndex == mStopIndex) {
- it->second->noteStop(atomKey, event.GetTimestampNs(), false);
- }
+ if (matcherIndex == mStartIndex) {
+ it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
+ } else if (matcherIndex == mStopIndex) {
+ it->second->noteStop(atomKey, event.GetTimestampNs(), false);
}
}
size_t DurationMetricProducer::byteSize() const {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h
index 68fff48..5b5373e 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.h
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.h
@@ -71,8 +71,6 @@
void startNewProtoOutputStream(long long timestamp) override;
private:
- void SerializeBuckets();
-
const DurationMetric mMetric;
// Index of the SimpleLogEntryMatcher which defines the start.
@@ -98,8 +96,8 @@
std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>>
mCurrentSlicedDuration;
- std::unique_ptr<DurationTracker> createDurationTracker(
- const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const;
+ std::unique_ptr<DurationTracker> createDurationTracker(const HashableDimensionKey& eventKey,
+ std::vector<DurationBucket>& bucket);
bool hitGuardRail(const HashableDimensionKey& newKey);
static const size_t kBucketSize = sizeof(DurationBucket{});
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 53f112a..95a18f7 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -73,7 +73,6 @@
}
void EventMetricProducer::startNewProtoOutputStream(long long startTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mProto = std::make_unique<ProtoOutputStream>();
// TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData,
// and StatsEvent.
@@ -90,16 +89,11 @@
std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() {
long long endTime = time(nullptr) * NS_PER_SEC;
- // TODO(yanglu): make this section to an util function.
- {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
- mProto->end(mProtoToken);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
+ mProto->end(mProtoToken);
+ mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
- size_t bufferSize = mProto->size();
- VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
- }
-
+ size_t bufferSize = mProto->size();
+ VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
startNewProtoOutputStream(endTime);
@@ -109,7 +103,6 @@
void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mCondition = conditionMet;
}
@@ -117,7 +110,6 @@
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (!condition) {
return;
}
@@ -132,7 +124,6 @@
}
size_t EventMetricProducer::byteSize() const {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
return mProto->bytesWritten();
}
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index ed4c760..1791654 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -102,7 +102,6 @@
}
void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -112,8 +111,14 @@
void GaugeMetricProducer::finish() {
}
-void GaugeMetricProducer::SerializeBuckets() {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
+ VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
+
+ // Dump current bucket if it's stale.
+ // If current bucket is still on-going, don't force dump current bucket.
+ // In finish(), We can force dump current bucket.
+ flushIfNeeded(time(nullptr) * NS_PER_SEC);
+
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
auto it = mDimensionKeyMap.find(hashableKey);
@@ -161,69 +166,50 @@
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
- mPastBuckets.clear();
-}
-
-std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
- VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
-
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeeded(time(nullptr) * NS_PER_SEC);
-
- SerializeBuckets();
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
+ mPastBuckets.clear();
+
return buffer;
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
+ AutoMutex _l(mLock);
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
-
- // flushIfNeeded holds the write lock and is thread-safe.
flushIfNeeded(eventTime);
+ mCondition = conditionMet;
- vector<std::shared_ptr<LogEvent>> allData;
- // The following section is to update the condition and re-pull the gauge.
- // TODO(yanglu): make it a seperate lockable function.
- {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-
- mCondition = conditionMet;
-
- // Push mode. No need to proactively pull the gauge data.
- if (mPullTagId == -1) {
- return;
- }
- if (!mCondition) {
- return;
- }
- // Already have gauge metric for the current bucket, do not do it again.
- if (mCurrentSlicedBucket->size() > 0) {
- return;
- }
- if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
- ALOGE("Stats puller failed for tag: %d", mPullTagId);
- return;
- }
+ // Push mode. No need to proactively pull the gauge data.
+ if (mPullTagId == -1) {
+ return;
}
-
- // onMatchedLogEventInternal holds the write lock and is thread-safe.
+ if (!mCondition) {
+ return;
+ }
+ // Already have gauge metric for the current bucket, do not do it again.
+ if (mCurrentSlicedBucket->size() > 0) {
+ return;
+ }
+ vector<std::shared_ptr<LogEvent>> allData;
+ if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
+ ALOGE("Stats puller failed for tag: %d", mPullTagId);
+ return;
+ }
for (const auto& data : allData) {
onMatchedLogEvent(0, *data, false /*scheduledPull*/);
}
+ flushIfNeeded(eventTime);
}
void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
-int64_t GaugeMetricProducer::getGauge(const LogEvent& event) const {
+int64_t GaugeMetricProducer::getGauge(const LogEvent& event) {
status_t err = NO_ERROR;
int64_t val = event.GetLong(mMetric.gauge_field(), &err);
if (err == NO_ERROR) {
@@ -235,14 +221,13 @@
}
void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
- // onMatchedLogEventInternal holds the write lock and is thread-safe.
+ AutoMutex mutex(mLock);
for (const auto& data : allData) {
onMatchedLogEvent(0, *data, true /*scheduledPull*/);
}
}
bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
return false;
}
@@ -266,32 +251,32 @@
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const map<string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) {
- uint64_t eventTimeNs = event.GetTimestampNs();
- flushIfNeeded(eventTimeNs);
-
if (condition == false) {
return;
}
- const long gauge = getGauge(event);
- if (gauge < 0) {
- return;
- }
- if (hitGuardRail(eventKey)) {
- return;
- }
-
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+ uint64_t eventTimeNs = event.GetTimestampNs();
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
(long long)mCurrentBucketStartTimeNs);
return;
}
+ // When the event happens in a new bucket, flush the old buckets.
+ if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) {
+ flushIfNeeded(eventTimeNs);
+ }
+
// For gauge metric, we just simply use the first gauge in the given bucket.
if (!mCurrentSlicedBucket->empty()) {
return;
}
+ const long gauge = getGauge(event);
+ if (gauge >= 0) {
+ if (hitGuardRail(eventKey)) {
+ return;
+ }
(*mCurrentSlicedBucket)[eventKey] = gauge;
+ }
for (auto& tracker : mAnomalyTrackers) {
tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge);
}
@@ -303,7 +288,6 @@
// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
// the GaugeMetricProducer while holding the lock.
void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
return;
}
@@ -337,7 +321,6 @@
}
size_t GaugeMetricProducer::byteSize() const {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 8df6111..f344303 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -81,8 +81,6 @@
void startNewProtoOutputStream(long long timestamp) override;
private:
- void SerializeBuckets();
-
// The default bucket size for gauge metric is 1 second.
static const uint64_t kDefaultGaugemBucketSizeNs = 1000 * 1000 * 1000;
const GaugeMetric mMetric;
@@ -91,6 +89,8 @@
// tagId for pulled data. -1 if this is not pulled
const int mPullTagId;
+ Mutex mLock;
+
// Save the past buckets and we can clear when the StatsLogReport is dumped.
// TODO: Add a lock to mPastBuckets.
std::unordered_map<HashableDimensionKey, std::vector<GaugeBucket>> mPastBuckets;
@@ -98,7 +98,7 @@
// The current bucket.
std::shared_ptr<DimToValMap> mCurrentSlicedBucket = std::make_shared<DimToValMap>();
- int64_t getGauge(const LogEvent& event) const;
+ int64_t getGauge(const LogEvent& event);
bool hitGuardRail(const HashableDimensionKey& newKey);
diff --git a/cmds/statsd/src/metrics/MetricProducer.cpp b/cmds/statsd/src/metrics/MetricProducer.cpp
index 7542a94..62fb632 100644
--- a/cmds/statsd/src/metrics/MetricProducer.cpp
+++ b/cmds/statsd/src/metrics/MetricProducer.cpp
@@ -23,7 +23,6 @@
void MetricProducer::onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event,
bool scheduledPull) {
- std::unique_lock<std::shared_timed_mutex> writeLock(mRWMutex);
uint64_t eventTimeNs = event.GetTimestampNs();
// this is old event, maybe statsd restarted?
if (eventTimeNs < mStartTimeNs) {
@@ -60,16 +59,12 @@
} else {
condition = mCondition;
}
- // Unlock as onMatchedLogEventInternal is threadsafe.
- writeLock.unlock();
onMatchedLogEventInternal(matcherIndex, eventKey, conditionKeys, condition, event,
scheduledPull);
}
std::unique_ptr<std::vector<uint8_t>> MetricProducer::serializeProto() {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-
size_t bufferSize = mProto->size();
std::unique_ptr<std::vector<uint8_t>> buffer(new std::vector<uint8_t>(bufferSize));
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index 27343ad..b22ff6f 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -17,8 +17,6 @@
#ifndef METRIC_PRODUCER_H
#define METRIC_PRODUCER_H
-#include <shared_mutex>
-
#include "anomaly/AnomalyTracker.h"
#include "condition/ConditionWizard.h"
#include "config/ConfigKey.h"
@@ -139,10 +137,6 @@
long long mProtoToken;
- // Read/Write mutex to make the producer thread-safe.
- // TODO(yanglu): replace with std::shared_mutex when available in libc++.
- mutable std::shared_timed_mutex mRWMutex;
-
virtual void startNewProtoOutputStream(long long timestamp) = 0;
std::unique_ptr<std::vector<uint8_t>> serializeProto();
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index eed7841..66c8419 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -121,7 +121,6 @@
}
void ValueMetricProducer::startNewProtoOutputStream(long long startTime) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -137,8 +136,9 @@
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
-void ValueMetricProducer::SerializeBuckets() {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
+ VLOG("metric %s dump report now...", mMetric.name().c_str());
+
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
VLOG(" dimension key %s", hashableKey.c_str());
@@ -185,63 +185,47 @@
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
- mPastBuckets.clear();
-}
-std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
VLOG("metric %s dump report now...", mMetric.name().c_str());
-
- SerializeBuckets();
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
+ mPastBuckets.clear();
return buffer;
+
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
void ValueMetricProducer::onConditionChanged(const bool condition, const uint64_t eventTime) {
- vector<shared_ptr<LogEvent>> allData;
+ AutoMutex _l(mLock);
+ mCondition = condition;
- // TODO(yanglu): move the following logic to a seperate function to make it lockable.
- {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
- mCondition = condition;
-
- if (mPullTagId == -1) {
- return;
- }
-
+ if (mPullTagId != -1) {
if (mCondition == true) {
mStatsPullerManager->RegisterReceiver(mPullTagId, this,
mMetric.bucket().bucket_size_millis());
} else if (mCondition == false) {
mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
}
- if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
- return;
- }
- }
- if (allData.size() == 0) {
+ vector<shared_ptr<LogEvent>> allData;
+ if (mStatsPullerManager->Pull(mPullTagId, &allData)) {
+ if (allData.size() == 0) {
+ return;
+ }
+ for (const auto& data : allData) {
+ onMatchedLogEvent(0, *data, false);
+ }
+ flushIfNeeded(eventTime);
+ }
return;
}
-
- // onMatchedLogEventInternal holds the write lock and is thread-safe.
- for (const auto& data : allData) {
- onMatchedLogEvent(0, *data, false);
- }
- // flushIfNeeded holds the write lock and is thread-safe.
- flushIfNeeded(eventTime);
-}
-
-bool ValueMetricProducer::IsConditionMet() const {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
- return mCondition == true || !mMetric.has_condition();
}
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
- if (IsConditionMet()) {
+ AutoMutex _l(mLock);
+ if (mCondition == true || !mMetric.has_condition()) {
if (allData.size() == 0) {
return;
}
@@ -258,7 +242,6 @@
}
bool ValueMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
// ===========GuardRail==============
// 1. Report the tuple count if the tuple count > soft limit
if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
@@ -279,75 +262,58 @@
return false;
}
-void ValueMetricProducer::onMatchedLogEventInternal_pull(const uint64_t& eventTimeNs,
- const HashableDimensionKey& eventKey,
- const long& value, bool scheduledPull) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
-
- if (eventTimeNs < mCurrentBucketStartTimeNs) {
- VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
- (long long)mCurrentBucketStartTimeNs);
- return;
- }
- Interval& interval = mCurrentSlicedBucket[eventKey];
- if (scheduledPull) {
- // scheduled pull always sets beginning of current bucket and end
- // of next bucket
- if (interval.raw.size() > 0) {
- interval.raw.back().second = value;
- } else {
- interval.raw.push_back(make_pair(value, value));
- }
- Interval& nextInterval = mNextSlicedBucket[eventKey];
- if (nextInterval.raw.size() == 0) {
- nextInterval.raw.push_back(make_pair(value, 0));
- } else {
- nextInterval.raw.front().first = value;
- }
- } else {
- if (mCondition == true) {
- interval.raw.push_back(make_pair(value, 0));
- } else {
- if (interval.raw.size() != 0) {
- interval.raw.back().second = value;
- } else {
- interval.tainted = true;
- VLOG("Data on condition true missing!");
- }
- }
- }
-}
-
-void ValueMetricProducer::onMatchedLogEventInternal_push(const uint64_t& eventTimeNs,
- const HashableDimensionKey& eventKey,
- const long& value) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
- if (eventTimeNs < mCurrentBucketStartTimeNs) {
- VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
- (long long)mCurrentBucketStartTimeNs);
- return;
- }
- mCurrentSlicedBucket[eventKey].raw.push_back(make_pair(value, 0));
-}
-
void ValueMetricProducer::onMatchedLogEventInternal(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const map<string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) {
uint64_t eventTimeNs = event.GetTimestampNs();
- long value = get_value(event);
+ if (eventTimeNs < mCurrentBucketStartTimeNs) {
+ VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
+ (long long)mCurrentBucketStartTimeNs);
+ return;
+ }
+
if (hitGuardRail(eventKey)) {
return;
}
+ Interval& interval = mCurrentSlicedBucket[eventKey];
+
+ long value = get_value(event);
+
if (mPullTagId != -1) {
- onMatchedLogEventInternal_pull(eventTimeNs, eventKey, value, scheduledPull);
+ if (scheduledPull) {
+ // scheduled pull always sets beginning of current bucket and end
+ // of next bucket
+ if (interval.raw.size() > 0) {
+ interval.raw.back().second = value;
+ } else {
+ interval.raw.push_back(make_pair(value, value));
+ }
+ Interval& nextInterval = mNextSlicedBucket[eventKey];
+ if (nextInterval.raw.size() == 0) {
+ nextInterval.raw.push_back(make_pair(value, 0));
+ } else {
+ nextInterval.raw.front().first = value;
+ }
+ } else {
+ if (mCondition == true) {
+ interval.raw.push_back(make_pair(value, 0));
+ } else {
+ if (interval.raw.size() != 0) {
+ interval.raw.back().second = value;
+ } else {
+ interval.tainted = true;
+ VLOG("Data on condition true missing!");
+ }
+ }
+ }
} else {
flushIfNeeded(eventTimeNs);
- onMatchedLogEventInternal_push(eventTimeNs, eventKey, value);
+ interval.raw.push_back(make_pair(value, 0));
}
}
-long ValueMetricProducer::get_value(const LogEvent& event) const {
+long ValueMetricProducer::get_value(const LogEvent& event) {
status_t err = NO_ERROR;
long val = event.GetLong(mMetric.value_field(), &err);
if (err == NO_ERROR) {
@@ -359,7 +325,6 @@
}
void ValueMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
- std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTimeNs) {
VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
(long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
@@ -408,7 +373,6 @@
}
size_t ValueMetricProducer::byteSize() const {
- std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index e87e9da..a024bd8 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -72,16 +72,6 @@
void startNewProtoOutputStream(long long timestamp) override;
private:
- void onMatchedLogEventInternal_pull(const uint64_t& eventTimeNs,
- const HashableDimensionKey& eventKey, const long& value,
- bool scheduledPull);
- void onMatchedLogEventInternal_push(const uint64_t& eventTimeNs,
- const HashableDimensionKey& eventKey, const long& value);
-
- void SerializeBuckets();
-
- bool IsConditionMet() const;
-
const ValueMetric mMetric;
std::shared_ptr<StatsPullerManager> mStatsPullerManager;
@@ -92,6 +82,8 @@
const int pullTagId, const uint64_t startTimeNs,
std::shared_ptr<StatsPullerManager> statsPullerManager);
+ Mutex mLock;
+
// tagId for pulled data. -1 if this is not pulled
const int mPullTagId;
@@ -110,7 +102,7 @@
// TODO: Add a lock to mPastBuckets.
std::unordered_map<HashableDimensionKey, std::vector<ValueBucket>> mPastBuckets;
- long get_value(const LogEvent& event) const;
+ long get_value(const LogEvent& event);
bool hitGuardRail(const HashableDimensionKey& newKey);