Unify the way we process events on condition change and bucket
boundaries.

- We now always process empty data and events that exceeds the max delay
in a consistent way

- Fix one bug which caused base data not to be reset, e.g.
bucket h+1 on condition change to true, we pull the following data (key A / value 1, key B / value 2)
bucket h+2 on bucket boundary, we pull the following data (key A / value 2)
In this case the previous code, did not reset Key B. It should be reset
since it is not present in the most recent data.

Test: atest statsd_test
Bug: 124046337
Change-Id: I19456bbe1529e72befbb621be185ce24bd65077a
diff --git a/cmds/statsd/src/external/PullDataReceiver.h b/cmds/statsd/src/external/PullDataReceiver.h
index b071682..d2193f4 100644
--- a/cmds/statsd/src/external/PullDataReceiver.h
+++ b/cmds/statsd/src/external/PullDataReceiver.h
@@ -32,9 +32,10 @@
    * @param data The pulled data.
    * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the
    * bucket should be invalidated.
+   * @param originalPullTimeNs This is when all the pulls have been initiated (elapsed time).
    */
   virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, 
-                            bool pullSuccess) = 0;
+                            bool pullSuccess, int64_t originalPullTimeNs) = 0;
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp
index 9b603d6..f055939 100644
--- a/cmds/statsd/src/external/StatsPullerManager.cpp
+++ b/cmds/statsd/src/external/StatsPullerManager.cpp
@@ -381,7 +381,7 @@
         for (const auto& receiverInfo : pullInfo.second) {
             sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
             if (receiverPtr != nullptr) {
-                receiverPtr->onDataPulled(data, pullSuccess);
+                receiverPtr->onDataPulled(data, pullSuccess, elapsedTimeNs);
                 // We may have just come out of a coma, compute next pull time.
                 int numBucketsAhead =
                         (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 2609937..d56a355 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -407,7 +407,7 @@
 }
 
 void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
-                                       bool pullSuccess) {
+                                       bool pullSuccess, int64_t originalPullTimeNs) {
     std::lock_guard<std::mutex> lock(mMutex);
     if (!pullSuccess || allData.size() == 0) {
         return;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index d480941..64a1833 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -68,7 +68,7 @@
 
     // Handles when the pulled data arrives.
     void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
-                      bool pullSuccess) override;
+                      bool pullSuccess, int64_t originalPullTimeNs) override;
 
     // GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
     void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 1bd3ef2..4fc9c37 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -361,7 +361,55 @@
         invalidateCurrentBucket();
         return;
     }
