Add pulled atom subscription for shell.

+ Changed the output format from Atom to ShellData, which is a wrapper for repeated Atom
  This is useful because pulled atoms are usually a list of atoms.

Test: statsd_test added
Bug: 110536553

Change-Id: I0e2f55bdd9015c9bc95b87a630297c6f13e39636
diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h
index 0ace35f..5401f31 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.h
+++ b/cmds/statsd/src/shell/ShellSubscriber.h
@@ -24,6 +24,7 @@
 #include <mutex>
 #include <string>
 #include <thread>
+#include "external/StatsPullerManager.h"
 #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
 #include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
 #include "packages/UidMap.h"
@@ -51,14 +52,15 @@
  * with sizeof(size_t) bytes indicating the size of the proto message payload.
  *
  * The stream would be in the following format:
- * |size_t|atom1 proto|size_t|atom2 proto|....
+ * |size_t|shellData proto|size_t|shellData proto|....
  *
  * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
  * until it exits.
  */
 class ShellSubscriber : public virtual IBinder::DeathRecipient {
 public:
-    ShellSubscriber(sp<UidMap> uidMap) : mUidMap(uidMap){};
+    ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr)
+        : mUidMap(uidMap), mPullerMgr(pullerMgr){};
 
     /**
      * Start a new subscription.
@@ -70,15 +72,28 @@
     void onLogEvent(const LogEvent& event);
 
 private:
+    struct PullInfo {
+        PullInfo(const SimpleAtomMatcher& matcher, int64_t interval)
+            : mPullerMatcher(matcher), mInterval(interval), mPrevPullElapsedRealtimeMs(0) {
+        }
+        SimpleAtomMatcher mPullerMatcher;
+        int64_t mInterval;
+        int64_t mPrevPullElapsedRealtimeMs;
+    };
     void readConfig(int in);
 
     void updateConfig(const ShellSubscription& config);
 
+    void startPull(int64_t token, int64_t intervalMillis);
+
     void cleanUpLocked();
 
+    void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+                             const SimpleAtomMatcher& matcher);
+
     sp<UidMap> mUidMap;
 
-    // bool mWritten = false;
+    sp<StatsPullerManager> mPullerMgr;
 
     android::util::ProtoOutputStream mProto;
 
@@ -93,6 +108,10 @@
     sp<IResultReceiver> mResultReceiver;
 
     std::vector<SimpleAtomMatcher> mPushedMatchers;
+
+    std::vector<PullInfo> mPulledInfo;
+
+    int64_t mPullToken = 0;  // A unique token to identify a puller thread.
 };
 
 }  // namespace statsd