Parse new config options and to batch in callback
This converts the poller's callback to batch-style pass all of the
events each time it polls rather than one-by-one. This requires a bit
more memory (up to 32kb) but will allow optimizations in the following
changes that should reduce the trace size and cpu by ~10x.
Bug: 246985031
Test: atest libnetworkstats_test
Change-Id: Ia3223ba8b27b825e2d63d6b3b8ac09b8eb17b3f8
diff --git a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
index 8e70950..07f8054 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTraceHandler.cpp
@@ -57,11 +57,14 @@
}
// static
-NetworkTracePoller NetworkTraceHandler::sPoller([](const PacketTrace& pkt) {
- NetworkTraceHandler::Trace([pkt](NetworkTraceHandler::TraceContext ctx) {
- NetworkTraceHandler::Fill(pkt, *ctx.NewTracePacket());
- });
-});
+NetworkTracePoller NetworkTraceHandler::sPoller(
+ [](const std::vector<PacketTrace>& packets) {
+ NetworkTraceHandler::Trace([&](NetworkTraceHandler::TraceContext ctx) {
+ for (const PacketTrace& pkt : packets) {
+ NetworkTraceHandler::Fill(pkt, *ctx.NewTracePacket());
+ }
+ });
+ });
void NetworkTraceHandler::OnSetup(const SetupArgs& args) {
const std::string& raw = args.config->network_packet_trace_config_raw();
@@ -72,6 +75,12 @@
ALOGI("poll_ms is missing or below the 100ms minimum. Increasing to 100ms");
mPollMs = 100;
}
+
+ mInternLimit = config.intern_limit();
+ mAggregationThreshold = config.aggregation_threshold();
+ mDropLocalPort = config.drop_local_port();
+ mDropRemotePort = config.drop_remote_port();
+ mDropTcpFlags = config.drop_tcp_flags();
}
void NetworkTraceHandler::OnStart(const StartArgs&) {
diff --git a/service-t/native/libs/libnetworkstats/NetworkTracePoller.cpp b/service-t/native/libs/libnetworkstats/NetworkTracePoller.cpp
index 34dbf9e..3abb49a 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTracePoller.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTracePoller.cpp
@@ -116,12 +116,16 @@
return false;
}
- base::Result<int> ret = mRingBuffer->ConsumeAll(mCallback);
+ std::vector<PacketTrace> packets;
+ base::Result<int> ret = mRingBuffer->ConsumeAll(
+ [&](const PacketTrace& pkt) { packets.push_back(pkt); });
if (!ret.ok()) {
ALOGW("Failed to poll ringbuf: %s", ret.error().message().c_str());
return false;
}
+ mCallback(packets);
+
return true;
}
diff --git a/service-t/native/libs/libnetworkstats/NetworkTracePollerTest.cpp b/service-t/native/libs/libnetworkstats/NetworkTracePollerTest.cpp
index 28ec208..725cec1 100644
--- a/service-t/native/libs/libnetworkstats/NetworkTracePollerTest.cpp
+++ b/service-t/native/libs/libnetworkstats/NetworkTracePollerTest.cpp
@@ -100,7 +100,7 @@
};
TEST_F(NetworkTracePollerTest, PollWhileInactive) {
- NetworkTracePoller handler([&](const PacketTrace& pkt) {});
+ NetworkTracePoller handler([&](const std::vector<PacketTrace>& pkt) {});
// One succeed after start and before stop.
EXPECT_FALSE(handler.ConsumeAll());
@@ -113,7 +113,7 @@
TEST_F(NetworkTracePollerTest, ConcurrentSessions) {
// Simulate two concurrent sessions (two starts followed by two stops). Check
// that tracing is stopped only after both sessions finish.
- NetworkTracePoller handler([&](const PacketTrace& pkt) {});
+ NetworkTracePoller handler([&](const std::vector<PacketTrace>& pkt) {});
ASSERT_TRUE(handler.Start(kNeverPoll));
EXPECT_TRUE(handler.ConsumeAll());
@@ -135,10 +135,12 @@
// Record all packets with the bound address and current uid. This callback is
// involked only within ConsumeAll, at which point the port should have
// already been filled in and all packets have been processed.
- NetworkTracePoller handler([&](const PacketTrace& pkt) {
- if (pkt.sport != server_port && pkt.dport != server_port) return;
- if (pkt.uid != getuid()) return;
- packets.push_back(pkt);
+ NetworkTracePoller handler([&](const std::vector<PacketTrace>& pkts) {
+ for (const PacketTrace& pkt : pkts) {
+ if (pkt.sport != server_port && pkt.dport != server_port) return;
+ if (pkt.uid != getuid()) return;
+ packets.push_back(pkt);
+ }
});
ASSERT_TRUE(handler.Start(kNeverPoll));
diff --git a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
index 1266237..bdc18fb 100644
--- a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
+++ b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTraceHandler.h
@@ -52,8 +52,15 @@
::perfetto::protos::pbzero::TracePacket& dst);
static internal::NetworkTracePoller sPoller;
- uint32_t mPollMs;
bool mStarted;
+
+ // Values from config, see proto for details.
+ uint32_t mPollMs;
+ uint32_t mInternLimit;
+ uint32_t mAggregationThreshold;
+ bool mDropLocalPort;
+ bool mDropRemotePort;
+ bool mDropTcpFlags;
};
} // namespace bpf
diff --git a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTracePoller.h b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTracePoller.h
index b0189a7..adde51e 100644
--- a/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTracePoller.h
+++ b/service-t/native/libs/libnetworkstats/include/netdbpf/NetworkTracePoller.h
@@ -38,9 +38,10 @@
// it is not meant to be used elsewhere.
class NetworkTracePoller {
public:
+ using EventSink = std::function<void(const std::vector<PacketTrace>&)>;
+
// Testonly: initialize with a callback capable of intercepting data.
- NetworkTracePoller(std::function<void(const PacketTrace&)> callback)
- : mCallback(std::move(callback)) {}
+ NetworkTracePoller(EventSink callback) : mCallback(std::move(callback)) {}
// Starts tracing with the given poll interval.
bool Start(uint32_t pollMs) EXCLUDES(mMutex);
@@ -67,7 +68,7 @@
uint32_t mPollMs GUARDED_BY(mMutex);
// The function to process PacketTrace, typically a Perfetto sink.
- std::function<void(const PacketTrace&)> mCallback GUARDED_BY(mMutex);
+ EventSink mCallback GUARDED_BY(mMutex);
// The BPF ring buffer handle.
std::unique_ptr<BpfRingbuf<PacketTrace>> mRingBuffer GUARDED_BY(mMutex);