summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--packages/SystemUI/src/com/android/systemui/util/kotlin/Flow.kt98
-rw-r--r--packages/SystemUI/tests/src/com/android/systemui/util/kotlin/FlowUtilTests.kt141
2 files changed, 239 insertions, 0 deletions
diff --git a/packages/SystemUI/src/com/android/systemui/util/kotlin/Flow.kt b/packages/SystemUI/src/com/android/systemui/util/kotlin/Flow.kt
new file mode 100644
index 000000000000..7baebf4ef600
--- /dev/null
+++ b/packages/SystemUI/src/com/android/systemui/util/kotlin/Flow.kt
@@ -0,0 +1,98 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.systemui.util.kotlin
+
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.distinctUntilChanged
+import kotlinx.coroutines.flow.drop
+import kotlinx.coroutines.flow.onStart
+import kotlinx.coroutines.flow.zip
+
+/**
+ * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
+ * Note that the new Flow will not start emitting until it has received two emissions from the
+ * upstream Flow.
+ *
+ * Useful for code that needs to compare the current value to the previous value.
+ */
+fun <T, R> Flow<T>.pairwiseBy(transform: suspend (old: T, new: T) -> R): Flow<R> {
+ // same as current flow, but with the very first event skipped
+ val nextEvents = drop(1)
+ // zip current flow and nextEvents; transform will receive a pair of old and new value. This
+ // works because zip will suppress emissions until both flows have emitted something; since in
+ // this case both flows are emitting at the same rate, but the current flow just has one extra
+ // thing emitted at the start, the effect is that zip will cache the most recent value while
+ // waiting for the next emission from nextEvents.
+ return zip(nextEvents, transform)
+}
+
+/**
+ * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
+ * [initialValue] will be used as the "old" value for the first emission.
+ *
+ * Useful for code that needs to compare the current value to the previous value.
+ */
+fun <T, R> Flow<T>.pairwiseBy(
+ initialValue: T,
+ transform: suspend (previousValue: T, newValue: T) -> R,
+): Flow<R> =
+ onStart { emit(initialValue) }.pairwiseBy(transform)
+
+/**
+ * Returns a new [Flow] that produces the two most recent emissions from [this]. Note that the new
+ * Flow will not start emitting until it has received two emissions from the upstream Flow.
+ *
+ * Useful for code that needs to compare the current value to the previous value.
+ */
+fun <T> Flow<T>.pairwise(): Flow<WithPrev<T>> = pairwiseBy(::WithPrev)
+
+/**
+ * Returns a new [Flow] that produces the two most recent emissions from [this]. [initialValue]
+ * will be used as the "old" value for the first emission.
+ *
+ * Useful for code that needs to compare the current value to the previous value.
+ */
+fun <T> Flow<T>.pairwise(initialValue: T): Flow<WithPrev<T>> = pairwiseBy(initialValue, ::WithPrev)
+
+/** Holds a [newValue] emitted from a [Flow], along with the [previousValue] emitted value. */
+data class WithPrev<T>(val previousValue: T, val newValue: T)
+
+/**
+ * Returns a new [Flow] that combines the [Set] changes between each emission from [this] using
+ * [transform].
+ */
+fun <T, R> Flow<Set<T>>.setChangesBy(
+ transform: suspend (removed: Set<T>, added: Set<T>) -> R,
+): Flow<R> = onStart { emit(emptySet()) }.distinctUntilChanged()
+ .pairwiseBy { old: Set<T>, new: Set<T> ->
+ // If an element was present in the old set, but not the new one, then it was removed
+ val removed = old - new
+ // If an element is present in the new set, but on the old one, then it was added
+ val added = new - old
+ transform(removed, added)
+ }
+
+/** Returns a new [Flow] that produces the [Set] changes between each emission from [this]. */
+fun <T> Flow<Set<T>>.setChanges(): Flow<SetChanges<T>> = setChangesBy(::SetChanges)
+
+/** Contains the difference in elements between two [Set]s. */
+data class SetChanges<T>(
+ /** Elements that are present in the first [Set] but not in the second. */
+ val removed: Set<T>,
+ /** Elements that are present in the second [Set] but not in the first. */
+ val added: Set<T>,
+)
diff --git a/packages/SystemUI/tests/src/com/android/systemui/util/kotlin/FlowUtilTests.kt b/packages/SystemUI/tests/src/com/android/systemui/util/kotlin/FlowUtilTests.kt
new file mode 100644
index 000000000000..092e82c526e3
--- /dev/null
+++ b/packages/SystemUI/tests/src/com/android/systemui/util/kotlin/FlowUtilTests.kt
@@ -0,0 +1,141 @@
+/*
+ * Copyright (C) 2022 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.systemui.util.kotlin
+
+import android.testing.AndroidTestingRunner
+import androidx.test.filters.SmallTest
+import com.android.systemui.SysuiTestCase
+import com.google.common.truth.Truth.assertThat
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.MutableSharedFlow
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.flow.asFlow
+import kotlinx.coroutines.flow.emptyFlow
+import kotlinx.coroutines.flow.filterIsInstance
+import kotlinx.coroutines.flow.flowOf
+import kotlinx.coroutines.flow.merge
+import kotlinx.coroutines.flow.takeWhile
+import kotlinx.coroutines.flow.toList
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import org.junit.Test
+import org.junit.runner.RunWith
+
+@SmallTest
+@RunWith(AndroidTestingRunner::class)
+class PairwiseFlowTest : SysuiTestCase() {
+ @Test
+ fun simple() = runBlocking {
+ assertThatFlow((1..3).asFlow().pairwise())
+ .emitsExactly(
+ WithPrev(1, 2),
+ WithPrev(2, 3),
+ )
+ }
+
+ @Test
+ fun notEnough() = runBlocking {
+ assertThatFlow(flowOf(1).pairwise()).emitsNothing()
+ }
+
+ @Test
+ fun withInit() = runBlocking {
+ assertThatFlow(flowOf(2).pairwise(initialValue = 1))
+ .emitsExactly(WithPrev(1, 2))
+ }
+
+ @Test
+ fun notEnoughWithInit() = runBlocking {
+ assertThatFlow(emptyFlow<Int>().pairwise(initialValue = 1)).emitsNothing()
+ }
+
+ @Test
+ fun withStateFlow() = runBlocking(Dispatchers.Main.immediate) {
+ val state = MutableStateFlow(1)
+ val stop = MutableSharedFlow<Unit>()
+
+ val stoppable = merge(state, stop)
+ .takeWhile { it is Int }
+ .filterIsInstance<Int>()
+
+ val job1 = launch {
+ assertThatFlow(stoppable.pairwise()).emitsExactly(WithPrev(1, 2))
+ }
+ state.value = 2
+ val job2 = launch { assertThatFlow(stoppable.pairwise()).emitsNothing() }
+
+ stop.emit(Unit)
+
+ assertThatJob(job1).isCompleted()
+ assertThatJob(job2).isCompleted()
+ }
+}
+
+@SmallTest
+@RunWith(AndroidTestingRunner::class)
+class SetChangesFlowTest : SysuiTestCase() {
+ @Test
+ fun simple() = runBlocking {
+ assertThatFlow(
+ flowOf(setOf(1, 2, 3), setOf(2, 3, 4)).setChanges()
+ ).emitsExactly(
+ SetChanges(
+ added = setOf(1, 2, 3),
+ removed = emptySet(),
+ ),
+ SetChanges(
+ added = setOf(4),
+ removed = setOf(1),
+ ),
+ )
+ }
+
+ @Test
+ fun onlyOneEmission() = runBlocking {
+ assertThatFlow(flowOf(setOf(1)).setChanges())
+ .emitsExactly(
+ SetChanges(
+ added = setOf(1),
+ removed = emptySet(),
+ )
+ )
+ }
+
+ @Test
+ fun fromEmptySet() = runBlocking {
+ assertThatFlow(flowOf(emptySet(), setOf(1, 2)).setChanges())
+ .emitsExactly(
+ SetChanges(
+ removed = emptySet(),
+ added = setOf(1, 2),
+ )
+ )
+ }
+}
+
+private fun <T> assertThatFlow(flow: Flow<T>) = object {
+ suspend fun emitsExactly(vararg emissions: T) =
+ assertThat(flow.toList()).containsExactly(*emissions).inOrder()
+ suspend fun emitsNothing() =
+ assertThat(flow.toList()).isEmpty()
+}
+
+private fun assertThatJob(job: Job) = object {
+ fun isCompleted() = assertThat(job.isCompleted).isTrue()
+}