Log bucket drop reasons for ValueMetric and GaugeMetric

- For all cases where ValueMetric and GaugeMetric drop buckets, we will
log the reason for the drop and the timestamp of the drop event. All the
cases where ValueMetric drops buckets are documented in
go/valuemetric-bucketdropping. GaugeMetric only drops buckets when the
size of the bucket is less than the specified minimum bucket size.
- Dropped/skipped bucket data structures have been moved to
MetricProducer so all metrics can potentially log bucket drop cases.
- Added unit tests for ValueMetricProducer and GaugeMetricProducer to
check that all bucket drop cases can be logged and the number of drop
events is capped at 10 for one skipped bucket.
- Note: drops due to a pull delay could not be unit tested because pull
delays are measured by real elapsed time in #accumulateEvents.

Test: bit statsd_test:*
Bug: 145836670
Bug: 140421050
Bug: 146082150
Bug: 146082276
Change-Id: I0197660bf89837162330aeaafb5397c7328e3993
diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h
index 23d2ace..2ce4b25 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.h
+++ b/cmds/statsd/src/guardrail/StatsdStats.h
@@ -181,6 +181,8 @@
 
     static const int64_t kInt64Max = 0x7fffffffffffffffLL;
 
+    static const int32_t kMaxLoggedBucketDropEvents = 10;
+
     /**
      * Report a new config has been received and report the static stats about the config.
      *
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 64344e8..4ab6ec3 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -52,8 +52,13 @@
 // for GaugeMetricDataWrapper
 const int FIELD_ID_DATA = 1;
 const int FIELD_ID_SKIPPED = 2;
+// for SkippedBuckets
 const int FIELD_ID_SKIPPED_START_MILLIS = 3;
 const int FIELD_ID_SKIPPED_END_MILLIS = 4;
+const int FIELD_ID_SKIPPED_DROP_EVENT = 5;
+// for DropEvent Proto
+const int FIELD_ID_BUCKET_DROP_REASON = 1;
+const int FIELD_ID_DROP_TIME = 2;
 // for GaugeMetricData
 const int FIELD_ID_DIMENSION_IN_WHAT = 1;
 const int FIELD_ID_BUCKET_INFO = 3;
@@ -193,7 +198,7 @@
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
     protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());
 
-    if (mPastBuckets.empty()) {
+    if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
         return;
     }
 
@@ -212,13 +217,21 @@
 
     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
 
-    for (const auto& pair : mSkippedBuckets) {
+    for (const auto& skippedBucket : mSkippedBuckets) {
         uint64_t wrapperToken =
                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
-                           (long long)(NanoToMillis(pair.first)));
+                           (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
-                           (long long)(NanoToMillis(pair.second)));
+                           (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
+
+        for (const auto& dropEvent : skippedBucket.dropEvents) {
+            uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
+                                                         FIELD_ID_SKIPPED_DROP_EVENT);
+            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME, (long long) (NanoToMillis(dropEvent.dropTimeNs)));
+            protoOutput->end(dropEventToken);
+        }
         protoOutput->end(wrapperToken);
     }
 
@@ -545,7 +558,10 @@
         info.mBucketEndNs = fullBucketEndTimeNs;
     }
 
-    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+    // Add bucket to mPastBuckets if bucket is large enough.
+    // Otherwise, drop the bucket data and add bucket metadata to mSkippedBuckets.
+    bool isBucketLargeEnough = info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
+    if (isBucketLargeEnough) {
         for (const auto& slice : *mCurrentSlicedBucket) {
             info.mGaugeAtoms = slice.second;
             auto& bucketList = mPastBuckets[slice.first];
@@ -554,7 +570,13 @@
                  slice.first.toString().c_str());
         }
     } else {
-        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
+        mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
+        mCurrentSkippedBucket.bucketEndTimeNs = eventTimeNs;
+        if (!maxDropEventsReached()) {
+            mCurrentSkippedBucket.dropEvents.emplace_back(
+                    buildDropEvent(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL));
+        }
+        mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
     }
 
     // If we have anomaly trackers, we need to update the partial bucket values.
@@ -573,6 +595,7 @@
     StatsdStats::getInstance().noteBucketCount(mMetricId);
     mCurrentSlicedBucket = std::make_shared<DimToGaugeAtomsMap>();
     mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
+    mCurrentSkippedBucket.reset();
 }
 
 size_t GaugeMetricProducer::byteSizeLocked() const {
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 640a02a..12dcaa4 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -158,9 +158,6 @@
     // this slice (ie, for partial buckets, we use the last partial bucket in this full bucket).
     std::shared_ptr<DimToValMap> mCurrentSlicedBucketForAnomaly;
 
-    // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped.
-    std::list<std::pair<int64_t, int64_t>> mSkippedBuckets;
-
     const int64_t mMinBucketSizeNs;
 
     // Translate Atom based bucket to single numeric value bucket for anomaly and updates the map
diff --git a/cmds/statsd/src/metrics/MetricProducer.cpp b/cmds/statsd/src/metrics/MetricProducer.cpp
index 2c8f0e3..cf1d2f3 100644
--- a/cmds/statsd/src/metrics/MetricProducer.cpp
+++ b/cmds/statsd/src/metrics/MetricProducer.cpp
@@ -19,6 +19,7 @@
 
 #include "MetricProducer.h"
 
+#include "../guardrail/StatsdStats.h"
 #include "state/StateTracker.h"
 
 using android::util::FIELD_COUNT_REPEATED;
@@ -289,6 +290,17 @@
     }
 }
 
+DropEvent MetricProducer::buildDropEvent(const int64_t dropTimeNs, const BucketDropReason reason) {
+    DropEvent event;
+    event.reason = reason;
+    event.dropTimeNs = dropTimeNs;
+    return event;
+}
+
+bool MetricProducer::maxDropEventsReached() {
+    return mCurrentSkippedBucket.dropEvents.size() >= StatsdStats::kMaxLoggedBucketDropEvents;
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index 3512f18..30675fc 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -70,6 +70,22 @@
     NO_TIME_CONSTRAINTS = 2
 };
 
+// Keep this in sync with BucketDropReason enum in stats_log.proto
+enum BucketDropReason {
+    // For ValueMetric, a bucket is dropped during a dump report request iff
+    // current bucket should be included, a pull is needed (pulled metric and
+    // condition is true), and we are under fast time constraints.
+    DUMP_REPORT_REQUESTED = 1,
+    EVENT_IN_WRONG_BUCKET = 2,
+    CONDITION_UNKNOWN = 3,
+    PULL_FAILED = 4,
+    PULL_DELAYED = 5,
+    DIMENSION_GUARDRAIL_REACHED = 6,
+    MULTIPLE_BUCKETS_SKIPPED = 7,
+    // Not an invalid bucket case, but the bucket is dropped.
+    BUCKET_TOO_SMALL = 8
+};
+
 struct Activation {
     Activation(const ActivationType& activationType, const int64_t ttlNs)
         : ttl_ns(ttlNs),
@@ -83,6 +99,28 @@
     const ActivationType activationType;
 };
 
+struct DropEvent {
+    // Reason for dropping the bucket and/or marking the bucket invalid.
+    BucketDropReason reason;
+    // The timestamp of the drop event.
+    int64_t dropTimeNs;
+};
+
+struct SkippedBucket {
+    // Start time of the dropped bucket.
+    int64_t bucketStartTimeNs;
+    // End time of the dropped bucket.
+    int64_t bucketEndTimeNs;
+    // List of events that invalidated this bucket.
+    std::vector<DropEvent> dropEvents;
+
+    void reset() {
+        bucketStartTimeNs = 0;
+        bucketEndTimeNs = 0;
+        dropEvents.clear();
+    }
+};
+
 // A MetricProducer is responsible for compute one single metrics, creating stats log report, and
 // writing the report to dropbox. MetricProducers should respond to package changes as required in
 // PackageInfoListener, but if none of the metrics are slicing by package name, then the update can
@@ -342,6 +380,12 @@
     void getMappedStateValue(const int32_t atomId, const HashableDimensionKey& queryKey,
                              FieldValue* value);
 
+    DropEvent buildDropEvent(const int64_t dropTimeNs, const BucketDropReason reason);
+
+    // Returns true if the number of drop events in the current bucket has
+    // reached the maximum number allowed, which is currently capped at 10.
+    bool maxDropEventsReached();
+
     const int64_t mMetricId;
 
     const ConfigKey mConfigKey;
@@ -403,6 +447,10 @@
     // atom to fields in the "what" atom.
     std::vector<Metric2State> mMetric2StateLinks;
 
+    SkippedBucket mCurrentSkippedBucket;
+    // Buckets that were invalidated and had their data dropped.
+    std::vector<SkippedBucket> mSkippedBuckets;
+
     FRIEND_TEST(CountMetricE2eTest, TestSlicedState);
     FRIEND_TEST(CountMetricE2eTest, TestSlicedStateWithMap);
     FRIEND_TEST(CountMetricE2eTest, TestMultipleSlicedStates);
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 2c99911..d8f399f 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -55,8 +55,13 @@
 // for ValueMetricDataWrapper
 const int FIELD_ID_DATA = 1;
 const int FIELD_ID_SKIPPED = 2;
+// for SkippedBuckets
 const int FIELD_ID_SKIPPED_START_MILLIS = 3;
 const int FIELD_ID_SKIPPED_END_MILLIS = 4;
+const int FIELD_ID_SKIPPED_DROP_EVENT = 5;
+// for DropEvent Proto
+const int FIELD_ID_BUCKET_DROP_REASON = 1;
+const int FIELD_ID_DROP_TIME = 2;
 // for ValueMetricData
 const int FIELD_ID_DIMENSION_IN_WHAT = 1;
 const int FIELD_ID_BUCKET_INFO = 3;
@@ -211,7 +216,7 @@
         if (pullNeeded) {
             switch (dumpLatency) {
                 case FAST:
-                    invalidateCurrentBucket();
+                    invalidateCurrentBucket(dumpTimeNs, BucketDropReason::DUMP_REPORT_REQUESTED);
                     break;
                 case NO_TIME_CONSTRAINTS:
                     pullAndMatchEventsLocked(dumpTimeNs);
@@ -240,13 +245,22 @@
 
     uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
 
-    for (const auto& pair : mSkippedBuckets) {
+    for (const auto& skippedBucket : mSkippedBuckets) {
         uint64_t wrapperToken =
                 protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START_MILLIS,
-                           (long long)(NanoToMillis(pair.first)));
+                           (long long)(NanoToMillis(skippedBucket.bucketStartTimeNs)));
         protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END_MILLIS,
-                           (long long)(NanoToMillis(pair.second)));
+                           (long long)(NanoToMillis(skippedBucket.bucketEndTimeNs)));
+        for (const auto& dropEvent : skippedBucket.dropEvents) {
+            uint64_t dropEventToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
+                                                         FIELD_ID_SKIPPED_DROP_EVENT);
+            protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_BUCKET_DROP_REASON, dropEvent.reason);
+            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DROP_TIME,
+                               (long long)(NanoToMillis(dropEvent.dropTimeNs)));
+            ;
+            protoOutput->end(dropEventToken);
+        }
         protoOutput->end(wrapperToken);
     }
 
@@ -321,16 +335,25 @@
     }
 }
 
-void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase() {
+void ValueMetricProducer::invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,
+                                                                  const BucketDropReason reason) {
     if (!mCurrentBucketIsInvalid) {
-        // Only report once per invalid bucket.
+        // Only report to StatsdStats once per invalid bucket.
         StatsdStats::getInstance().noteInvalidatedBucket(mMetricId);
+
+        mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
+        mCurrentSkippedBucket.bucketEndTimeNs = getCurrentBucketEndTimeNs();
+    }
+
+    if (!maxDropEventsReached()) {
+        mCurrentSkippedBucket.dropEvents.emplace_back(buildDropEvent(dropTimeNs, reason));
     }
     mCurrentBucketIsInvalid = true;
 }
 
-void ValueMetricProducer::invalidateCurrentBucket() {
-    invalidateCurrentBucketWithoutResetBase();
+void ValueMetricProducer::invalidateCurrentBucket(const int64_t dropTimeNs,
+                                                  const BucketDropReason reason) {
+    invalidateCurrentBucketWithoutResetBase(dropTimeNs, reason);
     resetBase();
 }
 
@@ -351,7 +374,8 @@
     bool isEventTooLate  = eventTimeNs < mCurrentBucketStartTimeNs;
     if (isEventTooLate) {
         // Drop bucket because event arrived too late, ie. we are missing data for this bucket.
-        invalidateCurrentBucket();
+        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
+        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
     }
 
     // Call parent method once we've verified the validity of current bucket.
@@ -394,8 +418,9 @@
     if (isEventTooLate) {
         VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
              (long long)mCurrentBucketStartTimeNs);
+        StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
         StatsdStats::getInstance().noteConditionChangeInNextBucket(mMetricId);
-        invalidateCurrentBucket();
+        invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
         mCondition = ConditionState::kUnknown;
         mConditionTimer.onConditionChanged(mCondition, eventTimeNs);
         return;
@@ -408,7 +433,7 @@
     //
     // We still want to pull to set the base.
     if (mCondition == ConditionState::kUnknown) {
-        invalidateCurrentBucket();
+        invalidateCurrentBucket(eventTimeNs, BucketDropReason::CONDITION_UNKNOWN);
     }
 
     // Pull and match for the following condition change cases:
@@ -445,7 +470,7 @@
     vector<std::shared_ptr<LogEvent>> allData;
     if (!mPullerManager->Pull(mPullTagId, &allData)) {
         ALOGE("Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
-        invalidateCurrentBucket();
+        invalidateCurrentBucket(timestampNs, BucketDropReason::PULL_FAILED);
         return;
     }
 
@@ -465,7 +490,7 @@
     if (mCondition == ConditionState::kTrue) {
         // If the pull failed, we won't be able to compute a diff.
         if (!pullSuccess) {
-            invalidateCurrentBucket();
+            invalidateCurrentBucket(originalPullTimeNs, BucketDropReason::PULL_FAILED);
         } else {
             bool isEventLate = originalPullTimeNs < getCurrentBucketEndTimeNs();
             if (isEventLate) {
@@ -502,11 +527,12 @@
         VLOG("Skip bucket end pull due to late arrival: %lld vs %lld",
              (long long)eventElapsedTimeNs, (long long)mCurrentBucketStartTimeNs);
         StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
-        invalidateCurrentBucket();
+        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
         return;
     }
 
-    const int64_t pullDelayNs = getElapsedRealtimeNs() - originalPullTimeNs;
+    const int64_t elapsedRealtimeNs = getElapsedRealtimeNs();
+    const int64_t pullDelayNs = elapsedRealtimeNs - originalPullTimeNs;
     StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
     if (pullDelayNs > mMaxPullDelayNs) {
         ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
@@ -514,7 +540,7 @@
         StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
         // We are missing one pull from the bucket which means we will not have a complete view of
         // what's going on.
-        invalidateCurrentBucket();
+        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::PULL_DELAYED);
         return;
     }
 
@@ -553,7 +579,7 @@
     // incorrectly compute the diff when mUseZeroDefaultBase is true since an existing key
     // might be missing from mCurrentSlicedBucket.
     if (hasReachedGuardRailLimit()) {
-        invalidateCurrentBucket();
+        invalidateCurrentBucket(eventElapsedTimeNs, BucketDropReason::DIMENSION_GUARDRAIL_REACHED);
         mCurrentSlicedBucket.clear();
     }
 }
@@ -839,7 +865,8 @@
         StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
         // Something went wrong. Maybe the device was sleeping for a long time. It is better
         // to mark the current bucket as invalid. The last pull might have been successful through.
-        invalidateCurrentBucketWithoutResetBase();
+        invalidateCurrentBucketWithoutResetBase(eventTimeNs,
+                                                BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
     }
 
     VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
@@ -849,6 +876,18 @@
     // Close the current bucket.
     int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
     bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
+    if (!isBucketLargeEnough) {
+        // If the bucket is valid, this is the only drop reason and we need to
+        // set the skipped bucket start and end times.
+        if (!mCurrentBucketIsInvalid) {
+            mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
+            mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
+        }
+        if (!maxDropEventsReached()) {
+            mCurrentSkippedBucket.dropEvents.emplace_back(
+                    buildDropEvent(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL));
+        }
+    }
     if (isBucketLargeEnough && !mCurrentBucketIsInvalid) {
         // The current bucket is large enough to keep.
         for (const auto& slice : mCurrentSlicedBucket) {
@@ -861,7 +900,7 @@
             }
         }
     } else {
-        mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
+        mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
     }
 
     appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
@@ -917,6 +956,8 @@
     }
 
     mCurrentBucketIsInvalid = false;
+    mCurrentSkippedBucket.reset();
+
     // If we do not have a global base when the condition is true,
     // we will have incomplete bucket for the next bucket.
     if (mUseDiff && !mHasGlobalBase && mCondition) {
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 2033a2a..4eae99b 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -130,8 +130,9 @@
     int64_t calcBucketsForwardCount(const int64_t& eventTimeNs) const;
 
     // Mark the data as invalid.
-    void invalidateCurrentBucket();
-    void invalidateCurrentBucketWithoutResetBase();
+    void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);
+    void invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,
+                                                 const BucketDropReason reason);
 
     const int mWhatMatcherIndex;
 
@@ -177,9 +178,6 @@
     // Save the past buckets and we can clear when the StatsLogReport is dumped.
     std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>> mPastBuckets;
 
-    // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped.
-    std::list<std::pair<int64_t, int64_t>> mSkippedBuckets;
-
     const int64_t mMinBucketSizeNs;
 
     // Util function to check whether the specified dimension hits the guardrail.
@@ -248,7 +246,6 @@
     FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition);
     FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition);
     FRIEND_TEST(ValueMetricProducerTest, TestBucketBoundaryWithCondition2);
-    FRIEND_TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid);
     FRIEND_TEST(ValueMetricProducerTest, TestBucketInvalidIfGlobalBaseIsNotSet);
     FRIEND_TEST(ValueMetricProducerTest, TestCalcPreviousBucketEndTime);
     FRIEND_TEST(ValueMetricProducerTest, TestDataIsNotUpdatedWhenNoConditionChanged);
@@ -258,10 +255,6 @@
     FRIEND_TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition);
     FRIEND_TEST(ValueMetricProducerTest, TestFirstBucket);
     FRIEND_TEST(ValueMetricProducerTest, TestFullBucketResetWhenLastBucketInvalid);
-    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit);
-    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed);
-    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed);
-    FRIEND_TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed);
     FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithDiff);
     FRIEND_TEST(ValueMetricProducerTest, TestLateOnDataPulledWithoutDiff);
     FRIEND_TEST(ValueMetricProducerTest, TestPartialBucketCreated);
@@ -295,6 +288,13 @@
     FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
     FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
     FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
+
+    FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenOneConditionFailed);
+    FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenInitialPullFailed);
+    FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenLastPullFailed);
+    FRIEND_TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenGuardRailHit);
+    FRIEND_TEST(ValueMetricProducerTest_BucketDrop,
+                TestInvalidBucketWhenAccumulateEventWrongBucket);
     friend class ValueMetricProducerTestHelper;
 };
 
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index e45e24fe..8b4d781 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -191,11 +191,40 @@
 
   // Fields 2 and 3 are reserved.
 
+  // Keep this in sync with BucketDropReason enum in MetricProducer.h.
+  enum BucketDropReason {
+      // For ValueMetric, a bucket is dropped during a dump report request iff
+      // current bucket should be included, a pull is needed (pulled metric and
+      // condition is true), and we are under fast time constraints.
+      DUMP_REPORT_REQUESTED = 1;
+      EVENT_IN_WRONG_BUCKET = 2;
+      CONDITION_UNKNOWN = 3;
+      PULL_FAILED = 4;
+      PULL_DELAYED = 5;
+      DIMENSION_GUARDRAIL_REACHED = 6;
+      MULTIPLE_BUCKETS_SKIPPED = 7;
+      // Not an invalid bucket case, but the bucket is dropped.
+      BUCKET_TOO_SMALL = 8;
+  };
+
+  message DropEvent {
+      optional BucketDropReason drop_reason = 1;
+
+      optional int64 drop_time_millis = 2;
+  }
+
   message SkippedBuckets {
       optional int64 start_bucket_elapsed_nanos = 1;
+
       optional int64 end_bucket_elapsed_nanos = 2;
+
       optional int64 start_bucket_elapsed_millis = 3;
+
       optional int64 end_bucket_elapsed_millis = 4;
+
+      // The number of drop events is capped by StatsdStats::kMaxLoggedBucketDropEvents.
+      // The current maximum is 10 drop events.
+      repeated DropEvent drop_event = 5;
   }
 
   message EventMetricDataWrapper {
diff --git a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
index b027e8e..308c43d 100644
--- a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
@@ -12,19 +12,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include "src/matchers/SimpleLogMatchingTracker.h"
 #include "src/metrics/GaugeMetricProducer.h"
-#include "src/stats_log_util.h"
-#include "logd/LogEvent.h"
-#include "metrics_test_helper.h"
-#include "tests/statsd_test_util.h"
 
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 #include <math.h>
 #include <stdio.h>
+
 #include <vector>
 
+#include "logd/LogEvent.h"
+#include "metrics_test_helper.h"
+#include "src/matchers/SimpleLogMatchingTracker.h"
+#include "src/metrics/MetricProducer.h"
+#include "src/stats_log_util.h"
+#include "tests/statsd_test_util.h"
+
 using namespace testing;
 using android::sp;
 using std::set;
@@ -784,6 +787,70 @@
     EXPECT_EQ(6, bucketIt->second.back().mGaugeAtoms[1].mFields->begin()->mValue.int_value);
 }
 
+/*
+ * Test that BUCKET_TOO_SMALL drop reason is logged when a flushed bucket size
+ * is smaller than the "min_bucket_size_nanos" specified in the metric config.
+ */
+TEST(GaugeMetricProducerTest_BucketDrop, TestBucketDropWhenBucketTooSmall) {
+    GaugeMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(FIVE_MINUTES);
+    metric.set_sampling_type(GaugeMetric::FIRST_N_SAMPLES);
+    metric.set_min_bucket_size_nanos(10000000000);  // 10 seconds
+
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard = new EventMatcherWizard({
+        new SimpleLogMatchingTracker(atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Bucket start.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
+                event->write("field1");
+                event->write(10);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    int triggerId = 5;
+    GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
+                                      logEventMatcherIndex, eventMatcherWizard,
+                                      tagId, triggerId, tagId, bucketStartTimeNs, bucketStartTimeNs,
+                                      pullerManager);
+
+    LogEvent trigger(triggerId, bucketStartTimeNs + 3);
+    trigger.init();
+    gaugeProducer.onMatchedLogEvent(1 /*log matcher index*/, trigger);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    gaugeProducer.onDumpReport(bucketStartTimeNs + 9000000, true /* include recent buckets */,
+                                true, FAST /* dump_latency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_gauge_metrics());
+    EXPECT_EQ(0, report.gauge_metrics().data_size());
+    EXPECT_EQ(1, report.gauge_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.gauge_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 9000000),
+              report.gauge_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.gauge_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.gauge_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::BUCKET_TOO_SMALL, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 9000000), dropEvent.drop_time_millis());
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 4b9d0c0..da0a672 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -12,21 +12,23 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-#include "src/matchers/SimpleLogMatchingTracker.h"
 #include "src/metrics/ValueMetricProducer.h"
-#include "src/stats_log_util.h"
-#include "metrics_test_helper.h"
-#include "tests/statsd_test_util.h"
 
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 #include <math.h>
 #include <stdio.h>
+
 #include <vector>
 
+#include "metrics_test_helper.h"
+#include "src/matchers/SimpleLogMatchingTracker.h"
+#include "src/metrics/MetricProducer.h"
+#include "src/stats_log_util.h"
+#include "tests/statsd_test_util.h"
+
 using namespace testing;
 using android::sp;
-using android::util::ProtoReader;
 using std::make_shared;
 using std::set;
 using std::shared_ptr;
@@ -128,6 +130,24 @@
         return valueProducer;
     }
 
+    static sp<ValueMetricProducer> createValueProducerWithNoInitialCondition(
+            sp<MockStatsPullerManager>& pullerManager, ValueMetric& metric) {
+        UidMap uidMap;
+        SimpleAtomMatcher atomMatcher;
+        atomMatcher.set_atom_id(tagId);
+        sp<EventMatcherWizard> eventMatcherWizard =
+                new EventMatcherWizard({new SimpleLogMatchingTracker(
+                        atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+        sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+        EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+        EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+        sp<ValueMetricProducer> valueProducer = new ValueMetricProducer(
+                kConfigKey, metric, 1, wizard, logEventMatcherIndex, eventMatcherWizard, tagId,
+                bucketStartTimeNs, bucketStartTimeNs, pullerManager);
+        return valueProducer;
+    }
+
     static ValueMetric createMetric() {
         ValueMetric metric;
         metric.set_id(metricId);
@@ -206,7 +226,7 @@
  * Tests pulled atoms with no conditions
  */
 TEST(ValueMetricProducerTest, TestPulledEventsNoCondition) {
-    ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); 
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
     EXPECT_CALL(*pullerManager, Pull(tagId, _))
             .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
@@ -290,7 +310,7 @@
 }
 
 TEST(ValueMetricProducerTest, TestPartialBucketCreated) {
-    ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); 
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
     EXPECT_CALL(*pullerManager, Pull(tagId, _))
             // Initialize bucket.
@@ -347,7 +367,7 @@
  * Tests pulled atoms with filtering
  */
 TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering) {
-    ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); 
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
 
     UidMap uidMap;
     SimpleAtomMatcher atomMatcher;
@@ -696,7 +716,7 @@
 }
 
 TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) {
-    ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); 
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
 
     UidMap uidMap;
     SimpleAtomMatcher atomMatcher;
