blob: 22883f3c205a0c1b18f1f18ba5478c9efa787a32 [file] [log] [blame]
Yao Chena80e5c02018-09-04 13:55:29 -07001/*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16#define DEBUG true // STOPSHIP if true
17#include "Log.h"
18
19#include "ShellSubscriber.h"
20
Yao Chena80e5c02018-09-04 13:55:29 -070021#include <android-base/file.h>
Yao Chen41e606c2018-10-05 15:54:11 -070022#include "matchers/matcher_util.h"
23#include "stats_log_util.h"
Yao Chena80e5c02018-09-04 13:55:29 -070024
25using android::util::ProtoOutputStream;
26
27namespace android {
28namespace os {
29namespace statsd {
30
Yao Chen41e606c2018-10-05 15:54:11 -070031const static int FIELD_ID_ATOM = 1;
32
Yao Chena80e5c02018-09-04 13:55:29 -070033void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver) {
34 VLOG("start new shell subscription");
35 {
36 std::lock_guard<std::mutex> lock(mMutex);
37 if (mResultReceiver != nullptr) {
38 VLOG("Only one shell subscriber is allowed.");
39 return;
40 }
41 mInput = in;
42 mOutput = out;
43 mResultReceiver = resultReceiver;
44 IInterface::asBinder(mResultReceiver)->linkToDeath(this);
45 }
46
Yao Chen41e606c2018-10-05 15:54:11 -070047 // Note that the following is blocking, and it's intended as we cannot return until the shell
48 // cmd exits, otherwise all resources & FDs will be automatically closed.
Yao Chena80e5c02018-09-04 13:55:29 -070049
Yao Chen41e606c2018-10-05 15:54:11 -070050 // Read config forever until EOF is reached. Clients may send multiple configs -- each new
51 // config replace the previous one.
52 readConfig(in);
53
54 // Now we have read an EOF we now wait for the semaphore until the client exits.
55 VLOG("Now wait for client to exit");
Yao Chena80e5c02018-09-04 13:55:29 -070056 std::unique_lock<std::mutex> lk(mMutex);
Yao Chena80e5c02018-09-04 13:55:29 -070057 mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
Yao Chena80e5c02018-09-04 13:55:29 -070058}
59
60void ShellSubscriber::updateConfig(const ShellSubscription& config) {
61 std::lock_guard<std::mutex> lock(mMutex);
62 mPushedMatchers.clear();
Yao Chen41e606c2018-10-05 15:54:11 -070063 mPulledInfo.clear();
64
Yao Chena80e5c02018-09-04 13:55:29 -070065 for (const auto& pushed : config.pushed()) {
66 mPushedMatchers.push_back(pushed);
67 VLOG("adding matcher for atom %d", pushed.atom_id());
68 }
Yao Chen41e606c2018-10-05 15:54:11 -070069
70 int64_t token = getElapsedRealtimeNs();
71 mPullToken = token;
72
73 int64_t minInterval = -1;
74 for (const auto& pulled : config.pulled()) {
75 // All intervals need to be multiples of the min interval.
76 if (minInterval < 0 || pulled.freq_millis() < minInterval) {
77 minInterval = pulled.freq_millis();
78 }
79
80 mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
81 VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
82 }
83
84 if (mPulledInfo.size() > 0 && minInterval > 0) {
85 // This thread is guaranteed to terminate after it detects the token is different or
86 // cleaned up.
87 std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
88 puller.detach();
89 }
90}
91
92void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
93 const SimpleAtomMatcher& matcher) {
94 if (mOutput == 0) return;
95 int count = 0;
96 mProto.clear();
97 for (const auto& event : data) {
98 VLOG("%s", event->ToString().c_str());
99 if (matchesSimple(*mUidMap, matcher, *event)) {
100 VLOG("matched");
101 count++;
102 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
103 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
104 event->ToProto(mProto);
105 mProto.end(atomToken);
106 }
107 }
108
109 if (count > 0) {
110 // First write the payload size.
111 size_t bufferSize = mProto.size();
112 write(mOutput, &bufferSize, sizeof(bufferSize));
113 VLOG("%d atoms, proto size: %zu", count, bufferSize);
114 // Then write the payload.
115 mProto.flush(mOutput);
116 }
117 mProto.clear();
118}
119
120void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
121 while (1) {
122 int64_t nowMillis = getElapsedRealtimeMillis();
123 {
124 std::lock_guard<std::mutex> lock(mMutex);
125 if (mPulledInfo.size() == 0 || mPullToken != token) {
126 VLOG("Pulling thread %lld done!", (long long)token);
127 return;
128 }
129 for (auto& pullInfo : mPulledInfo) {
130 if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
131 VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
132
133 vector<std::shared_ptr<LogEvent>> data;
Chenjie Yu0bd73db2018-12-16 07:37:04 -0800134 mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
Yao Chen41e606c2018-10-05 15:54:11 -0700135 VLOG("pulled %zu atoms", data.size());
136 if (data.size() > 0) {
137 writeToOutputLocked(data, pullInfo.mPullerMatcher);
138 }
139 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
140 }
141 }
142 }
143 VLOG("Pulling thread %lld sleep....", (long long)token);
144 std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
145 }
Yao Chena80e5c02018-09-04 13:55:29 -0700146}
147
148void ShellSubscriber::readConfig(int in) {
149 if (in <= 0) {
150 return;
151 }
152
153 while (1) {
154 size_t bufferSize = 0;
155 int result = 0;
156 if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
157 VLOG("Done reading");
158 break;
159 } else if (result < 0 || result != sizeof(bufferSize)) {
160 ALOGE("Error reading config size");
161 break;
162 }
163
164 vector<uint8_t> buffer(bufferSize);
165 if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) {
166 ShellSubscription config;
167 if (config.ParseFromArray(buffer.data(), bufferSize)) {
168 updateConfig(config);
169 } else {
170 ALOGE("error parsing the config");
171 break;
172 }
173 } else {
174 VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize);
175 break;
176 }
177 }
178}
179
180void ShellSubscriber::cleanUpLocked() {
181 // The file descriptors will be closed by binder.
182 mInput = 0;
183 mOutput = 0;
184 mResultReceiver = nullptr;
185 mPushedMatchers.clear();
Yao Chen41e606c2018-10-05 15:54:11 -0700186 mPulledInfo.clear();
187 mPullToken = 0;
Yao Chena80e5c02018-09-04 13:55:29 -0700188 VLOG("done clean up");
189}
190
191void ShellSubscriber::onLogEvent(const LogEvent& event) {
192 std::lock_guard<std::mutex> lock(mMutex);
193
194 if (mOutput <= 0) {
195 return;
196 }
Yao Chena80e5c02018-09-04 13:55:29 -0700197 for (const auto& matcher : mPushedMatchers) {
198 if (matchesSimple(*mUidMap, matcher, event)) {
Yao Chen41e606c2018-10-05 15:54:11 -0700199 VLOG("%s", event.ToString().c_str());
200 uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
201 util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
Yao Chen398dd192018-10-01 14:49:03 -0700202 event.ToProto(mProto);
Yao Chen41e606c2018-10-05 15:54:11 -0700203 mProto.end(atomToken);
Yao Chena80e5c02018-09-04 13:55:29 -0700204 // First write the payload size.
205 size_t bufferSize = mProto.size();
206 write(mOutput, &bufferSize, sizeof(bufferSize));
207
208 // Then write the payload.
Yao Chena80e5c02018-09-04 13:55:29 -0700209 mProto.flush(mOutput);
210 mProto.clear();
211 break;
212 }
213 }
214}
215
216void ShellSubscriber::binderDied(const wp<IBinder>& who) {
217 {
218 VLOG("Shell exits");
219 std::lock_guard<std::mutex> lock(mMutex);
220 cleanUpLocked();
221 }
222 mShellDied.notify_all();
223}
224
225} // namespace statsd
226} // namespace os
Yao Chen398dd192018-10-01 14:49:03 -0700227} // namespace android