Add pulled atom subscription for shell.

+ Changed the output format from Atom to ShellData, which is a wrapper for repeated Atom.
  This is useful because a single pull usually returns a list of atoms.

Test: statsd_test added
Bug: 110536553

Change-Id: I0e2f55bdd9015c9bc95b87a630297c6f13e39636
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 8da2d44..9d82bc0 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -319,7 +319,7 @@
         }
         if (!args[0].compare(String8("data-subscribe"))) {
             if (mShellSubscriber == nullptr) {
-                mShellSubscriber = new ShellSubscriber(mUidMap);
+                mShellSubscriber = new ShellSubscriber(mUidMap, mPullerManager);
             }
             mShellSubscriber->startNewSubscription(in, out, resultReceiver);
             return NO_ERROR;
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index 1306a46..dffff7a 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -18,9 +18,9 @@
 
 #include "ShellSubscriber.h"
 
-#include "matchers/matcher_util.h"
-
 #include <android-base/file.h>
+#include "matchers/matcher_util.h"
+#include "stats_log_util.h"
 
 using android::util::ProtoOutputStream;
 
@@ -28,6 +28,8 @@
 namespace os {
 namespace statsd {
 
+const static int FIELD_ID_ATOM = 1;
+
 void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver) {
     VLOG("start new shell subscription");
     {
@@ -42,25 +44,106 @@
         IInterface::asBinder(mResultReceiver)->linkToDeath(this);
     }
 
-    // Spawn another thread to read the config updates from the input file descriptor
-    std::thread reader([in, this] { readConfig(in); });
-    reader.detach();
+    // Note that the following is blocking, and it's intended as we cannot return until the shell
+    // cmd exits, otherwise all resources & FDs will be automatically closed.
 
+    // Read config forever until EOF is reached. Clients may send multiple configs -- each new
+    // config replaces the previous one.
+    readConfig(in);
+
+    // Now that we have read an EOF, block on mShellDied until the client exits.
+    VLOG("Now wait for client to exit");
     std::unique_lock<std::mutex> lk(mMutex);
-
     mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
-    if (reader.joinable()) {
-        reader.join();
-    }
 }
 
 void ShellSubscriber::updateConfig(const ShellSubscription& config) {
     std::lock_guard<std::mutex> lock(mMutex);
     mPushedMatchers.clear();
+    mPulledInfo.clear();
+
     for (const auto& pushed : config.pushed()) {
         mPushedMatchers.push_back(pushed);
         VLOG("adding matcher for atom %d", pushed.atom_id());
     }
+
+    int64_t token = getElapsedRealtimeNs();
+    mPullToken = token;
+
+    int64_t minInterval = -1;
+    for (const auto& pulled : config.pulled()) {
+        // All intervals need to be multiples of the min interval.
+        if (minInterval < 0 || pulled.freq_millis() < minInterval) {
+            minInterval = pulled.freq_millis();
+        }
+
+        mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
+        VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
+    }
+
+    if (mPulledInfo.size() > 0 && minInterval > 0) {
+        // This thread is guaranteed to terminate once it detects that the token has changed or
+        // that the subscription has been cleaned up.
+        std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
+        puller.detach();
+    }
+}
+
+void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+                                          const SimpleAtomMatcher& matcher) {
+    if (mOutput == 0) return;
+    int count = 0;
+    mProto.clear();
+    for (const auto& event : data) {
+        VLOG("%s", event->ToString().c_str());
+        if (matchesSimple(*mUidMap, matcher, *event)) {
+            VLOG("matched");
+            count++;
+            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
+            event->ToProto(mProto);
+            mProto.end(atomToken);
+        }
+    }
+
+    if (count > 0) {
+        // First write the payload size.
+        size_t bufferSize = mProto.size();
+        write(mOutput, &bufferSize, sizeof(bufferSize));
+        VLOG("%d atoms, proto size: %zu", count, bufferSize);
+        // Then write the payload.
+        mProto.flush(mOutput);
+    }
+    mProto.clear();
+}
+
+void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
+    while (1) {
+        int64_t nowMillis = getElapsedRealtimeMillis();
+        {
+            std::lock_guard<std::mutex> lock(mMutex);
+            if (mPulledInfo.size() == 0 || mPullToken != token) {
+                VLOG("Pulling thread %lld done!", (long long)token);
+                return;
+            }
+            for (auto& pullInfo : mPulledInfo) {
+                if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
+                    VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
+
+                    vector<std::shared_ptr<LogEvent>> data;
+                    mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), nowMillis * 1000000L,
+                                     &data);
+                    VLOG("pulled %zu atoms", data.size());
+                    if (data.size() > 0) {
+                        writeToOutputLocked(data, pullInfo.mPullerMatcher);
+                    }
+                    pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
+                }
+            }
+        }
+        VLOG("Pulling thread %lld sleep....", (long long)token);
+        std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
+    }
 }
 
 void ShellSubscriber::readConfig(int in) {
@@ -101,6 +184,8 @@
     mOutput = 0;
     mResultReceiver = nullptr;
     mPushedMatchers.clear();
+    mPulledInfo.clear();
+    mPullToken = 0;
     VLOG("done clean up");
 }
 
