Thread-safe metric producers.
Test: unit test passed
Change-Id: Ie47404e8649b63ee8ac32e40189a47f6cb7a9def
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index ce60eb9..4eb121c 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -96,6 +96,8 @@
}
void CountMetricProducer::startNewProtoOutputStream(long long startTime) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -109,13 +111,8 @@
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
-std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
- long long endTime = time(nullptr) * NS_PER_SEC;
-
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeeded(endTime);
+void CountMetricProducer::serializeBuckets() {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
VLOG("metric %s dump report now...", mMetric.name().c_str());
for (const auto& counter : mPastBuckets) {
@@ -161,28 +158,40 @@
}
mProto->end(wrapperToken);
}
-
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
+ mPastBuckets.clear();
+ // TODO: Clear mDimensionKeyMap once the report is dumped.
+}
+
+std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
+ long long endTime = time(nullptr) * NS_PER_SEC;
VLOG("metric %s dump report now...", mMetric.name().c_str());
+ // Dump current bucket if it's stale.
+ // If current bucket is still on-going, don't force dump current bucket.
+ // In finish(), We can force dump current bucket.
+ flushIfNeeded(endTime);
+
+ // TODO(yanglu): merge these three functions to one to avoid three locks.
+ serializeBuckets();
+
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
startNewProtoOutputStream(endTime);
- mPastBuckets.clear();
return buffer;
-
- // TODO: Clear mDimensionKeyMap once the report is dumped.
}
void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mCondition = conditionMet;
}
bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) {
return false;
}
@@ -210,38 +219,40 @@
flushIfNeeded(eventTimeNs);
- if (condition == false) {
+ // ===========GuardRail==============
+ if (hitGuardRail(eventKey)) {
return;
}
- auto it = mCurrentSlicedCounter->find(eventKey);
-
- if (it == mCurrentSlicedCounter->end()) {
- // ===========GuardRail==============
- if (hitGuardRail(eventKey)) {
+ // TODO(yanglu): move the following logic to a seperate function to make it lockable.
+ {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+ if (condition == false) {
return;
}
- // create a counter for the new key
- (*mCurrentSlicedCounter)[eventKey] = 1;
- } else {
- // increment the existing value
- auto& count = it->second;
- count++;
+ auto it = mCurrentSlicedCounter->find(eventKey);
+ if (it == mCurrentSlicedCounter->end()) {
+ // create a counter for the new key
+ mCurrentSlicedCounter->insert({eventKey, 1});
+ } else {
+ // increment the existing value
+ auto& count = it->second;
+ count++;
+ }
+ const int64_t& count = mCurrentSlicedCounter->find(eventKey)->second;
+ for (auto& tracker : mAnomalyTrackers) {
+ tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, count);
+ }
+ VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(), (long long)(count));
}
-
- for (auto& tracker : mAnomalyTrackers) {
- tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
- mCurrentSlicedCounter->find(eventKey)->second);
- }
-
- VLOG("metric %s %s->%lld", mMetric.name().c_str(), eventKey.c_str(),
- (long long)(*mCurrentSlicedCounter)[eventKey]);
}
// When a new matched event comes in, we check if event falls into the current
// bucket. If not, flush the old counter to past buckets and initialize the new bucket.
void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
return;
}
@@ -275,6 +286,7 @@
// greater than actual data size as it contains each dimension of
// CountMetricData is duplicated.
size_t CountMetricProducer::byteSize() const {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h
index f78a199..164dfb2 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.h
+++ b/cmds/statsd/src/metrics/CountMetricProducer.h
@@ -75,6 +75,8 @@
void startNewProtoOutputStream(long long timestamp) override;
private:
+ void serializeBuckets();
+
const CountMetric mMetric;
// TODO: Add a lock to mPastBuckets.
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index a0374c0..3b49b9a 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -104,6 +104,7 @@
}
void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,7 +112,7 @@
}
unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
- const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) {
+ const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const {
switch (mMetric.aggregation_type()) {
case DurationMetric_AggregationType_SUM:
return make_unique<OringDurationTracker>(
@@ -130,6 +131,7 @@
}
void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
flushIfNeeded(eventTime);
// Now for each of the on-going event, check if the condition has changed for them.
@@ -139,6 +141,7 @@
}
void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
mCondition = conditionMet;
flushIfNeeded(eventTime);
@@ -149,15 +152,8 @@
}
}
-std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
- long long endTime = time(nullptr) * NS_PER_SEC;
-
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeeded(endTime);
- VLOG("metric %s dump report now...", mMetric.name().c_str());
-
+void DurationMetricProducer::SerializeBuckets() {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
VLOG(" dimension key %s", hashableKey.c_str());
@@ -214,13 +210,29 @@
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
+}
+
+std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
+ VLOG("metric %s dump report now...", mMetric.name().c_str());
+
+ long long endTime = time(nullptr) * NS_PER_SEC;
+
+ // Dump current bucket if it's stale.
+ // If current bucket is still on-going, don't force dump current bucket.
+ // In finish(), We can force dump current bucket.
+ flushIfNeeded(endTime);
+
+ SerializeBuckets();
+
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
+
startNewProtoOutputStream(endTime);
// TODO: Properly clear the old buckets.
return buffer;
}
void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
return;
}
@@ -240,6 +252,7 @@
}
bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
// the key is not new, we are good.
if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) {
return false;
@@ -265,32 +278,37 @@
const LogEvent& event, bool scheduledPull) {
flushIfNeeded(event.GetTimestampNs());
- if (matcherIndex == mStopAllIndex) {
- for (auto& pair : mCurrentSlicedDuration) {
- pair.second->noteStopAll(event.GetTimestampNs());
- }
- return;
- }
-
- HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
-
- if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
- if (hitGuardRail(eventKey)) {
+ // TODO(yanglu): move the following logic to a seperate function to make it lockable.
+ {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+ if (matcherIndex == mStopAllIndex) {
+ for (auto& pair : mCurrentSlicedDuration) {
+ pair.second->noteStopAll(event.GetTimestampNs());
+ }
return;
}
- mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]);
- }
- auto it = mCurrentSlicedDuration.find(eventKey);
+ HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
- if (matcherIndex == mStartIndex) {
- it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
- } else if (matcherIndex == mStopIndex) {
- it->second->noteStop(atomKey, event.GetTimestampNs(), false);
+ if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
+ if (hitGuardRail(eventKey)) {
+ return;
+ }
+ mCurrentSlicedDuration[eventKey] =
+ createDurationTracker(eventKey, mPastBuckets[eventKey]);
+ }
+ auto it = mCurrentSlicedDuration.find(eventKey);
+
+ if (matcherIndex == mStartIndex) {
+ it->second->noteStart(atomKey, condition, event.GetTimestampNs(), conditionKeys);
+ } else if (matcherIndex == mStopIndex) {
+ it->second->noteStop(atomKey, event.GetTimestampNs(), false);
+ }
}
}
size_t DurationMetricProducer::byteSize() const {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h
index 5b5373e..68fff48 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.h
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.h
@@ -71,6 +71,8 @@
void startNewProtoOutputStream(long long timestamp) override;
private:
+ void SerializeBuckets();
+
const DurationMetric mMetric;
// Index of the SimpleLogEntryMatcher which defines the start.
@@ -96,8 +98,8 @@
std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>>
mCurrentSlicedDuration;
- std::unique_ptr<DurationTracker> createDurationTracker(const HashableDimensionKey& eventKey,
- std::vector<DurationBucket>& bucket);
+ std::unique_ptr<DurationTracker> createDurationTracker(
+ const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const;
bool hitGuardRail(const HashableDimensionKey& newKey);
static const size_t kBucketSize = sizeof(DurationBucket{});
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 95a18f7..53f112a 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -73,6 +73,7 @@
}
void EventMetricProducer::startNewProtoOutputStream(long long startTime) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mProto = std::make_unique<ProtoOutputStream>();
// TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData,
// and StatsEvent.
@@ -89,11 +90,16 @@
std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() {
long long endTime = time(nullptr) * NS_PER_SEC;
- mProto->end(mProtoToken);
- mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
+ // TODO(yanglu): make this section to an util function.
+ {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+ mProto->end(mProtoToken);
+ mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
- size_t bufferSize = mProto->size();
- VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
+ size_t bufferSize = mProto->size();
+ VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
+ }
+
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
startNewProtoOutputStream(endTime);
@@ -103,6 +109,7 @@
void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mCondition = conditionMet;
}
@@ -110,6 +117,7 @@
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (!condition) {
return;
}
@@ -124,6 +132,7 @@
}
size_t EventMetricProducer::byteSize() const {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
return mProto->bytesWritten();
}
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 1791654..ed4c760 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -102,6 +102,7 @@
}
void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,14 +112,8 @@
void GaugeMetricProducer::finish() {
}
-std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
- VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
-
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeeded(time(nullptr) * NS_PER_SEC);
-
+void GaugeMetricProducer::SerializeBuckets() {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
auto it = mDimensionKeyMap.find(hashableKey);
@@ -166,50 +161,69 @@
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
+ mPastBuckets.clear();
+}
+
+std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
+ VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
+
+ // Dump current bucket if it's stale.
+ // If current bucket is still on-going, don't force dump current bucket.
+ // In finish(), We can force dump current bucket.
+ flushIfNeeded(time(nullptr) * NS_PER_SEC);
+
+ SerializeBuckets();
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
- mPastBuckets.clear();
-
return buffer;
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
- AutoMutex _l(mLock);
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
- flushIfNeeded(eventTime);
- mCondition = conditionMet;
- // Push mode. No need to proactively pull the gauge data.
- if (mPullTagId == -1) {
- return;
- }
- if (!mCondition) {
- return;
- }
- // Already have gauge metric for the current bucket, do not do it again.
- if (mCurrentSlicedBucket->size() > 0) {
- return;
- }
+ // flushIfNeeded holds the write lock and is thread-safe.
+ flushIfNeeded(eventTime);
+
vector<std::shared_ptr<LogEvent>> allData;
- if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
- ALOGE("Stats puller failed for tag: %d", mPullTagId);
- return;
+ // The following section is to update the condition and re-pull the gauge.
+ // TODO(yanglu): make it a seperate lockable function.
+ {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
+ mCondition = conditionMet;
+
+ // Push mode. No need to proactively pull the gauge data.
+ if (mPullTagId == -1) {
+ return;
+ }
+ if (!mCondition) {
+ return;
+ }
+ // Already have gauge metric for the current bucket, do not do it again.
+ if (mCurrentSlicedBucket->size() > 0) {
+ return;
+ }
+ if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
+ ALOGE("Stats puller failed for tag: %d", mPullTagId);
+ return;
+ }
}
+
+ // onMatchedLogEventInternal holds the write lock and is thread-safe.
for (const auto& data : allData) {
onMatchedLogEvent(0, *data, false /*scheduledPull*/);
}
- flushIfNeeded(eventTime);
}
void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
-int64_t GaugeMetricProducer::getGauge(const LogEvent& event) {
+int64_t GaugeMetricProducer::getGauge(const LogEvent& event) const {
status_t err = NO_ERROR;
int64_t val = event.GetLong(mMetric.gauge_field(), &err);
if (err == NO_ERROR) {
@@ -221,13 +235,14 @@
}
void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
- AutoMutex mutex(mLock);
+ // onMatchedLogEventInternal holds the write lock and is thread-safe.
for (const auto& data : allData) {
onMatchedLogEvent(0, *data, true /*scheduledPull*/);
}
}
bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
return false;
}
@@ -251,32 +266,32 @@
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const map<string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) {
+ uint64_t eventTimeNs = event.GetTimestampNs();
+ flushIfNeeded(eventTimeNs);
+
if (condition == false) {
return;
}
- uint64_t eventTimeNs = event.GetTimestampNs();
+ const long gauge = getGauge(event);
+ if (gauge < 0) {
+ return;
+ }
+ if (hitGuardRail(eventKey)) {
+ return;
+ }
+
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
(long long)mCurrentBucketStartTimeNs);
return;
}
- // When the event happens in a new bucket, flush the old buckets.
- if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) {
- flushIfNeeded(eventTimeNs);
- }
-
// For gauge metric, we just simply use the first gauge in the given bucket.
if (!mCurrentSlicedBucket->empty()) {
return;
}
- const long gauge = getGauge(event);
- if (gauge >= 0) {
- if (hitGuardRail(eventKey)) {
- return;
- }
(*mCurrentSlicedBucket)[eventKey] = gauge;
- }
for (auto& tracker : mAnomalyTrackers) {
tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge);
}
@@ -288,6 +303,7 @@
// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
// the GaugeMetricProducer while holding the lock.
void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
return;
}
@@ -321,6 +337,7 @@
}
size_t GaugeMetricProducer::byteSize() const {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index f344303..8df6111 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -81,6 +81,8 @@
void startNewProtoOutputStream(long long timestamp) override;
private:
+ void SerializeBuckets();
+
// The default bucket size for gauge metric is 1 second.
static const uint64_t kDefaultGaugemBucketSizeNs = 1000 * 1000 * 1000;
const GaugeMetric mMetric;
@@ -89,8 +91,6 @@
// tagId for pulled data. -1 if this is not pulled
const int mPullTagId;
- Mutex mLock;
-
// Save the past buckets and we can clear when the StatsLogReport is dumped.
// TODO: Add a lock to mPastBuckets.
std::unordered_map<HashableDimensionKey, std::vector<GaugeBucket>> mPastBuckets;
@@ -98,7 +98,7 @@
// The current bucket.
std::shared_ptr<DimToValMap> mCurrentSlicedBucket = std::make_shared<DimToValMap>();
- int64_t getGauge(const LogEvent& event);
+ int64_t getGauge(const LogEvent& event) const;
bool hitGuardRail(const HashableDimensionKey& newKey);
diff --git a/cmds/statsd/src/metrics/MetricProducer.cpp b/cmds/statsd/src/metrics/MetricProducer.cpp
index 62fb632..7542a94 100644
--- a/cmds/statsd/src/metrics/MetricProducer.cpp
+++ b/cmds/statsd/src/metrics/MetricProducer.cpp
@@ -23,6 +23,7 @@
void MetricProducer::onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event,
bool scheduledPull) {
+ std::unique_lock<std::shared_timed_mutex> writeLock(mRWMutex);
uint64_t eventTimeNs = event.GetTimestampNs();
// this is old event, maybe statsd restarted?
if (eventTimeNs < mStartTimeNs) {
@@ -59,12 +60,16 @@
} else {
condition = mCondition;
}
+ // Unlock as onMatchedLogEventInternal is threadsafe.
+ writeLock.unlock();
onMatchedLogEventInternal(matcherIndex, eventKey, conditionKeys, condition, event,
scheduledPull);
}
std::unique_ptr<std::vector<uint8_t>> MetricProducer::serializeProto() {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
size_t bufferSize = mProto->size();
std::unique_ptr<std::vector<uint8_t>> buffer(new std::vector<uint8_t>(bufferSize));
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index b22ff6f..27343ad 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -17,6 +17,8 @@
#ifndef METRIC_PRODUCER_H
#define METRIC_PRODUCER_H
+#include <shared_mutex>
+
#include "anomaly/AnomalyTracker.h"
#include "condition/ConditionWizard.h"
#include "config/ConfigKey.h"
@@ -137,6 +139,10 @@
long long mProtoToken;
+ // Read/Write mutex to make the producer thread-safe.
+ // TODO(yanglu): replace with std::shared_mutex when available in libc++.
+ mutable std::shared_timed_mutex mRWMutex;
+
virtual void startNewProtoOutputStream(long long timestamp) = 0;
std::unique_ptr<std::vector<uint8_t>> serializeProto();
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 66c8419..eed7841 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -121,6 +121,7 @@
}
void ValueMetricProducer::startNewProtoOutputStream(long long startTime) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -136,9 +137,8 @@
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
-std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
- VLOG("metric %s dump report now...", mMetric.name().c_str());
-
+void ValueMetricProducer::SerializeBuckets() {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
VLOG(" dimension key %s", hashableKey.c_str());
@@ -185,47 +185,63 @@
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
+ mPastBuckets.clear();
+}
+std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
VLOG("metric %s dump report now...", mMetric.name().c_str());
+
+ SerializeBuckets();
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
- mPastBuckets.clear();
return buffer;
-
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
void ValueMetricProducer::onConditionChanged(const bool condition, const uint64_t eventTime) {
- AutoMutex _l(mLock);
- mCondition = condition;
+ vector<shared_ptr<LogEvent>> allData;
- if (mPullTagId != -1) {
+ // TODO(yanglu): move the following logic to a seperate function to make it lockable.
+ {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+ mCondition = condition;
+
+ if (mPullTagId == -1) {
+ return;
+ }
+
if (mCondition == true) {
mStatsPullerManager->RegisterReceiver(mPullTagId, this,
mMetric.bucket().bucket_size_millis());
} else if (mCondition == false) {
mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
}
-
- vector<shared_ptr<LogEvent>> allData;
- if (mStatsPullerManager->Pull(mPullTagId, &allData)) {
- if (allData.size() == 0) {
- return;
- }
- for (const auto& data : allData) {
- onMatchedLogEvent(0, *data, false);
- }
- flushIfNeeded(eventTime);
+ if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
+ return;
}
+ }
+
+ if (allData.size() == 0) {
return;
}
+
+ // onMatchedLogEventInternal holds the write lock and is thread-safe.
+ for (const auto& data : allData) {
+ onMatchedLogEvent(0, *data, false);
+ }
+ // flushIfNeeded holds the write lock and is thread-safe.
+ flushIfNeeded(eventTime);
+}
+
+bool ValueMetricProducer::IsConditionMet() const {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
+ return mCondition == true || !mMetric.has_condition();
}
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
- AutoMutex _l(mLock);
- if (mCondition == true || !mMetric.has_condition()) {
+ if (IsConditionMet()) {
if (allData.size() == 0) {
return;
}
@@ -242,6 +258,7 @@
}
bool ValueMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
// ===========GuardRail==============
// 1. Report the tuple count if the tuple count > soft limit
if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
@@ -262,58 +279,75 @@
return false;
}
-void ValueMetricProducer::onMatchedLogEventInternal(
- const size_t matcherIndex, const HashableDimensionKey& eventKey,
- const map<string, HashableDimensionKey>& conditionKey, bool condition,
- const LogEvent& event, bool scheduledPull) {
- uint64_t eventTimeNs = event.GetTimestampNs();
+void ValueMetricProducer::onMatchedLogEventInternal_pull(const uint64_t& eventTimeNs,
+ const HashableDimensionKey& eventKey,
+ const long& value, bool scheduledPull) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
(long long)mCurrentBucketStartTimeNs);
return;
}
-
- if (hitGuardRail(eventKey)) {
- return;
- }
Interval& interval = mCurrentSlicedBucket[eventKey];
-
- long value = get_value(event);
-
- if (mPullTagId != -1) {
- if (scheduledPull) {
- // scheduled pull always sets beginning of current bucket and end
- // of next bucket
- if (interval.raw.size() > 0) {
- interval.raw.back().second = value;
- } else {
- interval.raw.push_back(make_pair(value, value));
- }
- Interval& nextInterval = mNextSlicedBucket[eventKey];
- if (nextInterval.raw.size() == 0) {
- nextInterval.raw.push_back(make_pair(value, 0));
- } else {
- nextInterval.raw.front().first = value;
- }
+ if (scheduledPull) {
+ // scheduled pull always sets beginning of current bucket and end
+ // of next bucket
+ if (interval.raw.size() > 0) {
+ interval.raw.back().second = value;
} else {
- if (mCondition == true) {
- interval.raw.push_back(make_pair(value, 0));
- } else {
- if (interval.raw.size() != 0) {
- interval.raw.back().second = value;
- } else {
- interval.tainted = true;
- VLOG("Data on condition true missing!");
- }
- }
+ interval.raw.push_back(make_pair(value, value));
+ }
+ Interval& nextInterval = mNextSlicedBucket[eventKey];
+ if (nextInterval.raw.size() == 0) {
+ nextInterval.raw.push_back(make_pair(value, 0));
+ } else {
+ nextInterval.raw.front().first = value;
}
} else {
- flushIfNeeded(eventTimeNs);
- interval.raw.push_back(make_pair(value, 0));
+ if (mCondition == true) {
+ interval.raw.push_back(make_pair(value, 0));
+ } else {
+ if (interval.raw.size() != 0) {
+ interval.raw.back().second = value;
+ } else {
+ interval.tainted = true;
+ VLOG("Data on condition true missing!");
+ }
+ }
}
}
-long ValueMetricProducer::get_value(const LogEvent& event) {
+void ValueMetricProducer::onMatchedLogEventInternal_push(const uint64_t& eventTimeNs,
+ const HashableDimensionKey& eventKey,
+ const long& value) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+ if (eventTimeNs < mCurrentBucketStartTimeNs) {
+ VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
+ (long long)mCurrentBucketStartTimeNs);
+ return;
+ }
+ mCurrentSlicedBucket[eventKey].raw.push_back(make_pair(value, 0));
+}
+
+void ValueMetricProducer::onMatchedLogEventInternal(
+ const size_t matcherIndex, const HashableDimensionKey& eventKey,
+ const map<string, HashableDimensionKey>& conditionKey, bool condition,
+ const LogEvent& event, bool scheduledPull) {
+ uint64_t eventTimeNs = event.GetTimestampNs();
+ long value = get_value(event);
+ if (hitGuardRail(eventKey)) {
+ return;
+ }
+ if (mPullTagId != -1) {
+ onMatchedLogEventInternal_pull(eventTimeNs, eventKey, value, scheduledPull);
+ } else {
+ flushIfNeeded(eventTimeNs);
+ onMatchedLogEventInternal_push(eventTimeNs, eventKey, value);
+ }
+}
+
+long ValueMetricProducer::get_value(const LogEvent& event) const {
status_t err = NO_ERROR;
long val = event.GetLong(mMetric.value_field(), &err);
if (err == NO_ERROR) {
@@ -325,6 +359,7 @@
}
void ValueMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTimeNs) {
VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
(long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
@@ -373,6 +408,7 @@
}
size_t ValueMetricProducer::byteSize() const {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index a024bd8..e87e9da 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -72,6 +72,16 @@
void startNewProtoOutputStream(long long timestamp) override;
private:
+ void onMatchedLogEventInternal_pull(const uint64_t& eventTimeNs,
+ const HashableDimensionKey& eventKey, const long& value,
+ bool scheduledPull);
+ void onMatchedLogEventInternal_push(const uint64_t& eventTimeNs,
+ const HashableDimensionKey& eventKey, const long& value);
+
+ void SerializeBuckets();
+
+ bool IsConditionMet() const;
+
const ValueMetric mMetric;
std::shared_ptr<StatsPullerManager> mStatsPullerManager;
@@ -82,8 +92,6 @@
const int pullTagId, const uint64_t startTimeNs,
std::shared_ptr<StatsPullerManager> statsPullerManager);
- Mutex mLock;
-
// tagId for pulled data. -1 if this is not pulled
const int mPullTagId;
@@ -102,7 +110,7 @@
// TODO: Add a lock to mPastBuckets.
std::unordered_map<HashableDimensionKey, std::vector<ValueBucket>> mPastBuckets;
- long get_value(const LogEvent& event);
+ long get_value(const LogEvent& event) const;
bool hitGuardRail(const HashableDimensionKey& newKey);