Use zetasketch for input statistics

Switch LatencyAggregator to use the actual zetasketch library. Report
the data to westworld via 2 atoms. 1 atom is for slow events, and 1 is
for latency sketches.

Estimating < 25 KB per atom after a few minutes of non-stop interaction with the screen.
Added a hard limit at 20 KB.

Dumped the data to a file on the device, and then read that data using
the full zetasketch implementation. Confirmed that the data looks OK.

Bug: 167947340
Test: adb shell dumpsys input and review the size of the resulting
sketches

Change-Id: I69b0f3753c86226886186d5f6edd779d2f9404e5
diff --git a/services/inputflinger/dispatcher/LatencyAggregator.cpp b/services/inputflinger/dispatcher/LatencyAggregator.cpp
index d79e6ae..a5bfc25 100644
--- a/services/inputflinger/dispatcher/LatencyAggregator.cpp
+++ b/services/inputflinger/dispatcher/LatencyAggregator.cpp
@@ -25,8 +25,23 @@
 #include <server_configurable_flags/get_flags.h>
 
 using android::base::StringPrintf;
+using dist_proc::aggregation::KllQuantile;
 using std::chrono_literals::operator""ms;
 
+// Convert the provided nanoseconds into hundreds of microseconds.
+// Hundreds of microseconds are used (as opposed to microseconds) to save space in the sketches.
+static inline int64_t ns2hus(nsecs_t nanos) {
+    return ns2us(nanos) / 100;
+}
+
+// The maximum number of events that we will store in the statistics. Any events that we will
+// receive after we have reached this number will be ignored. We could also implement this by
+// checking the actual size of the current data and making sure that we do not go over. However,
+// the serialization process of sketches is too heavy (1 ms for all 14 sketches), and would be too
+// much to do (even if infrequently).
+// The value here has been determined empirically.
+static constexpr size_t MAX_EVENTS_FOR_STATISTICS = 20000;
+
 // Category (=namespace) name for the input settings that are applied at boot time
 static const char* INPUT_NATIVE_BOOT = "input_native_boot";
 // Feature flag name for the threshold of end-to-end touch latency that would trigger
