Merge "Add interning support to NetworkTracing"
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
index cd6480c..696a29a 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
@@ -34,6 +34,7 @@
 namespace android {
 namespace bpf {
 using ::android::bpf::internal::NetworkTracePoller;
+using ::perfetto::protos::pbzero::NetworkPacketBundle;
 using ::perfetto::protos::pbzero::NetworkPacketEvent;
 using ::perfetto::protos::pbzero::NetworkPacketTraceConfig;
 using ::perfetto::protos::pbzero::TracePacket;
@@ -59,9 +60,6 @@
 // the amount of redundant information written, thus reducing the overall trace
 // size. Interning ids are similarly based on unique bundle contexts.
 
-// Keys are PacketTraces where timestamp and length are ignored.
-using BundleKey = PacketTrace;
-
 // Based on boost::hash_combine
 template <typename T, typename... Rest>
 void HashCombine(std::size_t& seed, const T& val, const Rest&... rest) {
@@ -81,19 +79,15 @@
   (x).ifindex, (x).uid, (x).tag, (x).sport, (x).dport, (x).egress, \
       (x).ipProto, (x).tcpFlags
 
-struct BundleHash {
-  std::size_t operator()(const BundleKey& a) const {
-    std::size_t seed = 0;
-    HashCombine(seed, AGG_FIELDS(a));
-    return seed;
-  }
-};
+std::size_t BundleHash::operator()(const BundleKey& a) const {
+  std::size_t seed = 0;
+  HashCombine(seed, AGG_FIELDS(a));
+  return seed;
+}
 
-struct BundleEq {
-  bool operator()(const BundleKey& a, const BundleKey& b) const {
-    return std::tie(AGG_FIELDS(a)) == std::tie(AGG_FIELDS(b));
-  }
-};
+bool BundleEq::operator()(const BundleKey& a, const BundleKey& b) const {
+  return std::tie(AGG_FIELDS(a)) == std::tie(AGG_FIELDS(b));
+}
 
 // static
 void NetworkTraceHandler::RegisterDataSource() {
@@ -163,6 +157,8 @@
     }
     return;
   }
+
+  uint64_t minTs = std::numeric_limits<uint64_t>::max();
   std::unordered_map<BundleKey, BundleDetails, BundleHash, BundleEq> bundles;
   for (const PacketTrace& pkt : packets) {
     BundleKey key = pkt;
@@ -174,6 +170,8 @@
     if (mDropLocalPort) (key.egress ? key.sport : key.dport) = 0;
     if (mDropRemotePort) (key.egress ? key.dport : key.sport) = 0;
 
+    minTs = std::min(minTs, pkt.timestampNs);
+
     BundleDetails& bundle = bundles[key];
     bundle.time_and_len.emplace_back(pkt.timestampNs, pkt.length);
     bundle.minTs = std::min(bundle.minTs, pkt.timestampNs);
@@ -181,15 +179,25 @@
     bundle.bytes += pkt.length;
   }
 
+  // If state was cleared, emit a separate packet to indicate it. This uses the
+  // overall minTs so it is sorted before any packets that follow.
+  NetworkTraceState* incr_state = ctx.GetIncrementalState();
+  if (!bundles.empty() && mInternLimit && incr_state->cleared) {
+    auto clear = ctx.NewTracePacket();
+    clear->set_sequence_flags(TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);
+    clear->set_timestamp(minTs);
+    incr_state->cleared = false;
+  }
+
   for (const auto& kv : bundles) {
     const BundleKey& key = kv.first;
     const BundleDetails& details = kv.second;
 
     auto dst = ctx.NewTracePacket();
+    dst->set_sequence_flags(TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
     dst->set_timestamp(details.minTs);
 
-    auto* event = dst->set_network_packet_bundle();
-    Fill(key, event->set_ctx());
+    auto* event = FillWithInterning(incr_state, key, dst.get());
 
     int count = details.time_and_len.size();
     if (!mAggregationThreshold || count < mAggregationThreshold) {
@@ -237,5 +245,39 @@
   }
 }
 
+NetworkPacketBundle* NetworkTraceHandler::FillWithInterning(
+    NetworkTraceState* state, const BundleKey& key, TracePacket* dst) {
+  uint64_t iid = 0;
+  bool found = false;
+
+  if (state->iids.size() < mInternLimit) {
+    auto [iter, success] = state->iids.try_emplace(key, state->iids.size() + 1);
+    iid = iter->second;
+    found = true;
+
+    if (success) {
+      // If we successfully emplaced, record the newly interned data.
+      auto* packet_context = dst->set_interned_data()->add_packet_context();
+      Fill(key, packet_context->set_ctx());
+      packet_context->set_iid(iid);
+    }
+  } else {
+    auto iter = state->iids.find(key);
+    if (iter != state->iids.end()) {
+      iid = iter->second;
+      found = true;
+    }
+  }
+
+  auto* event = dst->set_network_packet_bundle();
+  if (found) {
+    event->set_iid(iid);
+  } else {
+    Fill(key, event->set_ctx());
+  }
+
+  return event;
+}
+
 }  // namespace bpf
 }  // namespace android
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
index d2c1d38..c9eb183 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
@@ -338,5 +338,57 @@
   EXPECT_FALSE(events[1].network_packet_bundle().ctx().has_tcp_flags());
 }
 