-    const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+
+    accumulateEvents(allData, timestampNs, timestampNs);
+}
+
+int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
+    return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
+}
+
+void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+                                       bool pullSuccess, int64_t originalPullTimeNs) {
+    std::lock_guard<std::mutex> lock(mMutex);
+    if (mCondition) {
+        if (!pullSuccess) {
+            // If the pull failed, we won't be able to compute a diff.
+            invalidateCurrentBucket();
+            return;
+        }
+
+        // For scheduled pulled data, the effective event time is snap to the nearest
+        // bucket end. In the case of waking up from a deep sleep state, we will
+        // attribute to the previous bucket end. If the sleep was long but not very long, we
+        // will be in the immediate next bucket. Previous bucket may get a larger number as
+        // we pull at a later time than real bucket end.
+        // If the sleep was very long, we skip more than one bucket before sleep. In this case,
+        // if the diff base will be cleared and this new data will serve as new diff base.
+        int64_t bucketEndTime = calcPreviousBucketEndTime(originalPullTimeNs) - 1;
+        accumulateEvents(allData, originalPullTimeNs, bucketEndTime);
+
+        // We can probably flush the bucket. Since we used bucketEndTime when calling
+        // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
+        flushIfNeededLocked(originalPullTimeNs);
+
+    } else {
+        VLOG("No need to commit data on condition false.");
+    }
+}
+
+void ValueMetricProducer::accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, 
+                                           int64_t originalPullTimeNs, int64_t eventElapsedTimeNs) {
+    bool isEventLate = eventElapsedTimeNs < mCurrentBucketStartTimeNs;
+    if (isEventLate) {
+        VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
+             (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
+        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
+        invalidateCurrentBucket();
+        return;
+    }
+
+    const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
     StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
     if (pullDelayNs > mMaxPullDelayNs) {
         ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
@@ -373,75 +421,33 @@
         return;
     }
 
-    if (timestampNs < mCurrentBucketStartTimeNs) {
-        // The data will be skipped in onMatchedLogEventInternalLocked, but we don't want to report
-        // for every event, just the pull
-        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
+    if (allData.size() == 0) {
+        VLOG("Data pulled is empty");
+        StatsdStats::getInstance().noteEmptyData(mPullTagId);
     }
 
+    mMatchedMetricDimensionKeys.clear();
     for (const auto& data : allData) {
-        // make a copy before doing and changes
         LogEvent localCopy = data->makeCopy();
-        localCopy.setElapsedTimestampNs(timestampNs);
         if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
             MatchingState::kMatched) {
+            localCopy.setElapsedTimestampNs(eventElapsedTimeNs);
             onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
         }
     }
-    mHasGlobalBase = true;
-}
-
-int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
-    return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
-}
-
-void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
-                                       bool pullSuccess) {
-    std::lock_guard<std::mutex> lock(mMutex);
-    if (mCondition) {
-        if (!pullSuccess) {
-            // If the pull failed, we won't be able to compute a diff.
-            invalidateCurrentBucket();
-            return;
-        }
-
-        if (allData.size() == 0) {
-            VLOG("Data pulled is empty");
-            StatsdStats::getInstance().noteEmptyData(mPullTagId);
-            return;
-        }
-        // For scheduled pulled data, the effective event time is snap to the nearest
-        // bucket end. In the case of waking up from a deep sleep state, we will
-        // attribute to the previous bucket end. If the sleep was long but not very long, we
-        // will be in the immediate next bucket. Previous bucket may get a larger number as
-        // we pull at a later time than real bucket end.
-        // If the sleep was very long, we skip more than one bucket before sleep. In this case,
-        // if the diff base will be cleared and this new data will serve as new diff base.
-        int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
-        int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
-        bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs;
-        if (isEventLate) {
-            VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
-                 (long long)mCurrentBucketStartTimeNs);
-            StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
-        }
-
-        for (const auto& data : allData) {
-            LogEvent localCopy = data->makeCopy();
-            if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
-                MatchingState::kMatched) {
-                localCopy.setElapsedTimestampNs(bucketEndTime);
-                onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
+    // If the new pulled data does not contains some keys we track in our intervals, we need to
+    // reset the base.
+    for (auto& slice : mCurrentSlicedBucket) {
+        bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first) 
+                != mMatchedMetricDimensionKeys.end();
+        if (!presentInPulledData) {
+            for (auto& interval : slice.second) {
+                interval.hasBase = false;
             }
         }
-        mHasGlobalBase = true;
-
-        // We can probably flush the bucket. Since we used bucketEndTime when calling
-        // #onMatchedLogEventInternalLocked, the current bucket will not have been flushed.
-        flushIfNeededLocked(realEventTime);
-    } else {
-        VLOG("No need to commit data on condition false.");
     }
+    mMatchedMetricDimensionKeys.clear();
+    mHasGlobalBase = true;
 }
 
 void ValueMetricProducer::dumpStatesLocked(FILE* out, bool verbose) const {
@@ -539,6 +545,7 @@
              (long long)mCurrentBucketStartTimeNs);
         return;
     }
+    mMatchedMetricDimensionKeys.insert(eventKey);
 
     flushIfNeededLocked(eventTimeNs);
 
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 0cfefa9..d1c2315 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -52,7 +52,7 @@
 
     // Process data pulled on bucket boundary.
     void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
-                      bool pullSuccess) override;
+                      bool pullSuccess, int64_t originalPullTimeNs) override;
 
     // ValueMetric needs special logic if it's a pulled atom.
     void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
@@ -116,6 +116,9 @@
     // Value fields for matching.
     std::vector<Matcher> mFieldMatchers;
 
