Refactor ValueMetricProducer bucket flush
Refactor to simplify the condition nesting in flushCurrentBucketLocked().
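
In outline, the reworked flow computes the bucket end time once, up front,
and branches a single time on whether buckets were skipped. A condensed
sketch of the resulting flow (simplified from the diff below, not the
verbatim implementation):

    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
    int64_t bucketEndTime = fullBucketEndTimeNs;
    int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
    if (numBucketsForward > 1) {
        // Multiple buckets were skipped: invalidate the current bucket and
        // make it cover the whole skipped interval.
        invalidateCurrentBucketWithoutResetBase(
                eventTimeNs, BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
        bucketEndTime = nextBucketStartTimeNs;
    } else if (eventTimeNs < fullBucketEndTimeNs) {
        // Partial bucket: end it at the event time.
        bucketEndTime = eventTimeNs;
    }
    // ... close the bucket, record skipped/gap buckets using bucketEndTime ...
    appendToFullBucket(eventTimeNs > fullBucketEndTimeNs);
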
Bug: 157155330
Test: atest statsd_test
Change-Id: Ieff0e8d8d65e1a4c9fc576e68fd7fa912849f806
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index dbec24b..2af9f6c 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -948,6 +948,11 @@
StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
}
+ VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
+ (int)mCurrentSlicedBucket.size());
+
+ int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
+ int64_t bucketEndTime = fullBucketEndTimeNs;
int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
if (numBucketsForward > 1) {
VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
@@ -956,20 +961,20 @@
// to mark the current bucket as invalid. The last pull might have been successful though.
invalidateCurrentBucketWithoutResetBase(eventTimeNs,
BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
+ // End the bucket at the next bucket start time so the entire interval is skipped.
+ bucketEndTime = nextBucketStartTimeNs;
+ } else if (eventTimeNs < fullBucketEndTimeNs) {
+ bucketEndTime = eventTimeNs;
}
- VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
- (int)mCurrentSlicedBucket.size());
- int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
- int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
// Close the current bucket.
int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
if (!isBucketLargeEnough) {
skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
}
- bool bucketHasData = false;
if (!mCurrentBucketIsSkipped) {
+ bool bucketHasData = false;
// The current bucket is large enough to keep.
for (const auto& slice : mCurrentSlicedBucket) {
ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
@@ -981,22 +986,22 @@
bucketHasData = true;
}
}
- }
-
- if (!bucketHasData && !mCurrentBucketIsSkipped) {
- skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
+ if (!bucketHasData) {
+ skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
+ }
}
if (mCurrentBucketIsSkipped) {
mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
- // Fill in the gap if we skipped multiple buckets.
- mCurrentSkippedBucket.bucketEndTimeNs =
- numBucketsForward > 1 ? nextBucketStartTimeNs : bucketEndTime;
+ mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
}
// This means that the current bucket was not flushed before a forced bucket split.
- if (bucketEndTime < nextBucketStartTimeNs && numBucketsForward <= 1) {
+ // This can happen if an app update or a dump report with include_current_partial_bucket is
+ // requested before new data, either from the statsd socket or the StatsPullerManager, arrives
+ // and gives us a chance to flush the bucket.
+ if (bucketEndTime < nextBucketStartTimeNs) {
SkippedBucket bucketInGap;
bucketInGap.bucketStartTimeNs = bucketEndTime;
bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
@@ -1005,7 +1010,7 @@
mSkippedBuckets.emplace_back(bucketInGap);
}
- appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
+ appendToFullBucket(eventTimeNs > fullBucketEndTimeNs);
initCurrentSlicedBucket(nextBucketStartTimeNs);
// Update the condition timer again, in case we skipped buckets.
mConditionTimer.newBucketStart(nextBucketStartTimeNs);
@@ -1071,8 +1076,7 @@
(long long)mCurrentBucketStartTimeNs);
}
-void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
- bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs;
+void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
if (mCurrentBucketIsSkipped) {
if (isFullBucketReached) {
// If the bucket is invalid, we ignore the full bucket since it contains invalid data.