Unify the way we process events on condition change and bucket
boundaries.
- We now always process empty data and events that exceed the max delay
in a consistent way.
- Fix a bug which caused base data not to be reset. For example:
bucket h+1 on condition change to true, we pull the following data (key A / value 1, key B / value 2)
bucket h+2 on bucket boundary, we pull the following data (key A / value 2)
In this case, the previous code did not reset the base for key B. It should be reset
since key B is not present in the most recent data.
Test: atest statsd_test
Bug: 124046337
Change-Id: I19456bbe1529e72befbb621be185ce24bd65077a
diff --git a/cmds/statsd/src/external/PullDataReceiver.h b/cmds/statsd/src/external/PullDataReceiver.h
index b071682..d2193f4 100644
--- a/cmds/statsd/src/external/PullDataReceiver.h
+++ b/cmds/statsd/src/external/PullDataReceiver.h
@@ -32,9 +32,10 @@
* @param data The pulled data.
* @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the
* bucket should be invalidated.
+ * @param originalPullTimeNs This is when all the pulls have been initiated (elapsed time).
*/
virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
- bool pullSuccess) = 0;
+ bool pullSuccess, int64_t originalPullTimeNs) = 0;
};
} // namespace statsd
diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp
index 9b603d6..f055939 100644
--- a/cmds/statsd/src/external/StatsPullerManager.cpp
+++ b/cmds/statsd/src/external/StatsPullerManager.cpp
@@ -381,7 +381,7 @@
for (const auto& receiverInfo : pullInfo.second) {
sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
if (receiverPtr != nullptr) {
- receiverPtr->onDataPulled(data, pullSuccess);
+ receiverPtr->onDataPulled(data, pullSuccess, elapsedTimeNs);
// We may have just come out of a coma, compute next pull time.
int numBucketsAhead =
(elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 2609937..d56a355 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -407,7 +407,7 @@
}
void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
- bool pullSuccess) {
+ bool pullSuccess, int64_t originalPullTimeNs) {
std::lock_guard<std::mutex> lock(mMutex);
if (!pullSuccess || allData.size() == 0) {
return;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index d480941..64a1833 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -68,7 +68,7 @@
// Handles when the pulled data arrives.
void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
- bool pullSuccess) override;
+ bool pullSuccess, int64_t originalPullTimeNs) override;
// GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 1bd3ef2..4fc9c37 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -361,7 +361,55 @@
invalidateCurrentBucket();
return;
}
- const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+
+ accumulateEvents(allData, timestampNs, timestampNs);
+}
+
+int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
+ return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
+}
+
+void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+ bool pullSuccess, int64_t originalPullTimeNs) {
+ std::lock_guard<std::mutex> lock(mMutex);
+ if (mCondition) {
+ if (!pullSuccess) {
+ // If the pull failed, we won't be able to compute a diff.
+ invalidateCurrentBucket();
+ return;
+ }
+
+ // For scheduled pulled data, the effective event time is snapped to the nearest
+ // bucket end. In the case of waking up from a deep sleep state, we will
+ // attribute to the previous bucket end. If the sleep was long but not very long, we
+ // will be in the immediate next bucket. Previous bucket may get a larger number as
+ // we pull at a later time than real bucket end.
+ // If the sleep was very long, we skip more than one bucket before sleep. In this case,
+ // the diff base will be cleared and this new data will serve as the new diff base.
+ int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
+ accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
+
+ // We can probably flush the bucket. Since we used bucketEndTime when calling
+ // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
+ flushIfNeededLocked(originalPullTimeNs);
+
+ } else {
+ VLOG("No need to commit data on condition false.");
+ }
+}
+
+void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
+ int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) {
+ bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
+ if (isEventLate) {
+ VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
+ (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
+ StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
+ invalidateCurrentBucket();
+ return;
+ }
+
+ const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
if (pullDelayNs > mMaxPullDelayNs) {
ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
@@ -373,75 +421,33 @@
return;
}
- if (timestampNs < mCurrentBucketStartTimeNs) {
- // The data will be skipped in onMatchedLogEventInternalLocked, but we don't want to report
- // for every event, just the pull
- StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
+ if (allData.size() == 0) {
+ VLOG("Data pulled is empty");
+ StatsdStats::getInstance().noteEmptyData(mPullTagId);
}
+ mMatchedMetricDimensionKeys.clear();
for (const auto& data : allData) {
- // make a copy before doing and changes
LogEvent localCopy = data->makeCopy();
- localCopy.setElapsedTimestampNs(timestampNs);
if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
MatchingState::kMatched) {
+ localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
}
}
- mHasGlobalBase = true;
-}
-
-int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
- return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
-}
-
-void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
- bool pullSuccess) {
- std::lock_guard<std::mutex> lock(mMutex);
- if (mCondition) {
- if (!pullSuccess) {
- // If the pull failed, we won't be able to compute a diff.
- invalidateCurrentBucket();
- return;
- }
-
- if (allData.size() == 0) {
- VLOG("Data pulled is empty");
- StatsdStats::getInstance().noteEmptyData(mPullTagId);
- return;
- }
- // For scheduled pulled data, the effective event time is snap to the nearest
- // bucket end. In the case of waking up from a deep sleep state, we will
- // attribute to the previous bucket end. If the sleep was long but not very long, we
- // will be in the immediate next bucket. Previous bucket may get a larger number as
- // we pull at a later time than real bucket end.
- // If the sleep was very long, we skip more than one bucket before sleep. In this case,
- // if the diff base will be cleared and this new data will serve as new diff base.
- int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
- int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
- bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs;
- if (isEventLate) {
- VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
- (long long)mCurrentBucketStartTimeNs);
- StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
- }
-
- for (const auto& data : allData) {
- LogEvent localCopy = data->makeCopy();
- if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
- MatchingState::kMatched) {
- localCopy.setElapsedTimestampNs(bucketEndTime);
- onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
+ // If the new pulled data does not contain some keys we track in our intervals, we need to
+ // reset the base for those keys.
+ for (auto& slice : mCurrentSlicedBucket) {
+ bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first)
+ != mMatchedMetricDimensionKeys.end();
+ if (!presentInPulledData) {
+ for (auto& interval : slice.second) {
+ interval.hasBase = false;
}
}
- mHasGlobalBase = true;
-
- // We can probably flush the bucket. Since we used bucketEndTime when calling
- // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
- flushIfNeededLocked(realEventTime);
- } else {
- VLOG("No need to commit data on condition false.");
}
+ mMatchedMetricDimensionKeys.clear();
+ mHasGlobalBase = true;
}
void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
@@ -539,6 +545,7 @@
(long long)mCurrentBucketStartTimeNs);
return;
}
+ mMatchedMetricDimensionKeys.insert(eventKey);
flushIfNeededLocked(eventTimeNs);
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 0cfefa9..d1c2315 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -52,7 +52,7 @@
// Process data pulled on bucket boundary.
void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
- bool pullSuccess) override;
+ bool pullSuccess, int64_t originalPullTimeNs) override;
// ValueMetric needs special logic if it's a pulled atom.
void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
@@ -116,6 +116,9 @@
// Value fields for matching.
std::vector<Matcher> mFieldMatchers;
+ // Dimension keys matched in the latest pull; slices missing from it get their base reset.
+ std::set<MetricDimensionKey> mMatchedMetricDimensionKeys;
+
// tagId for pulled data. -1 if this is not pulled
const int mPullTagId;
@@ -160,6 +163,9 @@
void pullAndMatchEventsLocked(const int64_t timestampNs);
+ void accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData,
+ int64_t originalPullTimeNs, int64_t eventElapsedTimeNs);
+
ValueBucket buildPartialBucket(int64_t bucketEndTime,
const std::vector<Interval>& intervals);
void initCurrentSlicedBucket();
@@ -242,6 +248,10 @@
FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed);
FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullDelayExceeded);
FRIEND_TEST(ValueMetricProducerTest, TestBaseSetOnConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled);
+ FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onConditionChanged);
+ FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary);
+ FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries);
};
} // namespace statsd