multi-value aggregation in ValueMetric
Allow aggregation on multiple fields, instead of one at a time.
All these fields share the same aggregation type, use_diff,
direction, etc.
The config reuses value_field but allows multiple fields to be
specified.
The order in which the fields are specified determines the "index" of
each value in the output.
Bug: 119217634
Test: unit test
Change-Id: I38b1465d13723a897b30ee0b4f868498f60ad4db
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index c8b1cf0..7250b17 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -64,8 +64,10 @@
const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
// for ValueBucketInfo
-const int FIELD_ID_VALUE_LONG = 7;
-const int FIELD_ID_VALUE_DOUBLE = 8;
+const int FIELD_ID_VALUE_INDEX = 1;
+const int FIELD_ID_VALUE_LONG = 2;
+const int FIELD_ID_VALUE_DOUBLE = 3;
+const int FIELD_ID_VALUES = 9;
const int FIELD_ID_BUCKET_NUM = 4;
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
@@ -78,7 +80,6 @@
const sp<StatsPullerManager>& pullerManager)
: MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
mPullerManager(pullerManager),
- mValueField(metric.value_field()),
mPullTagId(pullTagId),
mIsPulled(pullTagId != -1),
mMinBucketSizeNs(metric.min_bucket_size_nanos()),
@@ -103,6 +104,9 @@
}
mBucketSizeNs = bucketSizeMills * 1000000;
+
+ translateFieldMatcher(metric.value_field(), &mFieldMatchers);
+
if (metric.has_dimensions_in_what()) {
translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
@@ -122,9 +126,6 @@
}
}
- if (mValueField.child_size() > 0) {
- mField = mValueField.child(0).field();
- }
mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
HasPositionALL(metric.dimensions_in_condition());
@@ -259,18 +260,27 @@
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
(long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
}
- if (bucket.value.getType() == LONG) {
- protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
- (long long)bucket.value.long_value);
- VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
- (long long)bucket.mBucketEndNs, (long long)bucket.value.long_value);
- } else if (bucket.value.getType() == DOUBLE) {
- protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
- bucket.value.double_value);
- VLOG("\t bucket [%lld - %lld] count: %.2f", (long long)bucket.mBucketStartNs,
- (long long)bucket.mBucketEndNs, bucket.value.double_value);
- } else {
- VLOG("Wrong value type for ValueMetric output: %d", bucket.value.getType());
+ for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) {
+ int index = bucket.valueIndex[i];
+ const Value& value = bucket.values[i];
+ uint64_t valueToken = protoOutput->start(
+ FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
+ protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX,
+ index);
+ if (value.getType() == LONG) {
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
+ (long long)value.long_value);
+ VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
+ (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
+ } else if (value.getType() == DOUBLE) {
+ protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
+ value.double_value);
+ VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
+ (long long)bucket.mBucketEndNs, index, value.double_value);
+ } else {
+ VLOG("Wrong value type for ValueMetric output: %d", value.getType());
+ }
+ protoOutput->end(valueToken);
}
protoOutput->end(bucketInfoToken);
}
@@ -303,7 +313,9 @@
// when condition change from true to false, clear diff base
if (mUseDiff && mCondition && !condition) {
for (auto& slice : mCurrentSlicedBucket) {
- slice.second.hasBase = false;
+ for (auto& interval : slice.second) {
+ interval.hasBase = false;
+ }
}
}
@@ -363,10 +375,12 @@
(unsigned long)mCurrentSlicedBucket.size());
if (verbose) {
for (const auto& it : mCurrentSlicedBucket) {
+ for (const auto& interval : it.second) {
fprintf(out, "\t(what)%s\t(condition)%s (value)%s\n",
it.first.getDimensionKeyInWhat().toString().c_str(),
it.first.getDimensionKeyInCondition().toString().c_str(),
- it.second.value.toString().c_str());
+ interval.value.toString().c_str());
+ }
}
}
}
@@ -391,25 +405,29 @@
return false;
}
-const Value getDoubleOrLong(const Value& value) {
- Value v;
- switch (value.type) {
- case INT:
- v.setLong(value.int_value);
- break;
- case LONG:
- v.setLong(value.long_value);
- break;
- case FLOAT:
- v.setDouble(value.float_value);
- break;
- case DOUBLE:
- v.setDouble(value.double_value);
- break;
- default:
- break;
+bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
+ for (const FieldValue& value : event.getValues()) {
+ if (value.mField.matches(matcher)) {
+ switch (value.mValue.type) {
+ case INT:
+ ret.setLong(value.mValue.int_value);
+ break;
+ case LONG:
+ ret.setLong(value.mValue.long_value);
+ break;
+ case FLOAT:
+ ret.setDouble(value.mValue.float_value);
+ break;
+ case DOUBLE:
+ ret.setDouble(value.mValue.double_value);
+ break;
+ default:
+ break;
+ }
+ return true;
+ }
}
- return v;
+ return false;
}
void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
@@ -436,82 +454,90 @@
if (hitGuardRailLocked(eventKey)) {
return;
}
- Interval& interval = mCurrentSlicedBucket[eventKey];
-
- if (mField > event.size()) {
- VLOG("Failed to extract value field %d from atom %s. %d", mField, event.ToString().c_str(),
- (int)event.size());
- return;
+ vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey];
+ if (multiIntervals.size() < mFieldMatchers.size()) {
+ VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
+ multiIntervals.resize(mFieldMatchers.size());
}
- Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue);
- if (mUseDiff) {
- // no base. just update base and return.
- if (!interval.hasBase) {
- interval.base = value;
- interval.hasBase = true;
+ for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
+ const Matcher& matcher = mFieldMatchers[i];
+ Interval& interval = multiIntervals[i];
+ interval.valueIndex = i;
+ Value value;
+ if (!getDoubleOrLong(event, matcher, value)) {
+ VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
return;
}
- Value diff;
- switch (mValueDirection) {
- case ValueMetric::INCREASING:
- if (value >= interval.base) {
- diff = value - interval.base;
- } else if (mUseAbsoluteValueOnReset) {
- diff = value;
- } else {
- VLOG("Unexpected decreasing value");
- StatsdStats::getInstance().notePullDataError(mPullTagId);
- interval.base = value;
- return;
- }
- break;
- case ValueMetric::DECREASING:
- if (interval.base >= value) {
- diff = interval.base - value;
- } else if (mUseAbsoluteValueOnReset) {
- diff = value;
- } else {
- VLOG("Unexpected increasing value");
- StatsdStats::getInstance().notePullDataError(mPullTagId);
- interval.base = value;
- return;
- }
- break;
- case ValueMetric::ANY:
- diff = value - interval.base;
- break;
- default:
- break;
- }
- interval.base = value;
- value = diff;
- }
- if (interval.hasValue) {
- switch (mAggregationType) {
- case ValueMetric::SUM:
- // for AVG, we add up and take average when flushing the bucket
- case ValueMetric::AVG:
- interval.value += value;
- break;
- case ValueMetric::MIN:
- interval.value = std::min(value, interval.value);
- break;
- case ValueMetric::MAX:
- interval.value = std::max(value, interval.value);
- break;
- default:
- break;
+ if (mUseDiff) {
+ // no base. just update base and return.
+ if (!interval.hasBase) {
+ interval.base = value;
+ interval.hasBase = true;
+ return;
+ }
+ Value diff;
+ switch (mValueDirection) {
+ case ValueMetric::INCREASING:
+ if (value >= interval.base) {
+ diff = value - interval.base;
+ } else if (mUseAbsoluteValueOnReset) {
+ diff = value;
+ } else {
+ VLOG("Unexpected decreasing value");
+ StatsdStats::getInstance().notePullDataError(mPullTagId);
+ interval.base = value;
+ return;
+ }
+ break;
+ case ValueMetric::DECREASING:
+ if (interval.base >= value) {
+ diff = interval.base - value;
+ } else if (mUseAbsoluteValueOnReset) {
+ diff = value;
+ } else {
+ VLOG("Unexpected increasing value");
+ StatsdStats::getInstance().notePullDataError(mPullTagId);
+ interval.base = value;
+ return;
+ }
+ break;
+ case ValueMetric::ANY:
+ diff = value - interval.base;
+ break;
+ default:
+ break;
+ }
+ interval.base = value;
+ value = diff;
}
- } else {
- interval.value = value;
- interval.hasValue = true;
+
+ if (interval.hasValue) {
+ switch (mAggregationType) {
+ case ValueMetric::SUM:
+ // for AVG, we add up and take average when flushing the bucket
+ case ValueMetric::AVG:
+ interval.value += value;
+ break;
+ case ValueMetric::MIN:
+ interval.value = std::min(value, interval.value);
+ break;
+ case ValueMetric::MAX:
+ interval.value = std::max(value, interval.value);
+ break;
+ default:
+ break;
+ }
+ } else {
+ interval.value = value;
+ interval.hasValue = true;
+ }
+ interval.sampleSize += 1;
}
- interval.sampleSize += 1;
// TODO: propgate proper values down stream when anomaly support doubles
- long wholeBucketVal = interval.value.long_value;
+ long wholeBucketVal = multiIntervals[0].value.long_value;
auto prev = mCurrentFullBucket.find(eventKey);
if (prev != mCurrentFullBucket.end()) {
wholeBucketVal += prev->second;
@@ -540,7 +566,9 @@
VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
// take base again in future good bucket.
for (auto& slice : mCurrentSlicedBucket) {
- slice.second.hasBase = false;
+ for (auto& interval : slice.second) {
+ interval.hasBase = false;
+ }
}
}
VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
@@ -552,37 +580,38 @@
(int)mCurrentSlicedBucket.size());
int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
- ValueBucket info;
- info.mBucketStartNs = mCurrentBucketStartTimeNs;
- if (eventTimeNs < fullBucketEndTimeNs) {
- info.mBucketEndNs = eventTimeNs;
- } else {
- info.mBucketEndNs = fullBucketEndTimeNs;
- }
+ int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
- if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+ if (bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
// The current bucket is large enough to keep.
for (const auto& slice : mCurrentSlicedBucket) {
- if (slice.second.hasValue) {
- // skip the output if the diff is zero
- if (mSkipZeroDiffOutput && mUseDiff && slice.second.value.isZero()) {
- continue;
+ ValueBucket bucket;
+ bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
+ bucket.mBucketEndNs = bucketEndTime;
+ for (const auto& interval : slice.second) {
+ if (interval.hasValue) {
+ // skip the output if the diff is zero
+ if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
+ continue;
+ }
+ bucket.valueIndex.push_back(interval.valueIndex);
+ if (mAggregationType != ValueMetric::AVG) {
+ bucket.values.push_back(interval.value);
+ } else {
+ double sum = interval.value.type == LONG ? (double)interval.value.long_value
+ : interval.value.double_value;
+ bucket.values.push_back(Value((double)sum / interval.sampleSize));
+ }
}
- if (mAggregationType != ValueMetric::AVG) {
- info.value = slice.second.value;
- } else {
- double sum = slice.second.value.type == LONG
- ? (double)slice.second.value.long_value
- : slice.second.value.double_value;
- info.value.setDouble(sum / slice.second.sampleSize);
- }
- // it will auto create new vector of ValuebucketInfo if the key is not found.
+ }
+ // it will auto create new vector of ValuebucketInfo if the key is not found.
+ if (bucket.valueIndex.size() > 0) {
auto& bucketList = mPastBuckets[slice.first];
- bucketList.push_back(info);
+ bucketList.push_back(bucket);
}
}
} else {
- mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
+ mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
}
if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker.
@@ -590,7 +619,7 @@
if (mCurrentFullBucket.size() > 0) {
for (const auto& slice : mCurrentSlicedBucket) {
// TODO: fix this when anomaly can accept double values
- mCurrentFullBucket[slice.first] += slice.second.value.long_value;
+ mCurrentFullBucket[slice.first] += slice.second[0].value.long_value;
}
for (const auto& slice : mCurrentFullBucket) {
for (auto& tracker : mAnomalyTrackers) {
@@ -606,7 +635,7 @@
for (auto& tracker : mAnomalyTrackers) {
if (tracker != nullptr) {
// TODO: fix this when anomaly can accept double values
- tracker->addPastBucket(slice.first, slice.second.value.long_value,
+ tracker->addPastBucket(slice.first, slice.second[0].value.long_value,
mCurrentBucketNum);
}
}
@@ -616,14 +645,16 @@
// Accumulate partial bucket.
for (const auto& slice : mCurrentSlicedBucket) {
// TODO: fix this when anomaly can accept double values
- mCurrentFullBucket[slice.first] += slice.second.value.long_value;
+ mCurrentFullBucket[slice.first] += slice.second[0].value.long_value;
}
}
// Reset counters
for (auto& slice : mCurrentSlicedBucket) {
- slice.second.hasValue = false;
- slice.second.sampleSize = 0;
+ for (auto& interval : slice.second) {
+ interval.hasValue = false;
+ interval.sampleSize = 0;
+ }
}
}
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 3416afe..c682a66 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -34,7 +34,8 @@
struct ValueBucket {
int64_t mBucketStartNs;
int64_t mBucketEndNs;
- Value value;
+ std::vector<int> valueIndex;
+ std::vector<Value> values;
};
class ValueMetricProducer : public virtual MetricProducer, public virtual PullDataReceiver {
@@ -97,7 +98,8 @@
sp<StatsPullerManager> mPullerManager;
- const FieldMatcher mValueField;
+ // Value fields for matching.
+ std::vector<Matcher> mFieldMatchers;
// tagId for pulled data. -1 if this is not pulled
const int mPullTagId;
@@ -105,10 +107,10 @@
// if this is pulled metric
const bool mIsPulled;
- int mField;
-
- // internal state of a bucket.
+ // internal state of an ongoing aggregation bucket.
typedef struct {
+ // Index in multi value aggregation.
+ int valueIndex;
// Holds current base value of the dimension. Take diff and update if necessary.
Value base;
// Whether there is a base to diff to.
@@ -122,7 +124,7 @@
bool hasValue;
} Interval;
- std::unordered_map<MetricDimensionKey, Interval> mCurrentSlicedBucket;
+ std::unordered_map<MetricDimensionKey, std::vector<Interval>> mCurrentSlicedBucket;
std::unordered_map<MetricDimensionKey, int64_t> mCurrentFullBucket;
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index 3b42b31..5d0f3d1 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -108,12 +108,22 @@
optional int64 value = 3 [deprecated = true];
- oneof values {
- int64 value_long = 7;
+ oneof single_value {
+ int64 value_long = 7 [deprecated = true];
- double value_double = 8;
+ double value_double = 8 [deprecated = true];
}
+ message Value {
+ optional int32 index = 1;
+ oneof value {
+ int64 value_long = 2;
+ double value_double = 3;
+ }
+ }
+
+ repeated Value values = 9;
+
optional int64 bucket_num = 4;
optional int64 start_bucket_elapsed_millis = 5;