Merge changes I2ae32895,I9e3c6f01,Ifc0e00f3,Ia9cb163f

* changes:
  Add support for dropped fields in NetworkTracing
  Add support for NetworkTrace aggregation threshold
  Add fallback in Network Tracing to old format
  Add event bundling in NetworkTraceHandler
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
index cd62bc5..cd6480c 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
@@ -39,6 +39,62 @@
 using ::perfetto::protos::pbzero::TracePacket;
 using ::perfetto::protos::pbzero::TrafficDirection;
 
+// Bundling takes groups of packets with similar contextual fields (generally,
+// all fields except timestamp and length) and summarises them in a single trace
+// packet. For example, rather than
+//
+//   {.timestampNs = 1, .uid = 1000, .tag = 123, .len = 72}
+//   {.timestampNs = 2, .uid = 1000, .tag = 123, .len = 100}
+//   {.timestampNs = 5, .uid = 1000, .tag = 123, .len = 456}
+//
+// The output will be something like
+//   {
+//     .timestamp = 1
+//     .ctx = {.uid = 1000, .tag = 123}
+//     .packet_timestamps = [0, 1, 4], // offsets from .timestamp
+//     .packet_lengths = [72, 100, 456], // zipped with packet_timestamps
+//   }
+//
+// Most workloads have many packets from few contexts. Bundling greatly reduces
+// the amount of redundant information written, thus reducing the overall trace
+// size. Interning ids are similarly based on unique bundle contexts.
+
+// Keys are PacketTraces where timestamp and length are ignored.
+using BundleKey = PacketTrace;
+
+// Folds std::hash of every argument into seed; based on boost::hash_combine.
+template <typename T, typename... Rest>
+void HashCombine(std::size_t& seed, const T& val, const Rest&... rest) {
+  seed ^= std::hash<T>()(val) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+  (HashCombine(seed, rest), ...);  // fold: mix remaining values in order
+}
+
+// BundleDetails summarises the timestamps and lengths of packets in a bundle.
+struct BundleDetails {
+  std::vector<std::pair<uint64_t, uint32_t>> time_and_len;  // per packet
+  uint64_t minTs = std::numeric_limits<uint64_t>::max();  // earliest ts
+  uint64_t maxTs = std::numeric_limits<uint64_t>::min();  // latest ts
+  uint32_t bytes = 0;  // sum of packet lengths
+};
+
+#define AGG_FIELDS(x)                                              \
+  (x).ifindex, (x).uid, (x).tag, (x).sport, (x).dport, (x).egress, \
+      (x).ipProto, (x).tcpFlags  // the fields that identify a bundle
+
+struct BundleHash {  // hashes only the AGG_FIELDS of a key
+  std::size_t operator()(const BundleKey& a) const {
+    std::size_t seed = 0;
+    HashCombine(seed, AGG_FIELDS(a));
+    return seed;
+  }
+};
+
+struct BundleEq {  // keys are equal when all of their AGG_FIELDS match
+  bool operator()(const BundleKey& a, const BundleKey& b) const {
+    return std::tie(AGG_FIELDS(a)) == std::tie(AGG_FIELDS(b));
+  }
+};
+
 // static
 void NetworkTraceHandler::RegisterDataSource() {
   ALOGD("Registering Perfetto data source");
@@ -96,26 +152,82 @@
 
 void NetworkTraceHandler::Write(const std::vector<PacketTrace>& packets,
                                 NetworkTraceHandler::TraceContext& ctx) {
+  // TODO: remove this fallback once Perfetto stable has support for bundles.
+  if (!mInternLimit && !mAggregationThreshold) {  // one event per packet
+    for (const PacketTrace& pkt : packets) {
+      auto dst = ctx.NewTracePacket();
+      dst->set_timestamp(pkt.timestampNs);
+      auto* event = dst->set_network_packet();
+      event->set_length(pkt.length);
+      Fill(pkt, event);
+    }
+    return;
+  }
+  std::unordered_map<BundleKey, BundleDetails, BundleHash, BundleEq> bundles;
   for (const PacketTrace& pkt : packets) {
-    Fill(pkt, *ctx.NewTracePacket());
+    BundleKey key = pkt;  // copy; dropped fields are zeroed below
+
+    // Dropping fields should remove them from the output and remove them from
+    // the aggregation key. In order to do the latter without changing the hash
+    // function, set the dropped fields to zero.
+    if (mDropTcpFlags) key.tcpFlags = 0;
+    if (mDropLocalPort) (key.egress ? key.sport : key.dport) = 0;
+    if (mDropRemotePort) (key.egress ? key.dport : key.sport) = 0;
+
+    BundleDetails& bundle = bundles[key];
+    bundle.time_and_len.emplace_back(pkt.timestampNs, pkt.length);
+    bundle.minTs = std::min(bundle.minTs, pkt.timestampNs);
+    bundle.maxTs = std::max(bundle.maxTs, pkt.timestampNs);
+    bundle.bytes += pkt.length;
+  }
+
+  for (const auto& kv : bundles) {  // emit one trace packet per bundle
+    const BundleKey& key = kv.first;
+    const BundleDetails& details = kv.second;
+
+    auto dst = ctx.NewTracePacket();
+    dst->set_timestamp(details.minTs);  // bundle starts at earliest packet
+
+    auto* event = dst->set_network_packet_bundle();
+    Fill(key, event->set_ctx());
+
+    const int count = static_cast<int>(details.time_and_len.size());
+    if (!mAggregationThreshold || count < mAggregationThreshold) {
+      protozero::PackedVarInt offsets;  // relative to details.minTs
+      protozero::PackedVarInt lengths;
+      for (const auto& [ts, len] : details.time_and_len) {
+        offsets.Append(ts - details.minTs);
+        lengths.Append(len);
+      }
+
+      event->set_packet_timestamps(offsets);
+      event->set_packet_lengths(lengths);
+    } else {  // threshold reached: write totals, not per-packet data
+      event->set_total_duration(details.maxTs - details.minTs);
+      event->set_total_length(details.bytes);
+      event->set_total_packets(count);
+    }
   }
 }
 
-// static class method
-void NetworkTraceHandler::Fill(const PacketTrace& src, TracePacket& dst) {
-  dst.set_timestamp(src.timestampNs);
-  auto* event = dst.set_network_packet();
+void NetworkTraceHandler::Fill(const PacketTrace& src,
+                               NetworkPacketEvent* event) {
   event->set_direction(src.egress ? TrafficDirection::DIR_EGRESS
                                   : TrafficDirection::DIR_INGRESS);
-  event->set_length(src.length);
   event->set_uid(src.uid);
   event->set_tag(src.tag);
 
-  event->set_local_port(src.egress ? ntohs(src.sport) : ntohs(src.dport));
-  event->set_remote_port(src.egress ? ntohs(src.dport) : ntohs(src.sport));
+  if (!mDropLocalPort) {
+    event->set_local_port(ntohs(src.egress ? src.sport : src.dport));
+  }
+  if (!mDropRemotePort) {
+    event->set_remote_port(ntohs(src.egress ? src.dport : src.sport));
+  }
+  if (!mDropTcpFlags) {
+    event->set_tcp_flags(src.tcpFlags);
+  }
 
   event->set_ip_proto(src.ipProto);
-  event->set_tcp_flags(src.tcpFlags);
 
   char ifname[IF_NAMESIZE] = {};
   if (if_indextoname(src.ifindex, ifname) == ifname) {
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
index 2318da5..d2c1d38 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
@@ -20,6 +20,7 @@
 #include <vector>
 
 #include "netdbpf/NetworkTraceHandler.h"
+#include "protos/perfetto/config/android/network_trace_config.gen.h"
 #include "protos/perfetto/trace/android/network_trace.pb.h"
 #include "protos/perfetto/trace/trace.pb.h"
 #include "protos/perfetto/trace/trace_packet.pb.h"
@@ -27,6 +28,7 @@
 namespace android {
 namespace bpf {
 using ::perfetto::protos::NetworkPacketEvent;
+using ::perfetto::protos::NetworkPacketTraceConfig;
 using ::perfetto::protos::Trace;
 using ::perfetto::protos::TracePacket;
 using ::perfetto::protos::TrafficDirection;
@@ -42,7 +44,8 @@
 class NetworkTraceHandlerTest : public testing::Test {
  protected:
   // Starts a tracing session with the handler under test.
-  std::unique_ptr<perfetto::TracingSession> StartTracing() {
+  std::unique_ptr<perfetto::TracingSession> StartTracing(
+      NetworkPacketTraceConfig settings) {
     perfetto::TracingInitArgs args;
     args.backends = perfetto::kInProcessBackend;
     perfetto::Tracing::Initialize(args);
@@ -53,7 +56,9 @@
 
     perfetto::TraceConfig cfg;
     cfg.add_buffers()->set_size_kb(1024);
-    cfg.add_data_sources()->mutable_config()->set_name("test.network_packets");
+    auto* config = cfg.add_data_sources()->mutable_config();
+    config->set_name("test.network_packets");
+    config->set_network_packet_trace_config_raw(settings.SerializeAsString());
 
     auto session = perfetto::Tracing::NewTrace(perfetto::kInProcessBackend);
     session->Setup(cfg);
@@ -76,7 +81,7 @@
     // This is a real trace and includes irrelevant trace packets such as trace
     // metadata. The following strips the results to just the packets we want.
     for (const auto& pkt : trace.packet()) {
-      if (pkt.has_network_packet()) {
+      if (pkt.has_network_packet() || pkt.has_network_packet_bundle()) {
         output->emplace_back(pkt);
       }
     }
@@ -86,8 +91,9 @@
 
   // This runs a trace with a single call to Write.
   bool TraceAndSortPackets(const std::vector<PacketTrace>& input,
-                           std::vector<TracePacket>* output) {
-    auto session = StartTracing();
+                           std::vector<TracePacket>* output,
+                           NetworkPacketTraceConfig config = {}) {
+    auto session = StartTracing(config);
     HandlerForTest::Trace([&](HandlerForTest::TraceContext ctx) {
       ctx.GetDataSourceLocked()->Write(input, ctx);
       ctx.Flush();
@@ -125,11 +131,11 @@
 
   ASSERT_EQ(events.size(), 1);
   EXPECT_THAT(events[0].timestamp(), 1000);
-  EXPECT_THAT(events[0].network_packet().length(), 100);
   EXPECT_THAT(events[0].network_packet().uid(), 10);
   EXPECT_THAT(events[0].network_packet().tag(), 123);
   EXPECT_THAT(events[0].network_packet().ip_proto(), 6);
   EXPECT_THAT(events[0].network_packet().tcp_flags(), 1);
+  EXPECT_THAT(events[0].network_packet().length(), 100);
 }
 
 TEST_F(NetworkTraceHandlerTest, WriteDirectionAndPorts) {
@@ -162,5 +168,175 @@
               TrafficDirection::DIR_INGRESS);
 }
 
+TEST_F(NetworkTraceHandlerTest, BasicBundling) {
+  // TODO: remove this once bundling becomes default. Until then, set an
+  // arbitrary aggregation threshold to enable bundling.
+  NetworkPacketTraceConfig config;
+  config.set_aggregation_threshold(10);
+
+  std::vector<PacketTrace> input = {
+      PacketTrace{.uid = 123, .timestampNs = 2, .length = 200},
+      PacketTrace{.uid = 123, .timestampNs = 1, .length = 100},
+      PacketTrace{.uid = 123, .timestampNs = 4, .length = 300},
+
+      PacketTrace{.uid = 456, .timestampNs = 2, .length = 400},
+      PacketTrace{.uid = 456, .timestampNs = 4, .length = 100},
+  };
+
+  std::vector<TracePacket> events;
+  ASSERT_TRUE(TraceAndSortPackets(input, &events, config));
+
+  ASSERT_EQ(events.size(), 2);  // one bundle per uid
+
+  EXPECT_THAT(events[0].timestamp(), 1);  // earliest packet in the bundle
+  EXPECT_THAT(events[0].network_packet_bundle().ctx().uid(), 123);
+  EXPECT_THAT(events[0].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(200, 100, 300));  // input order kept
+  EXPECT_THAT(events[0].network_packet_bundle().packet_timestamps(),
+              testing::ElementsAre(1, 0, 3));  // offsets from timestamp 1
+
+  EXPECT_THAT(events[1].timestamp(), 2);
+  EXPECT_THAT(events[1].network_packet_bundle().ctx().uid(), 456);
+  EXPECT_THAT(events[1].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(400, 100));
+  EXPECT_THAT(events[1].network_packet_bundle().packet_timestamps(),
+              testing::ElementsAre(0, 2));  // offsets from timestamp 2
+}
+
+TEST_F(NetworkTraceHandlerTest, AggregationThreshold) {
+  // With an aggregation threshold of 3, the set of packets with uid=123 will
+  // be aggregated (3>=3) whereas packets with uid=456 get per-packet info.
+  NetworkPacketTraceConfig config;
+  config.set_aggregation_threshold(3);
+
+  std::vector<PacketTrace> input = {
+      PacketTrace{.uid = 123, .timestampNs = 2, .length = 200},
+      PacketTrace{.uid = 123, .timestampNs = 1, .length = 100},
+      PacketTrace{.uid = 123, .timestampNs = 4, .length = 300},
+
+      PacketTrace{.uid = 456, .timestampNs = 2, .length = 400},
+      PacketTrace{.uid = 456, .timestampNs = 4, .length = 100},
+  };
+
+  std::vector<TracePacket> events;
+  ASSERT_TRUE(TraceAndSortPackets(input, &events, config));
+
+  ASSERT_EQ(events.size(), 2);  // one bundle per uid
+
+  EXPECT_EQ(events[0].timestamp(), 1);
+  EXPECT_EQ(events[0].network_packet_bundle().ctx().uid(), 123);
+  EXPECT_EQ(events[0].network_packet_bundle().total_duration(), 3);  // 4 - 1
+  EXPECT_EQ(events[0].network_packet_bundle().total_packets(), 3);
+  EXPECT_EQ(events[0].network_packet_bundle().total_length(), 600);
+
+  EXPECT_EQ(events[1].timestamp(), 2);
+  EXPECT_EQ(events[1].network_packet_bundle().ctx().uid(), 456);
+  EXPECT_THAT(events[1].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(400, 100));
+  EXPECT_THAT(events[1].network_packet_bundle().packet_timestamps(),
+              testing::ElementsAre(0, 2));  // offsets from timestamp 2
+}
+
+TEST_F(NetworkTraceHandlerTest, DropLocalPort) {
+  NetworkPacketTraceConfig config;
+  config.set_drop_local_port(true);
+  config.set_aggregation_threshold(10);  // enables bundling
+
+  __be16 a = htons(10000);
+  __be16 b = htons(10001);
+  std::vector<PacketTrace> input = {
+      // Recall that local is `src` for egress and `dst` for ingress.
+      PacketTrace{.timestampNs = 1, .length = 2, .egress = true, .sport = a},
+      PacketTrace{.timestampNs = 2, .length = 4, .egress = false, .dport = a},
+      PacketTrace{.timestampNs = 3, .length = 6, .egress = true, .sport = b},
+      PacketTrace{.timestampNs = 4, .length = 8, .egress = false, .dport = b},
+  };
+
+  std::vector<TracePacket> events;
+  ASSERT_TRUE(TraceAndSortPackets(input, &events, config));
+  ASSERT_EQ(events.size(), 2);  // one bundle per direction
+
+  // Despite having different local ports, drop and bundle by remaining fields.
+  EXPECT_EQ(events[0].network_packet_bundle().ctx().direction(),
+            TrafficDirection::DIR_EGRESS);
+  EXPECT_THAT(events[0].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(2, 6));
+
+  EXPECT_EQ(events[1].network_packet_bundle().ctx().direction(),
+            TrafficDirection::DIR_INGRESS);
+  EXPECT_THAT(events[1].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(4, 8));
+
+  // Local port shouldn't be in output.
+  EXPECT_FALSE(events[0].network_packet_bundle().ctx().has_local_port());
+  EXPECT_FALSE(events[1].network_packet_bundle().ctx().has_local_port());
+}
+
+TEST_F(NetworkTraceHandlerTest, DropRemotePort) {
+  NetworkPacketTraceConfig config;
+  config.set_drop_remote_port(true);
+  config.set_aggregation_threshold(10);  // enables bundling
+
+  __be16 a = htons(443);
+  __be16 b = htons(80);
+  std::vector<PacketTrace> input = {
+      // Recall that remote is `dst` for egress and `src` for ingress.
+      PacketTrace{.timestampNs = 1, .length = 2, .egress = true, .dport = a},
+      PacketTrace{.timestampNs = 2, .length = 4, .egress = false, .sport = a},
+      PacketTrace{.timestampNs = 3, .length = 6, .egress = true, .dport = b},
+      PacketTrace{.timestampNs = 4, .length = 8, .egress = false, .sport = b},
+  };
+
+  std::vector<TracePacket> events;
+  ASSERT_TRUE(TraceAndSortPackets(input, &events, config));
+  ASSERT_EQ(events.size(), 2);  // one bundle per direction
+
+  // Despite having different remote ports, drop and bundle by remaining fields.
+  EXPECT_EQ(events[0].network_packet_bundle().ctx().direction(),
+            TrafficDirection::DIR_EGRESS);
+  EXPECT_THAT(events[0].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(2, 6));
+
+  EXPECT_EQ(events[1].network_packet_bundle().ctx().direction(),
+            TrafficDirection::DIR_INGRESS);
+  EXPECT_THAT(events[1].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(4, 8));
+
+  // Remote port shouldn't be in output.
+  EXPECT_FALSE(events[0].network_packet_bundle().ctx().has_remote_port());
+  EXPECT_FALSE(events[1].network_packet_bundle().ctx().has_remote_port());
+}
+
+TEST_F(NetworkTraceHandlerTest, DropTcpFlags) {
+  NetworkPacketTraceConfig config;
+  config.set_drop_tcp_flags(true);
+  config.set_aggregation_threshold(10);  // enables bundling
+
+  std::vector<PacketTrace> input = {
+      PacketTrace{.timestampNs = 1, .uid = 123, .length = 1, .tcpFlags = 1},
+      PacketTrace{.timestampNs = 2, .uid = 123, .length = 2, .tcpFlags = 2},
+      PacketTrace{.timestampNs = 3, .uid = 456, .length = 3, .tcpFlags = 1},
+      PacketTrace{.timestampNs = 4, .uid = 456, .length = 4, .tcpFlags = 2},
+  };
+
+  std::vector<TracePacket> events;
+  ASSERT_TRUE(TraceAndSortPackets(input, &events, config));
+
+  ASSERT_EQ(events.size(), 2);  // one bundle per uid
+
+  // Despite having different tcp flags, drop and bundle by remaining fields.
+  EXPECT_EQ(events[0].network_packet_bundle().ctx().uid(), 123);
+  EXPECT_THAT(events[0].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(1, 2));
+
+  EXPECT_EQ(events[1].network_packet_bundle().ctx().uid(), 456);
+  EXPECT_THAT(events[1].network_packet_bundle().packet_lengths(),
+              testing::ElementsAre(3, 4));
+
+  // Tcp flags shouldn't be in output.
+  EXPECT_FALSE(events[0].network_packet_bundle().ctx().has_tcp_flags());
+  EXPECT_FALSE(events[1].network_packet_bundle().ctx().has_tcp_flags());
+}
+
 }  // namespace bpf
 }  // namespace android
diff --git a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
index 9520d7b..e9b194b 100644
--- a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
+++ b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
@@ -53,8 +53,8 @@
 
  private:
   // Convert a PacketTrace into a Perfetto trace packet.
-  static void Fill(const PacketTrace& src,
-                   ::perfetto::protos::pbzero::TracePacket& dst);
+  void Fill(const PacketTrace& src,
+            ::perfetto::protos::pbzero::NetworkPacketEvent* event);
 
   static internal::NetworkTracePoller sPoller;
   bool mStarted;