blob: a861a3b76868e10bf0c2d45b95efb9639893888c [file] [log] [blame]
Yao Chena80e5c02018-09-04 13:55:29 -07001/*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Yi Yangbdee4862019-03-18 14:53:32 -070016#define DEBUG false // STOPSHIP if true
Yao Chena80e5c02018-09-04 13:55:29 -070017#include "Log.h"
18
19#include "ShellSubscriber.h"
20
Yao Chen41e606c2018-10-05 15:54:11 -070021#include "matchers/matcher_util.h"
22#include "stats_log_util.h"
Yao Chena80e5c02018-09-04 13:55:29 -070023
24using android::util::ProtoOutputStream;
25
26namespace android {
27namespace os {
28namespace statsd {
29
// Field id used when each atom is written to the output proto as a repeated
// message (see the mProto.start() calls below).
static const int FIELD_ID_ATOM = 1;
31
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080032void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
Yao Chena80e5c02018-09-04 13:55:29 -070033 VLOG("start new shell subscription");
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080034 int64_t subscriberId = getElapsedRealtimeNs();
35
Yao Chena80e5c02018-09-04 13:55:29 -070036 {
37 std::lock_guard<std::mutex> lock(mMutex);
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080038 if (mSubscriberId> 0) {
Yao Chena80e5c02018-09-04 13:55:29 -070039 VLOG("Only one shell subscriber is allowed.");
40 return;
41 }
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080042 mSubscriberId = subscriberId;
Yao Chena80e5c02018-09-04 13:55:29 -070043 mInput = in;
44 mOutput = out;
Yao Chena80e5c02018-09-04 13:55:29 -070045 }
46
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080047 bool success = readConfig();
48 if (!success) {
49 std::lock_guard<std::mutex> lock(mMutex);
50 cleanUpLocked();
51 }
Yao Chena80e5c02018-09-04 13:55:29 -070052
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080053 VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec);
Yao Chena80e5c02018-09-04 13:55:29 -070054 std::unique_lock<std::mutex> lk(mMutex);
Yao Chen35cb8d62019-01-03 16:49:14 -080055
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080056 // Note that the following is blocking, and it's intended as we cannot return until the shell
57 // cmd exits or we time out.
Yao Chen35cb8d62019-01-03 16:49:14 -080058 if (timeoutSec > 0) {
59 mShellDied.wait_for(lk, timeoutSec * 1s,
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080060 [this, subscriberId] { return mSubscriberId != subscriberId; });
Yao Chen35cb8d62019-01-03 16:49:14 -080061 } else {
Ruchir Rastogie449b0c2020-02-10 17:40:09 -080062 mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; });
63 }
64}
65
66
67// Read configs until EOF is reached. There may be multiple configs in the input
68// -- each new config should replace the previous one.
69//
70// Returns a boolean indicating whether the input was read successfully.
71bool ShellSubscriber::readConfig() {
72 if (mInput < 0) {
73 return false;
74 }
75
76 while (true) {
77 // Read the size of the config.
78 size_t bufferSize = 0;
79 ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize));
80 if (bytesRead == 0) {
81 VLOG("We have reached the end of the input.");
82 return true;
83 } else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) {
84 ALOGE("Error reading config size");
85 return false;
86 }
87
88 // Read and parse the config.
89 vector<uint8_t> buffer(bufferSize);
90 bytesRead = read(mInput, buffer.data(), bufferSize);
91 if (bytesRead > 0 && (size_t)bytesRead == bufferSize) {
92 ShellSubscription config;
93 if (config.ParseFromArray(buffer.data(), bufferSize)) {
94 updateConfig(config);
95 } else {
96 ALOGE("Error parsing the config");
97 return false;
98 }
99 } else {
100 VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize,
101 bytesRead);
102 return false;
103 }
Yao Chen35cb8d62019-01-03 16:49:14 -0800104 }
Yao Chena80e5c02018-09-04 13:55:29 -0700105}
106
107void ShellSubscriber::updateConfig(const ShellSubscription& config) {
Yao Chena80e5c02018-09-04 13:55:29 -0700108 mPushedMatchers.clear();
Yao Chen41e606c2018-10-05 15:54:11 -0700109 mPulledInfo.clear();
110
Yao Chena80e5c02018-09-04 13:55:29 -0700111 for (const auto& pushed : config.pushed()) {
112 mPushedMatchers.push_back(pushed);
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800113 VLOG("adding matcher for pushed atom %d", pushed.atom_id());
Yao Chena80e5c02018-09-04 13:55:29 -0700114 }
Yao Chen41e606c2018-10-05 15:54:11 -0700115
116 int64_t token = getElapsedRealtimeNs();
117 mPullToken = token;
118
119 int64_t minInterval = -1;
120 for (const auto& pulled : config.pulled()) {
121 // All intervals need to be multiples of the min interval.
122 if (minInterval < 0 || pulled.freq_millis() < minInterval) {
123 minInterval = pulled.freq_millis();
124 }
125
126 mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
127 VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
128 }
129
130 if (mPulledInfo.size() > 0 && minInterval > 0) {
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800131 // This thread is guaranteed to terminate after it detects the token is
132 // different.
Yao Chen41e606c2018-10-05 15:54:11 -0700133 std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
134 puller.detach();
135 }
136}
137
Yao Chen41e606c2018-10-05 15:54:11 -0700138void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800139 while (true) {
Yao Chen41e606c2018-10-05 15:54:11 -0700140 int64_t nowMillis = getElapsedRealtimeMillis();
141 {
142 std::lock_guard<std::mutex> lock(mMutex);
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800143 // If the token has changed, the config has changed, so this
144 // puller can now stop.
Yao Chen41e606c2018-10-05 15:54:11 -0700145 if (mPulledInfo.size() == 0 || mPullToken != token) {
146 VLOG("Pulling thread %lld done!", (long long)token);
147 return;
148 }
149 for (auto& pullInfo : mPulledInfo) {
150 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
151 VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
152
153 vector<std::shared_ptr<LogEvent>> data;
Chenjie Yu0bd73db2018-12-16 07:37:04 -0800154 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
Yao Chen41e606c2018-10-05 15:54:11 -0700155 VLOG("pulled %zu atoms", data.size());
156 if (data.size() > 0) {
157 writeToOutputLocked(data, pullInfo.mPullerMatcher);
158 }
159 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
160 }
161 }
162 }
163 VLOG("Pulling thread %lld sleep....", (long long)token);
164 std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
165 }
Yao Chena80e5c02018-09-04 13:55:29 -0700166}
167
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800168// Must be called with the lock acquired, so that mProto isn't being written to
169// at the same time by multiple threads.
170void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
171 const SimpleAtomMatcher& matcher) {
172 if (mOutput < 0) {
Yao Chena80e5c02018-09-04 13:55:29 -0700173 return;
174 }
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800175 int count = 0;
176 mProto.clear();
177 for (const auto& event : data) {
178 VLOG("%s", event->ToString().c_str());
179 if (matchesSimple(*mUidMap, matcher, *event)) {
180 VLOG("matched");
181 count++;
182 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
183 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
184 event->ToProto(mProto);
185 mProto.end(atomToken);
Yao Chena80e5c02018-09-04 13:55:29 -0700186 }
187 }
Yao Chena80e5c02018-09-04 13:55:29 -0700188
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800189 if (count > 0) {
190 // First write the payload size.
191 size_t bufferSize = mProto.size();
192 write(mOutput, &bufferSize, sizeof(bufferSize));
193
194 VLOG("%d atoms, proto size: %zu", count, bufferSize);
195 // Then write the payload.
196 mProto.flush(mOutput);
197 }
Yao Chena80e5c02018-09-04 13:55:29 -0700198}
199
200void ShellSubscriber::onLogEvent(const LogEvent& event) {
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800201 // Acquire a lock to prevent corruption from multiple threads writing to
202 // mProto.
Yao Chena80e5c02018-09-04 13:55:29 -0700203 std::lock_guard<std::mutex> lock(mMutex);
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800204 if (mOutput < 0) {
Yao Chena80e5c02018-09-04 13:55:29 -0700205 return;
206 }
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800207
208 mProto.clear();
Yao Chena80e5c02018-09-04 13:55:29 -0700209 for (const auto& matcher : mPushedMatchers) {
210 if (matchesSimple(*mUidMap, matcher, event)) {
Yao Chen41e606c2018-10-05 15:54:11 -0700211 VLOG("%s", event.ToString().c_str());
212 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
213 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
Yao Chen398dd192018-10-01 14:49:03 -0700214 event.ToProto(mProto);
Yao Chen41e606c2018-10-05 15:54:11 -0700215 mProto.end(atomToken);
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800216
Yao Chena80e5c02018-09-04 13:55:29 -0700217 // First write the payload size.
218 size_t bufferSize = mProto.size();
219 write(mOutput, &bufferSize, sizeof(bufferSize));
220
221 // Then write the payload.
Yao Chena80e5c02018-09-04 13:55:29 -0700222 mProto.flush(mOutput);
Yao Chena80e5c02018-09-04 13:55:29 -0700223 }
224 }
225}
226
Ruchir Rastogie449b0c2020-02-10 17:40:09 -0800227void ShellSubscriber::cleanUpLocked() {
228 // The file descriptors will be closed by binder.
229 mInput = -1;
230 mOutput = -1;
231 mSubscriberId = 0;
232 mPushedMatchers.clear();
233 mPulledInfo.clear();
234 // Setting mPullToken == 0 tells pull thread that its work is done.
235 mPullToken = 0;
236 VLOG("done clean up");
Yao Chena80e5c02018-09-04 13:55:29 -0700237}
238
239} // namespace statsd
240} // namespace os
Yao Chen398dd192018-10-01 14:49:03 -0700241} // namespace android