Implement new perfd<->statsd ShellSubscriber comm.

Because we no longer call linkToDeath on a binder object to detect whether
the cmd process has died, we now detect its death by checking whether writes fail.
ag/10476582 demonstrates that writes fail once the cmd process dies.

Test: m statsd
Test: bit statsd_test:ShellSubscriberTest.testPushedSubscription
Test: bit statsd_test:ShellSubscriberTest.testPulledSubscription
Bug: 150619687
Change-Id: I44a777ffff11e5b9298912b2906063c65e9009eb
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index a861a3b..6fa1654 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -18,6 +18,7 @@
 
 #include "ShellSubscriber.h"
 
+#include <android-base/file.h>
 #include "matchers/matcher_util.h"
 #include "stats_log_util.h"
 
@@ -30,154 +31,129 @@
 const static int FIELD_ID_ATOM = 1;
 
 void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
-    VLOG("start new shell subscription");
-    int64_t subscriberId = getElapsedRealtimeNs();
+    int myToken = claimToken();
+    mSubscriptionShouldEnd.notify_one();
 
-    {
-        std::lock_guard<std::mutex> lock(mMutex);
-        if (mSubscriberId> 0) {
-            VLOG("Only one shell subscriber is allowed.");
-            return;
-        }
-        mSubscriberId = subscriberId;
-        mInput = in;
-        mOutput = out;
+    shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
+    if (!readConfig(mySubscriptionInfo)) {
+        return;
     }
 
-    bool success = readConfig();
-    if (!success) {
-        std::lock_guard<std::mutex> lock(mMutex);
-        cleanUpLocked();
+    // critical-section
+    std::unique_lock<std::mutex> lock(mMutex);
+    if (myToken < mToken) {
+        // Some other subscription has already come in. Stop.
+        return;
+    }
+    mSubscriptionInfo = mySubscriptionInfo;
+
+    if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) {
+        // This thread terminates after it detects that mToken has changed.
+        std::thread puller([this, myToken] { startPull(myToken); });
+        puller.detach();
     }
 
-    VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec);
-    std::unique_lock<std::mutex> lk(mMutex);
-
-    // Note that the following is blocking, and it's intended as we cannot return until the shell
-    // cmd exits or we time out.
+    // Block until subscription has ended.
     if (timeoutSec > 0) {
-        mShellDied.wait_for(lk, timeoutSec * 1s,
-                            [this, subscriberId] { return mSubscriberId != subscriberId; });
+        mSubscriptionShouldEnd.wait_for(
+                lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] {
+                    return mToken != myToken || !mySubscriptionInfo->mClientAlive;
+                });
     } else {
-        mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; });
+        mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] {
+            return mToken != myToken || !mySubscriptionInfo->mClientAlive;
+        });
+    }
+
+    if (mSubscriptionInfo == mySubscriptionInfo) {
+        mSubscriptionInfo = nullptr;
     }
 }
 
+// Atomically claim the next token. Token numbers denote subscriber ordering.
+int ShellSubscriber::claimToken() {
+    std::unique_lock<std::mutex> lock(mMutex);
+    int myToken = ++mToken;
+    return myToken;
+}
 
-// Read configs until EOF is reached. There may be multiple configs in the input
-// -- each new config should replace the previous one.
-//
-// Returns a boolean indicating whether the input was read successfully.
-bool ShellSubscriber::readConfig() {
-    if (mInput < 0) {
+// Read and parse a single config. There should be only one config per input.
+bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) {
+    // Read the size of the config.
+    size_t bufferSize;
+    if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) {
         return false;
     }
 
-    while (true) {
-        // Read the size of the config.
-        size_t bufferSize = 0;
-        ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize));
-        if (bytesRead == 0) {
-            VLOG("We have reached the end of the input.");
-            return true;
-        } else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) {
-            ALOGE("Error reading config size");
-            return false;
-        }
-
-        // Read and parse the config.
-        vector<uint8_t> buffer(bufferSize);
-        bytesRead = read(mInput, buffer.data(), bufferSize);
-        if (bytesRead > 0 && (size_t)bytesRead == bufferSize) {
-            ShellSubscription config;
-            if (config.ParseFromArray(buffer.data(), bufferSize)) {
-                updateConfig(config);
-            } else {
-                ALOGE("Error parsing the config");
-                return false;
-            }
-        } else {
-            VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize, 
-                 bytesRead);
-            return false;
-        }
+    // Read the config.
+    vector<uint8_t> buffer(bufferSize);
+    if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) {
+        return false;
     }
-}
 
-void ShellSubscriber::updateConfig(const ShellSubscription& config) {
-    mPushedMatchers.clear();
-    mPulledInfo.clear();
+    // Parse the config.
+    ShellSubscription config;
+    if (!config.ParseFromArray(buffer.data(), bufferSize)) {
+        return false;
+    }
 
+    // Update SubscriptionInfo with state from config
     for (const auto& pushed : config.pushed()) {
-        mPushedMatchers.push_back(pushed);
-        VLOG("adding matcher for pushed atom %d", pushed.atom_id());
+        subscriptionInfo->mPushedMatchers.push_back(pushed);
     }
 
-    int64_t token = getElapsedRealtimeNs();
-    mPullToken = token;
-
-    int64_t minInterval = -1;
+    int minInterval = -1;
     for (const auto& pulled : config.pulled()) {
         // All intervals need to be multiples of the min interval.
         if (minInterval < 0 || pulled.freq_millis() < minInterval) {
             minInterval = pulled.freq_millis();
         }
-
-        mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
-        VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
+        subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
     }
+    subscriptionInfo->mPullIntervalMin = minInterval;
 
-    if (mPulledInfo.size() > 0 && minInterval > 0) {
-        // This thread is guaranteed to terminate after it detects the token is
-        // different.
-        std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
-        puller.detach();
-    }
+    return true;
 }
 
