Reset the base when a pull fails.
Without resetting the base, we would compute a wrong diff when the
condition changes.
Move the responsibility for handling pull failures to the PullDataReceivers.
This is more consistent with #onConditionChanged, which already handles the
failure cases.
We now also always compute nextPullTimeNs, even when the pull fails, so that
onDataPulled is only called on bucket boundaries. The current code skips
updating nextPullTimeNs when a pull fails, which means a new alarm might
trigger a pull and onDataPulled in the middle of a bucket. The behavior in
this case is undefined.
Bug: 123181864
Test: atest statsd_test
Change-Id: I0910b7db26a0de764436c46c8d7d11cafa52dcd9
diff --git a/cmds/statsd/src/external/PullDataReceiver.h b/cmds/statsd/src/external/PullDataReceiver.h
index 0d505cb..b071682 100644
--- a/cmds/statsd/src/external/PullDataReceiver.h
+++ b/cmds/statsd/src/external/PullDataReceiver.h
@@ -28,9 +28,15 @@
class PullDataReceiver : virtual public RefBase{
public:
virtual ~PullDataReceiver() {}
- virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) = 0;
+ /**
+ * Called with the result of a pull. @param data The pulled data (may be empty).
+ * @param pullSuccess Whether the pull succeeded. If false, receivers should ignore `data` and
+ * invalidate any state they keep for the current bucket (e.g. reset their diff base).
+ */
+ virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+ bool pullSuccess) = 0;
};
} // namespace statsd
} // namespace os
-} // namespace android
\ No newline at end of file
+} // namespace android
diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp
index c69384c..9b603d6 100644
--- a/cmds/statsd/src/external/StatsPullerManager.cpp
+++ b/cmds/statsd/src/external/StatsPullerManager.cpp
@@ -358,12 +358,13 @@
for (const auto& pullInfo : needToPull) {
vector<shared_ptr<LogEvent>> data;
- if (!Pull(pullInfo.first, &data)) {
+ bool pullSuccess = Pull(pullInfo.first, &data);
+ if (pullSuccess) {
+ StatsdStats::getInstance().notePullDelay(
+ pullInfo.first, getElapsedRealtimeNs() - elapsedTimeNs);
+ } else {
VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
- continue;
}
- StatsdStats::getInstance().notePullDelay(pullInfo.first,
- getElapsedRealtimeNs() - elapsedTimeNs);
// Convention is to mark pull atom timestamp at request time.
// If we pull at t0, puller starts at t1, finishes at t2, and send back
@@ -380,8 +381,8 @@
for (const auto& receiverInfo : pullInfo.second) {
sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
if (receiverPtr != nullptr) {
- receiverPtr->onDataPulled(data);
- // we may have just come out of a coma, compute next pull time
+ receiverPtr->onDataPulled(data, pullSuccess);
+ // We may have just come out of a coma, compute next pull time.
int numBucketsAhead =
(elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index c2878f0..c9b7165 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -406,9 +406,10 @@
return gaugeFields;
}
-void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
+void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+ bool pullSuccess) {
std::lock_guard<std::mutex> lock(mMutex);
- if (allData.size() == 0) {
+ if (!pullSuccess || allData.size() == 0) {
return;
}
for (const auto& data : allData) {
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index df08779..d480941 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -67,7 +67,8 @@
virtual ~GaugeMetricProducer();
// Handles when the pulled data arrives.
- void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
+ void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+ bool pullSuccess) override;
// GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 6aa8e84..9fe07e1 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -382,9 +382,16 @@
return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}
-void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
+void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+ bool pullSuccess) {
std::lock_guard<std::mutex> lock(mMutex);
if (mCondition) {
+ if (!pullSuccess) {
+ // If the pull failed, we won't be able to compute a diff.
+ resetBase();
+ return;
+ }
+
if (allData.size() == 0) {
VLOG("Data pulled is empty");
StatsdStats::getInstance().noteEmptyData(mPullTagId);
@@ -399,12 +406,13 @@
// if the diff base will be cleared and this new data will serve as new diff base.
int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
- if (bucketEndTime < mCurrentBucketStartTimeNs) {
+ bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs;
+ if (isEventLate) {
VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
(long long)mCurrentBucketStartTimeNs);
StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
- return;
}
+
for (const auto& data : allData) {
LogEvent localCopy = data->makeCopy();
if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index a8dfc5b..cab50aa 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -51,7 +51,8 @@
virtual ~ValueMetricProducer();
// Process data pulled on bucket boundary.
- void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
+ void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+ bool pullSuccess) override;
// ValueMetric needs special logic if it's a pulled atom.
void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
@@ -216,7 +217,9 @@
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
- FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFail);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket);
FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
};