Thread-safe metric producers.
Test: unit test passed
Change-Id: Ie47404e8649b63ee8ac32e40189a47f6cb7a9def
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 1791654..ed4c760 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -102,6 +102,7 @@
}
void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,14 +112,8 @@
void GaugeMetricProducer::finish() {
}
-std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
- VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
-
- // Dump current bucket if it's stale.
- // If current bucket is still on-going, don't force dump current bucket.
- // In finish(), We can force dump current bucket.
- flushIfNeeded(time(nullptr) * NS_PER_SEC);
-
+void GaugeMetricProducer::SerializeBuckets() {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
auto it = mDimensionKeyMap.find(hashableKey);
@@ -166,50 +161,69 @@
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
+ mPastBuckets.clear();
+}
+
+std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
+ VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
+
+ // Dump current bucket if it's stale.
+ // If current bucket is still on-going, don't force dump current bucket.
+ // In finish(), We can force dump current bucket.
+ flushIfNeeded(time(nullptr) * NS_PER_SEC);
+
+ SerializeBuckets();
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
- mPastBuckets.clear();
-
return buffer;
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
- AutoMutex _l(mLock);
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
- flushIfNeeded(eventTime);
- mCondition = conditionMet;
- // Push mode. No need to proactively pull the gauge data.
- if (mPullTagId == -1) {
- return;
- }
- if (!mCondition) {
- return;
- }
- // Already have gauge metric for the current bucket, do not do it again.
- if (mCurrentSlicedBucket->size() > 0) {
- return;
- }
+ // flushIfNeeded holds the write lock and is thread-safe.
+ flushIfNeeded(eventTime);
+
vector<std::shared_ptr<LogEvent>> allData;
- if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
- ALOGE("Stats puller failed for tag: %d", mPullTagId);
- return;
+ // The following section is to update the condition and re-pull the gauge.
+ // TODO(yanglu): make it a seperate lockable function.
+ {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
+
+ mCondition = conditionMet;
+
+ // Push mode. No need to proactively pull the gauge data.
+ if (mPullTagId == -1) {
+ return;
+ }
+ if (!mCondition) {
+ return;
+ }
+ // Already have gauge metric for the current bucket, do not do it again.
+ if (mCurrentSlicedBucket->size() > 0) {
+ return;
+ }
+ if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
+ ALOGE("Stats puller failed for tag: %d", mPullTagId);
+ return;
+ }
}
+
+ // onMatchedLogEventInternal holds the write lock and is thread-safe.
for (const auto& data : allData) {
onMatchedLogEvent(0, *data, false /*scheduledPull*/);
}
- flushIfNeeded(eventTime);
}
void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
-int64_t GaugeMetricProducer::getGauge(const LogEvent& event) {
+int64_t GaugeMetricProducer::getGauge(const LogEvent& event) const {
status_t err = NO_ERROR;
int64_t val = event.GetLong(mMetric.gauge_field(), &err);
if (err == NO_ERROR) {
@@ -221,13 +235,14 @@
}
void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
- AutoMutex mutex(mLock);
+ // onMatchedLogEventInternal holds the write lock and is thread-safe.
for (const auto& data : allData) {
onMatchedLogEvent(0, *data, true /*scheduledPull*/);
}
}
bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
return false;
}
@@ -251,32 +266,32 @@
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const map<string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) {
+ uint64_t eventTimeNs = event.GetTimestampNs();
+ flushIfNeeded(eventTimeNs);
+
if (condition == false) {
return;
}
- uint64_t eventTimeNs = event.GetTimestampNs();
+ const long gauge = getGauge(event);
+ if (gauge < 0) {
+ return;
+ }
+ if (hitGuardRail(eventKey)) {
+ return;
+ }
+
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
(long long)mCurrentBucketStartTimeNs);
return;
}
- // When the event happens in a new bucket, flush the old buckets.
- if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) {
- flushIfNeeded(eventTimeNs);
- }
-
// For gauge metric, we just simply use the first gauge in the given bucket.
if (!mCurrentSlicedBucket->empty()) {
return;
}
- const long gauge = getGauge(event);
- if (gauge >= 0) {
- if (hitGuardRail(eventKey)) {
- return;
- }
(*mCurrentSlicedBucket)[eventKey] = gauge;
- }
for (auto& tracker : mAnomalyTrackers) {
tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge);
}
@@ -288,6 +303,7 @@
// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
// the GaugeMetricProducer while holding the lock.
void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
+ std::lock_guard<std::shared_timed_mutex> writeLock(mRWMutex);
if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
return;
}
@@ -321,6 +337,7 @@
}
size_t GaugeMetricProducer::byteSize() const {
+ std::shared_lock<std::shared_timed_mutex> readLock(mRWMutex);
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;