summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Steve Elliott <steell@google.com> 2024-12-06 12:24:09 -0500
committer Steve Elliott <steell@google.com> 2025-01-03 15:58:38 -0500
commita1debf09033e90d47872fd888b1e3c0f33b31ec7 (patch)
treeeb97fc453f4d0efbcb455d63a099c24ddce0c492
parentbafc44ea3f1f9a59a0c28caafe90d8301bd9d6cb (diff)
[kairos] keep effect coroutines running for duration of build scope
Some API adjustments to ensure that, even if an Events ends, any effects launched by observers continue in the expected build scope. Flag: EXEMPT unused Test: atest kairos-tests Change-Id: I363cd841a887d8e633a3c0f290305b1a76d7b944
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt19
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt48
-rw-r--r--packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt72
3 files changed, 108 insertions, 31 deletions
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt
index 2482c262e1d1..b6918703b404 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt
@@ -24,6 +24,7 @@ import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
+import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.coroutineScope
@@ -117,7 +118,7 @@ interface BuildScope : StateScope {
fun <A> Events<A>.observe(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
block: EffectScope.(A) -> Unit = {},
- ): Job
+ ): DisposableHandle
/**
* Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
@@ -212,7 +213,7 @@ interface BuildScope : StateScope {
*
* @see observe
*/
- fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job =
+ fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
mapBuild(block).observe()
/**
@@ -568,7 +569,7 @@ interface BuildScope : StateScope {
/** Returns a [Deferred] containing the next value to be emitted from this [Events]. */
fun <R> Events<R>.nextDeferred(): Deferred<R> {
lateinit var next: CompletableDeferred<R>
- val job = nextOnly().observe { next.complete(it) }
+ val job = launchScope { nextOnly().observe { next.complete(it) } }
next = CompletableDeferred<R>(parent = job)
return next
}
@@ -618,7 +619,7 @@ interface BuildScope : StateScope {
* registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
* cancelled).
*/
- fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): Job =
+ fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
mapLatestBuild { block(it) }.observe()
/**
@@ -627,7 +628,7 @@ interface BuildScope : StateScope {
*
* With each invocation of [block], running effects from the previous invocation are cancelled.
*/
- fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): Job {
+ fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): DisposableHandle {
var innerJob: Job? = null
return observeBuild {
innerJob?.cancel()
@@ -696,7 +697,7 @@ interface BuildScope : StateScope {
* emitting) then [block] will be invoked for the first time with the new value; otherwise, it
* will be invoked with the [current][sample] value.
*/
- fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): Job =
+ fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): DisposableHandle =
now.map { sample() }.mergeWith(changes) { _, new -> new }.observe { block(it) }
}
@@ -731,14 +732,14 @@ fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> =
*
* Shorthand for:
* ```kotlin
- * now.observe { block() }
+ * launchScope { now.observe { block() } }
* ```
*/
@ExperimentalKairosApi
fun BuildScope.effect(
context: CoroutineContext = EmptyCoroutineContext,
block: EffectScope.() -> Unit,
-): Job = now.observe(context) { block() }
+): Job = launchScope { now.observe(context) { block() } }
/**
* Launches [block] in a new coroutine, returning a [Job] bound to the coroutine.
@@ -773,7 +774,7 @@ fun BuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job = asy
@ExperimentalKairosApi
fun <R> BuildScope.asyncEffect(block: suspend CoroutineScope.() -> R): Deferred<R> {
val result = CompletableDeferred<R>()
- val job = now.observe { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } }
+ val job = effect { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } }
val handle = job.invokeOnCompletion { result.cancel() }
result.invokeOnCompletion {
handle.dispose()
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
index bc5454ac724b..b20e77a31dab 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.job
@@ -117,12 +118,13 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
override fun <A> Events<A>.observe(
coroutineContext: CoroutineContext,
block: EffectScope.(A) -> Unit,
- ): Job {
+ ): DisposableHandle {
val subRef = AtomicReference<Maybe<Output<A>>>(null)
val childScope = coroutineScope.childScope()
- // When our scope is cancelled, deactivate this observer.
- childScope.coroutineContext.job.invokeOnCompletion {
+ lateinit var cancelHandle: DisposableHandle
+ val handle = DisposableHandle {
subRef.getAndSet(None)?.let { output ->
+ cancelHandle.dispose()
if (output is Just) {
@Suppress("DeferredResultUnused")
network.transaction("observeEffect cancelled") {
@@ -131,25 +133,29 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
}
}
}
+ // When our scope is cancelled, deactivate this observer.
+ cancelHandle = childScope.coroutineContext.job.invokeOnCompletion { handle.dispose() }
val localNetwork = LocalNetwork(network, childScope, endSignal)
- // Defer so that we don't suspend the caller
+ val outputNode =
+ Output<A>(
+ context = coroutineContext,
+ onDeath = { subRef.set(None) },
+ onEmit = { output ->
+ if (subRef.get() is Just) {
+ // Not cancelled, safe to emit
+ val scope =
+ object : EffectScope, TransactionScope by this {
+ override val effectCoroutineScope: CoroutineScope = childScope
+ override val kairosNetwork: KairosNetwork = localNetwork
+ }
+ scope.block(output)
+ }
+ },
+ )
+ // Defer, in case any EventsLoops / StateLoops still need to be set
deferAction {
- val outputNode =
- Output<A>(
- context = coroutineContext,
- onDeath = { subRef.getAndSet(None)?.let { childScope.cancel() } },
- onEmit = { output ->
- if (subRef.get() is Just) {
- // Not cancelled, safe to emit
- val scope =
- object : EffectScope, TransactionScope by this@BuildScopeImpl {
- override val effectCoroutineScope: CoroutineScope = childScope
- override val kairosNetwork: KairosNetwork = localNetwork
- }
- block(scope, output)
- }
- },
- )
+ // Check for immediate cancellation
+ if (subRef.get() != null) return@deferAction
this@observe.takeUntil(endSignal)
.init
.connect(evalScope = stateScope.evalScope)
@@ -164,7 +170,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
}
} ?: run { childScope.cancel() }
}
- return childScope.coroutineContext.job
+ return handle
}
override fun <A, B> Events<A>.mapBuild(transform: BuildScope.(A) -> B): Events<B> {
diff --git a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
index edfed15e8f6f..150b462df655 100644
--- a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
+++ b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
@@ -1083,7 +1083,7 @@ class KairosTests {
}
@Test
- fun inpueventsCompleted() = runFrpTest { network ->
+ fun inputEventsCompleted() = runFrpTest { network ->
val results = mutableListOf<Int>()
val e = network.mutableEvents<Int>()
activateSpec(network) { e.nextOnly().observe { results.add(it) } }
@@ -1376,6 +1376,76 @@ class KairosTests {
assertEquals(1, count)
}
+ @Test
+ fun observeEffect_disposeHandle() = runFrpTest { network ->
+ val input = network.mutableEvents<Unit>()
+ val stopper = network.mutableEvents<Unit>()
+ var runningCount = 0
+ val specJob =
+ activateSpec(network) {
+ val handle =
+ input.observe {
+ effectCoroutineScope.launch {
+ runningCount++
+ awaitClose { runningCount-- }
+ }
+ }
+ stopper.nextOnly().observe { handle.dispose() }
+ }
+ runCurrent()
+ assertEquals(0, runningCount)
+
+ input.emit(Unit)
+ assertEquals(1, runningCount)
+
+ input.emit(Unit)
+ assertEquals(2, runningCount)
+
+ stopper.emit(Unit)
+ assertEquals(2, runningCount)
+
+ input.emit(Unit)
+ assertEquals(2, runningCount)
+
+ specJob.cancel()
+ runCurrent()
+ assertEquals(0, runningCount)
+ }
+
+ @Test
+ fun observeEffect_takeUntil() = runFrpTest { network ->
+ val input = network.mutableEvents<Unit>()
+ val stopper = network.mutableEvents<Unit>()
+ var runningCount = 0
+ val specJob =
+ activateSpec(network) {
+ input.takeUntil(stopper).observe {
+ effectCoroutineScope.launch {
+ runningCount++
+ awaitClose { runningCount-- }
+ }
+ }
+ }
+ runCurrent()
+ assertEquals(0, runningCount)
+
+ input.emit(Unit)
+ assertEquals(1, runningCount)
+
+ input.emit(Unit)
+ assertEquals(2, runningCount)
+
+ stopper.emit(Unit)
+ assertEquals(2, runningCount)
+
+ input.emit(Unit)
+ assertEquals(2, runningCount)
+
+ specJob.cancel()
+ runCurrent()
+ assertEquals(0, runningCount)
+ }
+
private fun runFrpTest(
timeout: Duration = 3.seconds,
block: suspend TestScope.(KairosNetwork) -> Unit,