Move statsd (and tests) to libbinder_ndk

Major changes include:
    - Removing unused permission checks within StatsService. These
      include ENFORCE_DUMP_AND_USAGE_STATS, checkDumpAndUsageStats,
      kOpUsage, and kPermissionUsage.
    - Converting from sp to shared_ptr
    - Using libbinder_ndk functions instead of libbinder functions
      (e.g. for installing death recipients, getting calling uids, etc.)
        - New death recipients were added in StatsService,
          ConfigManager, and SubscriberReporter.
    - Using a unique token (timestamp) to identify shell subscribers
      instead of IResultReceiver because IResultReceiver is not exposed by
      libbinder_ndk. Currently, statsd cannot detect if perfd dies; we
      will fix that later.

Bug: 145232107
Bug: 148609603
Test: m statsd
Test: m statsd_test
Test: bit stastd_test:*
Test: atest GtsStatsdHostTestCases
Change-Id: Ia1fda7280c22320bc4ebc8371acaadbe8eabcbd2
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index d6a0433..a861a3b 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -29,49 +29,88 @@
 
 const static int FIELD_ID_ATOM = 1;
 
-void ShellSubscriber::startNewSubscription(int in, int out, sp<IResultReceiver> resultReceiver,
-                                           int timeoutSec) {
+void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
     VLOG("start new shell subscription");
+    int64_t subscriberId = getElapsedRealtimeNs();
+
     {
         std::lock_guard<std::mutex> lock(mMutex);
-        if (mResultReceiver != nullptr) {
+        if (mSubscriberId> 0) {
             VLOG("Only one shell subscriber is allowed.");
             return;
         }
+        mSubscriberId = subscriberId;
         mInput = in;
         mOutput = out;
-        mResultReceiver = resultReceiver;
-        IInterface::asBinder(mResultReceiver)->linkToDeath(this);
     }
 
-    // Note that the following is blocking, and it's intended as we cannot return until the shell
-    // cmd exits, otherwise all resources & FDs will be automatically closed.
+    bool success = readConfig();
+    if (!success) {
+        std::lock_guard<std::mutex> lock(mMutex);
+        cleanUpLocked();
+    }
 
-    // Read config forever until EOF is reached. Clients may send multiple configs -- each new
-    // config replace the previous one.
-    readConfig(in);
-    VLOG("timeout : %d", timeoutSec);
-
-    // Now we have read an EOF we now wait for the semaphore until the client exits.
-    VLOG("Now wait for client to exit");
+    VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec);
     std::unique_lock<std::mutex> lk(mMutex);
 
+    // Note that the following is blocking, and it's intended as we cannot return until the shell
+    // cmd exits or we time out.
     if (timeoutSec > 0) {
         mShellDied.wait_for(lk, timeoutSec * 1s,
-                            [this, resultReceiver] { return mResultReceiver != resultReceiver; });
+                            [this, subscriberId] { return mSubscriberId != subscriberId; });
     } else {
-        mShellDied.wait(lk, [this, resultReceiver] { return mResultReceiver != resultReceiver; });
+        mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; });
+    }
+}
+
+
+// Read configs until EOF is reached. There may be multiple configs in the input
+// -- each new config should replace the previous one.
+//
+// Returns a boolean indicating whether the input was read successfully.
+bool ShellSubscriber::readConfig() {
+    if (mInput < 0) {
+        return false;
+    }
+
+    while (true) {
+        // Read the size of the config.
+        size_t bufferSize = 0;
+        ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize));
+        if (bytesRead == 0) {
+            VLOG("We have reached the end of the input.");
+            return true;
+        } else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) {
+            ALOGE("Error reading config size");
+            return false;
+        }
+
+        // Read and parse the config.
+        vector<uint8_t> buffer(bufferSize);
+        bytesRead = read(mInput, buffer.data(), bufferSize);
+        if (bytesRead > 0 && (size_t)bytesRead == bufferSize) {
+            ShellSubscription config;
+            if (config.ParseFromArray(buffer.data(), bufferSize)) {
+                updateConfig(config);
+            } else {
+                ALOGE("Error parsing the config");
+                return false;
+            }
+        } else {
+            VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize, 
+                 bytesRead);
+            return false;
+        }
     }
 }
 
 void ShellSubscriber::updateConfig(const ShellSubscription& config) {
-    std::lock_guard<std::mutex> lock(mMutex);
     mPushedMatchers.clear();
     mPulledInfo.clear();
 
     for (const auto& pushed : config.pushed()) {
         mPushedMatchers.push_back(pushed);
-        VLOG("adding matcher for atom %d", pushed.atom_id());
+        VLOG("adding matcher for pushed atom %d", pushed.atom_id());
     }
 
     int64_t token = getElapsedRealtimeNs();
@@ -89,46 +128,20 @@
     }
 
     if (mPulledInfo.size() > 0 && minInterval > 0) {
-        // This thread is guaranteed to terminate after it detects the token is different or
-        // cleaned up.
+        // This thread is guaranteed to terminate after it detects the token is
+        // different.
         std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
         puller.detach();
     }
 }
 
