Add support for concurrent data sources.

Perfetto allows multiple trace sessions to run in parallel. Each trace
session creates an instance of the registered DataSource. Bpf ring
buffers only support a single consumer, so we don't want multiple
instances reading concurrently.

This patch fixes things by making the DataSource a very thing wrapper
which delegates everything to a singleton. The singleton counts the
number of active sessions so that start is only called if not already
started, and stop is called if there are no remaining sessions.

Note: it's not clear whether it would be better to take the min or max
of poll_ms for active sessions. Min would be good for callers wanting
high throughput data collection, but doing so could jeopordise callers
using the poll_ms to limit the trace size (e.g. longer traces that are
alright dropping >5kpps scenarios). In this change, we use whichever
poll_ms was set first and make no promises.

Bug: 246985031
Test: atest libnetworkstats_test
Change-Id: Ic85cab2205e6d426bcfc913450edff50be373bb0
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
index c9c79df..be4ffe3 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
@@ -55,13 +55,12 @@
   NetworkTraceHandler::RegisterDataSource();
 }
 
-NetworkTraceHandler::NetworkTraceHandler()
-    : mPoller([](const PacketTrace& pkt) {
-        NetworkTraceHandler::Trace(
-            [pkt](NetworkTraceHandler::TraceContext ctx) {
-              NetworkTraceHandler::Fill(pkt, *ctx.NewTracePacket());
-            });
-      }) {}
+// static
+NetworkTracePoller NetworkTraceHandler::sPoller([](const PacketTrace& pkt) {
+  NetworkTraceHandler::Trace([pkt](NetworkTraceHandler::TraceContext ctx) {
+    NetworkTraceHandler::Fill(pkt, *ctx.NewTracePacket());
+  });
+});
 
 void NetworkTraceHandler::OnSetup(const SetupArgs& args) {
   const std::string& raw = args.config->network_packet_trace_config_raw();
@@ -74,8 +73,14 @@
   }
 }
 
-void NetworkTraceHandler::OnStart(const StartArgs&) { mPoller.Start(mPollMs); }
-void NetworkTraceHandler::OnStop(const StopArgs&) { mPoller.Stop(); }
+void NetworkTraceHandler::OnStart(const StartArgs&) {
+  mStarted = sPoller.Start(mPollMs);
+}
+
+void NetworkTraceHandler::OnStop(const StopArgs&) {
+  if (mStarted) sPoller.Stop();
+  mStarted = false;
+}
 
 void NetworkTracePoller::SchedulePolling() {
   // Schedules another run of ourselves to recursively poll periodically.
@@ -117,6 +122,19 @@
   ALOGD("Starting datasource");
 
   std::scoped_lock<std::mutex> lock(mMutex);
+  if (mSessionCount > 0) {
+    if (mPollMs != pollMs) {
+      // Nothing technical prevents mPollMs from changing, it's just unclear
+      // what the right behavior is. Taking the min of active values could poll
+      // too frequently giving some sessions too much data. Taking the max could
+      // be too infrequent. For now, do nothing.
+      ALOGI("poll_ms can't be changed while running, ignoring poll_ms=%d",
+            pollMs);
+    }
+    mSessionCount++;
+    return true;
+  }
+
   auto status = mConfigurationMap.init(PACKET_TRACE_ENABLED_MAP_PATH);
   if (!status.ok()) {
     ALOGW("Failed to bind config map: %s", status.error().message().c_str());
@@ -142,6 +160,7 @@
   mPollMs = pollMs;
   SchedulePolling();
 
+  mSessionCount++;
   return true;
 }
 
@@ -149,6 +168,11 @@
   ALOGD("Stopping datasource");
 
   std::scoped_lock<std::mutex> lock(mMutex);
+  if (mSessionCount == 0) return false;  // This should never happen
+
+  // If this isn't the last session, don't clean up yet.
+  if (--mSessionCount > 0) return true;
+
   auto res = mConfigurationMap.writeValue(0, false, BPF_ANY);
   if (!res.ok()) {
     ALOGW("Failed to disable tracing: %s", res.error().message().c_str());
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
index 8d03e2a..543be21 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
@@ -109,6 +109,24 @@
   EXPECT_FALSE(handler.ConsumeAll());
 }
 
+TEST_F(NetworkTracePollerTest, ConcurrentSessions) {
+  // Simulate two concurrent sessions (two starts followed by two stops). Check
+  // that tracing is stopped only after both sessions finish.
+  NetworkTracePoller handler([&](const PacketTrace& pkt) {});
+
+  ASSERT_TRUE(handler.Start(kNeverPoll));
+  EXPECT_TRUE(handler.ConsumeAll());
+
+  ASSERT_TRUE(handler.Start(kNeverPoll));
+  EXPECT_TRUE(handler.ConsumeAll());
+
+  ASSERT_TRUE(handler.Stop());
+  EXPECT_TRUE(handler.ConsumeAll());
+
+  ASSERT_TRUE(handler.Stop());
+  EXPECT_FALSE(handler.ConsumeAll());
+}
+
 TEST_F(NetworkTracePollerTest, TraceTcpSession) {
   __be16 server_port = 0;
   std::vector<PacketTrace> packets;
diff --git a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
index 2a2243b..3f244b3 100644
--- a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
+++ b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
@@ -56,6 +56,12 @@
 
   std::mutex mMutex;
 
+  // Records the number of successfully started active sessions so that only the
+  // first active session attempts setup and only the last cleans up. Note that
+  // the session count will remain zero if Start fails. It is expected that Stop
+  // will not be called for any trace session where Start fails.
+  int mSessionCount GUARDED_BY(mMutex);
+
   // How often to poll the ring buffer, defined by the trace config.
   uint32_t mPollMs GUARDED_BY(mMutex);
 
@@ -84,9 +90,6 @@
   // Connects to the system Perfetto daemon and registers the trace handler.
   static void InitPerfettoTracing();
 
-  // Initialize with the default Perfetto callback.
-  NetworkTraceHandler();
-
   // perfetto::DataSource overrides:
   void OnSetup(const SetupArgs& args) override;
   void OnStart(const StartArgs&) override;
@@ -97,8 +100,9 @@
   static void Fill(const PacketTrace& src,
                    ::perfetto::protos::pbzero::TracePacket& dst);
 
-  NetworkTracePoller mPoller;
+  static NetworkTracePoller sPoller;
   uint32_t mPollMs;
+  bool mStarted;
 };
 
 }  // namespace bpf