[kairos] fix early termination of state accumulation

Calling CompletableJob.complete() puts the Job in the *completing*
state, where it stays until all child Jobs complete. If there are no
child Jobs, then it completes immedidately.

We want to do this because it allows the structured concurrency tree to
be cleaned up once it is marked as complete. No doing so means that a
build scope is *running* until it is explicitly cancelled. If child Jobs
complete normally, the parent Job stays *running*, wasting some memory.

However, because we also use the Job as a bridge between the pure Kairos
world, and the impure external world. Specifically, when the Job
completes (or is cancelled), we use that signal to terminate any ongoing
state accumulation. This is fine if it is the semantic end of the scope,
but if the scope is still "semantically" valid, but just not running any
more coroutines (via effects), then we still want state accumulation to
continue, even if we want to mark the Job as complete so that the
coroutines infra can free up some memory.

For now, lets just disable the optimization, as the cost probably isn't
too high.

Flag: EXEMPT unused
Test: atest kairos-tests
Change-Id: I9dd3575451cb146237ce74c12498e2c764576a1e
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
index 87897d8..86f35f7 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
@@ -36,6 +36,7 @@
 import com.android.systemui.kairos.internal.util.childScope
 import com.android.systemui.kairos.internal.util.mapValuesParallel
 import com.android.systemui.kairos.launchEffect
+import com.android.systemui.kairos.mergeLeft
 import com.android.systemui.kairos.util.Just
 import com.android.systemui.kairos.util.Maybe
 import com.android.systemui.kairos.util.None
@@ -85,7 +86,6 @@
     ): TFlow<A> {
         var job: Job? = null
         val stopEmitter = newStopEmitter()
-        val handle = this.job.invokeOnCompletion { stopEmitter.emit(Unit) }
         // Create a child scope that will be kept alive beyond the end of this transaction.
         val childScope = coroutineScope.childScope()
         lateinit var emitter: Pair<T, S>
@@ -97,7 +97,6 @@
                         reenterBuildScope(this@BuildScopeImpl, childScope).runInBuildScope {
                             launchEffect {
                                 builder(emitter.second)
-                                handle.dispose()
                                 stopEmitter.emit(Unit)
                             }
                         }
@@ -108,7 +107,7 @@
                 },
             )
         emitter = constructFlow(inputNode)
-        return with(frpScope) { emitter.first.takeUntil(stopEmitter) }
+        return with(frpScope) { emitter.first.takeUntil(mergeLeft(stopEmitter, endSignal)) }
     }
 
     private fun <T> tFlowInternal(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> =
@@ -299,12 +298,14 @@
         childScope.coroutineContext.job.invokeOnCompletion { stopEmitter.emit(Unit) }
         // Ensure that once this transaction is done, the new child scope enters the completing
         // state (kept alive so long as there are child jobs).
-        scheduleOutput(
-            OneShot {
-                // TODO: don't like this cast
-                (childScope.coroutineContext.job as CompletableJob).complete()
-            }
-        )
+        // TODO: need to keep the scope alive if it's used to accumulate state.
+        //  Otherwise, stopEmitter will emit early, due to the call to complete().
+        //        scheduleOutput(
+        //            OneShot {
+        //                // TODO: don't like this cast
+        //                (childScope.coroutineContext.job as CompletableJob).complete()
+        //            }
+        //        )
         return BuildScopeImpl(
             stateScope = StateScopeImpl(evalScope = stateScope.evalScope, endSignal = stopEmitter),
             coroutineScope = childScope,
diff --git a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
index 7294ef3..688adae 100644
--- a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
+++ b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
@@ -1300,6 +1300,26 @@
     }
 
     @Test
+    fun buildScope_stateAccumulation() = runFrpTest { network ->
+        val input = network.mutableTFlow<Unit>()
+        var observedCount: Int? = null
+        activateSpec(network) {
+            val (c, j) = asyncScope { input.fold(0) { _, x -> x + 1 } }
+            deferredBuildScopeAction { c.get().observe { observedCount = it } }
+        }
+        runCurrent()
+        assertEquals(0, observedCount)
+
+        input.emit(Unit)
+        runCurrent()
+        assertEquals(1, observedCount)
+
+        input.emit(Unit)
+        runCurrent()
+        assertEquals(2, observedCount)
+    }
+
+    @Test
     fun effect() = runFrpTest { network ->
         val input = network.mutableTFlow<Unit>()
         var effectRunning = false