Slice by state in ValueMetricProducer
- Added #onStateChanged logic to ValueMetricProducer
- #onDumpReport fills in slice_by_state values in ValueMetricProducer
- Base information is tracked in a separate data structure and keyed by
dimensions_in_what HashableDimensionKey. The current state key is also
stored in base info so we can close the interval for this state once we
get a new state key.
- Added unit tests for ValueMetricProducer onStateChange
Other changes:
- mCondition parameter was removed from #pullAndMatchEventsLocked and
#accumulateEvents because it is unused in these functions.
- The event key is initialized in each metric's internal
onMatchedLogEvent function. This allows ValueMetric to skip events that
have primary keys that do not match the current state change primary
key.
Test: bit statsd_test:*
Change-Id: I565c6d251262be57abf10fdef243b8bdc01f3772
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index d8f399f..d2db6e9 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -66,6 +66,7 @@
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_BUCKET_INFO = 3;
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
+const int FIELD_ID_SLICE_BY_STATE = 6;
// for ValueBucketInfo
const int FIELD_ID_VALUE_INDEX = 1;
const int FIELD_ID_VALUE_LONG = 2;
@@ -146,6 +147,14 @@
mConditionSliced = true;
}
+ for (const auto& stateLink : metric.state_link()) {
+ Metric2State ms;
+ ms.stateAtomId = stateLink.state_atom_id();
+ translateFieldMatcher(stateLink.fields_in_what(), &ms.metricFields);
+ translateFieldMatcher(stateLink.fields_in_state(), &ms.stateFields);
+ mMetric2StateLinks.push_back(ms);
+ }
+
int64_t numBucketsForward = calcBucketsForwardCount(startTimeNs);
mCurrentBucketNum += numBucketsForward;
@@ -181,6 +190,33 @@
}
}
+void ValueMetricProducer::onStateChanged(int64_t eventTimeNs, int32_t atomId,
+ const HashableDimensionKey& primaryKey, int oldState,
+ int newState) {
+ VLOG("ValueMetric %lld onStateChanged time %lld, State %d, key %s, %d -> %d",
+ (long long)mMetricId, (long long)eventTimeNs, atomId, primaryKey.toString().c_str(),
+ oldState, newState);
+ // If condition is not true, we do not need to pull for this state change.
+ if (mCondition != ConditionState::kTrue) {
+ return;
+ }
+ bool isEventLate = eventTimeNs < mCurrentBucketStartTimeNs;
+ if (isEventLate) {
+ VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
+ (long long)mCurrentBucketStartTimeNs);
+ invalidateCurrentBucket(eventTimeNs, BucketDropReason::EVENT_IN_WRONG_BUCKET);
+ return;
+ }
+ mStateChangePrimaryKey.first = atomId;
+ mStateChangePrimaryKey.second = primaryKey;
+ if (mIsPulled) {
+ pullAndMatchEventsLocked(eventTimeNs);
+ }
+ mStateChangePrimaryKey.first = 0;
+ mStateChangePrimaryKey.second = DEFAULT_DIMENSION_KEY;
+ flushIfNeededLocked(eventTimeNs);
+}
+
void ValueMetricProducer::onSlicedConditionMayChangeLocked(bool overallCondition,
const int64_t eventTime) {
VLOG("Metric %lld onSlicedConditionMayChange", (long long)mMetricId);
@@ -281,6 +317,14 @@
FIELD_ID_DIMENSION_LEAF_IN_WHAT, str_set, protoOutput);
}
+ // Then fill slice_by_state.
+ for (auto state : dimensionKey.getStateValuesKey().getValues()) {
+ uint64_t stateToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED |
+ FIELD_ID_SLICE_BY_STATE);
+ writeStateToProto(state, protoOutput);
+ protoOutput->end(stateToken);
+ }
+
// Then fill bucket_info (ValueBucketInfo).
for (const auto& bucket : pair.second) {
uint64_t bucketInfoToken = protoOutput->start(
@@ -300,7 +344,7 @@
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_CONDITION_TRUE_NS,
(long long)bucket.mConditionTrueNs);
}
- for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) {
+ for (int i = 0; i < (int)bucket.valueIndex.size(); i++) {
int index = bucket.valueIndex[i];
const Value& value = bucket.values[i];
uint64_t valueToken = protoOutput->start(
@@ -358,9 +402,10 @@
}
void ValueMetricProducer::resetBase() {
- for (auto& slice : mCurrentSlicedBucket) {
- for (auto& interval : slice.second) {
- interval.hasBase = false;
+ for (auto& slice : mCurrentBaseInfo) {
+ for (auto& baseInfo : slice.second) {
+ baseInfo.hasBase = false;
+ baseInfo.hasCurrentState = false;
}
}
mHasGlobalBase = false;
@@ -558,14 +603,20 @@
onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
}
}
- // If the new pulled data does not contains some keys we track in our intervals, we need to
- // reset the base.
+ // If a key that is:
+ // 1. Tracked in mCurrentSlicedBucket and
+ // 2. A superset of the current mStateChangePrimaryKey
+ // was not found in the new pulled data (i.e. not in mMatchedDimensionInWhatKeys)
+ // then we need to reset the base.
for (auto& slice : mCurrentSlicedBucket) {
- bool presentInPulledData = mMatchedMetricDimensionKeys.find(slice.first)
- != mMatchedMetricDimensionKeys.end();
- if (!presentInPulledData) {
- for (auto& interval : slice.second) {
- interval.hasBase = false;
+ const auto& whatKey = slice.first.getDimensionKeyInWhat();
+ bool presentInPulledData =
+ mMatchedMetricDimensionKeys.find(whatKey) != mMatchedMetricDimensionKeys.end();
+ if (!presentInPulledData && whatKey.contains(mStateChangePrimaryKey.second)) {
+ auto it = mCurrentBaseInfo.find(whatKey);
+ for (auto& baseInfo : it->second) {
+ baseInfo.hasBase = false;
+ baseInfo.hasCurrentState = false;
}
}
}
@@ -674,17 +725,30 @@
return false;
}
-void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
- const MetricDimensionKey& eventKey,
- const ConditionKey& conditionKey,
- bool condition, const LogEvent& event) {
+void ValueMetricProducer::onMatchedLogEventInternalLocked(
+ const size_t matcherIndex, const MetricDimensionKey& eventKey,
+ const ConditionKey& conditionKey, bool condition, const LogEvent& event,
+ const map<int, HashableDimensionKey>& statePrimaryKeys) {
+ auto whatKey = eventKey.getDimensionKeyInWhat();
+ auto stateKey = eventKey.getStateValuesKey();
+
+ // Skip this event if a state changed occurred for a different primary key.
+ auto it = statePrimaryKeys.find(mStateChangePrimaryKey.first);
+ // Check that both the atom id and the primary key are equal.
+ if (it != statePrimaryKeys.end() && it->second != mStateChangePrimaryKey.second) {
+ VLOG("ValueMetric skip event with primary key %s because state change primary key "
+ "is %s",
+ it->second.toString().c_str(), mStateChangePrimaryKey.second.toString().c_str());
+ return;
+ }
+
int64_t eventTimeNs = event.GetElapsedTimestampNs();
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
(long long)mCurrentBucketStartTimeNs);
return;
}
- mMatchedMetricDimensionKeys.insert(eventKey);
+ mMatchedMetricDimensionKeys.insert(whatKey);
if (!mIsPulled) {
// We cannot flush without doing a pull first.
@@ -709,10 +773,26 @@
if (hitGuardRailLocked(eventKey)) {
return;
}
- vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey];
- if (multiIntervals.size() < mFieldMatchers.size()) {
+ vector<BaseInfo>& baseInfos = mCurrentBaseInfo[whatKey];
+ if (baseInfos.size() < mFieldMatchers.size()) {
VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
- multiIntervals.resize(mFieldMatchers.size());
+ baseInfos.resize(mFieldMatchers.size());
+ }
+
+ for (auto baseInfo : baseInfos) {
+ if (!baseInfo.hasCurrentState) {
+ baseInfo.currentState = DEFAULT_DIMENSION_KEY;
+ baseInfo.hasCurrentState = true;
+ }
+ }
+
+ // We need to get the intervals stored with the previous state key so we can
+ // close these value intervals.
+ const auto oldStateKey = baseInfos[0].currentState;
+ vector<Interval>& intervals = mCurrentSlicedBucket[MetricDimensionKey(whatKey, oldStateKey)];
+ if (intervals.size() < mFieldMatchers.size()) {
+ VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
+ intervals.resize(mFieldMatchers.size());
}
// We only use anomaly detection under certain cases.
@@ -725,7 +805,8 @@
for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
const Matcher& matcher = mFieldMatchers[i];
- Interval& interval = multiIntervals[i];
+ BaseInfo& baseInfo = baseInfos[i];
+ Interval& interval = intervals[i];
interval.valueIndex = i;
Value value;
if (!getDoubleOrLong(event, matcher, value)) {
@@ -736,60 +817,61 @@
interval.seenNewData = true;
if (mUseDiff) {
- if (!interval.hasBase) {
+ if (!baseInfo.hasBase) {
if (mHasGlobalBase && mUseZeroDefaultBase) {
// The bucket has global base. This key does not.
// Optionally use zero as base.
- interval.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
- interval.hasBase = true;
+ baseInfo.base = (value.type == LONG ? ZERO_LONG : ZERO_DOUBLE);
+ baseInfo.hasBase = true;
} else {
// no base. just update base and return.
- interval.base = value;
- interval.hasBase = true;
+ baseInfo.base = value;
+ baseInfo.hasBase = true;
// If we're missing a base, do not use anomaly detection on incomplete data
useAnomalyDetection = false;
- // Continue (instead of return) here in order to set interval.base and
- // interval.hasBase for other intervals
+ // Continue (instead of return) here in order to set baseInfo.base and
+ // baseInfo.hasBase for other baseInfos
continue;
}
}
+
Value diff;
switch (mValueDirection) {
case ValueMetric::INCREASING:
- if (value >= interval.base) {
- diff = value - interval.base;
+ if (value >= baseInfo.base) {
+ diff = value - baseInfo.base;
} else if (mUseAbsoluteValueOnReset) {
diff = value;
} else {
VLOG("Unexpected decreasing value");
StatsdStats::getInstance().notePullDataError(mPullTagId);
- interval.base = value;
+ baseInfo.base = value;
// If we've got bad data, do not use anomaly detection
useAnomalyDetection = false;
continue;
}
break;
case ValueMetric::DECREASING:
- if (interval.base >= value) {
- diff = interval.base - value;
+ if (baseInfo.base >= value) {
+ diff = baseInfo.base - value;
} else if (mUseAbsoluteValueOnReset) {
diff = value;
} else {
VLOG("Unexpected increasing value");
StatsdStats::getInstance().notePullDataError(mPullTagId);
- interval.base = value;
+ baseInfo.base = value;
// If we've got bad data, do not use anomaly detection
useAnomalyDetection = false;
continue;
}
break;
case ValueMetric::ANY:
- diff = value - interval.base;
+ diff = value - baseInfo.base;
break;
default:
break;
}
- interval.base = value;
+ baseInfo.base = value;
value = diff;
}
@@ -814,12 +896,13 @@
interval.hasValue = true;
}
interval.sampleSize += 1;
+ baseInfo.currentState = stateKey;
}
// Only trigger the tracker if all intervals are correct
if (useAnomalyDetection) {
// TODO: propgate proper values down stream when anomaly support doubles
- long wholeBucketVal = multiIntervals[0].value.long_value;
+ long wholeBucketVal = intervals[0].value.long_value;
auto prev = mCurrentFullBucket.find(eventKey);
if (prev != mCurrentFullBucket.end()) {
wholeBucketVal += prev->second;
@@ -953,6 +1036,7 @@
} else {
it++;
}
+ // TODO: remove mCurrentBaseInfo entries when obsolete
}
mCurrentBucketIsInvalid = false;