@@ -1026,7 +1046,7 @@
 
 // Test value metric no condition, the pull on bucket boundary come in time and too late
 TEST(ValueMetricProducerTest, TestBucketBoundaryNoCondition) {
-    ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); 
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
     EXPECT_CALL(*pullerManager, Pull(tagId, _)).WillOnce(Return(true));
     sp<ValueMetricProducer> valueProducer =
@@ -2104,7 +2124,10 @@
     EXPECT_EQ(true, valueProducer->mHasGlobalBase);
 }
 
-TEST(ValueMetricProducerTest, TestInvalidBucketWhenOneConditionFailed) {
+/*
+ * Tests that a bucket is marked invalid when a condition change pull fails.
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenOneConditionFailed) {
     ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
 
     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
@@ -2162,9 +2185,33 @@
     EXPECT_EQ(140, curInterval.base.long_value);
     EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(true, valueProducer->mHasGlobalBase);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucket2StartTimeNs + 10, false /* include partial bucket */, true,
+                                FAST /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(0, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 2), dropEvent.drop_time_millis());
 }
 
-TEST(ValueMetricProducerTest, TestInvalidBucketWhenGuardRailHit) {
+/*
+ * Tests that a bucket is marked invalid when the guardrail is hit.
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenGuardRailHit) {
     ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
     metric.mutable_dimensions_in_what()->set_field(tagId);
     metric.mutable_dimensions_in_what()->add_child()->set_field(1);
@@ -2191,9 +2238,47 @@
     valueProducer->onConditionChanged(true, bucketStartTimeNs + 2);
     EXPECT_EQ(true, valueProducer->mCurrentBucketIsInvalid);
     EXPECT_EQ(0UL, valueProducer->mCurrentSlicedBucket.size());
+    EXPECT_EQ(0UL, valueProducer->mSkippedBuckets.size());
+
+    // Bucket 2 start.
+    vector<shared_ptr<LogEvent>> allData;
+    allData.clear();
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+    event->write(1);
+    event->write(10);
+    event->init();
+    allData.push_back(event);
+    valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
+
+    // First bucket added to mSkippedBuckets after flush.
+    EXPECT_EQ(1UL, valueProducer->mSkippedBuckets.size());
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucket2StartTimeNs + 10000, false /* include recent buckets */,
+                                true, FAST /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(0, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::DIMENSION_GUARDRAIL_REACHED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 2), dropEvent.drop_time_millis());
 }
 
