Move statsd (and tests) to libbinder_ndk
Major changes include:
- Removing unused permission checks within StatsService. These
include ENFORCE_DUMP_AND_USAGE_STATS, checkDumpAndUsageStats,
kOpUsage, and kPermissionUsage.
- Converting from sp to shared_ptr
- Using libbinder_ndk functions instead of libbinder functions
(e.g. for installing death recipients, getting calling uids, etc.)
- New death recipients were added in StatsService,
ConfigManager, and SubscriberReporter.
- Using a unique token (timestamp) to identify shell subscribers
instead of IResultReceiver because IResultReceiver is not exposed by
libbinder_ndk. Currently, statsd cannot detect if perfd dies; we
will fix that later.
Bug: 145232107
Bug: 148609603
Test: m statsd
Test: m statsd_test
Test: bit stastd_test:*
Test: atest GtsStatsdHostTestCases
Change-Id: Ia1fda7280c22320bc4ebc8371acaadbe8eabcbd2
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index d6a0433..a861a3b 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -29,49 +29,88 @@
const static int FIELD_ID_ATOM = 1;
-void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver,
- int timeoutSec) {
+void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
VLOG("start new shell subscription");
+ int64_t subscriberId = getElapsedRealtimeNs();
+
{
std::lock_guard<std::mutex> lock(mMutex);
- if (mResultReceiver != nullptr) {
+ if (mSubscriberId> 0) {
VLOG("Only one shell subscriber is allowed.");
return;
}
+ mSubscriberId = subscriberId;
mInput = in;
mOutput = out;
- mResultReceiver = resultReceiver;
- IInterface::asBinder(mResultReceiver)->linkToDeath(this);
}
- // Note that the following is blocking, and it's intended as we cannot return until the shell
- // cmd exits, otherwise all resources & FDs will be automatically closed.
+ bool success = readConfig();
+ if (!success) {
+ std::lock_guard<std::mutex> lock(mMutex);
+ cleanUpLocked();
+ }
- // Read config forever until EOF is reached. Clients may send multiple configs -- each new
- // config replace the previous one.
- readConfig(in);
- VLOG("timeout : %d", timeoutSec);
-
- // Now we have read an EOF we now wait for the semaphore until the client exits.
- VLOG("Now wait for client to exit");
+ VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec);
std::unique_lock<std::mutex> lk(mMutex);
+ // Note that the following is blocking, and it's intended as we cannot return until the shell
+ // cmd exits or we time out.
if (timeoutSec > 0) {
mShellDied.wait_for(lk, timeoutSec * 1s,
- [this, resultReceiver] { return mResultReceiver != resultReceiver; });
+ [this, subscriberId] { return mSubscriberId != subscriberId; });
} else {
- mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
+ mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; });
+ }
+}
+
+
+// Read configs until EOF is reached. There may be multiple configs in the input
+// -- each new config should replace the previous one.
+//
+// Returns a boolean indicating whether the input was read successfully.
+bool ShellSubscriber::readConfig() {
+ if (mInput < 0) {
+ return false;
+ }
+
+ while (true) {
+ // Read the size of the config.
+ size_t bufferSize = 0;
+ ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize));
+ if (bytesRead == 0) {
+ VLOG("We have reached the end of the input.");
+ return true;
+ } else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) {
+ ALOGE("Error reading config size");
+ return false;
+ }
+
+ // Read and parse the config.
+ vector<uint8_t> buffer(bufferSize);
+ bytesRead = read(mInput, buffer.data(), bufferSize);
+ if (bytesRead > 0 && (size_t)bytesRead == bufferSize) {
+ ShellSubscription config;
+ if (config.ParseFromArray(buffer.data(), bufferSize)) {
+ updateConfig(config);
+ } else {
+ ALOGE("Error parsing the config");
+ return false;
+ }
+ } else {
+ VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize,
+ bytesRead);
+ return false;
+ }
}
}
void ShellSubscriber::updateConfig(const ShellSubscription& config) {
- std::lock_guard<std::mutex> lock(mMutex);
mPushedMatchers.clear();
mPulledInfo.clear();
for (const auto& pushed : config.pushed()) {
mPushedMatchers.push_back(pushed);
- VLOG("adding matcher for atom %d", pushed.atom_id());
+ VLOG("adding matcher for pushed atom %d", pushed.atom_id());
}
int64_t token = getElapsedRealtimeNs();
@@ -89,46 +128,20 @@
}
if (mPulledInfo.size() > 0 && minInterval > 0) {
- // This thread is guaranteed to terminate after it detects the token is different or
- // cleaned up.
+ // This thread is guaranteed to terminate after it detects the token is
+ // different.
std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
puller.detach();
}
}
-void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
- const SimpleAtomMatcher& matcher) {
- if (mOutput == 0) return;
- int count = 0;
- mProto.clear();
- for (const auto& event : data) {
- VLOG("%s", event->ToString().c_str());
- if (matchesSimple(*mUidMap, matcher, *event)) {
- VLOG("matched");
- count++;
- uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
- util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
- event->ToProto(mProto);
- mProto.end(atomToken);
- }
- }
-
- if (count > 0) {
- // First write the payload size.
- size_t bufferSize = mProto.size();
- write(mOutput, &bufferSize, sizeof(bufferSize));
- VLOG("%d atoms, proto size: %zu", count, bufferSize);
- // Then write the payload.
- mProto.flush(mOutput);
- }
- mProto.clear();
-}
-
void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
- while (1) {
+ while (true) {
int64_t nowMillis = getElapsedRealtimeMillis();
{
std::lock_guard<std::mutex> lock(mMutex);
+ // If the token has changed, the config has changed, so this
+ // puller can now stop.
if (mPulledInfo.size() == 0 || mPullToken != token) {
VLOG("Pulling thread %lld done!", (long long)token);
return;
@@ -152,55 +165,47 @@
}
}
-void ShellSubscriber::readConfig(int in) {
- if (in <= 0) {
+// Must be called with the lock acquired, so that mProto isn't being written to
+// at the same time by multiple threads.
+void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+ const SimpleAtomMatcher& matcher) {
+ if (mOutput < 0) {
return;
}
-
- while (1) {
- size_t bufferSize = 0;
- int result = 0;
- if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
- VLOG("Done reading");
- break;
- } else if (result < 0 || result != sizeof(bufferSize)) {
- ALOGE("Error reading config size");
- break;
- }
-
- vector<uint8_t> buffer(bufferSize);
- if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) {
- ShellSubscription config;
- if (config.ParseFromArray(buffer.data(), bufferSize)) {
- updateConfig(config);
- } else {
- ALOGE("error parsing the config");
- break;
- }
- } else {
- VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize);
- break;
+ int count = 0;
+ mProto.clear();
+ for (const auto& event : data) {
+ VLOG("%s", event->ToString().c_str());
+ if (matchesSimple(*mUidMap, matcher, *event)) {
+ VLOG("matched");
+ count++;
+ uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+ util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
+ event->ToProto(mProto);
+ mProto.end(atomToken);
}
}
-}
-void ShellSubscriber::cleanUpLocked() {
- // The file descriptors will be closed by binder.
- mInput = 0;
- mOutput = 0;
- mResultReceiver = nullptr;
- mPushedMatchers.clear();
- mPulledInfo.clear();
- mPullToken = 0;
- VLOG("done clean up");
+ if (count > 0) {
+ // First write the payload size.
+ size_t bufferSize = mProto.size();
+ write(mOutput, &bufferSize, sizeof(bufferSize));
+
+ VLOG("%d atoms, proto size: %zu", count, bufferSize);
+ // Then write the payload.
+ mProto.flush(mOutput);
+ }
}
void ShellSubscriber::onLogEvent(const LogEvent& event) {
+ // Acquire a lock to prevent corruption from multiple threads writing to
+ // mProto.
std::lock_guard<std::mutex> lock(mMutex);
-
- if (mOutput <= 0) {
+ if (mOutput < 0) {
return;
}
+
+ mProto.clear();
for (const auto& matcher : mPushedMatchers) {
if (matchesSimple(*mUidMap, matcher, event)) {
VLOG("%s", event.ToString().c_str());
@@ -208,25 +213,27 @@
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event.ToProto(mProto);
mProto.end(atomToken);
+
// First write the payload size.
size_t bufferSize = mProto.size();
write(mOutput, &bufferSize, sizeof(bufferSize));
// Then write the payload.
mProto.flush(mOutput);
- mProto.clear();
- break;
}
}
}
-void ShellSubscriber::binderDied(const wp<IBinder>& who) {
- {
- VLOG("Shell exits");
- std::lock_guard<std::mutex> lock(mMutex);
- cleanUpLocked();
- }
- mShellDied.notify_all();
+void ShellSubscriber::cleanUpLocked() {
+ // The file descriptors will be closed by binder.
+ mInput = -1;
+ mOutput = -1;
+ mSubscriberId = 0;
+ mPushedMatchers.clear();
+ mPulledInfo.clear();
+ // Setting mPullToken == 0 tells pull thread that its work is done.
+ mPullToken = 0;
+ VLOG("done clean up");
}
} // namespace statsd