Merge "Implementation of sharding for procstats atoms" into rvc-dev
diff --git a/core/java/com/android/internal/app/procstats/ProcessStats.java b/core/java/com/android/internal/app/procstats/ProcessStats.java
index 7455ad0..11e55b8 100644
--- a/core/java/com/android/internal/app/procstats/ProcessStats.java
+++ b/core/java/com/android/internal/app/procstats/ProcessStats.java
@@ -2232,24 +2232,43 @@
     }
 
     /** Similar to {@code #dumpDebug}, but with a reduced/aggregated subset of states. */
-    public void dumpAggregatedProtoForStatsd(ProtoOutputStream proto) {
-        dumpProtoPreamble(proto);
+    public void dumpAggregatedProtoForStatsd(ProtoOutputStream[] protoStreams,
+            long maxRawShardSizeBytes) {
+        int shardIndex = 0;
+        dumpProtoPreamble(protoStreams[shardIndex]);
+
         final ArrayMap<String, SparseArray<ProcessState>> procMap = mProcesses.getMap();
         final ProcessMap<ArraySet<PackageState>> procToPkgMap = new ProcessMap<>();
         final SparseArray<ArraySet<String>> uidToPkgMap = new SparseArray<>();
         collectProcessPackageMaps(null, false, procToPkgMap, uidToPkgMap);
+
         for (int ip = 0; ip < procMap.size(); ip++) {
             final String procName = procMap.keyAt(ip);
+            if (protoStreams[shardIndex].getRawSize() > maxRawShardSizeBytes) {
+                shardIndex++;
+                if (shardIndex >= protoStreams.length) {
+                    // We have run out of space; we'll drop the rest of the processes.
+                    Slog.d(TAG, String.format("Dropping process indices from %d to %d from "
+                            + "statsd proto (too large)", ip, procMap.size()));
+                    break;
+                }
+                dumpProtoPreamble(protoStreams[shardIndex]);
+            }
+
             final SparseArray<ProcessState> uids = procMap.valueAt(ip);
             for (int iu = 0; iu < uids.size(); iu++) {
                 final int uid = uids.keyAt(iu);
                 final ProcessState procState = uids.valueAt(iu);
-                procState.dumpAggregatedProtoForStatsd(proto,
+                procState.dumpAggregatedProtoForStatsd(protoStreams[shardIndex],
                         ProcessStatsSectionProto.PROCESS_STATS,
                         procName, uid, mTimePeriodEndRealtime,
                         procToPkgMap, uidToPkgMap);
             }
         }
+
+        for (int i = 0; i <= shardIndex; i++) {
+            protoStreams[i].flush();
+        }
     }
 
     private void dumpProtoPreamble(ProtoOutputStream proto) {
@@ -2403,10 +2422,11 @@
                 final SourceKey key = assocVals.keyAt(i);
                 final long[] vals = assocVals.valueAt(i);
                 final long token = proto.start(fieldId);
+                final int idx = uidToPkgMap.indexOfKey(key.mUid);
                 ProcessState.writeCompressedProcessName(proto,
                         ProcessStatsAssociationProto.ASSOC_PROCESS_NAME,
                         key.mProcess, key.mPackage,
-                        uidToPkgMap.get(key.mUid).size() > 1);
+                        idx >= 0 && uidToPkgMap.valueAt(idx).size() > 1);
                 proto.write(ProcessStatsAssociationProto.ASSOC_UID, key.mUid);
                 proto.write(ProcessStatsAssociationProto.TOTAL_COUNT, (int) vals[1]);
                 proto.write(ProcessStatsAssociationProto.TOTAL_DURATION_SECS,
diff --git a/services/core/java/com/android/server/am/ProcessStatsService.java b/services/core/java/com/android/server/am/ProcessStatsService.java
index a034949..a168af5a 100644
--- a/services/core/java/com/android/server/am/ProcessStatsService.java
+++ b/services/core/java/com/android/server/am/ProcessStatsService.java
@@ -1269,12 +1269,12 @@
      * Dump proto for the statsd, mainly for testing.
      */
     private void dumpProtoForStatsd(FileDescriptor fd) {
-        final ProtoOutputStream proto = new ProtoOutputStream(fd);
+        final ProtoOutputStream[] protos = {new ProtoOutputStream(fd)};
 
         ProcessStats procStats = new ProcessStats(false);
         getCommittedStatsMerged(0, 0, true, null, procStats);
-        procStats.dumpAggregatedProtoForStatsd(proto);
+        procStats.dumpAggregatedProtoForStatsd(protos, 999999 /* max bytes per shard */);
 
-        proto.flush();
+        protos[0].flush();
     }
 }
diff --git a/services/core/java/com/android/server/stats/pull/StatsPullAtomService.java b/services/core/java/com/android/server/stats/pull/StatsPullAtomService.java
index 7fe21e3..802a355 100644
--- a/services/core/java/com/android/server/stats/pull/StatsPullAtomService.java
+++ b/services/core/java/com/android/server/stats/pull/StatsPullAtomService.java
@@ -236,6 +236,17 @@
     private static final String DANGEROUS_PERMISSION_STATE_SAMPLE_RATE =
             "dangerous_permission_state_sample_rate";
 
+    /** Parameters relating to ProcStats data upload. */
+    // Maximum shards to use when generating StatsEvent objects from ProcStats.
+    private static final int MAX_PROCSTATS_SHARDS = 5;
+    // Should match MAX_PAYLOAD_SIZE in StatsEvent, minus a small amount for overhead/metadata.
+    private static final int MAX_PROCSTATS_SHARD_SIZE = 48 * 1024; // 48 KB
+    // In ProcessStats, we measure the size of a raw ProtoOutputStream, before compaction. This
+    // typically runs 35-45% larger than the compacted size that will be written to StatsEvent.
+    // Hence, we can allow a little more room in each shard before moving to the next. Make this
+    // 20% as a conservative estimate.
+    private static final int MAX_PROCSTATS_RAW_SHARD_SIZE = (int) (MAX_PROCSTATS_SHARD_SIZE * 1.20);
+
     private final Object mThermalLock = new Object();
     @GuardedBy("mThermalLock")
     private IThermalService mThermalService;
@@ -2554,19 +2565,26 @@
             long lastHighWaterMark = readProcStatsHighWaterMark(section);
             List<ParcelFileDescriptor> statsFiles = new ArrayList<>();
 
+            ProtoOutputStream[] protoStreams = new ProtoOutputStream[MAX_PROCSTATS_SHARDS];
+            for (int i = 0; i < protoStreams.length; i++) {
+                protoStreams[i] = new ProtoOutputStream();
+            }
+
             ProcessStats procStats = new ProcessStats(false);
+            // Force processStatsService to aggregate all in-storage and in-memory data.
             long highWaterMark = processStatsService.getCommittedStatsMerged(
                     lastHighWaterMark, section, true, statsFiles, procStats);
+            procStats.dumpAggregatedProtoForStatsd(protoStreams, MAX_PROCSTATS_RAW_SHARD_SIZE);
 
-            // aggregate the data together for westworld consumption
-            ProtoOutputStream proto = new ProtoOutputStream();
-            procStats.dumpAggregatedProtoForStatsd(proto);
-
-            StatsEvent e = StatsEvent.newBuilder()
-                    .setAtomId(atomTag)
-                    .writeByteArray(proto.getBytes())
-                    .build();
-            pulledData.add(e);
+            for (ProtoOutputStream proto : protoStreams) {
+                if (proto.getBytes().length > 0) {
+                    StatsEvent e = StatsEvent.newBuilder()
+                            .setAtomId(atomTag)
+                            .writeByteArray(proto.getBytes())
+                            .build();
+                    pulledData.add(e);
+                }
+            }
 
             new File(mBaseDir.getAbsolutePath() + "/" + section + "_" + lastHighWaterMark)
                     .delete();