-TEST(ValueMetricProducerTest, TestInvalidBucketWhenInitialPullFailed) {
+/*
+ * Tests that a bucket is marked invalid when the bucket's initial pull fails.
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenInitialPullFailed) {
     ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
 
     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
@@ -2257,9 +2342,34 @@
     EXPECT_EQ(140, curInterval.base.long_value);
     EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(true, valueProducer->mHasGlobalBase);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucket2StartTimeNs + 10000, false /* include recent buckets */,
+                                true, FAST /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(0, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 2), dropEvent.drop_time_millis());
 }
 
-TEST(ValueMetricProducerTest, TestInvalidBucketWhenLastPullFailed) {
+/*
+ * Tests that a bucket is marked invalid when the bucket's final pull fails
+ * (i.e. failed pull on bucket boundary).
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenLastPullFailed) {
     ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
 
     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
@@ -2300,8 +2410,6 @@
     allData.push_back(event);
     valueProducer->onDataPulled(allData, /** succeed */ true, bucketStartTimeNs);
 
-    // This will fail and should invalidate the whole bucket since we do not have all the data
-    // needed to compute the metric value when the screen was on.
     valueProducer->onConditionChanged(false, bucketStartTimeNs + 2);
     valueProducer->onConditionChanged(true, bucketStartTimeNs + 3);
 
