Instrument MesssageQueue with the Perfetto SDK

1. Instrument MQ depth
2. Flows between messages

Added PerfettoTrace methods to be pre-compiled since
it can impact MessageQueue perf.

Test: atest PerfettoTraceTest
Change-Id: Id2046767c4ad15ede677bbea0e43b6005b6cc6d0
Bug: 303199244
Flag: android.os.perfetto_sdk_tracing_v2
diff --git a/boot/boot-image-profile-extra.txt b/boot/boot-image-profile-extra.txt
index fd51f9c..ced0d17 100644
--- a/boot/boot-image-profile-extra.txt
+++ b/boot/boot-image-profile-extra.txt
@@ -45,3 +45,24 @@
 HSPLandroid/os/MessageQueue$StackNodeType;->*
 HSPLandroid/os/MessageQueue$StateNode;->*
 HSPLandroid/os/MessageQueue$TimedParkStateNode;->*
+HSPLandroid/os/PerfettoTrace$Category;->*
+HSPLandroid/os/PerfettoTrace;->*
+HSPLandroid/os/PerfettoTrackEventExtra;->*
+HSPLandroid/os/PerfettoTrackEventExtra$BuilderImpl;->*
+HSPLandroid/os/PerfettoTrackEventExtra$NoOpBuilder;->*
+HSPLandroid/os/PerfettoTrackEventExtra$ArgBool;->*
+HSPLandroid/os/PerfettoTrackEventExtra$ArgInt64;->*
+HSPLandroid/os/PerfettoTrackEventExtra$ArgDouble;->*
+HSPLandroid/os/PerfettoTrackEventExtra$ArgString;->*
+HSPLandroid/os/PerfettoTrackEventExtra$CounterInt64;->*
+HSPLandroid/os/PerfettoTrackEventExtra$CounterDouble;->*
+HSPLandroid/os/PerfettoTrackEventExtra$CounterTrack;->*
+HSPLandroid/os/PerfettoTrackEventExtra$NamedTrack;->*
+HSPLandroid/os/PerfettoTrackEventExtra$Flow;->*
+HSPLandroid/os/PerfettoTrackEventExtra$Proto;->*
+HSPLandroid/os/PerfettoTrackEventExtra$FieldInt64;->*
+HSPLandroid/os/PerfettoTrackEventExtra$FieldDouble;->*
+HSPLandroid/os/PerfettoTrackEventExtra$FieldString;->*
+HSPLandroid/os/PerfettoTrackEventExtra$FieldNested;->*
+HSPLandroid/os/PerfettoTrackEventExtra$Pool;->*
+HSPLandroid/os/PerfettoTrackEventExtra$RingBuffer;->*
diff --git a/core/java/android/os/CombinedMessageQueue/MessageQueue.java b/core/java/android/os/CombinedMessageQueue/MessageQueue.java
index 877f130..349a2f0 100644
--- a/core/java/android/os/CombinedMessageQueue/MessageQueue.java
+++ b/core/java/android/os/CombinedMessageQueue/MessageQueue.java
@@ -24,8 +24,6 @@
 import android.app.ActivityThread;
 import android.app.Instrumentation;
 import android.compat.annotation.UnsupportedAppUsage;
-import android.os.Process;
-import android.os.UserHandle;
 import android.ravenwood.annotation.RavenwoodKeepWholeClass;
 import android.ravenwood.annotation.RavenwoodRedirect;
 import android.ravenwood.annotation.RavenwoodRedirectionClass;
