Thread-safety at log processor level.
Test: statsd unit test passed.
Change-Id: Ibe8c8d3cc8297875b16ee385c077b71c87353147
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index 991badc..a9e0f23 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -82,9 +82,7 @@
void StatsLogProcessor::onAnomalyAlarmFired(
const uint64_t timestampNs,
unordered_set<sp<const AnomalyAlarm>, SpHash<AnomalyAlarm>> anomalySet) {
- // TODO: This is a thread-safety issue. mMetricsManagers could change under our feet.
- // TODO: Solution? Lock everything! :(
- // TODO: Question: Can we replace the other lock (broadcast), or do we need to supplement it?
+ std::lock_guard<std::mutex> lock(mMetricsMutex);
for (const auto& itr : mMetricsManagers) {
itr.second->onAnomalyAlarmFired(timestampNs, anomalySet);
}
@@ -92,11 +90,13 @@
// TODO: what if statsd service restarts? How do we know what logs are already processed before?
void StatsLogProcessor::OnLogEvent(const LogEvent& msg) {
+ std::lock_guard<std::mutex> lock(mMetricsMutex);
+
StatsdStats::getInstance().noteAtomLogged(msg.GetTagId(), msg.GetTimestampNs() / NS_PER_SEC);
// pass the event to metrics managers.
for (auto& pair : mMetricsManagers) {
pair.second->onLogEvent(msg);
- flushIfNecessary(msg.GetTimestampNs(), pair.first, *(pair.second));
+ flushIfNecessaryLocked(msg.GetTimestampNs(), pair.first, *(pair.second));
}
// Hard-coded logic to update the isolated uid's in the uid-map.
// The field numbers need to be currently updated by hand with atoms.proto
@@ -116,6 +116,7 @@
}
void StatsLogProcessor::OnConfigUpdated(const ConfigKey& key, const StatsdConfig& config) {
+ std::lock_guard<std::mutex> lock(mMetricsMutex);
ALOGD("Updated configuration for key %s", key.ToString().c_str());
sp<MetricsManager> newMetricsManager = new MetricsManager(key, config, mTimeBaseSec, mUidMap);
auto it = mMetricsManagers.find(key);
@@ -142,6 +143,7 @@
}
size_t StatsLogProcessor::GetMetricsSize(const ConfigKey& key) const {
+ std::lock_guard<std::mutex> lock(mMetricsMutex);
auto it = mMetricsManagers.find(key);
if (it == mMetricsManagers.end()) {
ALOGW("Config source %s does not exist", key.ToString().c_str());
@@ -152,6 +154,7 @@
void StatsLogProcessor::onDumpReport(const ConfigKey& key, const uint64_t& dumpTimeStampNs,
ConfigMetricsReportList* report) {
+ std::lock_guard<std::mutex> lock(mMetricsMutex);
auto it = mMetricsManagers.find(key);
if (it == mMetricsManagers.end()) {
ALOGW("Config source %s does not exist", key.ToString().c_str());
@@ -165,6 +168,7 @@
}
void StatsLogProcessor::onDumpReport(const ConfigKey& key, vector<uint8_t>* outData) {
+ std::lock_guard<std::mutex> lock(mMetricsMutex);
auto it = mMetricsManagers.find(key);
if (it == mMetricsManagers.end()) {
ALOGW("Config source %s does not exist", key.ToString().c_str());
@@ -173,9 +177,7 @@
// This allows another broadcast to be sent within the rate-limit period if we get close to
// filling the buffer again soon.
- mBroadcastTimesMutex.lock();
mLastBroadcastTimes.erase(key);
- mBroadcastTimesMutex.unlock();
ProtoOutputStream proto;
@@ -224,6 +226,7 @@
}
void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
+ std::lock_guard<std::mutex> lock(mMetricsMutex);
auto it = mMetricsManagers.find(key);
if (it != mMetricsManagers.end()) {
mMetricsManagers.erase(it);
@@ -231,14 +234,11 @@
}
StatsdStats::getInstance().noteConfigRemoved(key);
- std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);
mLastBroadcastTimes.erase(key);
}
-void StatsLogProcessor::flushIfNecessary(uint64_t timestampNs, const ConfigKey& key,
- MetricsManager& metricsManager) {
- std::lock_guard<std::mutex> lock(mBroadcastTimesMutex);
-
+void StatsLogProcessor::flushIfNecessaryLocked(
+ uint64_t timestampNs, const ConfigKey& key, MetricsManager& metricsManager) {
auto lastCheckTime = mLastByteSizeTimes.find(key);
if (lastCheckTime != mLastByteSizeTimes.end()) {
if (timestampNs - lastCheckTime->second < StatsdStats::kMinByteSizeCheckPeriodNs) {
@@ -274,6 +274,7 @@
void StatsLogProcessor::WriteDataToDisk() {
mkdir(STATS_DATA_DIR, S_IRWXU);
+ std::lock_guard<std::mutex> lock(mMetricsMutex);
for (auto& pair : mMetricsManagers) {
const ConfigKey& key = pair.first;
vector<uint8_t> data;