@@ -61,21 +76,34 @@
 
 namespace android::inputdispatcher {
 
-void Sketch::addValue(nsecs_t value) {
-    // TODO(b/167947340): replace with real sketch
-}
+/**
+ * Same as android::util::BytesField, but doesn't store raw pointers, and therefore deletes its
+ * resources automatically.
+ */
+class SafeBytesField {
+public:
+    explicit SafeBytesField(dist_proc::aggregation::KllQuantile& quantile) {
+        const zetasketch::android::AggregatorStateProto aggProto = quantile.SerializeToProto();
+        mBuffer.resize(aggProto.ByteSizeLong());
+        aggProto.SerializeToArray(mBuffer.data(), mBuffer.size());
+    }
+    android::util::BytesField getBytesField() {
+        return android::util::BytesField(mBuffer.data(), mBuffer.size());
+    }
 
-android::util::BytesField Sketch::serialize() {
-    return android::util::BytesField("TODO(b/167947340): use real sketch data", 4 /*length*/);
-}
-
-void Sketch::reset() {
-    // TODO(b/167947340): reset the sketch
-}
+private:
+    std::vector<char> mBuffer;
+};
 
 LatencyAggregator::LatencyAggregator() {
     AStatsManager_setPullAtomCallback(android::util::INPUT_EVENT_LATENCY_SKETCH, nullptr,
                                       LatencyAggregator::pullAtomCallback, this);
+    dist_proc::aggregation::KllQuantileOptions options;
+    options.set_inv_eps(100); // Request precision of 1.0%, instead of default 0.1%
+    for (size_t i = 0; i < SketchIndex::SIZE; i++) {
+        mDownSketches[i] = KllQuantile::Create(options);
+        mMoveSketches[i] = KllQuantile::Create(options);
+    }
 }
 
 LatencyAggregator::~LatencyAggregator() {
@@ -98,13 +126,18 @@
 }
 
 void LatencyAggregator::processStatistics(const InputEventTimeline& timeline) {
-    std::array<Sketch, SketchIndex::SIZE>& sketches =
+    // Before we do any processing, check that we have not yet reached MAX_EVENTS_FOR_STATISTICS
+    if (mNumSketchEventsProcessed >= MAX_EVENTS_FOR_STATISTICS) {
+        return;
+    }
+    mNumSketchEventsProcessed++;
+
+    std::array<std::unique_ptr<KllQuantile>, SketchIndex::SIZE>& sketches =
             timeline.isDown ? mDownSketches : mMoveSketches;
 
     // Process common ones first
     const nsecs_t eventToRead = timeline.readTime - timeline.eventTime;
-
-    sketches[SketchIndex::EVENT_TO_READ].addValue(eventToRead);
+    sketches[SketchIndex::EVENT_TO_READ]->Add(ns2hus(eventToRead));
 
     // Now process per-connection ones
     for (const auto& [connectionToken, connectionTimeline] : timeline.connectionTimelines) {
@@ -124,38 +157,51 @@
         const nsecs_t gpuCompleteToPresent = presentTime - gpuCompletedTime;
         const nsecs_t endToEnd = presentTime - timeline.eventTime;
 
-        sketches[SketchIndex::READ_TO_DELIVER].addValue(readToDeliver);
-        sketches[SketchIndex::DELIVER_TO_CONSUME].addValue(deliverToConsume);
-        sketches[SketchIndex::CONSUME_TO_FINISH].addValue(consumeToFinish);
-        sketches[SketchIndex::CONSUME_TO_GPU_COMPLETE].addValue(consumeToGpuComplete);
-        sketches[SketchIndex::GPU_COMPLETE_TO_PRESENT].addValue(gpuCompleteToPresent);
-        sketches[SketchIndex::END_TO_END].addValue(endToEnd);
+        sketches[SketchIndex::READ_TO_DELIVER]->Add(ns2hus(readToDeliver));
+        sketches[SketchIndex::DELIVER_TO_CONSUME]->Add(ns2hus(deliverToConsume));
+        sketches[SketchIndex::CONSUME_TO_FINISH]->Add(ns2hus(consumeToFinish));
+        sketches[SketchIndex::CONSUME_TO_GPU_COMPLETE]->Add(ns2hus(consumeToGpuComplete));
+        sketches[SketchIndex::GPU_COMPLETE_TO_PRESENT]->Add(ns2hus(gpuCompleteToPresent));
+        sketches[SketchIndex::END_TO_END]->Add(ns2hus(endToEnd));
     }
 }
 
 AStatsManager_PullAtomCallbackReturn LatencyAggregator::pullData(AStatsEventList* data) {
-    android::util::addAStatsEvent(data, android::util::INPUT_EVENT_LATENCY_SKETCH,
-                                  // DOWN sketches
-                                  mDownSketches[SketchIndex::EVENT_TO_READ].serialize(),
-                                  mDownSketches[SketchIndex::READ_TO_DELIVER].serialize(),
-                                  mDownSketches[SketchIndex::DELIVER_TO_CONSUME].serialize(),
-                                  mDownSketches[SketchIndex::CONSUME_TO_FINISH].serialize(),
-                                  mDownSketches[SketchIndex::CONSUME_TO_GPU_COMPLETE].serialize(),
-                                  mDownSketches[SketchIndex::GPU_COMPLETE_TO_PRESENT].serialize(),
-                                  mDownSketches[SketchIndex::END_TO_END].serialize(),
-                                  // MOVE sketches
-                                  mMoveSketches[SketchIndex::EVENT_TO_READ].serialize(),
-                                  mMoveSketches[SketchIndex::READ_TO_DELIVER].serialize(),
-                                  mMoveSketches[SketchIndex::DELIVER_TO_CONSUME].serialize(),
-                                  mMoveSketches[SketchIndex::CONSUME_TO_FINISH].serialize(),
-                                  mMoveSketches[SketchIndex::CONSUME_TO_GPU_COMPLETE].serialize(),
-                                  mMoveSketches[SketchIndex::GPU_COMPLETE_TO_PRESENT].serialize(),
-                                  mMoveSketches[SketchIndex::END_TO_END].serialize());
+    std::array<std::unique_ptr<SafeBytesField>, SketchIndex::SIZE> serializedDownData;
+    std::array<std::unique_ptr<SafeBytesField>, SketchIndex::SIZE> serializedMoveData;
+    for (size_t i = 0; i < SketchIndex::SIZE; i++) {
+        serializedDownData[i] = std::make_unique<SafeBytesField>(*mDownSketches[i]);
+        serializedMoveData[i] = std::make_unique<SafeBytesField>(*mMoveSketches[i]);
+    }
+    android::util::
+            addAStatsEvent(data, android::util::INPUT_EVENT_LATENCY_SKETCH,
+                           // DOWN sketches
+                           serializedDownData[SketchIndex::EVENT_TO_READ]->getBytesField(),
+                           serializedDownData[SketchIndex::READ_TO_DELIVER]->getBytesField(),
+                           serializedDownData[SketchIndex::DELIVER_TO_CONSUME]->getBytesField(),
+                           serializedDownData[SketchIndex::CONSUME_TO_FINISH]->getBytesField(),
+                           serializedDownData[SketchIndex::CONSUME_TO_GPU_COMPLETE]
+                                   ->getBytesField(),
+                           serializedDownData[SketchIndex::GPU_COMPLETE_TO_PRESENT]
+                                   ->getBytesField(),
+                           serializedDownData[SketchIndex::END_TO_END]->getBytesField(),
+                           // MOVE sketches
+                           serializedMoveData[SketchIndex::EVENT_TO_READ]->getBytesField(),
+                           serializedMoveData[SketchIndex::READ_TO_DELIVER]->getBytesField(),
+                           serializedMoveData[SketchIndex::DELIVER_TO_CONSUME]->getBytesField(),
+                           serializedMoveData[SketchIndex::CONSUME_TO_FINISH]->getBytesField(),
+                           serializedMoveData[SketchIndex::CONSUME_TO_GPU_COMPLETE]
+                                   ->getBytesField(),
+                           serializedMoveData[SketchIndex::GPU_COMPLETE_TO_PRESENT]
+                                   ->getBytesField(),
+                           serializedMoveData[SketchIndex::END_TO_END]->getBytesField());
 
     for (size_t i = 0; i < SketchIndex::SIZE; i++) {
-        mDownSketches[i].reset();
-        mMoveSketches[i].reset();
+        mDownSketches[i]->Reset();
+        mMoveSketches[i]->Reset();
     }
+    // Start new aggregations
+    mNumSketchEventsProcessed = 0;
     return AStatsManager_PULL_SUCCESS;
 }
 
@@ -211,11 +257,26 @@
 }
 
 std::string LatencyAggregator::dump(const char* prefix) {
-    return StringPrintf("%sLatencyAggregator:", prefix) +
-            StringPrintf("\n%s  mLastSlowEventTime=%" PRId64, prefix, mLastSlowEventTime) +
-            StringPrintf("\n%s  mNumEventsSinceLastSlowEventReport = %zu", prefix,
+    std::string sketchDump = StringPrintf("%s  Sketches:\n", prefix);
+    for (size_t i = 0; i < SketchIndex::SIZE; i++) {
+        const int64_t numDown = mDownSketches[i]->num_values();
+        SafeBytesField downBytesField(*mDownSketches[i]);
+        const float downBytesKb = downBytesField.getBytesField().arg_length * 1E-3;
+        const int64_t numMove = mMoveSketches[i]->num_values();
+        SafeBytesField moveBytesField(*mMoveSketches[i]);
+        const float moveBytesKb = moveBytesField.getBytesField().arg_length * 1E-3;
+        sketchDump +=
+                StringPrintf("%s    mDownSketches[%zu]->num_values = %" PRId64 " size = %.1fKB"
+                             " mMoveSketches[%zu]->num_values = %" PRId64 " size = %.1fKB\n",
+                             prefix, i, numDown, downBytesKb, i, numMove, moveBytesKb);
+    }
+
+    return StringPrintf("%sLatencyAggregator:\n", prefix) + sketchDump +
+            StringPrintf("%s  mNumSketchEventsProcessed=%zu\n", prefix, mNumSketchEventsProcessed) +
+            StringPrintf("%s  mLastSlowEventTime=%" PRId64 "\n", prefix, mLastSlowEventTime) +
+            StringPrintf("%s  mNumEventsSinceLastSlowEventReport = %zu\n", prefix,
                          mNumEventsSinceLastSlowEventReport) +
-            StringPrintf("\n%s  mNumSkippedSlowEvents = %zu", prefix, mNumSkippedSlowEvents);
+            StringPrintf("%s  mNumSkippedSlowEvents = %zu\n", prefix, mNumSkippedSlowEvents);
 }
 
 } // namespace android::inputdispatcher