Support atoms subscription via shell cmd.

+ This feature is for supporting perfd.
+ Perfd is built outside Android and does not depend on any Android libraries,
  so the communication between perfd and statsd can only be via non-Android IPCs
  (such as sockets, pipes, file descriptors, etc.).
+ Perfd runs as a shell user, so using the existing shell cmd is a natural choice.
+ The input is a simple config, and the output is a stream of atoms as binary proto data.

+ Also cleaned up the code so that we use the file descriptors directly instead of creating
  another FILE* for each of them.

TODO: support subscription to pulled atoms.

Bug: 110536553
Test: statsd_test and manually tested
Change-Id: I64b0061cc66b5f7648147885a2ac1af531c19917
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 9a79345..2ef1169 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -34,13 +34,14 @@
 #include <dirent.h>
 #include <frameworks/base/cmds/statsd/src/statsd_config.pb.h>
 #include <private/android_filesystem_config.h>
-#include <utils/Looper.h>
-#include <utils/String16.h>
 #include <statslog.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <sys/system_properties.h>
 #include <unistd.h>
+#include <utils/Looper.h>
+#include <utils/String16.h>
+#include <chrono>
 
 using namespace android;
 
@@ -214,30 +215,8 @@
             sp<IResultReceiver> resultReceiver =
                     IResultReceiver::asInterface(data.readStrongBinder());
 
-            FILE* fin = fdopen(in, "r");
-            FILE* fout = fdopen(out, "w");
-            FILE* ferr = fdopen(err, "w");
-
-            if (fin == NULL || fout == NULL || ferr == NULL) {
-                resultReceiver->send(NO_MEMORY);
-            } else {
-                err = command(fin, fout, ferr, args);
-                resultReceiver->send(err);
-            }
-
-            if (fin != NULL) {
-                fflush(fin);
-                fclose(fin);
-            }
-            if (fout != NULL) {
-                fflush(fout);
-                fclose(fout);
-            }
-            if (fout != NULL) {
-                fflush(ferr);
-                fclose(ferr);
-            }
-
+            err = command(in, out, err, args, resultReceiver);
+            resultReceiver->send(err);
             return NO_ERROR;
         }
         default: { return BnStatsManager::onTransact(code, data, reply, flags); }
@@ -251,10 +230,6 @@
     if (!checkCallingPermission(String16(kPermissionDump))) {
         return PERMISSION_DENIED;
     }
-    FILE* out = fdopen(fd, "w");
-    if (out == NULL) {
-        return NO_MEMORY;  // the fd is already open
-    }
 
     bool verbose = false;
     bool proto = false;
@@ -265,21 +240,20 @@
         proto = true;
     }
 
-    dump_impl(out, verbose, proto);
+    dump_impl(fd, verbose, proto);
 
-    fclose(out);
     return NO_ERROR;
 }
 
 /**
  * Write debugging data about statsd in text or proto format.
  */