+    // Value fields for matching.
+    std::set<MetricDimensionKey> mMatchedMetricDimensionKeys;
+
     // tagId for pulled data. -1 if this is not pulled
     const int mPullTagId;
 
@@ -160,6 +163,9 @@
 
     void pullAndMatchEventsLocked(const int64_t timestampNs);
 
+    void accumulateEvents(const std::vector<std::shared_ptr<LogEvent>>& allData, 
+                          int64_t originalPullTimeNs, int64_t eventElapsedTimeNs);
+
     ValueBucket buildPartialBucket(int64_t bucketEndTime,
                                    const std::vector<Interval>& intervals);
     void initCurrentSlicedBucket();
@@ -242,6 +248,10 @@
     FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed);
     FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullDelayExceeded);
     FRIEND_TEST(ValueMetricProducerTest, TestBaseSetOnConditionChange);
+    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled);
+    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onConditionChanged);
+    FRIEND_TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary);
+    FRIEND_TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries);
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
index 1725160..6286823 100644
--- a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
@@ -133,7 +133,7 @@
     event->init();
     allData.push_back(event);
 
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     auto it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
     EXPECT_EQ(INT, it->mValue.getType());
@@ -151,7 +151,7 @@
     event2->write(25);
     event2->init();
     allData.push_back(event2);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
     EXPECT_EQ(INT, it->mValue.getType());
@@ -305,7 +305,7 @@
     event->write(1);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucketStartTimeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin()
                          ->second.front()
@@ -328,7 +328,7 @@
     event->write(3);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucketStartTimeNs + bucketSizeNs);
     EXPECT_EQ(2UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(3, gaugeProducer.mCurrentSlicedBucket->begin()
@@ -371,7 +371,7 @@
     event->write(1);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucketStartTimeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin()
                          ->second.front()
@@ -440,7 +440,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(110, gaugeProducer.mCurrentSlicedBucket->begin()
@@ -541,7 +541,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData, /** succeed */ true);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
@@ -590,7 +590,7 @@
     event1->write(13);
     event1->init();
 
-    gaugeProducer.onDataPulled({event1}, /** succeed */ true);
+    gaugeProducer.onDataPulled({event1}, /** succeed */ true, bucketStartTimeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()
                            ->second.front()
@@ -604,7 +604,7 @@
     event2->write(15);
     event2->init();
 
-    gaugeProducer.onDataPulled({event2}, /** succeed */ true);
+    gaugeProducer.onDataPulled({event2}, /** succeed */ true, bucketStartTimeNs + bucketSizeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()
                            ->second.front()
@@ -619,7 +619,7 @@
     event3->write(26);
     event3->init();
 
-    gaugeProducer.onDataPulled({event3}, /** succeed */ true);
+    gaugeProducer.onDataPulled({event3}, /** succeed */ true, bucket2StartTimeNs + 2 * bucketSizeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(26L, gaugeProducer.mCurrentSlicedBucket->begin()
                            ->second.front()
@@ -633,7 +633,7 @@
             std::make_shared<LogEvent>(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 10);
     event4->write("some value");
     event4->init();
-    gaugeProducer.onDataPulled({event4}, /** succeed */ true);
+    gaugeProducer.onDataPulled({event4}, /** succeed */ true, bucketStartTimeNs + 3 * bucketSizeNs);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_TRUE(gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->empty());
 }
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 91b98ec..ae3cdbc 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -160,7 +160,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -178,7 +178,7 @@
     event->write(23);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -198,7 +198,7 @@
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
 
@@ -265,7 +265,7 @@
     event->write(2);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** success */ true);
+    valueProducer.onDataPulled(allData, /** success */ true, bucket2StartTimeNs);
 
     // Partial buckets created in 2nd bucket.
     valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 2, "com.foo", 10000, 1);
@@ -327,7 +327,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval =
@@ -346,7 +346,7 @@
     event->write(23);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     // No new data seen, so data has been cleared.
     EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
 
@@ -363,7 +363,7 @@
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
 
@@ -412,7 +412,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -428,7 +428,7 @@
     event->write(10);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -445,7 +445,7 @@
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(true, curInterval.hasBase);
@@ -493,7 +493,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -509,7 +509,7 @@
     event->write(10);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -524,7 +524,7 @@
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(true, curInterval.hasBase);
@@ -599,7 +599,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
@@ -710,7 +710,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
 
     valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
@@ -725,7 +725,7 @@
     event->write(150);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     EXPECT_EQ(2UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(bucket3StartTimeNs, valueProducer.mCurrentBucketStartTimeNs);
     EXPECT_EQ(20L,
@@ -764,7 +764,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
 
     valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
@@ -1068,7 +1068,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -1086,7 +1086,7 @@
     event->write(23);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -1107,7 +1107,7 @@
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket6StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     // startUpdated:false sum:12
@@ -1195,7 +1195,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(false, curInterval.hasBase);
@@ -1291,7 +1291,7 @@
     EXPECT_EQ(20, curInterval.value.long_value);
     EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
 
-    // Now the alarm is delivered, but it is considered late, it has no effect
+    // Now the alarm is delivered, but it is considered late, the bucket is invalidated.
     vector<shared_ptr<LogEvent>> allData;
     allData.clear();
     shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 50);
@@ -1299,10 +1299,10 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
-    EXPECT_EQ(true, curInterval.hasBase);
+    EXPECT_EQ(false, curInterval.hasBase);
     EXPECT_EQ(130, curInterval.base.long_value);
     EXPECT_EQ(true, curInterval.hasValue);
     EXPECT_EQ(20, curInterval.value.long_value);
@@ -1752,7 +1752,7 @@
     allData.push_back(event1);
     allData.push_back(event2);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval1.hasBase);
     EXPECT_EQ(11, interval1.base.long_value);
@@ -1842,7 +1842,7 @@
     allData.push_back(event1);
     allData.push_back(event2);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval1.hasBase);
     EXPECT_EQ(11, interval1.base.long_value);
@@ -1871,7 +1871,7 @@
     event1->write(5);
     event1->init();
     allData.push_back(event1);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
 
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval2.hasBase);
@@ -1893,7 +1893,7 @@
     event2->write(5);
     event2->init();
     allData.push_back(event2);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket5StartTimeNs);
 
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval2.hasBase);
@@ -1968,7 +1968,7 @@
     allData.push_back(event1);
     allData.push_back(event2);
 
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval1.hasBase);
     EXPECT_EQ(11, interval1.base.long_value);