@@ -95,6 +93,13 @@
     private int mAsyncMessageCount;
 
     /**
+     * @hide
+     */
+    private final AtomicLong mMessageCount = new AtomicLong();
+    private final Thread mThread;
+    private final long mTid;
+
+    /**
      * Select between two implementations of message queue. The legacy implementation is used
      * by default as it provides maximum compatibility with applications and tests that
      * reach into MessageQueue via the mMessages field. The concurrent implemmentation is used for
@@ -128,6 +133,8 @@
         mUseConcurrent = sIsProcessAllowedToUseConcurrent && !isInstrumenting();
         mQuitAllowed = quitAllowed;
         mPtr = nativeInit();
+        mThread = Thread.currentThread();
+        mTid = Process.myTid();
     }
 
     private static void initIsProcessAllowedToUseConcurrent() {
@@ -218,6 +225,32 @@
         }
     }
 
+    private void decAndTraceMessageCount() {
+        mMessageCount.decrementAndGet();
+        traceMessageCount();
+    }
+
+    private void incAndTraceMessageCount(Message msg, long when) {
+        mMessageCount.incrementAndGet();
+        msg.mSendingThreadName = Thread.currentThread().getName();
+        msg.mEventId.set(PerfettoTrace.getFlowId());
+
+        traceMessageCount();
+        PerfettoTrace.instant(PerfettoTrace.MQ_CATEGORY, "message_queue_send")
+                .addFlow(msg.mEventId.get())
+                .addArg("receiving_thread", mThread.getName())
+                .addArg("delay", when - SystemClock.uptimeMillis())
+                .addArg("what", msg.what)
+                .emit();
+    }
+
+    /** @hide */
+    private void traceMessageCount() {
+        PerfettoTrace.counter(PerfettoTrace.MQ_CATEGORY, mMessageCount.get())
+                .usingThreadCounterTrack(mTid, mThread.getName())
+                .emit();
+    }
+
     // Disposes of the underlying message queue.
     // Must only be called on the looper thread or the finalizer.
     private void dispose() {
@@ -800,6 +833,7 @@
             Message msg = nextMessage(false, false);
             if (msg != null) {
                 msg.markInUse();
+                decAndTraceMessageCount();
                 return msg;
             }
 
@@ -909,6 +943,7 @@
                         if (msg.isAsynchronous()) {
                             mAsyncMessageCount--;
                         }
+                        decAndTraceMessageCount();
                         if (TRACE) {
                             Trace.setCounter("MQ.Delivered", mMessagesDelivered.incrementAndGet());
                         }
@@ -1075,6 +1110,7 @@
 
             msg.markInUse();
             msg.arg1 = token;
+            incAndTraceMessageCount(msg, when);
 
             if (!enqueueMessageUnchecked(msg, when)) {
                 Log.wtf(TAG_C, "Unexpected error while adding sync barrier!");
@@ -1090,6 +1126,7 @@
             msg.markInUse();
             msg.when = when;
             msg.arg1 = token;
+            incAndTraceMessageCount(msg, when);
 
             if (Flags.messageQueueTailTracking() && mLast != null && mLast.when <= when) {
                 /* Message goes to tail of list */
@@ -1196,6 +1233,7 @@
                 needWake = mMessages == null || mMessages.target != null;
             }
             p.recycleUnchecked();
+            decAndTraceMessageCount();
 
             // If the loop is quitting then it is already awake.
             // We can assume mPtr != 0 when mQuitting is false.
@@ -1252,6 +1290,8 @@
 
             msg.markInUse();
             msg.when = when;
+            incAndTraceMessageCount(msg, when);
+
             Message p = mMessages;
             boolean needWake;
             if (p == null || when == 0 || when < p.when) {
@@ -1391,6 +1431,7 @@
                 if (msg.isAsynchronous()) {
                     mAsyncMessageCount--;
                 }
+                decAndTraceMessageCount();
                 if (TRACE) {
                     Trace.setCounter("MQ.Delivered", mMessagesDelivered.incrementAndGet());
                 }
@@ -1642,6 +1683,7 @@
                     mAsyncMessageCount--;
                 }
                 p.recycleUnchecked();
+                decAndTraceMessageCount();
                 p = n;
             }
 
@@ -1660,6 +1702,7 @@
                             mAsyncMessageCount--;
                         }
                         n.recycleUnchecked();
