Fix ShellSubscriber concurrency issues
This CL creates a sendHeartbeats thread that occasionally sends
heartbeats, consisting of a dataSize of 0, to perfd. perfd will discard
those heartbeats, recheck if the user has canceled the subscription, and
if not, wait for more data from statsd. Sending heartbeats solves two
big problems:
(1) Allows statsd to robustly check if writes to the socket fail because
the read end of the pipe has closed. Previously, if no atoms were pushed
or pulled, statsd never attempted to write to perfd, so statsd could
never detect the end of the subscription. Now, writes are made
regularly regardless of whether statsd receives any data. Note that even if
atoms were pushed or pulled, there is no guarantee that they would have
matched the atom matchers sent in perfd's config.
(2) Allows perfd to escape a blocking read call and recheck whether the
user has canceled the subscription. Previously, if no data was sent to
perfd, perfd would block in this read call and the AndroidStudio UI would freeze up.
Heartbeats are only sent if statsd has not sent any data to perfd within
the last second, so we do not spam perfd with writes.
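The heartbeat rule above can be sketched in C++. This is a minimal illustration under stated assumptions, not the statsd code: `HeartbeatState`, `attemptWriteLocked` and `maybeSendHeartbeat` are hypothetical names, the caller supplies the current time in milliseconds, and a heartbeat is assumed to be a bare `size_t` header of 0 with no proto payload, following the `|size_t|shellData proto|` stream format. A failed `write()` (for example `EPIPE` once the reader closes its end, with `SIGPIPE` ignored) is how the writer learns the subscription is over.

```cpp
#include <cstddef>
#include <cstdint>
#include <csignal>
#include <mutex>
#include <unistd.h>

// Hypothetical sketch, not the statsd implementation. The interval mirrors
// kMsBetweenHeartbeats from this CL; all other names are illustrative.
constexpr int64_t kMsBetweenHeartbeats = 1000;

struct HeartbeatState {
    std::mutex mutex;         // guards lastWriteMs, readerGone and writes to fd
    int fd = -1;              // write end of the pipe to the reader (perfd)
    int64_t lastWriteMs = 0;  // time of the last successful write
    bool readerGone = false;  // set once a write fails
};

// Writes a size_t header with value dataSize (0 means heartbeat).
// Returns false if the write fails, e.g. EPIPE after the read end closes.
bool attemptWriteLocked(HeartbeatState& s, size_t dataSize, int64_t nowMs) {
    ssize_t n = write(s.fd, &dataSize, sizeof(dataSize));
    if (n != static_cast<ssize_t>(sizeof(dataSize))) {
        s.readerGone = true;
        return false;
    }
    s.lastWriteMs = nowMs;
    return true;
}

// Sends a heartbeat only if nothing was written in the last
// kMsBetweenHeartbeats ms, so regular data suppresses heartbeats.
// Returns true if a heartbeat was written.
bool maybeSendHeartbeat(HeartbeatState& s, int64_t nowMs) {
    std::lock_guard<std::mutex> lock(s.mutex);
    if (s.readerGone || nowMs - s.lastWriteMs < kMsBetweenHeartbeats) {
        return false;
    }
    return attemptWriteLocked(s, 0, nowMs);
}
```

In a real subscriber, a dedicated thread would call something like `maybeSendHeartbeat` in a loop, sleeping between calls, while the data path updates `lastWriteMs` on every successful write.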
+ decomposes the startNewSubscription function
+ prevents startPull from holding the lock while sleeping
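One common way to keep a pull loop from holding a lock while it sleeps is to wait on a `std::condition_variable` instead of sleeping under the lock. The sketch below assumes that approach; it is not necessarily how this CL restructures `startPull`. `PullLoop`, `run` and `cancel` are hypothetical, and the `++pulls` line stands in for the actual pull-and-write step. The token comparison mirrors the `myToken`/`mToken` check used to tell whether a subscription is still current.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical sketch: names are illustrative, not statsd's API.
class PullLoop {
public:
    // Pulls until the token changes (subscription replaced or ended).
    // Returns the number of pulls performed.
    int run(int myToken, std::chrono::milliseconds interval) {
        int pulls = 0;
        std::unique_lock<std::mutex> lock(mMutex);
        while (mToken == myToken) {
            ++pulls;  // stand-in for pulling atoms and writing them while locked
            // wait_for releases mMutex while waiting and re-acquires it on
            // wake-up, so other threads are not blocked during the interval.
            mCv.wait_for(lock, interval, [&] { return mToken != myToken; });
        }
        return pulls;
    }

    // Invalidates the current token and wakes any waiting pull loop.
    void cancel() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            ++mToken;
        }
        mCv.notify_all();
    }

    int token() {
        std::lock_guard<std::mutex> lock(mMutex);
        return mToken;
    }

private:
    std::mutex mMutex;
    std::condition_variable mCv;
    int mToken = 0;
};
```

Because `wait_for` with a predicate returns as soon as the token changes, a canceled or replaced subscription does not have to wait out the full pull interval.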
Test: atest statsd_test
Test: atest CtsStatsdHostTestCases
Test: manually confirm that AndroidStudio is not freezing
Bug: 153595161
Change-Id: I78f0818e8ed29bdadd02c151444ee7c9555623a4
diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h
index 61457d8..26c8a2a 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.h
+++ b/cmds/statsd/src/shell/ShellSubscriber.h
@@ -38,11 +38,11 @@
*
* A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
* communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms.
- * The atoms are sent back to the client in real time, as opposed to
- * keeping the data in memory. Shell clients do not subscribe aggregated metrics, as they are
- * responsible for doing the aggregation after receiving the atom events.
+ * The atoms are sent back to the client in real time, as opposed to keeping the data in memory.
+ * Shell clients do not subscribe aggregated metrics, as they are responsible for doing the
+ * aggregation after receiving the atom events.
*
- * Shell client pass ShellSubscription in the proto binary format. Client can update the
+ * Shell clients pass ShellSubscription in the proto binary format. Clients can update the
* subscription by sending a new subscription. The new subscription would replace the old one.
* Input data stream format is:
*
@@ -54,7 +54,7 @@
* The stream would be in the following format:
* |size_t|shellData proto|size_t|shellData proto|....
*
- * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
+ * Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread
* until it exits.
*/
class ShellSubscriber : public virtual RefBase {
@@ -100,11 +100,28 @@
bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);
- void startPull(int64_t myToken);
+ void spawnHelperThreadsLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken);
- bool writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
+ void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo,
+ int myToken,
+ std::unique_lock<std::mutex>& lock,
+ int timeoutSec);
+
+ void startPull(int myToken);
+
+ void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher);
+ void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo);
+
+ void attemptWriteToSocketLocked(size_t dataSize);
+
+ // Send occasional heartbeats for two reasons: (a) for statsd to detect when
+ // the read end of the pipe has closed and (b) for perfd to escape a
+ // blocking read call and recheck if the user has terminated the
+ // subscription.
+ void sendHeartbeats(int myToken);
+
sp<UidMap> mUidMap;
sp<StatsPullerManager> mPullerMgr;
@@ -120,6 +137,11 @@
int mToken = 0;
const int32_t DEFAULT_PULL_UID = AID_SYSTEM;
+
+ // Tracks when we last sent data to perfd. We need that time to determine
+ // when next to send a heartbeat.
+ int64_t mLastWriteMs = 0;
+ const int64_t kMsBetweenHeartbeats = 1000;
};
} // namespace statsd