@@ -2000,7 +2000,7 @@
     event1->write(5);
     event1->init();
     allData.push_back(event1);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket4StartTimeNs);
 
 	// Only one interval left. One was trimmed.
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
@@ -2018,7 +2018,7 @@
     event1->write(14);
     event1->init();
     allData.push_back(event1);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket5StartTimeNs);
 
     interval2 = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(true, interval2.hasBase);
@@ -2078,7 +2078,7 @@
     EXPECT_EQ(false, curInterval.hasValue);
 
     vector<shared_ptr<LogEvent>> allData;
-    valueProducer.onDataPulled(allData, /** succeed */ false);
+    valueProducer.onDataPulled(allData, /** succeed */ false, bucket2StartTimeNs);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(false, curInterval.hasBase);
     EXPECT_EQ(false, curInterval.hasValue);
@@ -2179,7 +2179,7 @@
     valueProducer.mCondition = true;
 
     vector<shared_ptr<LogEvent>> allData;
-    valueProducer.onDataPulled(allData, /** succeed */ false);
+    valueProducer.onDataPulled(allData, /** succeed */ false, bucketStartTimeNs);
     EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
 
     valueProducer.onConditionChanged(false, bucketStartTimeNs + 1);
@@ -2361,7 +2361,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucketStartTimeNs);
 
     // This will fail and should invalidate the whole bucket since we do not have all the data
     // needed to compute the metric value when the screen was on.
@@ -2375,7 +2375,7 @@
     event2->write(140);
     event2->init();
     allData.push_back(event2);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
     
