Merge changes I2ae32895,I9e3c6f01,Ifc0e00f3,Ia9cb163f
* changes:
Add support for dropped fields in NetworkTracing
Add support for NetworkTrace aggregation threshold
Add fallback in Network Tracing to old format
Add event bundling in NetworkTraceHandler
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
index cd62bc5..cd6480c 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
@@ -39,6 +39,62 @@
using ::perfetto::protos::pbzero::TracePacket;
using ::perfetto::protos::pbzero::TrafficDirection;
+// Bundling takes groups of packets with similar contextual fields (generally,
+// all fields except timestamp and length) and summarises them in a single trace
+// packet. For example, rather than
+//
+// {.timestampNs = 1, .uid = 1000, .tag = 123, .len = 72}
+// {.timestampNs = 2, .uid = 1000, .tag = 123, .len = 100}
+// {.timestampNs = 5, .uid = 1000, .tag = 123, .len = 456}
+//
+// The output will be something like
+// {
+// .timestamp = 1
+// .ctx = {.uid = 1000, .tag = 123}
+// .timestamp = [0, 1, 4], // delta encoded
+// .length = [72, 100, 456], // should be zipped with timestamps
+// }
+//
+// Most workloads have many packets from few contexts. Bundling greatly reduces
+// the amount of redundant information written, thus reducing the overall trace
+// size. Interning ids are similarly based on unique bundle contexts.
+
+// Keys are PacketTraces where timestamp and length are ignored.
+using BundleKey = PacketTrace;
+
+// Based on boost::hash_combine
+template <typename T, typename... Rest>
+void HashCombine(std::size_t& seed, const T& val, const Rest&... rest) {
+ seed ^= std::hash<T>()(val) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+ (HashCombine(seed, rest), ...);
+}
+
+// Details summarises the timestamp and lengths of packets in a bundle.
+struct BundleDetails {
+ std::vector<std::pair<uint64_t, uint32_t>> time_and_len;
+ uint64_t minTs = std::numeric_limits<uint64_t>::max();
+ uint64_t maxTs = std::numeric_limits<uint64_t>::min();
+ uint32_t bytes = 0;
+};
+
+#define AGG_FIELDS(x) \
+ (x).ifindex, (x).uid, (x).tag, (x).sport, (x).dport, (x).egress, \
+ (x).ipProto, (x).tcpFlags
+
+struct BundleHash {
+ std::size_t operator()(const BundleKey& a) const {
+ std::size_t seed = 0;
+ HashCombine(seed, AGG_FIELDS(a));
+ return seed;
+ }
+};
+
+struct BundleEq {
+ bool operator()(const BundleKey& a, const BundleKey& b) const {
+ return std::tie(AGG_FIELDS(a)) == std::tie(AGG_FIELDS(b));
+ }
+};
+
// static
void NetworkTraceHandler::RegisterDataSource() {
ALOGD("Registering Perfetto data source");
@@ -96,26 +152,82 @@
void NetworkTraceHandler::Write(const std::vector<PacketTrace>& packets,
NetworkTraceHandler::TraceContext& ctx) {
+ // TODO: remove this fallback once Perfetto stable has support for bundles.
+ if (!mInternLimit && !mAggregationThreshold) {
+ for (const PacketTrace& pkt : packets) {
+ auto dst = ctx.NewTracePacket();
+ dst->set_timestamp(pkt.timestampNs);
+ auto* event = dst->set_network_packet();
+ event->set_length(pkt.length);
+ Fill(pkt, event);
+ }
+ return;
+ }
+ std::unordered_map<BundleKey, BundleDetails, BundleHash, BundleEq> bundles;
for (const PacketTrace& pkt : packets) {
- Fill(pkt, *ctx.NewTracePacket());
+ BundleKey key = pkt;
+
+ // Dropping fields should remove them from the output and remove them from
+ // the aggregation key. In order to do the latter without changing the hash
+ // function, set the dropped fields to zero.
+ if (mDropTcpFlags) key.tcpFlags = 0;
+ if (mDropLocalPort) (key.egress ? key.sport : key.dport) = 0;
+ if (mDropRemotePort) (key.egress ? key.dport : key.sport) = 0;
+
+ BundleDetails& bundle = bundles[key];
+ bundle.time_and_len.emplace_back(pkt.timestampNs, pkt.length);
+ bundle.minTs = std::min(bundle.minTs, pkt.timestampNs);
+ bundle.maxTs = std::max(bundle.maxTs, pkt.timestampNs);
+ bundle.bytes += pkt.length;
+ }
+
+ for (const auto& kv : bundles) {
+ const BundleKey& key = kv.first;
+ const BundleDetails& details = kv.second;
+
+ auto dst = ctx.NewTracePacket();
+ dst->set_timestamp(details.minTs);
+
+ auto* event = dst->set_network_packet_bundle();
+ Fill(key, event->set_ctx());
+
+ int count = details.time_and_len.size();
+ if (!mAggregationThreshold || count < mAggregationThreshold) {
+ protozero::PackedVarInt offsets;
+ protozero::PackedVarInt lengths;
+ for (const auto& kv : details.time_and_len) {
+ offsets.Append(kv.first - details.minTs);
+ lengths.Append(kv.second);
+ }
+
+ event->set_packet_timestamps(offsets);
+ event->set_packet_lengths(lengths);
+ } else {
+ event->set_total_duration(details.maxTs - details.minTs);
+ event->set_total_length(details.bytes);
+ event->set_total_packets(count);
+ }
}
}
-// static class method
-void NetworkTraceHandler::Fill(const PacketTrace& src, TracePacket& dst) {
- dst.set_timestamp(src.timestampNs);
- auto* event = dst.set_network_packet();
+void NetworkTraceHandler::Fill(const PacketTrace& src,
+ NetworkPacketEvent* event) {
event->set_direction(src.egress ? TrafficDirection::DIR_EGRESS
: TrafficDirection::DIR_INGRESS);
- event->set_length(src.length);
event->set_uid(src.uid);
event->set_tag(src.tag);
- event->set_local_port(src.egress ? ntohs(src.sport) : ntohs(src.dport));
- event->set_remote_port(src.egress ? ntohs(src.dport) : ntohs(src.sport));
+ if (!mDropLocalPort) {
+ event->set_local_port(ntohs(src.egress ? src.sport : src.dport));
+ }
+ if (!mDropRemotePort) {
+ event->set_remote_port(ntohs(src.egress ? src.dport : src.sport));
+ }
+ if (!mDropTcpFlags) {
+ event->set_tcp_flags(src.tcpFlags);
+ }
event->set_ip_proto(src.ipProto);
- event->set_tcp_flags(src.tcpFlags);
char ifname[IF_NAMESIZE] = {};
if (if_indextoname(src.ifindex, ifname) == ifname) {
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
index 2318da5..d2c1d38 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandlerTest.cpp
@@ -20,6 +20,7 @@
#include <vector>
#include "netdbpf/NetworkTraceHandler.h"
+#include "protos/perfetto/config/android/network_trace_config.gen.h"
#include "protos/perfetto/trace/android/network_trace.pb.h"
#include "protos/perfetto/trace/trace.pb.h"
#include "protos/perfetto/trace/trace_packet.pb.h"
@@ -27,6 +28,7 @@
namespace android {
namespace bpf {
using ::perfetto::protos::NetworkPacketEvent;
+using ::perfetto::protos::NetworkPacketTraceConfig;
using ::perfetto::protos::Trace;
using ::perfetto::protos::TracePacket;
using ::perfetto::protos::TrafficDirection;
@@ -42,7 +44,8 @@
class NetworkTraceHandlerTest : public testing::Test {
protected:
// Starts a tracing session with the handler under test.
- std::unique_ptr<perfetto::TracingSession> StartTracing() {
+ std::unique_ptr<perfetto::TracingSession> StartTracing(
+ NetworkPacketTraceConfig settings) {
perfetto::TracingInitArgs args;
args.backends = perfetto::kInProcessBackend;
perfetto::Tracing::Initialize(args);
@@ -53,7 +56,9 @@
perfetto::TraceConfig cfg;
cfg.add_buffers()->set_size_kb(1024);
- cfg.add_data_sources()->mutable_config()->set_name("test.network_packets");
+ auto* config = cfg.add_data_sources()->mutable_config();
+ config->set_name("test.network_packets");
+ config->set_network_packet_trace_config_raw(settings.SerializeAsString());
auto session = perfetto::Tracing::NewTrace(perfetto::kInProcessBackend);
session->Setup(cfg);
@@ -76,7 +81,7 @@
// This is a real trace and includes irrelevant trace packets such as trace
// metadata. The following strips the results to just the packets we want.
for (const auto& pkt : trace.packet()) {
- if (pkt.has_network_packet()) {
+ if (pkt.has_network_packet() || pkt.has_network_packet_bundle()) {
output->emplace_back(pkt);
}
}
@@ -86,8 +91,9 @@
// This runs a trace with a single call to Write.
bool TraceAndSortPackets(const std::vector<PacketTrace>& input,
- std::vector<TracePacket>* output) {
- auto session = StartTracing();
+ std::vector<TracePacket>* output,
+ NetworkPacketTraceConfig config = {}) {
+ auto session = StartTracing(config);
HandlerForTest::Trace([&](HandlerForTest::TraceContext ctx) {
ctx.GetDataSourceLocked()->Write(input, ctx);
ctx.Flush();
@@ -125,11 +131,11 @@
ASSERT_EQ(events.size(), 1);
EXPECT_THAT(events[0].timestamp(), 1000);
- EXPECT_THAT(events[0].network_packet().length(), 100);
EXPECT_THAT(events[0].network_packet().uid(), 10);
EXPECT_THAT(events[0].network_packet().tag(), 123);
EXPECT_THAT(events[0].network_packet().ip_proto(), 6);
EXPECT_THAT(events[0].network_packet().tcp_flags(), 1);
+ EXPECT_THAT(events[0].network_packet().length(), 100);
}
TEST_F(NetworkTraceHandlerTest, WriteDirectionAndPorts) {
@@ -162,5 +168,175 @@
TrafficDirection::DIR_INGRESS);
}
+TEST_F(NetworkTraceHandlerTest, BasicBundling) {
+ // TODO: remove this once bundling becomes default. Until then, set arbitrary
+ // aggregation threshold to enable bundling.
+ NetworkPacketTraceConfig config;
+ config.set_aggregation_threshold(10);
+
+ std::vector<PacketTrace> input = {
+ PacketTrace{.uid = 123, .timestampNs = 2, .length = 200},
+ PacketTrace{.uid = 123, .timestampNs = 1, .length = 100},
+ PacketTrace{.uid = 123, .timestampNs = 4, .length = 300},
+
+ PacketTrace{.uid = 456, .timestampNs = 2, .length = 400},
+ PacketTrace{.uid = 456, .timestampNs = 4, .length = 100},
+ };
+
+ std::vector<TracePacket> events;
+ ASSERT_TRUE(TraceAndSortPackets(input, &events, config));
+
+ ASSERT_EQ(events.size(), 2);
+
+ EXPECT_THAT(events[0].timestamp(), 1);
+ EXPECT_THAT(events[0].network_packet_bundle().ctx().uid(), 123);
+ EXPECT_THAT(events[0].network_packet_bundle().packet_lengths(),
+ testing::ElementsAre(200, 100, 300));
+ EXPECT_THAT(events[0].network_packet_bundle().packet_timestamps(),
+ testing::ElementsAre(1, 0, 3));
+
+ EXPECT_THAT(events[1].timestamp(), 2);
+ EXPECT_THAT(events[1].network_packet_bundle().ctx().uid(), 456);
+ EXPECT_THAT(events[1].network_packet_bundle().packet_lengths(),
+ testing::ElementsAre(400, 100));
+ EXPECT_THAT(events[1].network_packet_bundle().packet_timestamps(),
+ testing::ElementsAre(0, 2));
+}
+
+TEST_F(NetworkTraceHandlerTest, AggregationThreshold) {
+ // With an aggregation threshold of 3, the set of packets with uid=123 will
+ // be aggregated (3>=3) whereas packets with uid=456 get per-packet info.
+ NetworkPacketTraceConfig config;
+ config.set_aggregation_threshold(3);
+
+ std::vector<PacketTrace> input = {
+ PacketTrace{.uid = 123, .timestampNs = 2, .length = 200},
+ PacketTrace{.uid = 123, .timestampNs = 1, .length = 100},
+ PacketTrace{.uid = 123, .timestampNs = 4, .length = 300},
+
+ PacketTrace{.uid = 456, .timestampNs = 2, .length = 400},
+ PacketTrace{.uid = 456, .timestampNs = 4, .length = 100},
+ };
+
+ std::vector<TracePacket> events;
+ ASSERT_TRUE(TraceAndSortPackets(input, &events, config));
+
+ ASSERT_EQ(events.size(), 2);
+
+ EXPECT_EQ(events[0].timestamp(), 1);
+ EXPECT_EQ(events[0].network_packet_bundle().ctx().uid(), 123);
+ EXPECT_EQ(events[0].network_packet_bundle().total_duration(), 3);
+ EXPECT_EQ(events[0].network_packet_bundle().total_packets(), 3);
+ EXPECT_EQ(events[0].network_packet_bundle().total_length(), 600);
+
+ EXPECT_EQ(events[1].timestamp(), 2);
+ EXPECT_EQ(events[1].network_packet_bundle().ctx().uid(), 456);
+ EXPECT_THAT(events[1].network_packet_bundle().packet_lengths(),
+ testing::ElementsAre(400, 100));
+ EXPECT_THAT(events[1].network_packet_bundle().packet_timestamps(),
+ testing::ElementsAre(0, 2));
+}
+
+TEST_F(NetworkTraceHandlerTest, DropLocalPort) {
+ NetworkPacketTraceConfig config;
+ config.set_drop_local_port(true);
+ config.set_aggregation_threshold(10);
+
+ __be16 a = htons(10000);
+ __be16 b = htons(10001);
+ std::vector<PacketTrace> input = {
+ // Recall that local is `src` for egress and `dst` for ingress.
+ PacketTrace{.timestampNs = 1, .length = 2, .egress = true, .sport = a},
+ PacketTrace{.timestampNs = 2, .length = 4, .egress = false, .dport = a},
+ PacketTrace{.timestampNs = 3, .length = 6, .egress = true, .sport = b},
+ PacketTrace{.timestampNs = 4, .length = 8, .egress = false, .dport = b},
+ };
+
+ std::vector<TracePacket> events;
+ ASSERT_TRUE(TraceAndSortPackets(input, &events, config));
+ ASSERT_EQ(events.size(), 2);
+
+ // Despite having different local ports, drop and bundle by remaining fields.
+ EXPECT_EQ(events[0].network_packet_bundle().ctx().direction(),
+ TrafficDirection::DIR_EGRESS);
+ EXPECT_THAT(events[0].network_packet_bundle().packet_lengths(),
+ testing::ElementsAre(2, 6));
+
+ EXPECT_EQ(events[1].network_packet_bundle().ctx().direction(),
+ TrafficDirection::DIR_INGRESS);
+ EXPECT_THAT(events[1].network_packet_bundle().packet_lengths(),
+ testing::ElementsAre(4, 8));
+
+ // Local port shouldn't be in output.
+ EXPECT_FALSE(events[0].network_packet_bundle().ctx().has_local_port());
+ EXPECT_FALSE(events[1].network_packet_bundle().ctx().has_local_port());
+}
+
+TEST_F(NetworkTraceHandlerTest, DropRemotePort) {
+ NetworkPacketTraceConfig config;
+ config.set_drop_remote_port(true);
+ config.set_aggregation_threshold(10);
+
+ __be16 a = htons(443);
+ __be16 b = htons(80);
+ std::vector<PacketTrace> input = {
+ // Recall that remote is `dst` for egress and `src` for ingress.
+ PacketTrace{.timestampNs = 1, .length = 2, .egress = true, .dport = a},
+ PacketTrace{.timestampNs = 2, .length = 4, .egress = false, .sport = a},
+ PacketTrace{.timestampNs = 3, .length = 6, .egress = true, .dport = b},
+ PacketTrace{.timestampNs = 4, .length = 8, .egress = false, .sport = b},
+ };
+
+ std::vector<TracePacket> events;
+ ASSERT_TRUE(TraceAndSortPackets(input, &events, config));
+ ASSERT_EQ(events.size(), 2);
+
+ // Despite having different remote ports, drop and bundle by remaining fields.
+ EXPECT_EQ(events[0].network_packet_bundle().ctx().direction(),
+ TrafficDirection::DIR_EGRESS);
+ EXPECT_THAT(events[0].network_packet_bundle().packet_lengths(),
+ testing::ElementsAre(2, 6));
+
+ EXPECT_EQ(events[1].network_packet_bundle().ctx().direction(),
+ TrafficDirection::DIR_INGRESS);
+ EXPECT_THAT(events[1].network_packet_bundle().packet_lengths(),
+ testing::ElementsAre(4, 8));
+
+ // Remote port shouldn't be in output.
+ EXPECT_FALSE(events[0].network_packet_bundle().ctx().has_remote_port());
+ EXPECT_FALSE(events[1].network_packet_bundle().ctx().has_remote_port());
+}
+
+TEST_F(NetworkTraceHandlerTest, DropTcpFlags) {
+ NetworkPacketTraceConfig config;
+ config.set_drop_tcp_flags(true);
+ config.set_aggregation_threshold(10);
+
+ std::vector<PacketTrace> input = {
+ PacketTrace{.timestampNs = 1, .uid = 123, .length = 1, .tcpFlags = 1},
+ PacketTrace{.timestampNs = 2, .uid = 123, .length = 2, .tcpFlags = 2},
+ PacketTrace{.timestampNs = 3, .uid = 456, .length = 3, .tcpFlags = 1},
+ PacketTrace{.timestampNs = 4, .uid = 456, .length = 4, .tcpFlags = 2},
+ };
+
+ std::vector<TracePacket> events;
+ ASSERT_TRUE(TraceAndSortPackets(input, &events, config));
+
+ ASSERT_EQ(events.size(), 2);
+
+ // Despite having different tcp flags, drop and bundle by remaining fields.
+ EXPECT_EQ(events[0].network_packet_bundle().ctx().uid(), 123);
+ EXPECT_THAT(events[0].network_packet_bundle().packet_lengths(),
+ testing::ElementsAre(1, 2));
+
+ EXPECT_EQ(events[1].network_packet_bundle().ctx().uid(), 456);
+ EXPECT_THAT(events[1].network_packet_bundle().packet_lengths(),
+ testing::ElementsAre(3, 4));
+
+ // Tcp flags shouldn't be in output.
+ EXPECT_FALSE(events[0].network_packet_bundle().ctx().has_tcp_flags());
+ EXPECT_FALSE(events[1].network_packet_bundle().ctx().has_tcp_flags());
+}
+
} // namespace bpf
} // namespace android
diff --git a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
index 9520d7b..e9b194b 100644
--- a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
+++ b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
@@ -53,8 +53,8 @@
private:
// Convert a PacketTrace into a Perfetto trace packet.
- static void Fill(const PacketTrace& src,
- ::perfetto::protos::pbzero::TracePacket& dst);
+ void Fill(const PacketTrace& src,
+ ::perfetto::protos::pbzero::NetworkPacketEvent* event);
static internal::NetworkTracePoller sPoller;
bool mStarted;