Reset the base when pull fails.

Without resetting the base, we will compute the wrong diff when we have
a condition change.

Move the responsability of handling failures to the PullReceivers. This
is more consistent with #onConditionChanged which does handle the
failure cases.

We also always compute nextPullTimeNs in order to only call onDataPulled
on bucket boundaries. The current code does not update nextPullTimeNs
which means a new alarm might trigger a pull and onDataPulled in the
middle of the bucket. The behavior in this case is undefined.

Bug: 123181864
Test: atest statsd_test
Change-Id: I0910b7db26a0de764436c46c8d7d11cafa52dcd9
diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp
index c69384c..9b603d6 100644
--- a/cmds/statsd/src/external/StatsPullerManager.cpp
+++ b/cmds/statsd/src/external/StatsPullerManager.cpp
@@ -358,12 +358,13 @@
 
     for (const auto& pullInfo : needToPull) {
         vector<shared_ptr<LogEvent>> data;
-        if (!Pull(pullInfo.first, &data)) {
+        bool pullSuccess = Pull(pullInfo.first, &data);
+        if (pullSuccess) {
+            StatsdStats::getInstance().notePullDelay(
+                    pullInfo.first, getElapsedRealtimeNs() - elapsedTimeNs);
+        } else {
             VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
-            continue;
         }
-        StatsdStats::getInstance().notePullDelay(pullInfo.first,
-                                                 getElapsedRealtimeNs() - elapsedTimeNs);
 
         // Convention is to mark pull atom timestamp at request time.
         // If we pull at t0, puller starts at t1, finishes at t2, and send back
@@ -380,8 +381,8 @@
         for (const auto& receiverInfo : pullInfo.second) {
             sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
             if (receiverPtr != nullptr) {
-                receiverPtr->onDataPulled(data);
-                // we may have just come out of a coma, compute next pull time
+                receiverPtr->onDataPulled(data, pullSuccess);
+                // We may have just come out of a coma, compute next pull time.
                 int numBucketsAhead =
                         (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
                 receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;