Reset the base when pull fails.

Without resetting the base, we will compute the wrong diff when we have
a condition change.

Move the responsability of handling failures to the PullReceivers. This
is more consistent with #onConditionChanged which does handle the
failure cases.

We also always compute nextPullTimeNs in order to only call onDataPulled
on bucket boundaries. The current code does not update nextPullTimeNs
which means a new alarm might trigger a pull and onDataPulled in the
middle of the bucket. The behavior in this case is undefined.

Bug: 123181864
Test: atest statsd_test
Change-Id: I0910b7db26a0de764436c46c8d7d11cafa52dcd9
diff --git a/cmds/statsd/src/external/PullDataReceiver.h b/cmds/statsd/src/external/PullDataReceiver.h
index 0d505cb..b071682 100644
--- a/cmds/statsd/src/external/PullDataReceiver.h
+++ b/cmds/statsd/src/external/PullDataReceiver.h
@@ -28,9 +28,15 @@
 class PullDataReceiver : virtual public RefBase{
  public:
   virtual ~PullDataReceiver() {}
-  virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) = 0;
+  /**
+   * @param data The pulled data.
+   * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the
+   * bucket should be invalidated.
+   */
+  virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, 
+                            bool pullSuccess) = 0;
 };
 
 }  // namespace statsd
 }  // namespace os
-}  // namespace android
\ No newline at end of file
+}  // namespace android
diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp
index c69384c..9b603d6 100644
--- a/cmds/statsd/src/external/StatsPullerManager.cpp
+++ b/cmds/statsd/src/external/StatsPullerManager.cpp
@@ -358,12 +358,13 @@
 
     for (const auto& pullInfo : needToPull) {
         vector<shared_ptr<LogEvent>> data;
-        if (!Pull(pullInfo.first, &data)) {
+        bool pullSuccess = Pull(pullInfo.first, &data);
+        if (pullSuccess) {
+            StatsdStats::getInstance().notePullDelay(
+                    pullInfo.first, getElapsedRealtimeNs() - elapsedTimeNs);
+        } else {
             VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
-            continue;
         }
-        StatsdStats::getInstance().notePullDelay(pullInfo.first,
-                                                 getElapsedRealtimeNs() - elapsedTimeNs);
 
         // Convention is to mark pull atom timestamp at request time.
         // If we pull at t0, puller starts at t1, finishes at t2, and send back
@@ -380,8 +381,8 @@
         for (const auto& receiverInfo : pullInfo.second) {
             sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
             if (receiverPtr != nullptr) {
-                receiverPtr->onDataPulled(data);
-                // we may have just come out of a coma, compute next pull time
+                receiverPtr->onDataPulled(data, pullSuccess);
+                // We may have just come out of a coma, compute next pull time.
                 int numBucketsAhead =
                         (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
                 receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index c2878f0..c9b7165 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -406,9 +406,10 @@
     return gaugeFields;
 }
 
-void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
+void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+                                       bool pullSuccess) {
     std::lock_guard<std::mutex> lock(mMutex);
-    if (allData.size() == 0) {
+    if (!pullSuccess || allData.size() == 0) {
         return;
     }
     for (const auto& data : allData) {
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index df08779..d480941 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -67,7 +67,8 @@
     virtual ~GaugeMetricProducer();
 
     // Handles when the pulled data arrives.
-    void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
+    void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+                      bool pullSuccess) override;
 
     // GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
     void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 6aa8e84..9fe07e1 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -382,9 +382,16 @@
     return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
 }
 
-void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
+void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+                                       bool pullSuccess) {
     std::lock_guard<std::mutex> lock(mMutex);
     if (mCondition) {
+        if (!pullSuccess) {
+            // If the pull failed, we won't be able to compute a diff.
+            resetBase();
+            return;
+        }
+
         if (allData.size() == 0) {
             VLOG("Data pulled is empty");
             StatsdStats::getInstance().noteEmptyData(mPullTagId);
@@ -399,12 +406,13 @@
         // if the diff base will be cleared and this new data will serve as new diff base.
         int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
         int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
-        if (bucketEndTime < mCurrentBucketStartTimeNs) {
+        bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs;
+        if (isEventLate) {
             VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
                  (long long)mCurrentBucketStartTimeNs);
             StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
-            return;
         }
+
         for (const auto& data : allData) {
             LogEvent localCopy = data->makeCopy();
             if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index a8dfc5b..cab50aa 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -51,7 +51,8 @@
     virtual ~ValueMetricProducer();
 
     // Process data pulled on bucket boundary.
-    void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
+    void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+                      bool pullSuccess) override;
 
     // ValueMetric needs special logic if it's a pulled atom.
     void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
@@ -216,7 +217,9 @@
     FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
     FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
     FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
-    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFail);
+    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange);
+    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
+    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket);
     FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
 };