+TEST_F(NetworkTraceHandlerTest, Interning) {
+  NetworkPacketTraceConfig config;
+  config.set_intern_limit(2);
+
+  // The test writes 4 packets coming from three sources (uids). With an intern
+  // limit of 2, the first two sources should be interned. This test splits this
+  // into individual writes since internally an unordered map is used and would
+  // otherwise non-deterministically choose what to intern (this is fine for
+  // real use, but not good for test assertions).
+  std::vector<std::vector<PacketTrace>> inputs = {
+      {PacketTrace{.timestampNs = 1, .uid = 123}},
+      {PacketTrace{.timestampNs = 2, .uid = 456}},
+      {PacketTrace{.timestampNs = 3, .uid = 789}},
+      {PacketTrace{.timestampNs = 4, .uid = 123}},
+  };
+
+  auto session = StartTracing(config);
+
+  HandlerForTest::Trace([&](HandlerForTest::TraceContext ctx) {
+    ctx.GetDataSourceLocked()->Write(inputs[0], ctx);
+    ctx.GetDataSourceLocked()->Write(inputs[1], ctx);
+    ctx.GetDataSourceLocked()->Write(inputs[2], ctx);
+    ctx.GetDataSourceLocked()->Write(inputs[3], ctx);
+    ctx.Flush();
+  });
+
+  std::vector<TracePacket> events;
+  ASSERT_TRUE(StopTracing(session.get(), &events));
+
+  ASSERT_EQ(events.size(), 4);
+
+  // First time seen, emit new interned data, bundle uses iid instead of ctx.
+  EXPECT_EQ(events[0].network_packet_bundle().iid(), 1);
+  ASSERT_EQ(events[0].interned_data().packet_context().size(), 1);
+  EXPECT_EQ(events[0].interned_data().packet_context(0).iid(), 1);
+  EXPECT_EQ(events[0].interned_data().packet_context(0).ctx().uid(), 123);
+
+  // First time seen, emit new interned data, bundle uses iid instead of ctx.
+  EXPECT_EQ(events[1].network_packet_bundle().iid(), 2);
+  ASSERT_EQ(events[1].interned_data().packet_context().size(), 1);
+  EXPECT_EQ(events[1].interned_data().packet_context(0).iid(), 2);
+  EXPECT_EQ(events[1].interned_data().packet_context(0).ctx().uid(), 456);
+
+  // Not enough room in intern table (limit 2), inline the context.
+  EXPECT_EQ(events[2].network_packet_bundle().ctx().uid(), 789);
+  EXPECT_EQ(events[2].interned_data().packet_context().size(), 0);
+
+  // Second time seen, no need to re-emit interned data, only record iid.
+  EXPECT_EQ(events[3].network_packet_bundle().iid(), 1);
+  EXPECT_EQ(events[3].interned_data().packet_context().size(), 0);
+}
+
 }  // namespace bpf
 }  // namespace android
diff --git a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
index e9b194b..80871c6 100644
--- a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
+++ b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
@@ -30,10 +30,39 @@
 namespace android {
 namespace bpf {
 
+// BundleKeys are PacketTraces where timestamp and length are ignored.
+using BundleKey = PacketTrace;
+
+// BundleKeys are hashed using all fields except timestamp/length.
+struct BundleHash {
+  std::size_t operator()(const BundleKey& a) const;
+};
+
+// BundleKeys are equal if all fields except timestamp/length are equal.
+struct BundleEq {
+  bool operator()(const BundleKey& a, const BundleKey& b) const;
+};
+
+// Track the bundles we've interned and their corresponding intern id (iid). We
+// use IncrementalState (rather than state in the Handler) so that we stay in
+// sync with Perfetto's periodic state clearing (which helps recover from packet
+// loss). When state is cleared, the state object is replaced with a new default
+// constructed instance.
+struct NetworkTraceState {
+  bool cleared;
+  std::unordered_map<BundleKey, uint64_t, BundleHash, BundleEq> iids;
+};
+
+// Inject our custom incremental state type using type traits.
+struct NetworkTraceTraits : public perfetto::DefaultDataSourceTraits {
+  using IncrementalStateType = NetworkTraceState;
+};
+
 // NetworkTraceHandler implements the android.network_packets data source. This
 // class is registered with Perfetto and is instantiated when tracing starts and
 // destroyed when tracing ends. There is one instance per trace session.
-class NetworkTraceHandler : public perfetto::DataSource<NetworkTraceHandler> {
+class NetworkTraceHandler
+    : public perfetto::DataSource<NetworkTraceHandler, NetworkTraceTraits> {
  public:
   // Registers this DataSource.
   static void RegisterDataSource();
@@ -56,6 +85,11 @@
   void Fill(const PacketTrace& src,
             ::perfetto::protos::pbzero::NetworkPacketEvent* event);
 
+  // Fills in contextual information either inline or via interning.
+  ::perfetto::protos::pbzero::NetworkPacketBundle* FillWithInterning(
+      NetworkTraceState* state, const BundleKey& key,
+      ::perfetto::protos::pbzero::TracePacket* dst);
+
   static internal::NetworkTracePoller sPoller;
   bool mStarted;