summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt19
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt48
-rw-r--r--packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt72
3 files changed, 108 insertions, 31 deletions
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt
index 2482c262e1d1..b6918703b404 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt
@@ -24,6 +24,7 @@ import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
+import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.coroutineScope
@@ -117,7 +118,7 @@ interface BuildScope : StateScope {
fun <A> Events<A>.observe(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: EffectScope.(A) -> Unit = {},
- ): Job
+ ): DisposableHandle
/**
* Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
@@ -212,7 +213,7 @@ interface BuildScope : StateScope {
*
* @see observe
*/
- fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job =
+ fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
mapBuild(block).observe()
/**
@@ -568,7 +569,7 @@ interface BuildScope : StateScope {
/** Returns a [Deferred] containing the next value to be emitted from this [Events]. */
fun <R> Events<R>.nextDeferred(): Deferred<R> {
lateinit var next: CompletableDeferred<R>
- val job = nextOnly().observe { next.complete(it) }
+ val job = launchScope { nextOnly().observe { next.complete(it) } }
next = CompletableDeferred<R>(parent = job)
return next
}
@@ -618,7 +619,7 @@ interface BuildScope : StateScope {
* registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
* cancelled).
*/
- fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): Job =
+ fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
mapLatestBuild { block(it) }.observe()
/**
@@ -627,7 +628,7 @@ interface BuildScope : StateScope {
*
* With each invocation of [block], running effects from the previous invocation are cancelled.
*/
- fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): Job {
+ fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): DisposableHandle {
var innerJob: Job? = null
return observeBuild {
innerJob?.cancel()
@@ -696,7 +697,7 @@ interface BuildScope : StateScope {
* emitting) then [block] will be invoked for the first time with the new value; otherwise, it
* will be invoked with the [current][sample] value.
*/
- fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): Job =
+ fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): DisposableHandle =
now.map { sample() }.mergeWith(changes) { _, new -> new }.observe { block(it) }
}
@@ -731,14 +732,14 @@ fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> =
*
* Shorthand for:
* ```kotlin
- * now.observe { block() }
+ * launchScope { now.observe { block() } }
* ```
*/
@ExperimentalKairosApi
fun BuildScope.effect(
context: CoroutineContext = EmptyCoroutineContext,
block: EffectScope.() -> Unit,
-): Job = now.observe(context) { block() }
+): Job = launchScope { now.observe(context) { block() } }
/**
* Launches [block] in a new coroutine, returning a [Job] bound to the coroutine.
@@ -773,7 +774,7 @@ fun BuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job = asy
@ExperimentalKairosApi
fun <R> BuildScope.asyncEffect(block: suspend CoroutineScope.() -> R): Deferred<R> {
val result = CompletableDeferred<R>()
- val job = now.observe { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } }
+ val job = effect { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } }
val handle = job.invokeOnCompletion { result.cancel() }
result.invokeOnCompletion {
handle.dispose()
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
index bc5454ac724b..b20e77a31dab 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.job
@@ -117,12 +118,13 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
override fun <A> Events<A>.observe(
coroutineContext: CoroutineContext,
block: EffectScope.(A) -> Unit,
- ): Job {
+ ): DisposableHandle {
val subRef = AtomicReference<Maybe<Output<A>>>(null)
val childScope = coroutineScope.childScope()
- // When our scope is cancelled, deactivate this observer.
- childScope.coroutineContext.job.invokeOnCompletion {
+ lateinit var cancelHandle: DisposableHandle
+ val handle = DisposableHandle {
subRef.getAndSet(None)?.let { output ->
+ cancelHandle.dispose()
if (output is Just) {
@Suppress("DeferredResultUnused")
network.transaction("observeEffect cancelled") {
@@ -131,25 +133,29 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
}
}
}
+ // When our scope is cancelled, deactivate this observer.
+ cancelHandle = childScope.coroutineContext.job.invokeOnCompletion { handle.dispose() }
val localNetwork = LocalNetwork(network, childScope, endSignal)
- // Defer so that we don't suspend the caller
+ val outputNode =
+ Output<A>(
+ context = coroutineContext,
+ onDeath = { subRef.set(None) },
+ onEmit = { output ->
+ if (subRef.get() is Just) {
+ // Not cancelled, safe to emit
+ val scope =
+ object : EffectScope, TransactionScope by this {
+ override val effectCoroutineScope: CoroutineScope = childScope
+ override val kairosNetwork: KairosNetwork = localNetwork
+ }
+ scope.block(output)
+ }
+ },
+ )
+ // Defer, in case any EventsLoops / StateLoops still need to be set
deferAction {
- val outputNode =
- Output<A>(
- context = coroutineContext,
- onDeath = { subRef.getAndSet(None)?.let { childScope.cancel() } },
- onEmit = { output ->
- if (subRef.get() is Just) {
- // Not cancelled, safe to emit
- val scope =
- object : EffectScope, TransactionScope by this@BuildScopeImpl {
- override val effectCoroutineScope: CoroutineScope = childScope
- override val kairosNetwork: KairosNetwork = localNetwork
- }
- block(scope, output)
- }
- },
- )
+ // Check for immediate cancellation
+ if (subRef.get() != null) return@deferAction
this@observe.takeUntil(endSignal)
.init
.connect(evalScope = stateScope.evalScope)
@@ -164,7 +170,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
}
} ?: run { childScope.cancel() }
}
- return childScope.coroutineContext.job
+ return handle
}
override fun <A, B> Events<A>.mapBuild(transform: BuildScope.(A) -> B): Events<B> {
diff --git a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
index edfed15e8f6f..150b462df655 100644
--- a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
+++ b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
@@ -1083,7 +1083,7 @@ class KairosTests {
}
@Test
- fun inpueventsCompleted() = runFrpTest { network ->
+ fun inputEventsCompleted() = runFrpTest { network ->
val results = mutableListOf<Int>()
val e = network.mutableEvents<Int>()
activateSpec(network) { e.nextOnly().observe { results.add(it) } }
@@ -1376,6 +1376,76 @@ class KairosTests {
assertEquals(1, count)
}
+ @Test
+ fun observeEffect_disposeHandle() = runFrpTest { network ->
+ val input = network.mutableEvents<Unit>()
+ val stopper = network.mutableEvents<Unit>()
+ var runningCount = 0
+ val specJob =
+ activateSpec(network) {
+ val handle =
+ input.observe {
+ effectCoroutineScope.launch {
+ runningCount++
+ awaitClose { runningCount-- }
+ }
+ }
+ stopper.nextOnly().observe { handle.dispose() }
+ }
+ runCurrent()
+ assertEquals(0, runningCount)
+
+ input.emit(Unit)
+ assertEquals(1, runningCount)
+
+ input.emit(Unit)
+ assertEquals(2, runningCount)
+
+ stopper.emit(Unit)
+ assertEquals(2, runningCount)
+
+ input.emit(Unit)
+ assertEquals(2, runningCount)
+
+ specJob.cancel()
+ runCurrent()
+ assertEquals(0, runningCount)
+ }
+
+ @Test
+ fun observeEffect_takeUntil() = runFrpTest { network ->
+ val input = network.mutableEvents<Unit>()
+ val stopper = network.mutableEvents<Unit>()
+ var runningCount = 0
+ val specJob =
+ activateSpec(network) {
+ input.takeUntil(stopper).observe {
+ effectCoroutineScope.launch {
+ runningCount++
+ awaitClose { runningCount-- }
+ }
+ }
+ }
+ runCurrent()
+ assertEquals(0, runningCount)
+
+ input.emit(Unit)
+ assertEquals(1, runningCount)
+
+ input.emit(Unit)
+ assertEquals(2, runningCount)
+
+ stopper.emit(Unit)
+ assertEquals(2, runningCount)
+
+ input.emit(Unit)
+ assertEquals(2, runningCount)
+
+ specJob.cancel()
+ runCurrent()
+ assertEquals(0, runningCount)
+ }
+
private fun runFrpTest(
timeout: Duration = 3.seconds,
block: suspend TestScope.(KairosNetwork) -> Unit,