[Sat] Add retrySignal to the device-based satellite repository

In the event that a telephony process crashes, SystemUI needs to
potentially re-register our callbacks with the SatelliteManager service.
To achieve this, we define a new [retrySignal] option in the wrapper for
flows that query [SatelliteManager] APIs.

Test: DeviceBasedSatelliteRepositoryImplTest
Bug: 337258696
Flag: com.android.internal.telephony.flags.oem_enabled_satellite_flag
Change-Id: I22176b104adfbfa5b9b792114d98a05d5e6ce75e
diff --git a/packages/SystemUI/src/com/android/systemui/statusbar/pipeline/satellite/data/prod/DeviceBasedSatelliteRepositoryImpl.kt b/packages/SystemUI/src/com/android/systemui/statusbar/pipeline/satellite/data/prod/DeviceBasedSatelliteRepositoryImpl.kt
index 12f252d..9d9cc2a 100644
--- a/packages/SystemUI/src/com/android/systemui/statusbar/pipeline/satellite/data/prod/DeviceBasedSatelliteRepositoryImpl.kt
+++ b/packages/SystemUI/src/com/android/systemui/statusbar/pipeline/satellite/data/prod/DeviceBasedSatelliteRepositoryImpl.kt
@@ -17,6 +17,8 @@
 package com.android.systemui.statusbar.pipeline.satellite.data.prod
 
 import android.os.OutcomeReceiver
+import android.telephony.TelephonyCallback
+import android.telephony.TelephonyManager
 import android.telephony.satellite.NtnSignalStrengthCallback
 import android.telephony.satellite.SatelliteManager
 import android.telephony.satellite.SatelliteManager.SATELLITE_RESULT_SUCCESS
@@ -38,6 +40,7 @@
 import com.android.systemui.statusbar.pipeline.satellite.data.prod.SatelliteSupport.Unknown
 import com.android.systemui.statusbar.pipeline.satellite.shared.model.SatelliteConnectionState
 import com.android.systemui.util.kotlin.getOrNull
+import com.android.systemui.util.kotlin.pairwise
 import com.android.systemui.util.time.SystemClock
 import java.util.Optional
 import javax.inject.Inject
@@ -51,12 +54,15 @@
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.MutableStateFlow
 import kotlinx.coroutines.flow.SharingStarted
+import kotlinx.coroutines.flow.StateFlow
 import kotlinx.coroutines.flow.collectLatest
 import kotlinx.coroutines.flow.distinctUntilChanged
 import kotlinx.coroutines.flow.flatMapLatest
 import kotlinx.coroutines.flow.flowOf
 import kotlinx.coroutines.flow.flowOn
 import kotlinx.coroutines.flow.map
