blob: fd883c29dba04f13d1681c5a14b9da1f92f4846e [file] [log] [blame]
Yao Chena80e5c02018-09-04 13:55:29 -07001/*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Yi Yangbdee4862019-03-18 14:53:32 -070016#define DEBUG false // STOPSHIP if true
Yao Chena80e5c02018-09-04 13:55:29 -070017#include "Log.h"
18
19#include "ShellSubscriber.h"
20
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -080021#include <android-base/file.h>
Ruchir Rastogi1e240512020-04-22 09:03:22 -070022
Yao Chen41e606c2018-10-05 15:54:11 -070023#include "matchers/matcher_util.h"
24#include "stats_log_util.h"
Yao Chena80e5c02018-09-04 13:55:29 -070025
26using android::util::ProtoOutputStream;
27
28namespace android {
29namespace os {
30namespace statsd {
31
// Field number used for each atom message written into the outgoing proto stream.
static const int FIELD_ID_ATOM = 1;
33
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080034void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -080035 int myToken = claimToken();
Ruchir Rastogi1e240512020-04-22 09:03:22 -070036 VLOG("ShellSubscriber: new subscription %d has come in", myToken);
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -080037 mSubscriptionShouldEnd.notify_one();
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080038
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -080039 shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
Ruchir Rastogi1e240512020-04-22 09:03:22 -070040 if (!readConfig(mySubscriptionInfo)) return;
Yao Chena80e5c02018-09-04 13:55:29 -070041
Ruchir Rastogi1e240512020-04-22 09:03:22 -070042 {
43 std::unique_lock<std::mutex> lock(mMutex);
Ruchir Rastogi1e240512020-04-22 09:03:22 -070044 mSubscriptionInfo = mySubscriptionInfo;
Ruchir Rastogi835e75c2020-05-15 12:57:15 -070045 spawnHelperThread(myToken);
Ruchir Rastogi1e240512020-04-22 09:03:22 -070046 waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
47
48 if (mSubscriptionInfo == mySubscriptionInfo) {
49 mSubscriptionInfo = nullptr;
50 }
51
52 }
53}
54
Ruchir Rastogi835e75c2020-05-15 12:57:15 -070055void ShellSubscriber::spawnHelperThread(int myToken) {
56 std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); });
57 t.detach();
Ruchir Rastogi1e240512020-04-22 09:03:22 -070058}
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -080059
Ruchir Rastogi1e240512020-04-22 09:03:22 -070060void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
61 int myToken,
62 std::unique_lock<std::mutex>& lock,
63 int timeoutSec) {
64 if (timeoutSec > 0) {
65 mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
66 return mToken != myToken || !myInfo->mClientAlive;
67 });
68 } else {
69 mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
70 return mToken != myToken || !myInfo->mClientAlive;
71 });
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080072 }
73}
74
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -080075// Atomically claim the next token. Token numbers denote subscriber ordering.
76int ShellSubscriber::claimToken() {
77 std::unique_lock<std::mutex> lock(mMutex);
78 int myToken = ++mToken;
79 return myToken;
80}
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080081
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -080082// Read and parse single config. There should only one config per input.
83bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) {
84 // Read the size of the config.
85 size_t bufferSize;
86 if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) {
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080087 return false;
88 }
89
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -080090 // Read the config.
91 vector<uint8_t> buffer(bufferSize);
92 if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) {
93 return false;
Yao Chen35cb8d62019-01-03 16:49:14 -080094 }
Yao Chena80e5c02018-09-04 13:55:29 -070095
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -080096 // Parse the config.
97 ShellSubscription config;
98 if (!config.ParseFromArray(buffer.data(), bufferSize)) {
99 return false;
100 }
Yao Chen41e606c2018-10-05 15:54:11 -0700101
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -0800102 // Update SubscriptionInfo with state from config
Yao Chena80e5c02018-09-04 13:55:29 -0700103 for (const auto& pushed : config.pushed()) {
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -0800104 subscriptionInfo->mPushedMatchers.push_back(pushed);
Yao Chena80e5c02018-09-04 13:55:29 -0700105 }
Yao Chen41e606c2018-10-05 15:54:11 -0700106
Yao Chen41e606c2018-10-05 15:54:11 -0700107 for (const auto& pulled : config.pulled()) {
Tej Singh3be093b2020-03-04 20:08:38 -0800108 vector<string> packages;
109 vector<int32_t> uids;
110 for (const string& pkg : pulled.packages()) {
111 auto it = UidMap::sAidToUidMapping.find(pkg);
112 if (it != UidMap::sAidToUidMapping.end()) {
113 uids.push_back(it->second);
114 } else {
115 packages.push_back(pkg);
116 }
117 }
118
119 subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages,
120 uids);
121 VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
Yao Chen41e606c2018-10-05 15:54:11 -0700122 }
123
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -0800124 return true;
Yao Chen41e606c2018-10-05 15:54:11 -0700125}
126
Ruchir Rastogi835e75c2020-05-15 12:57:15 -0700127void ShellSubscriber::pullAndSendHeartbeats(int myToken) {
128 VLOG("ShellSubscriber: helper thread %d starting", myToken);
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800129 while (true) {
Ruchir Rastogi835e75c2020-05-15 12:57:15 -0700130 int64_t sleepTimeMs = INT_MAX;
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700131 {
132 std::lock_guard<std::mutex> lock(mMutex);
133 if (!mSubscriptionInfo || mToken != myToken) {
Ruchir Rastogi835e75c2020-05-15 12:57:15 -0700134 VLOG("ShellSubscriber: helper thread %d done!", myToken);
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700135 return;
136 }
Yao Chen41e606c2018-10-05 15:54:11 -0700137
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700138 int64_t nowMillis = getElapsedRealtimeMillis();
Tej Singh7b975a82020-05-11 11:05:08 -0700139 int64_t nowNanos = getElapsedRealtimeNs();
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700140 for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
141 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
142 continue;
143 }
144
Tej Singh3be093b2020-03-04 20:08:38 -0800145 vector<int32_t> uids;
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700146 getUidsForPullAtom(&uids, pullInfo);
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -0800147
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700148 vector<std::shared_ptr<LogEvent>> data;
Tej Singh7b975a82020-05-11 11:05:08 -0700149 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700150 VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
151 writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
152
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -0800153 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
Yao Chen41e606c2018-10-05 15:54:11 -0700154 }
Ruchir Rastogi835e75c2020-05-15 12:57:15 -0700155
156 // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received
157 // data from statsd. When it receives the data size of 0, perfd will not expect any
158 // atoms and recheck whether the subscription should end.
159 if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) {
160 attemptWriteToPipeLocked(/*dataSize=*/0);
161 }
162
163 // Determine how long to sleep before doing more work.
164 for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
165 int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval;
166 int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative
167 if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull;
168 }
169 int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis;
170 if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat;
Yao Chen41e606c2018-10-05 15:54:11 -0700171 }
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -0800172
Ruchir Rastogi835e75c2020-05-15 12:57:15 -0700173 VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken,
174 (long long)sleepTimeMs);
175 std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs));
Yao Chen41e606c2018-10-05 15:54:11 -0700176 }
Yao Chena80e5c02018-09-04 13:55:29 -0700177}
178
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700179void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
180 uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
181 // This is slow. Consider storing the uids per app and listening to uidmap updates.
182 for (const string& pkg : pullInfo.mPullPackages) {
183 set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
184 uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
185 }
186 uids->push_back(DEFAULT_PULL_UID);
187}
188
189void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -0800190 const SimpleAtomMatcher& matcher) {
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800191 mProto.clear();
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -0800192 int count = 0;
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800193 for (const auto& event : data) {
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800194 if (matchesSimple(*mUidMap, matcher, *event)) {
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800195 count++;
196 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
197 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
198 event->ToProto(mProto);
199 mProto.end(atomToken);
Yao Chena80e5c02018-09-04 13:55:29 -0700200 }
201 }
Yao Chena80e5c02018-09-04 13:55:29 -0700202
Ruchir Rastogi835e75c2020-05-15 12:57:15 -0700203 if (count > 0) attemptWriteToPipeLocked(mProto.size());
Yao Chena80e5c02018-09-04 13:55:29 -0700204}
205
206void ShellSubscriber::onLogEvent(const LogEvent& event) {
207 std::lock_guard<std::mutex> lock(mMutex);
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700208 if (!mSubscriptionInfo) return;
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800209
210 mProto.clear();
Ruchir Rastogia5e4bb52020-03-04 17:11:58 -0800211 for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
Yao Chena80e5c02018-09-04 13:55:29 -0700212 if (matchesSimple(*mUidMap, matcher, event)) {
Yao Chen41e606c2018-10-05 15:54:11 -0700213 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
214 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
Yao Chen398dd192018-10-01 14:49:03 -0700215 event.ToProto(mProto);
Yao Chen41e606c2018-10-05 15:54:11 -0700216 mProto.end(atomToken);
Ruchir Rastogi835e75c2020-05-15 12:57:15 -0700217 attemptWriteToPipeLocked(mProto.size());
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700218 }
219 }
220}
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800221
Ruchir Rastogi835e75c2020-05-15 12:57:15 -0700222// Tries to write the atom encoded in mProto to the pipe. If the write fails
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700223// because the read end of the pipe has closed, signals to other threads that
224// the subscription should end.
Ruchir Rastogi835e75c2020-05-15 12:57:15 -0700225void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) {
226 // First, write the payload size.
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700227 if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
228 mSubscriptionInfo->mClientAlive = false;
229 mSubscriptionShouldEnd.notify_one();
230 return;
231 }
232
Ruchir Rastogi835e75c2020-05-15 12:57:15 -0700233 // Then, write the payload if this is not just a heartbeat.
234 if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) {
Ruchir Rastogi1e240512020-04-22 09:03:22 -0700235 mSubscriptionInfo->mClientAlive = false;
236 mSubscriptionShouldEnd.notify_one();
237 return;
238 }
239
240 mLastWriteMs = getElapsedRealtimeMillis();
241}
242
Yao Chena80e5c02018-09-04 13:55:29 -0700243} // namespace statsd
244} // namespace os
Yao Chen398dd192018-10-01 14:49:03 -0700245} // namespace android