diff options
author | 2024-12-06 12:24:09 -0500 | |
---|---|---|
committer | 2025-01-03 15:58:38 -0500 | |
commit | a1debf09033e90d47872fd888b1e3c0f33b31ec7 (patch) | |
tree | eb97fc453f4d0efbcb455d63a099c24ddce0c492 | |
parent | bafc44ea3f1f9a59a0c28caafe90d8301bd9d6cb (diff) |
[kairos] keep effect coroutines running for duration of build scope
Some API adjustments to ensure that, even if an Events ends, any effects launched by observers continue in the expected build scope.
Flag: EXEMPT unused
Test: atest kairos-tests
Change-Id: I363cd841a887d8e633a3c0f290305b1a76d7b944
3 files changed, 108 insertions, 31 deletions
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt index 2482c262e1d1..b6918703b404 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt @@ -24,6 +24,7 @@ import kotlin.coroutines.EmptyCoroutineContext import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred +import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.Job import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.coroutineScope @@ -117,7 +118,7 @@ interface BuildScope : StateScope { fun <A> Events<A>.observe( coroutineContext: CoroutineContext = EmptyCoroutineContext, block: EffectScope.(A) -> Unit = {}, - ): Job + ): DisposableHandle /** * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the @@ -212,7 +213,7 @@ interface BuildScope : StateScope { * * @see observe */ - fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job = + fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle = mapBuild(block).observe() /** @@ -568,7 +569,7 @@ interface BuildScope : StateScope { /** Returns a [Deferred] containing the next value to be emitted from this [Events]. */ fun <R> Events<R>.nextDeferred(): Deferred<R> { lateinit var next: CompletableDeferred<R> - val job = nextOnly().observe { next.complete(it) } + val job = launchScope { nextOnly().observe { next.complete(it) } } next = CompletableDeferred<R>(parent = job) return next } @@ -618,7 +619,7 @@ interface BuildScope : StateScope { * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are * cancelled). */ - fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): Job = + fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle = mapLatestBuild { block(it) }.observe() /** @@ -627,7 +628,7 @@ interface BuildScope : StateScope { * * With each invocation of [block], running effects from the previous invocation are cancelled. */ - fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): Job { + fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): DisposableHandle { var innerJob: Job? = null return observeBuild { innerJob?.cancel() @@ -696,7 +697,7 @@ interface BuildScope : StateScope { * emitting) then [block] will be invoked for the first time with the new value; otherwise, it * will be invoked with the [current][sample] value. */ - fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): Job = + fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): DisposableHandle = now.map { sample() }.mergeWith(changes) { _, new -> new }.observe { block(it) } } @@ -731,14 +732,14 @@ fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> = * * Shorthand for: * ```kotlin - * now.observe { block() } + * launchScope { now.observe { block() } } * ``` */ @ExperimentalKairosApi fun BuildScope.effect( context: CoroutineContext = EmptyCoroutineContext, block: EffectScope.() -> Unit, -): Job = now.observe(context) { block() } +): Job = launchScope { now.observe(context) { block() } } /** * Launches [block] in a new coroutine, returning a [Job] bound to the coroutine. @@ -773,7 +774,7 @@ fun BuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job = asy @ExperimentalKairosApi fun <R> BuildScope.asyncEffect(block: suspend CoroutineScope.() -> R): Deferred<R> { val result = CompletableDeferred<R>() - val job = now.observe { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } } + val job = effect { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } } val handle = job.invokeOnCompletion { result.cancel() } result.invokeOnCompletion { handle.dispose() diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt index bc5454ac724b..b20e77a31dab 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt @@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference import kotlin.coroutines.CoroutineContext import kotlinx.coroutines.CompletableJob import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.job @@ -117,12 +118,13 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope override fun <A> Events<A>.observe( coroutineContext: CoroutineContext, block: EffectScope.(A) -> Unit, - ): Job { + ): DisposableHandle { val subRef = AtomicReference<Maybe<Output<A>>>(null) val childScope = coroutineScope.childScope() - // When our scope is cancelled, deactivate this observer. - childScope.coroutineContext.job.invokeOnCompletion { + lateinit var cancelHandle: DisposableHandle + val handle = DisposableHandle { subRef.getAndSet(None)?.let { output -> + cancelHandle.dispose() if (output is Just) { @Suppress("DeferredResultUnused") network.transaction("observeEffect cancelled") { @@ -131,25 +133,29 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope } } } + // When our scope is cancelled, deactivate this observer. + cancelHandle = childScope.coroutineContext.job.invokeOnCompletion { handle.dispose() } val localNetwork = LocalNetwork(network, childScope, endSignal) - // Defer so that we don't suspend the caller + val outputNode = + Output<A>( + context = coroutineContext, + onDeath = { subRef.set(None) }, + onEmit = { output -> + if (subRef.get() is Just) { + // Not cancelled, safe to emit + val scope = + object : EffectScope, TransactionScope by this { + override val effectCoroutineScope: CoroutineScope = childScope + override val kairosNetwork: KairosNetwork = localNetwork + } + scope.block(output) + } + }, + ) + // Defer, in case any EventsLoops / StateLoops still need to be set deferAction { - val outputNode = - Output<A>( - context = coroutineContext, - onDeath = { subRef.getAndSet(None)?.let { childScope.cancel() } }, - onEmit = { output -> - if (subRef.get() is Just) { - // Not cancelled, safe to emit - val scope = - object : EffectScope, TransactionScope by this@BuildScopeImpl { - override val effectCoroutineScope: CoroutineScope = childScope - override val kairosNetwork: KairosNetwork = localNetwork - } - block(scope, output) - } - }, - ) + // Check for immediate cancellation + if (subRef.get() != null) return@deferAction this@observe.takeUntil(endSignal) .init .connect(evalScope = stateScope.evalScope) @@ -164,7 +170,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope } } ?: run { childScope.cancel() } } - return childScope.coroutineContext.job + return handle } override fun <A, B> Events<A>.mapBuild(transform: BuildScope.(A) -> B): Events<B> { diff --git a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt index edfed15e8f6f..150b462df655 100644 --- a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt +++ b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt @@ -1083,7 +1083,7 @@ class KairosTests { } @Test - fun inpueventsCompleted() = runFrpTest { network -> + fun inputEventsCompleted() = runFrpTest { network -> val results = mutableListOf<Int>() val e = network.mutableEvents<Int>() activateSpec(network) { e.nextOnly().observe { results.add(it) } } @@ -1376,6 +1376,76 @@ class KairosTests { assertEquals(1, count) } + @Test + fun observeEffect_disposeHandle() = runFrpTest { network -> + val input = network.mutableEvents<Unit>() + val stopper = network.mutableEvents<Unit>() + var runningCount = 0 + val specJob = + activateSpec(network) { + val handle = + input.observe { + effectCoroutineScope.launch { + runningCount++ + awaitClose { runningCount-- } + } + } + stopper.nextOnly().observe { handle.dispose() } + } + runCurrent() + assertEquals(0, runningCount) + + input.emit(Unit) + assertEquals(1, runningCount) + + input.emit(Unit) + assertEquals(2, runningCount) + + stopper.emit(Unit) + assertEquals(2, runningCount) + + input.emit(Unit) + assertEquals(2, runningCount) + + specJob.cancel() + runCurrent() + assertEquals(0, runningCount) + } + + @Test + fun observeEffect_takeUntil() = runFrpTest { network -> + val input = network.mutableEvents<Unit>() + val stopper = network.mutableEvents<Unit>() + var runningCount = 0 + val specJob = + activateSpec(network) { + input.takeUntil(stopper).observe { + effectCoroutineScope.launch { + runningCount++ + awaitClose { runningCount-- } + } + } + } + runCurrent() + assertEquals(0, runningCount) + + input.emit(Unit) + assertEquals(1, runningCount) + + input.emit(Unit) + assertEquals(2, runningCount) + + stopper.emit(Unit) + assertEquals(2, runningCount) + + input.emit(Unit) + assertEquals(2, runningCount) + + specJob.cancel() + runCurrent() + assertEquals(0, runningCount) + } + private fun runFrpTest( timeout: Duration = 3.seconds, block: suspend TestScope.(KairosNetwork) -> Unit, |