diff options
author | 2024-12-06 16:27:41 -0500 | |
---|---|---|
committer | 2025-01-03 15:55:35 -0500 | |
commit | d6865b295361c40202506a86aeaa6dfbbbb6e5c7 (patch) | |
tree | 161e5f649a44735dcd48a2457462c6f9ce24a43a | |
parent | 02946ae243e89bef2bae8087e91262e2ae8f499b (diff) |
[kairos] remove most internal usage of `suspend fun`
And by proxy, all concurrency from internal graph evaluation.
This produces a large performance improvement, mostly due to observed overhead with `suspend fun`.
Flag: EXEMPT unused
Test: atest kairos-tests
Change-Id: I72d3a0a15ae4d9a143eca8d587f177fdee171a44
34 files changed, 1589 insertions, 1723 deletions
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt index ae9b8c85910f..d5576b3b83df 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt @@ -35,14 +35,14 @@ fun <A> TFlow<Transactional<A>>.sampleTransactionals(): TFlow<A> = map { it.samp @ExperimentalFrpApi fun <A, B, C> TFlow<A>.sample( state: TState<B>, - transform: suspend FrpTransactionScope.(A, B) -> C, + transform: FrpTransactionScope.(A, B) -> C, ): TFlow<C> = map { transform(it, state.sample()) } /** @see FrpTransactionScope.sample */ @ExperimentalFrpApi fun <A, B, C> TFlow<A>.sample( transactional: Transactional<B>, - transform: suspend FrpTransactionScope.(A, B) -> C, + transform: FrpTransactionScope.(A, B) -> C, ): TFlow<C> = map { transform(it, transactional.sample()) } /** @@ -57,7 +57,7 @@ fun <A, B, C> TFlow<A>.sample( @ExperimentalFrpApi fun <A, B, C> TFlow<A>.samplePromptly( state: TState<B>, - transform: suspend FrpTransactionScope.(A, B) -> C, + transform: FrpTransactionScope.(A, B) -> C, ): TFlow<C> = sample(state) { a, b -> These.thiz<Pair<A, B>, B>(a to b) } .mergeWith(state.stateChanges.map { These.that(it) }) { thiz, that -> @@ -189,7 +189,7 @@ fun interface FrpBuildMode<out A> { * Invoked when this mode is enabled. Returns a value and a [TFlow] that signals a switch to a * new mode. */ - suspend fun FrpBuildScope.enableMode(): Pair<A, TFlow<FrpBuildMode<A>>> + fun FrpBuildScope.enableMode(): Pair<A, TFlow<FrpBuildMode<A>>> } /** @@ -229,7 +229,7 @@ fun interface FrpStatefulMode<out A> { * Invoked when this mode is enabled. Returns a value and a [TFlow] that signals a switch to a * new mode. */ - suspend fun FrpStateScope.enableMode(): Pair<A, TFlow<FrpStatefulMode<A>>> + fun FrpStateScope.enableMode(): Pair<A, TFlow<FrpStatefulMode<A>>> } /** diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpBuildScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpBuildScope.kt index 209a402bd629..31778dc32697 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpBuildScope.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpBuildScope.kt @@ -42,7 +42,7 @@ import kotlinx.coroutines.flow.scan import kotlinx.coroutines.launch /** A function that modifies the FrpNetwork. */ -typealias FrpSpec<A> = suspend FrpBuildScope.() -> A +typealias FrpSpec<A> = FrpBuildScope.() -> A /** * Constructs an [FrpSpec]. The passed [block] will be invoked with an [FrpBuildScope] that can be @@ -51,7 +51,7 @@ typealias FrpSpec<A> = suspend FrpBuildScope.() -> A */ @ExperimentalFrpApi @Suppress("NOTHING_TO_INLINE") -inline fun <A> frpSpec(noinline block: suspend FrpBuildScope.() -> A): FrpSpec<A> = block +inline fun <A> frpSpec(noinline block: FrpBuildScope.() -> A): FrpSpec<A> = block /** Applies the [FrpSpec] within this [FrpBuildScope]. */ @ExperimentalFrpApi @@ -63,11 +63,14 @@ inline operator fun <A> FrpBuildScope.invoke(block: FrpBuildScope.() -> A) = run interface FrpBuildScope : FrpStateScope { /** TODO: Javadoc */ + val frpNetwork: FrpNetwork + + /** TODO: Javadoc */ @ExperimentalFrpApi - fun <R> deferredBuildScope(block: suspend FrpBuildScope.() -> R): FrpDeferredValue<R> + fun <R> deferredBuildScope(block: FrpBuildScope.() -> R): FrpDeferredValue<R> /** TODO: Javadoc */ - @ExperimentalFrpApi fun deferredBuildScopeAction(block: suspend FrpBuildScope.() -> Unit) + @ExperimentalFrpApi fun deferredBuildScopeAction(block: FrpBuildScope.() -> Unit) /** * Returns a [TFlow] containing the results of applying [transform] to each value of the @@ -81,8 +84,7 @@ interface FrpBuildScope : FrpStateScope { * (or a downstream) [TFlow] is observed separately, [transform] will not be invoked, and no * internal side-effects will occur. */ - @ExperimentalFrpApi - fun <A, B> TFlow<A>.mapBuild(transform: suspend FrpBuildScope.(A) -> B): TFlow<B> + @ExperimentalFrpApi fun <A, B> TFlow<A>.mapBuild(transform: FrpBuildScope.(A) -> B): TFlow<B> /** * Invokes [block] whenever this [TFlow] emits a value, allowing side-effects to be safely @@ -100,7 +102,7 @@ interface FrpBuildScope : FrpStateScope { @ExperimentalFrpApi fun <A> TFlow<A>.observe( coroutineContext: CoroutineContext = EmptyCoroutineContext, - block: suspend FrpEffectScope.(A) -> Unit = {}, + block: FrpEffectScope.(A) -> Unit = {}, ): Job /** @@ -133,7 +135,8 @@ interface FrpBuildScope : FrpStateScope { * tFlow { ... }.apply { observe() } * ``` */ - @ExperimentalFrpApi fun <T> tFlow(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> + @ExperimentalFrpApi + fun <T> tFlow(name: String? = null, builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> /** * Creates an instance of a [TFlow] with elements that are emitted from [builder]. @@ -197,7 +200,7 @@ interface FrpBuildScope : FrpStateScope { * @see observe */ @ExperimentalFrpApi - fun <A> TFlow<A>.observeBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job = + fun <A> TFlow<A>.observeBuild(block: FrpBuildScope.(A) -> Unit = {}): Job = mapBuild(block).observe() /** @@ -320,9 +323,8 @@ interface FrpBuildScope : FrpStateScope { * [observers][observe] are unregistered, and any pending [effects][effect] are cancelled). */ @ExperimentalFrpApi - fun <A, B> TFlow<A>.flatMapLatestBuild( - transform: suspend FrpBuildScope.(A) -> TFlow<B> - ): TFlow<B> = mapCheap { frpSpec { transform(it) } }.applyLatestSpec().flatten() + fun <A, B> TFlow<A>.flatMapLatestBuild(transform: FrpBuildScope.(A) -> TFlow<B>): TFlow<B> = + mapCheap { frpSpec { transform(it) } }.applyLatestSpec().flatten() /** * Returns a [TState] by applying [transform] to the value held by the original [TState]. @@ -333,9 +335,8 @@ interface FrpBuildScope : FrpStateScope { * cancelled). */ @ExperimentalFrpApi - fun <A, B> TState<A>.flatMapLatestBuild( - transform: suspend FrpBuildScope.(A) -> TState<B> - ): TState<B> = mapLatestBuild { transform(it) }.flatten() + fun <A, B> TState<A>.flatMapLatestBuild(transform: FrpBuildScope.(A) -> TState<B>): TState<B> = + mapLatestBuild { transform(it) }.flatten() /** * Returns a [TState] that transforms the value held inside this [TState] by applying it to the @@ -347,7 +348,7 @@ interface FrpBuildScope : FrpStateScope { * cancelled). */ @ExperimentalFrpApi - fun <A, B> TState<A>.mapLatestBuild(transform: suspend FrpBuildScope.(A) -> B): TState<B> = + fun <A, B> TState<A>.mapLatestBuild(transform: FrpBuildScope.(A) -> B): TState<B> = mapCheapUnsafe { frpSpec { transform(it) } }.applyLatestSpec() /** @@ -391,7 +392,7 @@ interface FrpBuildScope : FrpStateScope { * cancelled). */ @ExperimentalFrpApi - fun <A, B> TFlow<A>.mapLatestBuild(transform: suspend FrpBuildScope.(A) -> B): TFlow<B> = + fun <A, B> TFlow<A>.mapLatestBuild(transform: FrpBuildScope.(A) -> B): TFlow<B> = mapCheap { frpSpec { transform(it) } }.applyLatestSpec() /** @@ -407,7 +408,7 @@ interface FrpBuildScope : FrpStateScope { @ExperimentalFrpApi fun <A, B> TFlow<A>.mapLatestBuild( initialValue: A, - transform: suspend FrpBuildScope.(A) -> B, + transform: FrpBuildScope.(A) -> B, ): Pair<TFlow<B>, FrpDeferredValue<B>> = mapLatestBuildDeferred(deferredOf(initialValue), transform) @@ -424,7 +425,7 @@ interface FrpBuildScope : FrpStateScope { @ExperimentalFrpApi fun <A, B> TFlow<A>.mapLatestBuildDeferred( initialValue: FrpDeferredValue<A>, - transform: suspend FrpBuildScope.(A) -> B, + transform: FrpBuildScope.(A) -> B, ): Pair<TFlow<B>, FrpDeferredValue<B>> = mapCheap { frpSpec { transform(it) } } .applyLatestSpec(initialSpec = frpSpec { transform(initialValue.get()) }) @@ -519,12 +520,12 @@ interface FrpBuildScope : FrpStateScope { fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey( initialValues: FrpDeferredValue<Map<K, A>>, numKeys: Int? = null, - transform: suspend FrpBuildScope.(A) -> B, + transform: FrpBuildScope.(K, A) -> B, ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> = - map { patch -> patch.mapValues { (_, v) -> v.map { frpSpec { transform(it) } } } } + map { patch -> patch.mapValues { (k, v) -> v.map { frpSpec { transform(k, it) } } } } .applyLatestSpecForKey( deferredBuildScope { - initialValues.get().mapValues { (_, v) -> frpSpec { transform(v) } } + initialValues.get().mapValues { (k, v) -> frpSpec { transform(k, v) } } }, numKeys = numKeys, ) @@ -546,7 +547,7 @@ interface FrpBuildScope : FrpStateScope { fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey( initialValues: Map<K, A>, numKeys: Int? = null, - transform: suspend FrpBuildScope.(A) -> B, + transform: FrpBuildScope.(K, A) -> B, ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> = mapLatestBuildForKey(deferredOf(initialValues), numKeys, transform) @@ -565,7 +566,7 @@ interface FrpBuildScope : FrpStateScope { @ExperimentalFrpApi fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey( numKeys: Int? = null, - transform: suspend FrpBuildScope.(A) -> B, + transform: FrpBuildScope.(K, A) -> B, ): TFlow<Map<K, Maybe<B>>> = mapLatestBuildForKey(emptyMap(), numKeys, transform).first /** Returns a [Deferred] containing the next value to be emitted from this [TFlow]. */ @@ -585,7 +586,8 @@ interface FrpBuildScope : FrpStateScope { } /** Returns a [TFlow] that emits whenever this [Flow] emits. */ - @ExperimentalFrpApi fun <A> Flow<A>.toTFlow(): TFlow<A> = tFlow { collect { emit(it) } } + @ExperimentalFrpApi + fun <A> Flow<A>.toTFlow(name: String? = null): TFlow<A> = tFlow(name) { collect { emit(it) } } /** * Shorthand for: @@ -626,7 +628,7 @@ interface FrpBuildScope : FrpStateScope { * cancelled). */ @ExperimentalFrpApi - fun <A> TFlow<A>.observeLatestBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job = + fun <A> TFlow<A>.observeLatestBuild(block: FrpBuildScope.(A) -> Unit = {}): Job = mapLatestBuild { block(it) }.observe() /** @@ -636,7 +638,7 @@ interface FrpBuildScope : FrpStateScope { * With each invocation of [block], running effects from the previous invocation are cancelled. */ @ExperimentalFrpApi - fun <A> TFlow<A>.observeLatest(block: suspend FrpEffectScope.(A) -> Unit = {}): Job { + fun <A> TFlow<A>.observeLatest(block: FrpEffectScope.(A) -> Unit = {}): Job { var innerJob: Job? = null return observeBuild { innerJob?.cancel() @@ -651,14 +653,13 @@ interface FrpBuildScope : FrpStateScope { * With each invocation of [block], running effects from the previous invocation are cancelled. */ @ExperimentalFrpApi - fun <A> TState<A>.observeLatest(block: suspend FrpEffectScope.(A) -> Unit = {}): Job = - launchScope { - var innerJob = effect { block(sample()) } - stateChanges.observeBuild { - innerJob.cancel() - innerJob = effect { block(it) } - } + fun <A> TState<A>.observeLatest(block: FrpEffectScope.(A) -> Unit = {}): Job = launchScope { + var innerJob = effect { block(sample()) } + stateChanges.observeBuild { + innerJob.cancel() + innerJob = effect { block(it) } } + } /** * Applies [block] to the value held by this [TState]. [block] receives an [FrpBuildScope] that @@ -670,17 +671,16 @@ interface FrpBuildScope : FrpStateScope { * [observers][observe] are unregistered, and any pending [side-effects][effect] are cancelled). */ @ExperimentalFrpApi - fun <A> TState<A>.observeLatestBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job = - launchScope { - var innerJob: Job = launchScope { block(sample()) } - stateChanges.observeBuild { - innerJob.cancel() - innerJob = launchScope { block(it) } - } + fun <A> TState<A>.observeLatestBuild(block: FrpBuildScope.(A) -> Unit = {}): Job = launchScope { + var innerJob: Job = launchScope { block(sample()) } + stateChanges.observeBuild { + innerJob.cancel() + innerJob = launchScope { block(it) } } + } /** Applies the [FrpSpec] within this [FrpBuildScope]. */ - @ExperimentalFrpApi suspend fun <A> FrpSpec<A>.applySpec(): A = this() + @ExperimentalFrpApi fun <A> FrpSpec<A>.applySpec(): A = this() /** * Applies the [FrpSpec] within this [FrpBuildScope], returning the result as an @@ -695,11 +695,10 @@ interface FrpBuildScope : FrpStateScope { * [effect]. */ @ExperimentalFrpApi - fun <A> TState<A>.observeBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job = - launchScope { - block(sample()) - stateChanges.observeBuild(block) - } + fun <A> TState<A>.observeBuild(block: FrpBuildScope.(A) -> Unit = {}): Job = launchScope { + block(sample()) + stateChanges.observeBuild(block) + } /** * Invokes [block] with the current value of this [TState], re-invoking whenever it changes, @@ -714,7 +713,7 @@ interface FrpBuildScope : FrpStateScope { * otherwise, it will be invoked with the [current][sample] value. */ @ExperimentalFrpApi - fun <A> TState<A>.observe(block: suspend FrpEffectScope.(A) -> Unit = {}): Job = + fun <A> TState<A>.observe(block: FrpEffectScope.(A) -> Unit = {}): Job = now.map { sample() }.mergeWith(stateChanges) { _, new -> new }.observe { block(it) } } @@ -753,7 +752,10 @@ fun <A> FrpBuildScope.asyncTFlow(block: suspend () -> A): TFlow<A> = * ``` */ @ExperimentalFrpApi -fun FrpBuildScope.effect(block: suspend FrpEffectScope.() -> Unit): Job = now.observe { block() } +fun FrpBuildScope.effect( + context: CoroutineContext = EmptyCoroutineContext, + block: FrpEffectScope.() -> Unit, +): Job = now.observe(context) { block() } /** * Launches [block] in a new coroutine, returning a [Job] bound to the coroutine. diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpNetwork.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpNetwork.kt index cec76886c06d..0679848c6c80 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpNetwork.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpNetwork.kt @@ -23,7 +23,6 @@ import com.android.systemui.kairos.internal.util.awaitCancellationAndThen import com.android.systemui.kairos.internal.util.childScope import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext -import kotlin.coroutines.coroutineContext import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job @@ -53,7 +52,7 @@ interface FrpNetwork { * If the network is cancelled while the caller of [transact] is suspended, then the call will * be cancelled. */ - @ExperimentalFrpApi suspend fun <R> transact(block: suspend FrpTransactionScope.() -> R): R + @ExperimentalFrpApi suspend fun <R> transact(block: FrpTransactionScope.() -> R): R /** * Activates [spec] in a transaction, suspending indefinitely. While suspended, all observers @@ -133,22 +132,36 @@ internal class LocalFrpNetwork( private val scope: CoroutineScope, private val endSignal: TFlow<Any>, ) : FrpNetwork { - override suspend fun <R> transact(block: suspend FrpTransactionScope.() -> R): R = + override suspend fun <R> transact(block: FrpTransactionScope.() -> R): R = network.transaction("FrpNetwork.transact") { runInTransactionScope { block() } }.await() override suspend fun activateSpec(spec: FrpSpec<*>) { + val stopEmitter = + CoalescingMutableTFlow( + name = "activateSpec", + coalesce = { _, _: Unit -> }, + network = network, + getInitialValue = {}, + ) val job = network .transaction("FrpNetwork.activateSpec") { val buildScope = BuildScopeImpl( - stateScope = StateScopeImpl(evalScope = this, endSignal = endSignal), + stateScope = + StateScopeImpl( + evalScope = this, + endSignal = mergeLeft(stopEmitter, endSignal), + ), coroutineScope = scope, ) buildScope.runInBuildScope { launchScope(spec) } } .await() - awaitCancellationAndThen { job.cancel() } + awaitCancellationAndThen { + stopEmitter.emit(Unit) + job.cancel() + } } override fun <In, Out> coalescingMutableTFlow( diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpScope.kt index ad6b2c8d04eb..92cb13f77d04 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpScope.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpScope.kt @@ -16,13 +16,8 @@ package com.android.systemui.kairos +import com.android.systemui.kairos.internal.CompletableLazy import kotlin.coroutines.RestrictsSuspension -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.suspendCancellableCoroutine /** Denotes [FrpScope] interfaces as [DSL markers][DslMarker]. */ @DslMarker annotation class FrpScopeMarker @@ -37,13 +32,7 @@ interface FrpScope { /** * Returns the value held by the [FrpDeferredValue], suspending until available if necessary. */ - @ExperimentalFrpApi - @OptIn(ExperimentalCoroutinesApi::class) - suspend fun <A> FrpDeferredValue<A>.get(): A = suspendCancellableCoroutine { k -> - unwrapped.invokeOnCompletion { ex -> - ex?.let { k.resumeWithException(ex) } ?: k.resume(unwrapped.getCompleted()) - } - } + @ExperimentalFrpApi fun <A> FrpDeferredValue<A>.get(): A = unwrapped.value } /** @@ -53,7 +42,7 @@ interface FrpScope { * @see FrpScope.get */ @ExperimentalFrpApi -class FrpDeferredValue<out A> internal constructor(internal val unwrapped: Deferred<A>) +class FrpDeferredValue<out A> internal constructor(internal val unwrapped: Lazy<A>) /** * Returns the value held by this [FrpDeferredValue], or throws [IllegalStateException] if it is not @@ -64,10 +53,8 @@ class FrpDeferredValue<out A> internal constructor(internal val unwrapped: Defer * * @see FrpScope.get */ -@ExperimentalFrpApi -@OptIn(ExperimentalCoroutinesApi::class) -fun <A> FrpDeferredValue<A>.getUnsafe(): A = unwrapped.getCompleted() +@ExperimentalFrpApi fun <A> FrpDeferredValue<A>.getUnsafe(): A = unwrapped.value /** Returns an already-available [FrpDeferredValue] containing [value]. */ @ExperimentalFrpApi -fun <A> deferredOf(value: A): FrpDeferredValue<A> = FrpDeferredValue(CompletableDeferred(value)) +fun <A> deferredOf(value: A): FrpDeferredValue<A> = FrpDeferredValue(CompletableLazy(value)) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpStateScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpStateScope.kt index 058fc1037e58..3de246300501 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpStateScope.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpStateScope.kt @@ -30,7 +30,7 @@ import com.android.systemui.kairos.util.partitionEithers import com.android.systemui.kairos.util.zipWith import kotlin.coroutines.RestrictsSuspension -typealias FrpStateful<R> = suspend FrpStateScope.() -> R +typealias FrpStateful<R> = FrpStateScope.() -> R /** * Returns a [FrpStateful] that, when [applied][FrpStateScope.applyStateful], invokes [block] with @@ -39,7 +39,7 @@ typealias FrpStateful<R> = suspend FrpStateScope.() -> R // TODO: caching story? should each Scope have a cache of applied FrpStateful instances? @ExperimentalFrpApi @Suppress("NOTHING_TO_INLINE") -inline fun <A> statefully(noinline block: suspend FrpStateScope.() -> A): FrpStateful<A> = block +inline fun <A> statefully(noinline block: FrpStateScope.() -> A): FrpStateful<A> = block /** * Operations that accumulate state within the FRP network. @@ -55,7 +55,7 @@ interface FrpStateScope : FrpTransactionScope { /** TODO */ @ExperimentalFrpApi // TODO: wish this could just be `deferred` but alas - fun <A> deferredStateScope(block: suspend FrpStateScope.() -> A): FrpDeferredValue<A> + fun <A> deferredStateScope(block: FrpStateScope.() -> A): FrpDeferredValue<A> /** * Returns a [TState] that holds onto the most recently emitted value from this [TFlow], or @@ -86,7 +86,8 @@ interface FrpStateScope : FrpTransactionScope { */ @ExperimentalFrpApi fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally( - initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>> + name: String? = null, + initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>, ): TFlow<Map<K, V>> /** @@ -108,7 +109,8 @@ interface FrpStateScope : FrpTransactionScope { */ @ExperimentalFrpApi fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly( - initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>> + initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>, + name: String? = null, ): TFlow<Map<K, V>> // TODO: everything below this comment can be made into extensions once we have context params @@ -132,8 +134,9 @@ interface FrpStateScope : FrpTransactionScope { */ @ExperimentalFrpApi fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally( - initialTFlows: Map<K, TFlow<V>> = emptyMap() - ): TFlow<Map<K, V>> = mergeIncrementally(deferredOf(initialTFlows)) + name: String? = null, + initialTFlows: Map<K, TFlow<V>> = emptyMap(), + ): TFlow<Map<K, V>> = mergeIncrementally(name, deferredOf(initialTFlows)) /** * Returns a [TFlow] that emits from a merged, incrementally-accumulated collection of [TFlow]s @@ -154,11 +157,12 @@ interface FrpStateScope : FrpTransactionScope { */ @ExperimentalFrpApi fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly( - initialTFlows: Map<K, TFlow<V>> = emptyMap() - ): TFlow<Map<K, V>> = mergeIncrementallyPromptly(deferredOf(initialTFlows)) + initialTFlows: Map<K, TFlow<V>> = emptyMap(), + name: String? = null, + ): TFlow<Map<K, V>> = mergeIncrementallyPromptly(deferredOf(initialTFlows), name) /** Applies the [FrpStateful] within this [FrpStateScope]. */ - @ExperimentalFrpApi suspend fun <A> FrpStateful<A>.applyStateful(): A = this() + @ExperimentalFrpApi fun <A> FrpStateful<A>.applyStateful(): A = this() /** * Applies the [FrpStateful] within this [FrpStateScope], returning the result as an @@ -198,7 +202,7 @@ interface FrpStateScope : FrpTransactionScope { * original [TFlow]. */ @ExperimentalFrpApi - fun <A, B> TFlow<A>.mapStateful(transform: suspend FrpStateScope.(A) -> B): TFlow<B> = + fun <A, B> TFlow<A>.mapStateful(transform: FrpStateScope.(A) -> B): TFlow<B> = mapPure { statefully { transform(it) } }.applyStatefuls() /** @@ -224,7 +228,7 @@ interface FrpStateScope : FrpTransactionScope { * invocation of [transform], state accumulation from previous invocation is stopped. */ @ExperimentalFrpApi - fun <A, B> TFlow<A>.mapLatestStateful(transform: suspend FrpStateScope.(A) -> B): TFlow<B> = + fun <A, B> TFlow<A>.mapLatestStateful(transform: FrpStateScope.(A) -> B): TFlow<B> = mapPure { statefully { transform(it) } }.applyLatestStateful() /** @@ -235,9 +239,8 @@ interface FrpStateScope : FrpTransactionScope { * invocation of [transform], state accumulation from previous invocation is stopped. */ @ExperimentalFrpApi - fun <A, B> TFlow<A>.flatMapLatestStateful( - transform: suspend FrpStateScope.(A) -> TFlow<B> - ): TFlow<B> = mapLatestStateful(transform).flatten() + fun <A, B> TFlow<A>.flatMapLatestStateful(transform: FrpStateScope.(A) -> TFlow<B>): TFlow<B> = + mapLatestStateful(transform).flatten() /** * Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the @@ -394,7 +397,7 @@ interface FrpStateScope : FrpTransactionScope { fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey( initialValues: FrpDeferredValue<Map<K, A>>, numKeys: Int? = null, - transform: suspend FrpStateScope.(A) -> B, + transform: FrpStateScope.(A) -> B, ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> = mapPure { patch -> patch.mapValues { (_, v) -> v.map { statefully { transform(it) } } } } .applyLatestStatefulForKey( @@ -419,7 +422,7 @@ interface FrpStateScope : FrpTransactionScope { fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey( initialValues: Map<K, A>, numKeys: Int? = null, - transform: suspend FrpStateScope.(A) -> B, + transform: FrpStateScope.(A) -> B, ): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> = mapLatestStatefulForKey(deferredOf(initialValues), numKeys, transform) @@ -436,7 +439,7 @@ interface FrpStateScope : FrpTransactionScope { @ExperimentalFrpApi fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey( numKeys: Int? = null, - transform: suspend FrpStateScope.(A) -> B, + transform: FrpStateScope.(A) -> B, ): TFlow<Map<K, Maybe<B>>> = mapLatestStatefulForKey(emptyMap(), numKeys, transform).first /** @@ -447,12 +450,12 @@ interface FrpStateScope : FrpTransactionScope { * even emitted from the result [TFlow]. */ @ExperimentalFrpApi - fun <A> TFlow<A>.nextOnly(): TFlow<A> = + fun <A> TFlow<A>.nextOnly(name: String? = null): TFlow<A> = if (this === emptyTFlow) { this } else { TFlowLoop<A>().also { - it.loopback = it.mapCheap { emptyTFlow }.hold(this@nextOnly).switch() + it.loopback = it.mapCheap { emptyTFlow }.hold(this@nextOnly).switch(name) } } @@ -501,7 +504,7 @@ interface FrpStateScope : FrpTransactionScope { * emitted that satisfies [predicate]. */ @ExperimentalFrpApi - fun <A> TFlow<A>.takeUntil(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> = + fun <A> TFlow<A>.takeUntil(predicate: FrpTransactionScope.(A) -> Boolean): TFlow<A> = takeUntil(filter(predicate)) /** @@ -515,7 +518,7 @@ interface FrpStateScope : FrpTransactionScope { @ExperimentalFrpApi fun <A, B> TFlow<A>.fold( initialValue: B, - transform: suspend FrpTransactionScope.(A, B) -> B, + transform: FrpTransactionScope.(A, B) -> B, ): TState<B> { lateinit var state: TState<B> return mapPure { a -> transform(a, state.sample()) }.hold(initialValue).also { state = it } @@ -532,7 +535,7 @@ interface FrpStateScope : FrpTransactionScope { @ExperimentalFrpApi fun <A, B> TFlow<A>.foldDeferred( initialValue: FrpDeferredValue<B>, - transform: suspend FrpTransactionScope.(A, B) -> B, + transform: FrpTransactionScope.(A, B) -> B, ): TState<B> { lateinit var state: TState<B> return mapPure { a -> transform(a, state.sample()) } @@ -657,7 +660,7 @@ interface FrpStateScope : FrpTransactionScope { * ``` */ @ExperimentalFrpApi - fun <A, B> TFlow<A>.mapIndexed(transform: suspend FrpTransactionScope.(Int, A) -> B): TFlow<B> { + fun <A, B> TFlow<A>.mapIndexed(transform: FrpTransactionScope.(Int, A) -> B): TFlow<B> { val index = fold(0) { _, i -> i + 1 } return sample(index) { a, idx -> transform(idx, a) } } @@ -679,7 +682,7 @@ interface FrpStateScope : FrpTransactionScope { @ExperimentalFrpApi fun <A, B, C> TFlow<A>.sample( other: TFlow<B>, - transform: suspend FrpTransactionScope.(A, B) -> C, + transform: FrpTransactionScope.(A, B) -> C, ): TFlow<C> { val state = other.mapCheap { just(it) }.hold(none) return sample(state) { a, b -> b.map { transform(a, it) } }.filterJust() @@ -700,7 +703,7 @@ interface FrpStateScope : FrpTransactionScope { * given function [transform]. */ @ExperimentalFrpApi - fun <A, B> TState<A>.map(transform: suspend FrpTransactionScope.(A) -> B): TState<B> = + fun <A, B> TState<A>.map(transform: FrpTransactionScope.(A) -> B): TState<B> = mapPure { transactionally { transform(it) } }.sampleTransactionals() /** @@ -713,7 +716,7 @@ interface FrpStateScope : FrpTransactionScope { fun <A, B, Z> combine( stateA: TState<A>, stateB: TState<B>, - transform: suspend FrpTransactionScope.(A, B) -> Z, + transform: FrpTransactionScope.(A, B) -> Z, ): TState<Z> = com.android.systemui.kairos .combine(stateA, stateB) { a, b -> transactionally { transform(a, b) } } @@ -723,6 +726,23 @@ interface FrpStateScope : FrpTransactionScope { * Returns a [TState] whose value is generated with [transform] by combining the current values * of each given [TState]. * + * @see TState.combineWithTransactionally + */ + @ExperimentalFrpApi + fun <A, B, C, Z> combine( + stateA: TState<A>, + stateB: TState<B>, + stateC: TState<C>, + transform: FrpTransactionScope.(A, B, C) -> Z, + ): TState<Z> = + com.android.systemui.kairos + .combine(stateA, stateB, stateC) { a, b, c -> transactionally { transform(a, b, c) } } + .sampleTransactionals() + + /** + * Returns a [TState] whose value is generated with [transform] by combining the current values + * of each given [TState]. + * * @see TState.combineWith */ @ExperimentalFrpApi @@ -731,7 +751,7 @@ interface FrpStateScope : FrpTransactionScope { stateB: TState<B>, stateC: TState<C>, stateD: TState<D>, - transform: suspend FrpTransactionScope.(A, B, C, D) -> Z, + transform: FrpTransactionScope.(A, B, C, D) -> Z, ): TState<Z> = com.android.systemui.kairos .combine(stateA, stateB, stateC, stateD) { a, b, c, d -> @@ -741,9 +761,8 @@ interface FrpStateScope : FrpTransactionScope { /** Returns a [TState] by applying [transform] to the value held by the original [TState]. */ @ExperimentalFrpApi - fun <A, B> TState<A>.flatMap( - transform: suspend FrpTransactionScope.(A) -> TState<B> - ): TState<B> = mapPure { transactionally { transform(it) } }.sampleTransactionals().flatten() + fun <A, B> TState<A>.flatMap(transform: FrpTransactionScope.(A) -> TState<B>): TState<B> = + mapPure { transactionally { transform(it) } }.sampleTransactionals().flatten() /** * Returns a [TState] whose value is generated with [transform] by combining the current values @@ -754,7 +773,7 @@ interface FrpStateScope : FrpTransactionScope { @ExperimentalFrpApi fun <A, Z> combine( vararg states: TState<A>, - transform: suspend FrpTransactionScope.(List<A>) -> Z, + transform: FrpTransactionScope.(List<A>) -> Z, ): TState<Z> = combinePure(*states).map(transform) /** @@ -765,7 +784,7 @@ interface FrpStateScope : FrpTransactionScope { */ @ExperimentalFrpApi fun <A, Z> Iterable<TState<A>>.combine( - transform: suspend FrpTransactionScope.(List<A>) -> Z + transform: FrpTransactionScope.(List<A>) -> Z ): TState<Z> = combinePure().map(transform) /** @@ -775,6 +794,6 @@ interface FrpStateScope : FrpTransactionScope { @ExperimentalFrpApi fun <A, B, C> TState<A>.combineWith( other: TState<B>, - transform: suspend FrpTransactionScope.(A, B) -> C, + transform: FrpTransactionScope.(A, B) -> C, ): TState<C> = combine(this, other, transform) } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpTransactionScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpTransactionScope.kt index a7ae1d9646b3..7d48b9853e1c 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpTransactionScope.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpTransactionScope.kt @@ -44,9 +44,7 @@ interface FrpTransactionScope : FrpScope { /** TODO */ @ExperimentalFrpApi - fun <A> deferredTransactionScope( - block: suspend FrpTransactionScope.() -> A - ): FrpDeferredValue<A> + fun <A> deferredTransactionScope(block: FrpTransactionScope.() -> A): FrpDeferredValue<A> /** A [TFlow] that emits once, within this transaction, and then never again. */ @ExperimentalFrpApi val now: TFlow<Unit> @@ -55,11 +53,11 @@ interface FrpTransactionScope : FrpScope { * Returns the current value held by this [TState]. Guaranteed to be consistent within the same * transaction. */ - @ExperimentalFrpApi suspend fun <A> TState<A>.sample(): A = sampleDeferred().get() + @ExperimentalFrpApi fun <A> TState<A>.sample(): A = sampleDeferred().get() /** * Returns the current value held by this [Transactional]. Guaranteed to be consistent within * the same transaction. */ - @ExperimentalFrpApi suspend fun <A> Transactional<A>.sample(): A = sampleDeferred().get() + @ExperimentalFrpApi fun <A> Transactional<A>.sample(): A = sampleDeferred().get() } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt index 362a890f44e2..96edc1043325 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt @@ -16,6 +16,7 @@ package com.android.systemui.kairos +import com.android.systemui.kairos.internal.CompletableLazy import com.android.systemui.kairos.internal.DemuxImpl import com.android.systemui.kairos.internal.Init import com.android.systemui.kairos.internal.InitScope @@ -47,7 +48,6 @@ import com.android.systemui.kairos.util.map import com.android.systemui.kairos.util.toMaybe import java.util.concurrent.atomic.AtomicReference import kotlin.reflect.KProperty -import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Job import kotlinx.coroutines.async @@ -73,17 +73,18 @@ sealed class TFlow<out A> { */ @ExperimentalFrpApi class TFlowLoop<A> : TFlow<A>() { - private val deferred = CompletableDeferred<TFlow<A>>() + private val deferred = CompletableLazy<TFlow<A>>() internal val init: Init<TFlowImpl<A>> = - init(name = null) { deferred.await().init.connect(evalScope = this) } + init(name = null) { deferred.value.init.connect(evalScope = this) } /** The [TFlow] this reference is referring to. */ @ExperimentalFrpApi var loopback: TFlow<A>? = null set(value) { value?.let { - check(deferred.complete(value)) { "TFlowLoop.loopback has already been set." } + check(!deferred.isInitialized()) { "TFlowLoop.loopback has already been set." } + deferred.setValue(value) field = value } } @@ -102,11 +103,11 @@ class TFlowLoop<A> : TFlow<A>() { /** TODO */ @ExperimentalFrpApi -fun <A> FrpDeferredValue<TFlow<A>>.defer(): TFlow<A> = deferInline { unwrapped.await() } +fun <A> FrpDeferredValue<TFlow<A>>.defer(): TFlow<A> = deferInline { unwrapped.value } /** TODO */ @ExperimentalFrpApi -fun <A> deferTFlow(block: suspend FrpScope.() -> TFlow<A>): TFlow<A> = deferInline { +fun <A> deferTFlow(block: FrpScope.() -> TFlow<A>): TFlow<A> = deferInline { NoScope.runInFrpScope(block) } @@ -122,7 +123,7 @@ val <A> TState<A>.stateChanges: TFlow<A> * @see mapNotNull */ @ExperimentalFrpApi -fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> = +fun <A, B> TFlow<A>.mapMaybe(transform: FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> = map(transform).filterJust() /** @@ -132,10 +133,9 @@ fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe * @see mapMaybe */ @ExperimentalFrpApi -fun <A, B> TFlow<A>.mapNotNull(transform: suspend FrpTransactionScope.(A) -> B?): TFlow<B> = - mapMaybe { - transform(it).toMaybe() - } +fun <A, B> TFlow<A>.mapNotNull(transform: FrpTransactionScope.(A) -> B?): TFlow<B> = mapMaybe { + transform(it).toMaybe() +} /** Returns a [TFlow] containing only values of the original [TFlow] that are not null. */ @ExperimentalFrpApi @@ -155,9 +155,11 @@ fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> = * [TFlow]. */ @ExperimentalFrpApi -fun <A, B> TFlow<A>.map(transform: suspend FrpTransactionScope.(A) -> B): TFlow<B> { +fun <A, B> TFlow<A>.map(transform: FrpTransactionScope.(A) -> B): TFlow<B> { val mapped: TFlowImpl<B> = - mapImpl({ init.connect(evalScope = this) }) { a -> runInTransactionScope { transform(a) } } + mapImpl({ init.connect(evalScope = this) }) { a, _ -> + runInTransactionScope { transform(a) } + } return TFlowInit(constInit(name = null, mapped.cached())) } @@ -168,11 +170,11 @@ fun <A, B> TFlow<A>.map(transform: suspend FrpTransactionScope.(A) -> B): TFlow< * @see map */ @ExperimentalFrpApi -fun <A, B> TFlow<A>.mapCheap(transform: suspend FrpTransactionScope.(A) -> B): TFlow<B> = +fun <A, B> TFlow<A>.mapCheap(transform: FrpTransactionScope.(A) -> B): TFlow<B> = TFlowInit( constInit( name = null, - mapImpl({ init.connect(evalScope = this) }) { a -> + mapImpl({ init.connect(evalScope = this) }) { a, _ -> runInTransactionScope { transform(a) } }, ) @@ -192,7 +194,7 @@ fun <A, B> TFlow<A>.mapCheap(transform: suspend FrpTransactionScope.(A) -> B): T * [FrpBuildScope.toSharedFlow] or [FrpBuildScope.observe]. */ @ExperimentalFrpApi -fun <A> TFlow<A>.onEach(action: suspend FrpTransactionScope.(A) -> Unit): TFlow<A> = map { +fun <A> TFlow<A>.onEach(action: FrpTransactionScope.(A) -> Unit): TFlow<A> = map { action(it) it } @@ -202,7 +204,7 @@ fun <A> TFlow<A>.onEach(action: suspend FrpTransactionScope.(A) -> Unit): TFlow< * [predicate]. */ @ExperimentalFrpApi -fun <A> TFlow<A>.filter(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> { +fun <A> TFlow<A>.filter(predicate: FrpTransactionScope.(A) -> Boolean): TFlow<A> { val pulse = filterImpl({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } } return TFlowInit(constInit(name = null, pulse)) @@ -236,10 +238,12 @@ fun <A, B> TFlow<Pair<A, B>>.unzip(): Pair<TFlow<A>, TFlow<B>> { @ExperimentalFrpApi fun <A> TFlow<A>.mergeWith( other: TFlow<A>, - transformCoincidence: suspend FrpTransactionScope.(A, A) -> A = { a, _ -> a }, + name: String? = null, + transformCoincidence: FrpTransactionScope.(A, A) -> A = { a, _ -> a }, ): TFlow<A> { val node = mergeNodes( + name = name, getPulse = { init.connect(evalScope = this) }, getOther = { other.init.connect(evalScope = this) }, ) { a, b -> @@ -355,7 +359,7 @@ fun <K, A> TFlow<Map<K, A>>.groupByKey(numKeys: Int? = null): GroupedTFlow<K, A> @ExperimentalFrpApi fun <K, A> TFlow<A>.groupBy( numKeys: Int? = null, - extractKey: suspend FrpTransactionScope.(A) -> K, + extractKey: FrpTransactionScope.(A) -> K, ): GroupedTFlow<K, A> = map { mapOf(extractKey(it) to it) }.groupByKey(numKeys) /** @@ -367,7 +371,7 @@ fun <K, A> TFlow<A>.groupBy( */ @ExperimentalFrpApi fun <A> TFlow<A>.partition( - predicate: suspend FrpTransactionScope.(A) -> Boolean + predicate: FrpTransactionScope.(A) -> Boolean ): Pair<TFlow<A>, TFlow<A>> { val grouped: GroupedTFlow<Boolean, A> = groupBy(numKeys = 2, extractKey = predicate) return Pair(grouped.eventsForKey(true), grouped.eventsForKey(false)) @@ -418,22 +422,22 @@ class GroupedTFlow<in K, out A> internal constructor(internal val impl: DemuxImp * that takes effect immediately, see [switchPromptly]. */ @ExperimentalFrpApi -fun <A> TState<TFlow<A>>.switch(): TFlow<A> = - TFlowInit( +fun <A> TState<TFlow<A>>.switch(name: String? = null): TFlow<A> { + val patches = + mapImpl({ init.connect(this).changes }) { newFlow, _ -> newFlow.init.connect(this) } + return TFlowInit( constInit( name = null, switchDeferredImplSingle( + name = name, getStorage = { init.connect(this).getCurrentWithEpoch(this).first.init.connect(this) }, - getPatches = { - mapImpl({ init.connect(this).changes }) { newFlow -> - newFlow.init.connect(this) - } - }, + getPatches = { patches }, ), ) ) +} /** * Returns a [TFlow] that switches to the [TFlow] contained within this [TState] whenever it @@ -444,22 +448,21 @@ fun <A> TState<TFlow<A>>.switch(): TFlow<A> = */ // TODO: parameter to handle coincidental emission from both old and new @ExperimentalFrpApi -fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> = - TFlowInit( +fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> { + val patches = + mapImpl({ init.connect(this).changes }) { newFlow, _ -> newFlow.init.connect(this) } + return TFlowInit( constInit( name = null, switchPromptImplSingle( getStorage = { init.connect(this).getCurrentWithEpoch(this).first.init.connect(this) }, - getPatches = { - mapImpl({ init.connect(this).changes }) { newFlow -> - newFlow.init.connect(this) - } - }, + getPatches = { patches }, ), ) ) +} /** * A mutable [TFlow] that provides the ability to [emit] values to the flow, handling backpressure @@ -559,5 +562,5 @@ internal val <A> TFlow<A>.init: Init<TFlowImpl<A>> is MutableTFlow -> constInit(name, impl.activated()) } -private inline fun <A> deferInline(crossinline block: suspend InitScope.() -> TFlow<A>): TFlow<A> = +private inline fun <A> deferInline(crossinline block: InitScope.() -> TFlow<A>): TFlow<A> = TFlowInit(init(name = null) { block().init.connect(evalScope = this) }) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt index 66aa2a950fcf..d84a6f2ddb34 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt @@ -16,6 +16,7 @@ package com.android.systemui.kairos +import com.android.systemui.kairos.internal.CompletableLazy import com.android.systemui.kairos.internal.DerivedMapCheap import com.android.systemui.kairos.internal.Init import com.android.systemui.kairos.internal.InitScope @@ -39,10 +40,6 @@ import com.android.systemui.kairos.internal.util.hashString import com.android.systemui.kairos.internal.zipStateMap import com.android.systemui.kairos.internal.zipStates import kotlin.reflect.KProperty -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.async -import kotlinx.coroutines.coroutineScope /** * A time-varying value with discrete changes. Essentially, a combination of a [Transactional] that @@ -63,11 +60,11 @@ fun <A> tStateOf(value: A): TState<A> { /** TODO */ @ExperimentalFrpApi -fun <A> FrpDeferredValue<TState<A>>.defer(): TState<A> = deferInline { unwrapped.await() } +fun <A> FrpDeferredValue<TState<A>>.defer(): TState<A> = deferInline { unwrapped.value } /** TODO */ @ExperimentalFrpApi -fun <A> deferTState(block: suspend FrpScope.() -> TState<A>): TState<A> = deferInline { +fun <A> deferTState(block: FrpScope.() -> TState<A>): TState<A> = deferInline { NoScope.runInFrpScope(block) } @@ -76,7 +73,7 @@ fun <A> deferTState(block: suspend FrpScope.() -> TState<A>): TState<A> = deferI * original [TState]. */ @ExperimentalFrpApi -fun <A, B> TState<A>.map(transform: suspend FrpScope.(A) -> B): TState<B> { +fun <A, B> TState<A>.map(transform: FrpScope.(A) -> B): TState<B> { val operatorName = "map" val name = operatorName return TStateInit( @@ -98,7 +95,7 @@ fun <A, B> TState<A>.map(transform: suspend FrpScope.(A) -> B): TState<B> { * observable change to the returned [TState]. */ @ExperimentalFrpApi -fun <A, B> TState<A>.mapCheapUnsafe(transform: suspend FrpScope.(A) -> B): TState<B> { +fun <A, B> TState<A>.mapCheapUnsafe(transform: FrpScope.(A) -> B): TState<B> { val operatorName = "map" val name = operatorName return TStateInit( @@ -115,10 +112,8 @@ fun <A, B> TState<A>.mapCheapUnsafe(transform: suspend FrpScope.(A) -> B): TStat * the given function [transform]. */ @ExperimentalFrpApi -fun <A, B, C> TState<A>.combineWith( - other: TState<B>, - transform: suspend FrpScope.(A, B) -> C, -): TState<C> = combine(this, other, transform) +fun <A, B, C> TState<A>.combineWith(other: TState<B>, transform: FrpScope.(A, B) -> C): TState<C> = + combine(this, other, transform) /** * Splits a [TState] of pairs into a pair of [TFlows][TState], where each returned [TState] holds @@ -181,7 +176,7 @@ fun <K, A> Map<K, TState<A>>.combine(): TState<Map<K, A>> { * @see TState.combineWith */ @ExperimentalFrpApi -fun <A, B> Iterable<TState<A>>.combine(transform: suspend FrpScope.(List<A>) -> B): TState<B> = +fun <A, B> Iterable<TState<A>>.combine(transform: FrpScope.(List<A>) -> B): TState<B> = combine().map(transform) /** @@ -199,10 +194,8 @@ fun <A> combine(vararg states: TState<A>): TState<List<A>> = states.asIterable() * @see TState.combineWith */ @ExperimentalFrpApi -fun <A, B> combine( - vararg states: TState<A>, - transform: suspend FrpScope.(List<A>) -> B, -): TState<B> = states.asIterable().combine(transform) +fun <A, B> combine(vararg states: TState<A>, transform: FrpScope.(List<A>) -> B): TState<B> = + states.asIterable().combine(transform) /** * Returns a [TState] whose value is generated with [transform] by combining the current values of @@ -214,22 +207,16 @@ fun <A, B> combine( fun <A, B, Z> combine( stateA: TState<A>, stateB: TState<B>, - transform: suspend FrpScope.(A, B) -> Z, + transform: FrpScope.(A, B) -> Z, ): TState<Z> { val operatorName = "combine" val name = operatorName return TStateInit( init(name) { - coroutineScope { - val dl1: Deferred<TStateImpl<A>> = async { - stateA.init.connect(evalScope = this@init) - } - val dl2: Deferred<TStateImpl<B>> = async { - stateB.init.connect(evalScope = this@init) - } - zipStates(name, operatorName, dl1.await(), dl2.await()) { a, b -> - NoScope.runInFrpScope { transform(a, b) } - } + val dl1 = stateA.init.connect(evalScope = this@init) + val dl2 = stateB.init.connect(evalScope = this@init) + zipStates(name, operatorName, dl1, dl2) { a, b -> + NoScope.runInFrpScope { transform(a, b) } } } ) @@ -246,25 +233,17 @@ fun <A, B, C, Z> combine( stateA: TState<A>, stateB: TState<B>, stateC: TState<C>, - transform: suspend FrpScope.(A, B, C) -> Z, + transform: FrpScope.(A, B, C) -> Z, ): TState<Z> { val operatorName = "combine" val name = operatorName return TStateInit( init(name) { - coroutineScope { - val dl1: Deferred<TStateImpl<A>> = async { - stateA.init.connect(evalScope = this@init) - } - val dl2: Deferred<TStateImpl<B>> = async { - stateB.init.connect(evalScope = this@init) - } - val dl3: Deferred<TStateImpl<C>> = async { - stateC.init.connect(evalScope = this@init) - } - zipStates(name, operatorName, dl1.await(), dl2.await(), dl3.await()) { a, b, c -> - NoScope.runInFrpScope { transform(a, b, c) } - } + val dl1 = stateA.init.connect(evalScope = this@init) + val dl2 = stateB.init.connect(evalScope = this@init) + val dl3 = stateC.init.connect(evalScope = this@init) + zipStates(name, operatorName, dl1, dl2, dl3) { a, b, c -> + NoScope.runInFrpScope { transform(a, b, c) } } } ) @@ -282,32 +261,18 @@ fun <A, B, C, D, Z> combine( stateB: TState<B>, stateC: TState<C>, stateD: TState<D>, - transform: suspend FrpScope.(A, B, C, D) -> Z, + transform: FrpScope.(A, B, C, D) -> Z, ): TState<Z> { val operatorName = "combine" val name = operatorName return TStateInit( init(name) { - coroutineScope { - val dl1: Deferred<TStateImpl<A>> = async { - stateA.init.connect(evalScope = this@init) - } - val dl2: Deferred<TStateImpl<B>> = async { - stateB.init.connect(evalScope = this@init) - } - val dl3: Deferred<TStateImpl<C>> = async { - stateC.init.connect(evalScope = this@init) - } - val dl4: Deferred<TStateImpl<D>> = async { - stateD.init.connect(evalScope = this@init) - } - zipStates(name, operatorName, dl1.await(), dl2.await(), dl3.await(), dl4.await()) { - a, - b, - c, - d -> - NoScope.runInFrpScope { transform(a, b, c, d) } - } + val dl1 = stateA.init.connect(evalScope = this@init) + val dl2 = stateB.init.connect(evalScope = this@init) + val dl3 = stateC.init.connect(evalScope = this@init) + val dl4 = stateD.init.connect(evalScope = this@init) + zipStates(name, operatorName, dl1, dl2, dl3, dl4) { a, b, c, d -> + NoScope.runInFrpScope { transform(a, b, c, d) } } } ) @@ -326,39 +291,19 @@ fun <A, B, C, D, E, Z> combine( stateC: TState<C>, stateD: TState<D>, stateE: TState<E>, - transform: suspend FrpScope.(A, B, C, D, E) -> Z, + transform: FrpScope.(A, B, C, D, E) -> Z, ): TState<Z> { val operatorName = "combine" val name = operatorName return TStateInit( init(name) { - coroutineScope { - val dl1: Deferred<TStateImpl<A>> = async { - stateA.init.connect(evalScope = this@init) - } - val dl2: Deferred<TStateImpl<B>> = async { - stateB.init.connect(evalScope = this@init) - } - val dl3: Deferred<TStateImpl<C>> = async { - stateC.init.connect(evalScope = this@init) - } - val dl4: Deferred<TStateImpl<D>> = async { - stateD.init.connect(evalScope = this@init) - } - val dl5: Deferred<TStateImpl<E>> = async { - stateE.init.connect(evalScope = this@init) - } - zipStates( - name, - operatorName, - dl1.await(), - dl2.await(), - dl3.await(), - dl4.await(), - dl5.await(), - ) { a, b, c, d, e -> - NoScope.runInFrpScope { transform(a, b, c, d, e) } - } + val dl1 = stateA.init.connect(evalScope = this@init) + val dl2 = stateB.init.connect(evalScope = this@init) + val dl3 = stateC.init.connect(evalScope = this@init) + val dl4 = stateD.init.connect(evalScope = this@init) + val dl5 = stateE.init.connect(evalScope = this@init) + zipStates(name, operatorName, dl1, dl2, dl3, dl4, dl5) { a, b, c, d, e -> + NoScope.runInFrpScope { transform(a, b, c, d, e) } } } ) @@ -366,7 +311,7 @@ fun <A, B, C, D, E, Z> combine( /** Returns a [TState] by applying [transform] to the value held by the original [TState]. */ @ExperimentalFrpApi -fun <A, B> TState<A>.flatMap(transform: suspend FrpScope.(A) -> TState<B>): TState<B> { +fun <A, B> TState<A>.flatMap(transform: FrpScope.(A) -> TState<B>): TState<B> { val operatorName = "flatMap" val name = operatorName return TStateInit( @@ -453,10 +398,10 @@ internal constructor( /** TODO */ @ExperimentalFrpApi -class MutableTState<T> -internal constructor(internal val network: Network, initialValue: Deferred<T>) : TState<T>() { +class MutableTState<T> internal constructor(internal val network: Network, initialValue: Lazy<T>) : + TState<T>() { - private val input: CoalescingMutableTFlow<Deferred<T>, Deferred<T>?> = + private val input: CoalescingMutableTFlow<Lazy<T>, Lazy<T>?> = CoalescingMutableTFlow( name = null, coalesce = { _, new -> new }, @@ -469,8 +414,9 @@ internal constructor(internal val network: Network, initialValue: Deferred<T>) : val name = null val operatorName = "MutableTState" lateinit var state: TStateSource<T> + val mapImpl = mapImpl(upstream = { changes.activated() }) { it, _ -> it!!.value } val calm: TFlowImpl<T> = - filterImpl({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new -> + filterImpl({ mapImpl }) { new -> new != state.getCurrentWithEpoch(evalScope = this).first } .cached() @@ -489,7 +435,7 @@ internal constructor(internal val network: Network, initialValue: Deferred<T>) : } /** TODO */ - @ExperimentalFrpApi fun setValue(value: T) = input.emit(CompletableDeferred(value)) + @ExperimentalFrpApi fun setValue(value: T) = input.emit(CompletableLazy(value)) @ExperimentalFrpApi fun setValueDeferred(value: FrpDeferredValue<T>) = input.emit(value.unwrapped) @@ -501,17 +447,18 @@ class TStateLoop<A> : TState<A>() { private val name: String? = null - private val deferred = CompletableDeferred<TState<A>>() + private val deferred = CompletableLazy<TState<A>>() internal val init: Init<TStateImpl<A>> = - init(name) { deferred.await().init.connect(evalScope = this) } + init(name) { deferred.value.init.connect(evalScope = this) } /** The [TState] this [TStateLoop] will forward to. */ @ExperimentalFrpApi var loopback: TState<A>? = null set(value) { value?.let { - check(deferred.complete(value)) { "TStateLoop.loopback has already been set." } + check(!deferred.isInitialized()) { "TStateLoop.loopback has already been set." } + deferred.setValue(value) field = value } } @@ -540,6 +487,5 @@ internal val <A> TState<A>.init: Init<TStateImpl<A>> is MutableTState -> tState.init } -private inline fun <A> deferInline( - crossinline block: suspend InitScope.() -> TState<A> -): TState<A> = TStateInit(init(name = null) { block().init.connect(evalScope = this) }) +private inline fun <A> deferInline(crossinline block: InitScope.() -> TState<A>): TState<A> = + TStateInit(init(name = null) { block().init.connect(evalScope = this) }) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Transactional.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Transactional.kt index 6b1c8c8fc3e5..e7a5b1bbd105 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Transactional.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Transactional.kt @@ -16,13 +16,13 @@ package com.android.systemui.kairos +import com.android.systemui.kairos.internal.CompletableLazy import com.android.systemui.kairos.internal.InitScope import com.android.systemui.kairos.internal.NoScope import com.android.systemui.kairos.internal.TransactionalImpl import com.android.systemui.kairos.internal.init import com.android.systemui.kairos.internal.transactionalImpl import com.android.systemui.kairos.internal.util.hashString -import kotlinx.coroutines.CompletableDeferred /** * A time-varying value. A [Transactional] encapsulates the idea of some continuous state; each time @@ -40,12 +40,12 @@ class Transactional<out A> internal constructor(internal val impl: TState<Transa /** A constant [Transactional] that produces [value] whenever it is sampled. */ @ExperimentalFrpApi fun <A> transactionalOf(value: A): Transactional<A> = - Transactional(tStateOf(TransactionalImpl.Const(CompletableDeferred(value)))) + Transactional(tStateOf(TransactionalImpl.Const(CompletableLazy(value)))) /** TODO */ @ExperimentalFrpApi fun <A> FrpDeferredValue<Transactional<A>>.defer(): Transactional<A> = deferInline { - unwrapped.await() + unwrapped.value } /** TODO */ @@ -53,13 +53,12 @@ fun <A> FrpDeferredValue<Transactional<A>>.defer(): Transactional<A> = deferInli /** TODO */ @ExperimentalFrpApi -fun <A> deferTransactional(block: suspend FrpScope.() -> Transactional<A>): Transactional<A> = - deferInline { - NoScope.runInFrpScope(block) - } +fun <A> deferTransactional(block: FrpScope.() -> Transactional<A>): Transactional<A> = deferInline { + NoScope.runInFrpScope(block) +} private inline fun <A> deferInline( - crossinline block: suspend InitScope.() -> Transactional<A> + crossinline block: InitScope.() -> Transactional<A> ): Transactional<A> = Transactional(TStateInit(init(name = null) { block().impl.init.connect(evalScope = this) })) @@ -68,5 +67,5 @@ private inline fun <A> deferInline( * transaction; any subsequent sampling within the same transaction will receive a cached value. */ @ExperimentalFrpApi -fun <A> transactionally(block: suspend FrpTransactionScope.() -> A): Transactional<A> = +fun <A> transactionally(block: FrpTransactionScope.() -> A): Transactional<A> = Transactional(tStateOf(transactionalImpl { runInTransactionScope(block) })) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt index 7e6384925f38..14488a3131c7 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt @@ -34,7 +34,7 @@ import com.android.systemui.kairos.TFlowInit import com.android.systemui.kairos.groupByKey import com.android.systemui.kairos.init import com.android.systemui.kairos.internal.util.childScope -import com.android.systemui.kairos.internal.util.mapValuesParallel +import com.android.systemui.kairos.internal.util.launchImmediate import com.android.systemui.kairos.launchEffect import com.android.systemui.kairos.mergeLeft import com.android.systemui.kairos.util.Just @@ -43,17 +43,12 @@ import com.android.systemui.kairos.util.None import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.map import java.util.concurrent.atomic.AtomicReference -import kotlin.coroutines.Continuation import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext -import kotlin.coroutines.startCoroutine -import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CompletableJob import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Deferred import kotlinx.coroutines.Job import kotlinx.coroutines.cancel -import kotlinx.coroutines.completeWith import kotlinx.coroutines.job internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope: CoroutineScope) : @@ -64,45 +59,43 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope override val frpScope: FrpBuildScope = FrpBuildScopeImpl() - override suspend fun <R> runInBuildScope(block: suspend FrpBuildScope.() -> R): R { - val complete = CompletableDeferred<R>(parent = coroutineContext.job) - block.startCoroutine( - frpScope, - object : Continuation<R> { - override val context: CoroutineContext - get() = EmptyCoroutineContext - - override fun resumeWith(result: Result<R>) { - complete.completeWith(result) - } - }, - ) - return complete.await() - } + override fun <R> runInBuildScope(block: FrpBuildScope.() -> R): R = frpScope.block() private fun <A, T : TFlow<A>, S> buildTFlow( + name: String? = null, constructFlow: (InputNode<A>) -> Pair<T, S>, builder: suspend S.() -> Unit, ): TFlow<A> { var job: Job? = null - val stopEmitter = newStopEmitter("buildTFlow") + val stopEmitter = newStopEmitter("buildTFlow[$name]") // Create a child scope that will be kept alive beyond the end of this transaction. val childScope = coroutineScope.childScope() lateinit var emitter: Pair<T, S> val inputNode = InputNode<A>( activate = { - check(job == null) { "already activated" } + // It's possible that activation occurs after all effects have been run, due + // to a MuxDeferred switch-in. For this reason, we need to activate in a new + // transaction. + check(job == null) { "[$name] already activated" } job = - reenterBuildScope(this@BuildScopeImpl, childScope).runInBuildScope { - launchEffect { - builder(emitter.second) - stopEmitter.emit(Unit) - } + childScope.launchImmediate { + network + .transaction("buildTFlow") { + reenterBuildScope(this@BuildScopeImpl, childScope) + .runInBuildScope { + launchEffect { + builder(emitter.second) + stopEmitter.emit(Unit) + } + } + } + .await() + .join() } }, deactivate = { - checkNotNull(job) { "already deactivated" }.cancel() + checkNotNull(job) { "[$name] already deactivated" }.cancel() job = null }, ) @@ -110,8 +103,12 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope return with(frpScope) { emitter.first.takeUntil(mergeLeft(stopEmitter, endSignal)) } } - private fun <T> tFlowInternal(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> = + private fun <T> tFlowInternal( + name: String?, + builder: suspend FrpProducerScope<T>.() -> Unit, + ): TFlow<T> = buildTFlow( + name, constructFlow = { inputNode -> val flow = MutableTFlow(network, inputNode) flow to @@ -148,16 +145,16 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope return FrpDeferredValue(deferAsync { childScope.runInBuildScope(block) }) to childScope.job } - private fun <R> deferredInternal(block: suspend FrpBuildScope.() -> R): FrpDeferredValue<R> = + private fun <R> deferredInternal(block: FrpBuildScope.() -> R): FrpDeferredValue<R> = FrpDeferredValue(deferAsync { runInBuildScope(block) }) - private fun deferredActionInternal(block: suspend FrpBuildScope.() -> Unit) { + private fun deferredActionInternal(block: FrpBuildScope.() -> Unit) { deferAction { runInBuildScope(block) } } private fun <A> TFlow<A>.observeEffectInternal( context: CoroutineContext, - block: suspend FrpEffectScope.(A) -> Unit, + block: FrpEffectScope.(A) -> Unit, ): Job { val subRef = AtomicReference<Maybe<Output<A>>>(null) val childScope = coroutineScope.childScope() @@ -181,25 +178,13 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope onEmit = { output -> if (subRef.get() is Just) { // Not cancelled, safe to emit - val coroutine: suspend FrpEffectScope.() -> Unit = { block(output) } - val complete = CompletableDeferred<Unit>(parent = coroutineContext.job) - coroutine.startCoroutine( + val scope = object : FrpEffectScope, FrpTransactionScope by frpScope { override val frpCoroutineScope: CoroutineScope = childScope override val frpNetwork: FrpNetwork = LocalFrpNetwork(network, childScope, endSignal) - }, - completion = - object : Continuation<Unit> { - override val context: CoroutineContext - get() = EmptyCoroutineContext - - override fun resumeWith(result: Result<Unit>) { - complete.completeWith(result) - } - }, - ) - complete.await() + } + scope.block(output) } }, ) @@ -213,21 +198,19 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope // Job's already been cancelled, schedule deactivation scheduleDeactivation(outputNode) } else if (needsEval) { - outputNode.schedule(evalScope = stateScope.evalScope) + outputNode.schedule(0, evalScope = stateScope.evalScope) } } ?: run { childScope.cancel() } } return childScope.coroutineContext.job } - private fun <A, B> TFlow<A>.mapBuildInternal( - transform: suspend FrpBuildScope.(A) -> B - ): TFlow<B> { + private fun <A, B> TFlow<A>.mapBuildInternal(transform: FrpBuildScope.(A) -> B): TFlow<B> { val childScope = coroutineScope.childScope() return TFlowInit( constInit( "mapBuild", - mapImpl({ init.connect(evalScope = this) }) { spec -> + mapImpl({ init.connect(evalScope = this) }) { spec, _ -> reenterBuildScope(outerScope = this@BuildScopeImpl, childScope) .runInBuildScope { transform(spec) } } @@ -241,9 +224,9 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope numKeys: Int?, ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> { val eventsByKey: GroupedTFlow<K, Maybe<FrpSpec<A>>> = groupByKey(numKeys) - val initOut: Deferred<Map<K, B>> = deferAsync { - init.unwrapped.await().mapValuesParallel { (k, spec) -> - val newEnd = with(frpScope) { eventsByKey[k].skipNext() } + val initOut: Lazy<Map<K, B>> = deferAsync { + init.unwrapped.value.mapValues { (k, spec) -> + val newEnd = eventsByKey[k] val newScope = childBuildScope(newEnd) newScope.runInBuildScope(spec) } @@ -251,9 +234,10 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope val childScope = coroutineScope.childScope() val changesNode: TFlowImpl<Map<K, Maybe<A>>> = mapImpl(upstream = { this@applyLatestForKeyInternal.init.connect(evalScope = this) }) { - upstreamMap -> + upstreamMap, + _ -> reenterBuildScope(this@BuildScopeImpl, childScope).run { - upstreamMap.mapValuesParallel { (k: K, ma: Maybe<FrpSpec<A>>) -> + upstreamMap.mapValues { (k: K, ma: Maybe<FrpSpec<A>>) -> ma.map { spec -> val newEnd = with(frpScope) { eventsByKey[k].skipNext() } val newScope = childBuildScope(newEnd) @@ -277,7 +261,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope getInitialValue = {}, ) - private suspend fun childBuildScope(newEnd: TFlow<Any>): BuildScopeImpl { + private fun childBuildScope(newEnd: TFlow<Any>): BuildScopeImpl { val newCoroutineScope: CoroutineScope = coroutineScope.childScope() return BuildScopeImpl( stateScope = stateScope.childStateScope(newEnd), @@ -292,7 +276,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope (newCoroutineScope.coroutineContext.job as CompletableJob).complete() } ) - runInBuildScope { endSignal.nextOnly().observe { newCoroutineScope.cancel() } } + runInBuildScope { endSignalOnce.observe { newCoroutineScope.cancel() } } } } @@ -318,8 +302,14 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope private inner class FrpBuildScopeImpl : FrpBuildScope, FrpStateScope by stateScope.frpScope { - override fun <T> tFlow(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> = - tFlowInternal(builder) + override val frpNetwork: FrpNetwork by lazy { + LocalFrpNetwork(network, coroutineScope, endSignal) + } + + override fun <T> tFlow( + name: String?, + builder: suspend FrpProducerScope<T>.() -> Unit, + ): TFlow<T> = tFlowInternal(name, builder) override fun <In, Out> coalescingTFlow( getInitialValue: () -> Out, @@ -330,19 +320,18 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope override fun <A> asyncScope(block: FrpSpec<A>): Pair<FrpDeferredValue<A>, Job> = asyncScopeInternal(block) - override fun <R> deferredBuildScope( - block: suspend FrpBuildScope.() -> R - ): FrpDeferredValue<R> = deferredInternal(block) + override fun <R> deferredBuildScope(block: FrpBuildScope.() -> R): FrpDeferredValue<R> = + deferredInternal(block) - override fun deferredBuildScopeAction(block: suspend FrpBuildScope.() -> Unit) = + override fun deferredBuildScopeAction(block: FrpBuildScope.() -> Unit) = deferredActionInternal(block) override fun <A> TFlow<A>.observe( coroutineContext: CoroutineContext, - block: suspend FrpEffectScope.(A) -> Unit, + block: FrpEffectScope.(A) -> Unit, ): Job = observeEffectInternal(coroutineContext, block) - override fun <A, B> TFlow<A>.mapBuild(transform: suspend FrpBuildScope.(A) -> B): TFlow<B> = + override fun <A, B> TFlow<A>.mapBuild(transform: FrpBuildScope.(A) -> B): TFlow<B> = mapBuildInternal(transform) override fun <K, A, B> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestSpecForKey( diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt index f65307c6106f..8a66f9a0d40d 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt @@ -16,33 +16,58 @@ package com.android.systemui.kairos.internal -import com.android.systemui.kairos.internal.util.asyncImmediate -import com.android.systemui.kairos.internal.util.launchImmediate -import kotlinx.coroutines.CoroutineName -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.Job -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.isActive - -internal typealias DeferScope = CoroutineScope - -internal inline fun DeferScope.deferAction( - start: CoroutineStart = CoroutineStart.UNDISPATCHED, - crossinline block: suspend () -> Unit, -): Job { - check(isActive) { "Cannot perform deferral, scope already closed." } - return launchImmediate(start, CoroutineName("deferAction")) { block() } +internal interface DeferScope { + fun deferAction(block: () -> Unit) + + fun <R> deferAsync(block: () -> R): Lazy<R> } -internal inline fun <R> DeferScope.deferAsync( - start: CoroutineStart = CoroutineStart.UNDISPATCHED, - crossinline block: suspend () -> R, -): Deferred<R> { - check(isActive) { "Cannot perform deferral, scope already closed." } - return asyncImmediate(start, CoroutineName("deferAsync")) { block() } +internal inline fun <A> deferScope(block: DeferScope.() -> A): A { + val scope = + object : DeferScope { + val deferrals = ArrayDeque<() -> Unit>() // TODO: store lazies instead? + + fun drainDeferrals() { + while (deferrals.isNotEmpty()) { + deferrals.removeFirst().invoke() + } + } + + override fun deferAction(block: () -> Unit) { + deferrals.add(block) + } + + override fun <R> deferAsync(block: () -> R): Lazy<R> = + lazy(block).also { deferrals.add { it.value } } + } + return scope.block().also { scope.drainDeferrals() } } -internal suspend inline fun <A> deferScope(noinline block: suspend DeferScope.() -> A): A = - coroutineScope(block) +internal object NoValue + +internal class CompletableLazy<T> : Lazy<T> { + + private var _value: Any? + + constructor() { + _value = NoValue + } + + constructor(init: T) { + _value = init + } + + fun setValue(value: T) { + check(_value === NoValue) { "CompletableLazy value already set" } + _value = value + } + + override val value: T + get() { + check(_value !== NoValue) { "CompletableLazy accessed before initialized" } + @Suppress("UNCHECKED_CAST") + return _value as T + } + + override fun isInitialized(): Boolean = _value !== NoValue +} diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt index 5f652525f036..b71a245c71a2 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt @@ -21,10 +21,8 @@ import com.android.systemui.kairos.internal.store.MapHolder import com.android.systemui.kairos.internal.store.MapK import com.android.systemui.kairos.internal.store.MutableMapK import com.android.systemui.kairos.internal.util.hashString -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch +import com.android.systemui.kairos.internal.util.logDuration import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock internal class DemuxNode<W, K, A>( private val branchNodeByKey: MutableMapK<W, K, DemuxNode<W, K, A>.BranchNode>, @@ -34,156 +32,111 @@ internal class DemuxNode<W, K, A>( val schedulable = Schedulable.N(this) - inline val mutex - get() = lifecycle.mutex - lateinit var upstreamConnection: NodeConnection<MapK<W, K, A>> @Volatile private var epoch: Long = Long.MIN_VALUE - suspend fun hasCurrentValueLocked(evalScope: EvalScope, key: K): Boolean = - evalScope.epoch == epoch && upstreamConnection.getPushEvent(evalScope).contains(key) + fun hasCurrentValueLocked(logIndent: Int, evalScope: EvalScope, key: K): Boolean = + evalScope.epoch == epoch && + upstreamConnection.getPushEvent(logIndent, evalScope).contains(key) - suspend fun hasCurrentValue(evalScope: EvalScope, key: K): Boolean = - mutex.withLock { hasCurrentValueLocked(evalScope, key) } + fun hasCurrentValue(logIndent: Int, evalScope: EvalScope, key: K): Boolean = + hasCurrentValueLocked(logIndent, evalScope, key) fun getAndMaybeAddDownstream(key: K): BranchNode = branchNodeByKey.getOrPut(key) { BranchNode(key) } - override suspend fun schedule(evalScope: EvalScope) = coroutineScope { - val upstreamResult = upstreamConnection.getPushEvent(evalScope) - mutex.withLock { + override fun schedule(logIndent: Int, evalScope: EvalScope) = + logDuration(logIndent, "DemuxNode.schedule") { + val upstreamResult = + logDuration("upstream.getPushEvent") { + upstreamConnection.getPushEvent(currentLogIndent, evalScope) + } updateEpoch(evalScope) for ((key, _) in upstreamResult) { - if (key !in branchNodeByKey) continue + if (!branchNodeByKey.contains(key)) continue val branch = branchNodeByKey.getValue(key) - // TODO: launchImmediate? - launch { branch.schedule(evalScope) } + branch.schedule(currentLogIndent, evalScope) } } - } - override suspend fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) { - coroutineScope { - mutex.withLock { - for ((_, branchNode) in branchNodeByKey) { - branchNode.downstreamSet.adjustDirectUpstream( - coroutineScope = this, - scheduler, - oldDepth, - newDepth, - ) - } - } + override fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) { + for ((_, branchNode) in branchNodeByKey) { + branchNode.downstreamSet.adjustDirectUpstream(scheduler, oldDepth, newDepth) } } - override suspend fun moveIndirectUpstreamToDirect( + override fun moveIndirectUpstreamToDirect( scheduler: Scheduler, oldIndirectDepth: Int, oldIndirectSet: Set<MuxDeferredNode<*, *, *>>, newDirectDepth: Int, ) { - coroutineScope { - mutex.withLock { - for ((_, branchNode) in branchNodeByKey) { - branchNode.downstreamSet.moveIndirectUpstreamToDirect( - coroutineScope = this, - scheduler, - oldIndirectDepth, - oldIndirectSet, - newDirectDepth, - ) - } - } + for ((_, branchNode) in branchNodeByKey) { + branchNode.downstreamSet.moveIndirectUpstreamToDirect( + scheduler, + oldIndirectDepth, + oldIndirectSet, + newDirectDepth, + ) } } - override suspend fun adjustIndirectUpstream( + override fun adjustIndirectUpstream( scheduler: Scheduler, oldDepth: Int, newDepth: Int, removals: Set<MuxDeferredNode<*, *, *>>, additions: Set<MuxDeferredNode<*, *, *>>, ) { - coroutineScope { - mutex.withLock { - for ((_, branchNode) in branchNodeByKey) { - branchNode.downstreamSet.adjustIndirectUpstream( - coroutineScope = this, - scheduler, - oldDepth, - newDepth, - removals, - additions, - ) - } - } + for ((_, branchNode) in branchNodeByKey) { + branchNode.downstreamSet.adjustIndirectUpstream( + scheduler, + oldDepth, + newDepth, + removals, + additions, + ) } } - override suspend fun moveDirectUpstreamToIndirect( + override fun moveDirectUpstreamToIndirect( scheduler: Scheduler, oldDirectDepth: Int, newIndirectDepth: Int, newIndirectSet: Set<MuxDeferredNode<*, *, *>>, ) { - coroutineScope { - mutex.withLock { - for ((_, branchNode) in branchNodeByKey) { - branchNode.downstreamSet.moveDirectUpstreamToIndirect( - coroutineScope = this, - scheduler, - oldDirectDepth, - newIndirectDepth, - newIndirectSet, - ) - } - } + for ((_, branchNode) in branchNodeByKey) { + branchNode.downstreamSet.moveDirectUpstreamToIndirect( + scheduler, + oldDirectDepth, + newIndirectDepth, + newIndirectSet, + ) } } - override suspend fun removeIndirectUpstream( + override fun removeIndirectUpstream( scheduler: Scheduler, depth: Int, indirectSet: Set<MuxDeferredNode<*, *, *>>, ) { - coroutineScope { - mutex.withLock { - lifecycle.lifecycleState = DemuxLifecycleState.Dead - for ((_, branchNode) in branchNodeByKey) { - branchNode.downstreamSet.removeIndirectUpstream( - coroutineScope = this, - scheduler, - depth, - indirectSet, - ) - } - } + lifecycle.lifecycleState = DemuxLifecycleState.Dead + for ((_, branchNode) in branchNodeByKey) { + branchNode.downstreamSet.removeIndirectUpstream(scheduler, depth, indirectSet) } } - override suspend fun removeDirectUpstream(scheduler: Scheduler, depth: Int) { - coroutineScope { - mutex.withLock { - lifecycle.lifecycleState = DemuxLifecycleState.Dead - for ((_, branchNode) in branchNodeByKey) { - branchNode.downstreamSet.removeDirectUpstream( - coroutineScope = this, - scheduler, - depth, - ) - } - } + override fun removeDirectUpstream(scheduler: Scheduler, depth: Int) { + lifecycle.lifecycleState = DemuxLifecycleState.Dead + for ((_, branchNode) in branchNodeByKey) { + branchNode.downstreamSet.removeDirectUpstream(scheduler, depth) } } - suspend fun removeDownstreamAndDeactivateIfNeeded(key: K) { - val deactivate = - mutex.withLock { - branchNodeByKey.remove(key) - branchNodeByKey.isEmpty() - } + fun removeDownstreamAndDeactivateIfNeeded(key: K) { + branchNodeByKey.remove(key) + val deactivate = branchNodeByKey.isEmpty() if (deactivate) { // No need for mutex here; no more concurrent changes to can occur during this phase lifecycle.lifecycleState = DemuxLifecycleState.Inactive(spec) @@ -195,57 +148,57 @@ internal class DemuxNode<W, K, A>( epoch = evalScope.epoch } - suspend fun getPushEvent(evalScope: EvalScope, key: K): A = - upstreamConnection.getPushEvent(evalScope).getValue(key) + fun getPushEvent(logIndent: Int, evalScope: EvalScope, key: K): A = + logDuration(logIndent, "Demux.getPushEvent($key)") { + upstreamConnection.getPushEvent(currentLogIndent, evalScope).getValue(key) + } inner class BranchNode(val key: K) : PushNode<A> { - private val mutex = Mutex() - val downstreamSet = DownstreamSet() override val depthTracker: DepthTracker get() = upstreamConnection.depthTracker - override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = - hasCurrentValue(evalScope, key) + override fun hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean = + hasCurrentValue(logIndent, evalScope, key) - override suspend fun getPushEvent(evalScope: EvalScope): A = getPushEvent(evalScope, key) + override fun getPushEvent(logIndent: Int, evalScope: EvalScope): A = + getPushEvent(logIndent, evalScope, key) - override suspend fun addDownstream(downstream: Schedulable) { - mutex.withLock { downstreamSet.add(downstream) } + override fun addDownstream(downstream: Schedulable) { + downstreamSet.add(downstream) } - override suspend fun removeDownstream(downstream: Schedulable) { - mutex.withLock { downstreamSet.remove(downstream) } + override fun removeDownstream(downstream: Schedulable) { + downstreamSet.remove(downstream) } - override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) { - val canDeactivate = - mutex.withLock { - downstreamSet.remove(downstream) - downstreamSet.isEmpty() - } + override fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) { + downstreamSet.remove(downstream) + val canDeactivate = downstreamSet.isEmpty() if (canDeactivate) { removeDownstreamAndDeactivateIfNeeded(key) } } - override suspend fun deactivateIfNeeded() { - if (mutex.withLock { downstreamSet.isEmpty() }) { + override fun deactivateIfNeeded() { + if (downstreamSet.isEmpty()) { removeDownstreamAndDeactivateIfNeeded(key) } } - override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) { - if (mutex.withLock { downstreamSet.isEmpty() }) { + override fun scheduleDeactivationIfNeeded(evalScope: EvalScope) { + if (downstreamSet.isEmpty()) { evalScope.scheduleDeactivation(this) } } - suspend fun schedule(evalScope: EvalScope) { - if (!coroutineScope { mutex.withLock { scheduleAll(downstreamSet, evalScope) } }) { - evalScope.scheduleDeactivation(this) + fun schedule(logIndent: Int, evalScope: EvalScope) { + logDuration(logIndent, "DemuxBranchNode($key).schedule") { + if (!scheduleAll(currentLogIndent, downstreamSet, evalScope)) { + evalScope.scheduleDeactivation(this@BranchNode) + } } } } @@ -263,28 +216,27 @@ internal fun <W, K, A> DemuxImpl( ) internal fun <K, A> demuxMap( - upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>, + upstream: EvalScope.() -> TFlowImpl<Map<K, A>>, numKeys: Int?, ): DemuxImpl<K, A> = - DemuxImpl(mapImpl(upstream) { MapHolder(it) }, numKeys, ConcurrentHashMapK.Factory()) + DemuxImpl(mapImpl(upstream) { it, _ -> MapHolder(it) }, numKeys, ConcurrentHashMapK.Factory()) internal class DemuxActivator<W, K, A>( private val numKeys: Int?, private val upstream: TFlowImpl<MapK<W, K, A>>, private val storeFactory: MutableMapK.Factory<W, K>, ) { - suspend fun activate( + fun activate( evalScope: EvalScope, lifecycle: DemuxLifecycle<K, A>, ): Pair<DemuxNode<W, K, A>, Set<K>>? { val demux = DemuxNode(storeFactory.create(numKeys), lifecycle, this) - return upstream.activate(evalScope, downstream = demux.schedulable)?.let { (conn, needsEval) - -> + return upstream.activate(evalScope, demux.schedulable)?.let { (conn, needsEval) -> Pair( demux.apply { upstreamConnection = conn }, if (needsEval) { demux.updateEpoch(evalScope) - conn.getPushEvent(evalScope).keys + conn.getPushEvent(0, evalScope).keys } else { emptySet() }, @@ -297,7 +249,7 @@ internal class DemuxImpl<in K, out A>(private val dmux: DemuxLifecycle<K, A>) { fun eventsForKey(key: K): TFlowImpl<A> = TFlowCheap { downstream -> dmux.activate(evalScope = this, key)?.let { (branchNode, needsEval) -> branchNode.addDownstream(downstream) - val branchNeedsEval = needsEval && branchNode.hasCurrentValue(evalScope = this) + val branchNeedsEval = needsEval && branchNode.hasCurrentValue(0, evalScope = this) ActivationResult( connection = NodeConnection(branchNode, branchNode), needsEval = branchNeedsEval, @@ -311,31 +263,31 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle override fun toString(): String = "TFlowDmuxState[$hashString][$lifecycleState][$mutex]" - suspend fun activate( - evalScope: EvalScope, - key: K, - ): Pair<DemuxNode<*, K, A>.BranchNode, Boolean>? = - mutex.withLock { - when (val state = lifecycleState) { - is DemuxLifecycleState.Dead -> null - is DemuxLifecycleState.Active -> - state.node.getAndMaybeAddDownstream(key) to - state.node.hasCurrentValueLocked(evalScope, key) - is DemuxLifecycleState.Inactive -> { - state.spec - .activate(evalScope, this@DemuxLifecycle) - .also { result -> - lifecycleState = - if (result == null) { - DemuxLifecycleState.Dead - } else { - DemuxLifecycleState.Active(result.first) - } - } - ?.let { (node, needsEval) -> - node.getAndMaybeAddDownstream(key) to (key in needsEval) - } - } + fun activate(evalScope: EvalScope, key: K): Pair<DemuxNode<*, K, A>.BranchNode, Boolean>? = + when (val state = lifecycleState) { + is DemuxLifecycleState.Dead -> { + null + } + + is DemuxLifecycleState.Active -> { + state.node.getAndMaybeAddDownstream(key) to + state.node.hasCurrentValueLocked(0, evalScope, key) + } + + is DemuxLifecycleState.Inactive -> { + state.spec + .activate(evalScope, this@DemuxLifecycle) + .also { result -> + lifecycleState = + if (result == null) { + DemuxLifecycleState.Dead + } else { + DemuxLifecycleState.Active(result.first) + } + } + ?.let { (node, needsEval) -> + node.getAndMaybeAddDownstream(key) to (key in needsEval) + } } } } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt index afbd7120653c..9ecfbba7d647 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt @@ -28,21 +28,13 @@ import com.android.systemui.kairos.emptyTFlow import com.android.systemui.kairos.init import com.android.systemui.kairos.mapCheap import com.android.systemui.kairos.switch -import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext -import kotlin.coroutines.startCoroutine -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.completeWith -import kotlinx.coroutines.job internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope) : EvalScope, NetworkScope by networkScope, DeferScope by deferScope { - private suspend fun <A> Transactional<A>.sample(): A = - impl.sample().sample(this@EvalScopeImpl).await() + private fun <A> Transactional<A>.sample(): A = impl.sample().sample(this@EvalScopeImpl).value - private suspend fun <A> TState<A>.sample(): A = + private fun <A> TState<A>.sample(): A = init.connect(evalScope = this@EvalScopeImpl).getCurrentWithEpoch(this@EvalScopeImpl).first private val <A> Transactional<A>.deferredValue: FrpDeferredValue<A> @@ -62,7 +54,7 @@ internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope) "now", this, { result.mapCheap { emptyTFlow }.init.connect(evalScope = this) }, - CompletableDeferred( + CompletableLazy( TFlowInit( constInit( "now", @@ -82,25 +74,10 @@ internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope) result } - private fun <R> deferredInternal( - block: suspend FrpTransactionScope.() -> R - ): FrpDeferredValue<R> = FrpDeferredValue(deferAsync { runInTransactionScope(block) }) + private fun <R> deferredInternal(block: FrpTransactionScope.() -> R): FrpDeferredValue<R> = + FrpDeferredValue(deferAsync { runInTransactionScope(block) }) - override suspend fun <R> runInTransactionScope(block: suspend FrpTransactionScope.() -> R): R { - val complete = CompletableDeferred<R>(parent = coroutineContext.job) - block.startCoroutine( - frpScope, - object : Continuation<R> { - override val context: CoroutineContext - get() = EmptyCoroutineContext - - override fun resumeWith(result: Result<R>) { - complete.completeWith(result) - } - }, - ) - return complete.await() - } + override fun <R> runInTransactionScope(block: FrpTransactionScope.() -> R): R = frpScope.block() override val frpScope: FrpTransactionScope = FrpTransactionScopeImpl() @@ -110,7 +87,7 @@ internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope) override fun <A> TState<A>.sampleDeferred(): FrpDeferredValue<A> = deferredValue override fun <R> deferredTransactionScope( - block: suspend FrpTransactionScope.() -> R + block: FrpTransactionScope.() -> R ): FrpDeferredValue<R> = deferredInternal(block) override val now: TFlow<Unit> diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt index b60c227bcfbe..30c1a865f50a 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt @@ -24,10 +24,10 @@ import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.none internal inline fun <A> filterJustImpl( - crossinline getPulse: suspend EvalScope.() -> TFlowImpl<Maybe<A>> + crossinline getPulse: EvalScope.() -> TFlowImpl<Maybe<A>> ): TFlowImpl<A> = DemuxImpl( - mapImpl(getPulse) { maybeResult -> + mapImpl(getPulse) { maybeResult, _ -> if (maybeResult is Just) { Single(maybeResult.value) } else { @@ -40,6 +40,9 @@ internal inline fun <A> filterJustImpl( .eventsForKey(Unit) internal inline fun <A> filterImpl( - crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>, - crossinline f: suspend EvalScope.(A) -> Boolean, -): TFlowImpl<A> = filterJustImpl { mapImpl(getPulse) { if (f(it)) just(it) else none }.cached() } + crossinline getPulse: EvalScope.() -> TFlowImpl<A>, + crossinline f: EvalScope.(A) -> Boolean, +): TFlowImpl<A> { + val mapped = mapImpl(getPulse) { it, _ -> if (f(it)) just(it) else none }.cached() + return filterJustImpl { mapped } +} diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Graph.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Graph.kt index 828f13b026d3..667002bd413c 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Graph.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Graph.kt @@ -18,8 +18,6 @@ package com.android.systemui.kairos.internal import com.android.systemui.kairos.internal.util.Bag import java.util.TreeMap -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.launch /** * Tracks all upstream connections for Mux nodes. @@ -86,7 +84,7 @@ internal class DepthTracker { @Volatile private var dirty_depthIsDirect = true @Volatile private var dirty_isIndirectRoot = false - fun schedule(scheduler: Scheduler, node: MuxNode<*, *, *, *>) { + fun schedule(scheduler: Scheduler, node: MuxNode<*, *, *>) { if (dirty_depthIsDirect) { scheduler.schedule(dirty_directDepth, node) } else { @@ -192,30 +190,27 @@ internal class DepthTracker { return remainder } - suspend fun propagateChanges(scheduler: Scheduler, muxNode: MuxNode<*, *, *, *>) { + fun propagateChanges(scheduler: Scheduler, muxNode: MuxNode<*, *, *>) { if (isDirty()) { schedule(scheduler, muxNode) } } fun applyChanges( - coroutineScope: CoroutineScope, scheduler: Scheduler, downstreamSet: DownstreamSet, - muxNode: MuxNode<*, *, *, *>, + muxNode: MuxNode<*, *, *>, ) { when { dirty_depthIsDirect -> { if (snapshotIsDirect) { downstreamSet.adjustDirectUpstream( - coroutineScope, scheduler, oldDepth = snapshotDirectDepth, newDepth = dirty_directDepth, ) } else { downstreamSet.moveIndirectUpstreamToDirect( - coroutineScope, scheduler, oldIndirectDepth = snapshotIndirectDepth, oldIndirectSet = @@ -233,7 +228,6 @@ internal class DepthTracker { dirty_hasIndirectUpstream() || dirty_isIndirectRoot -> { if (snapshotIsDirect) { downstreamSet.moveDirectUpstreamToIndirect( - coroutineScope, scheduler, oldDirectDepth = snapshotDirectDepth, newIndirectDepth = dirty_indirectDepth, @@ -247,7 +241,6 @@ internal class DepthTracker { ) } else { downstreamSet.adjustIndirectUpstream( - coroutineScope, scheduler, oldDepth = snapshotIndirectDepth, newDepth = dirty_indirectDepth, @@ -274,14 +267,9 @@ internal class DepthTracker { muxNode.lifecycle.lifecycleState = MuxLifecycleState.Dead if (snapshotIsDirect) { - downstreamSet.removeDirectUpstream( - coroutineScope, - scheduler, - depth = snapshotDirectDepth, - ) + downstreamSet.removeDirectUpstream(scheduler, depth = snapshotDirectDepth) } else { downstreamSet.removeIndirectUpstream( - coroutineScope, scheduler, depth = snapshotIndirectDepth, indirectSet = @@ -374,125 +362,92 @@ internal class DownstreamSet { } } - fun adjustDirectUpstream( - coroutineScope: CoroutineScope, - scheduler: Scheduler, - oldDepth: Int, - newDepth: Int, - ) = - coroutineScope.run { - for (node in nodes) { - launch { node.adjustDirectUpstream(scheduler, oldDepth, newDepth) } - } + fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) { + for (node in nodes) { + node.adjustDirectUpstream(scheduler, oldDepth, newDepth) } + } fun moveIndirectUpstreamToDirect( - coroutineScope: CoroutineScope, scheduler: Scheduler, oldIndirectDepth: Int, oldIndirectSet: Set<MuxDeferredNode<*, *, *>>, newDirectDepth: Int, - ) = - coroutineScope.run { - for (node in nodes) { - launch { - node.moveIndirectUpstreamToDirect( - scheduler, - oldIndirectDepth, - oldIndirectSet, - newDirectDepth, - ) - } - } - for (mover in muxMovers) { - launch { - mover.moveIndirectPatchNodeToDirect(scheduler, oldIndirectDepth, oldIndirectSet) - } - } + ) { + for (node in nodes) { + node.moveIndirectUpstreamToDirect( + scheduler, + oldIndirectDepth, + oldIndirectSet, + newDirectDepth, + ) + } + for (mover in muxMovers) { + mover.moveIndirectPatchNodeToDirect(scheduler, oldIndirectDepth, oldIndirectSet) } + } fun adjustIndirectUpstream( - coroutineScope: CoroutineScope, scheduler: Scheduler, oldDepth: Int, newDepth: Int, removals: Set<MuxDeferredNode<*, *, *>>, additions: Set<MuxDeferredNode<*, *, *>>, - ) = - coroutineScope.run { - for (node in nodes) { - launch { - node.adjustIndirectUpstream(scheduler, oldDepth, newDepth, removals, additions) - } - } - for (mover in muxMovers) { - launch { - mover.adjustIndirectPatchNode( - scheduler, - oldDepth, - newDepth, - removals, - additions, - ) - } - } + ) { + for (node in nodes) { + node.adjustIndirectUpstream(scheduler, oldDepth, newDepth, removals, additions) + } + for (mover in muxMovers) { + mover.adjustIndirectPatchNode(scheduler, oldDepth, newDepth, removals, additions) } + } fun moveDirectUpstreamToIndirect( - coroutineScope: CoroutineScope, scheduler: Scheduler, oldDirectDepth: Int, newIndirectDepth: Int, newIndirectSet: Set<MuxDeferredNode<*, *, *>>, - ) = - coroutineScope.run { - for (node in nodes) { - launch { - node.moveDirectUpstreamToIndirect( - scheduler, - oldDirectDepth, - newIndirectDepth, - newIndirectSet, - ) - } - } - for (mover in muxMovers) { - launch { - mover.moveDirectPatchNodeToIndirect(scheduler, newIndirectDepth, newIndirectSet) - } - } + ) { + for (node in nodes) { + node.moveDirectUpstreamToIndirect( + scheduler, + oldDirectDepth, + newIndirectDepth, + newIndirectSet, + ) + } + for (mover in muxMovers) { + mover.moveDirectPatchNodeToIndirect(scheduler, newIndirectDepth, newIndirectSet) } + } fun removeIndirectUpstream( - coroutineScope: CoroutineScope, scheduler: Scheduler, depth: Int, indirectSet: Set<MuxDeferredNode<*, *, *>>, - ) = - coroutineScope.run { - for (node in nodes) { - launch { node.removeIndirectUpstream(scheduler, depth, indirectSet) } - } - for (mover in muxMovers) { - launch { mover.removeIndirectPatchNode(scheduler, depth, indirectSet) } - } - for (output in outputs) { - launch { output.kill() } - } + ) { + for (node in nodes) { + node.removeIndirectUpstream(scheduler, depth, indirectSet) + } + for (mover in muxMovers) { + mover.removeIndirectPatchNode(scheduler, depth, indirectSet) } + for (output in outputs) { + output.kill() + } + } - fun removeDirectUpstream(coroutineScope: CoroutineScope, scheduler: Scheduler, depth: Int) = - coroutineScope.run { - for (node in nodes) { - launch { node.removeDirectUpstream(scheduler, depth) } - } - for (mover in muxMovers) { - launch { mover.removeDirectPatchNode(scheduler) } - } - for (output in outputs) { - launch { output.kill() } - } + fun removeDirectUpstream(scheduler: Scheduler, depth: Int) { + for (node in nodes) { + node.removeDirectUpstream(scheduler, depth) + } + for (mover in muxMovers) { + mover.removeDirectPatchNode(scheduler) } + for (output in outputs) { + output.kill() + } + } fun clear() { outputs.clear() @@ -518,13 +473,14 @@ internal fun DownstreamSet.isEmpty() = @Suppress("NOTHING_TO_INLINE") internal inline fun DownstreamSet.isNotEmpty() = !isEmpty() -internal fun CoroutineScope.scheduleAll( +internal fun scheduleAll( + logIndent: Int, downstreamSet: DownstreamSet, evalScope: EvalScope, ): Boolean { - downstreamSet.nodes.forEach { launch { it.schedule(evalScope) } } - downstreamSet.muxMovers.forEach { launch { it.scheduleMover(evalScope) } } - downstreamSet.outputs.forEach { launch { it.schedule(evalScope) } } + downstreamSet.nodes.forEach { it.schedule(logIndent, evalScope) } + downstreamSet.muxMovers.forEach { it.scheduleMover(logIndent, evalScope) } + downstreamSet.outputs.forEach { it.schedule(logIndent, evalScope) } downstreamSet.stateWriters.forEach { evalScope.schedule(it) } return downstreamSet.isNotEmpty() } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt index 57db9a493e21..10a46775beb9 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt @@ -19,42 +19,37 @@ package com.android.systemui.kairos.internal import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.none -import java.util.concurrent.atomic.AtomicBoolean -import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.ExperimentalCoroutinesApi /** Performs actions once, when the reactive component is first connected to the network. */ -internal class Init<out A>(val name: String?, private val block: suspend InitScope.() -> A) { - - /** Has the initialization logic been evaluated yet? */ - private val initialized = AtomicBoolean() +internal class Init<out A>(val name: String?, private val block: InitScope.() -> A) { /** * Stores the result after initialization, as well as the id of the [Network] it's been * initialized with. */ - private val cache = CompletableDeferred<Pair<Any, A>>() + private val cache = CompletableLazy<Pair<Any, A>>() - suspend fun connect(evalScope: InitScope): A = - if (initialized.getAndSet(true)) { + fun connect(evalScope: InitScope): A = + if (cache.isInitialized()) { // Read from cache - val (networkId, result) = cache.await() + val (networkId, result) = cache.value check(networkId == evalScope.networkId) { "Network mismatch" } result } else { // Write to cache - block(evalScope).also { cache.complete(evalScope.networkId to it) } + block(evalScope).also { cache.setValue(evalScope.networkId to it) } } @OptIn(ExperimentalCoroutinesApi::class) fun getUnsafe(): Maybe<A> = - if (cache.isCompleted) { - just(cache.getCompleted().second) + if (cache.isInitialized()) { + just(cache.value.second) } else { none } } -internal fun <A> init(name: String?, block: suspend InitScope.() -> A) = Init(name, block) +internal fun <A> init(name: String?, block: InitScope.() -> A) = Init(name, block) internal fun <A> constInit(name: String?, value: A) = init(name) { value } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt index 1edc8c28b2ee..1dcba4433a8d 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt @@ -16,104 +16,103 @@ package com.android.systemui.kairos.internal -import com.android.systemui.kairos.internal.util.Key +import com.android.systemui.kairos.internal.util.logDuration import java.util.concurrent.atomic.AtomicBoolean -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock internal class InputNode<A>( - private val activate: suspend EvalScope.() -> Unit = {}, + private val activate: EvalScope.() -> Unit = {}, private val deactivate: () -> Unit = {}, -) : PushNode<A>, Key<A> { +) : PushNode<A> { private val downstreamSet = DownstreamSet() - private val mutex = Mutex() - private val activated = AtomicBoolean(false) + val activated = AtomicBoolean(false) - @Volatile private var epoch: Long = Long.MIN_VALUE + private val transactionCache = TransactionCache<A>() + private val epoch + get() = transactionCache.epoch override val depthTracker: DepthTracker = DepthTracker() - override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = epoch == evalScope.epoch + override fun hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean = + epoch == evalScope.epoch - suspend fun visit(evalScope: EvalScope, value: A) { - epoch = evalScope.epoch - evalScope.setResult(this, value) - coroutineScope { - if (!mutex.withLock { scheduleAll(downstreamSet, evalScope) }) { - evalScope.scheduleDeactivation(this@InputNode) - } + fun visit(evalScope: EvalScope, value: A) { + transactionCache.put(evalScope, value) + if (!scheduleAll(0, downstreamSet, evalScope)) { + evalScope.scheduleDeactivation(this@InputNode) } } - override suspend fun removeDownstream(downstream: Schedulable) { - mutex.withLock { downstreamSet.remove(downstream) } + override fun removeDownstream(downstream: Schedulable) { + downstreamSet.remove(downstream) } - override suspend fun deactivateIfNeeded() { - if (mutex.withLock { downstreamSet.isEmpty() && activated.getAndSet(false) }) { + override fun deactivateIfNeeded() { + if (downstreamSet.isEmpty() && activated.getAndSet(false)) { deactivate() } } - override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) { - if (mutex.withLock { downstreamSet.isEmpty() }) { + override fun scheduleDeactivationIfNeeded(evalScope: EvalScope) { + if (downstreamSet.isEmpty()) { evalScope.scheduleDeactivation(this) } } - override suspend fun addDownstream(downstream: Schedulable) { - mutex.withLock { downstreamSet.add(downstream) } + override fun addDownstream(downstream: Schedulable) { + downstreamSet.add(downstream) } - suspend fun addDownstreamAndActivateIfNeeded(downstream: Schedulable, evalScope: EvalScope) { - val needsActivation = - mutex.withLock { - val wasEmpty = downstreamSet.isEmpty() - downstreamSet.add(downstream) - wasEmpty && !activated.getAndSet(true) - } + fun addDownstreamAndActivateIfNeeded(downstream: Schedulable, evalScope: EvalScope) { + val needsActivation = run { + val wasEmpty = downstreamSet.isEmpty() + downstreamSet.add(downstream) + wasEmpty && !activated.getAndSet(true) + } if (needsActivation) { activate(evalScope) } } - override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) { - val needsDeactivation = - mutex.withLock { - downstreamSet.remove(downstream) - downstreamSet.isEmpty() && activated.getAndSet(false) - } + override fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) { + downstreamSet.remove(downstream) + val needsDeactivation = downstreamSet.isEmpty() && activated.getAndSet(false) if (needsDeactivation) { deactivate() } } - override suspend fun getPushEvent(evalScope: EvalScope): A = evalScope.getCurrentValue(this) + override fun getPushEvent(logIndent: Int, evalScope: EvalScope): A = + logDuration(logIndent, "Input.getPushEvent", false) { + transactionCache.getCurrentValue(evalScope) + } } internal fun <A> InputNode<A>.activated() = TFlowCheap { downstream -> val input = this@activated addDownstreamAndActivateIfNeeded(downstream, evalScope = this) - ActivationResult(connection = NodeConnection(input, input), needsEval = hasCurrentValue(input)) + ActivationResult( + connection = NodeConnection(input, input), + needsEval = input.hasCurrentValue(0, evalScope = this), + ) } internal data object AlwaysNode : PushNode<Unit> { override val depthTracker = DepthTracker() - override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = true + override fun hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean = true - override suspend fun removeDownstream(downstream: Schedulable) {} + override fun removeDownstream(downstream: Schedulable) {} - override suspend fun deactivateIfNeeded() {} + override fun deactivateIfNeeded() {} - override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {} + override fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {} - override suspend fun addDownstream(downstream: Schedulable) {} + override fun addDownstream(downstream: Schedulable) {} - override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {} + override fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {} - override suspend fun getPushEvent(evalScope: EvalScope) = Unit + override fun getPushEvent(logIndent: Int, evalScope: EvalScope) = + logDuration(logIndent, "Always.getPushEvent", false) { Unit } } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt index 80c40ba740a5..62bf34810de7 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt @@ -20,8 +20,6 @@ import com.android.systemui.kairos.FrpBuildScope import com.android.systemui.kairos.FrpStateScope import com.android.systemui.kairos.FrpTransactionScope import com.android.systemui.kairos.TFlow -import com.android.systemui.kairos.internal.util.HeteroMap -import com.android.systemui.kairos.internal.util.Key internal interface InitScope { val networkId: Any @@ -30,23 +28,25 @@ internal interface InitScope { internal interface EvalScope : NetworkScope, DeferScope { val frpScope: FrpTransactionScope - suspend fun <R> runInTransactionScope(block: suspend FrpTransactionScope.() -> R): R + fun <R> runInTransactionScope(block: FrpTransactionScope.() -> R): R } internal interface StateScope : EvalScope { override val frpScope: FrpStateScope - suspend fun <R> runInStateScope(block: suspend FrpStateScope.() -> R): R + fun <R> runInStateScope(block: FrpStateScope.() -> R): R val endSignal: TFlow<Any> fun childStateScope(newEnd: TFlow<Any>): StateScope + + val endSignalOnce: TFlow<Any> } internal interface BuildScope : StateScope { override val frpScope: FrpBuildScope - suspend fun <R> runInBuildScope(block: suspend FrpBuildScope.() -> R): R + fun <R> runInBuildScope(block: FrpBuildScope.() -> R): R } internal interface NetworkScope : InitScope { @@ -57,7 +57,7 @@ internal interface NetworkScope : InitScope { val compactor: Scheduler val scheduler: Scheduler - val transactionStore: HeteroMap + val transactionStore: TransactionStore fun scheduleOutput(output: Output<*>) @@ -69,12 +69,3 @@ internal interface NetworkScope : InitScope { fun scheduleDeactivation(output: Output<*>) } - -internal fun <A> NetworkScope.setResult(node: Key<A>, result: A) { - transactionStore[node] = result -} - -internal fun <A> NetworkScope.getCurrentValue(key: Key<A>): A = - transactionStore.getOrError(key) { "No value for $key in transaction $epoch" } - -internal fun NetworkScope.hasCurrentValue(key: Key<*>): Boolean = transactionStore.contains(key) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt index a479c90cc4de..1cdf895ec1ed 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt @@ -22,44 +22,39 @@ import com.android.systemui.kairos.internal.store.MapHolder import com.android.systemui.kairos.internal.store.MapK import com.android.systemui.kairos.internal.store.MutableMapK import com.android.systemui.kairos.internal.store.asMapHolder -import com.android.systemui.kairos.internal.util.asyncImmediate import com.android.systemui.kairos.internal.util.hashString -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.awaitAll -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.sync.Mutex -import kotlinx.coroutines.sync.withLock +import com.android.systemui.kairos.internal.util.logDuration internal typealias MuxResult<W, K, V> = MapK<W, K, PullNode<V>> /** Base class for muxing nodes, which have a (potentially dynamic) collection of upstream nodes. */ -internal sealed class MuxNode<W, K, V, Output>( - val lifecycle: MuxLifecycle<Output>, +internal sealed class MuxNode<W, K, V>( + val lifecycle: MuxLifecycle<W, K, V>, protected val storeFactory: MutableMapK.Factory<W, K>, -) : PushNode<Output> { +) : PushNode<MuxResult<W, K, V>> { - inline val mutex - get() = lifecycle.mutex + lateinit var upstreamData: MutableMapK<W, K, PullNode<V>> + lateinit var switchedIn: MutableMapK<W, K, BranchNode> - @Volatile lateinit var upstreamData: MutableMapK<W, K, PullNode<V>> - @Volatile lateinit var switchedIn: MutableMapK<W, K, BranchNode> + @Volatile var markedForCompaction = false + @Volatile var markedForEvaluation = false val downstreamSet: DownstreamSet = DownstreamSet() // TODO: inline DepthTracker? would need to be added to PushNode signature final override val depthTracker = DepthTracker() - @Volatile - var epoch: Long = Long.MIN_VALUE - protected set + val transactionCache = TransactionCache<MuxResult<W, K, V>>() + val epoch + get() = transactionCache.epoch inline fun hasCurrentValueLocked(evalScope: EvalScope): Boolean = epoch == evalScope.epoch - override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = - mutex.withLock { hasCurrentValueLocked(evalScope) } + override fun hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean = + hasCurrentValueLocked(evalScope) - final override suspend fun addDownstream(downstream: Schedulable) { - mutex.withLock { addDownstreamLocked(downstream) } + final override fun addDownstream(downstream: Schedulable) { + addDownstreamLocked(downstream) } /** @@ -72,135 +67,121 @@ internal sealed class MuxNode<W, K, V, Output>( downstreamSet.add(downstream) } - final override suspend fun removeDownstream(downstream: Schedulable) { + final override fun removeDownstream(downstream: Schedulable) { // TODO: return boolean? - mutex.withLock { downstreamSet.remove(downstream) } + downstreamSet.remove(downstream) } - final override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) { - val deactivate = - mutex.withLock { - downstreamSet.remove(downstream) - downstreamSet.isEmpty() - } + final override fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) { + downstreamSet.remove(downstream) + val deactivate = downstreamSet.isEmpty() if (deactivate) { doDeactivate() } } - final override suspend fun deactivateIfNeeded() { - if (mutex.withLock { downstreamSet.isEmpty() }) { + final override fun deactivateIfNeeded() { + if (downstreamSet.isEmpty()) { doDeactivate() } } /** visit this node from the scheduler (push eval) */ - abstract suspend fun visit(evalScope: EvalScope) + abstract fun visit(logIndent: Int, evalScope: EvalScope) /** perform deactivation logic, propagating to all upstream nodes. */ - protected abstract suspend fun doDeactivate() + protected abstract fun doDeactivate() - final override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) { - if (mutex.withLock { downstreamSet.isEmpty() }) { + final override fun scheduleDeactivationIfNeeded(evalScope: EvalScope) { + if (downstreamSet.isEmpty()) { evalScope.scheduleDeactivation(this) } } - suspend fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) { - mutex.withLock { - if (depthTracker.addDirectUpstream(oldDepth, newDepth)) { - depthTracker.schedule(scheduler, this) - } + fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) { + + if (depthTracker.addDirectUpstream(oldDepth, newDepth)) { + depthTracker.schedule(scheduler, this) } } - suspend fun moveIndirectUpstreamToDirect( + fun moveIndirectUpstreamToDirect( scheduler: Scheduler, oldIndirectDepth: Int, oldIndirectRoots: Set<MuxDeferredNode<*, *, *>>, newDepth: Int, ) { - mutex.withLock { - if ( - depthTracker.addDirectUpstream(oldDepth = null, newDepth) or - depthTracker.removeIndirectUpstream(depth = oldIndirectDepth) or - depthTracker.updateIndirectRoots(removals = oldIndirectRoots) - ) { - depthTracker.schedule(scheduler, this) - } + if ( + depthTracker.addDirectUpstream(oldDepth = null, newDepth) or + depthTracker.removeIndirectUpstream(depth = oldIndirectDepth) or + depthTracker.updateIndirectRoots(removals = oldIndirectRoots) + ) { + depthTracker.schedule(scheduler, this) } } - suspend fun adjustIndirectUpstream( + fun adjustIndirectUpstream( scheduler: Scheduler, oldDepth: Int, newDepth: Int, removals: Set<MuxDeferredNode<*, *, *>>, additions: Set<MuxDeferredNode<*, *, *>>, ) { - mutex.withLock { - if ( - depthTracker.addIndirectUpstream(oldDepth, newDepth) or - depthTracker.updateIndirectRoots( - additions, - removals, - butNot = this as? MuxDeferredNode<*, *, *>, - ) - ) { - depthTracker.schedule(scheduler, this) - } + if ( + depthTracker.addIndirectUpstream(oldDepth, newDepth) or + depthTracker.updateIndirectRoots( + additions, + removals, + butNot = this as? MuxDeferredNode<*, *, *>, + ) + ) { + depthTracker.schedule(scheduler, this) } } - suspend fun moveDirectUpstreamToIndirect( + fun moveDirectUpstreamToIndirect( scheduler: Scheduler, oldDepth: Int, newDepth: Int, newIndirectSet: Set<MuxDeferredNode<*, *, *>>, ) { - mutex.withLock { - if ( - depthTracker.addIndirectUpstream(oldDepth = null, newDepth) or - depthTracker.removeDirectUpstream(oldDepth) or - depthTracker.updateIndirectRoots( - additions = newIndirectSet, - butNot = this as? MuxDeferredNode<*, *, *>, - ) - ) { - depthTracker.schedule(scheduler, this) - } + if ( + depthTracker.addIndirectUpstream(oldDepth = null, newDepth) or + depthTracker.removeDirectUpstream(oldDepth) or + depthTracker.updateIndirectRoots( + additions = newIndirectSet, + butNot = this as? MuxDeferredNode<*, *, *>, + ) + ) { + depthTracker.schedule(scheduler, this) } } - suspend fun removeDirectUpstream(scheduler: Scheduler, depth: Int, key: K) { - mutex.withLock { - switchedIn.remove(key) - if (depthTracker.removeDirectUpstream(depth)) { - depthTracker.schedule(scheduler, this) - } + fun removeDirectUpstream(scheduler: Scheduler, depth: Int, key: K) { + switchedIn.remove(key) + if (depthTracker.removeDirectUpstream(depth)) { + depthTracker.schedule(scheduler, this) } } - suspend fun removeIndirectUpstream( + fun removeIndirectUpstream( scheduler: Scheduler, oldDepth: Int, indirectSet: Set<MuxDeferredNode<*, *, *>>, key: K, ) { - mutex.withLock { - switchedIn.remove(key) - if ( - depthTracker.removeIndirectUpstream(oldDepth) or - depthTracker.updateIndirectRoots(removals = indirectSet) - ) { - depthTracker.schedule(scheduler, this) - } + switchedIn.remove(key) + if ( + depthTracker.removeIndirectUpstream(oldDepth) or + depthTracker.updateIndirectRoots(removals = indirectSet) + ) { + depthTracker.schedule(scheduler, this) } } - suspend fun visitCompact(scheduler: Scheduler) = coroutineScope { + fun visitCompact(scheduler: Scheduler) { if (depthTracker.isDirty()) { - depthTracker.applyChanges(coroutineScope = this, scheduler, downstreamSet, this@MuxNode) + depthTracker.applyChanges(scheduler, downstreamSet, this@MuxNode) } } @@ -217,22 +198,23 @@ internal sealed class MuxNode<W, K, V, Output>( val schedulable = Schedulable.N(this) - @Volatile lateinit var upstream: NodeConnection<V> + lateinit var upstream: NodeConnection<V> - override suspend fun schedule(evalScope: EvalScope) { - upstreamData[key] = upstream.directUpstream - this@MuxNode.schedule(evalScope) + override fun schedule(logIndent: Int, evalScope: EvalScope) { + logDuration(logIndent, "MuxBranchNode.schedule") { + if (this@MuxNode is MuxPromptNode && this@MuxNode.name != null) { + logLn("[${this@MuxNode}] scheduling $key") + } + upstreamData[key] = upstream.directUpstream + this@MuxNode.schedule(evalScope) + } } - override suspend fun adjustDirectUpstream( - scheduler: Scheduler, - oldDepth: Int, - newDepth: Int, - ) { + override fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) { this@MuxNode.adjustDirectUpstream(scheduler, oldDepth, newDepth) } - override suspend fun moveIndirectUpstreamToDirect( + override fun moveIndirectUpstreamToDirect( scheduler: Scheduler, oldIndirectDepth: Int, oldIndirectSet: Set<MuxDeferredNode<*, *, *>>, @@ -246,7 +228,7 @@ internal sealed class MuxNode<W, K, V, Output>( ) } - override suspend fun adjustIndirectUpstream( + override fun adjustIndirectUpstream( scheduler: Scheduler, oldDepth: Int, newDepth: Int, @@ -256,7 +238,7 @@ internal sealed class MuxNode<W, K, V, Output>( this@MuxNode.adjustIndirectUpstream(scheduler, oldDepth, newDepth, removals, additions) } - override suspend fun moveDirectUpstreamToIndirect( + override fun moveDirectUpstreamToIndirect( scheduler: Scheduler, oldDirectDepth: Int, newIndirectDepth: Int, @@ -270,11 +252,11 @@ internal sealed class MuxNode<W, K, V, Output>( ) } - override suspend fun removeDirectUpstream(scheduler: Scheduler, depth: Int) { + override fun removeDirectUpstream(scheduler: Scheduler, depth: Int) { removeDirectUpstream(scheduler, depth, key) } - override suspend fun removeIndirectUpstream( + override fun removeIndirectUpstream( scheduler: Scheduler, depth: Int, indirectSet: Set<MuxDeferredNode<*, *, *>>, @@ -286,113 +268,110 @@ internal sealed class MuxNode<W, K, V, Output>( } } -internal typealias BranchNode<W, K, V> = MuxNode<W, K, V, *>.BranchNode +internal typealias BranchNode<W, K, V> = MuxNode<W, K, V>.BranchNode /** Tracks lifecycle of MuxNode in the network. Essentially a mutable ref for MuxLifecycleState. */ -internal class MuxLifecycle<A>(@Volatile var lifecycleState: MuxLifecycleState<A>) : TFlowImpl<A> { - val mutex = Mutex() +internal class MuxLifecycle<W, K, V>(var lifecycleState: MuxLifecycleState<W, K, V>) : + TFlowImpl<MuxResult<W, K, V>> { - override fun toString(): String = "TFlowLifecycle[$hashString][$lifecycleState][$mutex]" + override fun toString(): String = "TFlowMuxLifecycle[$hashString][$lifecycleState]" - override suspend fun activate( + override fun activate( evalScope: EvalScope, downstream: Schedulable, - ): ActivationResult<A>? = - mutex.withLock { - when (val state = lifecycleState) { - is MuxLifecycleState.Dead -> null - is MuxLifecycleState.Active -> { - state.node.addDownstreamLocked(downstream) - ActivationResult( - connection = NodeConnection(state.node, state.node), - needsEval = state.node.hasCurrentValueLocked(evalScope), - ) - } - is MuxLifecycleState.Inactive -> { - state.spec - .activate(evalScope, this@MuxLifecycle) - .also { node -> - lifecycleState = - if (node == null) { - MuxLifecycleState.Dead - } else { - MuxLifecycleState.Active(node) - } - } - ?.let { node -> - node.addDownstreamLocked(downstream) - ActivationResult( - connection = NodeConnection(node, node), - needsEval = false, - ) - } - } + ): ActivationResult<MuxResult<W, K, V>>? = + when (val state = lifecycleState) { + is MuxLifecycleState.Dead -> { + null + } + is MuxLifecycleState.Active -> { + state.node.addDownstreamLocked(downstream) + ActivationResult( + connection = NodeConnection(state.node, state.node), + needsEval = state.node.hasCurrentValueLocked(evalScope), + ) + } + is MuxLifecycleState.Inactive -> { + state.spec + .activate(evalScope, this@MuxLifecycle) + .also { node -> + lifecycleState = + if (node == null) { + MuxLifecycleState.Dead + } else { + MuxLifecycleState.Active(node.first) + } + } + ?.let { (node, postActivate) -> + postActivate?.invoke() + node.addDownstreamLocked(downstream) + ActivationResult(connection = NodeConnection(node, node), needsEval = false) + } } } } -internal sealed interface MuxLifecycleState<out A> { - class Inactive<A>(val spec: MuxActivator<A>) : MuxLifecycleState<A> { +internal sealed interface MuxLifecycleState<out W, out K, out V> { + class Inactive<W, K, V>(val spec: MuxActivator<W, K, V>) : MuxLifecycleState<W, K, V> { override fun toString(): String = "Inactive" } - class Active<A>(val node: MuxNode<*, *, *, A>) : MuxLifecycleState<A> { + class Active<W, K, V>(val node: MuxNode<W, K, V>) : MuxLifecycleState<W, K, V> { override fun toString(): String = "Active(node=$node)" } - data object Dead : MuxLifecycleState<Nothing> + data object Dead : MuxLifecycleState<Nothing, Nothing, Nothing> } -internal interface MuxActivator<A> { - suspend fun activate(evalScope: EvalScope, lifecycle: MuxLifecycle<A>): MuxNode<*, *, *, A>? +internal interface MuxActivator<W, K, V> { + fun activate( + evalScope: EvalScope, + lifecycle: MuxLifecycle<W, K, V>, + ): Pair<MuxNode<W, K, V>, (() -> Unit)?>? } -internal inline fun <A> MuxLifecycle(onSubscribe: MuxActivator<A>): TFlowImpl<A> = - MuxLifecycle(MuxLifecycleState.Inactive(onSubscribe)) +internal inline fun <W, K, V> MuxLifecycle( + onSubscribe: MuxActivator<W, K, V> +): TFlowImpl<MuxResult<W, K, V>> = MuxLifecycle(MuxLifecycleState.Inactive(onSubscribe)) internal fun <K, V> TFlowImpl<MuxResult<MapHolder.W, K, V>>.awaitValues(): TFlowImpl<Map<K, V>> = - mapImpl({ this@awaitValues }) { results -> - results.asMapHolder().unwrapped.mapValues { it.value.getPushEvent(this) } + mapImpl({ this@awaitValues }) { results, logIndent -> + results.asMapHolder().unwrapped.mapValues { it.value.getPushEvent(logIndent, this) } } // activation logic -internal suspend fun <W, K, V, O> MuxNode<W, K, V, O>.initializeUpstream( +internal fun <W, K, V> MuxNode<W, K, V>.initializeUpstream( evalScope: EvalScope, - getStorage: suspend EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>, + getStorage: EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>, storeFactory: MutableMapK.Factory<W, K>, ) { val storage = getStorage(evalScope) - coroutineScope { - val initUpstream = buildList { - storage.forEach { (key, flow) -> - val branchNode = BranchNode(key) - add( - asyncImmediate(start = CoroutineStart.LAZY) { - flow.activate(evalScope, branchNode.schedulable)?.let { (conn, needsEval) -> - Triple( - key, - branchNode.apply { upstream = conn }, - if (needsEval) conn.directUpstream else null, - ) - } - } - ) - } + val initUpstream = buildList { + storage.forEach { (key, flow) -> + val branchNode = BranchNode(key) + add( + flow.activate(evalScope, branchNode.schedulable)?.let { (conn, needsEval) -> + Triple( + key, + branchNode.apply { upstream = conn }, + if (needsEval) conn.directUpstream else null, + ) + } + ) } - val results = initUpstream.awaitAll() - switchedIn = storeFactory.create(initUpstream.size) - upstreamData = storeFactory.create(initUpstream.size) - for (triple in results) { - triple?.let { (key, branch, upstream) -> - switchedIn[key] = branch - upstream?.let { upstreamData[key] = upstream } - } + } + switchedIn = storeFactory.create(initUpstream.size) + upstreamData = storeFactory.create(initUpstream.size) + for (triple in initUpstream) { + triple?.let { (key, branch, upstream) -> + switchedIn[key] = branch + upstream?.let { upstreamData[key] = upstream } } } } -internal fun <W, K, V, O> MuxNode<W, K, V, O>.initializeDepth() { +internal fun <W, K, V> MuxNode<W, K, V>.initializeDepth() { switchedIn.forEach { (_, branch) -> val conn = branch.upstream if (conn.depthTracker.snapshotIsDirect) { diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt index 7f40df508fb1..5ce0248d0655 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt @@ -16,15 +16,17 @@ package com.android.systemui.kairos.internal +import com.android.systemui.kairos.internal.store.MapK import com.android.systemui.kairos.internal.store.MutableArrayMapK import com.android.systemui.kairos.internal.store.MutableMapK import com.android.systemui.kairos.internal.store.SingletonMapK import com.android.systemui.kairos.internal.store.StoreEntry import com.android.systemui.kairos.internal.store.asArrayHolder +import com.android.systemui.kairos.internal.store.asSingle import com.android.systemui.kairos.internal.store.singleOf -import com.android.systemui.kairos.internal.util.Key import com.android.systemui.kairos.internal.util.hashString -import com.android.systemui.kairos.internal.util.mapParallel +import com.android.systemui.kairos.internal.util.logDuration +import com.android.systemui.kairos.internal.util.logLn import com.android.systemui.kairos.util.Just import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.None @@ -37,41 +39,49 @@ import com.android.systemui.kairos.util.maybeThis import com.android.systemui.kairos.util.merge import com.android.systemui.kairos.util.orError import com.android.systemui.kairos.util.these -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.withLock internal class MuxDeferredNode<W, K, V>( - lifecycle: MuxLifecycle<MuxResult<W, K, V>>, - val spec: MuxActivator<MuxResult<W, K, V>>, + val name: String?, + lifecycle: MuxLifecycle<W, K, V>, + val spec: MuxActivator<W, K, V>, factory: MutableMapK.Factory<W, K>, -) : MuxNode<W, K, V, MuxResult<W, K, V>>(lifecycle, factory), Key<MuxResult<W, K, V>> { +) : MuxNode<W, K, V>(lifecycle, factory) { val schedulable = Schedulable.M(this) - - @Volatile var patches: NodeConnection<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>? = null - @Volatile var patchData: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = null - - override suspend fun visit(evalScope: EvalScope) { - val scheduleDownstream = upstreamData.isNotEmpty() - val result = upstreamData.readOnlyCopy() - upstreamData.clear() - val compactDownstream = depthTracker.isDirty() - if (scheduleDownstream || compactDownstream) { - coroutineScope { - mutex.withLock { - if (compactDownstream) { + var patches: NodeConnection<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>? = null + var patchData: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = null + + override fun visit(logIndent: Int, evalScope: EvalScope) { + check(epoch < evalScope.epoch) { "node unexpectedly visited multiple times in transaction" } + logDuration(logIndent, "MuxDeferred[$name].visit") { + val scheduleDownstream: Boolean + val result: MapK<W, K, PullNode<V>> + logDuration("copying upstream data", false) { + scheduleDownstream = upstreamData.isNotEmpty() + result = upstreamData.readOnlyCopy() + upstreamData.clear() + } + if (name != null) { + logLn("[${this@MuxDeferredNode}] result = $result") + } + val compactDownstream = depthTracker.isDirty() + if (scheduleDownstream || compactDownstream) { + if (compactDownstream) { + logDuration("compactDownstream", false) { depthTracker.applyChanges( - coroutineScope = this, evalScope.scheduler, downstreamSet, muxNode = this@MuxDeferredNode, ) } - if (scheduleDownstream) { - epoch = evalScope.epoch - evalScope.setResult(this@MuxDeferredNode, result) - if (!scheduleAll(downstreamSet, evalScope)) { + } + if (scheduleDownstream) { + logDuration("scheduleDownstream") { + if (name != null) { + logLn("[${this@MuxDeferredNode}] scheduling") + } + transactionCache.put(evalScope, result) + if (!scheduleAll(currentLogIndent, downstreamSet, evalScope)) { evalScope.scheduleDeactivation(this@MuxDeferredNode) } } @@ -80,26 +90,26 @@ internal class MuxDeferredNode<W, K, V>( } } - override suspend fun getPushEvent(evalScope: EvalScope): MuxResult<W, K, V> = - evalScope.getCurrentValue(key = this) + override fun getPushEvent(logIndent: Int, evalScope: EvalScope): MuxResult<W, K, V> = + logDuration(logIndent, "MuxDeferred.getPushEvent") { + transactionCache.getCurrentValue(evalScope).also { + if (name != null) { + logLn("[${this@MuxDeferredNode}] getPushEvent = $it") + } + } + } - private suspend fun compactIfNeeded(evalScope: EvalScope) { + private fun compactIfNeeded(evalScope: EvalScope) { depthTracker.propagateChanges(evalScope.compactor, this) } - override suspend fun doDeactivate() { + override fun doDeactivate() { // Update lifecycle - lifecycle.mutex.withLock { - if (lifecycle.lifecycleState !is MuxLifecycleState.Active) return@doDeactivate - lifecycle.lifecycleState = MuxLifecycleState.Inactive(spec) - } + if (lifecycle.lifecycleState !is MuxLifecycleState.Active) return@doDeactivate + lifecycle.lifecycleState = MuxLifecycleState.Inactive(spec) // Process branch nodes - coroutineScope { - switchedIn.forEach { (_, branchNode) -> - branchNode.upstream.let { - launch { it.removeDownstreamAndDeactivateIfNeeded(branchNode.schedulable) } - } - } + switchedIn.forEach { (_, branchNode) -> + branchNode.upstream.removeDownstreamAndDeactivateIfNeeded(branchNode.schedulable) } // Process patch node patches?.removeDownstreamAndDeactivateIfNeeded(schedulable) @@ -108,12 +118,15 @@ internal class MuxDeferredNode<W, K, V>( // MOVE phase // - concurrent moves may be occurring, but no more evals. all depth recalculations are // deferred to the end of this phase. - suspend fun performMove(evalScope: EvalScope) { + fun performMove(logIndent: Int, evalScope: EvalScope) { + if (name != null) { + logLn(logIndent, "[${this@MuxDeferredNode}] performMove (patchData = $patchData)") + } + val patch = patchData ?: return patchData = null - // TODO: this logic is very similar to what's in MuxPromptMoving, maybe turn into an inline - // fun? + // TODO: this logic is very similar to what's in MuxPrompt, maybe turn into an inline fun? // We have a patch, process additions/updates and removals val adds = mutableListOf<Pair<K, TFlowImpl<V>>>() @@ -127,131 +140,112 @@ internal class MuxDeferredNode<W, K, V>( val severed = mutableListOf<NodeConnection<*>>() - coroutineScope { - // remove and sever - removes.forEach { k -> - switchedIn.remove(k)?.let { branchNode: BranchNode -> - val conn = branchNode.upstream - severed.add(conn) - launch { conn.removeDownstream(downstream = branchNode.schedulable) } - depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth) - } + // remove and sever + removes.forEach { k -> + switchedIn.remove(k)?.let { branchNode: BranchNode -> + val conn = branchNode.upstream + severed.add(conn) + conn.removeDownstream(downstream = branchNode.schedulable) + depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth) } + } - // add or replace - adds - .mapParallel { (k, newUpstream: TFlowImpl<V>) -> - val branchNode = BranchNode(k) - k to - newUpstream.activate(evalScope, branchNode.schedulable)?.let { (conn, _) -> - branchNode.apply { upstream = conn } - } - } - .forEach { (k, newBranch: BranchNode?) -> - // remove old and sever, if present - switchedIn.remove(k)?.let { branchNode -> - val conn = branchNode.upstream - severed.add(conn) - launch { conn.removeDownstream(downstream = branchNode.schedulable) } - depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth) - } + // add or replace + adds.forEach { (k, newUpstream: TFlowImpl<V>) -> + // remove old and sever, if present + switchedIn.remove(k)?.let { branchNode -> + val conn = branchNode.upstream + severed.add(conn) + conn.removeDownstream(downstream = branchNode.schedulable) + depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth) + } - // add new - newBranch?.let { - switchedIn[k] = newBranch - val branchDepthTracker = newBranch.upstream.depthTracker - if (branchDepthTracker.snapshotIsDirect) { - depthTracker.addDirectUpstream( - oldDepth = null, - newDepth = branchDepthTracker.snapshotDirectDepth, - ) - } else { - depthTracker.addIndirectUpstream( - oldDepth = null, - newDepth = branchDepthTracker.snapshotIndirectDepth, - ) - depthTracker.updateIndirectRoots( - additions = branchDepthTracker.snapshotIndirectRoots, - butNot = this@MuxDeferredNode, - ) - } - } + // add new + val newBranch = BranchNode(k) + newUpstream.activate(evalScope, newBranch.schedulable)?.let { (conn, _) -> + newBranch.upstream = conn + switchedIn[k] = newBranch + val branchDepthTracker = newBranch.upstream.depthTracker + if (branchDepthTracker.snapshotIsDirect) { + depthTracker.addDirectUpstream( + oldDepth = null, + newDepth = branchDepthTracker.snapshotDirectDepth, + ) + } else { + depthTracker.addIndirectUpstream( + oldDepth = null, + newDepth = branchDepthTracker.snapshotIndirectDepth, + ) + depthTracker.updateIndirectRoots( + additions = branchDepthTracker.snapshotIndirectRoots, + butNot = this@MuxDeferredNode, + ) } + } } - coroutineScope { - for (severedNode in severed) { - launch { severedNode.scheduleDeactivationIfNeeded(evalScope) } - } + for (severedNode in severed) { + severedNode.scheduleDeactivationIfNeeded(evalScope) } compactIfNeeded(evalScope) } - suspend fun removeDirectPatchNode(scheduler: Scheduler) { - mutex.withLock { - if ( - depthTracker.removeIndirectUpstream(depth = 0) or - depthTracker.setIsIndirectRoot(false) - ) { - depthTracker.schedule(scheduler, this) - } - patches = null + fun removeDirectPatchNode(scheduler: Scheduler) { + if ( + depthTracker.removeIndirectUpstream(depth = 0) or depthTracker.setIsIndirectRoot(false) + ) { + depthTracker.schedule(scheduler, this) } + patches = null } - suspend fun removeIndirectPatchNode( + fun removeIndirectPatchNode( scheduler: Scheduler, depth: Int, indirectSet: Set<MuxDeferredNode<*, *, *>>, ) { // indirectly connected patches forward the indirectSet - mutex.withLock { - if ( - depthTracker.updateIndirectRoots(removals = indirectSet) or - depthTracker.removeIndirectUpstream(depth) - ) { - depthTracker.schedule(scheduler, this) - } - patches = null + if ( + depthTracker.updateIndirectRoots(removals = indirectSet) or + depthTracker.removeIndirectUpstream(depth) + ) { + depthTracker.schedule(scheduler, this) } + patches = null } - suspend fun moveIndirectPatchNodeToDirect( + fun moveIndirectPatchNodeToDirect( scheduler: Scheduler, oldIndirectDepth: Int, oldIndirectSet: Set<MuxDeferredNode<*, *, *>>, ) { // directly connected patches are stored as an indirect singleton set of the patchNode - mutex.withLock { - if ( - depthTracker.updateIndirectRoots(removals = oldIndirectSet) or - depthTracker.removeIndirectUpstream(oldIndirectDepth) or - depthTracker.setIsIndirectRoot(true) - ) { - depthTracker.schedule(scheduler, this) - } + if ( + depthTracker.updateIndirectRoots(removals = oldIndirectSet) or + depthTracker.removeIndirectUpstream(oldIndirectDepth) or + depthTracker.setIsIndirectRoot(true) + ) { + depthTracker.schedule(scheduler, this) } } - suspend fun moveDirectPatchNodeToIndirect( + fun moveDirectPatchNodeToIndirect( scheduler: Scheduler, newIndirectDepth: Int, newIndirectSet: Set<MuxDeferredNode<*, *, *>>, ) { // indirectly connected patches forward the indirectSet - mutex.withLock { - if ( - depthTracker.setIsIndirectRoot(false) or - depthTracker.updateIndirectRoots(additions = newIndirectSet, butNot = this) or - depthTracker.addIndirectUpstream(oldDepth = null, newDepth = newIndirectDepth) - ) { - depthTracker.schedule(scheduler, this) - } + if ( + depthTracker.setIsIndirectRoot(false) or + depthTracker.updateIndirectRoots(additions = newIndirectSet, butNot = this) or + depthTracker.addIndirectUpstream(oldDepth = null, newDepth = newIndirectDepth) + ) { + depthTracker.schedule(scheduler, this) } } - suspend fun adjustIndirectPatchNode( + fun adjustIndirectPatchNode( scheduler: Scheduler, oldDepth: Int, newDepth: Int, @@ -259,65 +253,73 @@ internal class MuxDeferredNode<W, K, V>( additions: Set<MuxDeferredNode<*, *, *>>, ) { // indirectly connected patches forward the indirectSet - mutex.withLock { - if ( - depthTracker.updateIndirectRoots( - additions = additions, - removals = removals, - butNot = this, - ) or depthTracker.addIndirectUpstream(oldDepth = oldDepth, newDepth = newDepth) - ) { - depthTracker.schedule(scheduler, this) - } + if ( + depthTracker.updateIndirectRoots( + additions = additions, + removals = removals, + butNot = this, + ) or depthTracker.addIndirectUpstream(oldDepth = oldDepth, newDepth = newDepth) + ) { + depthTracker.schedule(scheduler, this) } } - suspend fun scheduleMover(evalScope: EvalScope) { - patchData = - checkNotNull(patches) { "mux mover scheduled with unset patches upstream node" } - .getPushEvent(evalScope) - evalScope.scheduleMuxMover(this) + fun scheduleMover(logIndent: Int, evalScope: EvalScope) { + logDuration(logIndent, "MuxDeferred.scheduleMover") { + patchData = + checkNotNull(patches) { "mux mover scheduled with unset patches upstream node" } + .getPushEvent(currentLogIndent, evalScope) + evalScope.scheduleMuxMover(this@MuxDeferredNode) + } } - override fun toString(): String = "${this::class.simpleName}@$hashString" + override fun toString(): String = + "${this::class.simpleName}@$hashString${name?.let { "[$it]" }.orEmpty()}" } internal inline fun <A> switchDeferredImplSingle( - crossinline getStorage: suspend EvalScope.() -> TFlowImpl<A>, - crossinline getPatches: suspend EvalScope.() -> TFlowImpl<TFlowImpl<A>>, -): TFlowImpl<A> = - mapImpl({ + name: String? = null, + crossinline getStorage: EvalScope.() -> TFlowImpl<A>, + crossinline getPatches: EvalScope.() -> TFlowImpl<TFlowImpl<A>>, +): TFlowImpl<A> { + val patches = mapImpl(getPatches) { newFlow, _ -> singleOf(just(newFlow)).asIterable() } + val switchDeferredImpl = switchDeferredImpl( + name = name, getStorage = { singleOf(getStorage()).asIterable() }, - getPatches = { - mapImpl(getPatches) { newFlow -> singleOf(just(newFlow)).asIterable() } - }, + getPatches = { patches }, storeFactory = SingletonMapK.Factory(), ) - }) { map -> - map.getValue(Unit).getPushEvent(this) + return mapImpl({ switchDeferredImpl }) { map, logIndent -> + map.asSingle().getValue(Unit).getPushEvent(logIndent, this).also { + if (name != null) { + logLn(logIndent, "[$name] extracting single mux: $it") + } + } } +} internal fun <W, K, V> switchDeferredImpl( - getStorage: suspend EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>, - getPatches: suspend EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>, + name: String? = null, + getStorage: EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>, + getPatches: EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>, storeFactory: MutableMapK.Factory<W, K>, ): TFlowImpl<MuxResult<W, K, V>> = - MuxLifecycle(MuxDeferredActivator(getStorage, storeFactory, getPatches)) + MuxLifecycle(MuxDeferredActivator(name, getStorage, storeFactory, getPatches)) private class MuxDeferredActivator<W, K, V>( - private val getStorage: suspend EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>, + private val name: String?, + private val getStorage: EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>, private val storeFactory: MutableMapK.Factory<W, K>, - private val getPatches: - suspend EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>, -) : MuxActivator<MuxResult<W, K, V>> { - override suspend fun activate( + private val getPatches: EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>, +) : MuxActivator<W, K, V> { + override fun activate( evalScope: EvalScope, - lifecycle: MuxLifecycle<MuxResult<W, K, V>>, - ): MuxNode<W, *, *, MuxResult<W, K, V>>? { + lifecycle: MuxLifecycle<W, K, V>, + ): Pair<MuxNode<W, K, V>, (() -> Unit)?>? { // Initialize mux node and switched-in connections. val muxNode = - MuxDeferredNode(lifecycle, this, storeFactory).apply { + MuxDeferredNode(name, lifecycle, this, storeFactory).apply { initializeUpstream(evalScope, getStorage, storeFactory) // Update depth based on all initial switched-in nodes. initializeDepth() @@ -327,29 +329,34 @@ private class MuxDeferredActivator<W, K, V>( depthTracker.setIsIndirectRoot(true) depthTracker.reset() } - // Setup patches connection; deferring allows for a recursive connection, where - // muxNode is downstream of itself via patches. - var isIndirect = true - evalScope.deferAction { - val (patchesConn, needsEval) = - getPatches(evalScope).activate(evalScope, downstream = muxNode.schedulable) - ?: run { - isIndirect = false - // Turns out we can't connect to patches, so update our depth and - // propagate - muxNode.mutex.withLock { + + // Schedule for evaluation if any switched-in nodes have already emitted within + // this transaction. + if (muxNode.upstreamData.isNotEmpty()) { + muxNode.schedule(evalScope) + } + + return muxNode to + fun() { + // Setup patches connection; deferring allows for a recursive connection, where + // muxNode is downstream of itself via patches. + val (patchesConn, needsEval) = + getPatches(evalScope).activate(evalScope, downstream = muxNode.schedulable) + ?: run { + // Turns out we can't connect to patches, so update our depth and + // propagate if (muxNode.depthTracker.setIsIndirectRoot(false)) { + // TODO: schedules might not be necessary now that we're not + // parallel? muxNode.depthTracker.schedule(evalScope.scheduler, muxNode) } + return } - return@deferAction - } - muxNode.patches = patchesConn + muxNode.patches = patchesConn - if (!patchesConn.schedulerUpstream.depthTracker.snapshotIsDirect) { - // Turns out patches is indirect, so we are not a root. Update depth and - // propagate. - muxNode.mutex.withLock { + if (!patchesConn.schedulerUpstream.depthTracker.snapshotIsDirect) { + // Turns out patches is indirect, so we are not a root. Update depth and + // propagate. if ( muxNode.depthTracker.setIsIndirectRoot(false) or muxNode.depthTracker.addIndirectUpstream( @@ -363,63 +370,63 @@ private class MuxDeferredActivator<W, K, V>( muxNode.depthTracker.schedule(evalScope.scheduler, muxNode) } } + // Schedule mover to process patch emission at the end of this transaction, if + // needed. + if (needsEval) { + muxNode.patchData = patchesConn.getPushEvent(0, evalScope) + evalScope.scheduleMuxMover(muxNode) + } } - // Schedule mover to process patch emission at the end of this transaction, if - // needed. - if (needsEval) { - muxNode.patchData = patchesConn.getPushEvent(evalScope) - evalScope.scheduleMuxMover(muxNode) - } - } - - // Schedule for evaluation if any switched-in nodes have already emitted within - // this transaction. - if (muxNode.upstreamData.isNotEmpty()) { - muxNode.schedule(evalScope) - } - return muxNode.takeUnless { muxNode.switchedIn.isEmpty() && !isIndirect } } } internal inline fun <A> mergeNodes( - crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>, - crossinline getOther: suspend EvalScope.() -> TFlowImpl<A>, - crossinline f: suspend EvalScope.(A, A) -> A, + crossinline getPulse: EvalScope.() -> TFlowImpl<A>, + crossinline getOther: EvalScope.() -> TFlowImpl<A>, + name: String? = null, + crossinline f: EvalScope.(A, A) -> A, ): TFlowImpl<A> { + val mergedThese = mergeNodes(name, getPulse, getOther) val merged = - mapImpl({ mergeNodes(getPulse, getOther) }) { these -> - these.merge { thiz, that -> f(thiz, that) } - } + mapImpl({ mergedThese }) { these, _ -> these.merge { thiz, that -> f(thiz, that) } } return merged.cached() } -internal fun <T> Iterable<T>.asIterableWithIndex(): Iterable<StoreEntry<Int, T>> = +internal fun <T> Iterable<T>.asIterableWithIndex(): Iterable<Map.Entry<Int, T>> = asSequence().mapIndexed { i, t -> StoreEntry(i, t) }.asIterable() internal inline fun <A, B> mergeNodes( - crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>, - crossinline getOther: suspend EvalScope.() -> TFlowImpl<B>, + name: String? = null, + crossinline getPulse: EvalScope.() -> TFlowImpl<A>, + crossinline getOther: EvalScope.() -> TFlowImpl<B>, ): TFlowImpl<These<A, B>> { val storage = - listOf(mapImpl(getPulse) { These.thiz<A, B>(it) }, mapImpl(getOther) { These.that(it) }) + listOf( + mapImpl(getPulse) { it, _ -> These.thiz<A, B>(it) }, + mapImpl(getOther) { it, _ -> These.that(it) }, + ) .asIterableWithIndex() val switchNode = switchDeferredImpl( + name = name, getStorage = { storage }, getPatches = { neverImpl }, storeFactory = MutableArrayMapK.Factory(), ) val merged = - mapImpl({ switchNode }) { mergeResults -> - val first = mergeResults.getMaybe(0).flatMap { it.getPushEvent(this).maybeThis() } - val second = mergeResults.getMaybe(1).flatMap { it.getPushEvent(this).maybeThat() } + mapImpl({ switchNode }) { it, logIndent -> + val mergeResults = it.asArrayHolder() + val first = + mergeResults.getMaybe(0).flatMap { it.getPushEvent(logIndent, this).maybeThis() } + val second = + mergeResults.getMaybe(1).flatMap { it.getPushEvent(logIndent, this).maybeThat() } these(first, second).orError { "unexpected missing merge result" } } return merged.cached() } internal inline fun <A> mergeNodes( - crossinline getPulses: suspend EvalScope.() -> Iterable<TFlowImpl<A>> + crossinline getPulses: EvalScope.() -> Iterable<TFlowImpl<A>> ): TFlowImpl<List<A>> { val switchNode = switchDeferredImpl( @@ -428,15 +435,15 @@ internal inline fun <A> mergeNodes( storeFactory = MutableArrayMapK.Factory(), ) val merged = - mapImpl({ switchNode }) { + mapImpl({ switchNode }) { it, logIndent -> val mergeResults = it.asArrayHolder() - mergeResults.map { (_, node) -> node.getPushEvent(this) } + mergeResults.map { (_, node) -> node.getPushEvent(logIndent, this) } } return merged.cached() } internal inline fun <A> mergeNodesLeft( - crossinline getPulses: suspend EvalScope.() -> Iterable<TFlowImpl<A>> + crossinline getPulses: EvalScope.() -> Iterable<TFlowImpl<A>> ): TFlowImpl<A> { val switchNode = switchDeferredImpl( @@ -445,6 +452,9 @@ internal inline fun <A> mergeNodesLeft( storeFactory = MutableArrayMapK.Factory(), ) val merged = - mapImpl({ switchNode }) { mergeResults -> mergeResults.values.first().getPushEvent(this) } + mapImpl({ switchNode }) { it, logIndent -> + val mergeResults = it.asArrayHolder() + mergeResults.values.first().getPushEvent(logIndent, this) + } return merged.cached() } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt index 839c5a64272a..1c9af24a392f 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt @@ -18,207 +18,180 @@ package com.android.systemui.kairos.internal import com.android.systemui.kairos.internal.store.MutableMapK import com.android.systemui.kairos.internal.store.SingletonMapK +import com.android.systemui.kairos.internal.store.asSingle import com.android.systemui.kairos.internal.store.singleOf -import com.android.systemui.kairos.internal.util.Key -import com.android.systemui.kairos.internal.util.launchImmediate -import com.android.systemui.kairos.internal.util.mapParallel +import com.android.systemui.kairos.internal.util.LogIndent +import com.android.systemui.kairos.internal.util.hashString +import com.android.systemui.kairos.internal.util.logDuration import com.android.systemui.kairos.util.Just import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.None import com.android.systemui.kairos.util.just -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch -import kotlinx.coroutines.sync.withLock -private typealias MuxPromptMovingResult<W, K, V> = Pair<MuxResult<W, K, V>, MuxResult<W, K, V>?> - -internal class MuxPromptMovingNode<W, K, V>( - lifecycle: MuxLifecycle<MuxPromptMovingResult<W, K, V>>, - private val spec: MuxActivator<MuxPromptMovingResult<W, K, V>>, +internal class MuxPromptNode<W, K, V>( + val name: String?, + lifecycle: MuxLifecycle<W, K, V>, + private val spec: MuxActivator<W, K, V>, factory: MutableMapK.Factory<W, K>, -) : - MuxNode<W, K, V, MuxPromptMovingResult<W, K, V>>(lifecycle, factory), - Key<MuxPromptMovingResult<W, K, V>> { - - @Volatile var patchData: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = null - @Volatile var patches: PatchNode? = null +) : MuxNode<W, K, V>(lifecycle, factory) { - @Volatile private var reEval: MuxPromptMovingResult<W, K, V>? = null + var patchData: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = null + var patches: PatchNode? = null - override suspend fun visit(evalScope: EvalScope) { - val preSwitchNotEmpty = upstreamData.isNotEmpty() - val preSwitchResults: MuxResult<W, K, V> = upstreamData.readOnlyCopy() - upstreamData.clear() + override fun visit(logIndent: Int, evalScope: EvalScope) { + check(epoch < evalScope.epoch) { "node unexpectedly visited multiple times in transaction" } + logDuration(logIndent, "MuxPrompt.visit") { + val patch: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = patchData + patchData = null - val patch: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = patchData - patchData = null - - val (reschedule, evalResult) = - reEval?.let { false to it } - ?: if (preSwitchNotEmpty || patch != null) { - doEval(preSwitchNotEmpty, preSwitchResults, patch, evalScope) - } else { - false to null + // If there's a patch, process it. + patch?.let { + val needsReschedule = processPatch(patch, evalScope) + // We may need to reschedule if newly-switched-in nodes have not yet been + // visited within this transaction. + val depthIncreased = depthTracker.dirty_depthIncreased() + if (needsReschedule || depthIncreased) { + if (depthIncreased) { + depthTracker.schedule(evalScope.compactor, this@MuxPromptNode) + } + if (name != null) { + logLn( + "[${this@MuxPromptNode}] rescheduling (reschedule=$needsReschedule, depthIncrease=$depthIncreased)" + ) + } + schedule(evalScope) + return } - reEval = null - - if (reschedule || depthTracker.dirty_depthIncreased()) { - reEval = evalResult - // Can't schedule downstream yet, need to compact first - if (depthTracker.dirty_depthIncreased()) { - depthTracker.schedule(evalScope.compactor, node = this) } - schedule(evalScope) - } else { + val results = upstreamData.readOnlyCopy().also { upstreamData.clear() } + + // If we don't need to reschedule, or there wasn't a patch at all, then we proceed + // with merging pre-switch and post-switch results + val hasResult = results.isNotEmpty() val compactDownstream = depthTracker.isDirty() - if (evalResult != null || compactDownstream) { - coroutineScope { - mutex.withLock { - if (compactDownstream) { - adjustDownstreamDepths(evalScope, coroutineScope = this) - } - if (evalResult != null) { - epoch = evalScope.epoch - evalScope.setResult(this@MuxPromptMovingNode, evalResult) - if (!scheduleAll(downstreamSet, evalScope)) { - evalScope.scheduleDeactivation(this@MuxPromptMovingNode) - } - } + if (hasResult || compactDownstream) { + if (compactDownstream) { + adjustDownstreamDepths(evalScope) + } + if (hasResult) { + transactionCache.put(evalScope, results) + if (!scheduleAll(currentLogIndent, downstreamSet, evalScope)) { + evalScope.scheduleDeactivation(this@MuxPromptNode) } } } } } - private suspend fun doEval( - preSwitchNotEmpty: Boolean, - preSwitchResults: MuxResult<W, K, V>, - patch: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>?, + // side-effect: this will populate `upstreamData` with any immediately available results + private fun LogIndent.processPatch( + patch: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>, evalScope: EvalScope, - ): Pair<Boolean, MuxPromptMovingResult<W, K, V>?> { - val newlySwitchedIn: MuxResult<W, K, V>? = - patch?.let { - // We have a patch, process additions/updates and removals - val adds = mutableListOf<Pair<K, TFlowImpl<V>>>() - val removes = mutableListOf<K>() - patch.forEach { (k, newUpstream) -> - when (newUpstream) { - is Just -> adds.add(k to newUpstream.value) - None -> removes.add(k) - } - } + ): Boolean { + var needsReschedule = false + // We have a patch, process additions/updates and removals + val adds = mutableListOf<Pair<K, TFlowImpl<V>>>() + val removes = mutableListOf<K>() + patch.forEach { (k, newUpstream) -> + when (newUpstream) { + is Just -> adds.add(k to newUpstream.value) + None -> removes.add(k) + } + } - val additionsAndUpdates = mutableListOf<Pair<K, PullNode<V>>>() - val severed = mutableListOf<NodeConnection<*>>() - - coroutineScope { - // remove and sever - removes.forEach { k -> - switchedIn.remove(k)?.let { branchNode: BranchNode -> - val conn: NodeConnection<V> = branchNode.upstream - severed.add(conn) - launchImmediate { - conn.removeDownstream(downstream = branchNode.schedulable) - } - depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth) - } - } + val severed = mutableListOf<NodeConnection<*>>() - // add or replace - adds - .mapParallel { (k, newUpstream: TFlowImpl<V>) -> - val branchNode = BranchNode(k) - k to - newUpstream.activate(evalScope, branchNode.schedulable)?.let { - (conn, _) -> - branchNode.apply { upstream = conn } - } - } - .forEach { (k, newBranch: BranchNode?) -> - // remove old and sever, if present - switchedIn.remove(k)?.let { oldBranch: BranchNode -> - val conn: NodeConnection<V> = oldBranch.upstream - severed.add(conn) - launchImmediate { - conn.removeDownstream(downstream = oldBranch.schedulable) - } - depthTracker.removeDirectUpstream( - conn.depthTracker.snapshotDirectDepth - ) - } - - // add new - newBranch?.let { - switchedIn[k] = newBranch - additionsAndUpdates.add(k to newBranch.upstream.directUpstream) - val branchDepthTracker = newBranch.upstream.depthTracker - if (branchDepthTracker.snapshotIsDirect) { - depthTracker.addDirectUpstream( - oldDepth = null, - newDepth = branchDepthTracker.snapshotDirectDepth, - ) - } else { - depthTracker.addIndirectUpstream( - oldDepth = null, - newDepth = branchDepthTracker.snapshotIndirectDepth, - ) - depthTracker.updateIndirectRoots( - additions = branchDepthTracker.snapshotIndirectRoots, - butNot = null, - ) - } - } - } + // remove and sever + removes.forEach { k -> + switchedIn.remove(k)?.let { branchNode: BranchNode -> + if (name != null) { + logLn("[${this@MuxPromptNode}] removing $k") } + val conn: NodeConnection<V> = branchNode.upstream + severed.add(conn) + conn.removeDownstream(downstream = branchNode.schedulable) + depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth) + } + } - coroutineScope { - for (severedNode in severed) { - launch { severedNode.scheduleDeactivationIfNeeded(evalScope) } - } + // add or replace + adds.forEach { (k, newUpstream: TFlowImpl<V>) -> + // remove old and sever, if present + switchedIn.remove(k)?.let { oldBranch: BranchNode -> + if (name != null) { + logLn("[${this@MuxPromptNode}] replacing $k") } + val conn: NodeConnection<V> = oldBranch.upstream + severed.add(conn) + conn.removeDownstream(downstream = oldBranch.schedulable) + depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth) + } - val resultStore = storeFactory.create<PullNode<V>>(additionsAndUpdates.size) - for ((k, node) in additionsAndUpdates) { - resultStore[k] = node + // add new + val newBranch = BranchNode(k) + newUpstream.activate(evalScope, newBranch.schedulable)?.let { (conn, needsEval) -> + newBranch.upstream = conn + if (name != null) { + logLn("[${this@MuxPromptNode}] switching in $k") + } + switchedIn[k] = newBranch + if (needsEval) { + upstreamData[k] = newBranch.upstream.directUpstream + } else { + needsReschedule = true + } + val branchDepthTracker = newBranch.upstream.depthTracker + if (branchDepthTracker.snapshotIsDirect) { + depthTracker.addDirectUpstream( + oldDepth = null, + newDepth = branchDepthTracker.snapshotDirectDepth, + ) + } else { + depthTracker.addIndirectUpstream( + oldDepth = null, + newDepth = branchDepthTracker.snapshotIndirectDepth, + ) + depthTracker.updateIndirectRoots( + additions = branchDepthTracker.snapshotIndirectRoots, + butNot = null, + ) } - resultStore.takeIf { it.isNotEmpty() }?.asReadOnly() } + } - return if (preSwitchNotEmpty || newlySwitchedIn != null) { - (newlySwitchedIn != null) to (preSwitchResults to newlySwitchedIn) - } else { - false to null + for (severedNode in severed) { + severedNode.scheduleDeactivationIfNeeded(evalScope) } + + return needsReschedule } - private fun adjustDownstreamDepths(evalScope: EvalScope, coroutineScope: CoroutineScope) { + private fun adjustDownstreamDepths(evalScope: EvalScope) { if (depthTracker.dirty_depthIncreased()) { // schedule downstream nodes on the compaction scheduler; this scheduler is drained at // the end of this eval depth, so that all depth increases are applied before we advance // the eval step - depthTracker.schedule(evalScope.compactor, node = this@MuxPromptMovingNode) + depthTracker.schedule(evalScope.compactor, node = this@MuxPromptNode) } else if (depthTracker.isDirty()) { // schedule downstream nodes on the eval scheduler; this is more efficient and is only // safe if the depth hasn't increased depthTracker.applyChanges( - coroutineScope, evalScope.scheduler, downstreamSet, - muxNode = this@MuxPromptMovingNode, + muxNode = this@MuxPromptNode, ) } } - override suspend fun getPushEvent(evalScope: EvalScope): MuxPromptMovingResult<W, K, V> = - evalScope.getCurrentValue(key = this) + override fun getPushEvent(logIndent: Int, evalScope: EvalScope): MuxResult<W, K, V> = + logDuration(logIndent, "MuxPrompt.getPushEvent") { + transactionCache.getCurrentValue(evalScope) + } - override suspend fun doDeactivate() { + override fun doDeactivate() { // Update lifecycle - lifecycle.mutex.withLock { - if (lifecycle.lifecycleState !is MuxLifecycleState.Active) return@doDeactivate - lifecycle.lifecycleState = MuxLifecycleState.Inactive(spec) - } + if (lifecycle.lifecycleState !is MuxLifecycleState.Active) return + lifecycle.lifecycleState = MuxLifecycleState.Inactive(spec) // Process branch nodes switchedIn.forEach { (_, branchNode) -> branchNode.upstream.removeDownstreamAndDeactivateIfNeeded( @@ -231,57 +204,54 @@ internal class MuxPromptMovingNode<W, K, V>( } } - suspend fun removeIndirectPatchNode( + fun removeIndirectPatchNode( scheduler: Scheduler, oldDepth: Int, indirectSet: Set<MuxDeferredNode<*, *, *>>, ) { - mutex.withLock { - patches = null - if ( - depthTracker.removeIndirectUpstream(oldDepth) or - depthTracker.updateIndirectRoots(removals = indirectSet) - ) { - depthTracker.schedule(scheduler, this) - } + patches = null + if ( + depthTracker.removeIndirectUpstream(oldDepth) or + depthTracker.updateIndirectRoots(removals = indirectSet) + ) { + depthTracker.schedule(scheduler, this) } } - suspend fun removeDirectPatchNode(scheduler: Scheduler, depth: Int) { - mutex.withLock { - patches = null - if (depthTracker.removeDirectUpstream(depth)) { - depthTracker.schedule(scheduler, this) - } + fun removeDirectPatchNode(scheduler: Scheduler, depth: Int) { + patches = null + if (depthTracker.removeDirectUpstream(depth)) { + depthTracker.schedule(scheduler, this) } } + override fun toString(): String = + "${this::class.simpleName}@$hashString${name?.let { "[$it]" }.orEmpty()}" + inner class PatchNode : SchedulableNode { val schedulable = Schedulable.N(this) lateinit var upstream: NodeConnection<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>> - override suspend fun schedule(evalScope: EvalScope) { - patchData = upstream.getPushEvent(evalScope) - this@MuxPromptMovingNode.schedule(evalScope) + override fun schedule(logIndent: Int, evalScope: EvalScope) { + logDuration(logIndent, "MuxPromptPatchNode.schedule") { + patchData = upstream.getPushEvent(currentLogIndent, evalScope) + this@MuxPromptNode.schedule(evalScope) + } } - override suspend fun adjustDirectUpstream( - scheduler: Scheduler, - oldDepth: Int, - newDepth: Int, - ) { - this@MuxPromptMovingNode.adjustDirectUpstream(scheduler, oldDepth, newDepth) + override fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) { + this@MuxPromptNode.adjustDirectUpstream(scheduler, oldDepth, newDepth) } - override suspend fun moveIndirectUpstreamToDirect( + override fun moveIndirectUpstreamToDirect( scheduler: Scheduler, oldIndirectDepth: Int, oldIndirectSet: Set<MuxDeferredNode<*, *, *>>, newDirectDepth: Int, ) { - this@MuxPromptMovingNode.moveIndirectUpstreamToDirect( + this@MuxPromptNode.moveIndirectUpstreamToDirect( scheduler, oldIndirectDepth, oldIndirectSet, @@ -289,14 +259,14 @@ internal class MuxPromptMovingNode<W, K, V>( ) } - override suspend fun adjustIndirectUpstream( + override fun adjustIndirectUpstream( scheduler: Scheduler, oldDepth: Int, newDepth: Int, removals: Set<MuxDeferredNode<*, *, *>>, additions: Set<MuxDeferredNode<*, *, *>>, ) { - this@MuxPromptMovingNode.adjustIndirectUpstream( + this@MuxPromptNode.adjustIndirectUpstream( scheduler, oldDepth, newDepth, @@ -305,13 +275,13 @@ internal class MuxPromptMovingNode<W, K, V>( ) } - override suspend fun moveDirectUpstreamToIndirect( + override fun moveDirectUpstreamToIndirect( scheduler: Scheduler, oldDirectDepth: Int, newIndirectDepth: Int, newIndirectSet: Set<MuxDeferredNode<*, *, *>>, ) { - this@MuxPromptMovingNode.moveDirectUpstreamToIndirect( + this@MuxPromptNode.moveDirectUpstreamToIndirect( scheduler, oldDirectDepth, newIndirectDepth, @@ -319,98 +289,70 @@ internal class MuxPromptMovingNode<W, K, V>( ) } - override suspend fun removeDirectUpstream(scheduler: Scheduler, depth: Int) { - this@MuxPromptMovingNode.removeDirectPatchNode(scheduler, depth) + override fun removeDirectUpstream(scheduler: Scheduler, depth: Int) { + this@MuxPromptNode.removeDirectPatchNode(scheduler, depth) } - override suspend fun removeIndirectUpstream( + override fun removeIndirectUpstream( scheduler: Scheduler, depth: Int, indirectSet: Set<MuxDeferredNode<*, *, *>>, ) { - this@MuxPromptMovingNode.removeIndirectPatchNode(scheduler, depth, indirectSet) + this@MuxPromptNode.removeIndirectPatchNode(scheduler, depth, indirectSet) } } } -internal class MuxPromptEvalNode<W, K, V>( - private val movingNode: PullNode<MuxPromptMovingResult<W, K, V>>, - private val factory: MutableMapK.Factory<W, K>, -) : PullNode<MuxResult<W, K, V>> { - override suspend fun getPushEvent(evalScope: EvalScope): MuxResult<W, K, V> = - movingNode.getPushEvent(evalScope).let { (preSwitchResults, newlySwitchedIn) -> - newlySwitchedIn?.let { - factory - .create(preSwitchResults) - .also { store -> - newlySwitchedIn.forEach { k, pullNode -> store[k] = pullNode } - } - .asReadOnly() - } ?: preSwitchResults - } -} - internal inline fun <A> switchPromptImplSingle( - crossinline getStorage: suspend EvalScope.() -> TFlowImpl<A>, - crossinline getPatches: suspend EvalScope.() -> TFlowImpl<TFlowImpl<A>>, -): TFlowImpl<A> = - mapImpl({ + crossinline getStorage: EvalScope.() -> TFlowImpl<A>, + crossinline getPatches: EvalScope.() -> TFlowImpl<TFlowImpl<A>>, +): TFlowImpl<A> { + val switchPromptImpl = switchPromptImpl( getStorage = { singleOf(getStorage()).asIterable() }, getPatches = { - mapImpl(getPatches) { newFlow -> singleOf(just(newFlow)).asIterable() } + mapImpl(getPatches) { newFlow, _ -> singleOf(just(newFlow)).asIterable() } }, storeFactory = SingletonMapK.Factory(), ) - }) { map -> - map.getValue(Unit).getPushEvent(this) + return mapImpl({ switchPromptImpl }) { map, logIndent -> + map.asSingle().getValue(Unit).getPushEvent(logIndent, this) } +} internal fun <W, K, V> switchPromptImpl( - getStorage: suspend EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>, - getPatches: suspend EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>, + name: String? = null, + getStorage: EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>, + getPatches: EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>, storeFactory: MutableMapK.Factory<W, K>, -): TFlowImpl<MuxResult<W, K, V>> { - val moving = MuxLifecycle(MuxPromptActivator(getStorage, storeFactory, getPatches)) - val eval = TFlowCheap { downstream -> - moving.activate(evalScope = this, downstream)?.let { (connection, needsEval) -> - val evalNode = MuxPromptEvalNode(connection.directUpstream, storeFactory) - ActivationResult( - connection = NodeConnection(evalNode, connection.schedulerUpstream), - needsEval = needsEval, - ) - } - } - return eval.cached() -} +): TFlowImpl<MuxResult<W, K, V>> = + MuxLifecycle(MuxPromptActivator(name, getStorage, storeFactory, getPatches)) private class MuxPromptActivator<W, K, V>( - private val getStorage: suspend EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>, + private val name: String?, + private val getStorage: EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>, private val storeFactory: MutableMapK.Factory<W, K>, - private val getPatches: - suspend EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>, -) : MuxActivator<MuxPromptMovingResult<W, K, V>> { - override suspend fun activate( + private val getPatches: EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>, +) : MuxActivator<W, K, V> { + override fun activate( evalScope: EvalScope, - lifecycle: MuxLifecycle<MuxPromptMovingResult<W, K, V>>, - ): MuxNode<W, *, *, MuxPromptMovingResult<W, K, V>>? { + lifecycle: MuxLifecycle<W, K, V>, + ): Pair<MuxNode<W, K, V>, (() -> Unit)?>? { // Initialize mux node and switched-in connections. val movingNode = - MuxPromptMovingNode(lifecycle, this, storeFactory).apply { - coroutineScope { - launch { initializeUpstream(evalScope, getStorage, storeFactory) } - // Setup patches connection - val patchNode = PatchNode() - getPatches(evalScope) - .activate(evalScope = evalScope, downstream = patchNode.schedulable) - ?.let { (conn, needsEval) -> - patchNode.upstream = conn - patches = patchNode - if (needsEval) { - patchData = conn.getPushEvent(evalScope) - } + MuxPromptNode(name, lifecycle, this, storeFactory).apply { + initializeUpstream(evalScope, getStorage, storeFactory) + // Setup patches connection + val patchNode = PatchNode() + getPatches(evalScope) + .activate(evalScope = evalScope, downstream = patchNode.schedulable) + ?.let { (conn, needsEval) -> + patchNode.upstream = conn + patches = patchNode + if (needsEval) { + patchData = conn.getPushEvent(0, evalScope) } - } + } // Update depth based on all initial switched-in nodes. initializeDepth() // Update depth based on patches node. @@ -441,6 +383,10 @@ private class MuxPromptActivator<W, K, V>( movingNode.schedule(evalScope) } - return movingNode.takeUnless { it.patches == null && it.switchedIn.isEmpty() } + return if (movingNode.patches == null && movingNode.switchedIn.isEmpty()) { + null + } else { + movingNode to null + } } } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt index 79d4b7a843ac..b90c4c02fa7c 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt @@ -18,15 +18,15 @@ package com.android.systemui.kairos.internal import com.android.systemui.kairos.TState import com.android.systemui.kairos.internal.util.HeteroMap +import com.android.systemui.kairos.internal.util.logDuration +import com.android.systemui.kairos.internal.util.logLn import com.android.systemui.kairos.util.Just import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.none -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ConcurrentLinkedDeque -import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicLong import kotlin.coroutines.ContinuationInterceptor +import kotlin.time.measureTime import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred @@ -52,25 +52,34 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope { override val network get() = this - override val compactor = SchedulerImpl() - override val scheduler = SchedulerImpl() - override val transactionStore = HeteroMap() + override val compactor = SchedulerImpl { + if (it.markedForCompaction) false + else { + it.markedForCompaction = true + true + } + } + override val scheduler = SchedulerImpl { + if (it.markedForEvaluation) false + else { + it.markedForEvaluation = true + true + } + } + override val transactionStore = TransactionStore() - private val stateWrites = ConcurrentLinkedQueue<TStateSource<*>>() - private val outputsByDispatcher = - ConcurrentHashMap<ContinuationInterceptor, ConcurrentLinkedQueue<Output<*>>>() - private val muxMovers = ConcurrentLinkedQueue<MuxDeferredNode<*, *, *>>() - private val deactivations = ConcurrentLinkedDeque<PushNode<*>>() - private val outputDeactivations = ConcurrentLinkedQueue<Output<*>>() + private val stateWrites = ArrayDeque<TStateSource<*>>() + private val outputsByDispatcher = HashMap<ContinuationInterceptor, ArrayDeque<Output<*>>>() + private val muxMovers = ArrayDeque<MuxDeferredNode<*, *, *>>() + private val deactivations = ArrayDeque<PushNode<*>>() + private val outputDeactivations = ArrayDeque<Output<*>>() private val transactionMutex = Mutex() private val inputScheduleChan = Channel<ScheduledAction<*>>() override fun scheduleOutput(output: Output<*>) { val continuationInterceptor = output.context[ContinuationInterceptor] ?: Dispatchers.Unconfined - outputsByDispatcher - .computeIfAbsent(continuationInterceptor) { ConcurrentLinkedQueue() } - .add(output) + outputsByDispatcher.computeIfAbsent(continuationInterceptor) { ArrayDeque() }.add(output) } override fun scheduleMuxMover(muxMover: MuxDeferredNode<*, *, *>) { @@ -101,28 +110,37 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope { actions.add(func) } transactionMutex.withLock { - try { - // Run all actions - evalScope { - for (action in actions) { - launch { action.started(evalScope = this@evalScope) } + val e = epoch + val duration = measureTime { + logLn(0, "===starting transaction $e===") + try { + logDuration(1, "init actions") { + // Run all actions + evalScope { + for (action in actions) { + action.started(evalScope = this@evalScope) + } + } + } + // Step through the network + doTransaction(1) + } catch (e: Exception) { + // Signal failure + while (actions.isNotEmpty()) { + actions.removeLast().fail(e) + } + // re-throw, cancelling this coroutine + throw e + } finally { + logDuration(1, "signal completions") { + // Signal completion + while (actions.isNotEmpty()) { + actions.removeLast().completed() + } } - } - // Step through the network - doTransaction() - } catch (e: Exception) { - // Signal failure - while (actions.isNotEmpty()) { - actions.removeLast().fail(e) - } - // re-throw, cancelling this coroutine - throw e - } finally { - // Signal completion - while (actions.isNotEmpty()) { - actions.removeLast().completed() } } + logLn(0, "===transaction $e took $duration===") } } } @@ -139,35 +157,47 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope { onResult.invokeOnCompletion { job.cancel() } } - suspend fun <R> evalScope(block: suspend EvalScope.() -> R): R = deferScope { + inline fun <R> evalScope(block: EvalScope.() -> R): R = deferScope { block(EvalScopeImpl(this@Network, this)) } /** Performs a transactional update of the FRP network. */ - private suspend fun doTransaction() { + private suspend fun doTransaction(logIndent: Int) { // Traverse network, then run outputs - do { - scheduler.drainEval(this) - } while (evalScope { evalOutputs(this) }) + logDuration(logIndent, "traverse network") { + do { + val numNodes = + logDuration("drainEval") { scheduler.drainEval(currentLogIndent, this@Network) } + logLn("drained $numNodes nodes") + } while (logDuration("evalOutputs") { evalScope { evalOutputs(this) } }) + } // Update states - evalScope { evalStateWriters(this) } + logDuration(logIndent, "update states") { + evalScope { evalStateWriters(currentLogIndent, this) } + } // Invalidate caches // Note: this needs to occur before deferred switches - transactionStore.clear() + logDuration(logIndent, "clear store") { transactionStore.clear() } epoch++ // Perform deferred switches - evalScope { evalMuxMovers(this) } + logDuration(logIndent, "evalMuxMovers") { + evalScope { evalMuxMovers(currentLogIndent, this) } + } // Compact depths - scheduler.drainCompact() - compactor.drainCompact() + logDuration(logIndent, "compact") { + scheduler.drainCompact(currentLogIndent) + compactor.drainCompact(currentLogIndent) + } // Deactivate nodes with no downstream - evalDeactivations() + logDuration(logIndent, "deactivations") { evalDeactivations() } } /** Invokes all [Output]s that have received data within this transaction. */ private suspend fun evalOutputs(evalScope: EvalScope): Boolean { + if (outputsByDispatcher.isEmpty()) { + return false + } // Outputs can enqueue other outputs, so we need two loops - if (outputsByDispatcher.isEmpty()) return false while (outputsByDispatcher.isNotEmpty()) { var launchedAny = false coroutineScope { @@ -176,57 +206,50 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope { launchedAny = true launch(key) { while (outputs.isNotEmpty()) { - val output = outputs.remove() + val output = outputs.removeFirst() launch { output.visit(evalScope) } } } } } } - if (!launchedAny) outputsByDispatcher.clear() + if (!launchedAny) { + outputsByDispatcher.clear() + } } return true } - private suspend fun evalMuxMovers(evalScope: EvalScope) { + private fun evalMuxMovers(logIndent: Int, evalScope: EvalScope) { while (muxMovers.isNotEmpty()) { - coroutineScope { - val toMove = muxMovers.remove() - launch { toMove.performMove(evalScope) } - } + val toMove = muxMovers.removeFirst() + toMove.performMove(logIndent, evalScope) } } /** Updates all [TState]es that have changed within this transaction. */ - private suspend fun evalStateWriters(evalScope: EvalScope) { - coroutineScope { - while (stateWrites.isNotEmpty()) { - val latch = stateWrites.remove() - launch { latch.updateState(evalScope) } - } + private fun evalStateWriters(logIndent: Int, evalScope: EvalScope) { + while (stateWrites.isNotEmpty()) { + val latch = stateWrites.removeFirst() + latch.updateState(logIndent, evalScope) } } - private suspend fun evalDeactivations() { - coroutineScope { - launch { - while (deactivations.isNotEmpty()) { - // traverse in reverse order - // - deactivations are added in depth-order during the node traversal phase - // - perform deactivations in reverse order, in case later ones propagate to - // earlier ones - val toDeactivate = deactivations.removeLast() - launch { toDeactivate.deactivateIfNeeded() } - } - } - while (outputDeactivations.isNotEmpty()) { - val toDeactivate = outputDeactivations.remove() - launch { - toDeactivate.upstream?.removeDownstreamAndDeactivateIfNeeded( - downstream = toDeactivate.schedulable - ) - } - } + private fun evalDeactivations() { + while (deactivations.isNotEmpty()) { + // traverse in reverse order + // - deactivations are added in depth-order during the node traversal phase + // - perform deactivations in reverse order, in case later ones propagate to + // earlier ones + val toDeactivate = deactivations.removeLast() + toDeactivate.deactivateIfNeeded() + } + + while (outputDeactivations.isNotEmpty()) { + val toDeactivate = outputDeactivations.removeFirst() + toDeactivate.upstream?.removeDownstreamAndDeactivateIfNeeded( + downstream = toDeactivate.schedulable + ) } check(deactivations.isEmpty()) { "unexpected lingering deactivations" } check(outputDeactivations.isEmpty()) { "unexpected lingering output deactivations" } @@ -260,4 +283,39 @@ internal class ScheduledAction<T>( } } -internal typealias TransactionStore = HeteroMap +internal class TransactionStore private constructor(private val storage: HeteroMap) { + constructor(capacity: Int) : this(HeteroMap(capacity)) + + constructor() : this(HeteroMap()) + + operator fun <A> get(key: HeteroMap.Key<A>): A = + storage.getOrError(key) { "no value for $key in this transaction" } + + operator fun <A> set(key: HeteroMap.Key<A>, value: A) { + storage[key] = value + } + + fun clear() = storage.clear() +} + +internal class TransactionCache<A> { + private val key = object : HeteroMap.Key<A> {} + @Volatile + var epoch: Long = Long.MIN_VALUE + private set + + fun getOrPut(evalScope: EvalScope, block: () -> A): A = + if (epoch < evalScope.epoch) { + epoch = evalScope.epoch + block().also { evalScope.transactionStore[key] = it } + } else { + evalScope.transactionStore[key] + } + + fun put(evalScope: EvalScope, value: A) { + epoch = evalScope.epoch + evalScope.transactionStore[key] = value + } + + fun getCurrentValue(evalScope: EvalScope): A = evalScope.transactionStore[key] +} diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NoScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NoScope.kt index fbd9689eb1d0..14e4e1cfc143 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NoScope.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NoScope.kt @@ -17,31 +17,9 @@ package com.android.systemui.kairos.internal import com.android.systemui.kairos.FrpScope -import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext -import kotlin.coroutines.coroutineContext -import kotlin.coroutines.startCoroutine -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.completeWith -import kotlinx.coroutines.job internal object NoScope { private object FrpScopeImpl : FrpScope - suspend fun <R> runInFrpScope(block: suspend FrpScope.() -> R): R { - val complete = CompletableDeferred<R>(coroutineContext.job) - block.startCoroutine( - FrpScopeImpl, - object : Continuation<R> { - override val context: CoroutineContext - get() = EmptyCoroutineContext - - override fun resumeWith(result: Result<R>) { - complete.completeWith(result) - } - }, - ) - return complete.await() - } + fun <R> runInFrpScope(block: FrpScope.() -> R): R = FrpScopeImpl.block() } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt index 7a015d8ca1f6..39b8bfe540d2 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt @@ -22,18 +22,18 @@ Muxes + Branch */ internal sealed interface SchedulableNode { /** schedule this node w/ given NodeEvalScope */ - suspend fun schedule(evalScope: EvalScope) + fun schedule(logIndent: Int, evalScope: EvalScope) - suspend fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) + fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) - suspend fun moveIndirectUpstreamToDirect( + fun moveIndirectUpstreamToDirect( scheduler: Scheduler, oldIndirectDepth: Int, oldIndirectSet: Set<MuxDeferredNode<*, *, *>>, newDirectDepth: Int, ) - suspend fun adjustIndirectUpstream( + fun adjustIndirectUpstream( scheduler: Scheduler, oldDepth: Int, newDepth: Int, @@ -41,20 +41,20 @@ internal sealed interface SchedulableNode { additions: Set<MuxDeferredNode<*, *, *>>, ) - suspend fun moveDirectUpstreamToIndirect( + fun moveDirectUpstreamToIndirect( scheduler: Scheduler, oldDirectDepth: Int, newIndirectDepth: Int, newIndirectSet: Set<MuxDeferredNode<*, *, *>>, ) - suspend fun removeIndirectUpstream( + fun removeIndirectUpstream( scheduler: Scheduler, depth: Int, indirectSet: Set<MuxDeferredNode<*, *, *>>, ) - suspend fun removeDirectUpstream(scheduler: Scheduler, depth: Int) + fun removeDirectUpstream(scheduler: Scheduler, depth: Int) } /* @@ -66,7 +66,7 @@ internal sealed interface PullNode<out A> { * will read from the cache, otherwise it will perform a full evaluation, even if invoked * multiple times within a transaction. */ - suspend fun getPushEvent(evalScope: EvalScope): A + fun getPushEvent(logIndent: Int, evalScope: EvalScope): A } /* @@ -74,19 +74,19 @@ Muxes + DmuxBranch */ internal sealed interface PushNode<A> : PullNode<A> { - suspend fun hasCurrentValue(evalScope: EvalScope): Boolean + fun hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean val depthTracker: DepthTracker - suspend fun removeDownstream(downstream: Schedulable) + fun removeDownstream(downstream: Schedulable) /** called during cleanup phase */ - suspend fun deactivateIfNeeded() + fun deactivateIfNeeded() /** called from mux nodes after severs */ - suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) + fun scheduleDeactivationIfNeeded(evalScope: EvalScope) - suspend fun addDownstream(downstream: Schedulable) + fun addDownstream(downstream: Schedulable) - suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) + fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt index 3373de05249c..38d8cf70b36e 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt @@ -21,8 +21,8 @@ import kotlin.coroutines.EmptyCoroutineContext internal class Output<A>( val context: CoroutineContext = EmptyCoroutineContext, - val onDeath: suspend () -> Unit = {}, - val onEmit: suspend EvalScope.(A) -> Unit, + val onDeath: () -> Unit = {}, + val onEmit: EvalScope.(A) -> Unit, ) { val schedulable = Schedulable.O(this) @@ -33,23 +33,24 @@ internal class Output<A>( private object NoResult // invoked by network - suspend fun visit(evalScope: EvalScope) { + fun visit(evalScope: EvalScope) { val upstreamResult = result check(upstreamResult !== NoResult) { "output visited with null upstream result" } result = NoResult @Suppress("UNCHECKED_CAST") evalScope.onEmit(upstreamResult as A) } - suspend fun kill() { + fun kill() { onDeath() } - suspend fun schedule(evalScope: EvalScope) { + fun schedule(logIndent: Int, evalScope: EvalScope) { result = - checkNotNull(upstream) { "output scheduled with null upstream" }.getPushEvent(evalScope) + checkNotNull(upstream) { "output scheduled with null upstream" } + .getPushEvent(logIndent, evalScope) evalScope.scheduleOutput(this) } } -internal inline fun OneShot(crossinline onEmit: suspend EvalScope.() -> Unit): Output<Unit> = +internal inline fun OneShot(crossinline onEmit: EvalScope.() -> Unit): Output<Unit> = Output<Unit>(onEmit = { onEmit() }).apply { result = Unit } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt index 43b621fadc67..5ade401da1a5 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt @@ -16,21 +16,23 @@ package com.android.systemui.kairos.internal -import com.android.systemui.kairos.internal.util.Key -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.Deferred +import com.android.systemui.kairos.internal.util.logDuration internal val neverImpl: TFlowImpl<Nothing> = TFlowCheap { null } -internal class MapNode<A, B>(val upstream: PullNode<A>, val transform: suspend EvalScope.(A) -> B) : +internal class MapNode<A, B>(val upstream: PullNode<A>, val transform: EvalScope.(A, Int) -> B) : PullNode<B> { - override suspend fun getPushEvent(evalScope: EvalScope): B = - evalScope.transform(upstream.getPushEvent(evalScope)) + override fun getPushEvent(logIndent: Int, evalScope: EvalScope): B = + logDuration(logIndent, "MapNode.getPushEvent") { + val upstream = + logDuration("upstream event") { upstream.getPushEvent(currentLogIndent, evalScope) } + logDuration("transform") { evalScope.transform(upstream, currentLogIndent) } + } } internal inline fun <A, B> mapImpl( - crossinline upstream: suspend EvalScope.() -> TFlowImpl<A>, - noinline transform: suspend EvalScope.(A) -> B, + crossinline upstream: EvalScope.() -> TFlowImpl<A>, + noinline transform: EvalScope.(A, Int) -> B, ): TFlowImpl<B> = TFlowCheap { downstream -> upstream().activate(evalScope = this, downstream)?.let { (connection, needsEval) -> ActivationResult( @@ -44,19 +46,29 @@ internal inline fun <A, B> mapImpl( } } -internal class CachedNode<A>(val key: Key<Deferred<A>>, val upstream: PullNode<A>) : PullNode<A> { - override suspend fun getPushEvent(evalScope: EvalScope): A { - val deferred = - evalScope.transactionStore.getOrPut(key) { - evalScope.deferAsync(CoroutineStart.LAZY) { upstream.getPushEvent(evalScope) } - } - return deferred.await() - } +internal class CachedNode<A>( + private val transactionCache: TransactionCache<Lazy<A>>, + val upstream: PullNode<A>, +) : PullNode<A> { + override fun getPushEvent(logIndent: Int, evalScope: EvalScope): A = + logDuration(logIndent, "CachedNode.getPushEvent") { + val deferred = + logDuration("CachedNode.getOrPut", false) { + transactionCache.getOrPut(evalScope) { + evalScope.deferAsync { + logDuration("CachedNode.getUpstreamEvent") { + upstream.getPushEvent(currentLogIndent, evalScope) + } + } + } + } + logDuration("await") { deferred.value } + } } internal fun <A> TFlowImpl<A>.cached(): TFlowImpl<A> { - val key = object : Key<Deferred<A>> {} - return TFlowCheap { + val key = TransactionCache<Lazy<A>>() + return TFlowCheap { it -> activate(this, it)?.let { (connection, needsEval) -> ActivationResult( connection = diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt index d046420517fe..0529bcb63c07 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt @@ -14,66 +14,76 @@ * limitations under the License. */ -@file:OptIn(ExperimentalCoroutinesApi::class) - package com.android.systemui.kairos.internal -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.PriorityBlockingQueue -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch +import com.android.systemui.kairos.internal.util.LogIndent +import java.util.PriorityQueue internal interface Scheduler { - fun schedule(depth: Int, node: MuxNode<*, *, *, *>) + fun schedule(depth: Int, node: MuxNode<*, *, *>) - fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *, *>) + fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) } -internal class SchedulerImpl : Scheduler { - val enqueued = ConcurrentHashMap<MuxNode<*, *, *, *>, Any>() - val scheduledQ = - PriorityBlockingQueue<Pair<Int, MuxNode<*, *, *, *>>>(16, compareBy { it.first }) +internal class SchedulerImpl(private val enqueue: (MuxNode<*, *, *>) -> Boolean) : Scheduler { + private val scheduledQ = PriorityQueue<Pair<Int, MuxNode<*, *, *>>>(compareBy { it.first }) - override fun schedule(depth: Int, node: MuxNode<*, *, *, *>) { - if (enqueued.putIfAbsent(node, node) == null) { + override fun schedule(depth: Int, node: MuxNode<*, *, *>) { + if (enqueue(node)) { scheduledQ.add(Pair(depth, node)) } } - override fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *, *>) { + override fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) { schedule(Int.MIN_VALUE + indirectDepth, node) } - internal suspend fun drainEval(network: Network) { - drain { runStep -> - runStep { muxNode -> network.evalScope { muxNode.visit(this) } } + internal fun drainEval(logIndent: Int, network: Network): Int = + drain(logIndent) { runStep -> + runStep { muxNode -> + network.evalScope { + muxNode.markedForEvaluation = false + muxNode.visit(currentLogIndent, this) + } + } // If any visited MuxPromptNodes had their depths increased, eagerly propagate those // depth changes now before performing further network evaluation. - network.compactor.drainCompact() + val numNodes = network.compactor.drainCompact(currentLogIndent) + logLn("promptly compacted $numNodes nodes") } - } - internal suspend fun drainCompact() { - drain { runStep -> runStep { muxNode -> muxNode.visitCompact(scheduler = this) } } - } + internal fun drainCompact(logIndent: Int): Int = + drain(logIndent) { runStep -> + runStep { muxNode -> + muxNode.markedForCompaction = false + muxNode.visitCompact(scheduler = this@SchedulerImpl) + } + } - private suspend inline fun drain( + private inline fun drain( + logIndent: Int, crossinline onStep: - suspend ( - runStep: suspend (visit: suspend (MuxNode<*, *, *, *>) -> Unit) -> Unit - ) -> Unit - ): Unit = coroutineScope { + LogIndent.( + runStep: LogIndent.(visit: LogIndent.(MuxNode<*, *, *>) -> Unit) -> Unit + ) -> Unit, + ): Int { + var total = 0 while (scheduledQ.isNotEmpty()) { val maxDepth = scheduledQ.peek()?.first ?: error("Unexpected empty scheduler") - onStep { visit -> runStep(maxDepth, visit) } + LogIndent(logIndent).onStep { visit -> + logDuration("step $maxDepth") { + val subtotal = runStep(maxDepth) { visit(it) } + logLn("visited $subtotal nodes") + total += subtotal + } + } } + return total } - private suspend inline fun runStep( - maxDepth: Int, - crossinline visit: suspend (MuxNode<*, *, *, *>) -> Unit, - ) = coroutineScope { + private inline fun runStep(maxDepth: Int, crossinline visit: (MuxNode<*, *, *>) -> Unit): Int { + var total = 0 + val toVisit = mutableListOf<MuxNode<*, *, *>>() while (scheduledQ.peek()?.first?.let { it <= maxDepth } == true) { val (d, node) = scheduledQ.remove() if ( @@ -82,11 +92,15 @@ internal class SchedulerImpl : Scheduler { ) { scheduledQ.add(node.depthTracker.dirty_directDepth to node) } else { - launch { - enqueued.remove(node) - visit(node) - } + total++ + toVisit.add(node) } } + + for (node in toVisit) { + visit(node) + } + + return total } } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt index 94f94f510d48..48f69036df89 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt @@ -30,25 +30,16 @@ import com.android.systemui.kairos.emptyTFlow import com.android.systemui.kairos.groupByKey import com.android.systemui.kairos.init import com.android.systemui.kairos.internal.store.ConcurrentHashMapK -import com.android.systemui.kairos.internal.util.mapValuesParallel import com.android.systemui.kairos.mapCheap import com.android.systemui.kairos.merge import com.android.systemui.kairos.switch import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.map -import kotlin.coroutines.Continuation -import kotlin.coroutines.CoroutineContext -import kotlin.coroutines.EmptyCoroutineContext -import kotlin.coroutines.startCoroutine -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.completeWith -import kotlinx.coroutines.job internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: TFlow<Any>) : StateScope, EvalScope by evalScope { - private val endSignalOnce: TFlow<Any> = endSignal.nextOnlyInternal("StateScope.endSignal") + override val endSignalOnce: TFlow<Any> = endSignal.nextOnlyInternal("StateScope.endSignal") private fun <A> TFlow<A>.truncateToScope(operatorName: String): TFlow<A> = if (endSignalOnce === emptyTFlow) { @@ -70,11 +61,11 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: } private fun <A> TFlow<A>.toTStateInternal(operatorName: String, init: A): TState<A> = - toTStateInternalDeferred(operatorName, CompletableDeferred(init)) + toTStateInternalDeferred(operatorName, CompletableLazy(init)) private fun <A> TFlow<A>.toTStateInternalDeferred( operatorName: String, - init: Deferred<A>, + init: Lazy<A>, ): TState<A> { val changes = this@toTStateInternalDeferred val name = operatorName @@ -89,7 +80,7 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: return TStateInit(constInit(name, impl)) } - private fun <R> deferredInternal(block: suspend FrpStateScope.() -> R): FrpDeferredValue<R> = + private fun <R> deferredInternal(block: FrpStateScope.() -> R): FrpDeferredValue<R> = FrpDeferredValue(deferAsync { runInStateScope(block) }) private fun <A> TFlow<A>.toTStateDeferredInternal( @@ -102,30 +93,27 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: } private fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyInternal( - storage: TState<Map<K, TFlow<V>>> + name: String?, + storage: TState<Map<K, TFlow<V>>>, ): TFlow<Map<K, V>> { - val name = "mergeIncrementally" + val patches = + mapImpl({ init.connect(this) }) { patch, _ -> + patch.mapValues { (_, m) -> m.map { flow -> flow.init.connect(this) } }.asIterable() + } return TFlowInit( constInit( name, switchDeferredImpl( + name = name, getStorage = { storage.init .connect(this) .getCurrentWithEpoch(this) .first - .mapValuesParallel { (_, flow) -> flow.init.connect(this) } + .mapValues { (_, flow) -> flow.init.connect(this) } .asIterable() }, - getPatches = { - mapImpl({ init.connect(this) }) { patch -> - patch - .mapValuesParallel { (_, m) -> - m.map { flow -> flow.init.connect(this) } - } - .asIterable() - } - }, + getPatches = { patches }, storeFactory = ConcurrentHashMapK.Factory(), ) .awaitValues(), @@ -134,30 +122,27 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: } private fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptInternal( - storage: TState<Map<K, TFlow<V>>> + storage: TState<Map<K, TFlow<V>>>, + name: String?, ): TFlow<Map<K, V>> { - val name = "mergeIncrementallyPrompt" + val patches = + mapImpl({ init.connect(this) }) { patch, _ -> + patch.mapValues { (_, m) -> m.map { flow -> flow.init.connect(this) } }.asIterable() + } return TFlowInit( constInit( name, switchPromptImpl( + name = name, getStorage = { storage.init .connect(this) .getCurrentWithEpoch(this) .first - .mapValuesParallel { (_, flow) -> flow.init.connect(this) } + .mapValues { (_, flow) -> flow.init.connect(this) } .asIterable() }, - getPatches = { - mapImpl({ init.connect(this) }) { patch -> - patch - .mapValuesParallel { (_, m) -> - m.map { flow -> flow.init.connect(this) } - } - .asIterable() - } - }, + getPatches = { patches }, storeFactory = ConcurrentHashMapK.Factory(), ) .awaitValues(), @@ -170,9 +155,9 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: numKeys: Int?, ): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> { val eventsByKey: GroupedTFlow<K, Maybe<FrpStateful<A>>> = groupByKey(numKeys) - val initOut: Deferred<Map<K, B>> = deferAsync { - init.unwrapped.await().mapValuesParallel { (k, stateful) -> - val newEnd = with(frpScope) { eventsByKey[k].skipNext() } + val initOut: Lazy<Map<K, B>> = deferAsync { + init.unwrapped.value.mapValues { (k, stateful) -> + val newEnd = with(frpScope) { eventsByKey[k] } val newScope = childStateScope(newEnd) newScope.runInStateScope(stateful) } @@ -180,8 +165,8 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: val changesNode: TFlowImpl<Map<K, Maybe<A>>> = mapImpl( upstream = { this@applyLatestStatefulForKeyInternal.init.connect(evalScope = this) } - ) { upstreamMap -> - upstreamMap.mapValuesParallel { (k: K, ma: Maybe<FrpStateful<A>>) -> + ) { upstreamMap, _ -> + upstreamMap.mapValues { (k: K, ma: Maybe<FrpStateful<A>>) -> reenterStateScope(this@StateScopeImpl).run { ma.map { stateful -> val newEnd = with(frpScope) { eventsByKey[k].skipNext() } @@ -205,7 +190,7 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: name, mapImpl( upstream = { this@observeStatefulsInternal.init.connect(evalScope = this) } - ) { stateful -> + ) { stateful, _ -> reenterStateScope(outerScope = this@StateScopeImpl) .runInStateScope(stateful) } @@ -219,25 +204,26 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: private inner class FrpStateScopeImpl : FrpStateScope, FrpTransactionScope by evalScope.frpScope { - override fun <A> deferredStateScope( - block: suspend FrpStateScope.() -> A - ): FrpDeferredValue<A> = deferredInternal(block) + override fun <A> deferredStateScope(block: FrpStateScope.() -> A): FrpDeferredValue<A> = + deferredInternal(block) override fun <A> TFlow<A>.holdDeferred(initialValue: FrpDeferredValue<A>): TState<A> = toTStateDeferredInternal(initialValue) override fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally( - initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>> + name: String?, + initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>, ): TFlow<Map<K, V>> { val storage: TState<Map<K, TFlow<V>>> = foldMapIncrementally(initialTFlows) - return mergeIncrementallyInternal(storage) + return mergeIncrementallyInternal(name, storage) } override fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly( - initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>> + initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>, + name: String?, ): TFlow<Map<K, V>> { val storage: TState<Map<K, TFlow<V>>> = foldMapIncrementally(initialTFlows) - return mergeIncrementallyPromptInternal(storage) + return mergeIncrementallyPromptInternal(storage, name) } override fun <K, A, B> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKey( @@ -250,21 +236,7 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: observeStatefulsInternal() } - override suspend fun <R> runInStateScope(block: suspend FrpStateScope.() -> R): R { - val complete = CompletableDeferred<R>(parent = coroutineContext.job) - block.startCoroutine( - frpScope, - object : Continuation<R> { - override val context: CoroutineContext - get() = EmptyCoroutineContext - - override fun resumeWith(result: Result<R>) { - complete.completeWith(result) - } - }, - ) - return complete.await() - } + override fun <R> runInStateScope(block: FrpStateScope.() -> R): R = frpScope.block() override fun childStateScope(newEnd: TFlow<Any>) = StateScopeImpl(evalScope, merge(newEnd, endSignal)) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt index 784a2afe0992..47a585abac5f 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt @@ -18,7 +18,7 @@ package com.android.systemui.kairos.internal /* Initialized TFlow */ internal fun interface TFlowImpl<out A> { - suspend fun activate(evalScope: EvalScope, downstream: Schedulable): ActivationResult<A>? + fun activate(evalScope: EvalScope, downstream: Schedulable): ActivationResult<A>? } internal data class ActivationResult<out A>( @@ -32,28 +32,27 @@ internal inline fun <A> TFlowCheap(crossinline cheap: CheapNodeSubscribe<A>) = } internal typealias CheapNodeSubscribe<A> = - suspend EvalScope.(downstream: Schedulable) -> ActivationResult<A>? + EvalScope.(downstream: Schedulable) -> ActivationResult<A>? internal data class NodeConnection<out A>( val directUpstream: PullNode<A>, val schedulerUpstream: PushNode<*>, ) -internal suspend fun <A> NodeConnection<A>.hasCurrentValue(evalScope: EvalScope): Boolean = - schedulerUpstream.hasCurrentValue(evalScope) +internal fun <A> NodeConnection<A>.hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean = + schedulerUpstream.hasCurrentValue(logIndent, evalScope) -internal suspend fun <A> NodeConnection<A>.removeDownstreamAndDeactivateIfNeeded( - downstream: Schedulable -) = schedulerUpstream.removeDownstreamAndDeactivateIfNeeded(downstream) +internal fun <A> NodeConnection<A>.removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) = + schedulerUpstream.removeDownstreamAndDeactivateIfNeeded(downstream) -internal suspend fun <A> NodeConnection<A>.scheduleDeactivationIfNeeded(evalScope: EvalScope) = +internal fun <A> NodeConnection<A>.scheduleDeactivationIfNeeded(evalScope: EvalScope) = schedulerUpstream.scheduleDeactivationIfNeeded(evalScope) -internal suspend fun <A> NodeConnection<A>.removeDownstream(downstream: Schedulable) = +internal fun <A> NodeConnection<A>.removeDownstream(downstream: Schedulable) = schedulerUpstream.removeDownstream(downstream) -internal suspend fun <A> NodeConnection<A>.getPushEvent(evalScope: EvalScope): A = - directUpstream.getPushEvent(evalScope) +internal fun <A> NodeConnection<A>.getPushEvent(logIndent: Int, evalScope: EvalScope): A = + directUpstream.getPushEvent(logIndent, evalScope) internal val <A> NodeConnection<A>.depthTracker: DepthTracker get() = schedulerUpstream.depthTracker diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt index 916f22575b0c..9565a9c12d38 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt @@ -20,29 +20,22 @@ import com.android.systemui.kairos.internal.store.ConcurrentHashMapK import com.android.systemui.kairos.internal.store.MutableArrayMapK import com.android.systemui.kairos.internal.store.MutableMapK import com.android.systemui.kairos.internal.store.StoreEntry -import com.android.systemui.kairos.internal.util.Key import com.android.systemui.kairos.internal.util.hashString -import com.android.systemui.kairos.internal.util.launchImmediate import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.none import java.util.concurrent.atomic.AtomicLong -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.Deferred import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.coroutineScope internal sealed interface TStateImpl<out A> { val name: String? val operatorName: String val changes: TFlowImpl<A> - suspend fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> + fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> } -internal sealed class TStateDerived<A>(override val changes: TFlowImpl<A>) : - TStateImpl<A>, Key<Deferred<Pair<A, Long>>> { +internal sealed class TStateDerived<A>(override val changes: TFlowImpl<A>) : TStateImpl<A> { @Volatile var invalidatedEpoch = Long.MIN_VALUE @@ -52,12 +45,12 @@ internal sealed class TStateDerived<A>(override val changes: TFlowImpl<A>) : protected var cache: Any? = EmptyCache private set - override suspend fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> = - evalScope.transactionStore - .getOrPut(this) { evalScope.deferAsync(CoroutineStart.LAZY) { pull(evalScope) } } - .await() + private val transactionCache = TransactionCache<Lazy<Pair<A, Long>>>() - suspend fun pull(evalScope: EvalScope): Pair<A, Long> { + override fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> = + transactionCache.getOrPut(evalScope) { evalScope.deferAsync { pull(evalScope) } }.value + + fun pull(evalScope: EvalScope): Pair<A, Long> { @Suppress("UNCHECKED_CAST") return recalc(evalScope)?.also { (a, epoch) -> setCache(a, epoch) } ?: ((cache as A) to invalidatedEpoch) @@ -75,7 +68,7 @@ internal sealed class TStateDerived<A>(override val changes: TFlowImpl<A>) : return if (cache == EmptyCache) none else just(cache as A) } - protected abstract suspend fun recalc(evalScope: EvalScope): Pair<A, Long>? + protected abstract fun recalc(evalScope: EvalScope): Pair<A, Long>? private data object EmptyCache } @@ -83,7 +76,7 @@ internal sealed class TStateDerived<A>(override val changes: TFlowImpl<A>) : internal class TStateSource<A>( override val name: String?, override val operatorName: String, - init: Deferred<A>, + init: Lazy<A>, override val changes: TFlowImpl<A>, ) : TStateImpl<A> { constructor( @@ -91,33 +84,34 @@ internal class TStateSource<A>( operatorName: String, init: A, changes: TFlowImpl<A>, - ) : this(name, operatorName, CompletableDeferred(init), changes) + ) : this(name, operatorName, CompletableLazy(init), changes) lateinit var upstreamConnection: NodeConnection<A> // Note: Don't need to synchronize; we will never interleave reads and writes, since all writes // are performed at the end of a network step, after any reads would have taken place. - @Volatile private var _current: Deferred<A> = init + @Volatile private var _current: Lazy<A> = init + @Volatile var writeEpoch = 0L private set - override suspend fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> = - _current.await() to writeEpoch + override fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> = + _current.value to writeEpoch /** called by network after eval phase has completed */ - suspend fun updateState(evalScope: EvalScope) { + fun updateState(logIndent: Int, evalScope: EvalScope) { // write the latch - _current = CompletableDeferred(upstreamConnection.getPushEvent(evalScope)) + // TODO: deferAsync? + _current = CompletableLazy(upstreamConnection.getPushEvent(logIndent, evalScope)) writeEpoch = evalScope.epoch } override fun toString(): String = "TStateImpl(changes=$changes, current=$_current)" @OptIn(ExperimentalCoroutinesApi::class) - fun getStorageUnsafe(): Maybe<A> = - if (_current.isCompleted) just(_current.getCompleted()) else none + fun getStorageUnsafe(): Maybe<A> = if (_current.isInitialized()) just(_current.value) else none } internal fun <A> constS(name: String?, operatorName: String, init: A): TStateImpl<A> = @@ -127,8 +121,8 @@ internal inline fun <A> activatedTStateSource( name: String?, operatorName: String, evalScope: EvalScope, - crossinline getChanges: suspend EvalScope.() -> TFlowImpl<A>, - init: Deferred<A>, + crossinline getChanges: EvalScope.() -> TFlowImpl<A>, + init: Lazy<A>, ): TStateImpl<A> { lateinit var state: TStateSource<A> val calm: TFlowImpl<A> = @@ -167,19 +161,25 @@ private inline fun <A> TFlowImpl<A>.calm( internal fun <A, B> TStateImpl<A>.mapCheap( name: String?, operatorName: String, - transform: suspend EvalScope.(A) -> B, + transform: EvalScope.(A) -> B, ): TStateImpl<B> = - DerivedMapCheap(name, operatorName, this, mapImpl({ changes }) { transform(it) }, transform) + DerivedMapCheap( + name, + operatorName, + this, + mapImpl({ changes }) { it, _ -> transform(it) }, + transform, + ) internal class DerivedMapCheap<A, B>( override val name: String?, override val operatorName: String, val upstream: TStateImpl<A>, override val changes: TFlowImpl<B>, - private val transform: suspend EvalScope.(A) -> B, + private val transform: EvalScope.(A) -> B, ) : TStateImpl<B> { - override suspend fun getCurrentWithEpoch(evalScope: EvalScope): Pair<B, Long> { + override fun getCurrentWithEpoch(evalScope: EvalScope): Pair<B, Long> { val (a, epoch) = upstream.getCurrentWithEpoch(evalScope) return evalScope.transform(a) to epoch } @@ -190,10 +190,10 @@ internal class DerivedMapCheap<A, B>( internal fun <A, B> TStateImpl<A>.map( name: String?, operatorName: String, - transform: suspend EvalScope.(A) -> B, + transform: EvalScope.(A) -> B, ): TStateImpl<B> { lateinit var state: TStateDerived<B> - val mappedChanges = mapImpl({ changes }) { transform(it) }.cached().calm { state } + val mappedChanges = mapImpl({ changes }) { it, _ -> transform(it) }.cached().calm { state } state = DerivedMap(name, operatorName, transform, this, mappedChanges) return state } @@ -201,13 +201,13 @@ internal fun <A, B> TStateImpl<A>.map( internal class DerivedMap<A, B>( override val name: String?, override val operatorName: String, - private val transform: suspend EvalScope.(A) -> B, + private val transform: EvalScope.(A) -> B, val upstream: TStateImpl<A>, changes: TFlowImpl<B>, ) : TStateDerived<B>(changes) { override fun toString(): String = "${this::class.simpleName}@$hashString" - override suspend fun recalc(evalScope: EvalScope): Pair<B, Long>? { + override fun recalc(evalScope: EvalScope): Pair<B, Long>? { val (a, epoch) = upstream.getCurrentWithEpoch(evalScope) return if (epoch > invalidatedEpoch) { evalScope.transform(a) to epoch @@ -219,12 +219,13 @@ internal class DerivedMap<A, B>( internal fun <A> TStateImpl<TStateImpl<A>>.flatten(name: String?, operator: String): TStateImpl<A> { // emits the current value of the new inner state, when that state is emitted - val switchEvents = mapImpl({ changes }) { newInner -> newInner.getCurrentWithEpoch(this).first } + val switchEvents = + mapImpl({ changes }) { newInner, _ -> newInner.getCurrentWithEpoch(this).first } // emits the new value of the new inner state when that state is emitted, or // falls back to the current value if a new state is *not* being emitted this // transaction val innerChanges = - mapImpl({ changes }) { newInner -> + mapImpl({ changes }) { newInner, _ -> mergeNodes({ switchEvents }, { newInner.changes }) { _, new -> new } } val switchedChanges: TFlowImpl<A> = @@ -243,7 +244,7 @@ internal class DerivedFlatten<A>( val upstream: TStateImpl<TStateImpl<A>>, changes: TFlowImpl<A>, ) : TStateDerived<A>(changes) { - override suspend fun recalc(evalScope: EvalScope): Pair<A, Long> { + override fun recalc(evalScope: EvalScope): Pair<A, Long> { val (inner, epoch0) = upstream.getCurrentWithEpoch(evalScope) val (a, epoch1) = inner.getCurrentWithEpoch(evalScope) return a to maxOf(epoch0, epoch1) @@ -256,7 +257,7 @@ internal class DerivedFlatten<A>( internal inline fun <A, B> TStateImpl<A>.flatMap( name: String?, operatorName: String, - noinline transform: suspend EvalScope.(A) -> TStateImpl<B>, + noinline transform: EvalScope.(A) -> TStateImpl<B>, ): TStateImpl<B> = map(null, operatorName, transform).flatten(name, operatorName) internal fun <A, B, Z> zipStates( @@ -264,7 +265,7 @@ internal fun <A, B, Z> zipStates( operatorName: String, l1: TStateImpl<A>, l2: TStateImpl<B>, - transform: suspend EvalScope.(A, B) -> Z, + transform: EvalScope.(A, B) -> Z, ): TStateImpl<Z> = zipStateList(null, operatorName, listOf(l1, l2)).map(name, operatorName) { @Suppress("UNCHECKED_CAST") transform(it[0] as A, it[1] as B) @@ -276,7 +277,7 @@ internal fun <A, B, C, Z> zipStates( l1: TStateImpl<A>, l2: TStateImpl<B>, l3: TStateImpl<C>, - transform: suspend EvalScope.(A, B, C) -> Z, + transform: EvalScope.(A, B, C) -> Z, ): TStateImpl<Z> = zipStateList(null, operatorName, listOf(l1, l2, l3)).map(name, operatorName) { @Suppress("UNCHECKED_CAST") transform(it[0] as A, it[1] as B, it[2] as C) @@ -289,7 +290,7 @@ internal fun <A, B, C, D, Z> zipStates( l2: TStateImpl<B>, l3: TStateImpl<C>, l4: TStateImpl<D>, - transform: suspend EvalScope.(A, B, C, D) -> Z, + transform: EvalScope.(A, B, C, D) -> Z, ): TStateImpl<Z> = zipStateList(null, operatorName, listOf(l1, l2, l3, l4)).map(name, operatorName) { @Suppress("UNCHECKED_CAST") transform(it[0] as A, it[1] as B, it[2] as C, it[3] as D) @@ -303,7 +304,7 @@ internal fun <A, B, C, D, E, Z> zipStates( l3: TStateImpl<C>, l4: TStateImpl<D>, l5: TStateImpl<E>, - transform: suspend EvalScope.(A, B, C, D, E) -> Z, + transform: EvalScope.(A, B, C, D, E) -> Z, ): TStateImpl<Z> = zipStateList(null, operatorName, listOf(l1, l2, l3, l4, l5)).map(name, operatorName) { @Suppress("UNCHECKED_CAST") @@ -333,11 +334,7 @@ internal fun <V> zipStateList( name = name, operatorName = operatorName, numStates = states.size, - states = - states - .asSequence() - .mapIndexed { index, tStateImpl -> StoreEntry(index, tStateImpl) } - .asIterable(), + states = states.asIterableWithIndex(), storeFactory = MutableArrayMapK.Factory(), ) // Like mapCheap, but with caching (or like map, but without the calm changes, as they are not @@ -347,7 +344,7 @@ internal fun <V> zipStateList( operatorName = operatorName, transform = { arrayStore -> arrayStore.values.toList() }, upstream = zipped, - changes = mapImpl({ zipped.changes }) { arrayStore -> arrayStore.values.toList() }, + changes = mapImpl({ zipped.changes }) { arrayStore, _ -> arrayStore.values.toList() }, ) } @@ -364,26 +361,22 @@ internal fun <W, K, A> zipStates( val stateChanges = states.asSequence().map { (k, v) -> StoreEntry(k, v.changes) }.asIterable() lateinit var state: DerivedZipped<W, K, A> // No need for calm; invariant ensures that changes will only emit when there's a difference + val switchDeferredImpl = + switchDeferredImpl( + getStorage = { stateChanges }, + getPatches = { neverImpl }, + storeFactory = storeFactory, + ) val changes = - mapImpl({ - switchDeferredImpl( - getStorage = { stateChanges }, - getPatches = { neverImpl }, - storeFactory = storeFactory, - ) - }) { patch -> + mapImpl({ switchDeferredImpl }) { patch, logIndent -> val store = storeFactory.create<A>(numStates) - coroutineScope { - states.forEach { (k, state) -> - launchImmediate { - store[k] = - if (patch.contains(k)) { - patch.getValue(k).getPushEvent(evalScope = this@mapImpl) - } else { - state.getCurrentWithEpoch(evalScope = this@mapImpl).first - } + states.forEach { (k, state) -> + store[k] = + if (patch.contains(k)) { + patch.getValue(k).getPushEvent(logIndent, evalScope = this@mapImpl) + } else { + state.getCurrentWithEpoch(evalScope = this@mapImpl).first } - } } store.also { state.setCache(it, epoch) } } @@ -408,17 +401,13 @@ internal class DerivedZipped<W, K, A>( changes: TFlowImpl<MutableMapK<W, K, A>>, private val storeFactory: MutableMapK.Factory<W, K>, ) : TStateDerived<MutableMapK<W, K, A>>(changes) { - override suspend fun recalc(evalScope: EvalScope): Pair<MutableMapK<W, K, A>, Long> { + override fun recalc(evalScope: EvalScope): Pair<MutableMapK<W, K, A>, Long> { val newEpoch = AtomicLong() val store = storeFactory.create<A>(upstreamSize) - coroutineScope { - for ((key, value) in upstream) { - launchImmediate { - val (a, epoch) = value.getCurrentWithEpoch(evalScope) - newEpoch.accumulateAndGet(epoch, ::maxOf) - store[key] = a - } - } + for ((key, value) in upstream) { + val (a, epoch) = value.getCurrentWithEpoch(evalScope) + newEpoch.accumulateAndGet(epoch, ::maxOf) + store[key] = a } return store to newEpoch.get() } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TransactionalImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TransactionalImpl.kt index 8647bdd5b7b1..13bd3b005871 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TransactionalImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TransactionalImpl.kt @@ -16,31 +16,25 @@ package com.android.systemui.kairos.internal -import com.android.systemui.kairos.internal.util.Key import com.android.systemui.kairos.internal.util.hashString -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.Deferred internal sealed class TransactionalImpl<out A> { - data class Const<out A>(val value: Deferred<A>) : TransactionalImpl<A>() + data class Const<out A>(val value: Lazy<A>) : TransactionalImpl<A>() + + class Impl<A>(val block: EvalScope.() -> A) : TransactionalImpl<A>() { + val cache = TransactionCache<Lazy<A>>() - class Impl<A>(val block: suspend EvalScope.() -> A) : TransactionalImpl<A>(), Key<Deferred<A>> { override fun toString(): String = "${this::class.simpleName}@$hashString" } } @Suppress("NOTHING_TO_INLINE") -internal inline fun <A> transactionalImpl( - noinline block: suspend EvalScope.() -> A -): TransactionalImpl<A> = TransactionalImpl.Impl(block) +internal inline fun <A> transactionalImpl(noinline block: EvalScope.() -> A): TransactionalImpl<A> = + TransactionalImpl.Impl(block) -internal fun <A> TransactionalImpl<A>.sample(evalScope: EvalScope): Deferred<A> = +internal fun <A> TransactionalImpl<A>.sample(evalScope: EvalScope): Lazy<A> = when (this) { is TransactionalImpl.Const -> value is TransactionalImpl.Impl -> - evalScope.transactionStore - .getOrPut(this) { - evalScope.deferAsync(start = CoroutineStart.LAZY) { evalScope.block() } - } - .also { it.start() } + cache.getOrPut(evalScope) { evalScope.deferAsync { evalScope.block() } } } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt index 33709a97da8f..4d183481898b 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt @@ -21,13 +21,14 @@ import com.android.systemui.kairos.util.None import com.android.systemui.kairos.util.just import java.util.concurrent.ConcurrentHashMap -internal interface Key<A> - private object NULL -internal class HeteroMap { +internal class HeteroMap private constructor(private val store: ConcurrentHashMap<Key<*>, Any>) { + interface Key<A> {} + + constructor() : this(ConcurrentHashMap()) - private val store = ConcurrentHashMap<Key<*>, Any>() + constructor(capacity: Int) : this(ConcurrentHashMap(capacity)) @Suppress("UNCHECKED_CAST") operator fun <A> get(key: Key<A>): Maybe<A> = diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt index ebf9a66be0ae..13f884666182 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt @@ -16,8 +16,6 @@ package com.android.systemui.kairos.internal.util -import kotlinx.coroutines.CoroutineStart -import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.yield @@ -32,7 +30,7 @@ internal suspend inline fun <K, A, B : Any, M : MutableMap<K, B>> Map<K, A> destination.also { coroutineScope { mapValues { - async { + asyncImmediate { yield() block(it) } @@ -53,7 +51,7 @@ internal inline fun <K, A, B : Any, M : MutableMap<K, B>> Map<K, A>.mapValuesNot internal suspend fun <A, B> Iterable<A>.mapParallel(transform: suspend (A) -> B): List<B> = coroutineScope { - map { async(start = CoroutineStart.LAZY) { transform(it) } }.awaitAll() + map { asyncImmediate { transform(it) } }.awaitAll() } internal suspend fun <K, A, B, M : MutableMap<K, B>> Map<K, A>.mapValuesParallelTo( diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/Util.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/Util.kt index 6bb7f9f593aa..466a9f83b91f 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/Util.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/Util.kt @@ -18,8 +18,13 @@ package com.android.systemui.kairos.internal.util +import kotlin.contracts.ExperimentalContracts +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext +import kotlin.time.DurationUnit +import kotlin.time.measureTimedValue import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Deferred @@ -31,6 +36,62 @@ import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.launch import kotlinx.coroutines.newCoroutineContext +private const val LogEnabled = false + +@Suppress("NOTHING_TO_INLINE") +internal inline fun logLn(indent: Int = 0, message: Any?) { + if (!LogEnabled) return + log(indent, message) + println() +} + +@Suppress("NOTHING_TO_INLINE") +internal inline fun log(indent: Int = 0, message: Any?) { + if (!LogEnabled) return + printIndent(indent) + print(message) +} + +@JvmInline +internal value class LogIndent(val currentLogIndent: Int) { + @OptIn(ExperimentalContracts::class) + inline fun <R> logDuration(prefix: String, start: Boolean = true, block: LogIndent.() -> R): R { + contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } + return logDuration(currentLogIndent, prefix, start, block) + } + + @Suppress("NOTHING_TO_INLINE") + inline fun logLn(message: Any?) = logLn(currentLogIndent, message) +} + +@OptIn(ExperimentalContracts::class) +internal inline fun <R> logDuration( + indent: Int, + prefix: String, + start: Boolean = true, + block: LogIndent.() -> R, +): R { + contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } + if (!LogEnabled) return LogIndent(0).block() + if (start) { + logLn(indent, prefix) + } + val (result, duration) = measureTimedValue { LogIndent(indent + 1).block() } + + printIndent(indent) + print(prefix) + print(": ") + println(duration.toString(DurationUnit.MICROSECONDS)) + return result +} + +@Suppress("NOTHING_TO_INLINE") +private inline fun printIndent(indent: Int) { + for (i in 0 until indent) { + print(" ") + } +} + internal fun <A> CoroutineScope.asyncImmediate( start: CoroutineStart = CoroutineStart.UNDISPATCHED, context: CoroutineContext = EmptyCoroutineContext, |