summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Treehugger Robot <android-test-infra-autosubmit@system.gserviceaccount.com> 2024-12-09 21:20:11 +0000
committer Android (Google) Code Review <android-gerrit@google.com> 2024-12-09 21:20:11 +0000
commit6dfcf25a7f3973e064b6a1e10d214ef572dd8d1f (patch)
treed43be946b9fda428e3c315b56fd8d7c8d35cf641
parentb496de4906c96d23b21973c97d1c7eeccd420d21 (diff)
parent2570a8df7fc493a7e06a543a37d0fd28a3359b54 (diff)
Merge changes I9dd35754,I82c2f1d4,I31b82c5c,I0a851515 into main
* changes: [kairos] fix early termination of state accumulation [kairos] reduce coroutine usage in buildscope [kairos] annotate combinators as experimental [kairos] remove redundant NetworkScope.schedule(MuxNode)
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt23
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt30
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt2
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt10
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt2
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt6
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt5
-rw-r--r--packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt20
8 files changed, 68 insertions, 30 deletions
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt
index 17e407e06ea5..ae9b8c85910f 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt
@@ -28,15 +28,18 @@ import kotlinx.coroutines.flow.conflate
* Returns a [TFlow] that emits the value sampled from the [Transactional] produced by each emission
* of the original [TFlow], within the same transaction of the original emission.
*/
+@ExperimentalFrpApi
fun <A> TFlow<Transactional<A>>.sampleTransactionals(): TFlow<A> = map { it.sample() }
/** @see FrpTransactionScope.sample */
+@ExperimentalFrpApi
fun <A, B, C> TFlow<A>.sample(
state: TState<B>,
transform: suspend FrpTransactionScope.(A, B) -> C,
): TFlow<C> = map { transform(it, state.sample()) }
/** @see FrpTransactionScope.sample */
+@ExperimentalFrpApi
fun <A, B, C> TFlow<A>.sample(
transactional: Transactional<B>,
transform: suspend FrpTransactionScope.(A, B) -> C,
@@ -51,6 +54,7 @@ fun <A, B, C> TFlow<A>.sample(
*
* @see sample
*/
+@ExperimentalFrpApi
fun <A, B, C> TFlow<A>.samplePromptly(
state: TState<B>,
transform: suspend FrpTransactionScope.(A, B) -> C,
@@ -74,6 +78,7 @@ fun <A, B, C> TFlow<A>.samplePromptly(
* Returns a cold [Flow] that, when collected, emits from this [TFlow]. [network] is needed to
* transactionally connect to / disconnect from the [TFlow] when collection starts/stops.
*/
+@ExperimentalFrpApi
fun <A> TFlow<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
channelFlow { network.activateSpec { observe { trySend(it) } } }.conflate()
@@ -81,6 +86,7 @@ fun <A> TFlow<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
* Returns a cold [Flow] that, when collected, emits from this [TState]. [network] is needed to
* transactionally connect to / disconnect from the [TState] when collection starts/stops.
*/
+@ExperimentalFrpApi
fun <A> TState<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
channelFlow { network.activateSpec { observe { trySend(it) } } }.conflate()
@@ -90,6 +96,7 @@ fun <A> TState<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
*
* When collection is cancelled, so is the [FrpSpec]. This means all ongoing work is cleaned up.
*/
+@ExperimentalFrpApi
@JvmName("flowSpecToColdConflatedFlow")
fun <A> FrpSpec<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
channelFlow { network.activateSpec { applySpec().observe { trySend(it) } } }.conflate()
@@ -100,6 +107,7 @@ fun <A> FrpSpec<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
*
* When collection is cancelled, so is the [FrpSpec]. This means all ongoing work is cleaned up.
*/
+@ExperimentalFrpApi
@JvmName("stateSpecToColdConflatedFlow")
fun <A> FrpSpec<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
channelFlow { network.activateSpec { applySpec().observe { trySend(it) } } }.conflate()
@@ -108,6 +116,7 @@ fun <A> FrpSpec<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
* Returns a cold [Flow] that, when collected, applies this [Transactional] in a new transaction in
* this [network], and then emits from the returned [TFlow].
*/
+@ExperimentalFrpApi
@JvmName("transactionalFlowToColdConflatedFlow")
fun <A> Transactional<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
channelFlow { network.activateSpec { sample().observe { trySend(it) } } }.conflate()
@@ -116,6 +125,7 @@ fun <A> Transactional<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A
* Returns a cold [Flow] that, when collected, applies this [Transactional] in a new transaction in
* this [network], and then emits from the returned [TState].
*/
+@ExperimentalFrpApi
@JvmName("transactionalStateToColdConflatedFlow")
fun <A> Transactional<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
channelFlow { network.activateSpec { sample().observe { trySend(it) } } }.conflate()
@@ -126,6 +136,7 @@ fun <A> Transactional<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<
*
* When collection is cancelled, so is the [FrpStateful]. This means all ongoing work is cleaned up.
*/
+@ExperimentalFrpApi
@JvmName("statefulFlowToColdConflatedFlow")
fun <A> FrpStateful<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
channelFlow { network.activateSpec { applyStateful().observe { trySend(it) } } }.conflate()
@@ -136,11 +147,13 @@ fun <A> FrpStateful<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A>
*
* When collection is cancelled, so is the [FrpStateful]. This means all ongoing work is cleaned up.
*/
+@ExperimentalFrpApi
@JvmName("statefulStateToColdConflatedFlow")
fun <A> FrpStateful<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
channelFlow { network.activateSpec { applyStateful().observe { trySend(it) } } }.conflate()
/** Return a [TFlow] that emits from the original [TFlow] only when [state] is `true`. */
+@ExperimentalFrpApi
fun <A> TFlow<A>.filter(state: TState<Boolean>): TFlow<A> = filter { state.sample() }
private fun Iterable<Boolean>.allTrue() = all { it }
@@ -148,13 +161,15 @@ private fun Iterable<Boolean>.allTrue() = all { it }
private fun Iterable<Boolean>.anyTrue() = any { it }
/** Returns a [TState] that is `true` only when all of [states] are `true`. */
+@ExperimentalFrpApi
fun allOf(vararg states: TState<Boolean>): TState<Boolean> = combine(*states) { it.allTrue() }
/** Returns a [TState] that is `true` when any of [states] are `true`. */
+@ExperimentalFrpApi
fun anyOf(vararg states: TState<Boolean>): TState<Boolean> = combine(*states) { it.anyTrue() }
/** Returns a [TState] containing the inverse of the Boolean held by the original [TState]. */
-fun not(state: TState<Boolean>): TState<Boolean> = state.mapCheapUnsafe { !it }
+@ExperimentalFrpApi fun not(state: TState<Boolean>): TState<Boolean> = state.mapCheapUnsafe { !it }
/**
* Represents a modal FRP sub-network.
@@ -168,6 +183,7 @@ fun not(state: TState<Boolean>): TState<Boolean> = state.mapCheapUnsafe { !it }
*
* @see FrpStatefulMode
*/
+@ExperimentalFrpApi
fun interface FrpBuildMode<out A> {
/**
* Invoked when this mode is enabled. Returns a value and a [TFlow] that signals a switch to a
@@ -183,6 +199,7 @@ fun interface FrpBuildMode<out A> {
*
* @see FrpBuildMode
*/
+@ExperimentalFrpApi
val <A> FrpBuildMode<A>.compiledFrpSpec: FrpSpec<TState<A>>
get() = frpSpec {
var modeChangeEvents by TFlowLoop<FrpBuildMode<A>>()
@@ -206,6 +223,7 @@ val <A> FrpBuildMode<A>.compiledFrpSpec: FrpSpec<TState<A>>
*
* @see FrpBuildMode
*/
+@ExperimentalFrpApi
fun interface FrpStatefulMode<out A> {
/**
* Invoked when this mode is enabled. Returns a value and a [TFlow] that signals a switch to a
@@ -221,6 +239,7 @@ fun interface FrpStatefulMode<out A> {
*
* @see FrpBuildMode
*/
+@ExperimentalFrpApi
val <A> FrpStatefulMode<A>.compiledStateful: FrpStateful<TState<A>>
get() = statefully {
var modeChangeEvents by TFlowLoop<FrpStatefulMode<A>>()
@@ -237,6 +256,7 @@ val <A> FrpStatefulMode<A>.compiledStateful: FrpStateful<TState<A>>
* Runs [spec] in this [FrpBuildScope], and then re-runs it whenever [rebuildSignal] emits. Returns
* a [TState] that holds the result of the currently-active [FrpSpec].
*/
+@ExperimentalFrpApi
fun <A> FrpBuildScope.rebuildOn(rebuildSignal: TFlow<*>, spec: FrpSpec<A>): TState<A> =
rebuildSignal.map { spec }.holdLatestSpec(spec)
@@ -248,5 +268,6 @@ fun <A> FrpBuildScope.rebuildOn(rebuildSignal: TFlow<*>, spec: FrpSpec<A>): TSta
* stateChanges.map { WithPrev(previousValue = sample(), newValue = it) }
* ```
*/
+@ExperimentalFrpApi
val <A> TState<A>.transitions: TFlow<WithPrev<A, A>>
get() = stateChanges.map { WithPrev(previousValue = sample(), newValue = it) }
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
index 90f1aea3e42f..86f35f7b5a0f 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
@@ -34,9 +34,9 @@ import com.android.systemui.kairos.TFlowInit
import com.android.systemui.kairos.groupByKey
import com.android.systemui.kairos.init
import com.android.systemui.kairos.internal.util.childScope
-import com.android.systemui.kairos.internal.util.launchOnCancel
import com.android.systemui.kairos.internal.util.mapValuesParallel
import com.android.systemui.kairos.launchEffect
+import com.android.systemui.kairos.mergeLeft
import com.android.systemui.kairos.util.Just
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.None
@@ -49,7 +49,6 @@ import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CompletableJob
-import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
@@ -87,7 +86,6 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
): TFlow<A> {
var job: Job? = null
val stopEmitter = newStopEmitter()
- val handle = this.job.invokeOnCompletion { stopEmitter.emit(Unit) }
// Create a child scope that will be kept alive beyond the end of this transaction.
val childScope = coroutineScope.childScope()
lateinit var emitter: Pair<T, S>
@@ -99,7 +97,6 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
reenterBuildScope(this@BuildScopeImpl, childScope).runInBuildScope {
launchEffect {
builder(emitter.second)
- handle.dispose()
stopEmitter.emit(Unit)
}
}
@@ -110,7 +107,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
},
)
emitter = constructFlow(inputNode)
- return with(frpScope) { emitter.first.takeUntil(stopEmitter) }
+ return with(frpScope) { emitter.first.takeUntil(mergeLeft(stopEmitter, endSignal)) }
}
private fun <T> tFlowInternal(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> =
@@ -164,7 +161,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
val subRef = AtomicReference<Maybe<Output<A>>>(null)
val childScope = coroutineScope.childScope()
// When our scope is cancelled, deactivate this observer.
- childScope.launchOnCancel(CoroutineName("TFlow.observeEffect")) {
+ childScope.coroutineContext.job.invokeOnCompletion {
subRef.getAndSet(None)?.let { output ->
if (output is Just) {
@Suppress("DeferredResultUnused")
@@ -215,7 +212,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
} else if (needsEval) {
outputNode.schedule(evalScope = stateScope.evalScope)
}
- } ?: childScope.cancel()
+ } ?: run { childScope.cancel() }
}
return childScope.coroutineContext.job
}
@@ -229,10 +226,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
"mapBuild",
mapImpl({ init.connect(evalScope = this) }) { spec ->
reenterBuildScope(outerScope = this@BuildScopeImpl, childScope)
- .runInBuildScope {
- val (result, _) = asyncScope { transform(spec) }
- result.get()
- }
+ .runInBuildScope { transform(spec) }
}
.cached(),
)
@@ -304,12 +298,14 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
childScope.coroutineContext.job.invokeOnCompletion { stopEmitter.emit(Unit) }
// Ensure that once this transaction is done, the new child scope enters the completing
// state (kept alive so long as there are child jobs).
- scheduleOutput(
- OneShot {
- // TODO: don't like this cast
- (childScope.coroutineContext.job as CompletableJob).complete()
- }
- )
+ // TODO: need to keep the scope alive if it's used to accumulate state.
+ // Otherwise, stopEmitter will emit early, due to the call to complete().
+ // scheduleOutput(
+ // OneShot {
+ // // TODO: don't like this cast
+ // (childScope.coroutineContext.job as CompletableJob).complete()
+ // }
+ // )
return BuildScopeImpl(
stateScope = StateScopeImpl(evalScope = stateScope.evalScope, endSignal = stopEmitter),
coroutineScope = childScope,
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt
index af864e6c3496..69994ba6e866 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt
@@ -66,8 +66,6 @@ internal interface NetworkScope : InitScope {
fun schedule(state: TStateSource<*>)
- suspend fun schedule(node: MuxNode<*, *, *>)
-
fun scheduleDeactivation(node: PushNode<*>)
fun scheduleDeactivation(output: Output<*>)
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt
index f7ff15f0507b..af68a1e3d83c 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt
@@ -188,6 +188,14 @@ internal sealed class MuxNode<K : Any, V, Output>(val lifecycle: MuxLifecycle<Ou
}
abstract fun hasCurrentValueLocked(transactionStore: TransactionStore): Boolean
+
+ fun schedule(evalScope: EvalScope) {
+ // TODO: Potential optimization
+ // Detect if this node is guaranteed to have a single upstream within this transaction,
+ // then bypass scheduling it. Instead immediately schedule its downstream and treat this
+ // MuxNode as a Pull (effectively making it a mapCheap).
+ depthTracker.schedule(evalScope.scheduler, this)
+ }
}
/** An input branch of a mux node, associated with a key. */
@@ -202,7 +210,7 @@ internal class MuxBranchNode<K : Any, V>(private val muxNode: MuxNode<K, V, *>,
val upstreamResult = upstream.getPushEvent(evalScope)
if (upstreamResult is Just) {
muxNode.upstreamData[key] = upstreamResult.value
- evalScope.schedule(muxNode)
+ muxNode.schedule(evalScope)
}
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt
index 08bee855831a..3b9502a5d812 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt
@@ -409,7 +409,7 @@ internal fun <K : Any, A> switchDeferredImpl(
// Schedule for evaluation if any switched-in nodes have already emitted within
// this transaction.
if (muxNode.upstreamData.isNotEmpty()) {
- evalScope.schedule(muxNode)
+ muxNode.schedule(evalScope)
}
return muxNode.takeUnless { muxNode.switchedIn.isEmpty() && !isIndirect }
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt
index cdfafa943121..b291c879b449 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt
@@ -75,7 +75,7 @@ internal class MuxPromptMovingNode<K : Any, V>(
if (depthTracker.dirty_depthIncreased()) {
depthTracker.schedule(evalScope.compactor, node = this)
}
- evalScope.schedule(this)
+ schedule(evalScope)
} else {
val compactDownstream = depthTracker.isDirty()
if (evalResult != null || compactDownstream) {
@@ -291,7 +291,7 @@ internal class MuxPromptPatchNode<K : Any, V>(private val muxNode: MuxPromptMovi
val upstreamResult = upstream.getPushEvent(evalScope)
if (upstreamResult is Just) {
muxNode.patchData = upstreamResult.value
- evalScope.schedule(muxNode)
+ muxNode.schedule(evalScope)
}
}
@@ -451,7 +451,7 @@ internal fun <K : Any, A> switchPromptImpl(
// Schedule for evaluation if any switched-in nodes or the patches node have
// already emitted within this transaction.
if (movingNode.patchData != null || movingNode.upstreamData.isNotEmpty()) {
- evalScope.schedule(movingNode)
+ movingNode.schedule(evalScope)
}
return movingNode.takeUnless { it.patches == null && it.switchedIn.isEmpty() }
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt
index 83440d85419e..f466113e42a8 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt
@@ -81,11 +81,6 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope {
stateWrites.add(state)
}
- // TODO: weird that we have this *and* scheduler exposed
- override suspend fun schedule(node: MuxNode<*, *, *>) {
- scheduler.schedule(node.depthTracker.dirty_directDepth, node)
- }
-
override fun scheduleDeactivation(node: PushNode<*>) {
deactivations.add(node)
}
diff --git a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
index 7294ef33cbc5..688adae8fcae 100644
--- a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
+++ b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt
@@ -1300,6 +1300,26 @@ class KairosTests {
}
@Test
+ fun buildScope_stateAccumulation() = runFrpTest { network ->
+ val input = network.mutableTFlow<Unit>()
+ var observedCount: Int? = null
+ activateSpec(network) {
+ val (c, j) = asyncScope { input.fold(0) { _, x -> x + 1 } }
+ deferredBuildScopeAction { c.get().observe { observedCount = it } }
+ }
+ runCurrent()
+ assertEquals(0, observedCount)
+
+ input.emit(Unit)
+ runCurrent()
+ assertEquals(1, observedCount)
+
+ input.emit(Unit)
+ runCurrent()
+ assertEquals(2, observedCount)
+ }
+
+ @Test
fun effect() = runFrpTest { network ->
val input = network.mutableTFlow<Unit>()
var effectRunning = false