@@ -2317,17 +2425,38 @@
     valueProducer->flushIfNeededLocked(bucket2StartTimeNs + 1);
 
     EXPECT_EQ(0UL, valueProducer->mPastBuckets.size());
-    // Last pull failed so based has been reset.
+    // Last pull failed so base has been reset.
     EXPECT_EQ(1UL, valueProducer->mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval& curInterval =
             valueProducer->mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(false, curInterval.hasBase);
     EXPECT_EQ(false, curInterval.hasValue);
     EXPECT_EQ(false, valueProducer->mHasGlobalBase);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucket2StartTimeNs + 10000, false /* include recent buckets */,
+                                true, FAST /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(0, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), dropEvent.drop_time_millis());
 }
 
 TEST(ValueMetricProducerTest, TestEmptyDataResetsBase_onDataPulled) {
-    ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); 
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
     sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
     EXPECT_CALL(*pullerManager, Pull(tagId, _))
             // Start bucket.
@@ -2520,48 +2649,6 @@
     EXPECT_EQ(true, valueProducer->mHasGlobalBase);
 }
 
-TEST(ValueMetricProducerTest, TestBucketIncludingUnknownConditionIsInvalid) {
-    ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
-    metric.mutable_dimensions_in_what()->set_field(tagId);
-    metric.mutable_dimensions_in_what()->add_child()->set_field(1);
-    metric.set_condition(StringToId("SCREEN_ON"));
-
-    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
-    EXPECT_CALL(*pullerManager, Pull(tagId, _))
-            // Second onConditionChanged.
-            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
-                data->clear();
-                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
-                event->write(tagId);
-                event->write(2);
-                event->write(2);
-                event->init();
-                data->push_back(event);
-                return true;
-            }));
-
-    sp<ValueMetricProducer> valueProducer =
-            ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
-    valueProducer->mCondition = ConditionState::kUnknown;
-
-    valueProducer->onConditionChanged(false, bucketStartTimeNs + 10);
-    valueProducer->onConditionChanged(true, bucketStartTimeNs + 20);
-
-    // End of bucket
-    vector<shared_ptr<LogEvent>> allData;
-    allData.clear();
-    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
-    event->write(4);
-    event->write(4);
-    event->init();
-    allData.push_back(event);
-    valueProducer->onDataPulled(allData, /** succeed */ true, bucket2StartTimeNs);
-
-    // Bucket is incomplete so it is mark as invalid, however the base is fine since the last pull
-    // succeeded.
-    EXPECT_EQ(0UL, valueProducer->mPastBuckets.size());
-}
-
 TEST(ValueMetricProducerTest, TestFullBucketResetWhenLastBucketInvalid) {
     ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
 
@@ -2750,6 +2837,7 @@
     assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {2}, {2});
 }
 
