Add pulled atom subscription for shell.
+ Changed the output format from Atom to ShellData, which is a wrapper for repeated Atom
This is useful because pulled atoms are usually a list of atoms.
Test: statsd_test added
Bug: 110536553
Change-Id: I0e2f55bdd9015c9bc95b87a630297c6f13e39636
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 8da2d44..9d82bc0 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -319,7 +319,7 @@
}
if (!args[0].compare(String8("data-subscribe"))) {
if (mShellSubscriber == nullptr) {
- mShellSubscriber = new ShellSubscriber(mUidMap);
+ mShellSubscriber = new ShellSubscriber(mUidMap, mPullerManager);
}
mShellSubscriber->startNewSubscription(in, out, resultReceiver);
return NO_ERROR;
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index 1306a46..dffff7a 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -18,9 +18,9 @@
#include "ShellSubscriber.h"
-#include "matchers/matcher_util.h"
-
#include <android-base/file.h>
+#include "matchers/matcher_util.h"
+#include "stats_log_util.h"
using android::util::ProtoOutputStream;
@@ -28,6 +28,8 @@
namespace os {
namespace statsd {
+const static int FIELD_ID_ATOM = 1;
+
void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver) {
VLOG("start new shell subscription");
{
@@ -42,25 +44,106 @@
IInterface::asBinder(mResultReceiver)->linkToDeath(this);
}
- // Spawn another thread to read the config updates from the input file descriptor
- std::thread reader([in, this] { readConfig(in); });
- reader.detach();
+ // Note that the following is blocking, and it's intended as we cannot return until the shell
+ // cmd exits, otherwise all resources & FDs will be automatically closed.
+ // Read config forever until EOF is reached. Clients may send multiple configs -- each new
+ // config replace the previous one.
+ readConfig(in);
+
+ // Now we have read an EOF we now wait for the semaphore until the client exits.
+ VLOG("Now wait for client to exit");
std::unique_lock<std::mutex> lk(mMutex);
-
mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
- if (reader.joinable()) {
- reader.join();
- }
}
void ShellSubscriber::updateConfig(const ShellSubscription& config) {
std::lock_guard<std::mutex> lock(mMutex);
mPushedMatchers.clear();
+ mPulledInfo.clear();
+
for (const auto& pushed : config.pushed()) {
mPushedMatchers.push_back(pushed);
VLOG("adding matcher for atom %d", pushed.atom_id());
}
+
+ int64_t token = getElapsedRealtimeNs();
+ mPullToken = token;
+
+ int64_t minInterval = -1;
+ for (const auto& pulled : config.pulled()) {
+ // All intervals need to be multiples of the min interval.
+ if (minInterval < 0 || pulled.freq_millis() < minInterval) {
+ minInterval = pulled.freq_millis();
+ }
+
+ mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
+ VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
+ }
+
+ if (mPulledInfo.size() > 0 && minInterval > 0) {
+ // This thread is guaranteed to terminate after it detects the token is different or
+ // cleaned up.
+ std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
+ puller.detach();
+ }
+}
+
+void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+ const SimpleAtomMatcher& matcher) {
+ if (mOutput == 0) return;
+ int count = 0;
+ mProto.clear();
+ for (const auto& event : data) {
+ VLOG("%s", event->ToString().c_str());
+ if (matchesSimple(*mUidMap, matcher, *event)) {
+ VLOG("matched");
+ count++;
+ uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+ util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
+ event->ToProto(mProto);
+ mProto.end(atomToken);
+ }
+ }
+
+ if (count > 0) {
+ // First write the payload size.
+ size_t bufferSize = mProto.size();
+ write(mOutput, &bufferSize, sizeof(bufferSize));
+ VLOG("%d atoms, proto size: %zu", count, bufferSize);
+ // Then write the payload.
+ mProto.flush(mOutput);
+ }
+ mProto.clear();
+}
+
+void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
+ while (1) {
+ int64_t nowMillis = getElapsedRealtimeMillis();
+ {
+ std::lock_guard<std::mutex> lock(mMutex);
+ if (mPulledInfo.size() == 0 || mPullToken != token) {
+ VLOG("Pulling thread %lld done!", (long long)token);
+ return;
+ }
+ for (auto& pullInfo : mPulledInfo) {
+ if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
+ VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
+
+ vector<std::shared_ptr<LogEvent>> data;
+ mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), nowMillis * 1000000L,
+ &data);
+ VLOG("pulled %zu atoms", data.size());
+ if (data.size() > 0) {
+ writeToOutputLocked(data, pullInfo.mPullerMatcher);
+ }
+ pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
+ }
+ }
+ }
+ VLOG("Pulling thread %lld sleep....", (long long)token);
+ std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
+ }
}
void ShellSubscriber::readConfig(int in) {
@@ -101,6 +184,8 @@
mOutput = 0;
mResultReceiver = nullptr;
mPushedMatchers.clear();
+ mPulledInfo.clear();
+ mPullToken = 0;
VLOG("done clean up");
}
@@ -110,10 +195,13 @@
if (mOutput <= 0) {
return;
}
-
for (const auto& matcher : mPushedMatchers) {
if (matchesSimple(*mUidMap, matcher, event)) {
+ VLOG("%s", event.ToString().c_str());
+ uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+ util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event.ToProto(mProto);
+ mProto.end(atomToken);
// First write the payload size.
size_t bufferSize = mProto.size();
write(mOutput, &bufferSize, sizeof(bufferSize));
diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h
index 0ace35f..5401f31 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.h
+++ b/cmds/statsd/src/shell/ShellSubscriber.h
@@ -24,6 +24,7 @@
#include <mutex>
#include <string>
#include <thread>
+#include "external/StatsPullerManager.h"
#include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
#include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
#include "packages/UidMap.h"
@@ -51,14 +52,15 @@
* with sizeof(size_t) bytes indicating the size of the proto message payload.
*
* The stream would be in the following format:
- * |size_t|atom1 proto|size_t|atom2 proto|....
+ * |size_t|shellData proto|size_t|shellData proto|....
*
* Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
* until it exits.
*/
class ShellSubscriber : public virtual IBinder::DeathRecipient {
public:
- ShellSubscriber(sp<UidMap> uidMap) : mUidMap(uidMap){};
+ ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr)
+ : mUidMap(uidMap), mPullerMgr(pullerMgr){};
/**
* Start a new subscription.
@@ -70,15 +72,28 @@
void onLogEvent(const LogEvent& event);
private:
+ struct PullInfo {
+ PullInfo(const SimpleAtomMatcher& matcher, int64_t interval)
+ : mPullerMatcher(matcher), mInterval(interval), mPrevPullElapsedRealtimeMs(0) {
+ }
+ SimpleAtomMatcher mPullerMatcher;
+ int64_t mInterval;
+ int64_t mPrevPullElapsedRealtimeMs;
+ };
void readConfig(int in);
void updateConfig(const ShellSubscription& config);
+ void startPull(int64_t token, int64_t intervalMillis);
+
void cleanUpLocked();
+ void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+ const SimpleAtomMatcher& matcher);
+
sp<UidMap> mUidMap;
- // bool mWritten = false;
+ sp<StatsPullerManager> mPullerMgr;
android::util::ProtoOutputStream mProto;
@@ -93,6 +108,10 @@
sp<IResultReceiver> mResultReceiver;
std::vector<SimpleAtomMatcher> mPushedMatchers;
+
+ std::vector<PullInfo> mPulledInfo;
+
+ int64_t mPullToken = 0; // A unique token to identify a puller thread.
};
} // namespace statsd
diff --git a/cmds/statsd/src/shell/shell_config.proto b/cmds/statsd/src/shell/shell_config.proto
index 516693d..73cb49a 100644
--- a/cmds/statsd/src/shell/shell_config.proto
+++ b/cmds/statsd/src/shell/shell_config.proto
@@ -24,7 +24,7 @@
import "frameworks/base/cmds/statsd/src/statsd_config.proto";
message PulledAtomSubscription {
- optional int32 atom_id = 1;
+ optional SimpleAtomMatcher matcher = 1;
/* gap between two pulls in milliseconds */
optional int32 freq_millis = 2;
diff --git a/cmds/statsd/src/shell/shell_data.proto b/cmds/statsd/src/shell/shell_data.proto
new file mode 100644
index 0000000..236bdbd
--- /dev/null
+++ b/cmds/statsd/src/shell/shell_data.proto
@@ -0,0 +1,29 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+package android.os.statsd;
+
+option java_package = "com.android.os.statsd";
+option java_outer_classname = "ShellDataProto";
+
+import "frameworks/base/cmds/statsd/src/atoms.proto";
+
+// The output of shell subscription, including both pulled and pushed subscriptions.
+message ShellData {
+ repeated Atom atom = 1;
+}