Add BroadcastDispatcher.broadcastFlow

Bug: 242040009
Test: atest BroadcastDispatcherTest
Change-Id: I688d686950239abab207d13d50cfb896303531d1
diff --git a/packages/SystemUI/src/com/android/systemui/broadcast/BroadcastDispatcher.kt b/packages/SystemUI/src/com/android/systemui/broadcast/BroadcastDispatcher.kt
index d757b62..eb8cb47 100644
--- a/packages/SystemUI/src/com/android/systemui/broadcast/BroadcastDispatcher.kt
+++ b/packages/SystemUI/src/com/android/systemui/broadcast/BroadcastDispatcher.kt
@@ -31,6 +31,8 @@
 import com.android.internal.annotations.VisibleForTesting
 import com.android.systemui.Dumpable
 import com.android.systemui.broadcast.logging.BroadcastDispatcherLogger
+import com.android.systemui.common.coroutine.ChannelExt.trySendWithFailureLogging
+import com.android.systemui.common.coroutine.ConflatedCallbackFlow.conflatedCallbackFlow
 import com.android.systemui.dagger.SysUISingleton
 import com.android.systemui.dagger.qualifiers.Background
 import com.android.systemui.dump.DumpManager
@@ -38,6 +40,8 @@
 import java.io.PrintWriter
 import java.util.concurrent.Executor
 import javax.inject.Inject
+import kotlinx.coroutines.channels.awaitClose
+import kotlinx.coroutines.flow.Flow
 
 data class ReceiverData(
     val receiver: BroadcastReceiver,
@@ -153,6 +157,55 @@
                 .sendToTarget()
     }
 
+    /**
+     * Returns a [Flow] that, when collected, emits a new value whenever a broadcast matching
+     * [filter] is received. The value will be computed from the intent and the registered receiver
+     * using [map].
+     *
+     * @see registerReceiver
+     */
+    @JvmOverloads
+    fun <T> broadcastFlow(
+        filter: IntentFilter,
+        user: UserHandle? = null,
+        @Context.RegisterReceiverFlags flags: Int = Context.RECEIVER_EXPORTED,
+        permission: String? = null,
+        map: (Intent, BroadcastReceiver) -> T,
+    ): Flow<T> = conflatedCallbackFlow {
+        val receiver = object : BroadcastReceiver() {
+            override fun onReceive(context: Context, intent: Intent) {
+                trySendWithFailureLogging(map(intent, this), TAG)
+            }
+        }
+
+        registerReceiver(
+            receiver,
+            filter,
+            bgExecutor,
+            user,
+            flags,
+            permission,
+        )
+
+        awaitClose {
+            unregisterReceiver(receiver)
+        }
+    }
+
+    /**
+     * Returns a [Flow] that, when collected, emits `Unit` whenever a broadcast matching [filter] is
+     * received.
+     *
+     * @see registerReceiver
+     */
+    @JvmOverloads
+    fun broadcastFlow(
+        filter: IntentFilter,
+        user: UserHandle? = null,
+        @Context.RegisterReceiverFlags flags: Int = Context.RECEIVER_EXPORTED,
+        permission: String? = null,
+    ): Flow<Unit> = broadcastFlow(filter, user, flags, permission) { _, _ -> Unit }
+
     private fun checkFilter(filter: IntentFilter) {
         val sb = StringBuilder()
         if (filter.countActions() == 0) sb.append("Filter must contain at least one action. ")
diff --git a/packages/SystemUI/tests/src/com/android/systemui/broadcast/BroadcastDispatcherTest.kt b/packages/SystemUI/tests/src/com/android/systemui/broadcast/BroadcastDispatcherTest.kt
index 7795d2c..434cb48 100644
--- a/packages/SystemUI/tests/src/com/android/systemui/broadcast/BroadcastDispatcherTest.kt
+++ b/packages/SystemUI/tests/src/com/android/systemui/broadcast/BroadcastDispatcherTest.kt
@@ -18,6 +18,7 @@
 
 import android.content.BroadcastReceiver
 import android.content.Context
+import android.content.Intent
 import android.content.IntentFilter
 import android.os.Handler
 import android.os.Looper
@@ -32,22 +33,27 @@
 import com.android.systemui.settings.UserTracker
 import com.android.systemui.util.concurrency.FakeExecutor
 import com.android.systemui.util.mockito.eq
+import com.android.systemui.util.mockito.mock
 import com.android.systemui.util.time.FakeSystemClock
+import com.google.common.truth.Truth.assertThat
+import java.util.concurrent.Executor
 import junit.framework.Assert.assertSame
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.runBlockingTest
 import org.junit.Before
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.mockito.ArgumentCaptor
 import org.mockito.Captor
 import org.mockito.Mock
-import org.mockito.Mockito.`when`
 import org.mockito.Mockito.anyInt
 import org.mockito.Mockito.inOrder
 import org.mockito.Mockito.mock
 import org.mockito.Mockito.never
 import org.mockito.Mockito.verify
+import org.mockito.Mockito.`when`
 import org.mockito.MockitoAnnotations
-import java.util.concurrent.Executor
 
 @RunWith(AndroidTestingRunner::class)
 @TestableLooper.RunWithLooper
@@ -381,6 +387,39 @@
             .clearPendingRemoval(broadcastReceiver, user1.identifier)
     }
 
+    @Test
+    fun testBroadcastFlow() = runBlockingTest {
+        val flow = broadcastDispatcher.broadcastFlow(intentFilter, user1) { intent, receiver ->
+            intent to receiver
+        }
+
+        // Collect the values into collectedValues.
+        val collectedValues = mutableListOf<Pair<Intent, BroadcastReceiver>>()
+        val job = launch {
+            flow.collect { collectedValues.add(it) }
+        }
+
+        testableLooper.processAllMessages()
+        verify(mockUBRUser1).registerReceiver(capture(argumentCaptor), eq(DEFAULT_FLAG))
+        val receiver = argumentCaptor.value.receiver
+
+        // Simulate fake broadcasted intents.
+        val fakeIntents = listOf<Intent>(mock(), mock(), mock())
+        fakeIntents.forEach { receiver.onReceive(mockContext, it) }
+
+        // The intents should have been collected.
+        advanceUntilIdle()
+
+        val expectedValues = fakeIntents.map { it to receiver }
+        assertThat(collectedValues).containsExactlyElementsIn(expectedValues)
+
+        // Stop the collection.
+        job.cancel()
+
+        testableLooper.processAllMessages()
+        verify(mockUBRUser1).unregisterReceiver(receiver)
+    }
+
     private fun setUserMock(mockContext: Context, user: UserHandle) {
         `when`(mockContext.user).thenReturn(user)
         `when`(mockContext.userId).thenReturn(user.identifier)