Merge "Make pairwise() only subscribe to upstream once" into tm-qpr-dev am: a117bbe3f2

Original change: https://googleplex-android-review.googlesource.com/c/platform/frameworks/base/+/19827976

Change-Id: I9ab32af500d6a177acf94fb517c598fe8c85a3bc
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/packages/SystemUI/src/com/android/systemui/util/kotlin/Flow.kt b/packages/SystemUI/src/com/android/systemui/util/kotlin/Flow.kt
index 729fdfe..84305cc 100644
--- a/packages/SystemUI/src/com/android/systemui/util/kotlin/Flow.kt
+++ b/packages/SystemUI/src/com/android/systemui/util/kotlin/Flow.kt
@@ -17,10 +17,10 @@
 package com.android.systemui.util.kotlin
 
 import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
 import kotlinx.coroutines.flow.distinctUntilChanged
-import kotlinx.coroutines.flow.drop
+import kotlinx.coroutines.flow.flow
 import kotlinx.coroutines.flow.onStart
-import kotlinx.coroutines.flow.zip
 
 /**
  * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
@@ -29,15 +29,16 @@
  *
  * Useful for code that needs to compare the current value to the previous value.
  */
-fun <T, R> Flow<T>.pairwiseBy(transform: suspend (old: T, new: T) -> R): Flow<R> {
-    // same as current flow, but with the very first event skipped
-    val nextEvents = drop(1)
-    // zip current flow and nextEvents; transform will receive a pair of old and new value. This
-    // works because zip will suppress emissions until both flows have emitted something; since in
-    // this case both flows are emitting at the same rate, but the current flow just has one extra
-    // thing emitted at the start, the effect is that zip will cache the most recent value while
-    // waiting for the next emission from nextEvents.
-    return zip(nextEvents, transform)
+fun <T, R> Flow<T>.pairwiseBy(transform: suspend (old: T, new: T) -> R): Flow<R> = flow {
+    val noVal = Any()
+    var previousValue: Any? = noVal
+    collect { newVal ->
+        if (previousValue != noVal) {
+            @Suppress("UNCHECKED_CAST")
+            emit(transform(previousValue as T, newVal))
+        }
+        previousValue = newVal
+    }
 }
 
 /**