@@ -2446,7 +2446,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ false);
+    valueProducer.onDataPulled(allData, /** succeed */ false, bucketStartTimeNs);
 
     valueProducer.onConditionChanged(false, bucketStartTimeNs + 2);
     valueProducer.onConditionChanged(true, bucketStartTimeNs + 3);
@@ -2458,7 +2458,7 @@
     event2->write(140);
     event2->init();
     allData.push_back(event2);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
 
     valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
     
@@ -2529,7 +2529,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData, /** succeed */ true);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucketStartTimeNs);
 
     // This will fail and should invalidate the whole bucket since we do not have all the data
     // needed to compute the metric value when the screen was on.
@@ -2543,7 +2543,7 @@
     event2->write(140);
     event2->init();
     allData.push_back(event2);
-    valueProducer.onDataPulled(allData, /** succeed */ false);
+    valueProducer.onDataPulled(allData, /** succeed */ false, bucket2StartTimeNs);
 
     valueProducer.flushIfNeededLocked(bucket2StartTimeNs + 1);
     
@@ -2557,6 +2557,268 @@
     EXPECT_EQ(false, valueProducer.mHasGlobalBase);
 }
 
+TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Start bucket.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(3);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
+                                      logEventMatcherIndex, eventMatcherWizard, tagId,
+                                      bucketStartTimeNs, bucketStartTimeNs, pullerManager);
+
+    // Bucket 2 start.
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event->write(tagId);
+    event->write(110);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+
+    // Bucket 3 empty.
+    allData.clear();
+    shared_ptr<LogEvent> event2 = make_shared<LogEvent>(tagId, bucket3StartTimeNs + 1);
+    event2->init();
+    allData.push_back(event2);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket3StartTimeNs);
+    // Data has been trimmed.
+    EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+}
+
+TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onConditionChanged) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // First onConditionChanged
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(3);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 10);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval& curInterval =
+            valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    EXPECT_EQ(true, curInterval.hasBase);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+
+    // Empty pull.
+    valueProducer.onConditionChanged(false, bucketStartTimeNs + 10);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    EXPECT_EQ(false, curInterval.hasBase);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(false, valueProducer.mHasGlobalBase);
+}
+
+TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onBucketBoundary) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // First onConditionChanged
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(1);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(2);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(5);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 10);
+    valueProducer.onConditionChanged(false, bucketStartTimeNs + 11);
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 12);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval& curInterval =
+            valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    EXPECT_EQ(true, curInterval.hasBase);
+    EXPECT_EQ(true, curInterval.hasValue);
+    EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+
+    // End of bucket
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    // Data is empty, base should be reset.
+    EXPECT_EQ(false, curInterval.hasBase);
+    EXPECT_EQ(5, curInterval.base.long_value);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+
+    EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
+    EXPECT_EQ(1, valueProducer.mPastBuckets.begin()->second[0].values[0].long_value);
+}
+
+
+TEST(ValueMetricProducerTest, TestPartialResetOnBucketBoundaries) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.mutable_dimensions_in_what()->set_field(tagId);
+    metric.mutable_dimensions_in_what()->add_child()->set_field(1);
+    metric.set_condition(StringToId("SCREEN_ON"));
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // First onConditionChanged
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write(tagId);
+                event->write(1);
+                event->write(1);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 10);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+
+    // End of bucket
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event->write(2);
+    event->write(2);
+    event->init();
+    allData.push_back(event);
+    valueProducer.onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+
+    // Key 1 should be reset since in not present in the most pull.
+    EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
+    auto iterator = valueProducer.mCurrentSlicedBucket.begin();
+    EXPECT_EQ(true, iterator->second[0].hasBase);
+    EXPECT_EQ(2, iterator->second[0].base.long_value);
+    EXPECT_EQ(false, iterator->second[0].hasValue);
+    iterator++;
+    EXPECT_EQ(false, iterator->second[0].hasBase);
+    EXPECT_EQ(1, iterator->second[0].base.long_value);
+    EXPECT_EQ(false, iterator->second[0].hasValue);
+
+    EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android