Combine startPull and sendHeartbeats threads
This ensures that only one helper thread is created per subscription.
Previously, there could be up to two.
+ fix the thread sleep duration when pulled atoms have different pull
frequencies
+ rename attemptWriteToSocketLocked to attemptWriteToPipeLocked
Test: atest statsd_test
Test: atest CtsStatsdHostTestCases:ShellSubscriberTest
Test: manual testing on Android Studio
Bug: 156678125
Change-Id: I7074bbba5981a591a30e8b70a1ad1d83eadfcc30
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index 361b161..fd883c2 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -41,13 +41,8 @@
{
std::unique_lock<std::mutex> lock(mMutex);
- if (myToken != mToken) {
- // Some other subscription has already come in. Stop.
- return;
- }
mSubscriptionInfo = mySubscriptionInfo;
-
- spawnHelperThreadsLocked(mySubscriptionInfo, myToken);
+ spawnHelperThread(myToken);
waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
if (mSubscriptionInfo == mySubscriptionInfo) {
@@ -57,14 +52,9 @@
}
}
-void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) {
- if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) {
- std::thread puller([this, myToken] { startPull(myToken); });
- puller.detach();
- }
-
- std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); });
- heartbeatSender.detach();
+void ShellSubscriber::spawnHelperThread(int myToken) {
+ std::thread t([this, myToken] { pullAndSendHeartbeats(myToken); });
+ t.detach();  // Exits on its own when the token is stale or the subscription is gone.
}
void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
@@ -114,13 +104,7 @@
subscriptionInfo->mPushedMatchers.push_back(pushed);
}
- int minInterval = -1;
for (const auto& pulled : config.pulled()) {
- // All intervals need to be multiples of the min interval.
- if (minInterval < 0 || pulled.freq_millis() < minInterval) {
- minInterval = pulled.freq_millis();
- }
-
vector<string> packages;
vector<int32_t> uids;
for (const string& pkg : pulled.packages()) {
@@ -136,18 +120,18 @@
uids);
VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
}
- subscriptionInfo->mPullIntervalMin = minInterval;
return true;
}
-void ShellSubscriber::startPull(int myToken) {
- VLOG("ShellSubscriber: pull thread %d starting", myToken);
+void ShellSubscriber::pullAndSendHeartbeats(int myToken) {
+ VLOG("ShellSubscriber: helper thread %d starting", myToken);
while (true) {
+ int64_t sleepTimeMs = INT_MAX;
{
std::lock_guard<std::mutex> lock(mMutex);
if (!mSubscriptionInfo || mToken != myToken) {
- VLOG("ShellSubscriber: pulling thread %d done!", myToken);
+ VLOG("ShellSubscriber: helper thread %d done!", myToken);
return;
}
@@ -168,11 +152,27 @@
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
}
+
+ // Send a heartbeat, consisting of a data size of 0, if perfd hasn't recently received
+ // data from statsd. When it receives the data size of 0, perfd will not expect any
+ // atoms and recheck whether the subscription should end.
+ if (nowMillis - mLastWriteMs > kMsBetweenHeartbeats) {
+ attemptWriteToPipeLocked(/*dataSize=*/0);
+ }
+
+ // Determine how long to sleep before doing more work.
+ for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
+ int64_t nextPullTime = pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval;
+ int64_t timeBeforePull = nextPullTime - nowMillis; // guaranteed to be non-negative
+ if (timeBeforePull < sleepTimeMs) sleepTimeMs = timeBeforePull;
+ }
+ int64_t timeBeforeHeartbeat = (mLastWriteMs + kMsBetweenHeartbeats) - nowMillis;
+ if (timeBeforeHeartbeat < sleepTimeMs) sleepTimeMs = timeBeforeHeartbeat;
}
- VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken,
- mSubscriptionInfo->mPullIntervalMin);
- std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
+ VLOG("ShellSubscriber: helper thread %d sleeping for %lld ms", myToken,
+ (long long)sleepTimeMs);
+ std::this_thread::sleep_for(std::chrono::milliseconds(sleepTimeMs));
}
}
@@ -200,7 +200,7 @@
}
}
- if (count > 0) attemptWriteToSocketLocked(mProto.size());
+ if (count > 0) attemptWriteToPipeLocked(mProto.size());
}
void ShellSubscriber::onLogEvent(const LogEvent& event) {
@@ -214,26 +214,24 @@
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event.ToProto(mProto);
mProto.end(atomToken);
- attemptWriteToSocketLocked(mProto.size());
+ attemptWriteToPipeLocked(mProto.size());
}
}
}
-// Tries to write the atom encoded in mProto to the socket. If the write fails
+// Tries to write the atom encoded in mProto to the pipe. If the write fails
// because the read end of the pipe has closed, signals to other threads that
// the subscription should end.
-void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
- // First write the payload size.
+void ShellSubscriber::attemptWriteToPipeLocked(size_t dataSize) {
+ // First, write the payload size.
if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
mSubscriptionInfo->mClientAlive = false;
mSubscriptionShouldEnd.notify_one();
return;
}
- if (dataSize == 0) return;
-
- // Then, write the payload.
- if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
+ // Then, write the payload if this is not just a heartbeat.
+ if (dataSize > 0 && !mProto.flush(mSubscriptionInfo->mOutputFd)) {
mSubscriptionInfo->mClientAlive = false;
mSubscriptionShouldEnd.notify_one();
return;
@@ -242,28 +240,6 @@
mLastWriteMs = getElapsedRealtimeMillis();
}
-// Send a heartbeat, consisting solely of a data size of 0, if perfd has not
-// recently received any writes from statsd. When it receives the data size of
-// 0, perfd will not expect any data and recheck whether the shell command is
-// still running.
-void ShellSubscriber::sendHeartbeats(int myToken) {
- while (true) {
- {
- std::lock_guard<std::mutex> lock(mMutex);
- if (!mSubscriptionInfo || myToken != mToken) {
- VLOG("ShellSubscriber: heartbeat thread %d done!", myToken);
- return;
- }
-
- if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) {
- VLOG("ShellSubscriber: sending a heartbeat to perfd");
- attemptWriteToSocketLocked(/*dataSize=*/0);
- }
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats));
- }
-}
-
} // namespace statsd
} // namespace os
} // namespace android