blob: be030d856aee4e4b91924c071e074a846adef35f [file] [log] [blame]
/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17#define DEBUG true // STOPSHIP if true
18#include "Log.h"
19
20#include "GaugeMetricProducer.h"
21#include "stats_util.h"
22
23#include <cutils/log.h>
24#include <limits.h>
25#include <stdlib.h>
26
yrob0378b02017-11-09 20:36:25 -080027using android::util::FIELD_COUNT_REPEATED;
yro2b0f8862017-11-06 14:27:31 -080028using android::util::FIELD_TYPE_BOOL;
29using android::util::FIELD_TYPE_FLOAT;
30using android::util::FIELD_TYPE_INT32;
31using android::util::FIELD_TYPE_INT64;
32using android::util::FIELD_TYPE_MESSAGE;
Yangster-macd1815dc2017-11-13 21:43:15 -080033using android::util::FIELD_TYPE_STRING;
yro2b0f8862017-11-06 14:27:31 -080034using android::util::ProtoOutputStream;
Yangster1d4d6862017-10-31 12:58:51 -070035using std::map;
36using std::string;
37using std::unordered_map;
38using std::vector;
39
40namespace android {
41namespace os {
42namespace statsd {
43
// Proto field numbers used below when the report is written through
// ProtoOutputStream. Each group is labelled with the message it belongs to.

// StatsLogReport
const int FIELD_ID_NAME = 1;
const int FIELD_ID_START_REPORT_NANOS = 2;
const int FIELD_ID_END_REPORT_NANOS = 3;
const int FIELD_ID_GAUGE_METRICS = 8;

// GaugeMetricDataWrapper
const int FIELD_ID_DATA = 1;

// GaugeMetricData
const int FIELD_ID_DIMENSION = 1;
const int FIELD_ID_BUCKET_INFO = 2;

// KeyValuePair
const int FIELD_ID_KEY = 1;
const int FIELD_ID_VALUE_STR = 2;
const int FIELD_ID_VALUE_INT = 3;
const int FIELD_ID_VALUE_BOOL = 4;
const int FIELD_ID_VALUE_FLOAT = 5;

// GaugeBucketInfo
const int FIELD_ID_START_BUCKET_NANOS = 1;
const int FIELD_ID_END_BUCKET_NANOS = 2;
const int FIELD_ID_GAUGE = 3;

64
Yangster1d4d6862017-10-31 12:58:51 -070065GaugeMetricProducer::GaugeMetricProducer(const GaugeMetric& metric, const int conditionIndex,
Yangster-mace2cd6d52017-11-09 20:38:30 -080066 const sp<ConditionWizard>& wizard, const int pullTagId,
67 const int64_t startTimeNs)
68 : MetricProducer(startTimeNs, conditionIndex, wizard),
Yangster1d4d6862017-10-31 12:58:51 -070069 mMetric(metric),
70 mPullTagId(pullTagId) {
71 if (metric.has_bucket() && metric.bucket().has_bucket_size_millis()) {
72 mBucketSizeNs = metric.bucket().bucket_size_millis() * 1000 * 1000;
73 } else {
74 mBucketSizeNs = kDefaultGaugemBucketSizeNs;
75 }
76
77 // TODO: use UidMap if uid->pkg_name is required
78 mDimension.insert(mDimension.begin(), metric.dimension().begin(), metric.dimension().end());
79
80 if (metric.links().size() > 0) {
81 mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(),
82 metric.links().end());
83 mConditionSliced = true;
84 }
85
86 // Kicks off the puller immediately.
87 if (mPullTagId != -1) {
88 mStatsPullerManager.RegisterReceiver(mPullTagId, this,
89 metric.bucket().bucket_size_millis());
90 }
91
yro2b0f8862017-11-06 14:27:31 -080092 startNewProtoOutputStream(mStartTimeNs);
93
Yangster-macd1815dc2017-11-13 21:43:15 -080094 VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
Yangster1d4d6862017-10-31 12:58:51 -070095 (long long)mBucketSizeNs, (long long)mStartTimeNs);
96}
97
98GaugeMetricProducer::~GaugeMetricProducer() {
99 VLOG("~GaugeMetricProducer() called");
100}
101
yro2b0f8862017-11-06 14:27:31 -0800102void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) {
103 mProto = std::make_unique<ProtoOutputStream>();
Yangster-macd1815dc2017-11-13 21:43:15 -0800104 mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
yro2b0f8862017-11-06 14:27:31 -0800105 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
106 mProtoToken = mProto->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
Yangster1d4d6862017-10-31 12:58:51 -0700107}
108
yro2b0f8862017-11-06 14:27:31 -0800109void GaugeMetricProducer::finish() {
Yangster1d4d6862017-10-31 12:58:51 -0700110}
111
yro17adac92017-11-08 23:16:29 -0800112std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
Yangster-macd1815dc2017-11-13 21:43:15 -0800113 VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
Yangster1d4d6862017-10-31 12:58:51 -0700114
Yangster1d4d6862017-10-31 12:58:51 -0700115 // Dump current bucket if it's stale.
116 // If current bucket is still on-going, don't force dump current bucket.
117 // In finish(), We can force dump current bucket.
Yangster-mace2cd6d52017-11-09 20:38:30 -0800118 flushIfNeeded(time(nullptr) * NS_PER_SEC);
Yangster1d4d6862017-10-31 12:58:51 -0700119
120 for (const auto& pair : mPastBuckets) {
121 const HashableDimensionKey& hashableKey = pair.first;
122 auto it = mDimensionKeyMap.find(hashableKey);
123 if (it == mDimensionKeyMap.end()) {
124 ALOGE("Dimension key %s not found?!?! skip...", hashableKey.c_str());
125 continue;
126 }
127
128 VLOG(" dimension key %s", hashableKey.c_str());
yrob0378b02017-11-09 20:36:25 -0800129 long long wrapperToken =
130 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DATA);
yro2b0f8862017-11-06 14:27:31 -0800131
132 // First fill dimension (KeyValuePairs).
133 for (const auto& kv : it->second) {
yrob0378b02017-11-09 20:36:25 -0800134 long long dimensionToken =
135 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_DIMENSION);
yro2b0f8862017-11-06 14:27:31 -0800136 mProto->write(FIELD_TYPE_INT32 | FIELD_ID_KEY, kv.key());
137 if (kv.has_value_str()) {
Yao Chen1ff4f432017-11-16 17:01:40 -0800138 mProto->write(FIELD_TYPE_STRING | FIELD_ID_VALUE_STR, kv.value_str());
yro2b0f8862017-11-06 14:27:31 -0800139 } else if (kv.has_value_int()) {
140 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_INT, kv.value_int());
141 } else if (kv.has_value_bool()) {
142 mProto->write(FIELD_TYPE_BOOL | FIELD_ID_VALUE_BOOL, kv.value_bool());
143 } else if (kv.has_value_float()) {
144 mProto->write(FIELD_TYPE_FLOAT | FIELD_ID_VALUE_FLOAT, kv.value_float());
145 }
146 mProto->end(dimensionToken);
147 }
148
149 // Then fill bucket_info (GaugeBucketInfo).
150 for (const auto& bucket : pair.second) {
yrob0378b02017-11-09 20:36:25 -0800151 long long bucketInfoToken =
152 mProto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_BUCKET_INFO);
yro2b0f8862017-11-06 14:27:31 -0800153 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_BUCKET_NANOS,
154 (long long)bucket.mBucketStartNs);
155 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
156 (long long)bucket.mBucketEndNs);
157 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge);
158 mProto->end(bucketInfoToken);
159 VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
160 (long long)bucket.mBucketEndNs, (long long)bucket.mGauge);
161 }
162 mProto->end(wrapperToken);
Yangster1d4d6862017-10-31 12:58:51 -0700163 }
yro2b0f8862017-11-06 14:27:31 -0800164 mProto->end(mProtoToken);
165 mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
166 (long long)mCurrentBucketStartTimeNs);
167
yro17adac92017-11-08 23:16:29 -0800168 std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
yro2b0f8862017-11-06 14:27:31 -0800169
170 startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
171 mPastBuckets.clear();
yro2b0f8862017-11-06 14:27:31 -0800172
yro17adac92017-11-08 23:16:29 -0800173 return buffer;
yro2b0f8862017-11-06 14:27:31 -0800174
175 // TODO: Clear mDimensionKeyMap once the report is dumped.
Yangster1d4d6862017-10-31 12:58:51 -0700176}
177
178void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
179 AutoMutex _l(mLock);
Yangster-macd1815dc2017-11-13 21:43:15 -0800180 VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
Yangster-mace2cd6d52017-11-09 20:38:30 -0800181 flushIfNeeded(eventTime);
Yangster1d4d6862017-10-31 12:58:51 -0700182 mCondition = conditionMet;
183
Yangster-mace2cd6d52017-11-09 20:38:30 -0800184 // Push mode. No need to proactively pull the gauge data.
Yangster1d4d6862017-10-31 12:58:51 -0700185 if (mPullTagId == -1) {
186 return;
187 }
Yangster-mace2cd6d52017-11-09 20:38:30 -0800188 if (!mCondition) {
189 return;
190 }
191 // Already have gauge metric for the current bucket, do not do it again.
192 if (mCurrentSlicedBucket->size() > 0) {
Yangster1d4d6862017-10-31 12:58:51 -0700193 return;
194 }
195 vector<std::shared_ptr<LogEvent>> allData;
196 if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
197 ALOGE("Stats puller failed for tag: %d", mPullTagId);
198 return;
199 }
200 for (const auto& data : allData) {
201 onMatchedLogEvent(0, *data, false /*scheduledPull*/);
202 }
Yangster-mace2cd6d52017-11-09 20:38:30 -0800203 flushIfNeeded(eventTime);
Yangster1d4d6862017-10-31 12:58:51 -0700204}
205
206void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
Yangster-macd1815dc2017-11-13 21:43:15 -0800207 VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
Yangster1d4d6862017-10-31 12:58:51 -0700208}
209
Yangster-mace2cd6d52017-11-09 20:38:30 -0800210int64_t GaugeMetricProducer::getGauge(const LogEvent& event) {
Yangster1d4d6862017-10-31 12:58:51 -0700211 status_t err = NO_ERROR;
Yangster-mace2cd6d52017-11-09 20:38:30 -0800212 int64_t val = event.GetLong(mMetric.gauge_field(), &err);
Yangster1d4d6862017-10-31 12:58:51 -0700213 if (err == NO_ERROR) {
214 return val;
215 } else {
216 VLOG("Can't find value in message.");
217 return -1;
218 }
219}
220
221void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
222 AutoMutex mutex(mLock);
Yangster1d4d6862017-10-31 12:58:51 -0700223 for (const auto& data : allData) {
224 onMatchedLogEvent(0, *data, true /*scheduledPull*/);
225 }
Yangster1d4d6862017-10-31 12:58:51 -0700226}
227
228void GaugeMetricProducer::onMatchedLogEventInternal(
229 const size_t matcherIndex, const HashableDimensionKey& eventKey,
230 const map<string, HashableDimensionKey>& conditionKey, bool condition,
231 const LogEvent& event, bool scheduledPull) {
232 if (condition == false) {
233 return;
234 }
235 uint64_t eventTimeNs = event.GetTimestampNs();
236 if (eventTimeNs < mCurrentBucketStartTimeNs) {
237 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
238 (long long)mCurrentBucketStartTimeNs);
239 return;
240 }
241
Yangster-mace2cd6d52017-11-09 20:38:30 -0800242 // When the event happens in a new bucket, flush the old buckets.
243 if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) {
244 flushIfNeeded(eventTimeNs);
245 }
246
247 // For gauge metric, we just simply use the first guage in the given bucket.
248 if (!mCurrentSlicedBucket->empty()) {
Yangster1d4d6862017-10-31 12:58:51 -0700249 return;
250 }
Yangster-mace2cd6d52017-11-09 20:38:30 -0800251 const long gauge = getGauge(event);
252 if (gauge >= 0) {
253 (*mCurrentSlicedBucket)[eventKey] = gauge;
254 }
255 for (auto& tracker : mAnomalyTrackers) {
256 tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge);
Yangster1d4d6862017-10-31 12:58:51 -0700257 }
258}
259
260// When a new matched event comes in, we check if event falls into the current
261// bucket. If not, flush the old counter to past buckets and initialize the new
262// bucket.
263// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
264// the GaugeMetricProducer while holding the lock.
Yangster-mace2cd6d52017-11-09 20:38:30 -0800265void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
266 if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
Yangster1d4d6862017-10-31 12:58:51 -0700267 return;
268 }
269
yro2b0f8862017-11-06 14:27:31 -0800270 GaugeBucket info;
271 info.mBucketStartNs = mCurrentBucketStartTimeNs;
272 info.mBucketEndNs = mCurrentBucketStartTimeNs + mBucketSizeNs;
Yangster-mace2cd6d52017-11-09 20:38:30 -0800273 info.mBucketNum = mCurrentBucketNum;
Yangster1d4d6862017-10-31 12:58:51 -0700274
Yangster-mace2cd6d52017-11-09 20:38:30 -0800275 for (const auto& slice : *mCurrentSlicedBucket) {
yro2b0f8862017-11-06 14:27:31 -0800276 info.mGauge = slice.second;
Yangster1d4d6862017-10-31 12:58:51 -0700277 auto& bucketList = mPastBuckets[slice.first];
278 bucketList.push_back(info);
Yangster-mace2cd6d52017-11-09 20:38:30 -0800279 VLOG("gauge metric %s, dump key value: %s -> %lld", mMetric.name().c_str(),
280 slice.first.c_str(), (long long)slice.second);
Yangster1d4d6862017-10-31 12:58:51 -0700281 }
Yangster1d4d6862017-10-31 12:58:51 -0700282
Yangster-mace2cd6d52017-11-09 20:38:30 -0800283 // Reset counters
284 for (auto& tracker : mAnomalyTrackers) {
285 tracker->addPastBucket(mCurrentSlicedBucket, mCurrentBucketNum);
286 }
287
288 mCurrentSlicedBucket = std::make_shared<DimToValMap>();
289
290 // Adjusts the bucket start time
291 int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
Yangster1d4d6862017-10-31 12:58:51 -0700292 mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs;
Yangster-mace2cd6d52017-11-09 20:38:30 -0800293 mCurrentBucketNum += numBucketsForward;
Yangster-macd1815dc2017-11-13 21:43:15 -0800294 VLOG("metric %s: new bucket start time: %lld", mMetric.name().c_str(),
Yangster1d4d6862017-10-31 12:58:51 -0700295 (long long)mCurrentBucketStartTimeNs);
296}
297
Yangster7c334a12017-11-22 14:24:24 -0800298size_t GaugeMetricProducer::byteSize() const {
Yangster-mace2cd6d52017-11-09 20:38:30 -0800299 size_t totalSize = 0;
300 for (const auto& pair : mPastBuckets) {
301 totalSize += pair.second.size() * kBucketSize;
302 }
303 return totalSize;
yro2b0f8862017-11-06 14:27:31 -0800304}
305
Yangster1d4d6862017-10-31 12:58:51 -0700306} // namespace statsd
307} // namespace os
308} // namespace android