-void StatsService::dump_impl(FILE* out, bool verbose, bool proto) {
+void StatsService::dump_impl(int out, bool verbose, bool proto) {
     if (proto) {
         vector<uint8_t> data;
         StatsdStats::getInstance().dumpStats(&data, false); // does not reset statsdStats.
         for (size_t i = 0; i < data.size(); i ++) {
-            fprintf(out, "%c", data[i]);
+            dprintf(out, "%c", data[i]);
         }
     } else {
         StatsdStats::getInstance().dumpStats(out);
@@ -290,7 +264,8 @@
 /**
  * Implementation of the adb shell cmd stats command.
  */
-status_t StatsService::command(FILE* in, FILE* out, FILE* err, Vector<String8>& args) {
+status_t StatsService::command(int in, int out, int err, Vector<String8>& args,
+                               sp<IResultReceiver> resultReceiver) {
     uid_t uid = IPCThreadState::self()->getCallingUid();
     if (uid != AID_ROOT && uid != AID_SHELL) {
         return PERMISSION_DENIED;
@@ -342,97 +317,106 @@
         if (!args[0].compare(String8("print-logs"))) {
             return cmd_print_logs(out, args);
         }
+        if (!args[0].compare(String8("data-subscribe"))) {
+            if (mShellSubscriber == nullptr) {
+                mShellSubscriber = new ShellSubscriber(mUidMap);
+            }
+            mShellSubscriber->startNewSubscription(in, out, resultReceiver);
+            return NO_ERROR;
+        }
     }
 
     print_cmd_help(out);
     return NO_ERROR;
 }
 
-void StatsService::print_cmd_help(FILE* out) {
-    fprintf(out,
+void StatsService::print_cmd_help(int out) {
+    dprintf(out,
             "usage: adb shell cmd stats print-stats-log [tag_required] "
             "[timestamp_nsec_optional]\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats meminfo\n");
-    fprintf(out, "\n");
-    fprintf(out, "  Prints the malloc debug information. You need to run the following first: \n");
-    fprintf(out, "   # adb shell stop\n");
-    fprintf(out, "   # adb shell setprop libc.debug.malloc.program statsd \n");
-    fprintf(out, "   # adb shell setprop libc.debug.malloc.options backtrace \n");
-    fprintf(out, "   # adb shell start\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats print-uid-map [PKG]\n");
-    fprintf(out, "\n");
-    fprintf(out, "  Prints the UID, app name, version mapping.\n");
-    fprintf(out, "  PKG           Optional package name to print the uids of the package\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats pull-source [int] \n");
-    fprintf(out, "\n");
-    fprintf(out, "  Prints the output of a pulled metrics source (int indicates source)\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats write-to-disk \n");
-    fprintf(out, "\n");
-    fprintf(out, "  Flushes all data on memory to disk.\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats log-app-breadcrumb [UID] LABEL STATE\n");
-    fprintf(out, "  Writes an AppBreadcrumbReported event to the statslog buffer.\n");
-    fprintf(out, "  UID           The uid to use. It is only possible to pass a UID\n");
-    fprintf(out, "                parameter on eng builds. If UID is omitted the calling\n");
-    fprintf(out, "                uid is used.\n");
-    fprintf(out, "  LABEL         Integer in [0, 15], as per atoms.proto.\n");
-    fprintf(out, "  STATE         Integer in [0, 3], as per atoms.proto.\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats config remove [UID] [NAME]\n");
-    fprintf(out, "usage: adb shell cmd stats config update [UID] NAME\n");
-    fprintf(out, "\n");
-    fprintf(out, "  Adds, updates or removes a configuration. The proto should be in\n");
-    fprintf(out, "  wire-encoded protobuf format and passed via stdin. If no UID and name is\n");
-    fprintf(out, "  provided, then all configs will be removed from memory and disk.\n");
-    fprintf(out, "\n");
-    fprintf(out, "  UID           The uid to use. It is only possible to pass the UID\n");
-    fprintf(out, "                parameter on eng builds. If UID is omitted the calling\n");
-    fprintf(out, "                uid is used.\n");
-    fprintf(out, "  NAME          The per-uid name to use\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n              *Note: If both UID and NAME are omitted then all configs will\n");
-    fprintf(out, "\n                     be removed from memory and disk!\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats dump-report [UID] NAME [--include_current_bucket] [--proto]\n");
-    fprintf(out, "  Dump all metric data for a configuration.\n");
-    fprintf(out, "  UID           The uid of the configuration. It is only possible to pass\n");
-    fprintf(out, "                the UID parameter on eng builds. If UID is omitted the\n");
-    fprintf(out, "                calling uid is used.\n");
-    fprintf(out, "  NAME          The name of the configuration\n");
-    fprintf(out, "  --proto       Print proto binary.\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats send-broadcast [UID] NAME\n");
-    fprintf(out, "  Send a broadcast that triggers the subscriber to fetch metrics.\n");
-    fprintf(out, "  UID           The uid of the configuration. It is only possible to pass\n");
-    fprintf(out, "                the UID parameter on eng builds. If UID is omitted the\n");
-    fprintf(out, "                calling uid is used.\n");
-    fprintf(out, "  NAME          The name of the configuration\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats print-stats\n");
-    fprintf(out, "  Prints some basic stats.\n");
-    fprintf(out, "  --proto       Print proto binary instead of string format.\n");
-    fprintf(out, "\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats clear-puller-cache\n");
-    fprintf(out, "  Clear cached puller data.\n");
-    fprintf(out, "\n");
-    fprintf(out, "usage: adb shell cmd stats print-logs\n");
-    fprintf(out, "      Only works on eng build\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats meminfo\n");
+    dprintf(out, "\n");
+    dprintf(out, "  Prints the malloc debug information. You need to run the following first: \n");
+    dprintf(out, "   # adb shell stop\n");
+    dprintf(out, "   # adb shell setprop libc.debug.malloc.program statsd \n");
+    dprintf(out, "   # adb shell setprop libc.debug.malloc.options backtrace \n");
+    dprintf(out, "   # adb shell start\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats print-uid-map [PKG]\n");
+    dprintf(out, "\n");
+    dprintf(out, "  Prints the UID, app name, version mapping.\n");
+    dprintf(out, "  PKG           Optional package name to print the uids of the package\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats pull-source [int] \n");
+    dprintf(out, "\n");
+    dprintf(out, "  Prints the output of a pulled metrics source (int indicates source)\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats write-to-disk \n");
+    dprintf(out, "\n");
+    dprintf(out, "  Flushes all data on memory to disk.\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats log-app-breadcrumb [UID] LABEL STATE\n");
+    dprintf(out, "  Writes an AppBreadcrumbReported event to the statslog buffer.\n");
+    dprintf(out, "  UID           The uid to use. It is only possible to pass a UID\n");
+    dprintf(out, "                parameter on eng builds. If UID is omitted the calling\n");
+    dprintf(out, "                uid is used.\n");
+    dprintf(out, "  LABEL         Integer in [0, 15], as per atoms.proto.\n");
+    dprintf(out, "  STATE         Integer in [0, 3], as per atoms.proto.\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats config remove [UID] [NAME]\n");
+    dprintf(out, "usage: adb shell cmd stats config update [UID] NAME\n");
+    dprintf(out, "\n");
+    dprintf(out, "  Adds, updates or removes a configuration. The proto should be in\n");
+    dprintf(out, "  wire-encoded protobuf format and passed via stdin. If no UID and name is\n");
+    dprintf(out, "  provided, then all configs will be removed from memory and disk.\n");
+    dprintf(out, "\n");
+    dprintf(out, "  UID           The uid to use. It is only possible to pass the UID\n");
+    dprintf(out, "                parameter on eng builds. If UID is omitted the calling\n");
+    dprintf(out, "                uid is used.\n");
+    dprintf(out, "  NAME          The per-uid name to use\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n              *Note: If both UID and NAME are omitted then all configs will\n");
+    dprintf(out, "\n                     be removed from memory and disk!\n");
+    dprintf(out, "\n");
+    dprintf(out,
+            "usage: adb shell cmd stats dump-report [UID] NAME [--include_current_bucket] "
+            "[--proto]\n");
+    dprintf(out, "  Dump all metric data for a configuration.\n");
+    dprintf(out, "  UID           The uid of the configuration. It is only possible to pass\n");
+    dprintf(out, "                the UID parameter on eng builds. If UID is omitted the\n");
+    dprintf(out, "                calling uid is used.\n");
+    dprintf(out, "  NAME          The name of the configuration\n");
+    dprintf(out, "  --proto       Print proto binary.\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats send-broadcast [UID] NAME\n");
+    dprintf(out, "  Send a broadcast that triggers the subscriber to fetch metrics.\n");
+    dprintf(out, "  UID           The uid of the configuration. It is only possible to pass\n");
+    dprintf(out, "                the UID parameter on eng builds. If UID is omitted the\n");
+    dprintf(out, "                calling uid is used.\n");
+    dprintf(out, "  NAME          The name of the configuration\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats print-stats\n");
+    dprintf(out, "  Prints some basic stats.\n");
+    dprintf(out, "  --proto       Print proto binary instead of string format.\n");
+    dprintf(out, "\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats clear-puller-cache\n");
+    dprintf(out, "  Clear cached puller data.\n");
+    dprintf(out, "\n");
+    dprintf(out, "usage: adb shell cmd stats print-logs\n");
+    dprintf(out, "      Only works on eng build\n");
 }
 
-status_t StatsService::cmd_trigger_broadcast(FILE* out, Vector<String8>& args) {
+status_t StatsService::cmd_trigger_broadcast(int out, Vector<String8>& args) {
     string name;
     bool good = false;
     int uid;
@@ -456,9 +440,9 @@
                 }
             }
         } else {
-            fprintf(out,
+            dprintf(out,
                     "The metrics can only be dumped for other UIDs on eng or userdebug "
-                            "builds.\n");
+                    "builds.\n");
         }
     }
     if (!good) {
@@ -481,7 +465,7 @@
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_config(FILE* in, FILE* out, FILE* err, Vector<String8>& args) {
+status_t StatsService::cmd_config(int in, int out, int err, Vector<String8>& args) {
     const int argCount = args.size();
     if (argCount >= 2) {
         if (args[1] == "update" || args[1] == "remove") {
@@ -508,7 +492,7 @@
                         }
                     }
                 } else {
-                    fprintf(err,
+                    dprintf(err,
                             "The config can only be set for other UIDs on eng or userdebug "
                             "builds.\n");
                 }
@@ -526,21 +510,21 @@
                 char* endp;
                 int64_t configID = strtoll(name.c_str(), &endp, 10);
                 if (endp == name.c_str() || *endp != '\0') {
-                    fprintf(err, "Error parsing config ID.\n");
+                    dprintf(err, "Error parsing config ID.\n");
                     return UNKNOWN_ERROR;
                 }
 
                 // Read stream into buffer.
                 string buffer;
-                if (!android::base::ReadFdToString(fileno(in), &buffer)) {
-                    fprintf(err, "Error reading stream for StatsConfig.\n");
+                if (!android::base::ReadFdToString(in, &buffer)) {
+                    dprintf(err, "Error reading stream for StatsConfig.\n");
                     return UNKNOWN_ERROR;
                 }
 
                 // Parse buffer.
                 StatsdConfig config;
                 if (!config.ParseFromString(buffer)) {
-                    fprintf(err, "Error parsing proto stream for StatsConfig.\n");
+                    dprintf(err, "Error parsing proto stream for StatsConfig.\n");
                     return UNKNOWN_ERROR;
                 }
 
@@ -562,7 +546,7 @@
     return UNKNOWN_ERROR;
 }
 
-status_t StatsService::cmd_dump_report(FILE* out, FILE* err, const Vector<String8>& args) {
+status_t StatsService::cmd_dump_report(int out, int err, const Vector<String8>& args) {
     if (mProcessor != nullptr) {
         int argCount = args.size();
         bool good = false;
@@ -597,7 +581,7 @@
                     }
                 }
             } else {
-                fprintf(out,
+                dprintf(out,
                         "The metrics can only be dumped for other UIDs on eng or userdebug "
                         "builds.\n");
             }
@@ -608,11 +592,11 @@
                                      includeCurrentBucket, ADB_DUMP, &data);
             if (proto) {
                 for (size_t i = 0; i < data.size(); i ++) {
-                    fprintf(out, "%c", data[i]);
+                    dprintf(out, "%c", data[i]);
                 }
             } else {
-                fprintf(out, "Dump report for Config [%d,%s]\n", uid, name.c_str());
-                fprintf(out, "See the StatsLogReport in logcat...\n");
+                dprintf(out, "Dump report for Config [%d,%s]\n", uid, name.c_str());
+                dprintf(out, "See the StatsLogReport in logcat...\n");
             }
             return android::OK;
         } else {
@@ -621,12 +605,12 @@
             return UNKNOWN_ERROR;
         }
     } else {
-        fprintf(out, "Log processor does not exist...\n");
+        dprintf(out, "Log processor does not exist...\n");
         return UNKNOWN_ERROR;
     }
 }
 
-status_t StatsService::cmd_print_stats(FILE* out, const Vector<String8>& args) {
+status_t StatsService::cmd_print_stats(int out, const Vector<String8>& args) {
     int argCount = args.size();
     bool proto = false;
     if (!std::strcmp("--proto", args[argCount-1].c_str())) {
@@ -638,13 +622,13 @@
         vector<uint8_t> data;
         statsdStats.dumpStats(&data, false); // does not reset statsdStats.
         for (size_t i = 0; i < data.size(); i ++) {
-            fprintf(out, "%c", data[i]);
+            dprintf(out, "%c", data[i]);
         }
 
     } else {
         vector<ConfigKey> configs = mConfigManager->GetAllConfigKeys();
         for (const ConfigKey& key : configs) {
-            fprintf(out, "Config %s uses %zu bytes\n", key.ToString().c_str(),
+            dprintf(out, "Config %s uses %zu bytes\n", key.ToString().c_str(),
                     mProcessor->GetMetricsSize(key));
         }
         statsdStats.dumpStats(out);
@@ -652,29 +636,29 @@
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_print_uid_map(FILE* out, const Vector<String8>& args) {
+status_t StatsService::cmd_print_uid_map(int out, const Vector<String8>& args) {
     if (args.size() > 1) {
         string pkg;
         pkg.assign(args[1].c_str(), args[1].size());
         auto uids = mUidMap->getAppUid(pkg);
-        fprintf(out, "%s -> [ ", pkg.c_str());
+        dprintf(out, "%s -> [ ", pkg.c_str());
         for (const auto& uid : uids) {
-            fprintf(out, "%d ", uid);
+            dprintf(out, "%d ", uid);
         }
-        fprintf(out, "]\n");
+        dprintf(out, "]\n");
     } else {
         mUidMap->printUidMap(out);
     }
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_write_data_to_disk(FILE* out) {
-    fprintf(out, "Writing data to disk\n");
+status_t StatsService::cmd_write_data_to_disk(int out) {
+    dprintf(out, "Writing data to disk\n");
     mProcessor->WriteDataToDisk(ADB_DUMP);
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_log_app_breadcrumb(FILE* out, const Vector<String8>& args) {
+status_t StatsService::cmd_log_app_breadcrumb(int out, const Vector<String8>& args) {
     bool good = false;
     int32_t uid;
     int32_t label;
@@ -695,13 +679,13 @@
             state = atoi(args[3].c_str());
             good = true;
         } else {
-            fprintf(out,
+            dprintf(out,
                     "Selecting a UID for writing AppBreadcrumb can only be done for other UIDs "
-                            "on eng or userdebug builds.\n");
+                    "on eng or userdebug builds.\n");
         }
     }
     if (good) {
-        fprintf(out, "Logging AppBreadcrumbReported(%d, %d, %d) to statslog.\n", uid, label, state);
+        dprintf(out, "Logging AppBreadcrumbReported(%d, %d, %d) to statslog.\n", uid, label, state);
         android::util::stats_write(android::util::APP_BREADCRUMB_REPORTED, uid, label, state);
     } else {
         print_cmd_help(out);
@@ -710,46 +694,46 @@
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_print_pulled_metrics(FILE* out, const Vector<String8>& args) {
+status_t StatsService::cmd_print_pulled_metrics(int out, const Vector<String8>& args) {
     int s = atoi(args[1].c_str());
     vector<shared_ptr<LogEvent> > stats;
     if (mPullerManager->Pull(s, getElapsedRealtimeNs(), &stats)) {
         for (const auto& it : stats) {
-            fprintf(out, "Pull from %d: %s\n", s, it->ToString().c_str());
+            dprintf(out, "Pull from %d: %s\n", s, it->ToString().c_str());
         }
-        fprintf(out, "Pull from %d: Received %zu elements\n", s, stats.size());
+        dprintf(out, "Pull from %d: Received %zu elements\n", s, stats.size());
         return NO_ERROR;
     }
     return UNKNOWN_ERROR;
 }
 
-status_t StatsService::cmd_remove_all_configs(FILE* out) {
-    fprintf(out, "Removing all configs...\n");
+status_t StatsService::cmd_remove_all_configs(int out) {
+    dprintf(out, "Removing all configs...\n");
     VLOG("StatsService::cmd_remove_all_configs was called");
     mConfigManager->RemoveAllConfigs();
     StorageManager::deleteAllFiles(STATS_SERVICE_DIR);
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_dump_memory_info(FILE* out) {
-    fprintf(out, "meminfo not available.\n");
+status_t StatsService::cmd_dump_memory_info(int out) {
+    dprintf(out, "meminfo not available.\n");
     return NO_ERROR;
 }
 
-status_t StatsService::cmd_clear_puller_cache(FILE* out) {
+status_t StatsService::cmd_clear_puller_cache(int out) {
     IPCThreadState* ipc = IPCThreadState::self();
     VLOG("StatsService::cmd_clear_puller_cache with Pid %i, Uid %i",
             ipc->getCallingPid(), ipc->getCallingUid());
     if (checkCallingPermission(String16(kPermissionDump))) {
         int cleared = mPullerManager->ForceClearPullerCache();
-        fprintf(out, "Puller removed %d cached data!\n", cleared);
+        dprintf(out, "Puller removed %d cached data!\n", cleared);
         return NO_ERROR;
     } else {
         return PERMISSION_DENIED;
     }
 }
 
-status_t StatsService::cmd_print_logs(FILE* out, const Vector<String8>& args) {
+status_t StatsService::cmd_print_logs(int out, const Vector<String8>& args) {
     IPCThreadState* ipc = IPCThreadState::self();
     VLOG("StatsService::cmd_print_logs with Pid %i, Uid %i", ipc->getCallingPid(),
          ipc->getCallingUid());
@@ -885,6 +869,9 @@
 
 void StatsService::OnLogEvent(LogEvent* event) {
     mProcessor->OnLogEvent(event);
+    if (mShellSubscriber != nullptr) {
+        mShellSubscriber->onLogEvent(*event);
+    }
 }
 
 Status StatsService::getData(int64_t key, const String16& packageName, vector<uint8_t>* output) {