+import kotlinx.coroutines.flow.mapNotNull
+import kotlinx.coroutines.flow.onStart
 import kotlinx.coroutines.flow.stateIn
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.suspendCancellableCoroutine
@@ -92,13 +98,19 @@
 
     @OptIn(ExperimentalCoroutinesApi::class)
     companion object {
-        /** Convenience function to switch to the supported flow */
+        /**
+         * Convenience function to switch to the supported flow. [retrySignal] is a flow that emits
+         * [Unit] whenever the [supported] flow needs to be restarted
+         */
         fun <T> Flow<SatelliteSupport>.whenSupported(
             supported: (SatelliteManager) -> Flow<T>,
             orElse: Flow<T>,
-        ): Flow<T> = flatMapLatest {
-            when (it) {
-                is Supported -> supported(it.satelliteManager)
+            retrySignal: Flow<Unit>,
+        ): Flow<T> = flatMapLatest { satelliteSupport ->
+            when (satelliteSupport) {
+                is Supported -> {
+                    retrySignal.flatMapLatest { supported(satelliteSupport.satelliteManager) }
+                }
                 else -> orElse
             }
         }
@@ -132,6 +144,7 @@
 @Inject
 constructor(
     satelliteManagerOpt: Optional<SatelliteManager>,
+    telephonyManager: TelephonyManager,
     @Background private val bgDispatcher: CoroutineDispatcher,
     @Application private val scope: CoroutineScope,
     @OemSatelliteInputLog private val logBuffer: LogBuffer,
@@ -201,11 +214,65 @@
         }
     }
 
+    /**
+     * Note that we are given an "unbound" [TelephonyManager] (meaning it was not created with a
+     * specific `subscriptionId`). Therefore this is the radio power state of the
+     * DEFAULT_SUBSCRIPTION_ID subscription. This subscription, I am led to believe, is the one that
+     * would be used for the SatelliteManager subscription.
+     *
+     * By watching power state changes, we can detect if the telephony process crashes.
+     *
+     * See b/337258696 for details
+     */
+    private val radioPowerState: StateFlow<Int> =
+        conflatedCallbackFlow {
+                val cb =
+                    object : TelephonyCallback(), TelephonyCallback.RadioPowerStateListener {
+                        override fun onRadioPowerStateChanged(powerState: Int) {
+                            trySend(powerState)
+                        }
+                    }
+
+                telephonyManager.registerTelephonyCallback(bgDispatcher.asExecutor(), cb)
+
+                awaitClose { telephonyManager.unregisterTelephonyCallback(cb) }
+            }
+            .flowOn(bgDispatcher)
+            .stateIn(
+                scope,
+                SharingStarted.WhileSubscribed(),
+                TelephonyManager.RADIO_POWER_UNAVAILABLE
+            )
+
+    /**
+     * In the event that a telephony phone process has crashed, we expect to see a radio power state
+     * change from ON to something else. This trigger can be used to re-start a flow via
+     * [whenSupported]
+     *
+     * This flow emits [Unit] when started so that newly-started collectors always run, and only
+     * restart when the state goes from ON -> !ON
+     */
+    private val telephonyProcessCrashedEvent: Flow<Unit> =
+        radioPowerState
+            .pairwise()
+            .mapNotNull { (prev: Int, new: Int) ->
+                if (
+                    prev == TelephonyManager.RADIO_POWER_ON &&
+                        new != TelephonyManager.RADIO_POWER_ON
+                ) {
+                    Unit
+                } else {
+                    null
+                }
+            }
+            .onStart { emit(Unit) }
+
     override val connectionState =
         satelliteSupport
             .whenSupported(
                 supported = ::connectionStateFlow,
-                orElse = flowOf(SatelliteConnectionState.Off)
+                orElse = flowOf(SatelliteConnectionState.Off),
+                retrySignal = telephonyProcessCrashedEvent,
             )
             .stateIn(scope, SharingStarted.Eagerly, SatelliteConnectionState.Off)
 
@@ -232,7 +299,11 @@
 
     override val signalStrength =
         satelliteSupport
-            .whenSupported(supported = ::signalStrengthFlow, orElse = flowOf(0))
+            .whenSupported(
+                supported = ::signalStrengthFlow,
+                orElse = flowOf(0),
+                retrySignal = telephonyProcessCrashedEvent,
+            )
             .stateIn(scope, SharingStarted.Eagerly, 0)
 
     // By using the SupportedSatelliteManager here, we expect registration never to fail
diff --git a/packages/SystemUI/tests/src/com/android/systemui/statusbar/pipeline/satellite/data/DeviceBasedSatelliteRepositorySwitcherTest.kt b/packages/SystemUI/tests/src/com/android/systemui/statusbar/pipeline/satellite/data/DeviceBasedSatelliteRepositorySwitcherTest.kt
index 7ca3b1c..6300953 100644
--- a/packages/SystemUI/tests/src/com/android/systemui/statusbar/pipeline/satellite/data/DeviceBasedSatelliteRepositorySwitcherTest.kt
+++ b/packages/SystemUI/tests/src/com/android/systemui/statusbar/pipeline/satellite/data/DeviceBasedSatelliteRepositorySwitcherTest.kt
@@ -16,6 +16,7 @@
 
 package com.android.systemui.statusbar.pipeline.satellite.data
 
+import android.telephony.TelephonyManager
 import android.telephony.satellite.SatelliteManager
 import androidx.test.filters.SmallTest
 import com.android.systemui.SysuiTestCase
@@ -50,11 +51,13 @@
     private val demoModeController =
         mock<DemoModeController>().apply { whenever(this.isInDemoMode).thenReturn(false) }
     private val satelliteManager = mock<SatelliteManager>()
+    private val telephonyManager = mock<TelephonyManager>()
     private val systemClock = FakeSystemClock()
 
     private val realImpl =
         DeviceBasedSatelliteRepositoryImpl(
             Optional.of(satelliteManager),
+            telephonyManager,
             testDispatcher,
             testScope.backgroundScope,
             FakeLogBuffer.Factory.create(),
diff --git a/packages/SystemUI/tests/src/com/android/systemui/statusbar/pipeline/satellite/data/prod/DeviceBasedSatelliteRepositoryImplTest.kt b/packages/SystemUI/tests/src/com/android/systemui/statusbar/pipeline/satellite/data/prod/DeviceBasedSatelliteRepositoryImplTest.kt
index 6b0ad4b..6651676 100644
--- a/packages/SystemUI/tests/src/com/android/systemui/statusbar/pipeline/satellite/data/prod/DeviceBasedSatelliteRepositoryImplTest.kt
+++ b/packages/SystemUI/tests/src/com/android/systemui/statusbar/pipeline/satellite/data/prod/DeviceBasedSatelliteRepositoryImplTest.kt
@@ -18,6 +18,8 @@
 
 import android.os.OutcomeReceiver
 import android.os.Process
+import android.telephony.TelephonyCallback
+import android.telephony.TelephonyManager
 import android.telephony.satellite.NtnSignalStrength
 import android.telephony.satellite.NtnSignalStrengthCallback
 import android.telephony.satellite.SatelliteManager
@@ -36,6 +38,7 @@
 import com.android.systemui.SysuiTestCase
 import com.android.systemui.coroutines.collectLastValue
 import com.android.systemui.log.core.FakeLogBuffer
+import com.android.systemui.statusbar.pipeline.mobile.data.repository.prod.MobileTelephonyHelpers
 import com.android.systemui.statusbar.pipeline.satellite.data.prod.DeviceBasedSatelliteRepositoryImpl.Companion.MIN_UPTIME
 import com.android.systemui.statusbar.pipeline.satellite.data.prod.DeviceBasedSatelliteRepositoryImpl.Companion.POLLING_INTERVAL_MS
 import com.android.systemui.statusbar.pipeline.satellite.shared.model.SatelliteConnectionState
@@ -59,6 +62,7 @@
 import org.mockito.Mockito
 import org.mockito.Mockito.doAnswer
 import org.mockito.Mockito.never
+import org.mockito.Mockito.times
 import org.mockito.Mockito.verify
 import org.mockito.MockitoAnnotations
 
@@ -69,6 +73,7 @@
     private lateinit var underTest: DeviceBasedSatelliteRepositoryImpl
 
     @Mock private lateinit var satelliteManager: SatelliteManager
+    @Mock private lateinit var telephonyManager: TelephonyManager
 
     private val systemClock = FakeSystemClock()
     private val dispatcher = StandardTestDispatcher()
@@ -86,6 +91,7 @@
             underTest =
                 DeviceBasedSatelliteRepositoryImpl(
                     Optional.empty(),
+                    telephonyManager,
                     dispatcher,
                     testScope.backgroundScope,
                     FakeLogBuffer.Factory.create(),
@@ -362,6 +368,68 @@
             verify(satelliteManager).registerForModemStateChanged(any(), any())
         }
 
+    @Test
+    fun telephonyCrash_repoReregistersConnectionStateListener() =
+        testScope.runTest {
+            setupDefaultRepo()
+
+            // GIVEN connection state is requested
+            val connectionState by collectLastValue(underTest.connectionState)
+
+            runCurrent()
+
+            val telephonyCallback =
+                MobileTelephonyHelpers.getTelephonyCallbackForType<
+                    TelephonyCallback.RadioPowerStateListener
+                >(
+                    telephonyManager
+                )
+
+            // THEN listener is registered once
+            verify(satelliteManager, times(1)).registerForModemStateChanged(any(), any())
+
+            // WHEN a crash event happens (detected by radio state change)
+            telephonyCallback.onRadioPowerStateChanged(TelephonyManager.RADIO_POWER_ON)
+            runCurrent()
+            telephonyCallback.onRadioPowerStateChanged(TelephonyManager.RADIO_POWER_OFF)
+            runCurrent()
+
+            // THEN listeners are unregistered and re-registered
+            verify(satelliteManager, times(1)).unregisterForModemStateChanged(any())
+            verify(satelliteManager, times(2)).registerForModemStateChanged(any(), any())
+        }
+
+    @Test
+    fun telephonyCrash_repoReregistersSignalStrengthListener() =
+        testScope.runTest {
+            setupDefaultRepo()
+
+            // GIVEN signal strength is requested
+            val signalStrength by collectLastValue(underTest.signalStrength)
+
+            runCurrent()
+
+            val telephonyCallback =
+                MobileTelephonyHelpers.getTelephonyCallbackForType<
+                    TelephonyCallback.RadioPowerStateListener
+                >(
+                    telephonyManager
+                )
+
+            // THEN listeners are registered the first time
+            verify(satelliteManager, times(1)).registerForNtnSignalStrengthChanged(any(), any())
+
+            // WHEN a crash event happens (detected by radio state change)
+            telephonyCallback.onRadioPowerStateChanged(TelephonyManager.RADIO_POWER_ON)
+            runCurrent()
+            telephonyCallback.onRadioPowerStateChanged(TelephonyManager.RADIO_POWER_OFF)
+            runCurrent()
+
+            // THEN listeners are unregistered and re-registered
+            verify(satelliteManager, times(1)).unregisterForNtnSignalStrengthChanged(any())
+            verify(satelliteManager, times(2)).registerForNtnSignalStrengthChanged(any(), any())
+        }
+
     private fun setUpRepo(
         uptime: Long = MIN_UPTIME,
         satMan: SatelliteManager? = satelliteManager,
@@ -380,6 +448,7 @@
         underTest =
             DeviceBasedSatelliteRepositoryImpl(
                 if (satMan != null) Optional.of(satMan) else Optional.empty(),
+                telephonyManager,
                 dispatcher,
                 testScope.backgroundScope,
                 FakeLogBuffer.Factory.create(),