Add support for concurrent data sources.
Perfetto allows multiple trace sessions to run in parallel. Each trace
session creates an instance of the registered DataSource. Bpf ring
buffers only support a single consumer, so we don't want multiple
instances reading concurrently.
This patch fixes this by making the DataSource a very thin wrapper
which delegates everything to a singleton. The singleton counts the
number of active sessions so that start is only called if not already
started, and stop is only called once there are no remaining sessions.
Note: it's not clear whether it would be better to take the min or max
of poll_ms for active sessions. Min would be good for callers wanting
high throughput data collection, but doing so could jeopardise callers
using the poll_ms to limit the trace size (e.g. longer traces that are
alright with dropping >5kpps scenarios). In this change, we use whichever
poll_ms was set first and make no promises.
Bug: 246985031
Test: atest libnetworkstats_test
Change-Id: Ic85cab2205e6d426bcfc913450edff50be373bb0
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
index c9c79df..be4ffe3 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
@@ -55,13 +55,12 @@
NetworkTraceHandler::RegisterDataSource();
}
-NetworkTraceHandler::NetworkTraceHandler()
- : mPoller([](const PacketTrace& pkt) {
- NetworkTraceHandler::Trace(
- [pkt](NetworkTraceHandler::TraceContext ctx) {
- NetworkTraceHandler::Fill(pkt, *ctx.NewTracePacket());
- });
- }) {}
+// static
+NetworkTracePoller NetworkTraceHandler::sPoller([](const PacketTrace& pkt) {
+ NetworkTraceHandler::Trace([pkt](NetworkTraceHandler::TraceContext ctx) {
+ NetworkTraceHandler::Fill(pkt, *ctx.NewTracePacket());
+ });
+});
void NetworkTraceHandler::OnSetup(const SetupArgs& args) {
const std::string& raw = args.config->network_packet_trace_config_raw();
@@ -74,8 +73,14 @@
}
}
-void NetworkTraceHandler::OnStart(const StartArgs&) { mPoller.Start(mPollMs); }
-void NetworkTraceHandler::OnStop(const StopArgs&) { mPoller.Stop(); }
+void NetworkTraceHandler::OnStart(const StartArgs&) {
+ mStarted = sPoller.Start(mPollMs);
+}
+
+void NetworkTraceHandler::OnStop(const StopArgs&) {
+ if (mStarted) sPoller.Stop();
+ mStarted = false;
+}
void NetworkTracePoller::SchedulePolling() {
// Schedules another run of ourselves to recursively poll periodically.
@@ -117,6 +122,19 @@
ALOGD("Starting datasource");
std::scoped_lock<std::mutex> lock(mMutex);
+ if (mSessionCount > 0) {
+ if (mPollMs != pollMs) {
+ // Nothing technical prevents mPollMs from changing, it's just unclear
+ // what the right behavior is. Taking the min of active values could poll
+ // too frequently giving some sessions too much data. Taking the max could
+ // be too infrequent. For now, do nothing.
+ ALOGI("poll_ms can't be changed while running, ignoring poll_ms=%d",
+ pollMs);
+ }
+ mSessionCount++;
+ return true;
+ }
+
auto status = mConfigurationMap.init(PACKET_TRACE_ENABLED_MAP_PATH);
if (!status.ok()) {
ALOGW("Failed to bind config map: %s", status.error().message().c_str());
@@ -142,6 +160,7 @@
mPollMs = pollMs;
SchedulePolling();
+ mSessionCount++;
return true;
}
@@ -149,6 +168,11 @@
ALOGD("Stopping datasource");
std::scoped_lock<std::mutex> lock(mMutex);
+ if (mSessionCount == 0) return false; // This should never happen
+
+ // If this isn't the last session, don't clean up yet.
+ if (--mSessionCount > 0) return true;
+
auto res = mConfigurationMap.writeValue(0, false, BPF_ANY);
if (!res.ok()) {
ALOGW("Failed to disable tracing: %s", res.error().message().c_str());
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
index 8d03e2a..543be21 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
@@ -109,6 +109,24 @@
EXPECT_FALSE(handler.ConsumeAll());
}
+TEST_F(NetworkTracePollerTest, ConcurrentSessions) {
+ // Simulate two concurrent sessions (two starts followed by two stops). Check
+ // that tracing is stopped only after both sessions finish.
+ NetworkTracePoller handler([&](const PacketTrace& pkt) {});
+
+ ASSERT_TRUE(handler.Start(kNeverPoll));
+ EXPECT_TRUE(handler.ConsumeAll());
+
+ ASSERT_TRUE(handler.Start(kNeverPoll));
+ EXPECT_TRUE(handler.ConsumeAll());
+
+ ASSERT_TRUE(handler.Stop());
+ EXPECT_TRUE(handler.ConsumeAll());
+
+ ASSERT_TRUE(handler.Stop());
+ EXPECT_FALSE(handler.ConsumeAll());
+}
+
TEST_F(NetworkTracePollerTest, TraceTcpSession) {
__be16 server_port = 0;
std::vector<PacketTrace> packets;
diff --git a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
index 2a2243b..3f244b3 100644
--- a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
+++ b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
@@ -56,6 +56,12 @@
std::mutex mMutex;
+ // Records the number of successfully started active sessions so that only the
+ // first active session attempts setup and only the last cleans up. Note that
+ // the session count will remain zero if Start fails. It is expected that Stop
+ // will not be called for any trace session where Start fails.
+ int mSessionCount GUARDED_BY(mMutex);
+
// How often to poll the ring buffer, defined by the trace config.
uint32_t mPollMs GUARDED_BY(mMutex);
@@ -84,9 +90,6 @@
// Connects to the system Perfetto daemon and registers the trace handler.
static void InitPerfettoTracing();
- // Initialize with the default Perfetto callback.
- NetworkTraceHandler();
-
// perfetto::DataSource overrides:
void OnSetup(const SetupArgs& args) override;
void OnStart(const StartArgs&) override;
@@ -97,8 +100,9 @@
static void Fill(const PacketTrace& src,
::perfetto::protos::pbzero::TracePacket& dst);
- NetworkTracePoller mPoller;
+ static NetworkTracePoller sPoller;
uint32_t mPollMs;
+ bool mStarted;
};
} // namespace bpf