Add interning support to NetworkTracing
Interning is a feature Perfetto offers. You can store details in the
intern table and associate it with an id. Then, each trace packet can
reference just the id rather than the full proto contents. In our case,
we already identify unique contexts, so all we need to do is give them
unique IDs and record that instead.
Bug: 246985031
Test: atest libnetworkstats_test
Change-Id: I84f7673bc41b89390c02b8ec5460adfadbb36173
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
index cd6480c..696a29a 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
@@ -34,6 +34,7 @@
namespace android {
namespace bpf {
using ::android::bpf::internal::NetworkTracePoller;
+using ::perfetto::protos::pbzero::NetworkPacketBundle;
using ::perfetto::protos::pbzero::NetworkPacketEvent;
using ::perfetto::protos::pbzero::NetworkPacketTraceConfig;
using ::perfetto::protos::pbzero::TracePacket;
@@ -59,9 +60,6 @@
// the amount of redundant information written, thus reducing the overall trace
// size. Interning ids are similarly based on unique bundle contexts.
-// Keys are PacketTraces where timestamp and length are ignored.
-using BundleKey = PacketTrace;
-
// Based on boost::hash_combine
template <typename T, typename... Rest>
void HashCombine(std::size_t& seed, const T& val, const Rest&... rest) {
@@ -81,19 +79,15 @@
(x).ifindex, (x).uid, (x).tag, (x).sport, (x).dport, (x).egress, \
(x).ipProto, (x).tcpFlags
-struct BundleHash {
- std::size_t operator()(const BundleKey& a) const {
- std::size_t seed = 0;
- HashCombine(seed, AGG_FIELDS(a));
- return seed;
- }
-};
+std::size_t BundleHash::operator()(const BundleKey& a) const {
+ std::size_t seed = 0;
+ HashCombine(seed, AGG_FIELDS(a));
+ return seed;
+}
-struct BundleEq {
- bool operator()(const BundleKey& a, const BundleKey& b) const {
- return std::tie(AGG_FIELDS(a)) == std::tie(AGG_FIELDS(b));
- }
-};
+bool BundleEq::operator()(const BundleKey& a, const BundleKey& b) const {
+ return std::tie(AGG_FIELDS(a)) == std::tie(AGG_FIELDS(b));
+}
// static
void NetworkTraceHandler::RegisterDataSource() {
@@ -163,6 +157,8 @@
}
return;
}
+
+ uint64_t minTs = std::numeric_limits<uint64_t>::max();
std::unordered_map<BundleKey, BundleDetails, BundleHash, BundleEq> bundles;
for (const PacketTrace& pkt : packets) {
BundleKey key = pkt;
@@ -174,6 +170,8 @@
if (mDropLocalPort) (key.egress ? key.sport : key.dport) = 0;
if (mDropRemotePort) (key.egress ? key.dport : key.sport) = 0;
+ minTs = std::min(minTs, pkt.timestampNs);
+
BundleDetails& bundle = bundles[key];
bundle.time_and_len.emplace_back(pkt.timestampNs, pkt.length);
bundle.minTs = std::min(bundle.minTs, pkt.timestampNs);
@@ -181,15 +179,25 @@
bundle.bytes += pkt.length;
}
+ // If state was cleared, emit a separate packet to indicate it. This uses the
+ // overall minTs so it is sorted before any packets that follow.
+ NetworkTraceState* incr_state = ctx.GetIncrementalState();
+ if (!bundles.empty() && mInternLimit && incr_state->cleared) {
+ auto clear = ctx.NewTracePacket();
+ clear->set_sequence_flags(TracePacket::SEQ_INCREMENTAL_STATE_CLEARED);
+ clear->set_timestamp(minTs);
+ incr_state->cleared = false;
+ }
+
for (const auto& kv : bundles) {
const BundleKey& key = kv.first;
const BundleDetails& details = kv.second;
auto dst = ctx.NewTracePacket();
+ dst->set_sequence_flags(TracePacket::SEQ_NEEDS_INCREMENTAL_STATE);
dst->set_timestamp(details.minTs);
- auto* event = dst->set_network_packet_bundle();
- Fill(key, event->set_ctx());
+ auto* event = FillWithInterning(incr_state, key, dst.get());
int count = details.time_and_len.size();
if (!mAggregationThreshold || count < mAggregationThreshold) {
@@ -237,5 +245,39 @@
}
}
+NetworkPacketBundle* NetworkTraceHandler::FillWithInterning(
+ NetworkTraceState* state, const BundleKey& key, TracePacket* dst) {
+ uint64_t iid = 0;
+ bool found = false;
+
+ if (state->iids.size() < mInternLimit) {
+ auto [iter, success] = state->iids.try_emplace(key, state->iids.size() + 1);
+ iid = iter->second;
+ found = true;
+
+ if (success) {
+ // If we successfully empaced, record the newly interned data.
+ auto* packet_context = dst->set_interned_data()->add_packet_context();
+ Fill(key, packet_context->set_ctx());
+ packet_context->set_iid(iid);
+ }
+ } else {
+ auto iter = state->iids.find(key);
+ if (iter != state->iids.end()) {
+ iid = iter->second;
+ found = true;
+ }
+ }
+
+ auto* event = dst->set_network_packet_bundle();
+ if (found) {
+ event->set_iid(iid);
+ } else {
+ Fill(key, event->set_ctx());
+ }
+
+ return event;
+}
+
} // namespace bpf
} // namespace android
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
index d2c1d38..c9eb183 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
@@ -338,5 +338,57 @@
EXPECT_FALSE(events[1].network_packet_bundle().ctx().has_tcp_flags());
}
+TEST_F(NetworkTraceHandlerTest, Interning) {
+ NetworkPacketTraceConfig config;
+ config.set_intern_limit(2);
+
+ // The test writes 4 packets coming from three sources (uids). With an intern
+ // limit of 2, the first two sources should be interned. This test splits this
+ // into individual writes since internally an unordered map is used and would
+ // otherwise non-deterministically choose what to intern (this is fine for
+ // real use, but not good for test assertions).
+ std::vector<std::vector<PacketTrace>> inputs = {
+ {PacketTrace{.timestampNs = 1, .uid = 123}},
+ {PacketTrace{.timestampNs = 2, .uid = 456}},
+ {PacketTrace{.timestampNs = 3, .uid = 789}},
+ {PacketTrace{.timestampNs = 4, .uid = 123}},
+ };
+
+ auto session = StartTracing(config);
+
+ HandlerForTest::Trace([&](HandlerForTest::TraceContext ctx) {
+ ctx.GetDataSourceLocked()->Write(inputs[0], ctx);
+ ctx.GetDataSourceLocked()->Write(inputs[1], ctx);
+ ctx.GetDataSourceLocked()->Write(inputs[2], ctx);
+ ctx.GetDataSourceLocked()->Write(inputs[3], ctx);
+ ctx.Flush();
+ });
+
+ std::vector<TracePacket> events;
+ ASSERT_TRUE(StopTracing(session.get(), &events));
+
+ ASSERT_EQ(events.size(), 4);
+
+ // First time seen, emit new interned data, bundle uses iid instead of ctx.
+ EXPECT_EQ(events[0].network_packet_bundle().iid(), 1);
+ ASSERT_EQ(events[0].interned_data().packet_context().size(), 1);
+ EXPECT_EQ(events[0].interned_data().packet_context(0).iid(), 1);
+ EXPECT_EQ(events[0].interned_data().packet_context(0).ctx().uid(), 123);
+
+ // First time seen, emit new interned data, bundle uses iid instead of ctx.
+ EXPECT_EQ(events[1].network_packet_bundle().iid(), 2);
+ ASSERT_EQ(events[1].interned_data().packet_context().size(), 1);
+ EXPECT_EQ(events[1].interned_data().packet_context(0).iid(), 2);
+ EXPECT_EQ(events[1].interned_data().packet_context(0).ctx().uid(), 456);
+
+ // Not enough room in intern table (limit 2), inline the context.
+ EXPECT_EQ(events[2].network_packet_bundle().ctx().uid(), 789);
+ EXPECT_EQ(events[2].interned_data().packet_context().size(), 0);
+
+ // Second time seen, no need to re-emit interned data, only record iid.
+ EXPECT_EQ(events[3].network_packet_bundle().iid(), 1);
+ EXPECT_EQ(events[3].interned_data().packet_context().size(), 0);
+}
+
} // namespace bpf
} // namespace android
diff --git a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
index e9b194b..80871c6 100644
--- a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
+++ b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
@@ -30,10 +30,39 @@
namespace android {
namespace bpf {
+// BundleKeys are PacketTraces where timestamp and length are ignored.
+using BundleKey = PacketTrace;
+
+// BundleKeys are hashed using all fields except timestamp/length.
+struct BundleHash {
+ std::size_t operator()(const BundleKey& a) const;
+};
+
+// BundleKeys are equal if all fields except timestamp/length are equal.
+struct BundleEq {
+ bool operator()(const BundleKey& a, const BundleKey& b) const;
+};
+
+// Track the bundles we've interned and their corresponding intern id (iid). We
+// use IncrementalState (rather than state in the Handler) so that we stay in
+// sync with Perfetto's periodic state clearing (which helps recover from packet
+// loss). When state is cleared, the state object is replaced with a new default
+// constructed instance.
+struct NetworkTraceState {
+ bool cleared;
+ std::unordered_map<BundleKey, uint64_t, BundleHash, BundleEq> iids;
+};
+
+// Inject our custom incremental state type using type traits.
+struct NetworkTraceTraits : public perfetto::DefaultDataSourceTraits {
+ using IncrementalStateType = NetworkTraceState;
+};
+
// NetworkTraceHandler implements the android.network_packets data source. This
// class is registered with Perfetto and is instantiated when tracing starts and
// destroyed when tracing ends. There is one instance per trace session.
-class NetworkTraceHandler : public perfetto::DataSource<NetworkTraceHandler> {
+class NetworkTraceHandler
+ : public perfetto::DataSource<NetworkTraceHandler, NetworkTraceTraits> {
public:
// Registers this DataSource.
static void RegisterDataSource();
@@ -56,6 +85,11 @@
void Fill(const PacketTrace& src,
::perfetto::protos::pbzero::NetworkPacketEvent* event);
+ // Fills in contextual information either inline or via interning.
+ ::perfetto::protos::pbzero::NetworkPacketBundle* FillWithInterning(
+ NetworkTraceState* state, const BundleKey& key,
+ ::perfetto::protos::pbzero::TracePacket* dst);
+
static internal::NetworkTracePoller sPoller;
bool mStarted;