Implement new perfd<->statsd ShellSubscriber communication.
Because we no longer linkToDeath against a binder object to detect if
the cmd process has died, we detect deaths by checking if writes fail.
ag/10476582 proves that writes fail if the cmd process dies.
Test: m statsd
Test: bit statsd_test:ShellSubscriberTest.testPushedSubscription
Test: bit statsd_test:ShellSubscriberTest.testPulledSubscription
Bug: 150619687
Change-Id: I44a777ffff11e5b9298912b2906063c65e9009eb
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index a861a3b..6fa1654 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -18,6 +18,7 @@
#include "ShellSubscriber.h"
+#include <android-base/file.h>
#include "matchers/matcher_util.h"
#include "stats_log_util.h"
@@ -30,154 +31,129 @@
const static int FIELD_ID_ATOM = 1;
void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
- VLOG("start new shell subscription");
- int64_t subscriberId = getElapsedRealtimeNs();
+ int myToken = claimToken();
+ mSubscriptionShouldEnd.notify_one();
- {
- std::lock_guard<std::mutex> lock(mMutex);
- if (mSubscriberId> 0) {
- VLOG("Only one shell subscriber is allowed.");
- return;
- }
- mSubscriberId = subscriberId;
- mInput = in;
- mOutput = out;
+ shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
+ if (!readConfig(mySubscriptionInfo)) {
+ return;
}
- bool success = readConfig();
- if (!success) {
- std::lock_guard<std::mutex> lock(mMutex);
- cleanUpLocked();
+ // critical-section
+ std::unique_lock<std::mutex> lock(mMutex);
+ if (myToken < mToken) {
+ // Some other subscription has already come in. Stop.
+ return;
+ }
+ mSubscriptionInfo = mySubscriptionInfo;
+
+ if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) {
+ // This thread terminates after it detects that mToken has changed.
+ std::thread puller([this, myToken] { startPull(myToken); });
+ puller.detach();
}
- VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec);
- std::unique_lock<std::mutex> lk(mMutex);
-
- // Note that the following is blocking, and it's intended as we cannot return until the shell
- // cmd exits or we time out.
+ // Block until subscription has ended.
if (timeoutSec > 0) {
- mShellDied.wait_for(lk, timeoutSec * 1s,
- [this, subscriberId] { return mSubscriberId != subscriberId; });
+ mSubscriptionShouldEnd.wait_for(
+ lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] {
+ return mToken != myToken || !mySubscriptionInfo->mClientAlive;
+ });
} else {
- mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; });
+ mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] {
+ return mToken != myToken || !mySubscriptionInfo->mClientAlive;
+ });
+ }
+
+ if (mSubscriptionInfo == mySubscriptionInfo) {
+ mSubscriptionInfo = nullptr;
}
}
+// Atomically claim the next token. Token numbers denote subscriber ordering.
+int ShellSubscriber::claimToken() {
+ std::unique_lock<std::mutex> lock(mMutex);
+ int myToken = ++mToken;
+ return myToken;
+}
-// Read configs until EOF is reached. There may be multiple configs in the input
-// -- each new config should replace the previous one.
-//
-// Returns a boolean indicating whether the input was read successfully.
-bool ShellSubscriber::readConfig() {
- if (mInput < 0) {
+// Read and parse a single config. There should only be one config per input.
+bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) {
+ // Read the size of the config.
+ size_t bufferSize;
+ if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) {
return false;
}
- while (true) {
- // Read the size of the config.
- size_t bufferSize = 0;
- ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize));
- if (bytesRead == 0) {
- VLOG("We have reached the end of the input.");
- return true;
- } else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) {
- ALOGE("Error reading config size");
- return false;
- }
-
- // Read and parse the config.
- vector<uint8_t> buffer(bufferSize);
- bytesRead = read(mInput, buffer.data(), bufferSize);
- if (bytesRead > 0 && (size_t)bytesRead == bufferSize) {
- ShellSubscription config;
- if (config.ParseFromArray(buffer.data(), bufferSize)) {
- updateConfig(config);
- } else {
- ALOGE("Error parsing the config");
- return false;
- }
- } else {
- VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize,
- bytesRead);
- return false;
- }
+ // Read the config.
+ vector<uint8_t> buffer(bufferSize);
+ if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) {
+ return false;
}
-}
-void ShellSubscriber::updateConfig(const ShellSubscription& config) {
- mPushedMatchers.clear();
- mPulledInfo.clear();
+ // Parse the config.
+ ShellSubscription config;
+ if (!config.ParseFromArray(buffer.data(), bufferSize)) {
+ return false;
+ }
+ // Update SubscriptionInfo with state from config
for (const auto& pushed : config.pushed()) {
- mPushedMatchers.push_back(pushed);
- VLOG("adding matcher for pushed atom %d", pushed.atom_id());
+ subscriptionInfo->mPushedMatchers.push_back(pushed);
}
- int64_t token = getElapsedRealtimeNs();
- mPullToken = token;
-
- int64_t minInterval = -1;
+ int minInterval = -1;
for (const auto& pulled : config.pulled()) {
// All intervals need to be multiples of the min interval.
if (minInterval < 0 || pulled.freq_millis() < minInterval) {
minInterval = pulled.freq_millis();
}
-
- mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
- VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
+ subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
}
+ subscriptionInfo->mPullIntervalMin = minInterval;
- if (mPulledInfo.size() > 0 && minInterval > 0) {
- // This thread is guaranteed to terminate after it detects the token is
- // different.
- std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
- puller.detach();
- }
+ return true;
}
-void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
+void ShellSubscriber::startPull(int64_t myToken) {
while (true) {
- int64_t nowMillis = getElapsedRealtimeMillis();
- {
- std::lock_guard<std::mutex> lock(mMutex);
- // If the token has changed, the config has changed, so this
- // puller can now stop.
- if (mPulledInfo.size() == 0 || mPullToken != token) {
- VLOG("Pulling thread %lld done!", (long long)token);
- return;
- }
- for (auto& pullInfo : mPulledInfo) {
- if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
- VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
+ std::lock_guard<std::mutex> lock(mMutex);
+ if (!mSubscriptionInfo || mToken != myToken) {
+ VLOG("Pulling thread %lld done!", (long long)myToken);
+ return;
+ }
- vector<std::shared_ptr<LogEvent>> data;
- mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
- VLOG("pulled %zu atoms", data.size());
- if (data.size() > 0) {
- writeToOutputLocked(data, pullInfo.mPullerMatcher);
- }
- pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
+ int64_t nowMillis = getElapsedRealtimeMillis();
+ for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) {
+ if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
+ vector<std::shared_ptr<LogEvent>> data;
+ mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
+ VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
+
+ // TODO(b/150969574): Don't write to a pipe while holding a lock.
+ if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) {
+ mSubscriptionInfo->mClientAlive = false;
+ mSubscriptionShouldEnd.notify_one();
+ return;
}
+ pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
}
}
- VLOG("Pulling thread %lld sleep....", (long long)token);
- std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
+
+ VLOG("Pulling thread %lld sleep....", (long long)myToken);
+ std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
}
}
-// Must be called with the lock acquired, so that mProto isn't being written to
-// at the same time by multiple threads.
-void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
- const SimpleAtomMatcher& matcher) {
- if (mOutput < 0) {
- return;
- }
- int count = 0;
+// \return boolean indicating if writes were successful (will return false if
+// client dies)
+bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
+ const SimpleAtomMatcher& matcher) {
mProto.clear();
+ int count = 0;
for (const auto& event : data) {
VLOG("%s", event->ToString().c_str());
if (matchesSimple(*mUidMap, matcher, *event)) {
- VLOG("matched");
count++;
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
@@ -189,24 +165,29 @@
if (count > 0) {
// First write the payload size.
size_t bufferSize = mProto.size();
- write(mOutput, &bufferSize, sizeof(bufferSize));
+ if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
+ sizeof(bufferSize))) {
+ return false;
+ }
VLOG("%d atoms, proto size: %zu", count, bufferSize);
// Then write the payload.
- mProto.flush(mOutput);
+ if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
+ return false;
+ }
}
+
+ return true;
}
void ShellSubscriber::onLogEvent(const LogEvent& event) {
- // Acquire a lock to prevent corruption from multiple threads writing to
- // mProto.
std::lock_guard<std::mutex> lock(mMutex);
- if (mOutput < 0) {
+ if (!mSubscriptionInfo) {
return;
}
mProto.clear();
- for (const auto& matcher : mPushedMatchers) {
+ for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
if (matchesSimple(*mUidMap, matcher, event)) {
VLOG("%s", event.ToString().c_str());
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
@@ -216,26 +197,23 @@
// First write the payload size.
size_t bufferSize = mProto.size();
- write(mOutput, &bufferSize, sizeof(bufferSize));
+ if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
+ sizeof(bufferSize))) {
+ mSubscriptionInfo->mClientAlive = false;
+ mSubscriptionShouldEnd.notify_one();
+ return;
+ }
// Then write the payload.
- mProto.flush(mOutput);
+ if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
+ mSubscriptionInfo->mClientAlive = false;
+ mSubscriptionShouldEnd.notify_one();
+ return;
+ }
}
}
}
-void ShellSubscriber::cleanUpLocked() {
- // The file descriptors will be closed by binder.
- mInput = -1;
- mOutput = -1;
- mSubscriberId = 0;
- mPushedMatchers.clear();
- mPulledInfo.clear();
- // Setting mPullToken == 0 tells pull thread that its work is done.
- mPullToken = 0;
- VLOG("done clean up");
-}
-
} // namespace statsd
} // namespace os
} // namespace android