+// TODO: b/145705635 fix or delete this test
 TEST(ValueMetricProducerTest, TestBucketInvalidIfGlobalBaseIsNotSet) {
     ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
 
@@ -2797,25 +2885,8 @@
     assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {}, {});
 }
 
-static StatsLogReport outputStreamToProto(ProtoOutputStream* proto) {
-    vector<uint8_t> bytes;
-    bytes.resize(proto->size());
-    size_t pos = 0;
-    sp<ProtoReader> reader = proto->data();
-    while (reader->readBuffer() != NULL) {
-        size_t toRead = reader->currentToRead();
-        std::memcpy(&((bytes)[pos]), reader->readBuffer(), toRead);
-        pos += toRead;
-        reader->move(toRead);
-    }
-
-    StatsLogReport report;
-    report.ParseFromArray(bytes.data(), bytes.size());
-    return report;
-}
-
 TEST(ValueMetricProducerTest, TestPullNeededFastDump) {
-    ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); 
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
 
     UidMap uidMap;
     SimpleAtomMatcher atomMatcher;
@@ -2857,7 +2928,7 @@
 }
 
 TEST(ValueMetricProducerTest, TestFastDumpWithoutCurrentBucket) {
-    ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); 
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
 
     UidMap uidMap;
     SimpleAtomMatcher atomMatcher;
