diff options
8 files changed, 68 insertions, 30 deletions
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt index 17e407e06ea5..ae9b8c85910f 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt @@ -28,15 +28,18 @@ import kotlinx.coroutines.flow.conflate * Returns a [TFlow] that emits the value sampled from the [Transactional] produced by each emission * of the original [TFlow], within the same transaction of the original emission. */ +@ExperimentalFrpApi fun <A> TFlow<Transactional<A>>.sampleTransactionals(): TFlow<A> = map { it.sample() } /** @see FrpTransactionScope.sample */ +@ExperimentalFrpApi fun <A, B, C> TFlow<A>.sample( state: TState<B>, transform: suspend FrpTransactionScope.(A, B) -> C, ): TFlow<C> = map { transform(it, state.sample()) } /** @see FrpTransactionScope.sample */ +@ExperimentalFrpApi fun <A, B, C> TFlow<A>.sample( transactional: Transactional<B>, transform: suspend FrpTransactionScope.(A, B) -> C, @@ -51,6 +54,7 @@ fun <A, B, C> TFlow<A>.sample( * * @see sample */ +@ExperimentalFrpApi fun <A, B, C> TFlow<A>.samplePromptly( state: TState<B>, transform: suspend FrpTransactionScope.(A, B) -> C, @@ -74,6 +78,7 @@ fun <A, B, C> TFlow<A>.samplePromptly( * Returns a cold [Flow] that, when collected, emits from this [TFlow]. [network] is needed to * transactionally connect to / disconnect from the [TFlow] when collection starts/stops. */ +@ExperimentalFrpApi fun <A> TFlow<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = channelFlow { network.activateSpec { observe { trySend(it) } } }.conflate() @@ -81,6 +86,7 @@ fun <A> TFlow<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = * Returns a cold [Flow] that, when collected, emits from this [TState]. [network] is needed to * transactionally connect to / disconnect from the [TState] when collection starts/stops. */ +@ExperimentalFrpApi fun <A> TState<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = channelFlow { network.activateSpec { observe { trySend(it) } } }.conflate() @@ -90,6 +96,7 @@ fun <A> TState<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = * * When collection is cancelled, so is the [FrpSpec]. This means all ongoing work is cleaned up. */ +@ExperimentalFrpApi @JvmName("flowSpecToColdConflatedFlow") fun <A> FrpSpec<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = channelFlow { network.activateSpec { applySpec().observe { trySend(it) } } }.conflate() @@ -100,6 +107,7 @@ fun <A> FrpSpec<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = * * When collection is cancelled, so is the [FrpSpec]. This means all ongoing work is cleaned up. */ +@ExperimentalFrpApi @JvmName("stateSpecToColdConflatedFlow") fun <A> FrpSpec<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = channelFlow { network.activateSpec { applySpec().observe { trySend(it) } } }.conflate() @@ -108,6 +116,7 @@ fun <A> FrpSpec<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = * Returns a cold [Flow] that, when collected, applies this [Transactional] in a new transaction in * this [network], and then emits from the returned [TFlow]. */ +@ExperimentalFrpApi @JvmName("transactionalFlowToColdConflatedFlow") fun <A> Transactional<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = channelFlow { network.activateSpec { sample().observe { trySend(it) } } }.conflate() @@ -116,6 +125,7 @@ fun <A> Transactional<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A * Returns a cold [Flow] that, when collected, applies this [Transactional] in a new transaction in * this [network], and then emits from the returned [TState]. */ +@ExperimentalFrpApi @JvmName("transactionalStateToColdConflatedFlow") fun <A> Transactional<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = channelFlow { network.activateSpec { sample().observe { trySend(it) } } }.conflate() @@ -126,6 +136,7 @@ fun <A> Transactional<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow< * * When collection is cancelled, so is the [FrpStateful]. This means all ongoing work is cleaned up. */ +@ExperimentalFrpApi @JvmName("statefulFlowToColdConflatedFlow") fun <A> FrpStateful<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = channelFlow { network.activateSpec { applyStateful().observe { trySend(it) } } }.conflate() @@ -136,11 +147,13 @@ fun <A> FrpStateful<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> * * When collection is cancelled, so is the [FrpStateful]. This means all ongoing work is cleaned up. */ +@ExperimentalFrpApi @JvmName("statefulStateToColdConflatedFlow") fun <A> FrpStateful<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> = channelFlow { network.activateSpec { applyStateful().observe { trySend(it) } } }.conflate() /** Return a [TFlow] that emits from the original [TFlow] only when [state] is `true`. */ +@ExperimentalFrpApi fun <A> TFlow<A>.filter(state: TState<Boolean>): TFlow<A> = filter { state.sample() } private fun Iterable<Boolean>.allTrue() = all { it } @@ -148,13 +161,15 @@ private fun Iterable<Boolean>.allTrue() = all { it } private fun Iterable<Boolean>.anyTrue() = any { it } /** Returns a [TState] that is `true` only when all of [states] are `true`. */ +@ExperimentalFrpApi fun allOf(vararg states: TState<Boolean>): TState<Boolean> = combine(*states) { it.allTrue() } /** Returns a [TState] that is `true` when any of [states] are `true`. */ +@ExperimentalFrpApi fun anyOf(vararg states: TState<Boolean>): TState<Boolean> = combine(*states) { it.anyTrue() } /** Returns a [TState] containing the inverse of the Boolean held by the original [TState]. */ -fun not(state: TState<Boolean>): TState<Boolean> = state.mapCheapUnsafe { !it } +@ExperimentalFrpApi fun not(state: TState<Boolean>): TState<Boolean> = state.mapCheapUnsafe { !it } /** * Represents a modal FRP sub-network. @@ -168,6 +183,7 @@ fun not(state: TState<Boolean>): TState<Boolean> = state.mapCheapUnsafe { !it } * * @see FrpStatefulMode */ +@ExperimentalFrpApi fun interface FrpBuildMode<out A> { /** * Invoked when this mode is enabled. Returns a value and a [TFlow] that signals a switch to a @@ -183,6 +199,7 @@ fun interface FrpBuildMode<out A> { * * @see FrpBuildMode */ +@ExperimentalFrpApi val <A> FrpBuildMode<A>.compiledFrpSpec: FrpSpec<TState<A>> get() = frpSpec { var modeChangeEvents by TFlowLoop<FrpBuildMode<A>>() @@ -206,6 +223,7 @@ val <A> FrpBuildMode<A>.compiledFrpSpec: FrpSpec<TState<A>> * * @see FrpBuildMode */ +@ExperimentalFrpApi fun interface FrpStatefulMode<out A> { /** * Invoked when this mode is enabled. Returns a value and a [TFlow] that signals a switch to a @@ -221,6 +239,7 @@ fun interface FrpStatefulMode<out A> { * * @see FrpBuildMode */ +@ExperimentalFrpApi val <A> FrpStatefulMode<A>.compiledStateful: FrpStateful<TState<A>> get() = statefully { var modeChangeEvents by TFlowLoop<FrpStatefulMode<A>>() @@ -237,6 +256,7 @@ val <A> FrpStatefulMode<A>.compiledStateful: FrpStateful<TState<A>> * Runs [spec] in this [FrpBuildScope], and then re-runs it whenever [rebuildSignal] emits. Returns * a [TState] that holds the result of the currently-active [FrpSpec]. */ +@ExperimentalFrpApi fun <A> FrpBuildScope.rebuildOn(rebuildSignal: TFlow<*>, spec: FrpSpec<A>): TState<A> = rebuildSignal.map { spec }.holdLatestSpec(spec) @@ -248,5 +268,6 @@ fun <A> FrpBuildScope.rebuildOn(rebuildSignal: TFlow<*>, spec: FrpSpec<A>): TSta * stateChanges.map { WithPrev(previousValue = sample(), newValue = it) } * ``` */ +@ExperimentalFrpApi val <A> TState<A>.transitions: TFlow<WithPrev<A, A>> get() = stateChanges.map { WithPrev(previousValue = sample(), newValue = it) } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt index 90f1aea3e42f..86f35f7b5a0f 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt @@ -34,9 +34,9 @@ import com.android.systemui.kairos.TFlowInit import com.android.systemui.kairos.groupByKey import com.android.systemui.kairos.init import com.android.systemui.kairos.internal.util.childScope -import com.android.systemui.kairos.internal.util.launchOnCancel import com.android.systemui.kairos.internal.util.mapValuesParallel import com.android.systemui.kairos.launchEffect +import com.android.systemui.kairos.mergeLeft import com.android.systemui.kairos.util.Just import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.None @@ -49,7 +49,6 @@ import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.startCoroutine import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CompletableJob -import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.Job @@ -87,7 +86,6 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope ): TFlow<A> { var job: Job? = null val stopEmitter = newStopEmitter() - val handle = this.job.invokeOnCompletion { stopEmitter.emit(Unit) } // Create a child scope that will be kept alive beyond the end of this transaction. val childScope = coroutineScope.childScope() lateinit var emitter: Pair<T, S> @@ -99,7 +97,6 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope reenterBuildScope(this@BuildScopeImpl, childScope).runInBuildScope { launchEffect { builder(emitter.second) - handle.dispose() stopEmitter.emit(Unit) } } @@ -110,7 +107,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope }, ) emitter = constructFlow(inputNode) - return with(frpScope) { emitter.first.takeUntil(stopEmitter) } + return with(frpScope) { emitter.first.takeUntil(mergeLeft(stopEmitter, endSignal)) } } private fun <T> tFlowInternal(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> = @@ -164,7 +161,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope val subRef = AtomicReference<Maybe<Output<A>>>(null) val childScope = coroutineScope.childScope() // When our scope is cancelled, deactivate this observer. - childScope.launchOnCancel(CoroutineName("TFlow.observeEffect")) { + childScope.coroutineContext.job.invokeOnCompletion { subRef.getAndSet(None)?.let { output -> if (output is Just) { @Suppress("DeferredResultUnused") @@ -215,7 +212,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope } else if (needsEval) { outputNode.schedule(evalScope = stateScope.evalScope) } - } ?: childScope.cancel() + } ?: run { childScope.cancel() } } return childScope.coroutineContext.job } @@ -229,10 +226,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope "mapBuild", mapImpl({ init.connect(evalScope = this) }) { spec -> reenterBuildScope(outerScope = this@BuildScopeImpl, childScope) - .runInBuildScope { - val (result, _) = asyncScope { transform(spec) } - result.get() - } + .runInBuildScope { transform(spec) } } .cached(), ) @@ -304,12 +298,14 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope childScope.coroutineContext.job.invokeOnCompletion { stopEmitter.emit(Unit) } // Ensure that once this transaction is done, the new child scope enters the completing // state (kept alive so long as there are child jobs). - scheduleOutput( - OneShot { - // TODO: don't like this cast - (childScope.coroutineContext.job as CompletableJob).complete() - } - ) + // TODO: need to keep the scope alive if it's used to accumulate state. + // Otherwise, stopEmitter will emit early, due to the call to complete(). + // scheduleOutput( + // OneShot { + // // TODO: don't like this cast + // (childScope.coroutineContext.job as CompletableJob).complete() + // } + // ) return BuildScopeImpl( stateScope = StateScopeImpl(evalScope = stateScope.evalScope, endSignal = stopEmitter), coroutineScope = childScope, diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt index af864e6c3496..69994ba6e866 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt @@ -66,8 +66,6 @@ internal interface NetworkScope : InitScope { fun schedule(state: TStateSource<*>) - suspend fun schedule(node: MuxNode<*, *, *>) - fun scheduleDeactivation(node: PushNode<*>) fun scheduleDeactivation(output: Output<*>) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt index f7ff15f0507b..af68a1e3d83c 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt @@ -188,6 +188,14 @@ internal sealed class MuxNode<K : Any, V, Output>(val lifecycle: MuxLifecycle<Ou } abstract fun hasCurrentValueLocked(transactionStore: TransactionStore): Boolean + + fun schedule(evalScope: EvalScope) { + // TODO: Potential optimization + // Detect if this node is guaranteed to have a single upstream within this transaction, + // then bypass scheduling it. Instead immediately schedule its downstream and treat this + // MuxNode as a Pull (effectively making it a mapCheap). + depthTracker.schedule(evalScope.scheduler, this) + } } /** An input branch of a mux node, associated with a key. */ @@ -202,7 +210,7 @@ internal class MuxBranchNode<K : Any, V>(private val muxNode: MuxNode<K, V, *>, val upstreamResult = upstream.getPushEvent(evalScope) if (upstreamResult is Just) { muxNode.upstreamData[key] = upstreamResult.value - evalScope.schedule(muxNode) + muxNode.schedule(evalScope) } } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt index 08bee855831a..3b9502a5d812 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt @@ -409,7 +409,7 @@ internal fun <K : Any, A> switchDeferredImpl( // Schedule for evaluation if any switched-in nodes have already emitted within // this transaction. if (muxNode.upstreamData.isNotEmpty()) { - evalScope.schedule(muxNode) + muxNode.schedule(evalScope) } return muxNode.takeUnless { muxNode.switchedIn.isEmpty() && !isIndirect } } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt index cdfafa943121..b291c879b449 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt @@ -75,7 +75,7 @@ internal class MuxPromptMovingNode<K : Any, V>( if (depthTracker.dirty_depthIncreased()) { depthTracker.schedule(evalScope.compactor, node = this) } - evalScope.schedule(this) + schedule(evalScope) } else { val compactDownstream = depthTracker.isDirty() if (evalResult != null || compactDownstream) { @@ -291,7 +291,7 @@ internal class MuxPromptPatchNode<K : Any, V>(private val muxNode: MuxPromptMovi val upstreamResult = upstream.getPushEvent(evalScope) if (upstreamResult is Just) { muxNode.patchData = upstreamResult.value - evalScope.schedule(muxNode) + muxNode.schedule(evalScope) } } @@ -451,7 +451,7 @@ internal fun <K : Any, A> switchPromptImpl( // Schedule for evaluation if any switched-in nodes or the patches node have // already emitted within this transaction. if (movingNode.patchData != null || movingNode.upstreamData.isNotEmpty()) { - evalScope.schedule(movingNode) + movingNode.schedule(evalScope) } return movingNode.takeUnless { it.patches == null && it.switchedIn.isEmpty() } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt index 83440d85419e..f466113e42a8 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt @@ -81,11 +81,6 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope { stateWrites.add(state) } - // TODO: weird that we have this *and* scheduler exposed - override suspend fun schedule(node: MuxNode<*, *, *>) { - scheduler.schedule(node.depthTracker.dirty_directDepth, node) - } - override fun scheduleDeactivation(node: PushNode<*>) { deactivations.add(node) } diff --git a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt index 7294ef33cbc5..688adae8fcae 100644 --- a/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt +++ b/packages/SystemUI/utils/kairos/test/com/android/systemui/kairos/KairosTests.kt @@ -1300,6 +1300,26 @@ class KairosTests { } @Test + fun buildScope_stateAccumulation() = runFrpTest { network -> + val input = network.mutableTFlow<Unit>() + var observedCount: Int? = null + activateSpec(network) { + val (c, j) = asyncScope { input.fold(0) { _, x -> x + 1 } } + deferredBuildScopeAction { c.get().observe { observedCount = it } } + } + runCurrent() + assertEquals(0, observedCount) + + input.emit(Unit) + runCurrent() + assertEquals(1, observedCount) + + input.emit(Unit) + runCurrent() + assertEquals(2, observedCount) + } + + @Test fun effect() = runFrpTest { network -> val input = network.mutableTFlow<Unit>() var effectRunning = false |