-void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
-                                          const SimpleAtomMatcher& matcher) {
-    if (mOutput == 0) return;
-    int count = 0;
-    mProto.clear();
-    for (const auto& event : data) {
-        VLOG("%s", event->ToString().c_str());
-        if (matchesSimple(*mUidMap, matcher, *event)) {
-            VLOG("matched");
-            count++;
-            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
-                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
-            event->ToProto(mProto);
-            mProto.end(atomToken);
-        }
-    }
-
-    if (count > 0) {
-        // First write the payload size.
-        size_t bufferSize = mProto.size();
-        write(mOutput, &bufferSize, sizeof(bufferSize));
-        VLOG("%d atoms, proto size: %zu", count, bufferSize);
-        // Then write the payload.
-        mProto.flush(mOutput);
-    }
-    mProto.clear();
-}
-
 void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
-    while (1) {
+    while (true) {
         int64_t nowMillis = getElapsedRealtimeMillis();
         {
             std::lock_guard<std::mutex> lock(mMutex);
+            // If the token has changed, the config has changed, so this
+            // puller can now stop.
             if (mPulledInfo.size() == 0 || mPullToken != token) {
                 VLOG("Pulling thread %lld done!", (long long)token);
                 return;
@@ -152,55 +165,47 @@
     }
 }
 
-void ShellSubscriber::readConfig(int in) {
-    if (in <= 0) {
+// Must be called with the lock acquired, so that mProto isn't being written to
+// at the same time by multiple threads.
+void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
+                                          const SimpleAtomMatcher& matcher) {
+    if (mOutput < 0) {
         return;
     }
-
-    while (1) {
-        size_t bufferSize = 0;
-        int result = 0;
-        if ((result = read(in, &bufferSize, sizeof(bufferSize))) == 0) {
-            VLOG("Done reading");
-            break;
-        } else if (result < 0 || result != sizeof(bufferSize)) {
-            ALOGE("Error reading config size");
-            break;
-        }
-
-        vector<uint8_t> buffer(bufferSize);
-        if ((result = read(in, buffer.data(), bufferSize)) > 0 && ((size_t)result) == bufferSize) {
-            ShellSubscription config;
-            if (config.ParseFromArray(buffer.data(), bufferSize)) {
-                updateConfig(config);
-            } else {
-                ALOGE("error parsing the config");
-                break;
-            }
-        } else {
-            VLOG("Error reading the config, returned: %d, expecting %zu", result, bufferSize);
-            break;
+    int count = 0;
+    mProto.clear();
+    for (const auto& event : data) {
+        VLOG("%s", event->ToString().c_str());
+        if (matchesSimple(*mUidMap, matcher, *event)) {
+            VLOG("matched");
+            count++;
+            uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
+                                              util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
+            event->ToProto(mProto);
+            mProto.end(atomToken);
         }
     }
-}
 
-void ShellSubscriber::cleanUpLocked() {
-    // The file descriptors will be closed by binder.
-    mInput = 0;
-    mOutput = 0;
-    mResultReceiver = nullptr;
-    mPushedMatchers.clear();
-    mPulledInfo.clear();
-    mPullToken = 0;
-    VLOG("done clean up");
+    if (count > 0) {
+        // First write the payload size.
+        size_t bufferSize = mProto.size();
+        write(mOutput, &bufferSize, sizeof(bufferSize));
+
+        VLOG("%d atoms, proto size: %zu", count, bufferSize);
+        // Then write the payload.
+        mProto.flush(mOutput);
+    }
 }
 
 void ShellSubscriber::onLogEvent(const LogEvent& event) {
+    // Acquire a lock to prevent corruption from multiple threads writing to
+    // mProto.
     std::lock_guard<std::mutex> lock(mMutex);
-
-    if (mOutput <= 0) {
+    if (mOutput < 0) {
         return;
     }
+
+    mProto.clear();
     for (const auto& matcher : mPushedMatchers) {
         if (matchesSimple(*mUidMap, matcher, event)) {
             VLOG("%s", event.ToString().c_str());
@@ -208,25 +213,27 @@
                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
             event.ToProto(mProto);
             mProto.end(atomToken);
+
             // First write the payload size.
             size_t bufferSize = mProto.size();
             write(mOutput, &bufferSize, sizeof(bufferSize));
 
             // Then write the payload.
             mProto.flush(mOutput);
-            mProto.clear();
-            break;
         }
     }
 }
 
-void ShellSubscriber::binderDied(const wp<IBinder>& who) {
-    {
-        VLOG("Shell exits");
-        std::lock_guard<std::mutex> lock(mMutex);
-        cleanUpLocked();
-    }
-    mShellDied.notify_all();
+void ShellSubscriber::cleanUpLocked() {
+    // The file descriptors will be closed by binder.
+    mInput = -1;
+    mOutput = -1;
+    mSubscriberId = 0;
+    mPushedMatchers.clear();
+    mPulledInfo.clear();
+    // Setting mPullToken == 0 tells pull thread that its work is done.
+    mPullToken = 0;
+    VLOG("done clean up");
 }
 
 }  // namespace statsd