Add event bundling in NetworkTraceHandler

Bundling groups PacketTraces by their shared attributes (e.g. uid, tag,
ports) and outputs a single Perfetto TracePacket per group. The shared
attributes are written once per group rather than once per packet. In
most cases, this should reduce the trace size by 5x.

Bug: 246985031
Test: atest libnetworkstats_test
Change-Id: Ia9cb163fb4c673abdab8d442576cf4b12a98dbc6
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
index cd62bc5..ded5eb3 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
@@ -39,6 +39,61 @@
 using ::perfetto::protos::pbzero::TracePacket;
 using ::perfetto::protos::pbzero::TrafficDirection;
 
+// Bundling takes groups of packets with similar contextual fields (generally,
+// all fields except timestamp and length) and summarises them in a single trace
+// packet. For example, rather than writing three separate packets
+//
+//   {.timestampNs = 1, .uid = 1000, .tag = 123, .len = 72}
+//   {.timestampNs = 2, .uid = 1000, .tag = 123, .len = 100}
+//   {.timestampNs = 5, .uid = 1000, .tag = 123, .len = 456}
+//
+// the output will be something like
+//   {
+//     .timestamp = 1,  // earliest packet in the bundle
+//     .ctx = {.uid = 1000, .tag = 123},
+//     .packet_timestamps = [0, 1, 4],  // offsets from .timestamp
+//     .packet_lengths = [72, 100, 456],  // zipped with packet_timestamps
+//   }
+//
+// Most workloads have many packets from few contexts. Bundling greatly reduces
+// the amount of redundant information written, thus reducing the overall trace
+// size. Interning ids are similarly based on unique bundle contexts.
+
+// Keys are PacketTraces where timestamp and length are ignored.
+using BundleKey = PacketTrace;
+
+// Mixes std::hash of every argument into seed; based on boost::hash_combine
+template <typename T, typename... Rest>
+void HashCombine(std::size_t& seed, const T& val, const Rest&... rest) {
+  seed ^= std::hash<T>()(val) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+  (HashCombine(seed, rest), ...);
+}
+
+// BundleDetails summarises the timestamps and lengths of packets in a bundle.
+struct BundleDetails {
+  std::vector<std::pair<uint64_t, uint32_t>> time_and_len;  // one per packet
+  uint64_t minTs = std::numeric_limits<uint64_t>::max();  // earliest packet
+  uint64_t maxTs = std::numeric_limits<uint64_t>::min();  // latest packet
+  uint32_t bytes = 0;  // sum of packet lengths
+};
+
+#define AGG_FIELDS(x) \
+  x.ifindex, x.uid, x.tag, x.sport, x.dport, x.egress, x.ipProto, x.tcpFlags
+
+struct BundleHash {  // hashes only the AGG_FIELDS of a key
+  std::size_t operator()(const BundleKey& a) const {
+    std::size_t seed = 0;
+    HashCombine(seed, AGG_FIELDS(a));
+    return seed;
+  }
+};
+
+struct BundleEq {  // keys are equal when all of their AGG_FIELDS match
+  bool operator()(const BundleKey& a, const BundleKey& b) const {
+    return std::tie(AGG_FIELDS(a)) == std::tie(AGG_FIELDS(b));
+  }
+};
+
 // static
 void NetworkTraceHandler::RegisterDataSource() {
   ALOGD("Registering Perfetto data source");
@@ -96,18 +151,42 @@
 
 void NetworkTraceHandler::Write(const std::vector<PacketTrace>& packets,
                                 NetworkTraceHandler::TraceContext& ctx) {
+  std::unordered_map<BundleKey, BundleDetails, BundleHash, BundleEq> bundles;
   for (const PacketTrace& pkt : packets) {
-    Fill(pkt, *ctx.NewTracePacket());
+    BundleDetails& bundle = bundles[pkt];  // creates the bundle on first use
+    bundle.time_and_len.emplace_back(pkt.timestampNs, pkt.length);
+    bundle.minTs = std::min(bundle.minTs, pkt.timestampNs);
+    bundle.maxTs = std::max(bundle.maxTs, pkt.timestampNs);
+    bundle.bytes += pkt.length;
+  }
+
+  for (const auto& kv : bundles) {
+    const BundleKey& key = kv.first;
+    const BundleDetails& details = kv.second;
+
+    auto dst = ctx.NewTracePacket();
+    dst->set_timestamp(details.minTs);  // earliest packet in the bundle
+
+    auto* event = dst->set_network_packet_bundle();
+    Fill(key, event->set_ctx());
+
+    protozero::PackedVarInt offsets;
+    protozero::PackedVarInt lengths;
+    for (const auto& kv : details.time_and_len) {  // shadows the outer kv
+      offsets.Append(kv.first - details.minTs);  // offset from minTs
+      lengths.Append(kv.second);
+    }
+
+    event->set_packet_timestamps(offsets);
+    event->set_packet_lengths(lengths);
   }
 }
 
 // static class method
-void NetworkTraceHandler::Fill(const PacketTrace& src, TracePacket& dst) {
-  dst.set_timestamp(src.timestampNs);
-  auto* event = dst.set_network_packet();
+void NetworkTraceHandler::Fill(const PacketTrace& src,
+                               NetworkPacketEvent* event) {
   event->set_direction(src.egress ? TrafficDirection::DIR_EGRESS
                                   : TrafficDirection::DIR_INGRESS);
-  event->set_length(src.length);
   event->set_uid(src.uid);
   event->set_tag(src.tag);
 
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
index 2318da5..d943ae0 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
@@ -76,7 +76,7 @@
     // This is a real trace and includes irrelevant trace packets such as trace
     // metadata. The following strips the results to just the packets we want.
     for (const auto& pkt : trace.packet()) {
-      if (pkt.has_network_packet()) {
+      if (pkt.has_network_packet() || pkt.has_network_packet_bundle()) {
         output->emplace_back(pkt);
       }
     }
@@ -125,11 +125,12 @@
 
   ASSERT_EQ(events.size(), 1);
   EXPECT_THAT(events[0].timestamp(), 1000);
-  EXPECT_THAT(events[0].network_packet().length(), 100);
-  EXPECT_THAT(events[0].network_packet().uid(), 10);
-  EXPECT_THAT(events[0].network_packet().tag(), 123);
-  EXPECT_THAT(events[0].network_packet().ip_proto(), 6);
-  EXPECT_THAT(events[0].network_packet().tcp_flags(), 1);
+  EXPECT_THAT(events[0].network_packet_bundle().ctx().uid(), 10);
+  EXPECT_THAT(events[0].network_packet_bundle().ctx().tag(), 123);
+  EXPECT_THAT(events[0].network_packet_bundle().ctx().ip_proto(), 6);
+  EXPECT_THAT(events[0].network_packet_bundle().ctx().tcp_flags(), 1);
+  EXPECT_THAT(events[0].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(100));
 }
 
 TEST_F(NetworkTraceHandlerTest, WriteDirectionAndPorts) {
@@ -152,15 +153,45 @@
   ASSERT_TRUE(TraceAndSortPackets(input, &events));
 
   ASSERT_EQ(events.size(), 2);
-  EXPECT_THAT(events[0].network_packet().local_port(), 8080);
-  EXPECT_THAT(events[0].network_packet().remote_port(), 443);
-  EXPECT_THAT(events[0].network_packet().direction(),
+  EXPECT_THAT(events[0].network_packet_bundle().ctx().local_port(), 8080);
+  EXPECT_THAT(events[0].network_packet_bundle().ctx().remote_port(), 443);
+  EXPECT_THAT(events[0].network_packet_bundle().ctx().direction(),
               TrafficDirection::DIR_EGRESS);
-  EXPECT_THAT(events[1].network_packet().local_port(), 8080);
-  EXPECT_THAT(events[1].network_packet().remote_port(), 443);
-  EXPECT_THAT(events[1].network_packet().direction(),
+  EXPECT_THAT(events[1].network_packet_bundle().ctx().local_port(), 8080);
+  EXPECT_THAT(events[1].network_packet_bundle().ctx().remote_port(), 443);
+  EXPECT_THAT(events[1].network_packet_bundle().ctx().direction(),
               TrafficDirection::DIR_INGRESS);
 }
 
+TEST_F(NetworkTraceHandlerTest, BasicBundling) {
+  std::vector<PacketTrace> input = {
+      PacketTrace{.uid = 123, .timestampNs = 2, .length = 200},
+      PacketTrace{.uid = 123, .timestampNs = 1, .length = 100},
+      PacketTrace{.uid = 123, .timestampNs = 4, .length = 300},
+
+      PacketTrace{.uid = 456, .timestampNs = 2, .length = 400},
+      PacketTrace{.uid = 456, .timestampNs = 4, .length = 100},
+  };
+
+  std::vector<TracePacket> events;
+  ASSERT_TRUE(TraceAndSortPackets(input, &events));
+
+  ASSERT_EQ(events.size(), 2);
+
+  EXPECT_THAT(events[0].timestamp(), 1);  // earliest uid 123 packet
+  EXPECT_THAT(events[0].network_packet_bundle().ctx().uid(), 123);
+  EXPECT_THAT(events[0].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(200, 100, 300));
+  EXPECT_THAT(events[0].network_packet_bundle().packet_timestamps(),
+              testing::ElementsAre(1, 0, 3));  // offsets from timestamp 1
+
+  EXPECT_THAT(events[1].timestamp(), 2);  // earliest uid 456 packet
+  EXPECT_THAT(events[1].network_packet_bundle().ctx().uid(), 456);
+  EXPECT_THAT(events[1].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(400, 100));
+  EXPECT_THAT(events[1].network_packet_bundle().packet_timestamps(),
+              testing::ElementsAre(0, 2));  // offsets from timestamp 2
+}
+
 }  // namespace bpf
 }  // namespace android
diff --git a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
index 9520d7b..03f8651 100644
--- a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
+++ b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
@@ -54,7 +54,7 @@
  private:
   // Convert a PacketTrace into a Perfetto trace packet.
   static void Fill(const PacketTrace& src,
-                   ::perfetto::protos::pbzero::TracePacket& dst);
+                   ::perfetto::protos::pbzero::NetworkPacketEvent* event);
 
   static internal::NetworkTracePoller sPoller;
   bool mStarted;