diff options
3 files changed, 108 insertions, 31 deletions
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt index 2482c262e1d1..b6918703b404 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt @@ -24,6 +24,7 @@ import kotlin.coroutines.EmptyCoroutineContext import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred +import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.Job import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.coroutineScope @@ -117,7 +118,7 @@ interface BuildScope : StateScope { fun <A> Events<A>.observe( coroutineContext: CoroutineContext = EmptyCoroutineContext, block: EffectScope.(A) -> Unit = {}, - ): Job + ): DisposableHandle /** * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the @@ -212,7 +213,7 @@ interface BuildScope : StateScope { * * @see observe */ - fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job = + fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle = mapBuild(block).observe() /** @@ -568,7 +569,7 @@ interface BuildScope : StateScope { /** Returns a [Deferred] containing the next value to be emitted from this [Events]. */ fun <R> Events<R>.nextDeferred(): Deferred<R> { lateinit var next: CompletableDeferred<R> - val job = nextOnly().observe { next.complete(it) } + val job = launchScope { nextOnly().observe { next.complete(it) } } next = CompletableDeferred<R>(parent = job) return next } @@ -618,7 +619,7 @@ interface BuildScope : StateScope { * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are * cancelled). */ - fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): Job = + fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle = mapLatestBuild { block(it) }.observe() /** @@ -627,7 +628,7 @@ interface BuildScope : StateScope { * * With each invocation of [block], running effects from the previous invocation are cancelled. */ - fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): Job { + fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): DisposableHandle { var innerJob: Job? = null return observeBuild { innerJob?.cancel() @@ -696,7 +697,7 @@ interface BuildScope : StateScope { * emitting) then [block] will be invoked for the first time with the new value; otherwise, it * will be invoked with the [current][sample] value. */ - fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): Job = + fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): DisposableHandle = now.map { sample() }.mergeWith(changes) { _, new -> new }.observe { block(it) } } @@ -731,14 +732,14 @@ fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> = * * Shorthand for: * ```kotlin - * now.observe { block() } + * launchScope { now.observe { block() } } * ``` */ @ExperimentalKairosApi fun BuildScope.effect( context: CoroutineContext = EmptyCoroutineContext, block: EffectScope.() -> Unit, -): Job = now.observe(context) { block() } +): Job = launchScope { now.observe(context) { block() } } /** * Launches [block] in a new coroutine, returning a [Job] bound to the coroutine. @@ -773,7 +774,7 @@ fun BuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job = asy @ExperimentalKairosApi fun <R> BuildScope.asyncEffect(block: suspend CoroutineScope.() -> R): Deferred<R> { val result = CompletableDeferred<R>() - val job = now.observe { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } } + val job = effect { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } } val handle = job.invokeOnCompletion { result.cancel() } result.invokeOnCompletion { handle.dispose() diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt index bc5454ac724b..b20e77a31dab 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt @@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference import kotlin.coroutines.CoroutineContext import kotlinx.coroutines.CompletableJob import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.job @@ -117,12 +118,13 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope override fun <A> Events<A>.observe( coroutineContext: CoroutineContext, block: EffectScope.(A) -> Unit, - ): Job { + ): DisposableHandle { val subRef = AtomicReference<Maybe<Output<A>>>(null) val childScope = coroutineScope.childScope() - // When our scope is cancelled, deactivate this observer. - childScope.coroutineContext.job.invokeOnCompletion { + lateinit var cancelHandle: DisposableHandle + val handle = DisposableHandle { subRef.getAndSet(None)?.let { output -> + cancelHandle.dispose() if (output is Just) { @Suppress("DeferredResultUnused") network.transaction("observeEffect cancelled") { @@ -131,25 +133,29 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope } } } + // When our scope is cancelled, deactivate this observer. + cancelHandle = childScope.coroutineContext.job.invokeOnCompletion { handle.dispose() } val localNetwork = LocalNetwork(network, childScope, endSignal) - // Defer so that we don't suspend the caller + val outputNode = + Output<A>( + context = coroutineContext, + onDeath = { subRef.set(None) }, + onEmit = { output -> + if (subRef.get() is Just) { + // Not cancelled, safe to emit + val scope = + object : EffectScope, TransactionScope by this { + override val effectCoroutineScope: CoroutineScope = childScope + override val kairosNetwork: KairosNetwork = localNetwork + } + scope.block(output) + } + }, + ) + // Defer, in case any EventsLoops / StateLoops still need to be set deferAction { - val outputNode = - Output<A>( - context = coroutineContext, - onDeath = { subRef.getAndSet(None)?.let { childScope.cancel() } }, - onEmit = { output -> - if (subRef.get() is Just) { - // Not cancelled, safe to emit - val scope = - object : EffectScope, TransactionScope by this@BuildScopeImpl { - override val effectCoroutineScope: CoroutineScope = childScope - override val kairosNetwork: KairosNetwork = localNetwork - } - block(scope, output) - } - }, - ) + // Check for immediate cancellation + if (subRef.get() != null) return@deferAction this@observe.takeUntil(endSignal) .init .connect(evalScope = stateScope.evalScope) @@ -164,7 +170,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope } } ?: run { childScope.cancel() } } - return childScope.coroutineContext.job + return handle } override fun <A, B> Events<A>.mapBuild(transform: BuildScope.(A) -> B): Events<B> { diff --git a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt index edfed15e8f6f..150b462df655 100644 --- a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt +++ b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt @@ -1083,7 +1083,7 @@ class KairosTests { } @Test - fun inpueventsCompleted() = runFrpTest { network -> + fun inputEventsCompleted() = runFrpTest { network -> val results = mutableListOf<Int>() val e = network.mutableEvents<Int>() activateSpec(network) { e.nextOnly().observe { results.add(it) } } @@ -1376,6 +1376,76 @@ class KairosTests { assertEquals(1, count) } + @Test + fun observeEffect_disposeHandle() = runFrpTest { network -> + val input = network.mutableEvents<Unit>() + val stopper = network.mutableEvents<Unit>() + var runningCount = 0 + val specJob = + activateSpec(network) { + val handle = + input.observe { + effectCoroutineScope.launch { + runningCount++ + awaitClose { runningCount-- } + } + } + stopper.nextOnly().observe { handle.dispose() } + } + runCurrent() + assertEquals(0, runningCount) + + input.emit(Unit) + assertEquals(1, runningCount) + + input.emit(Unit) + assertEquals(2, runningCount) + + stopper.emit(Unit) + assertEquals(2, runningCount) + + input.emit(Unit) + assertEquals(2, runningCount) + + specJob.cancel() + runCurrent() + assertEquals(0, runningCount) + } + + @Test + fun observeEffect_takeUntil() = runFrpTest { network -> + val input = network.mutableEvents<Unit>() + val stopper = network.mutableEvents<Unit>() + var runningCount = 0 + val specJob = + activateSpec(network) { + input.takeUntil(stopper).observe { + effectCoroutineScope.launch { + runningCount++ + awaitClose { runningCount-- } + } + } + } + runCurrent() + assertEquals(0, runningCount) + + input.emit(Unit) + assertEquals(1, runningCount) + + input.emit(Unit) + assertEquals(2, runningCount) + + stopper.emit(Unit) + assertEquals(2, runningCount) + + input.emit(Unit) + assertEquals(2, runningCount) + + specJob.cancel() + runCurrent() + assertEquals(0, runningCount) + } + private fun runFrpTest( timeout: Duration = 3.seconds, block: suspend TestScope.(KairosNetwork) -> Unit, |