Parse new config options and to batch in callback

This converts the poller's callback to batch-style pass all of the
events each time it polls rather than one-by-one. This requires a bit
more memory (up to 32kb) but will allow optimizations in the following
changes that should reduce the trace size and cpu by ~10x.

Bug: 246985031
Test: atest libnetworkstats_test
Change-Id: Ia3223ba8b27b825e2d63d6b3b8ac09b8eb17b3f8
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
index 8e70950..07f8054 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
@@ -57,11 +57,14 @@
 }
 
 // static
-NetworkTracePoller NetworkTraceHandler::sPoller([](const PacketTrace& pkt) {
-  NetworkTraceHandler::Trace([pkt](NetworkTraceHandler::TraceContext ctx) {
-    NetworkTraceHandler::Fill(pkt, *ctx.NewTracePacket());
-  });
-});
+NetworkTracePoller NetworkTraceHandler::sPoller(
+    [](const std::vector<PacketTrace>& packets) {
+      NetworkTraceHandler::Trace([&](NetworkTraceHandler::TraceContext ctx) {
+        for (const PacketTrace& pkt : packets) {
+          NetworkTraceHandler::Fill(pkt, *ctx.NewTracePacket());
+        }
+      });
+    });
 
 void NetworkTraceHandler::OnSetup(const SetupArgs& args) {
   const std::string& raw = args.config->network_packet_trace_config_raw();
@@ -72,6 +75,12 @@
     ALOGI("poll_ms is missing or below the 100ms minimum. Increasing to 100ms");
     mPollMs = 100;
   }
+
+  mInternLimit = config.intern_limit();
+  mAggregationThreshold = config.aggregation_threshold();
+  mDropLocalPort = config.drop_local_port();
+  mDropRemotePort = config.drop_remote_port();
+  mDropTcpFlags = config.drop_tcp_flags();
 }
 
 void NetworkTraceHandler::OnStart(const StartArgs&) {
diff --git a/service-t/native/libs/libnetworkstats/NetworkTracePoller.cpp b/service-t/native/libs/libnetworkstats/NetworkTracePoller.cpp
index 34dbf9e..3abb49a 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTracePoller.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTracePoller.cpp
@@ -116,12 +116,16 @@
     return false;
   }
 
-  base::Result<int> ret = mRingBuffer->ConsumeAll(mCallback);
+  std::vector<PacketTrace> packets;
+  base::Result<int> ret = mRingBuffer->ConsumeAll(
+      [&](const PacketTrace& pkt) { packets.push_back(pkt); });
   if (!ret.ok()) {
     ALOGW("Failed to poll ringbuf: %s", ret.error().message().c_str());
     return false;
   }
 
+  mCallback(packets);
+
   return true;
 }
 
diff --git a/service-t/native/libs/libnetworkstats/NetworkTracePollerTest.cpp b/service-t/native/libs/libnetworkstats/NetworkTracePollerTest.cpp
index 28ec208..725cec1 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTracePollerTest.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTracePollerTest.cpp
@@ -100,7 +100,7 @@
 };
 
 TEST_F(NetworkTracePollerTest, PollWhileInactive) {
-  NetworkTracePoller handler([&](const PacketTrace& pkt) {});
+  NetworkTracePoller handler([&](const std::vector<PacketTrace>& pkt) {});
 
   // One succeed after start and before stop.
   EXPECT_FALSE(handler.ConsumeAll());
@@ -113,7 +113,7 @@
 TEST_F(NetworkTracePollerTest, ConcurrentSessions) {
   // Simulate two concurrent sessions (two starts followed by two stops). Check
   // that tracing is stopped only after both sessions finish.
-  NetworkTracePoller handler([&](const PacketTrace& pkt) {});
+  NetworkTracePoller handler([&](const std::vector<PacketTrace>& pkt) {});
 
   ASSERT_TRUE(handler.Start(kNeverPoll));
   EXPECT_TRUE(handler.ConsumeAll());
@@ -135,10 +135,12 @@
   // Record all packets with the bound address and current uid. This callback is
   // involked only within ConsumeAll, at which point the port should have
   // already been filled in and all packets have been processed.
-  NetworkTracePoller handler([&](const PacketTrace& pkt) {
-    if (pkt.sport != server_port && pkt.dport != server_port) return;
-    if (pkt.uid != getuid()) return;
-    packets.push_back(pkt);
+  NetworkTracePoller handler([&](const std::vector<PacketTrace>& pkts) {
+    for (const PacketTrace& pkt : pkts) {
+      if (pkt.sport != server_port && pkt.dport != server_port) return;
+      if (pkt.uid != getuid()) return;
+      packets.push_back(pkt);
+    }
   });
 
   ASSERT_TRUE(handler.Start(kNeverPoll));
diff --git a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
index 1266237..bdc18fb 100644
--- a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
+++ b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
@@ -52,8 +52,15 @@
                    ::perfetto::protos::pbzero::TracePacket& dst);
 
   static internal::NetworkTracePoller sPoller;
-  uint32_t mPollMs;
   bool mStarted;
+
+  // Values from config, see proto for details.
+  uint32_t mPollMs;
+  uint32_t mInternLimit;
+  uint32_t mAggregationThreshold;
+  bool mDropLocalPort;
+  bool mDropRemotePort;
+  bool mDropTcpFlags;
 };
 
 }  // namespace bpf
diff --git a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTracePoller.h b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTracePoller.h
index b0189a7..adde51e 100644
--- a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTracePoller.h
+++ b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTracePoller.h
@@ -38,9 +38,10 @@
 // it is not meant to be used elsewhere.
 class NetworkTracePoller {
  public:
+  using EventSink = std::function<void(const std::vector<PacketTrace>&)>;
+
   // Testonly: initialize with a callback capable of intercepting data.
-  NetworkTracePoller(std::function<void(const PacketTrace&)> callback)
-      : mCallback(std::move(callback)) {}
+  NetworkTracePoller(EventSink callback) : mCallback(std::move(callback)) {}
 
   // Starts tracing with the given poll interval.
   bool Start(uint32_t pollMs) EXCLUDES(mMutex);
@@ -67,7 +68,7 @@
   uint32_t mPollMs GUARDED_BY(mMutex);
 
   // The function to process PacketTrace, typically a Perfetto sink.
-  std::function<void(const PacketTrace&)> mCallback GUARDED_BY(mMutex);
+  EventSink mCallback GUARDED_BY(mMutex);
 
   // The BPF ring buffer handle.
   std::unique_ptr<BpfRingbuf<PacketTrace>> mRingBuffer GUARDED_BY(mMutex);