Combine startPull and sendHeartbeat threads

This ensures that only one helper thread is created per subscription.
Previously, there could be up to two.

+ fixes thread sleep duration if the pulled atoms have different pull
frequencies
+ rename attemptWriteToSocketLocked to attemptWriteToPipeLocked

Test: atest statsd_test
Test: atest CtsStatsdHostTestCases:ShellSubscriberTest
Test: manual testing on Android Studio
Bug: 156678125
Change-Id: I7074bbba5981a591a30e8b70a1ad1d83eadfcc30
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index 361b161..fd883c2 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -41,13 +41,8 @@
 
     {
         std::unique_lock<std::mutex> lock(mMutex);
-        if (myToken != mToken) {
-            // Some other subscription has already come in. Stop.
-            return;
-        }
         mSubscriptionInfo = mySubscriptionInfo;
-
-        spawnHelperThreadsLocked(mySubscriptionInfo, myToken);
+        spawnHelperThread(myToken);
         waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
 
         if (mSubscriptionInfo == mySubscriptionInfo) {
@@ -57,14 +52,9 @@
     }
 }
 
-void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) {
-    if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) {
-        std::thread puller([this, myToken] { startPull(myToken); });
-        puller.detach();
-    }
-
-    std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); });
-    heartbeatSender.detach();
+void ShellSubscriber::spawnHelperThread(int myToken) {
+    std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); });
+    t.detach();
 }
 
 void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
@@ -114,13 +104,7 @@
         subscriptionInfo->mPushedMatchers.push_back(pushed);
     }
 
-    int minInterval = -1;
     for (const auto& pulled : config.pulled()) {
-        // All intervals need to be multiples of the min interval.
-        if (minInterval < 0 || pulled.freq_millis() < minInterval) {
-            minInterval = pulled.freq_millis();
-        }
-
         vector<string> packages;
         vector<int32_t> uids;
         for (const string& pkg : pulled.packages()) {
@@ -136,18 +120,18 @@
                                                    uids);
         VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
     }
-    subscriptionInfo->mPullIntervalMin = minInterval;
 
     return true;
 }
 
-void ShellSubscriber::startPull(int myToken) {
-    VLOG("ShellSubscriber: pull thread %d starting", myToken);
+void ShellSubscriber::pullAndSendHeartbeats(int myToken) {
+    VLOG("ShellSubscriber: helper thread %d starting", myToken);
     while (true) {
+        int64_t sleepTimeMs = INT_MAX;
         {
             std::lock_guard<std::mutex> lock(mMutex);
             if (!mSubscriptionInfo || mToken != myToken) {
-                VLOG("ShellSubscriber: pulling thread %d done!", myToken);
+                VLOG("ShellSubscriber: helper thread %d done!", myToken);
                 return;
             }
 
@@ -168,11 +152,27 @@
 
                 pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
             }
+
+            // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received
+            // data from statsd. When it receives the data size of 0, perfd will not expect any
+            // atoms and recheck whether the subscription should end.
+            if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) {
+                attemptWriteToPipeLocked(/*dataSize=*/0);
+            }
+
+            // Determine how long to sleep before doing more work.
+            for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
+                int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval;
+                int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative
+                if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull;
+            }
+            int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis;
+            if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat;
         }
 
-        VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken,
-             mSubscriptionInfo->mPullIntervalMin);
-        std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
+        VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken,
+             (long long)sleepTimeMs);
+        std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs));
     }
 }
 
@@ -200,7 +200,7 @@
         }
     }
 
-    if (count > 0) attemptWriteToSocketLocked(mProto.size());
+    if (count > 0) attemptWriteToPipeLocked(mProto.size());
 }
 
 void ShellSubscriber::onLogEvent(const LogEvent& event) {
@@ -214,26 +214,24 @@
                                               util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
             event.ToProto(mProto);
             mProto.end(atomToken);
-            attemptWriteToSocketLocked(mProto.size());
+            attemptWriteToPipeLocked(mProto.size());
         }
     }
 }
 
-// Tries to write the atom encoded in mProto to the socket. If the write fails
+// Tries to write the atom encoded in mProto to the pipe. If the write fails
 // because the read end of the pipe has closed, signals to other threads that
 // the subscription should end.
-void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
-    // First write the payload size.
+void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) {
+    // First, write the payload size.
     if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
         mSubscriptionInfo->mClientAlive = false;
         mSubscriptionShouldEnd.notify_one();
         return;
     }
 
-    if (dataSize == 0) return;
-
-    // Then, write the payload.
-    if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
+    // Then, write the payload if this is not just a heartbeat.
+    if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) {
         mSubscriptionInfo->mClientAlive = false;
         mSubscriptionShouldEnd.notify_one();
         return;
@@ -242,28 +240,6 @@
     mLastWriteMs = getElapsedRealtimeMillis();
 }
 
-// Send a heartbeat, consisting solely of a data size of 0, if perfd has not
-// recently received any writes from statsd. When it receives the data size of
-// 0, perfd will not expect any data and recheck whether the shell command is
-// still running.
-void ShellSubscriber::sendHeartbeats(int myToken) {
-    while (true) {
-        {
-            std::lock_guard<std::mutex> lock(mMutex);
-            if (!mSubscriptionInfo || myToken != mToken) {
-                VLOG("ShellSubscriber: heartbeat thread %d done!", myToken);
-                return;
-            }
-
-            if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) {
-                VLOG("ShellSubscriber: sending a heartbeat to perfd");
-                attemptWriteToSocketLocked(/*dataSize=*/0);
-            }
-        }
-        std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats));
-    }
-}
-
 }  // namespace statsd
 }  // namespace os
 }  // namespace android
diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h
index 26c8a2a..4c05fa7 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.h
+++ b/cmds/statsd/src/shell/ShellSubscriber.h
@@ -92,7 +92,6 @@
         int mOutputFd;
         std::vector<SimpleAtomMatcher> mPushedMatchers;
         std::vector<PullInfo> mPulledInfo;
-        int mPullIntervalMin;
         bool mClientAlive;
     };
 
@@ -100,27 +99,25 @@
 
     bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);
 
-    void spawnHelperThreadsLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken);
+    void spawnHelperThread(int myToken);
 
     void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo,
                                         int myToken,
                                         std::unique_lock<std::mutex>& lock,
                                         int timeoutSec);
 
-    void startPull(int myToken);
+    // Helper thread that pulls atoms at a regular frequency and sends
+    // heartbeats to perfd if statsd hasn't recently sent any data. Statsd must
+    // send heartbeats for perfd to escape a blocking read call and recheck if
+    // the user has terminated the subscription.
+    void pullAndSendHeartbeats(int myToken);
 
     void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
                                 const SimpleAtomMatcher& matcher);
 
     void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo);
 
-    void attemptWriteToSocketLocked(size_t dataSize);
-
-    // Send ocassional heartbeats for two reasons: (a) for statsd to detect when
-    // the read end of the pipe has closed and (b) for perfd to escape a
-    // blocking read call and recheck if the user has terminated the
-    // subscription.
-    void sendHeartbeats(int myToken);
+    void attemptWriteToPipeLocked(size_t dataSize);
 
     sp<UidMap> mUidMap;