@@ -2910,7 +2981,7 @@
 }
 
 TEST(ValueMetricProducerTest, TestPullNeededNoTimeConstraints) {
-    ValueMetric metric = ValueMetricProducerTestHelper::createMetric(); 
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetric();
 
     UidMap uidMap;
     SimpleAtomMatcher atomMatcher;
@@ -3103,6 +3174,600 @@
     assertPastBucketValuesSingleKey(valueProducer->mPastBuckets, {}, {});
 }
 
+/*
+ * Test that DUMP_REPORT_REQUESTED drop reason is logged.
+ *
+ * For the bucket to be marked invalid when a dump report is requested,
+ * three things must be true:
+ * - we want to include the current partial bucket
+ * - we need a pull (metric is pulled and condition is true)
+ * - the dump latency must be FAST
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenDumpReportRequested) {
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Condition change to true.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
+                event->write("field1");
+                event->write(10);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    sp<ValueMetricProducer> valueProducer =
+            ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
+
+    // Condition change event.
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 20);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucketStartTimeNs + 40, true /* include recent buckets */, true,
+                                FAST /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(0, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::DUMP_REPORT_REQUESTED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 40), dropEvent.drop_time_millis());
+}
+
+/*
+ * Test that EVENT_IN_WRONG_BUCKET drop reason is logged for a late condition
+ * change event (i.e. the condition change occurs in the wrong bucket).
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenConditionEventWrongBucket) {
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Condition change to true.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 50);
+                event->write("field1");
+                event->write(10);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    sp<ValueMetricProducer> valueProducer =
+            ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
+
+    // Condition change event.
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 50);
+
+    // Bucket boundary pull.
+    vector<shared_ptr<LogEvent>> allData;
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs);
+    event->write("field1");
+    event->write(15);
+    event->init();
+    allData.push_back(event);
+    valueProducer->onDataPulled(allData, /** succeeds */ true, bucket2StartTimeNs + 1);
+
+    // Late condition change event.
+    valueProducer->onConditionChanged(false, bucket2StartTimeNs - 100);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucket2StartTimeNs + 100, true /* include recent buckets */, true,
+                                NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(1, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket3StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::EVENT_IN_WRONG_BUCKET, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs - 100), dropEvent.drop_time_millis());
+}
+
+/*
+ * Test that EVENT_IN_WRONG_BUCKET drop reason is logged for a late accumulate
+ * event (i.e. the accumulate events call occurs in the wrong bucket).
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenAccumulateEventWrongBucket) {
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Condition change to true.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 50);
+                event->write("field1");
+                event->write(10);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            // Dump report requested.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 100);
+                event->write("field1");
+                event->write(15);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    sp<ValueMetricProducer> valueProducer =
+            ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
+
+    // Condition change event.
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 50);
+
+    // Bucket boundary pull.
+    vector<shared_ptr<LogEvent>> allData;
+    shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs);
+    event->write("field1");
+    event->write(15);
+    event->init();
+    allData.push_back(event);
+    valueProducer->onDataPulled(allData, /** succeeds */ true, bucket2StartTimeNs + 1);
+
+    allData.clear();
+    event = make_shared<LogEvent>(tagId, bucket2StartTimeNs - 100);
+    event->write("field1");
+    event->write(20);
+    event->init();
+    allData.push_back(event);
+
+    // Late accumulateEvents event.
+    valueProducer->accumulateEvents(allData, bucket2StartTimeNs - 100, bucket2StartTimeNs - 100);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucket2StartTimeNs + 100, true /* include recent buckets */, true,
+                                NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(1, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket3StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::EVENT_IN_WRONG_BUCKET, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs - 100), dropEvent.drop_time_millis());
+}
+
+/*
+ * Test that CONDITION_UNKNOWN drop reason is logged due to an unknown condition
+ * when a metric is initialized.
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenConditionUnknown) {
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Condition change to true.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 50);
+                event->write("field1");
+                event->write(10);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            // Dump report requested.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 100);
+                event->write("field1");
+                event->write(15);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    sp<ValueMetricProducer> valueProducer =
+            ValueMetricProducerTestHelper::createValueProducerWithNoInitialCondition(pullerManager,
+                                                                                     metric);
+
+    // Condition change event.
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 50);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucketStartTimeNs + 100, true /* include recent buckets */, true,
+                                NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(0, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::CONDITION_UNKNOWN, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 100), dropEvent.drop_time_millis());
+}
+
+/*
+ * Test that PULL_FAILED drop reason is logged due to a pull failure in
+ * #pullAndMatchEventsLocked.
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenPullFailed) {
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Condition change to true.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 50);
+                event->write("field1");
+                event->write(10);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            // Dump report requested, pull fails.
+            .WillOnce(Return(false));
+
+    sp<ValueMetricProducer> valueProducer =
+            ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
+
+    // Condition change event.
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 50);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucketStartTimeNs + 100, true /* include recent buckets */, true,
+                                NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(0, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 100), dropEvent.drop_time_millis());
+}
+
+/*
+ * Test that MULTIPLE_BUCKETS_SKIPPED drop reason is logged when a log event
+ * skips over more than one bucket.
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestInvalidBucketWhenMultipleBucketsSkipped) {
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Condition change to true.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+                event->write("field1");
+                event->write(10);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            // Dump report requested.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event =
+                        make_shared<LogEvent>(tagId, bucket4StartTimeNs + 1000);
+                event->write("field1");
+                event->write(15);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    sp<ValueMetricProducer> valueProducer =
+            ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
+
+    // Condition change event.
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 10);
+
+    // Condition change event that skips forward by three buckets.
+    valueProducer->onConditionChanged(false, bucket4StartTimeNs + 10);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucket4StartTimeNs + 1000, true /* include recent buckets */, true,
+                                NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(0, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::MULTIPLE_BUCKETS_SKIPPED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucket4StartTimeNs + 10), dropEvent.drop_time_millis());
+}
+
+/*
+ * Test that BUCKET_TOO_SMALL drop reason is logged when a flushed bucket size
+ * is smaller than the "min_bucket_size_nanos" specified in the metric config.
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenBucketTooSmall) {
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+    metric.set_min_bucket_size_nanos(10000000000);  // 10 seconds
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Condition change to true.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+                event->write("field1");
+                event->write(10);
+                event->init();
+                data->push_back(event);
+                return true;
+            }))
+            // Dump report requested.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event =
+                        make_shared<LogEvent>(tagId, bucketStartTimeNs + 9000000);
+                event->write("field1");
+                event->write(15);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    sp<ValueMetricProducer> valueProducer =
+            ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
+
+    // Condition change event.
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 10);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucketStartTimeNs + 9000000, true /* include recent buckets */,
+                                true, NO_TIME_CONSTRAINTS /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(0, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 9000000),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::BUCKET_TOO_SMALL, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 9000000), dropEvent.drop_time_millis());
+}
+
+/*
+ * Test that several drop events in the same bucket are each logged with their own reason.
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestMultipleBucketDropEvents) {
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // Condition change to true.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
+                event->write("field1");
+                event->write(10);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    sp<ValueMetricProducer> valueProducer =
+            ValueMetricProducerTestHelper::createValueProducerWithNoInitialCondition(pullerManager,
+                                                                                     metric);
+
+    // First drop: condition change on a producer created with no initial condition.
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 10);
+
+    // Second drop: dump report requested at bucketStartTimeNs + 1000.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    valueProducer->onDumpReport(bucketStartTimeNs + 1000, true /* include recent buckets */, true,
+                                FAST /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(0, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(2, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::CONDITION_UNKNOWN, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 10), dropEvent.drop_time_millis());
+
+    dropEvent = report.value_metrics().skipped(0).drop_event(1);
+    EXPECT_EQ(BucketDropReason::DUMP_REPORT_REQUESTED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 1000), dropEvent.drop_time_millis());
+}
+
+/*
+ * Test that the number of logged bucket drop events is capped at the maximum, currently 10
+ * (kMaxLoggedBucketDropEvents in StatsdStats.h; see MetricProducer::maxDropEventsReached()).
+ */
+TEST(ValueMetricProducerTest_BucketDrop, TestMaxBucketDropEvents) {
+    ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
+
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            // First condition change event: 2000 events, each with distinct field values.
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                for (int i = 0; i < 2000; i++) {
+                    shared_ptr<LogEvent> event =
+                            make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
+                    event->write(i);
+                    event->write(i);
+                    event->init();
+                    data->push_back(event);
+                }
+                return true;
+            }))
+            .WillOnce(Return(false))  // Pulls 2-10 fail (nine in total).
+            .WillOnce(Return(false))
+            .WillOnce(Return(false))
+            .WillOnce(Return(false))
+            .WillOnce(Return(false))
+            .WillOnce(Return(false))
+            .WillOnce(Return(false))
+            .WillOnce(Return(false))
+            .WillOnce(Return(false))
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 220);
+                event->write("field1");
+                event->write(10);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    sp<ValueMetricProducer> valueProducer =
+            ValueMetricProducerTestHelper::createValueProducerWithNoInitialCondition(pullerManager,
+                                                                                     metric);
+
+    // First condition change event causes guardrail to be reached.
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 10);
+
+    // Condition change events 2-10 result in failed pulls.
+    valueProducer->onConditionChanged(false, bucketStartTimeNs + 30);
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 50);
+    valueProducer->onConditionChanged(false, bucketStartTimeNs + 70);
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 90);
+    valueProducer->onConditionChanged(false, bucketStartTimeNs + 100);
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 150);
+    valueProducer->onConditionChanged(false, bucketStartTimeNs + 170);
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 190);
+    valueProducer->onConditionChanged(false, bucketStartTimeNs + 200);
+
+    // Condition change event 11: the pull succeeds.
+    valueProducer->onConditionChanged(true, bucketStartTimeNs + 220);
+
+    // Check dump report.
+    ProtoOutputStream output;
+    std::set<string> strSet;
+    // Because we already have 10 drop events in the current bucket,
+    // this case should not be added to the list of drop events.
+    valueProducer->onDumpReport(bucketStartTimeNs + 1000, true /* include recent buckets */, true,
+                                FAST /* dumpLatency */, &strSet, &output);
+
+    StatsLogReport report = outputStreamToProto(&output);
+    EXPECT_TRUE(report.has_value_metrics());
+    EXPECT_EQ(0, report.value_metrics().data_size());
+    EXPECT_EQ(1, report.value_metrics().skipped_size());
+
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
+              report.value_metrics().skipped(0).start_bucket_elapsed_millis());
+    EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
+              report.value_metrics().skipped(0).end_bucket_elapsed_millis());
+    EXPECT_EQ(10, report.value_metrics().skipped(0).drop_event_size());
+
+    auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
+    EXPECT_EQ(BucketDropReason::CONDITION_UNKNOWN, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 10), dropEvent.drop_time_millis());
+
+    dropEvent = report.value_metrics().skipped(0).drop_event(1);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 30), dropEvent.drop_time_millis());
+
+    dropEvent = report.value_metrics().skipped(0).drop_event(2);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 50), dropEvent.drop_time_millis());
+
+    dropEvent = report.value_metrics().skipped(0).drop_event(3);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 70), dropEvent.drop_time_millis());
+
+    dropEvent = report.value_metrics().skipped(0).drop_event(4);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 90), dropEvent.drop_time_millis());
+
+    dropEvent = report.value_metrics().skipped(0).drop_event(5);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 100), dropEvent.drop_time_millis());
+
+    dropEvent = report.value_metrics().skipped(0).drop_event(6);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 150), dropEvent.drop_time_millis());
+
+    dropEvent = report.value_metrics().skipped(0).drop_event(7);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 170), dropEvent.drop_time_millis());
+
+    dropEvent = report.value_metrics().skipped(0).drop_event(8);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 190), dropEvent.drop_time_millis());
+
+    dropEvent = report.value_metrics().skipped(0).drop_event(9);
+    EXPECT_EQ(BucketDropReason::PULL_FAILED, dropEvent.drop_reason());
+    EXPECT_EQ(NanoToMillis(bucketStartTimeNs + 200), dropEvent.drop_time_millis());
+}
+
 }  // namespace statsd
 }  // namespace os
 }  // namespace android