@@ -110,10 +195,13 @@
     if (mOutput <= 0) {
         return;
     }
-
     for (const auto& matcher : mPushedMatchers) {
         if (matchesSimple(*mUidMap, matcher, event)) {
+            VLOG("%s", event.ToString().c_str());
+            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
             event.ToProto(mProto);
+            mProto.end(atomToken);
             // First write the payload size.
             size_t bufferSize = mProto.size();
             write(mOutput, &bufferSize, sizeof(bufferSize));
diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h
index 0ace35f..5401f31 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.h
+++ b/cmds/statsd/src/shell/ShellSubscriber.h
@@ -24,6 +24,7 @@
 #include <mutex>
 #include <string>
 #include <thread>
+#include "external/StatsPullerManager.h"
 #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
 #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
 #include "packages/UidMap.h"
@@ -51,14 +52,15 @@
  * with sizeof(size_t) bytes indicating the size of the proto message payload.
  *
  * The stream would be in the following format:
- * |size_t|atom1 proto|size_t|atom2 proto|....
+ * |size_t|shellData proto|size_t|shellData proto|....
  *
  * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
  * until it exits.
  */
 class ShellSubscriber : public virtual IBinder::DeathRecipient {
 public:
-    ShellSubscriber(sp<UidMap> uidMap) : mUidMap(uidMap){};
+    ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr)
+        : mUidMap(uidMap), mPullerMgr(pullerMgr){};
 
     /**
      * Start a new subscription.
@@ -70,15 +72,28 @@
     void onLogEvent(const LogEvent& event);
 
 private:
+    struct PullInfo {
+        PullInfo(const SimpleAtomMatcher& matcher, int64_t interval)
+            : mPullerMatcher(matcher), mInterval(interval), mPrevPullElapsedRealtimeMs(0) {
+        }
+        SimpleAtomMatcher mPullerMatcher;
+        int64_t mInterval;
+        int64_t mPrevPullElapsedRealtimeMs;
+    };
     void readConfig(int in);
 
     void updateConfig(const ShellSubscription& config);
 
+    void startPull(int64_t token, int64_t intervalMillis);
+
     void cleanUpLocked();
 
+    void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+                             const SimpleAtomMatcher& matcher);
+
     sp<UidMap> mUidMap;
 
-    // bool mWritten = false;
+    sp<StatsPullerManager> mPullerMgr;
 
     android::util::ProtoOutputStream mProto;
 
@@ -93,6 +108,10 @@
     sp<IResultReceiver> mResultReceiver;
 
     std::vector<SimpleAtomMatcher> mPushedMatchers;
+
+    std::vector<PullInfo> mPulledInfo;
+
+    int64_t mPullToken = 0;  // A unique token to identify a puller thread.
 };
 
 }  // namespace statsd
diff --git a/cmds/statsd/src/shell/shell_config.proto b/cmds/statsd/src/shell/shell_config.proto
index 516693d..73cb49a 100644
--- a/cmds/statsd/src/shell/shell_config.proto
+++ b/cmds/statsd/src/shell/shell_config.proto
@@ -24,7 +24,7 @@
 import "frameworks/base/cmds/statsd/src/statsd_config.proto";
 
 message PulledAtomSubscription {
-    optional int32 atom_id = 1;
+    optional SimpleAtomMatcher matcher = 1;
 
     /* gap between two pulls in milliseconds */
     optional int32 freq_millis = 2;
diff --git a/cmds/statsd/src/shell/shell_data.proto b/cmds/statsd/src/shell/shell_data.proto
new file mode 100644
index 0000000..236bdbd
--- /dev/null
+++ b/cmds/statsd/src/shell/shell_data.proto
@@ -0,0 +1,29 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+package android.os.statsd;
+
+option java_package = "com.android.os.statsd";
+option java_outer_classname = "ShellDataProto";
+
+import "frameworks/base/cmds/statsd/src/atoms.proto";
+
+// The output of a shell subscription; carries atoms from both pushed and pulled subscriptions.
+message ShellData {
+    repeated Atom atom = 1;
+}