+                        decAndTraceMessageCount();
                         p.next = nn;
                         if (p.next == null) {
                             mLast = p;
@@ -1718,6 +1761,7 @@
                             mAsyncMessageCount--;
                         }
                         n.recycleUnchecked();
+                        decAndTraceMessageCount();
                         p.next = nn;
                         if (p.next == null) {
                             mLast = p;
@@ -1759,6 +1803,7 @@
                     mAsyncMessageCount--;
                 }
                 p.recycleUnchecked();
+                decAndTraceMessageCount();
                 p = n;
             }
 
@@ -1777,6 +1822,7 @@
                             mAsyncMessageCount--;
                         }
                         n.recycleUnchecked();
+                        decAndTraceMessageCount();
                         p.next = nn;
                         if (p.next == null) {
                             mLast = p;
@@ -1832,6 +1878,7 @@
                     mAsyncMessageCount--;
                 }
                 p.recycleUnchecked();
+                decAndTraceMessageCount();
                 p = n;
             }
 
@@ -1850,6 +1897,7 @@
                             mAsyncMessageCount--;
                         }
                         n.recycleUnchecked();
+                        decAndTraceMessageCount();
                         p.next = nn;
                         if (p.next == null) {
                             mLast = p;
@@ -1904,6 +1952,7 @@
                     mAsyncMessageCount--;
                 }
                 p.recycleUnchecked();
+                decAndTraceMessageCount();
                 p = n;
             }
 
@@ -1921,6 +1970,7 @@
                             mAsyncMessageCount--;
                         }
                         n.recycleUnchecked();
+                        decAndTraceMessageCount();
                         p.next = nn;
                         if (p.next == null) {
                             mLast = p;
@@ -1976,6 +2026,7 @@
                     mAsyncMessageCount--;
                 }
                 p.recycleUnchecked();
+                decAndTraceMessageCount();
                 p = n;
             }
 
@@ -1993,6 +2044,7 @@
                             mAsyncMessageCount--;
                         }
                         n.recycleUnchecked();
+                        decAndTraceMessageCount();
                         p.next = nn;
                         if (p.next == null) {
                             mLast = p;
@@ -2027,6 +2079,8 @@
         mMessages = null;
         mLast = null;
         mAsyncMessageCount = 0;
+        mMessageCount.set(0);
+        traceMessageCount();
     }
 
     private void removeAllFutureMessagesLocked() {
@@ -2057,6 +2111,7 @@
                         mAsyncMessageCount--;
                     }
                     p.recycleUnchecked();
+                    decAndTraceMessageCount();
                 } while (n != null);
             }
         }
@@ -2701,6 +2756,7 @@
         MessageNode node = new MessageNode(msg, seq);
         msg.when = when;
         msg.markInUse();