diff --git a/cmds/statsd/tests/statsd_test_util.cpp b/cmds/statsd/tests/statsd_test_util.cpp
index d154b1b..7b651df 100644
--- a/cmds/statsd/tests/statsd_test_util.cpp
+++ b/cmds/statsd/tests/statsd_test_util.cpp
@@ -18,6 +18,23 @@
 namespace os {
 namespace statsd {
 
+StatsLogReport outputStreamToProto(ProtoOutputStream* proto) {
+    // Copy the stream's contents, buffer by buffer, into one contiguous array.
+    vector<uint8_t> flattened(proto->size());
+    size_t offset = 0;
+    sp<ProtoReader> reader = proto->data();
+    // The copy loop ends once readBuffer() returns null.
+    while (reader->readBuffer() != nullptr) {
+        const size_t chunkSize = reader->currentToRead();
+        std::memcpy(flattened.data() + offset, reader->readBuffer(), chunkSize);
+        offset += chunkSize;
+        reader->move(chunkSize);
+    }
+    // NOTE: ParseFromArray()'s success flag is ignored, so parse failures are silent.
+    StatsLogReport parsed;
+    parsed.ParseFromArray(flattened.data(), flattened.size());
+    return parsed;
+}
 
 AtomMatcher CreateSimpleAtomMatcher(const string& name, int atomId) {
     AtomMatcher atom_matcher;
diff --git a/cmds/statsd/tests/statsd_test_util.h b/cmds/statsd/tests/statsd_test_util.h
index e1e134b..010c194 100644
--- a/cmds/statsd/tests/statsd_test_util.h
+++ b/cmds/statsd/tests/statsd_test_util.h
@@ -27,8 +27,12 @@
 namespace os {
 namespace statsd {
 
+using android::util::ProtoReader;
 using google::protobuf::RepeatedPtrField;
 
+// Converts a ProtoOutputStream to a StatsLogReport proto; parse failures are not reported.
+StatsLogReport outputStreamToProto(ProtoOutputStream* proto);
+
 // Create AtomMatcher proto to simply match a specific atom type.
 AtomMatcher CreateSimpleAtomMatcher(const string& name, int atomId);