Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 1 | /* |
| 2 | * Copyright (C) 2018 The Android Open Source Project |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
Yi Yang | bdee486 | 2019-03-18 14:53:32 -0700 | [diff] [blame] | 16 | #define DEBUG false // STOPSHIP if true |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 17 | #include "Log.h" |
| 18 | |
| 19 | #include "ShellSubscriber.h" |
| 20 | |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 21 | #include <android-base/file.h> |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 22 | |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 23 | #include "matchers/matcher_util.h" |
| 24 | #include "stats_log_util.h" |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 25 | |
| 26 | using android::util::ProtoOutputStream; |
| 27 | |
| 28 | namespace android { |
| 29 | namespace os { |
| 30 | namespace statsd { |
| 31 | |
// Proto field id under which each atom is written as a repeated message into mProto.
static constexpr int FIELD_ID_ATOM = 1;
| 33 | |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 34 | void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 35 | int myToken = claimToken(); |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 36 | VLOG("ShellSubscriber: new subscription %d has come in", myToken); |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 37 | mSubscriptionShouldEnd.notify_one(); |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 38 | |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 39 | shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out); |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 40 | if (!readConfig(mySubscriptionInfo)) return; |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 41 | |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 42 | { |
| 43 | std::unique_lock<std::mutex> lock(mMutex); |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 44 | mSubscriptionInfo = mySubscriptionInfo; |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 45 | spawnHelperThread(myToken); |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 46 | waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec); |
| 47 | |
| 48 | if (mSubscriptionInfo == mySubscriptionInfo) { |
| 49 | mSubscriptionInfo = nullptr; |
| 50 | } |
| 51 | |
| 52 | } |
| 53 | } |
| 54 | |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 55 | void ShellSubscriber::spawnHelperThread(int myToken) { |
| 56 | std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); }); |
| 57 | t.detach(); |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 58 | } |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 59 | |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 60 | void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo, |
| 61 | int myToken, |
| 62 | std::unique_lock<std::mutex>& lock, |
| 63 | int timeoutSec) { |
| 64 | if (timeoutSec > 0) { |
| 65 | mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] { |
| 66 | return mToken != myToken || !myInfo->mClientAlive; |
| 67 | }); |
| 68 | } else { |
| 69 | mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] { |
| 70 | return mToken != myToken || !myInfo->mClientAlive; |
| 71 | }); |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 72 | } |
| 73 | } |
| 74 | |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 75 | // Atomically claim the next token. Token numbers denote subscriber ordering. |
| 76 | int ShellSubscriber::claimToken() { |
| 77 | std::unique_lock<std::mutex> lock(mMutex); |
| 78 | int myToken = ++mToken; |
| 79 | return myToken; |
| 80 | } |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 81 | |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 82 | // Read and parse single config. There should only one config per input. |
| 83 | bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) { |
| 84 | // Read the size of the config. |
| 85 | size_t bufferSize; |
| 86 | if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) { |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 87 | return false; |
| 88 | } |
| 89 | |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 90 | // Read the config. |
| 91 | vector<uint8_t> buffer(bufferSize); |
| 92 | if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) { |
| 93 | return false; |
Yao Chen | 35cb8d6 | 2019-01-03 16:49:14 -0800 | [diff] [blame] | 94 | } |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 95 | |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 96 | // Parse the config. |
| 97 | ShellSubscription config; |
| 98 | if (!config.ParseFromArray(buffer.data(), bufferSize)) { |
| 99 | return false; |
| 100 | } |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 101 | |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 102 | // Update SubscriptionInfo with state from config |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 103 | for (const auto& pushed : config.pushed()) { |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 104 | subscriptionInfo->mPushedMatchers.push_back(pushed); |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 105 | } |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 106 | |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 107 | for (const auto& pulled : config.pulled()) { |
Tej Singh | 3be093b | 2020-03-04 20:08:38 -0800 | [diff] [blame] | 108 | vector<string> packages; |
| 109 | vector<int32_t> uids; |
| 110 | for (const string& pkg : pulled.packages()) { |
| 111 | auto it = UidMap::sAidToUidMapping.find(pkg); |
| 112 | if (it != UidMap::sAidToUidMapping.end()) { |
| 113 | uids.push_back(it->second); |
| 114 | } else { |
| 115 | packages.push_back(pkg); |
| 116 | } |
| 117 | } |
| 118 | |
| 119 | subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages, |
| 120 | uids); |
| 121 | VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 122 | } |
| 123 | |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 124 | return true; |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 125 | } |
| 126 | |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 127 | void ShellSubscriber::pullAndSendHeartbeats(int myToken) { |
| 128 | VLOG("ShellSubscriber: helper thread %d starting", myToken); |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 129 | while (true) { |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 130 | int64_t sleepTimeMs = INT_MAX; |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 131 | { |
| 132 | std::lock_guard<std::mutex> lock(mMutex); |
| 133 | if (!mSubscriptionInfo || mToken != myToken) { |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 134 | VLOG("ShellSubscriber: helper thread %d done!", myToken); |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 135 | return; |
| 136 | } |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 137 | |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 138 | int64_t nowMillis = getElapsedRealtimeMillis(); |
Tej Singh | 7b975a8 | 2020-05-11 11:05:08 -0700 | [diff] [blame] | 139 | int64_t nowNanos = getElapsedRealtimeNs(); |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 140 | for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { |
| 141 | if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) { |
| 142 | continue; |
| 143 | } |
| 144 | |
Tej Singh | 3be093b | 2020-03-04 20:08:38 -0800 | [diff] [blame] | 145 | vector<int32_t> uids; |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 146 | getUidsForPullAtom(&uids, pullInfo); |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 147 | |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 148 | vector<std::shared_ptr<LogEvent>> data; |
Tej Singh | 7b975a8 | 2020-05-11 11:05:08 -0700 | [diff] [blame] | 149 | mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data); |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 150 | VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); |
| 151 | writePulledAtomsLocked(data, pullInfo.mPullerMatcher); |
| 152 | |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 153 | pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 154 | } |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 155 | |
| 156 | // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received |
| 157 | // data from statsd. When it receives the data size of 0, perfd will not expect any |
| 158 | // atoms and recheck whether the subscription should end. |
| 159 | if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) { |
| 160 | attemptWriteToPipeLocked(/*dataSize=*/0); |
| 161 | } |
| 162 | |
| 163 | // Determine how long to sleep before doing more work. |
| 164 | for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { |
| 165 | int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval; |
| 166 | int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative |
| 167 | if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull; |
| 168 | } |
| 169 | int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis; |
| 170 | if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat; |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 171 | } |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 172 | |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 173 | VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken, |
| 174 | (long long)sleepTimeMs); |
| 175 | std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs)); |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 176 | } |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 177 | } |
| 178 | |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 179 | void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) { |
| 180 | uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end()); |
| 181 | // This is slow. Consider storing the uids per app and listening to uidmap updates. |
| 182 | for (const string& pkg : pullInfo.mPullPackages) { |
| 183 | set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg); |
| 184 | uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end()); |
| 185 | } |
| 186 | uids->push_back(DEFAULT_PULL_UID); |
| 187 | } |
| 188 | |
| 189 | void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 190 | const SimpleAtomMatcher& matcher) { |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 191 | mProto.clear(); |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 192 | int count = 0; |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 193 | for (const auto& event : data) { |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 194 | if (matchesSimple(*mUidMap, matcher, *event)) { |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 195 | count++; |
| 196 | uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | |
| 197 | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); |
| 198 | event->ToProto(mProto); |
| 199 | mProto.end(atomToken); |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 200 | } |
| 201 | } |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 202 | |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 203 | if (count > 0) attemptWriteToPipeLocked(mProto.size()); |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 204 | } |
| 205 | |
| 206 | void ShellSubscriber::onLogEvent(const LogEvent& event) { |
| 207 | std::lock_guard<std::mutex> lock(mMutex); |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 208 | if (!mSubscriptionInfo) return; |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 209 | |
| 210 | mProto.clear(); |
Ruchir Rastogi | a5e4bb5 | 2020-03-04 17:11:58 -0800 | [diff] [blame] | 211 | for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 212 | if (matchesSimple(*mUidMap, matcher, event)) { |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 213 | uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | |
| 214 | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); |
Yao Chen | 398dd19 | 2018-10-01 14:49:03 -0700 | [diff] [blame] | 215 | event.ToProto(mProto); |
Yao Chen | 41e606c | 2018-10-05 15:54:11 -0700 | [diff] [blame] | 216 | mProto.end(atomToken); |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 217 | attemptWriteToPipeLocked(mProto.size()); |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 218 | } |
| 219 | } |
| 220 | } |
Ruchir Rastogi | e449b0c | 2020-02-10 17:40:09 -0800 | [diff] [blame] | 221 | |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 222 | // Tries to write the atom encoded in mProto to the pipe. If the write fails |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 223 | // because the read end of the pipe has closed, signals to other threads that |
| 224 | // the subscription should end. |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 225 | void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) { |
| 226 | // First, write the payload size. |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 227 | if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) { |
| 228 | mSubscriptionInfo->mClientAlive = false; |
| 229 | mSubscriptionShouldEnd.notify_one(); |
| 230 | return; |
| 231 | } |
| 232 | |
Ruchir Rastogi | 835e75c | 2020-05-15 12:57:15 -0700 | [diff] [blame^] | 233 | // Then, write the payload if this is not just a heartbeat. |
| 234 | if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) { |
Ruchir Rastogi | 1e24051 | 2020-04-22 09:03:22 -0700 | [diff] [blame] | 235 | mSubscriptionInfo->mClientAlive = false; |
| 236 | mSubscriptionShouldEnd.notify_one(); |
| 237 | return; |
| 238 | } |
| 239 | |
| 240 | mLastWriteMs = getElapsedRealtimeMillis(); |
| 241 | } |
| 242 | |
Yao Chen | a80e5c0 | 2018-09-04 13:55:29 -0700 | [diff] [blame] | 243 | } // namespace statsd |
| 244 | } // namespace os |
Yao Chen | 398dd19 | 2018-10-01 14:49:03 -0700 | [diff] [blame] | 245 | } // namespace android |