-void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
+void ShellSubscriber::startPull(int64_t myToken) {
     while (true) {
-        int64_t nowMillis = getElapsedRealtimeMillis();
-        {
-            std::lock_guard<std::mutex> lock(mMutex);
-            // If the token has changed, the config has changed, so this
-            // puller can now stop.
-            if (mPulledInfo.size() == 0 || mPullToken != token) {
-                VLOG("Pulling thread %lld done!", (long long)token);
-                return;
-            }
-            for (auto& pullInfo : mPulledInfo) {
-                if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
-                    VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
+        std::unique_lock<std::mutex> lock(mMutex);
+        if (!mSubscriptionInfo || mToken != myToken) {
+            VLOG("Pulling thread %lld done!", (long long)myToken);
+            return;
+        }
 
-                    vector<std::shared_ptr<LogEvent>> data;
-                    mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
-                    VLOG("pulled %zu atoms", data.size());
-                    if (data.size() > 0) {
-                        writeToOutputLocked(data, pullInfo.mPullerMatcher);
-                    }
-                    pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
+        int64_t nowMillis = getElapsedRealtimeMillis();
+        for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) {
+            if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
+                vector<std::shared_ptr<LogEvent>> data;
+                mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
+                VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
+
+                // TODO(b/150969574): Don't write to a pipe while holding a lock.
+                if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) {
+                    mSubscriptionInfo->mClientAlive = false;
+                    mSubscriptionShouldEnd.notify_one();
+                    return;
                 }
+                pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
             }
         }
-        VLOG("Pulling thread %lld sleep....", (long long)token);
-        std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
+        const auto sleepDuration = std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin);
+        lock.unlock();  // Never sleep holding mMutex: it would stall onLogEvent() and new subscribers.
+        std::this_thread::sleep_for(sleepDuration);
     }
 }
 
-// Must be called with the lock acquired, so that mProto isn't being written to
-// at the same time by multiple threads.
-void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
-                                          const SimpleAtomMatcher& matcher) {
-    if (mOutput < 0) {
-        return;
-    }
-    int count = 0;
+// \return boolean indicating if writes were successful (will return false if
+// client dies)
+bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
+                                             const SimpleAtomMatcher& matcher) {
     mProto.clear();
+    int count = 0;
     for (const auto& event : data) {
         VLOG("%s", event->ToString().c_str());
         if (matchesSimple(*mUidMap, matcher, *event)) {
-            VLOG("matched");
             count++;
             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
@@ -189,24 +165,29 @@
     if (count > 0) {
         // First write the payload size.
         size_t bufferSize = mProto.size();
-        write(mOutput, &bufferSize, sizeof(bufferSize));
+        if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
+                                       sizeof(bufferSize))) {
+            return false;
+        }
 
         VLOG("%d atoms, proto size: %zu", count, bufferSize);
         // Then write the payload.
-        mProto.flush(mOutput);
+        if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
+            return false;
+        }
     }
+
+    return true;
 }
 
 void ShellSubscriber::onLogEvent(const LogEvent& event) {
-    // Acquire a lock to prevent corruption from multiple threads writing to
-    // mProto.
     std::lock_guard<std::mutex> lock(mMutex);
-    if (mOutput < 0) {
+    if (!mSubscriptionInfo) {
         return;
     }
 
     mProto.clear();
-    for (const auto& matcher : mPushedMatchers) {
+    for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
         if (matchesSimple(*mUidMap, matcher, event)) {
             VLOG("%s", event.ToString().c_str());
             uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
@@ -216,26 +197,23 @@
 
             // First write the payload size.
             size_t bufferSize = mProto.size();
-            write(mOutput, &bufferSize, sizeof(bufferSize));
+            if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
+                                           sizeof(bufferSize))) {
+                mSubscriptionInfo->mClientAlive = false;
+                mSubscriptionShouldEnd.notify_one();
+                return;
+            }
 
             // Then write the payload.
-            mProto.flush(mOutput);
+            if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
+                mSubscriptionInfo->mClientAlive = false;
+                mSubscriptionShouldEnd.notify_one();
+                return;
+            }
         }
     }
 }
 
-void ShellSubscriber::cleanUpLocked() {
-    // The file descriptors will be closed by binder.
-    mInput = -1;
-    mOutput = -1;
-    mSubscriberId = 0;
-    mPushedMatchers.clear();
-    mPulledInfo.clear();
-    // Setting mPullToken == 0 tells pull thread that its work is done.
-    mPullToken = 0;
-    VLOG("done clean up");
-}
-
 }  // namespace statsd
 }  // namespace os
 }  // namespace android