Refactor ValueMetricProducer bucket flush

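Refactor to simplify the condition nesting in flushCurrentBucketLocked.
appendToFullBucket now takes a precomputed isFullBucketReached flag
instead of the event time and full bucket end time.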

Bug: 157155330
Test: atest statsd_test
Change-Id: Ieff0e8d8d65e1a4c9fc576e68fd7fa912849f806
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index dbec24b..2af9f6c 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -948,6 +948,11 @@
         StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
     }
 
+    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
+         (int)mCurrentSlicedBucket.size());
+
+    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
+    int64_t bucketEndTime = fullBucketEndTimeNs;
     int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
     if (numBucketsForward > 1) {
         VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
@@ -956,20 +961,20 @@
         // to mark the current bucket as invalid. The last pull might have been successful through.
         invalidateCurrentBucketWithoutResetBase(eventTimeNs,
                                                 BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
+        // End the bucket at the next bucket start time so the entire interval is skipped.
+        bucketEndTime = nextBucketStartTimeNs;
+    } else if (eventTimeNs < fullBucketEndTimeNs) {
+        bucketEndTime = eventTimeNs;
     }
 
-    VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
-         (int)mCurrentSlicedBucket.size());
-    int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
-    int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
     // Close the current bucket.
     int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
     bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
     if (!isBucketLargeEnough) {
         skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
     }
-    bool bucketHasData = false;
     if (!mCurrentBucketIsSkipped) {
+        bool bucketHasData = false;
         // The current bucket is large enough to keep.
         for (const auto& slice : mCurrentSlicedBucket) {
             ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
@@ -981,22 +986,22 @@
                 bucketHasData = true;
             }
         }
-    }
-
-    if (!bucketHasData && !mCurrentBucketIsSkipped) {
-        skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
+        if (!bucketHasData) {
+            skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
+        }
     }
 
     if (mCurrentBucketIsSkipped) {
         mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
-        // Fill in the gap if we skipped multiple buckets.
-        mCurrentSkippedBucket.bucketEndTimeNs =
-                numBucketsForward > 1 ? nextBucketStartTimeNs : bucketEndTime;
+        mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
         mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
     }
 
     // This means that the current bucket was not flushed before a forced bucket split.
-    if (bucketEndTime < nextBucketStartTimeNs && numBucketsForward <= 1) {
+    // This can happen if an app update or a dump report with include_current_partial_bucket is
+    // requested before we get a chance to flush the bucket due to receiving new data, either from
+    // the statsd socket or the StatsPullerManager.
+    if (bucketEndTime < nextBucketStartTimeNs) {
         SkippedBucket bucketInGap;
         bucketInGap.bucketStartTimeNs = bucketEndTime;
         bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
@@ -1005,7 +1010,7 @@
         mSkippedBuckets.emplace_back(bucketInGap);
     }
 
-    appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
+    appendToFullBucket(eventTimeNs > fullBucketEndTimeNs);
     initCurrentSlicedBucket(nextBucketStartTimeNs);
     // Update the condition timer again, in case we skipped buckets.
     mConditionTimer.newBucketStart(nextBucketStartTimeNs);
@@ -1071,8 +1076,7 @@
          (long long)mCurrentBucketStartTimeNs);
 }
 
-void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
-    bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs;
+void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
     if (mCurrentBucketIsSkipped) {
         if (isFullBucketReached) {
             // If the bucket is invalid, we ignore the full bucket since it contains invalid data.
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index bb4a661..aaf7df5 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -142,8 +142,10 @@
 
     // Mark the data as invalid.
     void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);
+
     void invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,
                                                  const BucketDropReason reason);
+
     // Skips the current bucket without notifying StatsdStats of the skipped bucket.
     // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that
     // causes the bucket to be invalidated will not notify StatsdStats.
@@ -209,6 +211,7 @@
 
     // Util function to check whether the specified dimension hits the guardrail.
     bool hitGuardRailLocked(const MetricDimensionKey& newKey);
+
     bool hasReachedGuardRailLimit() const;
 
     bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey);
@@ -220,8 +223,10 @@
 
     ValueBucket buildPartialBucket(int64_t bucketEndTime,
                                    const std::vector<Interval>& intervals);
+
     void initCurrentSlicedBucket(int64_t nextBucketStartTimeNs);
-    void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs);
+
+    void appendToFullBucket(const bool isFullBucketReached);
 
     // Reset diff base and mHasGlobalBase
     void resetBase();