+        incAndTraceMessageCount(msg, when);
 
         if (DEBUG) {
             Log.d(TAG_C, "Insert message what: " + msg.what + " when: " + msg.when + " seq: "
@@ -2828,6 +2884,7 @@
                 if (removeMatches) {
                     if (p.removeFromStack()) {
                         p.mMessage.recycleUnchecked();
+                        decAndTraceMessageCount();
                         if (mMessageCounts.incrementCancelled()) {
                             nativeWake(mPtr);
                         }
@@ -2870,6 +2927,7 @@
                     found = true;
                     if (queue.remove(msg)) {
                         msg.mMessage.recycleUnchecked();
+                        decAndTraceMessageCount();
                     }
                 } else {
                     return true;
diff --git a/core/java/android/os/Looper.java b/core/java/android/os/Looper.java
index 2fe4871..d16e447 100644
--- a/core/java/android/os/Looper.java
+++ b/core/java/android/os/Looper.java
@@ -199,7 +199,12 @@
             return false;
         }
 
-        // This must be in a local variable, in case a UI event sets the logger
+        PerfettoTrace.begin(PerfettoTrace.MQ_CATEGORY, "message_queue_receive")
+                .addArg("sending_thread", msg.mSendingThreadName)
+                .addTerminatingFlow(msg.mEventId.get())
+                .emit();
+
+        // This must be in a local variabe, in case a UI event sets the logger
         final Printer logging = me.mLogging;
         if (logging != null) {
             logging.println(">>>>> Dispatching to " + msg.target + " "
@@ -289,6 +294,7 @@
                     + msg.callback + " what=" + msg.what);
         }
 
+        PerfettoTrace.end(PerfettoTrace.MQ_CATEGORY).emit();
         msg.recycleUnchecked();
 
         return true;
diff --git a/core/java/android/os/Message.java b/core/java/android/os/Message.java
index 702fdc2..b22d177 100644
--- a/core/java/android/os/Message.java
+++ b/core/java/android/os/Message.java
@@ -23,6 +23,8 @@
 
 import com.android.internal.annotations.VisibleForTesting;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  *
  * Defines a message containing a description and arbitrary data object that can be
@@ -37,6 +39,13 @@
 @android.ravenwood.annotation.RavenwoodKeepWholeClass
 public final class Message implements Parcelable {
     /**
+     * For tracing
+     *
+     * @hide Only for use within the system server.
+     */
+    public final AtomicInteger mEventId = new AtomicInteger();
+
+    /**
      * User-defined message code so that the recipient can identify
      * what this message is about. Each {@link Handler} has its own name-space
      * for message codes, so you do not need to worry about yours conflicting
@@ -101,6 +110,13 @@
      */
     public int workSourceUid = UID_NONE;
 
+    /**
+     * Sending thread
+     *
+     * @hide
+     */
+    public String mSendingThreadName;
+
     /** If set message is in use.
      * This flag is set when the message is enqueued and remains set while it
      * is delivered and afterwards when it is recycled.  The flag is only cleared
diff --git a/core/java/android/os/PerfettoTrace.java b/core/java/android/os/PerfettoTrace.java
index 076437f..741d542 100644
--- a/core/java/android/os/PerfettoTrace.java
+++ b/core/java/android/os/PerfettoTrace.java
@@ -51,6 +51,8 @@
      */
     private static final AtomicInteger sFlowEventId = new AtomicInteger();
 
+    public static final PerfettoTrace.Category MQ_CATEGORY = new PerfettoTrace.Category("mq");
+
     /**
      * Perfetto category a trace event belongs to.
      * Registering a category is not sufficient to capture events within the category, it must
@@ -370,4 +372,11 @@
     public static void register(boolean isBackendInProcess) {
         native_register(isBackendInProcess);
     }
+
+    /**
+     * Registers categories with Perfetto.
+     */
+    public static void registerCategories() {
+        MQ_CATEGORY.register();
+    }
 }
diff --git a/core/java/android/os/Trace.java b/core/java/android/os/Trace.java
index 09e6a45..bf4c4d1 100644
--- a/core/java/android/os/Trace.java
+++ b/core/java/android/os/Trace.java
@@ -544,5 +544,6 @@
      */
     public static void registerWithPerfetto() {
         PerfettoTrace.register(false /* isBackendInProcess */);
+        PerfettoTrace.registerCategories();
     }
 }
diff --git a/core/tests/coretests/src/android/os/PerfettoTraceTest.java b/core/tests/coretests/src/android/os/PerfettoTraceTest.java
index 0b5a446..91efacf 100644
--- a/core/tests/coretests/src/android/os/PerfettoTraceTest.java
+++ b/core/tests/coretests/src/android/os/PerfettoTraceTest.java
@@ -59,6 +59,8 @@
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This class is used to test the native tracing support. Run this test
@@ -77,6 +79,8 @@
     private static final String BAR = "bar";
 
     private static final Category FOO_CATEGORY = new Category(FOO);
+    private static final int MESSAGE = 1234567;
+    private static final int MESSAGE_COUNT = 3;
 
     private final Set<String> mCategoryNames = new ArraySet<>();
     private final Set<String> mEventNames = new ArraySet<>();
@@ -592,6 +596,89 @@
         assertThat(mDebugAnnotationNames).doesNotContain("before");
     }
 
+    @Test
+    @RequiresFlagsEnabled(android.os.Flags.FLAG_PERFETTO_SDK_TRACING_V2)
+    public void testMessageQueue() throws Exception {
+        TraceConfig traceConfig = getTraceConfig("mq");
+
+        PerfettoTrace.MQ_CATEGORY.register();
+        final HandlerThread thread = new HandlerThread("test");
+        thread.start();
+        final Handler handler = thread.getThreadHandler();
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        PerfettoTrace.Session session = new PerfettoTrace.Session(true,
+                getTraceConfig("mq").toByteArray());
+
+        handler.sendEmptyMessage(MESSAGE);
+        handler.sendEmptyMessageDelayed(MESSAGE, 10);
+        handler.sendEmptyMessage(MESSAGE);
+        handler.postDelayed(() -> {
+            latch.countDown();
+        }, 10);
+        assertThat(latch.await(100, TimeUnit.MILLISECONDS)).isTrue();
+
+        byte[] traceBytes = session.close();
+
+        Trace trace = Trace.parseFrom(traceBytes);
+
+        boolean hasTrackEvent = false;
+        int instantCount = 0;
+        int counterCount = 0;
+        int beginCount = 0;
+        int endCount = 0;
+
+        Set<Long> flowIds = new ArraySet<>();
+        for (TracePacket packet: trace.getPacketList()) {
+            TrackEvent event;
+            if (packet.hasTrackEvent()) {
+                hasTrackEvent = true;
+                event = packet.getTrackEvent();
+            } else {
+                continue;
+            }
+
+            List<DebugAnnotation> annotations = event.getDebugAnnotationsList();
+            switch (event.getType()) {
+                case TrackEvent.Type.TYPE_INSTANT:
+                    if (annotations.get(2).getIntValue() == MESSAGE) {
+                        instantCount++;
+                        assertThat(annotations.get(0).getStringValue()).isEqualTo("test");
+                        assertThat(event.getFlowIdsCount()).isEqualTo(1);
+                        flowIds.addAll(event.getFlowIdsList());
+                    }
+                    break;
+                case TrackEvent.Type.TYPE_COUNTER:
+                    counterCount++;
+                    break;
+                case TrackEvent.Type.TYPE_SLICE_BEGIN:
+                    annotations = event.getDebugAnnotationsList();
+                    if (flowIds.containsAll(event.getTerminatingFlowIdsList())) {
+                        beginCount++;
+                        assertThat(event.getTerminatingFlowIdsCount()).isEqualTo(1);
+                    }
+                    break;
+                case TrackEvent.Type.TYPE_SLICE_END:
+                    endCount++;
+                    break;
+                default:
+                    break;
+            }
+            collectInternedData(packet);
+        }
+
+        assertThat(hasTrackEvent).isTrue();
+        assertThat(mCategoryNames).contains("mq");
+        assertThat(mEventNames).contains("message_queue_send");
+        assertThat(mEventNames).contains("message_queue_receive");
+        assertThat(mDebugAnnotationNames).contains("what");
+        assertThat(mDebugAnnotationNames).contains("delay");
+        assertThat(instantCount).isEqualTo(MESSAGE_COUNT);
+        assertThat(beginCount).isEqualTo(MESSAGE_COUNT);
+        assertThat(endCount).isAtLeast(MESSAGE_COUNT);
+        assertThat(counterCount).isAtLeast(MESSAGE_COUNT);
+    }
+
     private TrackEvent getTrackEvent(Trace trace, int idx) {
         int curIdx = 0;
         for (TracePacket packet: trace.getPacketList()) {