summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Steve Elliott <steell@google.com> 2024-12-06 16:27:41 -0500
committer Steve Elliott <steell@google.com> 2025-01-03 15:55:35 -0500
commitd6865b295361c40202506a86aeaa6dfbbbb6e5c7 (patch)
tree161e5f649a44735dcd48a2457462c6f9ce24a43a
parent02946ae243e89bef2bae8087e91262e2ae8f499b (diff)
[kairos] remove most internal usage of `suspend fun`
And by proxy, all concurrency from internal graph evaluation. This produces a large performance improvement, mostly due to observed overhead with `suspend fun`. Flag: EXEMPT unused Test: atest kairos-tests Change-Id: I72d3a0a15ae4d9a143eca8d587f177fdee171a44
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt10
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpBuildScope.kt100
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpNetwork.kt23
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpScope.kt23
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpStateScope.kt87
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpTransactionScope.kt8
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt73
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt152
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Transactional.kt17
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt125
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt77
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt264
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt37
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt13
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Graph.kt172
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt23
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt93
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt21
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt329
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt442
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt442
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt218
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NoScope.kt24
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt28
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt15
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt48
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt90
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt102
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt21
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt137
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TransactionalImpl.kt22
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt9
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt6
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/Util.kt61
34 files changed, 1589 insertions, 1723 deletions
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt
index ae9b8c85910f..d5576b3b83df 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt
@@ -35,14 +35,14 @@ fun <A> TFlow<Transactional<A>>.sampleTransactionals(): TFlow<A> = map { it.samp
@ExperimentalFrpApi
fun <A, B, C> TFlow<A>.sample(
state: TState<B>,
- transform: suspend FrpTransactionScope.(A, B) -> C,
+ transform: FrpTransactionScope.(A, B) -> C,
): TFlow<C> = map { transform(it, state.sample()) }
/** @see FrpTransactionScope.sample */
@ExperimentalFrpApi
fun <A, B, C> TFlow<A>.sample(
transactional: Transactional<B>,
- transform: suspend FrpTransactionScope.(A, B) -> C,
+ transform: FrpTransactionScope.(A, B) -> C,
): TFlow<C> = map { transform(it, transactional.sample()) }
/**
@@ -57,7 +57,7 @@ fun <A, B, C> TFlow<A>.sample(
@ExperimentalFrpApi
fun <A, B, C> TFlow<A>.samplePromptly(
state: TState<B>,
- transform: suspend FrpTransactionScope.(A, B) -> C,
+ transform: FrpTransactionScope.(A, B) -> C,
): TFlow<C> =
sample(state) { a, b -> These.thiz<Pair<A, B>, B>(a to b) }
.mergeWith(state.stateChanges.map { These.that(it) }) { thiz, that ->
@@ -189,7 +189,7 @@ fun interface FrpBuildMode<out A> {
* Invoked when this mode is enabled. Returns a value and a [TFlow] that signals a switch to a
* new mode.
*/
- suspend fun FrpBuildScope.enableMode(): Pair<A, TFlow<FrpBuildMode<A>>>
+ fun FrpBuildScope.enableMode(): Pair<A, TFlow<FrpBuildMode<A>>>
}
/**
@@ -229,7 +229,7 @@ fun interface FrpStatefulMode<out A> {
* Invoked when this mode is enabled. Returns a value and a [TFlow] that signals a switch to a
* new mode.
*/
- suspend fun FrpStateScope.enableMode(): Pair<A, TFlow<FrpStatefulMode<A>>>
+ fun FrpStateScope.enableMode(): Pair<A, TFlow<FrpStatefulMode<A>>>
}
/**
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpBuildScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpBuildScope.kt
index 209a402bd629..31778dc32697 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpBuildScope.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpBuildScope.kt
@@ -42,7 +42,7 @@ import kotlinx.coroutines.flow.scan
import kotlinx.coroutines.launch
/** A function that modifies the FrpNetwork. */
-typealias FrpSpec<A> = suspend FrpBuildScope.() -> A
+typealias FrpSpec<A> = FrpBuildScope.() -> A
/**
* Constructs an [FrpSpec]. The passed [block] will be invoked with an [FrpBuildScope] that can be
@@ -51,7 +51,7 @@ typealias FrpSpec<A> = suspend FrpBuildScope.() -> A
*/
@ExperimentalFrpApi
@Suppress("NOTHING_TO_INLINE")
-inline fun <A> frpSpec(noinline block: suspend FrpBuildScope.() -> A): FrpSpec<A> = block
+inline fun <A> frpSpec(noinline block: FrpBuildScope.() -> A): FrpSpec<A> = block
/** Applies the [FrpSpec] within this [FrpBuildScope]. */
@ExperimentalFrpApi
@@ -63,11 +63,14 @@ inline operator fun <A> FrpBuildScope.invoke(block: FrpBuildScope.() -> A) = run
interface FrpBuildScope : FrpStateScope {
/** TODO: Javadoc */
+ val frpNetwork: FrpNetwork
+
+ /** TODO: Javadoc */
@ExperimentalFrpApi
- fun <R> deferredBuildScope(block: suspend FrpBuildScope.() -> R): FrpDeferredValue<R>
+ fun <R> deferredBuildScope(block: FrpBuildScope.() -> R): FrpDeferredValue<R>
/** TODO: Javadoc */
- @ExperimentalFrpApi fun deferredBuildScopeAction(block: suspend FrpBuildScope.() -> Unit)
+ @ExperimentalFrpApi fun deferredBuildScopeAction(block: FrpBuildScope.() -> Unit)
/**
* Returns a [TFlow] containing the results of applying [transform] to each value of the
@@ -81,8 +84,7 @@ interface FrpBuildScope : FrpStateScope {
* (or a downstream) [TFlow] is observed separately, [transform] will not be invoked, and no
* internal side-effects will occur.
*/
- @ExperimentalFrpApi
- fun <A, B> TFlow<A>.mapBuild(transform: suspend FrpBuildScope.(A) -> B): TFlow<B>
+ @ExperimentalFrpApi fun <A, B> TFlow<A>.mapBuild(transform: FrpBuildScope.(A) -> B): TFlow<B>
/**
* Invokes [block] whenever this [TFlow] emits a value, allowing side-effects to be safely
@@ -100,7 +102,7 @@ interface FrpBuildScope : FrpStateScope {
@ExperimentalFrpApi
fun <A> TFlow<A>.observe(
coroutineContext: CoroutineContext = EmptyCoroutineContext,
- block: suspend FrpEffectScope.(A) -> Unit = {},
+ block: FrpEffectScope.(A) -> Unit = {},
): Job
/**
@@ -133,7 +135,8 @@ interface FrpBuildScope : FrpStateScope {
* tFlow { ... }.apply { observe() }
* ```
*/
- @ExperimentalFrpApi fun <T> tFlow(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T>
+ @ExperimentalFrpApi
+ fun <T> tFlow(name: String? = null, builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T>
/**
* Creates an instance of a [TFlow] with elements that are emitted from [builder].
@@ -197,7 +200,7 @@ interface FrpBuildScope : FrpStateScope {
* @see observe
*/
@ExperimentalFrpApi
- fun <A> TFlow<A>.observeBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job =
+ fun <A> TFlow<A>.observeBuild(block: FrpBuildScope.(A) -> Unit = {}): Job =
mapBuild(block).observe()
/**
@@ -320,9 +323,8 @@ interface FrpBuildScope : FrpStateScope {
* [observers][observe] are unregistered, and any pending [effects][effect] are cancelled).
*/
@ExperimentalFrpApi
- fun <A, B> TFlow<A>.flatMapLatestBuild(
- transform: suspend FrpBuildScope.(A) -> TFlow<B>
- ): TFlow<B> = mapCheap { frpSpec { transform(it) } }.applyLatestSpec().flatten()
+ fun <A, B> TFlow<A>.flatMapLatestBuild(transform: FrpBuildScope.(A) -> TFlow<B>): TFlow<B> =
+ mapCheap { frpSpec { transform(it) } }.applyLatestSpec().flatten()
/**
* Returns a [TState] by applying [transform] to the value held by the original [TState].
@@ -333,9 +335,8 @@ interface FrpBuildScope : FrpStateScope {
* cancelled).
*/
@ExperimentalFrpApi
- fun <A, B> TState<A>.flatMapLatestBuild(
- transform: suspend FrpBuildScope.(A) -> TState<B>
- ): TState<B> = mapLatestBuild { transform(it) }.flatten()
+ fun <A, B> TState<A>.flatMapLatestBuild(transform: FrpBuildScope.(A) -> TState<B>): TState<B> =
+ mapLatestBuild { transform(it) }.flatten()
/**
* Returns a [TState] that transforms the value held inside this [TState] by applying it to the
@@ -347,7 +348,7 @@ interface FrpBuildScope : FrpStateScope {
* cancelled).
*/
@ExperimentalFrpApi
- fun <A, B> TState<A>.mapLatestBuild(transform: suspend FrpBuildScope.(A) -> B): TState<B> =
+ fun <A, B> TState<A>.mapLatestBuild(transform: FrpBuildScope.(A) -> B): TState<B> =
mapCheapUnsafe { frpSpec { transform(it) } }.applyLatestSpec()
/**
@@ -391,7 +392,7 @@ interface FrpBuildScope : FrpStateScope {
* cancelled).
*/
@ExperimentalFrpApi
- fun <A, B> TFlow<A>.mapLatestBuild(transform: suspend FrpBuildScope.(A) -> B): TFlow<B> =
+ fun <A, B> TFlow<A>.mapLatestBuild(transform: FrpBuildScope.(A) -> B): TFlow<B> =
mapCheap { frpSpec { transform(it) } }.applyLatestSpec()
/**
@@ -407,7 +408,7 @@ interface FrpBuildScope : FrpStateScope {
@ExperimentalFrpApi
fun <A, B> TFlow<A>.mapLatestBuild(
initialValue: A,
- transform: suspend FrpBuildScope.(A) -> B,
+ transform: FrpBuildScope.(A) -> B,
): Pair<TFlow<B>, FrpDeferredValue<B>> =
mapLatestBuildDeferred(deferredOf(initialValue), transform)
@@ -424,7 +425,7 @@ interface FrpBuildScope : FrpStateScope {
@ExperimentalFrpApi
fun <A, B> TFlow<A>.mapLatestBuildDeferred(
initialValue: FrpDeferredValue<A>,
- transform: suspend FrpBuildScope.(A) -> B,
+ transform: FrpBuildScope.(A) -> B,
): Pair<TFlow<B>, FrpDeferredValue<B>> =
mapCheap { frpSpec { transform(it) } }
.applyLatestSpec(initialSpec = frpSpec { transform(initialValue.get()) })
@@ -519,12 +520,12 @@ interface FrpBuildScope : FrpStateScope {
fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey(
initialValues: FrpDeferredValue<Map<K, A>>,
numKeys: Int? = null,
- transform: suspend FrpBuildScope.(A) -> B,
+ transform: FrpBuildScope.(K, A) -> B,
): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
- map { patch -> patch.mapValues { (_, v) -> v.map { frpSpec { transform(it) } } } }
+ map { patch -> patch.mapValues { (k, v) -> v.map { frpSpec { transform(k, it) } } } }
.applyLatestSpecForKey(
deferredBuildScope {
- initialValues.get().mapValues { (_, v) -> frpSpec { transform(v) } }
+ initialValues.get().mapValues { (k, v) -> frpSpec { transform(k, v) } }
},
numKeys = numKeys,
)
@@ -546,7 +547,7 @@ interface FrpBuildScope : FrpStateScope {
fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey(
initialValues: Map<K, A>,
numKeys: Int? = null,
- transform: suspend FrpBuildScope.(A) -> B,
+ transform: FrpBuildScope.(K, A) -> B,
): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
mapLatestBuildForKey(deferredOf(initialValues), numKeys, transform)
@@ -565,7 +566,7 @@ interface FrpBuildScope : FrpStateScope {
@ExperimentalFrpApi
fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestBuildForKey(
numKeys: Int? = null,
- transform: suspend FrpBuildScope.(A) -> B,
+ transform: FrpBuildScope.(K, A) -> B,
): TFlow<Map<K, Maybe<B>>> = mapLatestBuildForKey(emptyMap(), numKeys, transform).first
/** Returns a [Deferred] containing the next value to be emitted from this [TFlow]. */
@@ -585,7 +586,8 @@ interface FrpBuildScope : FrpStateScope {
}
/** Returns a [TFlow] that emits whenever this [Flow] emits. */
- @ExperimentalFrpApi fun <A> Flow<A>.toTFlow(): TFlow<A> = tFlow { collect { emit(it) } }
+ @ExperimentalFrpApi
+ fun <A> Flow<A>.toTFlow(name: String? = null): TFlow<A> = tFlow(name) { collect { emit(it) } }
/**
* Shorthand for:
@@ -626,7 +628,7 @@ interface FrpBuildScope : FrpStateScope {
* cancelled).
*/
@ExperimentalFrpApi
- fun <A> TFlow<A>.observeLatestBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job =
+ fun <A> TFlow<A>.observeLatestBuild(block: FrpBuildScope.(A) -> Unit = {}): Job =
mapLatestBuild { block(it) }.observe()
/**
@@ -636,7 +638,7 @@ interface FrpBuildScope : FrpStateScope {
* With each invocation of [block], running effects from the previous invocation are cancelled.
*/
@ExperimentalFrpApi
- fun <A> TFlow<A>.observeLatest(block: suspend FrpEffectScope.(A) -> Unit = {}): Job {
+ fun <A> TFlow<A>.observeLatest(block: FrpEffectScope.(A) -> Unit = {}): Job {
var innerJob: Job? = null
return observeBuild {
innerJob?.cancel()
@@ -651,14 +653,13 @@ interface FrpBuildScope : FrpStateScope {
* With each invocation of [block], running effects from the previous invocation are cancelled.
*/
@ExperimentalFrpApi
- fun <A> TState<A>.observeLatest(block: suspend FrpEffectScope.(A) -> Unit = {}): Job =
- launchScope {
- var innerJob = effect { block(sample()) }
- stateChanges.observeBuild {
- innerJob.cancel()
- innerJob = effect { block(it) }
- }
+ fun <A> TState<A>.observeLatest(block: FrpEffectScope.(A) -> Unit = {}): Job = launchScope {
+ var innerJob = effect { block(sample()) }
+ stateChanges.observeBuild {
+ innerJob.cancel()
+ innerJob = effect { block(it) }
}
+ }
/**
* Applies [block] to the value held by this [TState]. [block] receives an [FrpBuildScope] that
@@ -670,17 +671,16 @@ interface FrpBuildScope : FrpStateScope {
* [observers][observe] are unregistered, and any pending [side-effects][effect] are cancelled).
*/
@ExperimentalFrpApi
- fun <A> TState<A>.observeLatestBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job =
- launchScope {
- var innerJob: Job = launchScope { block(sample()) }
- stateChanges.observeBuild {
- innerJob.cancel()
- innerJob = launchScope { block(it) }
- }
+ fun <A> TState<A>.observeLatestBuild(block: FrpBuildScope.(A) -> Unit = {}): Job = launchScope {
+ var innerJob: Job = launchScope { block(sample()) }
+ stateChanges.observeBuild {
+ innerJob.cancel()
+ innerJob = launchScope { block(it) }
}
+ }
/** Applies the [FrpSpec] within this [FrpBuildScope]. */
- @ExperimentalFrpApi suspend fun <A> FrpSpec<A>.applySpec(): A = this()
+ @ExperimentalFrpApi fun <A> FrpSpec<A>.applySpec(): A = this()
/**
* Applies the [FrpSpec] within this [FrpBuildScope], returning the result as an
@@ -695,11 +695,10 @@ interface FrpBuildScope : FrpStateScope {
* [effect].
*/
@ExperimentalFrpApi
- fun <A> TState<A>.observeBuild(block: suspend FrpBuildScope.(A) -> Unit = {}): Job =
- launchScope {
- block(sample())
- stateChanges.observeBuild(block)
- }
+ fun <A> TState<A>.observeBuild(block: FrpBuildScope.(A) -> Unit = {}): Job = launchScope {
+ block(sample())
+ stateChanges.observeBuild(block)
+ }
/**
* Invokes [block] with the current value of this [TState], re-invoking whenever it changes,
@@ -714,7 +713,7 @@ interface FrpBuildScope : FrpStateScope {
* otherwise, it will be invoked with the [current][sample] value.
*/
@ExperimentalFrpApi
- fun <A> TState<A>.observe(block: suspend FrpEffectScope.(A) -> Unit = {}): Job =
+ fun <A> TState<A>.observe(block: FrpEffectScope.(A) -> Unit = {}): Job =
now.map { sample() }.mergeWith(stateChanges) { _, new -> new }.observe { block(it) }
}
@@ -753,7 +752,10 @@ fun <A> FrpBuildScope.asyncTFlow(block: suspend () -> A): TFlow<A> =
* ```
*/
@ExperimentalFrpApi
-fun FrpBuildScope.effect(block: suspend FrpEffectScope.() -> Unit): Job = now.observe { block() }
+fun FrpBuildScope.effect(
+ context: CoroutineContext = EmptyCoroutineContext,
+ block: FrpEffectScope.() -> Unit,
+): Job = now.observe(context) { block() }
/**
* Launches [block] in a new coroutine, returning a [Job] bound to the coroutine.
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpNetwork.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpNetwork.kt
index cec76886c06d..0679848c6c80 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpNetwork.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpNetwork.kt
@@ -23,7 +23,6 @@ import com.android.systemui.kairos.internal.util.awaitCancellationAndThen
import com.android.systemui.kairos.internal.util.childScope
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
-import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
@@ -53,7 +52,7 @@ interface FrpNetwork {
* If the network is cancelled while the caller of [transact] is suspended, then the call will
* be cancelled.
*/
- @ExperimentalFrpApi suspend fun <R> transact(block: suspend FrpTransactionScope.() -> R): R
+ @ExperimentalFrpApi suspend fun <R> transact(block: FrpTransactionScope.() -> R): R
/**
* Activates [spec] in a transaction, suspending indefinitely. While suspended, all observers
@@ -133,22 +132,36 @@ internal class LocalFrpNetwork(
private val scope: CoroutineScope,
private val endSignal: TFlow<Any>,
) : FrpNetwork {
- override suspend fun <R> transact(block: suspend FrpTransactionScope.() -> R): R =
+ override suspend fun <R> transact(block: FrpTransactionScope.() -> R): R =
network.transaction("FrpNetwork.transact") { runInTransactionScope { block() } }.await()
override suspend fun activateSpec(spec: FrpSpec<*>) {
+ val stopEmitter =
+ CoalescingMutableTFlow(
+ name = "activateSpec",
+ coalesce = { _, _: Unit -> },
+ network = network,
+ getInitialValue = {},
+ )
val job =
network
.transaction("FrpNetwork.activateSpec") {
val buildScope =
BuildScopeImpl(
- stateScope = StateScopeImpl(evalScope = this, endSignal = endSignal),
+ stateScope =
+ StateScopeImpl(
+ evalScope = this,
+ endSignal = mergeLeft(stopEmitter, endSignal),
+ ),
coroutineScope = scope,
)
buildScope.runInBuildScope { launchScope(spec) }
}
.await()
- awaitCancellationAndThen { job.cancel() }
+ awaitCancellationAndThen {
+ stopEmitter.emit(Unit)
+ job.cancel()
+ }
}
override fun <In, Out> coalescingMutableTFlow(
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpScope.kt
index ad6b2c8d04eb..92cb13f77d04 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpScope.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpScope.kt
@@ -16,13 +16,8 @@
package com.android.systemui.kairos
+import com.android.systemui.kairos.internal.CompletableLazy
import kotlin.coroutines.RestrictsSuspension
-import kotlin.coroutines.resume
-import kotlin.coroutines.resumeWithException
-import kotlinx.coroutines.CompletableDeferred
-import kotlinx.coroutines.Deferred
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.suspendCancellableCoroutine
/** Denotes [FrpScope] interfaces as [DSL markers][DslMarker]. */
@DslMarker annotation class FrpScopeMarker
@@ -37,13 +32,7 @@ interface FrpScope {
/**
* Returns the value held by the [FrpDeferredValue], suspending until available if necessary.
*/
- @ExperimentalFrpApi
- @OptIn(ExperimentalCoroutinesApi::class)
- suspend fun <A> FrpDeferredValue<A>.get(): A = suspendCancellableCoroutine { k ->
- unwrapped.invokeOnCompletion { ex ->
- ex?.let { k.resumeWithException(ex) } ?: k.resume(unwrapped.getCompleted())
- }
- }
+ @ExperimentalFrpApi fun <A> FrpDeferredValue<A>.get(): A = unwrapped.value
}
/**
@@ -53,7 +42,7 @@ interface FrpScope {
* @see FrpScope.get
*/
@ExperimentalFrpApi
-class FrpDeferredValue<out A> internal constructor(internal val unwrapped: Deferred<A>)
+class FrpDeferredValue<out A> internal constructor(internal val unwrapped: Lazy<A>)
/**
* Returns the value held by this [FrpDeferredValue], or throws [IllegalStateException] if it is not
@@ -64,10 +53,8 @@ class FrpDeferredValue<out A> internal constructor(internal val unwrapped: Defer
*
* @see FrpScope.get
*/
-@ExperimentalFrpApi
-@OptIn(ExperimentalCoroutinesApi::class)
-fun <A> FrpDeferredValue<A>.getUnsafe(): A = unwrapped.getCompleted()
+@ExperimentalFrpApi fun <A> FrpDeferredValue<A>.getUnsafe(): A = unwrapped.value
/** Returns an already-available [FrpDeferredValue] containing [value]. */
@ExperimentalFrpApi
-fun <A> deferredOf(value: A): FrpDeferredValue<A> = FrpDeferredValue(CompletableDeferred(value))
+fun <A> deferredOf(value: A): FrpDeferredValue<A> = FrpDeferredValue(CompletableLazy(value))
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpStateScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpStateScope.kt
index 058fc1037e58..3de246300501 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpStateScope.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpStateScope.kt
@@ -30,7 +30,7 @@ import com.android.systemui.kairos.util.partitionEithers
import com.android.systemui.kairos.util.zipWith
import kotlin.coroutines.RestrictsSuspension
-typealias FrpStateful<R> = suspend FrpStateScope.() -> R
+typealias FrpStateful<R> = FrpStateScope.() -> R
/**
* Returns a [FrpStateful] that, when [applied][FrpStateScope.applyStateful], invokes [block] with
@@ -39,7 +39,7 @@ typealias FrpStateful<R> = suspend FrpStateScope.() -> R
// TODO: caching story? should each Scope have a cache of applied FrpStateful instances?
@ExperimentalFrpApi
@Suppress("NOTHING_TO_INLINE")
-inline fun <A> statefully(noinline block: suspend FrpStateScope.() -> A): FrpStateful<A> = block
+inline fun <A> statefully(noinline block: FrpStateScope.() -> A): FrpStateful<A> = block
/**
* Operations that accumulate state within the FRP network.
@@ -55,7 +55,7 @@ interface FrpStateScope : FrpTransactionScope {
/** TODO */
@ExperimentalFrpApi
// TODO: wish this could just be `deferred` but alas
- fun <A> deferredStateScope(block: suspend FrpStateScope.() -> A): FrpDeferredValue<A>
+ fun <A> deferredStateScope(block: FrpStateScope.() -> A): FrpDeferredValue<A>
/**
* Returns a [TState] that holds onto the most recently emitted value from this [TFlow], or
@@ -86,7 +86,8 @@ interface FrpStateScope : FrpTransactionScope {
*/
@ExperimentalFrpApi
fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
- initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
+ name: String? = null,
+ initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>,
): TFlow<Map<K, V>>
/**
@@ -108,7 +109,8 @@ interface FrpStateScope : FrpTransactionScope {
*/
@ExperimentalFrpApi
fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
- initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
+ initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>,
+ name: String? = null,
): TFlow<Map<K, V>>
// TODO: everything below this comment can be made into extensions once we have context params
@@ -132,8 +134,9 @@ interface FrpStateScope : FrpTransactionScope {
*/
@ExperimentalFrpApi
fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
- initialTFlows: Map<K, TFlow<V>> = emptyMap()
- ): TFlow<Map<K, V>> = mergeIncrementally(deferredOf(initialTFlows))
+ name: String? = null,
+ initialTFlows: Map<K, TFlow<V>> = emptyMap(),
+ ): TFlow<Map<K, V>> = mergeIncrementally(name, deferredOf(initialTFlows))
/**
* Returns a [TFlow] that emits from a merged, incrementally-accumulated collection of [TFlow]s
@@ -154,11 +157,12 @@ interface FrpStateScope : FrpTransactionScope {
*/
@ExperimentalFrpApi
fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
- initialTFlows: Map<K, TFlow<V>> = emptyMap()
- ): TFlow<Map<K, V>> = mergeIncrementallyPromptly(deferredOf(initialTFlows))
+ initialTFlows: Map<K, TFlow<V>> = emptyMap(),
+ name: String? = null,
+ ): TFlow<Map<K, V>> = mergeIncrementallyPromptly(deferredOf(initialTFlows), name)
/** Applies the [FrpStateful] within this [FrpStateScope]. */
- @ExperimentalFrpApi suspend fun <A> FrpStateful<A>.applyStateful(): A = this()
+ @ExperimentalFrpApi fun <A> FrpStateful<A>.applyStateful(): A = this()
/**
* Applies the [FrpStateful] within this [FrpStateScope], returning the result as an
@@ -198,7 +202,7 @@ interface FrpStateScope : FrpTransactionScope {
* original [TFlow].
*/
@ExperimentalFrpApi
- fun <A, B> TFlow<A>.mapStateful(transform: suspend FrpStateScope.(A) -> B): TFlow<B> =
+ fun <A, B> TFlow<A>.mapStateful(transform: FrpStateScope.(A) -> B): TFlow<B> =
mapPure { statefully { transform(it) } }.applyStatefuls()
/**
@@ -224,7 +228,7 @@ interface FrpStateScope : FrpTransactionScope {
* invocation of [transform], state accumulation from previous invocation is stopped.
*/
@ExperimentalFrpApi
- fun <A, B> TFlow<A>.mapLatestStateful(transform: suspend FrpStateScope.(A) -> B): TFlow<B> =
+ fun <A, B> TFlow<A>.mapLatestStateful(transform: FrpStateScope.(A) -> B): TFlow<B> =
mapPure { statefully { transform(it) } }.applyLatestStateful()
/**
@@ -235,9 +239,8 @@ interface FrpStateScope : FrpTransactionScope {
* invocation of [transform], state accumulation from previous invocation is stopped.
*/
@ExperimentalFrpApi
- fun <A, B> TFlow<A>.flatMapLatestStateful(
- transform: suspend FrpStateScope.(A) -> TFlow<B>
- ): TFlow<B> = mapLatestStateful(transform).flatten()
+ fun <A, B> TFlow<A>.flatMapLatestStateful(transform: FrpStateScope.(A) -> TFlow<B>): TFlow<B> =
+ mapLatestStateful(transform).flatten()
/**
* Returns a [TFlow] containing the results of applying each [FrpStateful] emitted from the
@@ -394,7 +397,7 @@ interface FrpStateScope : FrpTransactionScope {
fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey(
initialValues: FrpDeferredValue<Map<K, A>>,
numKeys: Int? = null,
- transform: suspend FrpStateScope.(A) -> B,
+ transform: FrpStateScope.(A) -> B,
): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
mapPure { patch -> patch.mapValues { (_, v) -> v.map { statefully { transform(it) } } } }
.applyLatestStatefulForKey(
@@ -419,7 +422,7 @@ interface FrpStateScope : FrpTransactionScope {
fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey(
initialValues: Map<K, A>,
numKeys: Int? = null,
- transform: suspend FrpStateScope.(A) -> B,
+ transform: FrpStateScope.(A) -> B,
): Pair<TFlow<Map<K, Maybe<B>>>, FrpDeferredValue<Map<K, B>>> =
mapLatestStatefulForKey(deferredOf(initialValues), numKeys, transform)
@@ -436,7 +439,7 @@ interface FrpStateScope : FrpTransactionScope {
@ExperimentalFrpApi
fun <K, A, B> TFlow<Map<K, Maybe<A>>>.mapLatestStatefulForKey(
numKeys: Int? = null,
- transform: suspend FrpStateScope.(A) -> B,
+ transform: FrpStateScope.(A) -> B,
): TFlow<Map<K, Maybe<B>>> = mapLatestStatefulForKey(emptyMap(), numKeys, transform).first
/**
@@ -447,12 +450,12 @@ interface FrpStateScope : FrpTransactionScope {
* even emitted from the result [TFlow].
*/
@ExperimentalFrpApi
- fun <A> TFlow<A>.nextOnly(): TFlow<A> =
+ fun <A> TFlow<A>.nextOnly(name: String? = null): TFlow<A> =
if (this === emptyTFlow) {
this
} else {
TFlowLoop<A>().also {
- it.loopback = it.mapCheap { emptyTFlow }.hold(this@nextOnly).switch()
+ it.loopback = it.mapCheap { emptyTFlow }.hold(this@nextOnly).switch(name)
}
}
@@ -501,7 +504,7 @@ interface FrpStateScope : FrpTransactionScope {
* emitted that satisfies [predicate].
*/
@ExperimentalFrpApi
- fun <A> TFlow<A>.takeUntil(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> =
+ fun <A> TFlow<A>.takeUntil(predicate: FrpTransactionScope.(A) -> Boolean): TFlow<A> =
takeUntil(filter(predicate))
/**
@@ -515,7 +518,7 @@ interface FrpStateScope : FrpTransactionScope {
@ExperimentalFrpApi
fun <A, B> TFlow<A>.fold(
initialValue: B,
- transform: suspend FrpTransactionScope.(A, B) -> B,
+ transform: FrpTransactionScope.(A, B) -> B,
): TState<B> {
lateinit var state: TState<B>
return mapPure { a -> transform(a, state.sample()) }.hold(initialValue).also { state = it }
@@ -532,7 +535,7 @@ interface FrpStateScope : FrpTransactionScope {
@ExperimentalFrpApi
fun <A, B> TFlow<A>.foldDeferred(
initialValue: FrpDeferredValue<B>,
- transform: suspend FrpTransactionScope.(A, B) -> B,
+ transform: FrpTransactionScope.(A, B) -> B,
): TState<B> {
lateinit var state: TState<B>
return mapPure { a -> transform(a, state.sample()) }
@@ -657,7 +660,7 @@ interface FrpStateScope : FrpTransactionScope {
* ```
*/
@ExperimentalFrpApi
- fun <A, B> TFlow<A>.mapIndexed(transform: suspend FrpTransactionScope.(Int, A) -> B): TFlow<B> {
+ fun <A, B> TFlow<A>.mapIndexed(transform: FrpTransactionScope.(Int, A) -> B): TFlow<B> {
val index = fold(0) { _, i -> i + 1 }
return sample(index) { a, idx -> transform(idx, a) }
}
@@ -679,7 +682,7 @@ interface FrpStateScope : FrpTransactionScope {
@ExperimentalFrpApi
fun <A, B, C> TFlow<A>.sample(
other: TFlow<B>,
- transform: suspend FrpTransactionScope.(A, B) -> C,
+ transform: FrpTransactionScope.(A, B) -> C,
): TFlow<C> {
val state = other.mapCheap { just(it) }.hold(none)
return sample(state) { a, b -> b.map { transform(a, it) } }.filterJust()
@@ -700,7 +703,7 @@ interface FrpStateScope : FrpTransactionScope {
* given function [transform].
*/
@ExperimentalFrpApi
- fun <A, B> TState<A>.map(transform: suspend FrpTransactionScope.(A) -> B): TState<B> =
+ fun <A, B> TState<A>.map(transform: FrpTransactionScope.(A) -> B): TState<B> =
mapPure { transactionally { transform(it) } }.sampleTransactionals()
/**
@@ -713,7 +716,7 @@ interface FrpStateScope : FrpTransactionScope {
fun <A, B, Z> combine(
stateA: TState<A>,
stateB: TState<B>,
- transform: suspend FrpTransactionScope.(A, B) -> Z,
+ transform: FrpTransactionScope.(A, B) -> Z,
): TState<Z> =
com.android.systemui.kairos
.combine(stateA, stateB) { a, b -> transactionally { transform(a, b) } }
@@ -723,6 +726,23 @@ interface FrpStateScope : FrpTransactionScope {
* Returns a [TState] whose value is generated with [transform] by combining the current values
* of each given [TState].
*
+ * @see TState.combineWithTransactionally
+ */
+ @ExperimentalFrpApi
+ fun <A, B, C, Z> combine(
+ stateA: TState<A>,
+ stateB: TState<B>,
+ stateC: TState<C>,
+ transform: FrpTransactionScope.(A, B, C) -> Z,
+ ): TState<Z> =
+ com.android.systemui.kairos
+ .combine(stateA, stateB, stateC) { a, b, c -> transactionally { transform(a, b, c) } }
+ .sampleTransactionals()
+
+ /**
+ * Returns a [TState] whose value is generated with [transform] by combining the current values
+ * of each given [TState].
+ *
* @see TState.combineWith
*/
@ExperimentalFrpApi
@@ -731,7 +751,7 @@ interface FrpStateScope : FrpTransactionScope {
stateB: TState<B>,
stateC: TState<C>,
stateD: TState<D>,
- transform: suspend FrpTransactionScope.(A, B, C, D) -> Z,
+ transform: FrpTransactionScope.(A, B, C, D) -> Z,
): TState<Z> =
com.android.systemui.kairos
.combine(stateA, stateB, stateC, stateD) { a, b, c, d ->
@@ -741,9 +761,8 @@ interface FrpStateScope : FrpTransactionScope {
/** Returns a [TState] by applying [transform] to the value held by the original [TState]. */
@ExperimentalFrpApi
- fun <A, B> TState<A>.flatMap(
- transform: suspend FrpTransactionScope.(A) -> TState<B>
- ): TState<B> = mapPure { transactionally { transform(it) } }.sampleTransactionals().flatten()
+ fun <A, B> TState<A>.flatMap(transform: FrpTransactionScope.(A) -> TState<B>): TState<B> =
+ mapPure { transactionally { transform(it) } }.sampleTransactionals().flatten()
/**
* Returns a [TState] whose value is generated with [transform] by combining the current values
@@ -754,7 +773,7 @@ interface FrpStateScope : FrpTransactionScope {
@ExperimentalFrpApi
fun <A, Z> combine(
vararg states: TState<A>,
- transform: suspend FrpTransactionScope.(List<A>) -> Z,
+ transform: FrpTransactionScope.(List<A>) -> Z,
): TState<Z> = combinePure(*states).map(transform)
/**
@@ -765,7 +784,7 @@ interface FrpStateScope : FrpTransactionScope {
*/
@ExperimentalFrpApi
fun <A, Z> Iterable<TState<A>>.combine(
- transform: suspend FrpTransactionScope.(List<A>) -> Z
+ transform: FrpTransactionScope.(List<A>) -> Z
): TState<Z> = combinePure().map(transform)
/**
@@ -775,6 +794,6 @@ interface FrpStateScope : FrpTransactionScope {
@ExperimentalFrpApi
fun <A, B, C> TState<A>.combineWith(
other: TState<B>,
- transform: suspend FrpTransactionScope.(A, B) -> C,
+ transform: FrpTransactionScope.(A, B) -> C,
): TState<C> = combine(this, other, transform)
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpTransactionScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpTransactionScope.kt
index a7ae1d9646b3..7d48b9853e1c 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpTransactionScope.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/FrpTransactionScope.kt
@@ -44,9 +44,7 @@ interface FrpTransactionScope : FrpScope {
/** TODO */
@ExperimentalFrpApi
- fun <A> deferredTransactionScope(
- block: suspend FrpTransactionScope.() -> A
- ): FrpDeferredValue<A>
+ fun <A> deferredTransactionScope(block: FrpTransactionScope.() -> A): FrpDeferredValue<A>
/** A [TFlow] that emits once, within this transaction, and then never again. */
@ExperimentalFrpApi val now: TFlow<Unit>
@@ -55,11 +53,11 @@ interface FrpTransactionScope : FrpScope {
* Returns the current value held by this [TState]. Guaranteed to be consistent within the same
* transaction.
*/
- @ExperimentalFrpApi suspend fun <A> TState<A>.sample(): A = sampleDeferred().get()
+ @ExperimentalFrpApi fun <A> TState<A>.sample(): A = sampleDeferred().get()
/**
* Returns the current value held by this [Transactional]. Guaranteed to be consistent within
* the same transaction.
*/
- @ExperimentalFrpApi suspend fun <A> Transactional<A>.sample(): A = sampleDeferred().get()
+ @ExperimentalFrpApi fun <A> Transactional<A>.sample(): A = sampleDeferred().get()
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt
index 362a890f44e2..96edc1043325 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt
@@ -16,6 +16,7 @@
package com.android.systemui.kairos
+import com.android.systemui.kairos.internal.CompletableLazy
import com.android.systemui.kairos.internal.DemuxImpl
import com.android.systemui.kairos.internal.Init
import com.android.systemui.kairos.internal.InitScope
@@ -47,7 +48,6 @@ import com.android.systemui.kairos.util.map
import com.android.systemui.kairos.util.toMaybe
import java.util.concurrent.atomic.AtomicReference
import kotlin.reflect.KProperty
-import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
@@ -73,17 +73,18 @@ sealed class TFlow<out A> {
*/
@ExperimentalFrpApi
class TFlowLoop<A> : TFlow<A>() {
- private val deferred = CompletableDeferred<TFlow<A>>()
+ private val deferred = CompletableLazy<TFlow<A>>()
internal val init: Init<TFlowImpl<A>> =
- init(name = null) { deferred.await().init.connect(evalScope = this) }
+ init(name = null) { deferred.value.init.connect(evalScope = this) }
/** The [TFlow] this reference is referring to. */
@ExperimentalFrpApi
var loopback: TFlow<A>? = null
set(value) {
value?.let {
- check(deferred.complete(value)) { "TFlowLoop.loopback has already been set." }
+ check(!deferred.isInitialized()) { "TFlowLoop.loopback has already been set." }
+ deferred.setValue(value)
field = value
}
}
@@ -102,11 +103,11 @@ class TFlowLoop<A> : TFlow<A>() {
/** TODO */
@ExperimentalFrpApi
-fun <A> FrpDeferredValue<TFlow<A>>.defer(): TFlow<A> = deferInline { unwrapped.await() }
+fun <A> FrpDeferredValue<TFlow<A>>.defer(): TFlow<A> = deferInline { unwrapped.value }
/** TODO */
@ExperimentalFrpApi
-fun <A> deferTFlow(block: suspend FrpScope.() -> TFlow<A>): TFlow<A> = deferInline {
+fun <A> deferTFlow(block: FrpScope.() -> TFlow<A>): TFlow<A> = deferInline {
NoScope.runInFrpScope(block)
}
@@ -122,7 +123,7 @@ val <A> TState<A>.stateChanges: TFlow<A>
* @see mapNotNull
*/
@ExperimentalFrpApi
-fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> =
+fun <A, B> TFlow<A>.mapMaybe(transform: FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> =
map(transform).filterJust()
/**
@@ -132,10 +133,9 @@ fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe
* @see mapMaybe
*/
@ExperimentalFrpApi
-fun <A, B> TFlow<A>.mapNotNull(transform: suspend FrpTransactionScope.(A) -> B?): TFlow<B> =
- mapMaybe {
- transform(it).toMaybe()
- }
+fun <A, B> TFlow<A>.mapNotNull(transform: FrpTransactionScope.(A) -> B?): TFlow<B> = mapMaybe {
+ transform(it).toMaybe()
+}
/** Returns a [TFlow] containing only values of the original [TFlow] that are not null. */
@ExperimentalFrpApi
@@ -155,9 +155,11 @@ fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> =
* [TFlow].
*/
@ExperimentalFrpApi
-fun <A, B> TFlow<A>.map(transform: suspend FrpTransactionScope.(A) -> B): TFlow<B> {
+fun <A, B> TFlow<A>.map(transform: FrpTransactionScope.(A) -> B): TFlow<B> {
val mapped: TFlowImpl<B> =
- mapImpl({ init.connect(evalScope = this) }) { a -> runInTransactionScope { transform(a) } }
+ mapImpl({ init.connect(evalScope = this) }) { a, _ ->
+ runInTransactionScope { transform(a) }
+ }
return TFlowInit(constInit(name = null, mapped.cached()))
}
@@ -168,11 +170,11 @@ fun <A, B> TFlow<A>.map(transform: suspend FrpTransactionScope.(A) -> B): TFlow<
* @see map
*/
@ExperimentalFrpApi
-fun <A, B> TFlow<A>.mapCheap(transform: suspend FrpTransactionScope.(A) -> B): TFlow<B> =
+fun <A, B> TFlow<A>.mapCheap(transform: FrpTransactionScope.(A) -> B): TFlow<B> =
TFlowInit(
constInit(
name = null,
- mapImpl({ init.connect(evalScope = this) }) { a ->
+ mapImpl({ init.connect(evalScope = this) }) { a, _ ->
runInTransactionScope { transform(a) }
},
)
@@ -192,7 +194,7 @@ fun <A, B> TFlow<A>.mapCheap(transform: suspend FrpTransactionScope.(A) -> B): T
* [FrpBuildScope.toSharedFlow] or [FrpBuildScope.observe].
*/
@ExperimentalFrpApi
-fun <A> TFlow<A>.onEach(action: suspend FrpTransactionScope.(A) -> Unit): TFlow<A> = map {
+fun <A> TFlow<A>.onEach(action: FrpTransactionScope.(A) -> Unit): TFlow<A> = map {
action(it)
it
}
@@ -202,7 +204,7 @@ fun <A> TFlow<A>.onEach(action: suspend FrpTransactionScope.(A) -> Unit): TFlow<
* [predicate].
*/
@ExperimentalFrpApi
-fun <A> TFlow<A>.filter(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> {
+fun <A> TFlow<A>.filter(predicate: FrpTransactionScope.(A) -> Boolean): TFlow<A> {
val pulse =
filterImpl({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } }
return TFlowInit(constInit(name = null, pulse))
@@ -236,10 +238,12 @@ fun <A, B> TFlow<Pair<A, B>>.unzip(): Pair<TFlow<A>, TFlow<B>> {
@ExperimentalFrpApi
fun <A> TFlow<A>.mergeWith(
other: TFlow<A>,
- transformCoincidence: suspend FrpTransactionScope.(A, A) -> A = { a, _ -> a },
+ name: String? = null,
+ transformCoincidence: FrpTransactionScope.(A, A) -> A = { a, _ -> a },
): TFlow<A> {
val node =
mergeNodes(
+ name = name,
getPulse = { init.connect(evalScope = this) },
getOther = { other.init.connect(evalScope = this) },
) { a, b ->
@@ -355,7 +359,7 @@ fun <K, A> TFlow<Map<K, A>>.groupByKey(numKeys: Int? = null): GroupedTFlow<K, A>
@ExperimentalFrpApi
fun <K, A> TFlow<A>.groupBy(
numKeys: Int? = null,
- extractKey: suspend FrpTransactionScope.(A) -> K,
+ extractKey: FrpTransactionScope.(A) -> K,
): GroupedTFlow<K, A> = map { mapOf(extractKey(it) to it) }.groupByKey(numKeys)
/**
@@ -367,7 +371,7 @@ fun <K, A> TFlow<A>.groupBy(
*/
@ExperimentalFrpApi
fun <A> TFlow<A>.partition(
- predicate: suspend FrpTransactionScope.(A) -> Boolean
+ predicate: FrpTransactionScope.(A) -> Boolean
): Pair<TFlow<A>, TFlow<A>> {
val grouped: GroupedTFlow<Boolean, A> = groupBy(numKeys = 2, extractKey = predicate)
return Pair(grouped.eventsForKey(true), grouped.eventsForKey(false))
@@ -418,22 +422,22 @@ class GroupedTFlow<in K, out A> internal constructor(internal val impl: DemuxImp
* that takes effect immediately, see [switchPromptly].
*/
@ExperimentalFrpApi
-fun <A> TState<TFlow<A>>.switch(): TFlow<A> =
- TFlowInit(
+fun <A> TState<TFlow<A>>.switch(name: String? = null): TFlow<A> {
+ val patches =
+ mapImpl({ init.connect(this).changes }) { newFlow, _ -> newFlow.init.connect(this) }
+ return TFlowInit(
constInit(
name = null,
switchDeferredImplSingle(
+ name = name,
getStorage = {
init.connect(this).getCurrentWithEpoch(this).first.init.connect(this)
},
- getPatches = {
- mapImpl({ init.connect(this).changes }) { newFlow ->
- newFlow.init.connect(this)
- }
- },
+ getPatches = { patches },
),
)
)
+}
/**
* Returns a [TFlow] that switches to the [TFlow] contained within this [TState] whenever it
@@ -444,22 +448,21 @@ fun <A> TState<TFlow<A>>.switch(): TFlow<A> =
*/
// TODO: parameter to handle coincidental emission from both old and new
@ExperimentalFrpApi
-fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> =
- TFlowInit(
+fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> {
+ val patches =
+ mapImpl({ init.connect(this).changes }) { newFlow, _ -> newFlow.init.connect(this) }
+ return TFlowInit(
constInit(
name = null,
switchPromptImplSingle(
getStorage = {
init.connect(this).getCurrentWithEpoch(this).first.init.connect(this)
},
- getPatches = {
- mapImpl({ init.connect(this).changes }) { newFlow ->
- newFlow.init.connect(this)
- }
- },
+ getPatches = { patches },
),
)
)
+}
/**
* A mutable [TFlow] that provides the ability to [emit] values to the flow, handling backpressure
@@ -559,5 +562,5 @@ internal val <A> TFlow<A>.init: Init<TFlowImpl<A>>
is MutableTFlow -> constInit(name, impl.activated())
}
-private inline fun <A> deferInline(crossinline block: suspend InitScope.() -> TFlow<A>): TFlow<A> =
+private inline fun <A> deferInline(crossinline block: InitScope.() -> TFlow<A>): TFlow<A> =
TFlowInit(init(name = null) { block().init.connect(evalScope = this) })
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt
index 66aa2a950fcf..d84a6f2ddb34 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt
@@ -16,6 +16,7 @@
package com.android.systemui.kairos
+import com.android.systemui.kairos.internal.CompletableLazy
import com.android.systemui.kairos.internal.DerivedMapCheap
import com.android.systemui.kairos.internal.Init
import com.android.systemui.kairos.internal.InitScope
@@ -39,10 +40,6 @@ import com.android.systemui.kairos.internal.util.hashString
import com.android.systemui.kairos.internal.zipStateMap
import com.android.systemui.kairos.internal.zipStates
import kotlin.reflect.KProperty
-import kotlinx.coroutines.CompletableDeferred
-import kotlinx.coroutines.Deferred
-import kotlinx.coroutines.async
-import kotlinx.coroutines.coroutineScope
/**
* A time-varying value with discrete changes. Essentially, a combination of a [Transactional] that
@@ -63,11 +60,11 @@ fun <A> tStateOf(value: A): TState<A> {
/** TODO */
@ExperimentalFrpApi
-fun <A> FrpDeferredValue<TState<A>>.defer(): TState<A> = deferInline { unwrapped.await() }
+fun <A> FrpDeferredValue<TState<A>>.defer(): TState<A> = deferInline { unwrapped.value }
/** TODO */
@ExperimentalFrpApi
-fun <A> deferTState(block: suspend FrpScope.() -> TState<A>): TState<A> = deferInline {
+fun <A> deferTState(block: FrpScope.() -> TState<A>): TState<A> = deferInline {
NoScope.runInFrpScope(block)
}
@@ -76,7 +73,7 @@ fun <A> deferTState(block: suspend FrpScope.() -> TState<A>): TState<A> = deferI
* original [TState].
*/
@ExperimentalFrpApi
-fun <A, B> TState<A>.map(transform: suspend FrpScope.(A) -> B): TState<B> {
+fun <A, B> TState<A>.map(transform: FrpScope.(A) -> B): TState<B> {
val operatorName = "map"
val name = operatorName
return TStateInit(
@@ -98,7 +95,7 @@ fun <A, B> TState<A>.map(transform: suspend FrpScope.(A) -> B): TState<B> {
* observable change to the returned [TState].
*/
@ExperimentalFrpApi
-fun <A, B> TState<A>.mapCheapUnsafe(transform: suspend FrpScope.(A) -> B): TState<B> {
+fun <A, B> TState<A>.mapCheapUnsafe(transform: FrpScope.(A) -> B): TState<B> {
val operatorName = "map"
val name = operatorName
return TStateInit(
@@ -115,10 +112,8 @@ fun <A, B> TState<A>.mapCheapUnsafe(transform: suspend FrpScope.(A) -> B): TStat
* the given function [transform].
*/
@ExperimentalFrpApi
-fun <A, B, C> TState<A>.combineWith(
- other: TState<B>,
- transform: suspend FrpScope.(A, B) -> C,
-): TState<C> = combine(this, other, transform)
+fun <A, B, C> TState<A>.combineWith(other: TState<B>, transform: FrpScope.(A, B) -> C): TState<C> =
+ combine(this, other, transform)
/**
* Splits a [TState] of pairs into a pair of [TFlows][TState], where each returned [TState] holds
@@ -181,7 +176,7 @@ fun <K, A> Map<K, TState<A>>.combine(): TState<Map<K, A>> {
* @see TState.combineWith
*/
@ExperimentalFrpApi
-fun <A, B> Iterable<TState<A>>.combine(transform: suspend FrpScope.(List<A>) -> B): TState<B> =
+fun <A, B> Iterable<TState<A>>.combine(transform: FrpScope.(List<A>) -> B): TState<B> =
combine().map(transform)
/**
@@ -199,10 +194,8 @@ fun <A> combine(vararg states: TState<A>): TState<List<A>> = states.asIterable()
* @see TState.combineWith
*/
@ExperimentalFrpApi
-fun <A, B> combine(
- vararg states: TState<A>,
- transform: suspend FrpScope.(List<A>) -> B,
-): TState<B> = states.asIterable().combine(transform)
+fun <A, B> combine(vararg states: TState<A>, transform: FrpScope.(List<A>) -> B): TState<B> =
+ states.asIterable().combine(transform)
/**
* Returns a [TState] whose value is generated with [transform] by combining the current values of
@@ -214,22 +207,16 @@ fun <A, B> combine(
fun <A, B, Z> combine(
stateA: TState<A>,
stateB: TState<B>,
- transform: suspend FrpScope.(A, B) -> Z,
+ transform: FrpScope.(A, B) -> Z,
): TState<Z> {
val operatorName = "combine"
val name = operatorName
return TStateInit(
init(name) {
- coroutineScope {
- val dl1: Deferred<TStateImpl<A>> = async {
- stateA.init.connect(evalScope = this@init)
- }
- val dl2: Deferred<TStateImpl<B>> = async {
- stateB.init.connect(evalScope = this@init)
- }
- zipStates(name, operatorName, dl1.await(), dl2.await()) { a, b ->
- NoScope.runInFrpScope { transform(a, b) }
- }
+ val dl1 = stateA.init.connect(evalScope = this@init)
+ val dl2 = stateB.init.connect(evalScope = this@init)
+ zipStates(name, operatorName, dl1, dl2) { a, b ->
+ NoScope.runInFrpScope { transform(a, b) }
}
}
)
@@ -246,25 +233,17 @@ fun <A, B, C, Z> combine(
stateA: TState<A>,
stateB: TState<B>,
stateC: TState<C>,
- transform: suspend FrpScope.(A, B, C) -> Z,
+ transform: FrpScope.(A, B, C) -> Z,
): TState<Z> {
val operatorName = "combine"
val name = operatorName
return TStateInit(
init(name) {
- coroutineScope {
- val dl1: Deferred<TStateImpl<A>> = async {
- stateA.init.connect(evalScope = this@init)
- }
- val dl2: Deferred<TStateImpl<B>> = async {
- stateB.init.connect(evalScope = this@init)
- }
- val dl3: Deferred<TStateImpl<C>> = async {
- stateC.init.connect(evalScope = this@init)
- }
- zipStates(name, operatorName, dl1.await(), dl2.await(), dl3.await()) { a, b, c ->
- NoScope.runInFrpScope { transform(a, b, c) }
- }
+ val dl1 = stateA.init.connect(evalScope = this@init)
+ val dl2 = stateB.init.connect(evalScope = this@init)
+ val dl3 = stateC.init.connect(evalScope = this@init)
+ zipStates(name, operatorName, dl1, dl2, dl3) { a, b, c ->
+ NoScope.runInFrpScope { transform(a, b, c) }
}
}
)
@@ -282,32 +261,18 @@ fun <A, B, C, D, Z> combine(
stateB: TState<B>,
stateC: TState<C>,
stateD: TState<D>,
- transform: suspend FrpScope.(A, B, C, D) -> Z,
+ transform: FrpScope.(A, B, C, D) -> Z,
): TState<Z> {
val operatorName = "combine"
val name = operatorName
return TStateInit(
init(name) {
- coroutineScope {
- val dl1: Deferred<TStateImpl<A>> = async {
- stateA.init.connect(evalScope = this@init)
- }
- val dl2: Deferred<TStateImpl<B>> = async {
- stateB.init.connect(evalScope = this@init)
- }
- val dl3: Deferred<TStateImpl<C>> = async {
- stateC.init.connect(evalScope = this@init)
- }
- val dl4: Deferred<TStateImpl<D>> = async {
- stateD.init.connect(evalScope = this@init)
- }
- zipStates(name, operatorName, dl1.await(), dl2.await(), dl3.await(), dl4.await()) {
- a,
- b,
- c,
- d ->
- NoScope.runInFrpScope { transform(a, b, c, d) }
- }
+ val dl1 = stateA.init.connect(evalScope = this@init)
+ val dl2 = stateB.init.connect(evalScope = this@init)
+ val dl3 = stateC.init.connect(evalScope = this@init)
+ val dl4 = stateD.init.connect(evalScope = this@init)
+ zipStates(name, operatorName, dl1, dl2, dl3, dl4) { a, b, c, d ->
+ NoScope.runInFrpScope { transform(a, b, c, d) }
}
}
)
@@ -326,39 +291,19 @@ fun <A, B, C, D, E, Z> combine(
stateC: TState<C>,
stateD: TState<D>,
stateE: TState<E>,
- transform: suspend FrpScope.(A, B, C, D, E) -> Z,
+ transform: FrpScope.(A, B, C, D, E) -> Z,
): TState<Z> {
val operatorName = "combine"
val name = operatorName
return TStateInit(
init(name) {
- coroutineScope {
- val dl1: Deferred<TStateImpl<A>> = async {
- stateA.init.connect(evalScope = this@init)
- }
- val dl2: Deferred<TStateImpl<B>> = async {
- stateB.init.connect(evalScope = this@init)
- }
- val dl3: Deferred<TStateImpl<C>> = async {
- stateC.init.connect(evalScope = this@init)
- }
- val dl4: Deferred<TStateImpl<D>> = async {
- stateD.init.connect(evalScope = this@init)
- }
- val dl5: Deferred<TStateImpl<E>> = async {
- stateE.init.connect(evalScope = this@init)
- }
- zipStates(
- name,
- operatorName,
- dl1.await(),
- dl2.await(),
- dl3.await(),
- dl4.await(),
- dl5.await(),
- ) { a, b, c, d, e ->
- NoScope.runInFrpScope { transform(a, b, c, d, e) }
- }
+ val dl1 = stateA.init.connect(evalScope = this@init)
+ val dl2 = stateB.init.connect(evalScope = this@init)
+ val dl3 = stateC.init.connect(evalScope = this@init)
+ val dl4 = stateD.init.connect(evalScope = this@init)
+ val dl5 = stateE.init.connect(evalScope = this@init)
+ zipStates(name, operatorName, dl1, dl2, dl3, dl4, dl5) { a, b, c, d, e ->
+ NoScope.runInFrpScope { transform(a, b, c, d, e) }
}
}
)
@@ -366,7 +311,7 @@ fun <A, B, C, D, E, Z> combine(
/** Returns a [TState] by applying [transform] to the value held by the original [TState]. */
@ExperimentalFrpApi
-fun <A, B> TState<A>.flatMap(transform: suspend FrpScope.(A) -> TState<B>): TState<B> {
+fun <A, B> TState<A>.flatMap(transform: FrpScope.(A) -> TState<B>): TState<B> {
val operatorName = "flatMap"
val name = operatorName
return TStateInit(
@@ -453,10 +398,10 @@ internal constructor(
/** TODO */
@ExperimentalFrpApi
-class MutableTState<T>
-internal constructor(internal val network: Network, initialValue: Deferred<T>) : TState<T>() {
+class MutableTState<T> internal constructor(internal val network: Network, initialValue: Lazy<T>) :
+ TState<T>() {
- private val input: CoalescingMutableTFlow<Deferred<T>, Deferred<T>?> =
+ private val input: CoalescingMutableTFlow<Lazy<T>, Lazy<T>?> =
CoalescingMutableTFlow(
name = null,
coalesce = { _, new -> new },
@@ -469,8 +414,9 @@ internal constructor(internal val network: Network, initialValue: Deferred<T>) :
val name = null
val operatorName = "MutableTState"
lateinit var state: TStateSource<T>
+ val mapImpl = mapImpl(upstream = { changes.activated() }) { it, _ -> it!!.value }
val calm: TFlowImpl<T> =
- filterImpl({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new ->
+ filterImpl({ mapImpl }) { new ->
new != state.getCurrentWithEpoch(evalScope = this).first
}
.cached()
@@ -489,7 +435,7 @@ internal constructor(internal val network: Network, initialValue: Deferred<T>) :
}
/** TODO */
- @ExperimentalFrpApi fun setValue(value: T) = input.emit(CompletableDeferred(value))
+ @ExperimentalFrpApi fun setValue(value: T) = input.emit(CompletableLazy(value))
@ExperimentalFrpApi
fun setValueDeferred(value: FrpDeferredValue<T>) = input.emit(value.unwrapped)
@@ -501,17 +447,18 @@ class TStateLoop<A> : TState<A>() {
private val name: String? = null
- private val deferred = CompletableDeferred<TState<A>>()
+ private val deferred = CompletableLazy<TState<A>>()
internal val init: Init<TStateImpl<A>> =
- init(name) { deferred.await().init.connect(evalScope = this) }
+ init(name) { deferred.value.init.connect(evalScope = this) }
/** The [TState] this [TStateLoop] will forward to. */
@ExperimentalFrpApi
var loopback: TState<A>? = null
set(value) {
value?.let {
- check(deferred.complete(value)) { "TStateLoop.loopback has already been set." }
+ check(!deferred.isInitialized()) { "TStateLoop.loopback has already been set." }
+ deferred.setValue(value)
field = value
}
}
@@ -540,6 +487,5 @@ internal val <A> TState<A>.init: Init<TStateImpl<A>>
is MutableTState -> tState.init
}
-private inline fun <A> deferInline(
- crossinline block: suspend InitScope.() -> TState<A>
-): TState<A> = TStateInit(init(name = null) { block().init.connect(evalScope = this) })
+private inline fun <A> deferInline(crossinline block: InitScope.() -> TState<A>): TState<A> =
+ TStateInit(init(name = null) { block().init.connect(evalScope = this) })
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Transactional.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Transactional.kt
index 6b1c8c8fc3e5..e7a5b1bbd105 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Transactional.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Transactional.kt
@@ -16,13 +16,13 @@
package com.android.systemui.kairos
+import com.android.systemui.kairos.internal.CompletableLazy
import com.android.systemui.kairos.internal.InitScope
import com.android.systemui.kairos.internal.NoScope
import com.android.systemui.kairos.internal.TransactionalImpl
import com.android.systemui.kairos.internal.init
import com.android.systemui.kairos.internal.transactionalImpl
import com.android.systemui.kairos.internal.util.hashString
-import kotlinx.coroutines.CompletableDeferred
/**
* A time-varying value. A [Transactional] encapsulates the idea of some continuous state; each time
@@ -40,12 +40,12 @@ class Transactional<out A> internal constructor(internal val impl: TState<Transa
/** A constant [Transactional] that produces [value] whenever it is sampled. */
@ExperimentalFrpApi
fun <A> transactionalOf(value: A): Transactional<A> =
- Transactional(tStateOf(TransactionalImpl.Const(CompletableDeferred(value))))
+ Transactional(tStateOf(TransactionalImpl.Const(CompletableLazy(value))))
/** TODO */
@ExperimentalFrpApi
fun <A> FrpDeferredValue<Transactional<A>>.defer(): Transactional<A> = deferInline {
- unwrapped.await()
+ unwrapped.value
}
/** TODO */
@@ -53,13 +53,12 @@ fun <A> FrpDeferredValue<Transactional<A>>.defer(): Transactional<A> = deferInli
/** TODO */
@ExperimentalFrpApi
-fun <A> deferTransactional(block: suspend FrpScope.() -> Transactional<A>): Transactional<A> =
- deferInline {
- NoScope.runInFrpScope(block)
- }
+fun <A> deferTransactional(block: FrpScope.() -> Transactional<A>): Transactional<A> = deferInline {
+ NoScope.runInFrpScope(block)
+}
private inline fun <A> deferInline(
- crossinline block: suspend InitScope.() -> Transactional<A>
+ crossinline block: InitScope.() -> Transactional<A>
): Transactional<A> =
Transactional(TStateInit(init(name = null) { block().impl.init.connect(evalScope = this) }))
@@ -68,5 +67,5 @@ private inline fun <A> deferInline(
* transaction; any subsequent sampling within the same transaction will receive a cached value.
*/
@ExperimentalFrpApi
-fun <A> transactionally(block: suspend FrpTransactionScope.() -> A): Transactional<A> =
+fun <A> transactionally(block: FrpTransactionScope.() -> A): Transactional<A> =
Transactional(tStateOf(transactionalImpl { runInTransactionScope(block) }))
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
index 7e6384925f38..14488a3131c7 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/BuildScopeImpl.kt
@@ -34,7 +34,7 @@ import com.android.systemui.kairos.TFlowInit
import com.android.systemui.kairos.groupByKey
import com.android.systemui.kairos.init
import com.android.systemui.kairos.internal.util.childScope
-import com.android.systemui.kairos.internal.util.mapValuesParallel
+import com.android.systemui.kairos.internal.util.launchImmediate
import com.android.systemui.kairos.launchEffect
import com.android.systemui.kairos.mergeLeft
import com.android.systemui.kairos.util.Just
@@ -43,17 +43,12 @@ import com.android.systemui.kairos.util.None
import com.android.systemui.kairos.util.just
import com.android.systemui.kairos.util.map
import java.util.concurrent.atomic.AtomicReference
-import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
-import kotlin.coroutines.startCoroutine
-import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
-import kotlinx.coroutines.completeWith
import kotlinx.coroutines.job
internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope: CoroutineScope) :
@@ -64,45 +59,43 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
override val frpScope: FrpBuildScope = FrpBuildScopeImpl()
- override suspend fun <R> runInBuildScope(block: suspend FrpBuildScope.() -> R): R {
- val complete = CompletableDeferred<R>(parent = coroutineContext.job)
- block.startCoroutine(
- frpScope,
- object : Continuation<R> {
- override val context: CoroutineContext
- get() = EmptyCoroutineContext
-
- override fun resumeWith(result: Result<R>) {
- complete.completeWith(result)
- }
- },
- )
- return complete.await()
- }
+ override fun <R> runInBuildScope(block: FrpBuildScope.() -> R): R = frpScope.block()
private fun <A, T : TFlow<A>, S> buildTFlow(
+ name: String? = null,
constructFlow: (InputNode<A>) -> Pair<T, S>,
builder: suspend S.() -> Unit,
): TFlow<A> {
var job: Job? = null
- val stopEmitter = newStopEmitter("buildTFlow")
+ val stopEmitter = newStopEmitter("buildTFlow[$name]")
// Create a child scope that will be kept alive beyond the end of this transaction.
val childScope = coroutineScope.childScope()
lateinit var emitter: Pair<T, S>
val inputNode =
InputNode<A>(
activate = {
- check(job == null) { "already activated" }
+ // It's possible that activation occurs after all effects have been run, due
+ // to a MuxDeferred switch-in. For this reason, we need to activate in a new
+ // transaction.
+ check(job == null) { "[$name] already activated" }
job =
- reenterBuildScope(this@BuildScopeImpl, childScope).runInBuildScope {
- launchEffect {
- builder(emitter.second)
- stopEmitter.emit(Unit)
- }
+ childScope.launchImmediate {
+ network
+ .transaction("buildTFlow") {
+ reenterBuildScope(this@BuildScopeImpl, childScope)
+ .runInBuildScope {
+ launchEffect {
+ builder(emitter.second)
+ stopEmitter.emit(Unit)
+ }
+ }
+ }
+ .await()
+ .join()
}
},
deactivate = {
- checkNotNull(job) { "already deactivated" }.cancel()
+ checkNotNull(job) { "[$name] already deactivated" }.cancel()
job = null
},
)
@@ -110,8 +103,12 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
return with(frpScope) { emitter.first.takeUntil(mergeLeft(stopEmitter, endSignal)) }
}
- private fun <T> tFlowInternal(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> =
+ private fun <T> tFlowInternal(
+ name: String?,
+ builder: suspend FrpProducerScope<T>.() -> Unit,
+ ): TFlow<T> =
buildTFlow(
+ name,
constructFlow = { inputNode ->
val flow = MutableTFlow(network, inputNode)
flow to
@@ -148,16 +145,16 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
return FrpDeferredValue(deferAsync { childScope.runInBuildScope(block) }) to childScope.job
}
- private fun <R> deferredInternal(block: suspend FrpBuildScope.() -> R): FrpDeferredValue<R> =
+ private fun <R> deferredInternal(block: FrpBuildScope.() -> R): FrpDeferredValue<R> =
FrpDeferredValue(deferAsync { runInBuildScope(block) })
- private fun deferredActionInternal(block: suspend FrpBuildScope.() -> Unit) {
+ private fun deferredActionInternal(block: FrpBuildScope.() -> Unit) {
deferAction { runInBuildScope(block) }
}
private fun <A> TFlow<A>.observeEffectInternal(
context: CoroutineContext,
- block: suspend FrpEffectScope.(A) -> Unit,
+ block: FrpEffectScope.(A) -> Unit,
): Job {
val subRef = AtomicReference<Maybe<Output<A>>>(null)
val childScope = coroutineScope.childScope()
@@ -181,25 +178,13 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
onEmit = { output ->
if (subRef.get() is Just) {
// Not cancelled, safe to emit
- val coroutine: suspend FrpEffectScope.() -> Unit = { block(output) }
- val complete = CompletableDeferred<Unit>(parent = coroutineContext.job)
- coroutine.startCoroutine(
+ val scope =
object : FrpEffectScope, FrpTransactionScope by frpScope {
override val frpCoroutineScope: CoroutineScope = childScope
override val frpNetwork: FrpNetwork =
LocalFrpNetwork(network, childScope, endSignal)
- },
- completion =
- object : Continuation<Unit> {
- override val context: CoroutineContext
- get() = EmptyCoroutineContext
-
- override fun resumeWith(result: Result<Unit>) {
- complete.completeWith(result)
- }
- },
- )
- complete.await()
+ }
+ scope.block(output)
}
},
)
@@ -213,21 +198,19 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
// Job's already been cancelled, schedule deactivation
scheduleDeactivation(outputNode)
} else if (needsEval) {
- outputNode.schedule(evalScope = stateScope.evalScope)
+ outputNode.schedule(0, evalScope = stateScope.evalScope)
}
} ?: run { childScope.cancel() }
}
return childScope.coroutineContext.job
}
- private fun <A, B> TFlow<A>.mapBuildInternal(
- transform: suspend FrpBuildScope.(A) -> B
- ): TFlow<B> {
+ private fun <A, B> TFlow<A>.mapBuildInternal(transform: FrpBuildScope.(A) -> B): TFlow<B> {
val childScope = coroutineScope.childScope()
return TFlowInit(
constInit(
"mapBuild",
- mapImpl({ init.connect(evalScope = this) }) { spec ->
+ mapImpl({ init.connect(evalScope = this) }) { spec, _ ->
reenterBuildScope(outerScope = this@BuildScopeImpl, childScope)
.runInBuildScope { transform(spec) }
}
@@ -241,9 +224,9 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
numKeys: Int?,
): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> {
val eventsByKey: GroupedTFlow<K, Maybe<FrpSpec<A>>> = groupByKey(numKeys)
- val initOut: Deferred<Map<K, B>> = deferAsync {
- init.unwrapped.await().mapValuesParallel { (k, spec) ->
- val newEnd = with(frpScope) { eventsByKey[k].skipNext() }
+ val initOut: Lazy<Map<K, B>> = deferAsync {
+ init.unwrapped.value.mapValues { (k, spec) ->
+ val newEnd = eventsByKey[k]
val newScope = childBuildScope(newEnd)
newScope.runInBuildScope(spec)
}
@@ -251,9 +234,10 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
val childScope = coroutineScope.childScope()
val changesNode: TFlowImpl<Map<K, Maybe<A>>> =
mapImpl(upstream = { this@applyLatestForKeyInternal.init.connect(evalScope = this) }) {
- upstreamMap ->
+ upstreamMap,
+ _ ->
reenterBuildScope(this@BuildScopeImpl, childScope).run {
- upstreamMap.mapValuesParallel { (k: K, ma: Maybe<FrpSpec<A>>) ->
+ upstreamMap.mapValues { (k: K, ma: Maybe<FrpSpec<A>>) ->
ma.map { spec ->
val newEnd = with(frpScope) { eventsByKey[k].skipNext() }
val newScope = childBuildScope(newEnd)
@@ -277,7 +261,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
getInitialValue = {},
)
- private suspend fun childBuildScope(newEnd: TFlow<Any>): BuildScopeImpl {
+ private fun childBuildScope(newEnd: TFlow<Any>): BuildScopeImpl {
val newCoroutineScope: CoroutineScope = coroutineScope.childScope()
return BuildScopeImpl(
stateScope = stateScope.childStateScope(newEnd),
@@ -292,7 +276,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
(newCoroutineScope.coroutineContext.job as CompletableJob).complete()
}
)
- runInBuildScope { endSignal.nextOnly().observe { newCoroutineScope.cancel() } }
+ runInBuildScope { endSignalOnce.observe { newCoroutineScope.cancel() } }
}
}
@@ -318,8 +302,14 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
private inner class FrpBuildScopeImpl : FrpBuildScope, FrpStateScope by stateScope.frpScope {
- override fun <T> tFlow(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> =
- tFlowInternal(builder)
+ override val frpNetwork: FrpNetwork by lazy {
+ LocalFrpNetwork(network, coroutineScope, endSignal)
+ }
+
+ override fun <T> tFlow(
+ name: String?,
+ builder: suspend FrpProducerScope<T>.() -> Unit,
+ ): TFlow<T> = tFlowInternal(name, builder)
override fun <In, Out> coalescingTFlow(
getInitialValue: () -> Out,
@@ -330,19 +320,18 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
override fun <A> asyncScope(block: FrpSpec<A>): Pair<FrpDeferredValue<A>, Job> =
asyncScopeInternal(block)
- override fun <R> deferredBuildScope(
- block: suspend FrpBuildScope.() -> R
- ): FrpDeferredValue<R> = deferredInternal(block)
+ override fun <R> deferredBuildScope(block: FrpBuildScope.() -> R): FrpDeferredValue<R> =
+ deferredInternal(block)
- override fun deferredBuildScopeAction(block: suspend FrpBuildScope.() -> Unit) =
+ override fun deferredBuildScopeAction(block: FrpBuildScope.() -> Unit) =
deferredActionInternal(block)
override fun <A> TFlow<A>.observe(
coroutineContext: CoroutineContext,
- block: suspend FrpEffectScope.(A) -> Unit,
+ block: FrpEffectScope.(A) -> Unit,
): Job = observeEffectInternal(coroutineContext, block)
- override fun <A, B> TFlow<A>.mapBuild(transform: suspend FrpBuildScope.(A) -> B): TFlow<B> =
+ override fun <A, B> TFlow<A>.mapBuild(transform: FrpBuildScope.(A) -> B): TFlow<B> =
mapBuildInternal(transform)
override fun <K, A, B> TFlow<Map<K, Maybe<FrpSpec<A>>>>.applyLatestSpecForKey(
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt
index f65307c6106f..8a66f9a0d40d 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt
@@ -16,33 +16,58 @@
package com.android.systemui.kairos.internal
-import com.android.systemui.kairos.internal.util.asyncImmediate
-import com.android.systemui.kairos.internal.util.launchImmediate
-import kotlinx.coroutines.CoroutineName
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.CoroutineStart
-import kotlinx.coroutines.Deferred
-import kotlinx.coroutines.Job
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.isActive
-
-internal typealias DeferScope = CoroutineScope
-
-internal inline fun DeferScope.deferAction(
- start: CoroutineStart = CoroutineStart.UNDISPATCHED,
- crossinline block: suspend () -> Unit,
-): Job {
- check(isActive) { "Cannot perform deferral, scope already closed." }
- return launchImmediate(start, CoroutineName("deferAction")) { block() }
+internal interface DeferScope {
+ fun deferAction(block: () -> Unit)
+
+ fun <R> deferAsync(block: () -> R): Lazy<R>
}
-internal inline fun <R> DeferScope.deferAsync(
- start: CoroutineStart = CoroutineStart.UNDISPATCHED,
- crossinline block: suspend () -> R,
-): Deferred<R> {
- check(isActive) { "Cannot perform deferral, scope already closed." }
- return asyncImmediate(start, CoroutineName("deferAsync")) { block() }
+internal inline fun <A> deferScope(block: DeferScope.() -> A): A {
+ val scope =
+ object : DeferScope {
+ val deferrals = ArrayDeque<() -> Unit>() // TODO: store lazies instead?
+
+ fun drainDeferrals() {
+ while (deferrals.isNotEmpty()) {
+ deferrals.removeFirst().invoke()
+ }
+ }
+
+ override fun deferAction(block: () -> Unit) {
+ deferrals.add(block)
+ }
+
+ override fun <R> deferAsync(block: () -> R): Lazy<R> =
+ lazy(block).also { deferrals.add { it.value } }
+ }
+ return scope.block().also { scope.drainDeferrals() }
}
-internal suspend inline fun <A> deferScope(noinline block: suspend DeferScope.() -> A): A =
- coroutineScope(block)
+internal object NoValue
+
+internal class CompletableLazy<T> : Lazy<T> {
+
+ private var _value: Any?
+
+ constructor() {
+ _value = NoValue
+ }
+
+ constructor(init: T) {
+ _value = init
+ }
+
+ fun setValue(value: T) {
+ check(_value === NoValue) { "CompletableLazy value already set" }
+ _value = value
+ }
+
+ override val value: T
+ get() {
+ check(_value !== NoValue) { "CompletableLazy accessed before initialized" }
+ @Suppress("UNCHECKED_CAST")
+ return _value as T
+ }
+
+ override fun isInitialized(): Boolean = _value !== NoValue
+}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt
index 5f652525f036..b71a245c71a2 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt
@@ -21,10 +21,8 @@ import com.android.systemui.kairos.internal.store.MapHolder
import com.android.systemui.kairos.internal.store.MapK
import com.android.systemui.kairos.internal.store.MutableMapK
import com.android.systemui.kairos.internal.util.hashString
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.launch
+import com.android.systemui.kairos.internal.util.logDuration
import kotlinx.coroutines.sync.Mutex
-import kotlinx.coroutines.sync.withLock
internal class DemuxNode<W, K, A>(
private val branchNodeByKey: MutableMapK<W, K, DemuxNode<W, K, A>.BranchNode>,
@@ -34,156 +32,111 @@ internal class DemuxNode<W, K, A>(
val schedulable = Schedulable.N(this)
- inline val mutex
- get() = lifecycle.mutex
-
lateinit var upstreamConnection: NodeConnection<MapK<W, K, A>>
@Volatile private var epoch: Long = Long.MIN_VALUE
- suspend fun hasCurrentValueLocked(evalScope: EvalScope, key: K): Boolean =
- evalScope.epoch == epoch && upstreamConnection.getPushEvent(evalScope).contains(key)
+ fun hasCurrentValueLocked(logIndent: Int, evalScope: EvalScope, key: K): Boolean =
+ evalScope.epoch == epoch &&
+ upstreamConnection.getPushEvent(logIndent, evalScope).contains(key)
- suspend fun hasCurrentValue(evalScope: EvalScope, key: K): Boolean =
- mutex.withLock { hasCurrentValueLocked(evalScope, key) }
+ fun hasCurrentValue(logIndent: Int, evalScope: EvalScope, key: K): Boolean =
+ hasCurrentValueLocked(logIndent, evalScope, key)
fun getAndMaybeAddDownstream(key: K): BranchNode =
branchNodeByKey.getOrPut(key) { BranchNode(key) }
- override suspend fun schedule(evalScope: EvalScope) = coroutineScope {
- val upstreamResult = upstreamConnection.getPushEvent(evalScope)
- mutex.withLock {
+ override fun schedule(logIndent: Int, evalScope: EvalScope) =
+ logDuration(logIndent, "DemuxNode.schedule") {
+ val upstreamResult =
+ logDuration("upstream.getPushEvent") {
+ upstreamConnection.getPushEvent(currentLogIndent, evalScope)
+ }
updateEpoch(evalScope)
for ((key, _) in upstreamResult) {
- if (key !in branchNodeByKey) continue
+ if (!branchNodeByKey.contains(key)) continue
val branch = branchNodeByKey.getValue(key)
- // TODO: launchImmediate?
- launch { branch.schedule(evalScope) }
+ branch.schedule(currentLogIndent, evalScope)
}
}
- }
- override suspend fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) {
- coroutineScope {
- mutex.withLock {
- for ((_, branchNode) in branchNodeByKey) {
- branchNode.downstreamSet.adjustDirectUpstream(
- coroutineScope = this,
- scheduler,
- oldDepth,
- newDepth,
- )
- }
- }
+ override fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) {
+ for ((_, branchNode) in branchNodeByKey) {
+ branchNode.downstreamSet.adjustDirectUpstream(scheduler, oldDepth, newDepth)
}
}
- override suspend fun moveIndirectUpstreamToDirect(
+ override fun moveIndirectUpstreamToDirect(
scheduler: Scheduler,
oldIndirectDepth: Int,
oldIndirectSet: Set<MuxDeferredNode<*, *, *>>,
newDirectDepth: Int,
) {
- coroutineScope {
- mutex.withLock {
- for ((_, branchNode) in branchNodeByKey) {
- branchNode.downstreamSet.moveIndirectUpstreamToDirect(
- coroutineScope = this,
- scheduler,
- oldIndirectDepth,
- oldIndirectSet,
- newDirectDepth,
- )
- }
- }
+ for ((_, branchNode) in branchNodeByKey) {
+ branchNode.downstreamSet.moveIndirectUpstreamToDirect(
+ scheduler,
+ oldIndirectDepth,
+ oldIndirectSet,
+ newDirectDepth,
+ )
}
}
- override suspend fun adjustIndirectUpstream(
+ override fun adjustIndirectUpstream(
scheduler: Scheduler,
oldDepth: Int,
newDepth: Int,
removals: Set<MuxDeferredNode<*, *, *>>,
additions: Set<MuxDeferredNode<*, *, *>>,
) {
- coroutineScope {
- mutex.withLock {
- for ((_, branchNode) in branchNodeByKey) {
- branchNode.downstreamSet.adjustIndirectUpstream(
- coroutineScope = this,
- scheduler,
- oldDepth,
- newDepth,
- removals,
- additions,
- )
- }
- }
+ for ((_, branchNode) in branchNodeByKey) {
+ branchNode.downstreamSet.adjustIndirectUpstream(
+ scheduler,
+ oldDepth,
+ newDepth,
+ removals,
+ additions,
+ )
}
}
- override suspend fun moveDirectUpstreamToIndirect(
+ override fun moveDirectUpstreamToIndirect(
scheduler: Scheduler,
oldDirectDepth: Int,
newIndirectDepth: Int,
newIndirectSet: Set<MuxDeferredNode<*, *, *>>,
) {
- coroutineScope {
- mutex.withLock {
- for ((_, branchNode) in branchNodeByKey) {
- branchNode.downstreamSet.moveDirectUpstreamToIndirect(
- coroutineScope = this,
- scheduler,
- oldDirectDepth,
- newIndirectDepth,
- newIndirectSet,
- )
- }
- }
+ for ((_, branchNode) in branchNodeByKey) {
+ branchNode.downstreamSet.moveDirectUpstreamToIndirect(
+ scheduler,
+ oldDirectDepth,
+ newIndirectDepth,
+ newIndirectSet,
+ )
}
}
- override suspend fun removeIndirectUpstream(
+ override fun removeIndirectUpstream(
scheduler: Scheduler,
depth: Int,
indirectSet: Set<MuxDeferredNode<*, *, *>>,
) {
- coroutineScope {
- mutex.withLock {
- lifecycle.lifecycleState = DemuxLifecycleState.Dead
- for ((_, branchNode) in branchNodeByKey) {
- branchNode.downstreamSet.removeIndirectUpstream(
- coroutineScope = this,
- scheduler,
- depth,
- indirectSet,
- )
- }
- }
+ lifecycle.lifecycleState = DemuxLifecycleState.Dead
+ for ((_, branchNode) in branchNodeByKey) {
+ branchNode.downstreamSet.removeIndirectUpstream(scheduler, depth, indirectSet)
}
}
- override suspend fun removeDirectUpstream(scheduler: Scheduler, depth: Int) {
- coroutineScope {
- mutex.withLock {
- lifecycle.lifecycleState = DemuxLifecycleState.Dead
- for ((_, branchNode) in branchNodeByKey) {
- branchNode.downstreamSet.removeDirectUpstream(
- coroutineScope = this,
- scheduler,
- depth,
- )
- }
- }
+ override fun removeDirectUpstream(scheduler: Scheduler, depth: Int) {
+ lifecycle.lifecycleState = DemuxLifecycleState.Dead
+ for ((_, branchNode) in branchNodeByKey) {
+ branchNode.downstreamSet.removeDirectUpstream(scheduler, depth)
}
}
- suspend fun removeDownstreamAndDeactivateIfNeeded(key: K) {
- val deactivate =
- mutex.withLock {
- branchNodeByKey.remove(key)
- branchNodeByKey.isEmpty()
- }
+ fun removeDownstreamAndDeactivateIfNeeded(key: K) {
+ branchNodeByKey.remove(key)
+ val deactivate = branchNodeByKey.isEmpty()
if (deactivate) {
// No need for mutex here; no more concurrent changes to can occur during this phase
lifecycle.lifecycleState = DemuxLifecycleState.Inactive(spec)
@@ -195,57 +148,57 @@ internal class DemuxNode<W, K, A>(
epoch = evalScope.epoch
}
- suspend fun getPushEvent(evalScope: EvalScope, key: K): A =
- upstreamConnection.getPushEvent(evalScope).getValue(key)
+ fun getPushEvent(logIndent: Int, evalScope: EvalScope, key: K): A =
+ logDuration(logIndent, "Demux.getPushEvent($key)") {
+ upstreamConnection.getPushEvent(currentLogIndent, evalScope).getValue(key)
+ }
inner class BranchNode(val key: K) : PushNode<A> {
- private val mutex = Mutex()
-
val downstreamSet = DownstreamSet()
override val depthTracker: DepthTracker
get() = upstreamConnection.depthTracker
- override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean =
- hasCurrentValue(evalScope, key)
+ override fun hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean =
+ hasCurrentValue(logIndent, evalScope, key)
- override suspend fun getPushEvent(evalScope: EvalScope): A = getPushEvent(evalScope, key)
+ override fun getPushEvent(logIndent: Int, evalScope: EvalScope): A =
+ getPushEvent(logIndent, evalScope, key)
- override suspend fun addDownstream(downstream: Schedulable) {
- mutex.withLock { downstreamSet.add(downstream) }
+ override fun addDownstream(downstream: Schedulable) {
+ downstreamSet.add(downstream)
}
- override suspend fun removeDownstream(downstream: Schedulable) {
- mutex.withLock { downstreamSet.remove(downstream) }
+ override fun removeDownstream(downstream: Schedulable) {
+ downstreamSet.remove(downstream)
}
- override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {
- val canDeactivate =
- mutex.withLock {
- downstreamSet.remove(downstream)
- downstreamSet.isEmpty()
- }
+ override fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {
+ downstreamSet.remove(downstream)
+ val canDeactivate = downstreamSet.isEmpty()
if (canDeactivate) {
removeDownstreamAndDeactivateIfNeeded(key)
}
}
- override suspend fun deactivateIfNeeded() {
- if (mutex.withLock { downstreamSet.isEmpty() }) {
+ override fun deactivateIfNeeded() {
+ if (downstreamSet.isEmpty()) {
removeDownstreamAndDeactivateIfNeeded(key)
}
}
- override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {
- if (mutex.withLock { downstreamSet.isEmpty() }) {
+ override fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {
+ if (downstreamSet.isEmpty()) {
evalScope.scheduleDeactivation(this)
}
}
- suspend fun schedule(evalScope: EvalScope) {
- if (!coroutineScope { mutex.withLock { scheduleAll(downstreamSet, evalScope) } }) {
- evalScope.scheduleDeactivation(this)
+ fun schedule(logIndent: Int, evalScope: EvalScope) {
+ logDuration(logIndent, "DemuxBranchNode($key).schedule") {
+ if (!scheduleAll(currentLogIndent, downstreamSet, evalScope)) {
+ evalScope.scheduleDeactivation(this@BranchNode)
+ }
}
}
}
@@ -263,28 +216,27 @@ internal fun <W, K, A> DemuxImpl(
)
internal fun <K, A> demuxMap(
- upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>,
+ upstream: EvalScope.() -> TFlowImpl<Map<K, A>>,
numKeys: Int?,
): DemuxImpl<K, A> =
- DemuxImpl(mapImpl(upstream) { MapHolder(it) }, numKeys, ConcurrentHashMapK.Factory())
+ DemuxImpl(mapImpl(upstream) { it, _ -> MapHolder(it) }, numKeys, ConcurrentHashMapK.Factory())
internal class DemuxActivator<W, K, A>(
private val numKeys: Int?,
private val upstream: TFlowImpl<MapK<W, K, A>>,
private val storeFactory: MutableMapK.Factory<W, K>,
) {
- suspend fun activate(
+ fun activate(
evalScope: EvalScope,
lifecycle: DemuxLifecycle<K, A>,
): Pair<DemuxNode<W, K, A>, Set<K>>? {
val demux = DemuxNode(storeFactory.create(numKeys), lifecycle, this)
- return upstream.activate(evalScope, downstream = demux.schedulable)?.let { (conn, needsEval)
- ->
+ return upstream.activate(evalScope, demux.schedulable)?.let { (conn, needsEval) ->
Pair(
demux.apply { upstreamConnection = conn },
if (needsEval) {
demux.updateEpoch(evalScope)
- conn.getPushEvent(evalScope).keys
+ conn.getPushEvent(0, evalScope).keys
} else {
emptySet()
},
@@ -297,7 +249,7 @@ internal class DemuxImpl<in K, out A>(private val dmux: DemuxLifecycle<K, A>) {
fun eventsForKey(key: K): TFlowImpl<A> = TFlowCheap { downstream ->
dmux.activate(evalScope = this, key)?.let { (branchNode, needsEval) ->
branchNode.addDownstream(downstream)
- val branchNeedsEval = needsEval && branchNode.hasCurrentValue(evalScope = this)
+ val branchNeedsEval = needsEval && branchNode.hasCurrentValue(0, evalScope = this)
ActivationResult(
connection = NodeConnection(branchNode, branchNode),
needsEval = branchNeedsEval,
@@ -311,31 +263,31 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle
override fun toString(): String = "TFlowDmuxState[$hashString][$lifecycleState][$mutex]"
- suspend fun activate(
- evalScope: EvalScope,
- key: K,
- ): Pair<DemuxNode<*, K, A>.BranchNode, Boolean>? =
- mutex.withLock {
- when (val state = lifecycleState) {
- is DemuxLifecycleState.Dead -> null
- is DemuxLifecycleState.Active ->
- state.node.getAndMaybeAddDownstream(key) to
- state.node.hasCurrentValueLocked(evalScope, key)
- is DemuxLifecycleState.Inactive -> {
- state.spec
- .activate(evalScope, this@DemuxLifecycle)
- .also { result ->
- lifecycleState =
- if (result == null) {
- DemuxLifecycleState.Dead
- } else {
- DemuxLifecycleState.Active(result.first)
- }
- }
- ?.let { (node, needsEval) ->
- node.getAndMaybeAddDownstream(key) to (key in needsEval)
- }
- }
+ fun activate(evalScope: EvalScope, key: K): Pair<DemuxNode<*, K, A>.BranchNode, Boolean>? =
+ when (val state = lifecycleState) {
+ is DemuxLifecycleState.Dead -> {
+ null
+ }
+
+ is DemuxLifecycleState.Active -> {
+ state.node.getAndMaybeAddDownstream(key) to
+ state.node.hasCurrentValueLocked(0, evalScope, key)
+ }
+
+ is DemuxLifecycleState.Inactive -> {
+ state.spec
+ .activate(evalScope, this@DemuxLifecycle)
+ .also { result ->
+ lifecycleState =
+ if (result == null) {
+ DemuxLifecycleState.Dead
+ } else {
+ DemuxLifecycleState.Active(result.first)
+ }
+ }
+ ?.let { (node, needsEval) ->
+ node.getAndMaybeAddDownstream(key) to (key in needsEval)
+ }
}
}
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt
index afbd7120653c..9ecfbba7d647 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt
@@ -28,21 +28,13 @@ import com.android.systemui.kairos.emptyTFlow
import com.android.systemui.kairos.init
import com.android.systemui.kairos.mapCheap
import com.android.systemui.kairos.switch
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.EmptyCoroutineContext
-import kotlin.coroutines.startCoroutine
-import kotlinx.coroutines.CompletableDeferred
-import kotlinx.coroutines.completeWith
-import kotlinx.coroutines.job
internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope) :
EvalScope, NetworkScope by networkScope, DeferScope by deferScope {
- private suspend fun <A> Transactional<A>.sample(): A =
- impl.sample().sample(this@EvalScopeImpl).await()
+ private fun <A> Transactional<A>.sample(): A = impl.sample().sample(this@EvalScopeImpl).value
- private suspend fun <A> TState<A>.sample(): A =
+ private fun <A> TState<A>.sample(): A =
init.connect(evalScope = this@EvalScopeImpl).getCurrentWithEpoch(this@EvalScopeImpl).first
private val <A> Transactional<A>.deferredValue: FrpDeferredValue<A>
@@ -62,7 +54,7 @@ internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope)
"now",
this,
{ result.mapCheap { emptyTFlow }.init.connect(evalScope = this) },
- CompletableDeferred(
+ CompletableLazy(
TFlowInit(
constInit(
"now",
@@ -82,25 +74,10 @@ internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope)
result
}
- private fun <R> deferredInternal(
- block: suspend FrpTransactionScope.() -> R
- ): FrpDeferredValue<R> = FrpDeferredValue(deferAsync { runInTransactionScope(block) })
+ private fun <R> deferredInternal(block: FrpTransactionScope.() -> R): FrpDeferredValue<R> =
+ FrpDeferredValue(deferAsync { runInTransactionScope(block) })
- override suspend fun <R> runInTransactionScope(block: suspend FrpTransactionScope.() -> R): R {
- val complete = CompletableDeferred<R>(parent = coroutineContext.job)
- block.startCoroutine(
- frpScope,
- object : Continuation<R> {
- override val context: CoroutineContext
- get() = EmptyCoroutineContext
-
- override fun resumeWith(result: Result<R>) {
- complete.completeWith(result)
- }
- },
- )
- return complete.await()
- }
+ override fun <R> runInTransactionScope(block: FrpTransactionScope.() -> R): R = frpScope.block()
override val frpScope: FrpTransactionScope = FrpTransactionScopeImpl()
@@ -110,7 +87,7 @@ internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope)
override fun <A> TState<A>.sampleDeferred(): FrpDeferredValue<A> = deferredValue
override fun <R> deferredTransactionScope(
- block: suspend FrpTransactionScope.() -> R
+ block: FrpTransactionScope.() -> R
): FrpDeferredValue<R> = deferredInternal(block)
override val now: TFlow<Unit>
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt
index b60c227bcfbe..30c1a865f50a 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt
@@ -24,10 +24,10 @@ import com.android.systemui.kairos.util.just
import com.android.systemui.kairos.util.none
internal inline fun <A> filterJustImpl(
- crossinline getPulse: suspend EvalScope.() -> TFlowImpl<Maybe<A>>
+ crossinline getPulse: EvalScope.() -> TFlowImpl<Maybe<A>>
): TFlowImpl<A> =
DemuxImpl(
- mapImpl(getPulse) { maybeResult ->
+ mapImpl(getPulse) { maybeResult, _ ->
if (maybeResult is Just) {
Single(maybeResult.value)
} else {
@@ -40,6 +40,9 @@ internal inline fun <A> filterJustImpl(
.eventsForKey(Unit)
internal inline fun <A> filterImpl(
- crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>,
- crossinline f: suspend EvalScope.(A) -> Boolean,
-): TFlowImpl<A> = filterJustImpl { mapImpl(getPulse) { if (f(it)) just(it) else none }.cached() }
+ crossinline getPulse: EvalScope.() -> TFlowImpl<A>,
+ crossinline f: EvalScope.(A) -> Boolean,
+): TFlowImpl<A> {
+ val mapped = mapImpl(getPulse) { it, _ -> if (f(it)) just(it) else none }.cached()
+ return filterJustImpl { mapped }
+}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Graph.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Graph.kt
index 828f13b026d3..667002bd413c 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Graph.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Graph.kt
@@ -18,8 +18,6 @@ package com.android.systemui.kairos.internal
import com.android.systemui.kairos.internal.util.Bag
import java.util.TreeMap
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.launch
/**
* Tracks all upstream connections for Mux nodes.
@@ -86,7 +84,7 @@ internal class DepthTracker {
@Volatile private var dirty_depthIsDirect = true
@Volatile private var dirty_isIndirectRoot = false
- fun schedule(scheduler: Scheduler, node: MuxNode<*, *, *, *>) {
+ fun schedule(scheduler: Scheduler, node: MuxNode<*, *, *>) {
if (dirty_depthIsDirect) {
scheduler.schedule(dirty_directDepth, node)
} else {
@@ -192,30 +190,27 @@ internal class DepthTracker {
return remainder
}
- suspend fun propagateChanges(scheduler: Scheduler, muxNode: MuxNode<*, *, *, *>) {
+ fun propagateChanges(scheduler: Scheduler, muxNode: MuxNode<*, *, *>) {
if (isDirty()) {
schedule(scheduler, muxNode)
}
}
fun applyChanges(
- coroutineScope: CoroutineScope,
scheduler: Scheduler,
downstreamSet: DownstreamSet,
- muxNode: MuxNode<*, *, *, *>,
+ muxNode: MuxNode<*, *, *>,
) {
when {
dirty_depthIsDirect -> {
if (snapshotIsDirect) {
downstreamSet.adjustDirectUpstream(
- coroutineScope,
scheduler,
oldDepth = snapshotDirectDepth,
newDepth = dirty_directDepth,
)
} else {
downstreamSet.moveIndirectUpstreamToDirect(
- coroutineScope,
scheduler,
oldIndirectDepth = snapshotIndirectDepth,
oldIndirectSet =
@@ -233,7 +228,6 @@ internal class DepthTracker {
dirty_hasIndirectUpstream() || dirty_isIndirectRoot -> {
if (snapshotIsDirect) {
downstreamSet.moveDirectUpstreamToIndirect(
- coroutineScope,
scheduler,
oldDirectDepth = snapshotDirectDepth,
newIndirectDepth = dirty_indirectDepth,
@@ -247,7 +241,6 @@ internal class DepthTracker {
)
} else {
downstreamSet.adjustIndirectUpstream(
- coroutineScope,
scheduler,
oldDepth = snapshotIndirectDepth,
newDepth = dirty_indirectDepth,
@@ -274,14 +267,9 @@ internal class DepthTracker {
muxNode.lifecycle.lifecycleState = MuxLifecycleState.Dead
if (snapshotIsDirect) {
- downstreamSet.removeDirectUpstream(
- coroutineScope,
- scheduler,
- depth = snapshotDirectDepth,
- )
+ downstreamSet.removeDirectUpstream(scheduler, depth = snapshotDirectDepth)
} else {
downstreamSet.removeIndirectUpstream(
- coroutineScope,
scheduler,
depth = snapshotIndirectDepth,
indirectSet =
@@ -374,125 +362,92 @@ internal class DownstreamSet {
}
}
- fun adjustDirectUpstream(
- coroutineScope: CoroutineScope,
- scheduler: Scheduler,
- oldDepth: Int,
- newDepth: Int,
- ) =
- coroutineScope.run {
- for (node in nodes) {
- launch { node.adjustDirectUpstream(scheduler, oldDepth, newDepth) }
- }
+ fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) {
+ for (node in nodes) {
+ node.adjustDirectUpstream(scheduler, oldDepth, newDepth)
}
+ }
fun moveIndirectUpstreamToDirect(
- coroutineScope: CoroutineScope,
scheduler: Scheduler,
oldIndirectDepth: Int,
oldIndirectSet: Set<MuxDeferredNode<*, *, *>>,
newDirectDepth: Int,
- ) =
- coroutineScope.run {
- for (node in nodes) {
- launch {
- node.moveIndirectUpstreamToDirect(
- scheduler,
- oldIndirectDepth,
- oldIndirectSet,
- newDirectDepth,
- )
- }
- }
- for (mover in muxMovers) {
- launch {
- mover.moveIndirectPatchNodeToDirect(scheduler, oldIndirectDepth, oldIndirectSet)
- }
- }
+ ) {
+ for (node in nodes) {
+ node.moveIndirectUpstreamToDirect(
+ scheduler,
+ oldIndirectDepth,
+ oldIndirectSet,
+ newDirectDepth,
+ )
+ }
+ for (mover in muxMovers) {
+ mover.moveIndirectPatchNodeToDirect(scheduler, oldIndirectDepth, oldIndirectSet)
}
+ }
fun adjustIndirectUpstream(
- coroutineScope: CoroutineScope,
scheduler: Scheduler,
oldDepth: Int,
newDepth: Int,
removals: Set<MuxDeferredNode<*, *, *>>,
additions: Set<MuxDeferredNode<*, *, *>>,
- ) =
- coroutineScope.run {
- for (node in nodes) {
- launch {
- node.adjustIndirectUpstream(scheduler, oldDepth, newDepth, removals, additions)
- }
- }
- for (mover in muxMovers) {
- launch {
- mover.adjustIndirectPatchNode(
- scheduler,
- oldDepth,
- newDepth,
- removals,
- additions,
- )
- }
- }
+ ) {
+ for (node in nodes) {
+ node.adjustIndirectUpstream(scheduler, oldDepth, newDepth, removals, additions)
+ }
+ for (mover in muxMovers) {
+ mover.adjustIndirectPatchNode(scheduler, oldDepth, newDepth, removals, additions)
}
+ }
fun moveDirectUpstreamToIndirect(
- coroutineScope: CoroutineScope,
scheduler: Scheduler,
oldDirectDepth: Int,
newIndirectDepth: Int,
newIndirectSet: Set<MuxDeferredNode<*, *, *>>,
- ) =
- coroutineScope.run {
- for (node in nodes) {
- launch {
- node.moveDirectUpstreamToIndirect(
- scheduler,
- oldDirectDepth,
- newIndirectDepth,
- newIndirectSet,
- )
- }
- }
- for (mover in muxMovers) {
- launch {
- mover.moveDirectPatchNodeToIndirect(scheduler, newIndirectDepth, newIndirectSet)
- }
- }
+ ) {
+ for (node in nodes) {
+ node.moveDirectUpstreamToIndirect(
+ scheduler,
+ oldDirectDepth,
+ newIndirectDepth,
+ newIndirectSet,
+ )
+ }
+ for (mover in muxMovers) {
+ mover.moveDirectPatchNodeToIndirect(scheduler, newIndirectDepth, newIndirectSet)
}
+ }
fun removeIndirectUpstream(
- coroutineScope: CoroutineScope,
scheduler: Scheduler,
depth: Int,
indirectSet: Set<MuxDeferredNode<*, *, *>>,
- ) =
- coroutineScope.run {
- for (node in nodes) {
- launch { node.removeIndirectUpstream(scheduler, depth, indirectSet) }
- }
- for (mover in muxMovers) {
- launch { mover.removeIndirectPatchNode(scheduler, depth, indirectSet) }
- }
- for (output in outputs) {
- launch { output.kill() }
- }
+ ) {
+ for (node in nodes) {
+ node.removeIndirectUpstream(scheduler, depth, indirectSet)
+ }
+ for (mover in muxMovers) {
+ mover.removeIndirectPatchNode(scheduler, depth, indirectSet)
}
+ for (output in outputs) {
+ output.kill()
+ }
+ }
- fun removeDirectUpstream(coroutineScope: CoroutineScope, scheduler: Scheduler, depth: Int) =
- coroutineScope.run {
- for (node in nodes) {
- launch { node.removeDirectUpstream(scheduler, depth) }
- }
- for (mover in muxMovers) {
- launch { mover.removeDirectPatchNode(scheduler) }
- }
- for (output in outputs) {
- launch { output.kill() }
- }
+ fun removeDirectUpstream(scheduler: Scheduler, depth: Int) {
+ for (node in nodes) {
+ node.removeDirectUpstream(scheduler, depth)
+ }
+ for (mover in muxMovers) {
+ mover.removeDirectPatchNode(scheduler)
}
+ for (output in outputs) {
+ output.kill()
+ }
+ }
fun clear() {
outputs.clear()
@@ -518,13 +473,14 @@ internal fun DownstreamSet.isEmpty() =
@Suppress("NOTHING_TO_INLINE") internal inline fun DownstreamSet.isNotEmpty() = !isEmpty()
-internal fun CoroutineScope.scheduleAll(
+internal fun scheduleAll(
+ logIndent: Int,
downstreamSet: DownstreamSet,
evalScope: EvalScope,
): Boolean {
- downstreamSet.nodes.forEach { launch { it.schedule(evalScope) } }
- downstreamSet.muxMovers.forEach { launch { it.scheduleMover(evalScope) } }
- downstreamSet.outputs.forEach { launch { it.schedule(evalScope) } }
+ downstreamSet.nodes.forEach { it.schedule(logIndent, evalScope) }
+ downstreamSet.muxMovers.forEach { it.scheduleMover(logIndent, evalScope) }
+ downstreamSet.outputs.forEach { it.schedule(logIndent, evalScope) }
downstreamSet.stateWriters.forEach { evalScope.schedule(it) }
return downstreamSet.isNotEmpty()
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt
index 57db9a493e21..10a46775beb9 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt
@@ -19,42 +19,37 @@ package com.android.systemui.kairos.internal
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.just
import com.android.systemui.kairos.util.none
-import java.util.concurrent.atomic.AtomicBoolean
-import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
/** Performs actions once, when the reactive component is first connected to the network. */
-internal class Init<out A>(val name: String?, private val block: suspend InitScope.() -> A) {
-
- /** Has the initialization logic been evaluated yet? */
- private val initialized = AtomicBoolean()
+internal class Init<out A>(val name: String?, private val block: InitScope.() -> A) {
/**
* Stores the result after initialization, as well as the id of the [Network] it's been
* initialized with.
*/
- private val cache = CompletableDeferred<Pair<Any, A>>()
+ private val cache = CompletableLazy<Pair<Any, A>>()
- suspend fun connect(evalScope: InitScope): A =
- if (initialized.getAndSet(true)) {
+ fun connect(evalScope: InitScope): A =
+ if (cache.isInitialized()) {
// Read from cache
- val (networkId, result) = cache.await()
+ val (networkId, result) = cache.value
check(networkId == evalScope.networkId) { "Network mismatch" }
result
} else {
// Write to cache
- block(evalScope).also { cache.complete(evalScope.networkId to it) }
+ block(evalScope).also { cache.setValue(evalScope.networkId to it) }
}
@OptIn(ExperimentalCoroutinesApi::class)
fun getUnsafe(): Maybe<A> =
- if (cache.isCompleted) {
- just(cache.getCompleted().second)
+ if (cache.isInitialized()) {
+ just(cache.value.second)
} else {
none
}
}
-internal fun <A> init(name: String?, block: suspend InitScope.() -> A) = Init(name, block)
+internal fun <A> init(name: String?, block: InitScope.() -> A) = Init(name, block)
internal fun <A> constInit(name: String?, value: A) = init(name) { value }
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt
index 1edc8c28b2ee..1dcba4433a8d 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt
@@ -16,104 +16,103 @@
package com.android.systemui.kairos.internal
-import com.android.systemui.kairos.internal.util.Key
+import com.android.systemui.kairos.internal.util.logDuration
import java.util.concurrent.atomic.AtomicBoolean
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.sync.Mutex
-import kotlinx.coroutines.sync.withLock
internal class InputNode<A>(
- private val activate: suspend EvalScope.() -> Unit = {},
+ private val activate: EvalScope.() -> Unit = {},
private val deactivate: () -> Unit = {},
-) : PushNode<A>, Key<A> {
+) : PushNode<A> {
private val downstreamSet = DownstreamSet()
- private val mutex = Mutex()
- private val activated = AtomicBoolean(false)
+ val activated = AtomicBoolean(false)
- @Volatile private var epoch: Long = Long.MIN_VALUE
+ private val transactionCache = TransactionCache<A>()
+ private val epoch
+ get() = transactionCache.epoch
override val depthTracker: DepthTracker = DepthTracker()
- override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = epoch == evalScope.epoch
+ override fun hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean =
+ epoch == evalScope.epoch
- suspend fun visit(evalScope: EvalScope, value: A) {
- epoch = evalScope.epoch
- evalScope.setResult(this, value)
- coroutineScope {
- if (!mutex.withLock { scheduleAll(downstreamSet, evalScope) }) {
- evalScope.scheduleDeactivation(this@InputNode)
- }
+ fun visit(evalScope: EvalScope, value: A) {
+ transactionCache.put(evalScope, value)
+ if (!scheduleAll(0, downstreamSet, evalScope)) {
+ evalScope.scheduleDeactivation(this@InputNode)
}
}
- override suspend fun removeDownstream(downstream: Schedulable) {
- mutex.withLock { downstreamSet.remove(downstream) }
+ override fun removeDownstream(downstream: Schedulable) {
+ downstreamSet.remove(downstream)
}
- override suspend fun deactivateIfNeeded() {
- if (mutex.withLock { downstreamSet.isEmpty() && activated.getAndSet(false) }) {
+ override fun deactivateIfNeeded() {
+ if (downstreamSet.isEmpty() && activated.getAndSet(false)) {
deactivate()
}
}
- override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {
- if (mutex.withLock { downstreamSet.isEmpty() }) {
+ override fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {
+ if (downstreamSet.isEmpty()) {
evalScope.scheduleDeactivation(this)
}
}
- override suspend fun addDownstream(downstream: Schedulable) {
- mutex.withLock { downstreamSet.add(downstream) }
+ override fun addDownstream(downstream: Schedulable) {
+ downstreamSet.add(downstream)
}
- suspend fun addDownstreamAndActivateIfNeeded(downstream: Schedulable, evalScope: EvalScope) {
- val needsActivation =
- mutex.withLock {
- val wasEmpty = downstreamSet.isEmpty()
- downstreamSet.add(downstream)
- wasEmpty && !activated.getAndSet(true)
- }
+ fun addDownstreamAndActivateIfNeeded(downstream: Schedulable, evalScope: EvalScope) {
+ val needsActivation = run {
+ val wasEmpty = downstreamSet.isEmpty()
+ downstreamSet.add(downstream)
+ wasEmpty && !activated.getAndSet(true)
+ }
if (needsActivation) {
activate(evalScope)
}
}
- override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {
- val needsDeactivation =
- mutex.withLock {
- downstreamSet.remove(downstream)
- downstreamSet.isEmpty() && activated.getAndSet(false)
- }
+ override fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {
+ downstreamSet.remove(downstream)
+ val needsDeactivation = downstreamSet.isEmpty() && activated.getAndSet(false)
if (needsDeactivation) {
deactivate()
}
}
- override suspend fun getPushEvent(evalScope: EvalScope): A = evalScope.getCurrentValue(this)
+ override fun getPushEvent(logIndent: Int, evalScope: EvalScope): A =
+ logDuration(logIndent, "Input.getPushEvent", false) {
+ transactionCache.getCurrentValue(evalScope)
+ }
}
internal fun <A> InputNode<A>.activated() = TFlowCheap { downstream ->
val input = this@activated
addDownstreamAndActivateIfNeeded(downstream, evalScope = this)
- ActivationResult(connection = NodeConnection(input, input), needsEval = hasCurrentValue(input))
+ ActivationResult(
+ connection = NodeConnection(input, input),
+ needsEval = input.hasCurrentValue(0, evalScope = this),
+ )
}
internal data object AlwaysNode : PushNode<Unit> {
override val depthTracker = DepthTracker()
- override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = true
+ override fun hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean = true
- override suspend fun removeDownstream(downstream: Schedulable) {}
+ override fun removeDownstream(downstream: Schedulable) {}
- override suspend fun deactivateIfNeeded() {}
+ override fun deactivateIfNeeded() {}
- override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {}
+ override fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {}
- override suspend fun addDownstream(downstream: Schedulable) {}
+ override fun addDownstream(downstream: Schedulable) {}
- override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {}
+ override fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {}
- override suspend fun getPushEvent(evalScope: EvalScope) = Unit
+ override fun getPushEvent(logIndent: Int, evalScope: EvalScope) =
+ logDuration(logIndent, "Always.getPushEvent", false) { Unit }
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt
index 80c40ba740a5..62bf34810de7 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt
@@ -20,8 +20,6 @@ import com.android.systemui.kairos.FrpBuildScope
import com.android.systemui.kairos.FrpStateScope
import com.android.systemui.kairos.FrpTransactionScope
import com.android.systemui.kairos.TFlow
-import com.android.systemui.kairos.internal.util.HeteroMap
-import com.android.systemui.kairos.internal.util.Key
internal interface InitScope {
val networkId: Any
@@ -30,23 +28,25 @@ internal interface InitScope {
internal interface EvalScope : NetworkScope, DeferScope {
val frpScope: FrpTransactionScope
- suspend fun <R> runInTransactionScope(block: suspend FrpTransactionScope.() -> R): R
+ fun <R> runInTransactionScope(block: FrpTransactionScope.() -> R): R
}
internal interface StateScope : EvalScope {
override val frpScope: FrpStateScope
- suspend fun <R> runInStateScope(block: suspend FrpStateScope.() -> R): R
+ fun <R> runInStateScope(block: FrpStateScope.() -> R): R
val endSignal: TFlow<Any>
fun childStateScope(newEnd: TFlow<Any>): StateScope
+
+ val endSignalOnce: TFlow<Any>
}
internal interface BuildScope : StateScope {
override val frpScope: FrpBuildScope
- suspend fun <R> runInBuildScope(block: suspend FrpBuildScope.() -> R): R
+ fun <R> runInBuildScope(block: FrpBuildScope.() -> R): R
}
internal interface NetworkScope : InitScope {
@@ -57,7 +57,7 @@ internal interface NetworkScope : InitScope {
val compactor: Scheduler
val scheduler: Scheduler
- val transactionStore: HeteroMap
+ val transactionStore: TransactionStore
fun scheduleOutput(output: Output<*>)
@@ -69,12 +69,3 @@ internal interface NetworkScope : InitScope {
fun scheduleDeactivation(output: Output<*>)
}
-
-internal fun <A> NetworkScope.setResult(node: Key<A>, result: A) {
- transactionStore[node] = result
-}
-
-internal fun <A> NetworkScope.getCurrentValue(key: Key<A>): A =
- transactionStore.getOrError(key) { "No value for $key in transaction $epoch" }
-
-internal fun NetworkScope.hasCurrentValue(key: Key<*>): Boolean = transactionStore.contains(key)
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt
index a479c90cc4de..1cdf895ec1ed 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt
@@ -22,44 +22,39 @@ import com.android.systemui.kairos.internal.store.MapHolder
import com.android.systemui.kairos.internal.store.MapK
import com.android.systemui.kairos.internal.store.MutableMapK
import com.android.systemui.kairos.internal.store.asMapHolder
-import com.android.systemui.kairos.internal.util.asyncImmediate
import com.android.systemui.kairos.internal.util.hashString
-import kotlinx.coroutines.CoroutineStart
-import kotlinx.coroutines.awaitAll
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.sync.Mutex
-import kotlinx.coroutines.sync.withLock
+import com.android.systemui.kairos.internal.util.logDuration
internal typealias MuxResult<W, K, V> = MapK<W, K, PullNode<V>>
/** Base class for muxing nodes, which have a (potentially dynamic) collection of upstream nodes. */
-internal sealed class MuxNode<W, K, V, Output>(
- val lifecycle: MuxLifecycle<Output>,
+internal sealed class MuxNode<W, K, V>(
+ val lifecycle: MuxLifecycle<W, K, V>,
protected val storeFactory: MutableMapK.Factory<W, K>,
-) : PushNode<Output> {
+) : PushNode<MuxResult<W, K, V>> {
- inline val mutex
- get() = lifecycle.mutex
+ lateinit var upstreamData: MutableMapK<W, K, PullNode<V>>
+ lateinit var switchedIn: MutableMapK<W, K, BranchNode>
- @Volatile lateinit var upstreamData: MutableMapK<W, K, PullNode<V>>
- @Volatile lateinit var switchedIn: MutableMapK<W, K, BranchNode>
+ @Volatile var markedForCompaction = false
+ @Volatile var markedForEvaluation = false
val downstreamSet: DownstreamSet = DownstreamSet()
// TODO: inline DepthTracker? would need to be added to PushNode signature
final override val depthTracker = DepthTracker()
- @Volatile
- var epoch: Long = Long.MIN_VALUE
- protected set
+ val transactionCache = TransactionCache<MuxResult<W, K, V>>()
+ val epoch
+ get() = transactionCache.epoch
inline fun hasCurrentValueLocked(evalScope: EvalScope): Boolean = epoch == evalScope.epoch
- override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean =
- mutex.withLock { hasCurrentValueLocked(evalScope) }
+ override fun hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean =
+ hasCurrentValueLocked(evalScope)
- final override suspend fun addDownstream(downstream: Schedulable) {
- mutex.withLock { addDownstreamLocked(downstream) }
+ final override fun addDownstream(downstream: Schedulable) {
+ addDownstreamLocked(downstream)
}
/**
@@ -72,135 +67,121 @@ internal sealed class MuxNode<W, K, V, Output>(
downstreamSet.add(downstream)
}
- final override suspend fun removeDownstream(downstream: Schedulable) {
+ final override fun removeDownstream(downstream: Schedulable) {
// TODO: return boolean?
- mutex.withLock { downstreamSet.remove(downstream) }
+ downstreamSet.remove(downstream)
}
- final override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {
- val deactivate =
- mutex.withLock {
- downstreamSet.remove(downstream)
- downstreamSet.isEmpty()
- }
+ final override fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {
+ downstreamSet.remove(downstream)
+ val deactivate = downstreamSet.isEmpty()
if (deactivate) {
doDeactivate()
}
}
- final override suspend fun deactivateIfNeeded() {
- if (mutex.withLock { downstreamSet.isEmpty() }) {
+ final override fun deactivateIfNeeded() {
+ if (downstreamSet.isEmpty()) {
doDeactivate()
}
}
/** visit this node from the scheduler (push eval) */
- abstract suspend fun visit(evalScope: EvalScope)
+ abstract fun visit(logIndent: Int, evalScope: EvalScope)
/** perform deactivation logic, propagating to all upstream nodes. */
- protected abstract suspend fun doDeactivate()
+ protected abstract fun doDeactivate()
- final override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {
- if (mutex.withLock { downstreamSet.isEmpty() }) {
+ final override fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {
+ if (downstreamSet.isEmpty()) {
evalScope.scheduleDeactivation(this)
}
}
- suspend fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) {
- mutex.withLock {
- if (depthTracker.addDirectUpstream(oldDepth, newDepth)) {
- depthTracker.schedule(scheduler, this)
- }
+ fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) {
+
+ if (depthTracker.addDirectUpstream(oldDepth, newDepth)) {
+ depthTracker.schedule(scheduler, this)
}
}
- suspend fun moveIndirectUpstreamToDirect(
+ fun moveIndirectUpstreamToDirect(
scheduler: Scheduler,
oldIndirectDepth: Int,
oldIndirectRoots: Set<MuxDeferredNode<*, *, *>>,
newDepth: Int,
) {
- mutex.withLock {
- if (
- depthTracker.addDirectUpstream(oldDepth = null, newDepth) or
- depthTracker.removeIndirectUpstream(depth = oldIndirectDepth) or
- depthTracker.updateIndirectRoots(removals = oldIndirectRoots)
- ) {
- depthTracker.schedule(scheduler, this)
- }
+ if (
+ depthTracker.addDirectUpstream(oldDepth = null, newDepth) or
+ depthTracker.removeIndirectUpstream(depth = oldIndirectDepth) or
+ depthTracker.updateIndirectRoots(removals = oldIndirectRoots)
+ ) {
+ depthTracker.schedule(scheduler, this)
}
}
- suspend fun adjustIndirectUpstream(
+ fun adjustIndirectUpstream(
scheduler: Scheduler,
oldDepth: Int,
newDepth: Int,
removals: Set<MuxDeferredNode<*, *, *>>,
additions: Set<MuxDeferredNode<*, *, *>>,
) {
- mutex.withLock {
- if (
- depthTracker.addIndirectUpstream(oldDepth, newDepth) or
- depthTracker.updateIndirectRoots(
- additions,
- removals,
- butNot = this as? MuxDeferredNode<*, *, *>,
- )
- ) {
- depthTracker.schedule(scheduler, this)
- }
+ if (
+ depthTracker.addIndirectUpstream(oldDepth, newDepth) or
+ depthTracker.updateIndirectRoots(
+ additions,
+ removals,
+ butNot = this as? MuxDeferredNode<*, *, *>,
+ )
+ ) {
+ depthTracker.schedule(scheduler, this)
}
}
- suspend fun moveDirectUpstreamToIndirect(
+ fun moveDirectUpstreamToIndirect(
scheduler: Scheduler,
oldDepth: Int,
newDepth: Int,
newIndirectSet: Set<MuxDeferredNode<*, *, *>>,
) {
- mutex.withLock {
- if (
- depthTracker.addIndirectUpstream(oldDepth = null, newDepth) or
- depthTracker.removeDirectUpstream(oldDepth) or
- depthTracker.updateIndirectRoots(
- additions = newIndirectSet,
- butNot = this as? MuxDeferredNode<*, *, *>,
- )
- ) {
- depthTracker.schedule(scheduler, this)
- }
+ if (
+ depthTracker.addIndirectUpstream(oldDepth = null, newDepth) or
+ depthTracker.removeDirectUpstream(oldDepth) or
+ depthTracker.updateIndirectRoots(
+ additions = newIndirectSet,
+ butNot = this as? MuxDeferredNode<*, *, *>,
+ )
+ ) {
+ depthTracker.schedule(scheduler, this)
}
}
- suspend fun removeDirectUpstream(scheduler: Scheduler, depth: Int, key: K) {
- mutex.withLock {
- switchedIn.remove(key)
- if (depthTracker.removeDirectUpstream(depth)) {
- depthTracker.schedule(scheduler, this)
- }
+ fun removeDirectUpstream(scheduler: Scheduler, depth: Int, key: K) {
+ switchedIn.remove(key)
+ if (depthTracker.removeDirectUpstream(depth)) {
+ depthTracker.schedule(scheduler, this)
}
}
- suspend fun removeIndirectUpstream(
+ fun removeIndirectUpstream(
scheduler: Scheduler,
oldDepth: Int,
indirectSet: Set<MuxDeferredNode<*, *, *>>,
key: K,
) {
- mutex.withLock {
- switchedIn.remove(key)
- if (
- depthTracker.removeIndirectUpstream(oldDepth) or
- depthTracker.updateIndirectRoots(removals = indirectSet)
- ) {
- depthTracker.schedule(scheduler, this)
- }
+ switchedIn.remove(key)
+ if (
+ depthTracker.removeIndirectUpstream(oldDepth) or
+ depthTracker.updateIndirectRoots(removals = indirectSet)
+ ) {
+ depthTracker.schedule(scheduler, this)
}
}
- suspend fun visitCompact(scheduler: Scheduler) = coroutineScope {
+ fun visitCompact(scheduler: Scheduler) {
if (depthTracker.isDirty()) {
- depthTracker.applyChanges(coroutineScope = this, scheduler, downstreamSet, this@MuxNode)
+ depthTracker.applyChanges(scheduler, downstreamSet, this@MuxNode)
}
}
@@ -217,22 +198,23 @@ internal sealed class MuxNode<W, K, V, Output>(
val schedulable = Schedulable.N(this)
- @Volatile lateinit var upstream: NodeConnection<V>
+ lateinit var upstream: NodeConnection<V>
- override suspend fun schedule(evalScope: EvalScope) {
- upstreamData[key] = upstream.directUpstream
- this@MuxNode.schedule(evalScope)
+ override fun schedule(logIndent: Int, evalScope: EvalScope) {
+ logDuration(logIndent, "MuxBranchNode.schedule") {
+ if (this@MuxNode is MuxPromptNode && this@MuxNode.name != null) {
+ logLn("[${this@MuxNode}] scheduling $key")
+ }
+ upstreamData[key] = upstream.directUpstream
+ this@MuxNode.schedule(evalScope)
+ }
}
- override suspend fun adjustDirectUpstream(
- scheduler: Scheduler,
- oldDepth: Int,
- newDepth: Int,
- ) {
+ override fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) {
this@MuxNode.adjustDirectUpstream(scheduler, oldDepth, newDepth)
}
- override suspend fun moveIndirectUpstreamToDirect(
+ override fun moveIndirectUpstreamToDirect(
scheduler: Scheduler,
oldIndirectDepth: Int,
oldIndirectSet: Set<MuxDeferredNode<*, *, *>>,
@@ -246,7 +228,7 @@ internal sealed class MuxNode<W, K, V, Output>(
)
}
- override suspend fun adjustIndirectUpstream(
+ override fun adjustIndirectUpstream(
scheduler: Scheduler,
oldDepth: Int,
newDepth: Int,
@@ -256,7 +238,7 @@ internal sealed class MuxNode<W, K, V, Output>(
this@MuxNode.adjustIndirectUpstream(scheduler, oldDepth, newDepth, removals, additions)
}
- override suspend fun moveDirectUpstreamToIndirect(
+ override fun moveDirectUpstreamToIndirect(
scheduler: Scheduler,
oldDirectDepth: Int,
newIndirectDepth: Int,
@@ -270,11 +252,11 @@ internal sealed class MuxNode<W, K, V, Output>(
)
}
- override suspend fun removeDirectUpstream(scheduler: Scheduler, depth: Int) {
+ override fun removeDirectUpstream(scheduler: Scheduler, depth: Int) {
removeDirectUpstream(scheduler, depth, key)
}
- override suspend fun removeIndirectUpstream(
+ override fun removeIndirectUpstream(
scheduler: Scheduler,
depth: Int,
indirectSet: Set<MuxDeferredNode<*, *, *>>,
@@ -286,113 +268,110 @@ internal sealed class MuxNode<W, K, V, Output>(
}
}
-internal typealias BranchNode<W, K, V> = MuxNode<W, K, V, *>.BranchNode
+internal typealias BranchNode<W, K, V> = MuxNode<W, K, V>.BranchNode
/** Tracks lifecycle of MuxNode in the network. Essentially a mutable ref for MuxLifecycleState. */
-internal class MuxLifecycle<A>(@Volatile var lifecycleState: MuxLifecycleState<A>) : TFlowImpl<A> {
- val mutex = Mutex()
+internal class MuxLifecycle<W, K, V>(var lifecycleState: MuxLifecycleState<W, K, V>) :
+ TFlowImpl<MuxResult<W, K, V>> {
- override fun toString(): String = "TFlowLifecycle[$hashString][$lifecycleState][$mutex]"
+ override fun toString(): String = "TFlowMuxLifecycle[$hashString][$lifecycleState]"
- override suspend fun activate(
+ override fun activate(
evalScope: EvalScope,
downstream: Schedulable,
- ): ActivationResult<A>? =
- mutex.withLock {
- when (val state = lifecycleState) {
- is MuxLifecycleState.Dead -> null
- is MuxLifecycleState.Active -> {
- state.node.addDownstreamLocked(downstream)
- ActivationResult(
- connection = NodeConnection(state.node, state.node),
- needsEval = state.node.hasCurrentValueLocked(evalScope),
- )
- }
- is MuxLifecycleState.Inactive -> {
- state.spec
- .activate(evalScope, this@MuxLifecycle)
- .also { node ->
- lifecycleState =
- if (node == null) {
- MuxLifecycleState.Dead
- } else {
- MuxLifecycleState.Active(node)
- }
- }
- ?.let { node ->
- node.addDownstreamLocked(downstream)
- ActivationResult(
- connection = NodeConnection(node, node),
- needsEval = false,
- )
- }
- }
+ ): ActivationResult<MuxResult<W, K, V>>? =
+ when (val state = lifecycleState) {
+ is MuxLifecycleState.Dead -> {
+ null
+ }
+ is MuxLifecycleState.Active -> {
+ state.node.addDownstreamLocked(downstream)
+ ActivationResult(
+ connection = NodeConnection(state.node, state.node),
+ needsEval = state.node.hasCurrentValueLocked(evalScope),
+ )
+ }
+ is MuxLifecycleState.Inactive -> {
+ state.spec
+ .activate(evalScope, this@MuxLifecycle)
+ .also { node ->
+ lifecycleState =
+ if (node == null) {
+ MuxLifecycleState.Dead
+ } else {
+ MuxLifecycleState.Active(node.first)
+ }
+ }
+ ?.let { (node, postActivate) ->
+ postActivate?.invoke()
+ node.addDownstreamLocked(downstream)
+ ActivationResult(connection = NodeConnection(node, node), needsEval = false)
+ }
}
}
}
-internal sealed interface MuxLifecycleState<out A> {
- class Inactive<A>(val spec: MuxActivator<A>) : MuxLifecycleState<A> {
+internal sealed interface MuxLifecycleState<out W, out K, out V> {
+ class Inactive<W, K, V>(val spec: MuxActivator<W, K, V>) : MuxLifecycleState<W, K, V> {
override fun toString(): String = "Inactive"
}
- class Active<A>(val node: MuxNode<*, *, *, A>) : MuxLifecycleState<A> {
+ class Active<W, K, V>(val node: MuxNode<W, K, V>) : MuxLifecycleState<W, K, V> {
override fun toString(): String = "Active(node=$node)"
}
- data object Dead : MuxLifecycleState<Nothing>
+ data object Dead : MuxLifecycleState<Nothing, Nothing, Nothing>
}
-internal interface MuxActivator<A> {
- suspend fun activate(evalScope: EvalScope, lifecycle: MuxLifecycle<A>): MuxNode<*, *, *, A>?
+internal interface MuxActivator<W, K, V> {
+ fun activate(
+ evalScope: EvalScope,
+ lifecycle: MuxLifecycle<W, K, V>,
+ ): Pair<MuxNode<W, K, V>, (() -> Unit)?>?
}
-internal inline fun <A> MuxLifecycle(onSubscribe: MuxActivator<A>): TFlowImpl<A> =
- MuxLifecycle(MuxLifecycleState.Inactive(onSubscribe))
+internal inline fun <W, K, V> MuxLifecycle(
+ onSubscribe: MuxActivator<W, K, V>
+): TFlowImpl<MuxResult<W, K, V>> = MuxLifecycle(MuxLifecycleState.Inactive(onSubscribe))
internal fun <K, V> TFlowImpl<MuxResult<MapHolder.W, K, V>>.awaitValues(): TFlowImpl<Map<K, V>> =
- mapImpl({ this@awaitValues }) { results ->
- results.asMapHolder().unwrapped.mapValues { it.value.getPushEvent(this) }
+ mapImpl({ this@awaitValues }) { results, logIndent ->
+ results.asMapHolder().unwrapped.mapValues { it.value.getPushEvent(logIndent, this) }
}
// activation logic
-internal suspend fun <W, K, V, O> MuxNode<W, K, V, O>.initializeUpstream(
+internal fun <W, K, V> MuxNode<W, K, V>.initializeUpstream(
evalScope: EvalScope,
- getStorage: suspend EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>,
+ getStorage: EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>,
storeFactory: MutableMapK.Factory<W, K>,
) {
val storage = getStorage(evalScope)
- coroutineScope {
- val initUpstream = buildList {
- storage.forEach { (key, flow) ->
- val branchNode = BranchNode(key)
- add(
- asyncImmediate(start = CoroutineStart.LAZY) {
- flow.activate(evalScope, branchNode.schedulable)?.let { (conn, needsEval) ->
- Triple(
- key,
- branchNode.apply { upstream = conn },
- if (needsEval) conn.directUpstream else null,
- )
- }
- }
- )
- }
+ val initUpstream = buildList {
+ storage.forEach { (key, flow) ->
+ val branchNode = BranchNode(key)
+ add(
+ flow.activate(evalScope, branchNode.schedulable)?.let { (conn, needsEval) ->
+ Triple(
+ key,
+ branchNode.apply { upstream = conn },
+ if (needsEval) conn.directUpstream else null,
+ )
+ }
+ )
}
- val results = initUpstream.awaitAll()
- switchedIn = storeFactory.create(initUpstream.size)
- upstreamData = storeFactory.create(initUpstream.size)
- for (triple in results) {
- triple?.let { (key, branch, upstream) ->
- switchedIn[key] = branch
- upstream?.let { upstreamData[key] = upstream }
- }
+ }
+ switchedIn = storeFactory.create(initUpstream.size)
+ upstreamData = storeFactory.create(initUpstream.size)
+ for (triple in initUpstream) {
+ triple?.let { (key, branch, upstream) ->
+ switchedIn[key] = branch
+ upstream?.let { upstreamData[key] = upstream }
}
}
}
-internal fun <W, K, V, O> MuxNode<W, K, V, O>.initializeDepth() {
+internal fun <W, K, V> MuxNode<W, K, V>.initializeDepth() {
switchedIn.forEach { (_, branch) ->
val conn = branch.upstream
if (conn.depthTracker.snapshotIsDirect) {
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt
index 7f40df508fb1..5ce0248d0655 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt
@@ -16,15 +16,17 @@
package com.android.systemui.kairos.internal
+import com.android.systemui.kairos.internal.store.MapK
import com.android.systemui.kairos.internal.store.MutableArrayMapK
import com.android.systemui.kairos.internal.store.MutableMapK
import com.android.systemui.kairos.internal.store.SingletonMapK
import com.android.systemui.kairos.internal.store.StoreEntry
import com.android.systemui.kairos.internal.store.asArrayHolder
+import com.android.systemui.kairos.internal.store.asSingle
import com.android.systemui.kairos.internal.store.singleOf
-import com.android.systemui.kairos.internal.util.Key
import com.android.systemui.kairos.internal.util.hashString
-import com.android.systemui.kairos.internal.util.mapParallel
+import com.android.systemui.kairos.internal.util.logDuration
+import com.android.systemui.kairos.internal.util.logLn
import com.android.systemui.kairos.util.Just
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.None
@@ -37,41 +39,49 @@ import com.android.systemui.kairos.util.maybeThis
import com.android.systemui.kairos.util.merge
import com.android.systemui.kairos.util.orError
import com.android.systemui.kairos.util.these
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.sync.withLock
internal class MuxDeferredNode<W, K, V>(
- lifecycle: MuxLifecycle<MuxResult<W, K, V>>,
- val spec: MuxActivator<MuxResult<W, K, V>>,
+ val name: String?,
+ lifecycle: MuxLifecycle<W, K, V>,
+ val spec: MuxActivator<W, K, V>,
factory: MutableMapK.Factory<W, K>,
-) : MuxNode<W, K, V, MuxResult<W, K, V>>(lifecycle, factory), Key<MuxResult<W, K, V>> {
+) : MuxNode<W, K, V>(lifecycle, factory) {
val schedulable = Schedulable.M(this)
-
- @Volatile var patches: NodeConnection<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>? = null
- @Volatile var patchData: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = null
-
- override suspend fun visit(evalScope: EvalScope) {
- val scheduleDownstream = upstreamData.isNotEmpty()
- val result = upstreamData.readOnlyCopy()
- upstreamData.clear()
- val compactDownstream = depthTracker.isDirty()
- if (scheduleDownstream || compactDownstream) {
- coroutineScope {
- mutex.withLock {
- if (compactDownstream) {
+ var patches: NodeConnection<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>? = null
+ var patchData: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = null
+
+ override fun visit(logIndent: Int, evalScope: EvalScope) {
+ check(epoch < evalScope.epoch) { "node unexpectedly visited multiple times in transaction" }
+ logDuration(logIndent, "MuxDeferred[$name].visit") {
+ val scheduleDownstream: Boolean
+ val result: MapK<W, K, PullNode<V>>
+ logDuration("copying upstream data", false) {
+ scheduleDownstream = upstreamData.isNotEmpty()
+ result = upstreamData.readOnlyCopy()
+ upstreamData.clear()
+ }
+ if (name != null) {
+ logLn("[${this@MuxDeferredNode}] result = $result")
+ }
+ val compactDownstream = depthTracker.isDirty()
+ if (scheduleDownstream || compactDownstream) {
+ if (compactDownstream) {
+ logDuration("compactDownstream", false) {
depthTracker.applyChanges(
- coroutineScope = this,
evalScope.scheduler,
downstreamSet,
muxNode = this@MuxDeferredNode,
)
}
- if (scheduleDownstream) {
- epoch = evalScope.epoch
- evalScope.setResult(this@MuxDeferredNode, result)
- if (!scheduleAll(downstreamSet, evalScope)) {
+ }
+ if (scheduleDownstream) {
+ logDuration("scheduleDownstream") {
+ if (name != null) {
+ logLn("[${this@MuxDeferredNode}] scheduling")
+ }
+ transactionCache.put(evalScope, result)
+ if (!scheduleAll(currentLogIndent, downstreamSet, evalScope)) {
evalScope.scheduleDeactivation(this@MuxDeferredNode)
}
}
@@ -80,26 +90,26 @@ internal class MuxDeferredNode<W, K, V>(
}
}
- override suspend fun getPushEvent(evalScope: EvalScope): MuxResult<W, K, V> =
- evalScope.getCurrentValue(key = this)
+ override fun getPushEvent(logIndent: Int, evalScope: EvalScope): MuxResult<W, K, V> =
+ logDuration(logIndent, "MuxDeferred.getPushEvent") {
+ transactionCache.getCurrentValue(evalScope).also {
+ if (name != null) {
+ logLn("[${this@MuxDeferredNode}] getPushEvent = $it")
+ }
+ }
+ }
- private suspend fun compactIfNeeded(evalScope: EvalScope) {
+ private fun compactIfNeeded(evalScope: EvalScope) {
depthTracker.propagateChanges(evalScope.compactor, this)
}
- override suspend fun doDeactivate() {
+ override fun doDeactivate() {
// Update lifecycle
- lifecycle.mutex.withLock {
- if (lifecycle.lifecycleState !is MuxLifecycleState.Active) return@doDeactivate
- lifecycle.lifecycleState = MuxLifecycleState.Inactive(spec)
- }
+ if (lifecycle.lifecycleState !is MuxLifecycleState.Active) return@doDeactivate
+ lifecycle.lifecycleState = MuxLifecycleState.Inactive(spec)
// Process branch nodes
- coroutineScope {
- switchedIn.forEach { (_, branchNode) ->
- branchNode.upstream.let {
- launch { it.removeDownstreamAndDeactivateIfNeeded(branchNode.schedulable) }
- }
- }
+ switchedIn.forEach { (_, branchNode) ->
+ branchNode.upstream.removeDownstreamAndDeactivateIfNeeded(branchNode.schedulable)
}
// Process patch node
patches?.removeDownstreamAndDeactivateIfNeeded(schedulable)
@@ -108,12 +118,15 @@ internal class MuxDeferredNode<W, K, V>(
// MOVE phase
// - concurrent moves may be occurring, but no more evals. all depth recalculations are
// deferred to the end of this phase.
- suspend fun performMove(evalScope: EvalScope) {
+ fun performMove(logIndent: Int, evalScope: EvalScope) {
+ if (name != null) {
+ logLn(logIndent, "[${this@MuxDeferredNode}] performMove (patchData = $patchData)")
+ }
+
val patch = patchData ?: return
patchData = null
- // TODO: this logic is very similar to what's in MuxPromptMoving, maybe turn into an inline
- // fun?
+ // TODO: this logic is very similar to what's in MuxPrompt, maybe turn into an inline fun?
// We have a patch, process additions/updates and removals
val adds = mutableListOf<Pair<K, TFlowImpl<V>>>()
@@ -127,131 +140,112 @@ internal class MuxDeferredNode<W, K, V>(
val severed = mutableListOf<NodeConnection<*>>()
- coroutineScope {
- // remove and sever
- removes.forEach { k ->
- switchedIn.remove(k)?.let { branchNode: BranchNode ->
- val conn = branchNode.upstream
- severed.add(conn)
- launch { conn.removeDownstream(downstream = branchNode.schedulable) }
- depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth)
- }
+ // remove and sever
+ removes.forEach { k ->
+ switchedIn.remove(k)?.let { branchNode: BranchNode ->
+ val conn = branchNode.upstream
+ severed.add(conn)
+ conn.removeDownstream(downstream = branchNode.schedulable)
+ depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth)
}
+ }
- // add or replace
- adds
- .mapParallel { (k, newUpstream: TFlowImpl<V>) ->
- val branchNode = BranchNode(k)
- k to
- newUpstream.activate(evalScope, branchNode.schedulable)?.let { (conn, _) ->
- branchNode.apply { upstream = conn }
- }
- }
- .forEach { (k, newBranch: BranchNode?) ->
- // remove old and sever, if present
- switchedIn.remove(k)?.let { branchNode ->
- val conn = branchNode.upstream
- severed.add(conn)
- launch { conn.removeDownstream(downstream = branchNode.schedulable) }
- depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth)
- }
+ // add or replace
+ adds.forEach { (k, newUpstream: TFlowImpl<V>) ->
+ // remove old and sever, if present
+ switchedIn.remove(k)?.let { branchNode ->
+ val conn = branchNode.upstream
+ severed.add(conn)
+ conn.removeDownstream(downstream = branchNode.schedulable)
+ depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth)
+ }
- // add new
- newBranch?.let {
- switchedIn[k] = newBranch
- val branchDepthTracker = newBranch.upstream.depthTracker
- if (branchDepthTracker.snapshotIsDirect) {
- depthTracker.addDirectUpstream(
- oldDepth = null,
- newDepth = branchDepthTracker.snapshotDirectDepth,
- )
- } else {
- depthTracker.addIndirectUpstream(
- oldDepth = null,
- newDepth = branchDepthTracker.snapshotIndirectDepth,
- )
- depthTracker.updateIndirectRoots(
- additions = branchDepthTracker.snapshotIndirectRoots,
- butNot = this@MuxDeferredNode,
- )
- }
- }
+ // add new
+ val newBranch = BranchNode(k)
+ newUpstream.activate(evalScope, newBranch.schedulable)?.let { (conn, _) ->
+ newBranch.upstream = conn
+ switchedIn[k] = newBranch
+ val branchDepthTracker = newBranch.upstream.depthTracker
+ if (branchDepthTracker.snapshotIsDirect) {
+ depthTracker.addDirectUpstream(
+ oldDepth = null,
+ newDepth = branchDepthTracker.snapshotDirectDepth,
+ )
+ } else {
+ depthTracker.addIndirectUpstream(
+ oldDepth = null,
+ newDepth = branchDepthTracker.snapshotIndirectDepth,
+ )
+ depthTracker.updateIndirectRoots(
+ additions = branchDepthTracker.snapshotIndirectRoots,
+ butNot = this@MuxDeferredNode,
+ )
}
+ }
}
- coroutineScope {
- for (severedNode in severed) {
- launch { severedNode.scheduleDeactivationIfNeeded(evalScope) }
- }
+ for (severedNode in severed) {
+ severedNode.scheduleDeactivationIfNeeded(evalScope)
}
compactIfNeeded(evalScope)
}
- suspend fun removeDirectPatchNode(scheduler: Scheduler) {
- mutex.withLock {
- if (
- depthTracker.removeIndirectUpstream(depth = 0) or
- depthTracker.setIsIndirectRoot(false)
- ) {
- depthTracker.schedule(scheduler, this)
- }
- patches = null
+ fun removeDirectPatchNode(scheduler: Scheduler) {
+ if (
+ depthTracker.removeIndirectUpstream(depth = 0) or depthTracker.setIsIndirectRoot(false)
+ ) {
+ depthTracker.schedule(scheduler, this)
}
+ patches = null
}
- suspend fun removeIndirectPatchNode(
+ fun removeIndirectPatchNode(
scheduler: Scheduler,
depth: Int,
indirectSet: Set<MuxDeferredNode<*, *, *>>,
) {
// indirectly connected patches forward the indirectSet
- mutex.withLock {
- if (
- depthTracker.updateIndirectRoots(removals = indirectSet) or
- depthTracker.removeIndirectUpstream(depth)
- ) {
- depthTracker.schedule(scheduler, this)
- }
- patches = null
+ if (
+ depthTracker.updateIndirectRoots(removals = indirectSet) or
+ depthTracker.removeIndirectUpstream(depth)
+ ) {
+ depthTracker.schedule(scheduler, this)
}
+ patches = null
}
- suspend fun moveIndirectPatchNodeToDirect(
+ fun moveIndirectPatchNodeToDirect(
scheduler: Scheduler,
oldIndirectDepth: Int,
oldIndirectSet: Set<MuxDeferredNode<*, *, *>>,
) {
// directly connected patches are stored as an indirect singleton set of the patchNode
- mutex.withLock {
- if (
- depthTracker.updateIndirectRoots(removals = oldIndirectSet) or
- depthTracker.removeIndirectUpstream(oldIndirectDepth) or
- depthTracker.setIsIndirectRoot(true)
- ) {
- depthTracker.schedule(scheduler, this)
- }
+ if (
+ depthTracker.updateIndirectRoots(removals = oldIndirectSet) or
+ depthTracker.removeIndirectUpstream(oldIndirectDepth) or
+ depthTracker.setIsIndirectRoot(true)
+ ) {
+ depthTracker.schedule(scheduler, this)
}
}
- suspend fun moveDirectPatchNodeToIndirect(
+ fun moveDirectPatchNodeToIndirect(
scheduler: Scheduler,
newIndirectDepth: Int,
newIndirectSet: Set<MuxDeferredNode<*, *, *>>,
) {
// indirectly connected patches forward the indirectSet
- mutex.withLock {
- if (
- depthTracker.setIsIndirectRoot(false) or
- depthTracker.updateIndirectRoots(additions = newIndirectSet, butNot = this) or
- depthTracker.addIndirectUpstream(oldDepth = null, newDepth = newIndirectDepth)
- ) {
- depthTracker.schedule(scheduler, this)
- }
+ if (
+ depthTracker.setIsIndirectRoot(false) or
+ depthTracker.updateIndirectRoots(additions = newIndirectSet, butNot = this) or
+ depthTracker.addIndirectUpstream(oldDepth = null, newDepth = newIndirectDepth)
+ ) {
+ depthTracker.schedule(scheduler, this)
}
}
- suspend fun adjustIndirectPatchNode(
+ fun adjustIndirectPatchNode(
scheduler: Scheduler,
oldDepth: Int,
newDepth: Int,
@@ -259,65 +253,73 @@ internal class MuxDeferredNode<W, K, V>(
additions: Set<MuxDeferredNode<*, *, *>>,
) {
// indirectly connected patches forward the indirectSet
- mutex.withLock {
- if (
- depthTracker.updateIndirectRoots(
- additions = additions,
- removals = removals,
- butNot = this,
- ) or depthTracker.addIndirectUpstream(oldDepth = oldDepth, newDepth = newDepth)
- ) {
- depthTracker.schedule(scheduler, this)
- }
+ if (
+ depthTracker.updateIndirectRoots(
+ additions = additions,
+ removals = removals,
+ butNot = this,
+ ) or depthTracker.addIndirectUpstream(oldDepth = oldDepth, newDepth = newDepth)
+ ) {
+ depthTracker.schedule(scheduler, this)
}
}
- suspend fun scheduleMover(evalScope: EvalScope) {
- patchData =
- checkNotNull(patches) { "mux mover scheduled with unset patches upstream node" }
- .getPushEvent(evalScope)
- evalScope.scheduleMuxMover(this)
+ fun scheduleMover(logIndent: Int, evalScope: EvalScope) {
+ logDuration(logIndent, "MuxDeferred.scheduleMover") {
+ patchData =
+ checkNotNull(patches) { "mux mover scheduled with unset patches upstream node" }
+ .getPushEvent(currentLogIndent, evalScope)
+ evalScope.scheduleMuxMover(this@MuxDeferredNode)
+ }
}
- override fun toString(): String = "${this::class.simpleName}@$hashString"
+ override fun toString(): String =
+ "${this::class.simpleName}@$hashString${name?.let { "[$it]" }.orEmpty()}"
}
internal inline fun <A> switchDeferredImplSingle(
- crossinline getStorage: suspend EvalScope.() -> TFlowImpl<A>,
- crossinline getPatches: suspend EvalScope.() -> TFlowImpl<TFlowImpl<A>>,
-): TFlowImpl<A> =
- mapImpl({
+ name: String? = null,
+ crossinline getStorage: EvalScope.() -> TFlowImpl<A>,
+ crossinline getPatches: EvalScope.() -> TFlowImpl<TFlowImpl<A>>,
+): TFlowImpl<A> {
+ val patches = mapImpl(getPatches) { newFlow, _ -> singleOf(just(newFlow)).asIterable() }
+ val switchDeferredImpl =
switchDeferredImpl(
+ name = name,
getStorage = { singleOf(getStorage()).asIterable() },
- getPatches = {
- mapImpl(getPatches) { newFlow -> singleOf(just(newFlow)).asIterable() }
- },
+ getPatches = { patches },
storeFactory = SingletonMapK.Factory(),
)
- }) { map ->
- map.getValue(Unit).getPushEvent(this)
+ return mapImpl({ switchDeferredImpl }) { map, logIndent ->
+ map.asSingle().getValue(Unit).getPushEvent(logIndent, this).also {
+ if (name != null) {
+ logLn(logIndent, "[$name] extracting single mux: $it")
+ }
+ }
}
+}
internal fun <W, K, V> switchDeferredImpl(
- getStorage: suspend EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>,
- getPatches: suspend EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>,
+ name: String? = null,
+ getStorage: EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>,
+ getPatches: EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>,
storeFactory: MutableMapK.Factory<W, K>,
): TFlowImpl<MuxResult<W, K, V>> =
- MuxLifecycle(MuxDeferredActivator(getStorage, storeFactory, getPatches))
+ MuxLifecycle(MuxDeferredActivator(name, getStorage, storeFactory, getPatches))
private class MuxDeferredActivator<W, K, V>(
- private val getStorage: suspend EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>,
+ private val name: String?,
+ private val getStorage: EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>,
private val storeFactory: MutableMapK.Factory<W, K>,
- private val getPatches:
- suspend EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>,
-) : MuxActivator<MuxResult<W, K, V>> {
- override suspend fun activate(
+ private val getPatches: EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>,
+) : MuxActivator<W, K, V> {
+ override fun activate(
evalScope: EvalScope,
- lifecycle: MuxLifecycle<MuxResult<W, K, V>>,
- ): MuxNode<W, *, *, MuxResult<W, K, V>>? {
+ lifecycle: MuxLifecycle<W, K, V>,
+ ): Pair<MuxNode<W, K, V>, (() -> Unit)?>? {
// Initialize mux node and switched-in connections.
val muxNode =
- MuxDeferredNode(lifecycle, this, storeFactory).apply {
+ MuxDeferredNode(name, lifecycle, this, storeFactory).apply {
initializeUpstream(evalScope, getStorage, storeFactory)
// Update depth based on all initial switched-in nodes.
initializeDepth()
@@ -327,29 +329,34 @@ private class MuxDeferredActivator<W, K, V>(
depthTracker.setIsIndirectRoot(true)
depthTracker.reset()
}
- // Setup patches connection; deferring allows for a recursive connection, where
- // muxNode is downstream of itself via patches.
- var isIndirect = true
- evalScope.deferAction {
- val (patchesConn, needsEval) =
- getPatches(evalScope).activate(evalScope, downstream = muxNode.schedulable)
- ?: run {
- isIndirect = false
- // Turns out we can't connect to patches, so update our depth and
- // propagate
- muxNode.mutex.withLock {
+
+ // Schedule for evaluation if any switched-in nodes have already emitted within
+ // this transaction.
+ if (muxNode.upstreamData.isNotEmpty()) {
+ muxNode.schedule(evalScope)
+ }
+
+ return muxNode to
+ fun() {
+ // Setup patches connection; deferring allows for a recursive connection, where
+ // muxNode is downstream of itself via patches.
+ val (patchesConn, needsEval) =
+ getPatches(evalScope).activate(evalScope, downstream = muxNode.schedulable)
+ ?: run {
+ // Turns out we can't connect to patches, so update our depth and
+ // propagate
if (muxNode.depthTracker.setIsIndirectRoot(false)) {
+ // TODO: schedules might not be necessary now that we're not
+ // parallel?
muxNode.depthTracker.schedule(evalScope.scheduler, muxNode)
}
+ return
}
- return@deferAction
- }
- muxNode.patches = patchesConn
+ muxNode.patches = patchesConn
- if (!patchesConn.schedulerUpstream.depthTracker.snapshotIsDirect) {
- // Turns out patches is indirect, so we are not a root. Update depth and
- // propagate.
- muxNode.mutex.withLock {
+ if (!patchesConn.schedulerUpstream.depthTracker.snapshotIsDirect) {
+ // Turns out patches is indirect, so we are not a root. Update depth and
+ // propagate.
if (
muxNode.depthTracker.setIsIndirectRoot(false) or
muxNode.depthTracker.addIndirectUpstream(
@@ -363,63 +370,63 @@ private class MuxDeferredActivator<W, K, V>(
muxNode.depthTracker.schedule(evalScope.scheduler, muxNode)
}
}
+ // Schedule mover to process patch emission at the end of this transaction, if
+ // needed.
+ if (needsEval) {
+ muxNode.patchData = patchesConn.getPushEvent(0, evalScope)
+ evalScope.scheduleMuxMover(muxNode)
+ }
}
- // Schedule mover to process patch emission at the end of this transaction, if
- // needed.
- if (needsEval) {
- muxNode.patchData = patchesConn.getPushEvent(evalScope)
- evalScope.scheduleMuxMover(muxNode)
- }
- }
-
- // Schedule for evaluation if any switched-in nodes have already emitted within
- // this transaction.
- if (muxNode.upstreamData.isNotEmpty()) {
- muxNode.schedule(evalScope)
- }
- return muxNode.takeUnless { muxNode.switchedIn.isEmpty() && !isIndirect }
}
}
internal inline fun <A> mergeNodes(
- crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>,
- crossinline getOther: suspend EvalScope.() -> TFlowImpl<A>,
- crossinline f: suspend EvalScope.(A, A) -> A,
+ crossinline getPulse: EvalScope.() -> TFlowImpl<A>,
+ crossinline getOther: EvalScope.() -> TFlowImpl<A>,
+ name: String? = null,
+ crossinline f: EvalScope.(A, A) -> A,
): TFlowImpl<A> {
+ val mergedThese = mergeNodes(name, getPulse, getOther)
val merged =
- mapImpl({ mergeNodes(getPulse, getOther) }) { these ->
- these.merge { thiz, that -> f(thiz, that) }
- }
+ mapImpl({ mergedThese }) { these, _ -> these.merge { thiz, that -> f(thiz, that) } }
return merged.cached()
}
-internal fun <T> Iterable<T>.asIterableWithIndex(): Iterable<StoreEntry<Int, T>> =
+internal fun <T> Iterable<T>.asIterableWithIndex(): Iterable<Map.Entry<Int, T>> =
asSequence().mapIndexed { i, t -> StoreEntry(i, t) }.asIterable()
internal inline fun <A, B> mergeNodes(
- crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>,
- crossinline getOther: suspend EvalScope.() -> TFlowImpl<B>,
+ name: String? = null,
+ crossinline getPulse: EvalScope.() -> TFlowImpl<A>,
+ crossinline getOther: EvalScope.() -> TFlowImpl<B>,
): TFlowImpl<These<A, B>> {
val storage =
- listOf(mapImpl(getPulse) { These.thiz<A, B>(it) }, mapImpl(getOther) { These.that(it) })
+ listOf(
+ mapImpl(getPulse) { it, _ -> These.thiz<A, B>(it) },
+ mapImpl(getOther) { it, _ -> These.that(it) },
+ )
.asIterableWithIndex()
val switchNode =
switchDeferredImpl(
+ name = name,
getStorage = { storage },
getPatches = { neverImpl },
storeFactory = MutableArrayMapK.Factory(),
)
val merged =
- mapImpl({ switchNode }) { mergeResults ->
- val first = mergeResults.getMaybe(0).flatMap { it.getPushEvent(this).maybeThis() }
- val second = mergeResults.getMaybe(1).flatMap { it.getPushEvent(this).maybeThat() }
+ mapImpl({ switchNode }) { it, logIndent ->
+ val mergeResults = it.asArrayHolder()
+ val first =
+ mergeResults.getMaybe(0).flatMap { it.getPushEvent(logIndent, this).maybeThis() }
+ val second =
+ mergeResults.getMaybe(1).flatMap { it.getPushEvent(logIndent, this).maybeThat() }
these(first, second).orError { "unexpected missing merge result" }
}
return merged.cached()
}
internal inline fun <A> mergeNodes(
- crossinline getPulses: suspend EvalScope.() -> Iterable<TFlowImpl<A>>
+ crossinline getPulses: EvalScope.() -> Iterable<TFlowImpl<A>>
): TFlowImpl<List<A>> {
val switchNode =
switchDeferredImpl(
@@ -428,15 +435,15 @@ internal inline fun <A> mergeNodes(
storeFactory = MutableArrayMapK.Factory(),
)
val merged =
- mapImpl({ switchNode }) {
+ mapImpl({ switchNode }) { it, logIndent ->
val mergeResults = it.asArrayHolder()
- mergeResults.map { (_, node) -> node.getPushEvent(this) }
+ mergeResults.map { (_, node) -> node.getPushEvent(logIndent, this) }
}
return merged.cached()
}
internal inline fun <A> mergeNodesLeft(
- crossinline getPulses: suspend EvalScope.() -> Iterable<TFlowImpl<A>>
+ crossinline getPulses: EvalScope.() -> Iterable<TFlowImpl<A>>
): TFlowImpl<A> {
val switchNode =
switchDeferredImpl(
@@ -445,6 +452,9 @@ internal inline fun <A> mergeNodesLeft(
storeFactory = MutableArrayMapK.Factory(),
)
val merged =
- mapImpl({ switchNode }) { mergeResults -> mergeResults.values.first().getPushEvent(this) }
+ mapImpl({ switchNode }) { it, logIndent ->
+ val mergeResults = it.asArrayHolder()
+ mergeResults.values.first().getPushEvent(logIndent, this)
+ }
return merged.cached()
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt
index 839c5a64272a..1c9af24a392f 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt
@@ -18,207 +18,180 @@ package com.android.systemui.kairos.internal
import com.android.systemui.kairos.internal.store.MutableMapK
import com.android.systemui.kairos.internal.store.SingletonMapK
+import com.android.systemui.kairos.internal.store.asSingle
import com.android.systemui.kairos.internal.store.singleOf
-import com.android.systemui.kairos.internal.util.Key
-import com.android.systemui.kairos.internal.util.launchImmediate
-import com.android.systemui.kairos.internal.util.mapParallel
+import com.android.systemui.kairos.internal.util.LogIndent
+import com.android.systemui.kairos.internal.util.hashString
+import com.android.systemui.kairos.internal.util.logDuration
import com.android.systemui.kairos.util.Just
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.None
import com.android.systemui.kairos.util.just
-import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.launch
-import kotlinx.coroutines.sync.withLock
-private typealias MuxPromptMovingResult<W, K, V> = Pair<MuxResult<W, K, V>, MuxResult<W, K, V>?>
-
-internal class MuxPromptMovingNode<W, K, V>(
- lifecycle: MuxLifecycle<MuxPromptMovingResult<W, K, V>>,
- private val spec: MuxActivator<MuxPromptMovingResult<W, K, V>>,
+internal class MuxPromptNode<W, K, V>(
+ val name: String?,
+ lifecycle: MuxLifecycle<W, K, V>,
+ private val spec: MuxActivator<W, K, V>,
factory: MutableMapK.Factory<W, K>,
-) :
- MuxNode<W, K, V, MuxPromptMovingResult<W, K, V>>(lifecycle, factory),
- Key<MuxPromptMovingResult<W, K, V>> {
-
- @Volatile var patchData: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = null
- @Volatile var patches: PatchNode? = null
+) : MuxNode<W, K, V>(lifecycle, factory) {
- @Volatile private var reEval: MuxPromptMovingResult<W, K, V>? = null
+ var patchData: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = null
+ var patches: PatchNode? = null
- override suspend fun visit(evalScope: EvalScope) {
- val preSwitchNotEmpty = upstreamData.isNotEmpty()
- val preSwitchResults: MuxResult<W, K, V> = upstreamData.readOnlyCopy()
- upstreamData.clear()
+ override fun visit(logIndent: Int, evalScope: EvalScope) {
+ check(epoch < evalScope.epoch) { "node unexpectedly visited multiple times in transaction" }
+ logDuration(logIndent, "MuxPrompt.visit") {
+ val patch: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = patchData
+ patchData = null
- val patch: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>? = patchData
- patchData = null
-
- val (reschedule, evalResult) =
- reEval?.let { false to it }
- ?: if (preSwitchNotEmpty || patch != null) {
- doEval(preSwitchNotEmpty, preSwitchResults, patch, evalScope)
- } else {
- false to null
+ // If there's a patch, process it.
+ patch?.let {
+ val needsReschedule = processPatch(patch, evalScope)
+ // We may need to reschedule if newly-switched-in nodes have not yet been
+ // visited within this transaction.
+ val depthIncreased = depthTracker.dirty_depthIncreased()
+ if (needsReschedule || depthIncreased) {
+ if (depthIncreased) {
+ depthTracker.schedule(evalScope.compactor, this@MuxPromptNode)
+ }
+ if (name != null) {
+ logLn(
+ "[${this@MuxPromptNode}] rescheduling (reschedule=$needsReschedule, depthIncrease=$depthIncreased)"
+ )
+ }
+ schedule(evalScope)
+ return
}
- reEval = null
-
- if (reschedule || depthTracker.dirty_depthIncreased()) {
- reEval = evalResult
- // Can't schedule downstream yet, need to compact first
- if (depthTracker.dirty_depthIncreased()) {
- depthTracker.schedule(evalScope.compactor, node = this)
}
- schedule(evalScope)
- } else {
+ val results = upstreamData.readOnlyCopy().also { upstreamData.clear() }
+
+ // If we don't need to reschedule, or there wasn't a patch at all, then we proceed
+ // with merging pre-switch and post-switch results
+ val hasResult = results.isNotEmpty()
val compactDownstream = depthTracker.isDirty()
- if (evalResult != null || compactDownstream) {
- coroutineScope {
- mutex.withLock {
- if (compactDownstream) {
- adjustDownstreamDepths(evalScope, coroutineScope = this)
- }
- if (evalResult != null) {
- epoch = evalScope.epoch
- evalScope.setResult(this@MuxPromptMovingNode, evalResult)
- if (!scheduleAll(downstreamSet, evalScope)) {
- evalScope.scheduleDeactivation(this@MuxPromptMovingNode)
- }
- }
+ if (hasResult || compactDownstream) {
+ if (compactDownstream) {
+ adjustDownstreamDepths(evalScope)
+ }
+ if (hasResult) {
+ transactionCache.put(evalScope, results)
+ if (!scheduleAll(currentLogIndent, downstreamSet, evalScope)) {
+ evalScope.scheduleDeactivation(this@MuxPromptNode)
}
}
}
}
}
- private suspend fun doEval(
- preSwitchNotEmpty: Boolean,
- preSwitchResults: MuxResult<W, K, V>,
- patch: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>?,
+ // side-effect: this will populate `upstreamData` with any immediately available results
+ private fun LogIndent.processPatch(
+ patch: Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>,
evalScope: EvalScope,
- ): Pair<Boolean, MuxPromptMovingResult<W, K, V>?> {
- val newlySwitchedIn: MuxResult<W, K, V>? =
- patch?.let {
- // We have a patch, process additions/updates and removals
- val adds = mutableListOf<Pair<K, TFlowImpl<V>>>()
- val removes = mutableListOf<K>()
- patch.forEach { (k, newUpstream) ->
- when (newUpstream) {
- is Just -> adds.add(k to newUpstream.value)
- None -> removes.add(k)
- }
- }
+ ): Boolean {
+ var needsReschedule = false
+ // We have a patch, process additions/updates and removals
+ val adds = mutableListOf<Pair<K, TFlowImpl<V>>>()
+ val removes = mutableListOf<K>()
+ patch.forEach { (k, newUpstream) ->
+ when (newUpstream) {
+ is Just -> adds.add(k to newUpstream.value)
+ None -> removes.add(k)
+ }
+ }
- val additionsAndUpdates = mutableListOf<Pair<K, PullNode<V>>>()
- val severed = mutableListOf<NodeConnection<*>>()
-
- coroutineScope {
- // remove and sever
- removes.forEach { k ->
- switchedIn.remove(k)?.let { branchNode: BranchNode ->
- val conn: NodeConnection<V> = branchNode.upstream
- severed.add(conn)
- launchImmediate {
- conn.removeDownstream(downstream = branchNode.schedulable)
- }
- depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth)
- }
- }
+ val severed = mutableListOf<NodeConnection<*>>()
- // add or replace
- adds
- .mapParallel { (k, newUpstream: TFlowImpl<V>) ->
- val branchNode = BranchNode(k)
- k to
- newUpstream.activate(evalScope, branchNode.schedulable)?.let {
- (conn, _) ->
- branchNode.apply { upstream = conn }
- }
- }
- .forEach { (k, newBranch: BranchNode?) ->
- // remove old and sever, if present
- switchedIn.remove(k)?.let { oldBranch: BranchNode ->
- val conn: NodeConnection<V> = oldBranch.upstream
- severed.add(conn)
- launchImmediate {
- conn.removeDownstream(downstream = oldBranch.schedulable)
- }
- depthTracker.removeDirectUpstream(
- conn.depthTracker.snapshotDirectDepth
- )
- }
-
- // add new
- newBranch?.let {
- switchedIn[k] = newBranch
- additionsAndUpdates.add(k to newBranch.upstream.directUpstream)
- val branchDepthTracker = newBranch.upstream.depthTracker
- if (branchDepthTracker.snapshotIsDirect) {
- depthTracker.addDirectUpstream(
- oldDepth = null,
- newDepth = branchDepthTracker.snapshotDirectDepth,
- )
- } else {
- depthTracker.addIndirectUpstream(
- oldDepth = null,
- newDepth = branchDepthTracker.snapshotIndirectDepth,
- )
- depthTracker.updateIndirectRoots(
- additions = branchDepthTracker.snapshotIndirectRoots,
- butNot = null,
- )
- }
- }
- }
+ // remove and sever
+ removes.forEach { k ->
+ switchedIn.remove(k)?.let { branchNode: BranchNode ->
+ if (name != null) {
+ logLn("[${this@MuxPromptNode}] removing $k")
}
+ val conn: NodeConnection<V> = branchNode.upstream
+ severed.add(conn)
+ conn.removeDownstream(downstream = branchNode.schedulable)
+ depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth)
+ }
+ }
- coroutineScope {
- for (severedNode in severed) {
- launch { severedNode.scheduleDeactivationIfNeeded(evalScope) }
- }
+ // add or replace
+ adds.forEach { (k, newUpstream: TFlowImpl<V>) ->
+ // remove old and sever, if present
+ switchedIn.remove(k)?.let { oldBranch: BranchNode ->
+ if (name != null) {
+ logLn("[${this@MuxPromptNode}] replacing $k")
}
+ val conn: NodeConnection<V> = oldBranch.upstream
+ severed.add(conn)
+ conn.removeDownstream(downstream = oldBranch.schedulable)
+ depthTracker.removeDirectUpstream(conn.depthTracker.snapshotDirectDepth)
+ }
- val resultStore = storeFactory.create<PullNode<V>>(additionsAndUpdates.size)
- for ((k, node) in additionsAndUpdates) {
- resultStore[k] = node
+ // add new
+ val newBranch = BranchNode(k)
+ newUpstream.activate(evalScope, newBranch.schedulable)?.let { (conn, needsEval) ->
+ newBranch.upstream = conn
+ if (name != null) {
+ logLn("[${this@MuxPromptNode}] switching in $k")
+ }
+ switchedIn[k] = newBranch
+ if (needsEval) {
+ upstreamData[k] = newBranch.upstream.directUpstream
+ } else {
+ needsReschedule = true
+ }
+ val branchDepthTracker = newBranch.upstream.depthTracker
+ if (branchDepthTracker.snapshotIsDirect) {
+ depthTracker.addDirectUpstream(
+ oldDepth = null,
+ newDepth = branchDepthTracker.snapshotDirectDepth,
+ )
+ } else {
+ depthTracker.addIndirectUpstream(
+ oldDepth = null,
+ newDepth = branchDepthTracker.snapshotIndirectDepth,
+ )
+ depthTracker.updateIndirectRoots(
+ additions = branchDepthTracker.snapshotIndirectRoots,
+ butNot = null,
+ )
}
- resultStore.takeIf { it.isNotEmpty() }?.asReadOnly()
}
+ }
- return if (preSwitchNotEmpty || newlySwitchedIn != null) {
- (newlySwitchedIn != null) to (preSwitchResults to newlySwitchedIn)
- } else {
- false to null
+ for (severedNode in severed) {
+ severedNode.scheduleDeactivationIfNeeded(evalScope)
}
+
+ return needsReschedule
}
- private fun adjustDownstreamDepths(evalScope: EvalScope, coroutineScope: CoroutineScope) {
+ private fun adjustDownstreamDepths(evalScope: EvalScope) {
if (depthTracker.dirty_depthIncreased()) {
// schedule downstream nodes on the compaction scheduler; this scheduler is drained at
// the end of this eval depth, so that all depth increases are applied before we advance
// the eval step
- depthTracker.schedule(evalScope.compactor, node = this@MuxPromptMovingNode)
+ depthTracker.schedule(evalScope.compactor, node = this@MuxPromptNode)
} else if (depthTracker.isDirty()) {
// schedule downstream nodes on the eval scheduler; this is more efficient and is only
// safe if the depth hasn't increased
depthTracker.applyChanges(
- coroutineScope,
evalScope.scheduler,
downstreamSet,
- muxNode = this@MuxPromptMovingNode,
+ muxNode = this@MuxPromptNode,
)
}
}
- override suspend fun getPushEvent(evalScope: EvalScope): MuxPromptMovingResult<W, K, V> =
- evalScope.getCurrentValue(key = this)
+ override fun getPushEvent(logIndent: Int, evalScope: EvalScope): MuxResult<W, K, V> =
+ logDuration(logIndent, "MuxPrompt.getPushEvent") {
+ transactionCache.getCurrentValue(evalScope)
+ }
- override suspend fun doDeactivate() {
+ override fun doDeactivate() {
// Update lifecycle
- lifecycle.mutex.withLock {
- if (lifecycle.lifecycleState !is MuxLifecycleState.Active) return@doDeactivate
- lifecycle.lifecycleState = MuxLifecycleState.Inactive(spec)
- }
+ if (lifecycle.lifecycleState !is MuxLifecycleState.Active) return
+ lifecycle.lifecycleState = MuxLifecycleState.Inactive(spec)
// Process branch nodes
switchedIn.forEach { (_, branchNode) ->
branchNode.upstream.removeDownstreamAndDeactivateIfNeeded(
@@ -231,57 +204,54 @@ internal class MuxPromptMovingNode<W, K, V>(
}
}
- suspend fun removeIndirectPatchNode(
+ fun removeIndirectPatchNode(
scheduler: Scheduler,
oldDepth: Int,
indirectSet: Set<MuxDeferredNode<*, *, *>>,
) {
- mutex.withLock {
- patches = null
- if (
- depthTracker.removeIndirectUpstream(oldDepth) or
- depthTracker.updateIndirectRoots(removals = indirectSet)
- ) {
- depthTracker.schedule(scheduler, this)
- }
+ patches = null
+ if (
+ depthTracker.removeIndirectUpstream(oldDepth) or
+ depthTracker.updateIndirectRoots(removals = indirectSet)
+ ) {
+ depthTracker.schedule(scheduler, this)
}
}
- suspend fun removeDirectPatchNode(scheduler: Scheduler, depth: Int) {
- mutex.withLock {
- patches = null
- if (depthTracker.removeDirectUpstream(depth)) {
- depthTracker.schedule(scheduler, this)
- }
+ fun removeDirectPatchNode(scheduler: Scheduler, depth: Int) {
+ patches = null
+ if (depthTracker.removeDirectUpstream(depth)) {
+ depthTracker.schedule(scheduler, this)
}
}
+ override fun toString(): String =
+ "${this::class.simpleName}@$hashString${name?.let { "[$it]" }.orEmpty()}"
+
inner class PatchNode : SchedulableNode {
val schedulable = Schedulable.N(this)
lateinit var upstream: NodeConnection<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>
- override suspend fun schedule(evalScope: EvalScope) {
- patchData = upstream.getPushEvent(evalScope)
- this@MuxPromptMovingNode.schedule(evalScope)
+ override fun schedule(logIndent: Int, evalScope: EvalScope) {
+ logDuration(logIndent, "MuxPromptPatchNode.schedule") {
+ patchData = upstream.getPushEvent(currentLogIndent, evalScope)
+ this@MuxPromptNode.schedule(evalScope)
+ }
}
- override suspend fun adjustDirectUpstream(
- scheduler: Scheduler,
- oldDepth: Int,
- newDepth: Int,
- ) {
- this@MuxPromptMovingNode.adjustDirectUpstream(scheduler, oldDepth, newDepth)
+ override fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) {
+ this@MuxPromptNode.adjustDirectUpstream(scheduler, oldDepth, newDepth)
}
- override suspend fun moveIndirectUpstreamToDirect(
+ override fun moveIndirectUpstreamToDirect(
scheduler: Scheduler,
oldIndirectDepth: Int,
oldIndirectSet: Set<MuxDeferredNode<*, *, *>>,
newDirectDepth: Int,
) {
- this@MuxPromptMovingNode.moveIndirectUpstreamToDirect(
+ this@MuxPromptNode.moveIndirectUpstreamToDirect(
scheduler,
oldIndirectDepth,
oldIndirectSet,
@@ -289,14 +259,14 @@ internal class MuxPromptMovingNode<W, K, V>(
)
}
- override suspend fun adjustIndirectUpstream(
+ override fun adjustIndirectUpstream(
scheduler: Scheduler,
oldDepth: Int,
newDepth: Int,
removals: Set<MuxDeferredNode<*, *, *>>,
additions: Set<MuxDeferredNode<*, *, *>>,
) {
- this@MuxPromptMovingNode.adjustIndirectUpstream(
+ this@MuxPromptNode.adjustIndirectUpstream(
scheduler,
oldDepth,
newDepth,
@@ -305,13 +275,13 @@ internal class MuxPromptMovingNode<W, K, V>(
)
}
- override suspend fun moveDirectUpstreamToIndirect(
+ override fun moveDirectUpstreamToIndirect(
scheduler: Scheduler,
oldDirectDepth: Int,
newIndirectDepth: Int,
newIndirectSet: Set<MuxDeferredNode<*, *, *>>,
) {
- this@MuxPromptMovingNode.moveDirectUpstreamToIndirect(
+ this@MuxPromptNode.moveDirectUpstreamToIndirect(
scheduler,
oldDirectDepth,
newIndirectDepth,
@@ -319,98 +289,70 @@ internal class MuxPromptMovingNode<W, K, V>(
)
}
- override suspend fun removeDirectUpstream(scheduler: Scheduler, depth: Int) {
- this@MuxPromptMovingNode.removeDirectPatchNode(scheduler, depth)
+ override fun removeDirectUpstream(scheduler: Scheduler, depth: Int) {
+ this@MuxPromptNode.removeDirectPatchNode(scheduler, depth)
}
- override suspend fun removeIndirectUpstream(
+ override fun removeIndirectUpstream(
scheduler: Scheduler,
depth: Int,
indirectSet: Set<MuxDeferredNode<*, *, *>>,
) {
- this@MuxPromptMovingNode.removeIndirectPatchNode(scheduler, depth, indirectSet)
+ this@MuxPromptNode.removeIndirectPatchNode(scheduler, depth, indirectSet)
}
}
}
-internal class MuxPromptEvalNode<W, K, V>(
- private val movingNode: PullNode<MuxPromptMovingResult<W, K, V>>,
- private val factory: MutableMapK.Factory<W, K>,
-) : PullNode<MuxResult<W, K, V>> {
- override suspend fun getPushEvent(evalScope: EvalScope): MuxResult<W, K, V> =
- movingNode.getPushEvent(evalScope).let { (preSwitchResults, newlySwitchedIn) ->
- newlySwitchedIn?.let {
- factory
- .create(preSwitchResults)
- .also { store ->
- newlySwitchedIn.forEach { k, pullNode -> store[k] = pullNode }
- }
- .asReadOnly()
- } ?: preSwitchResults
- }
-}
-
internal inline fun <A> switchPromptImplSingle(
- crossinline getStorage: suspend EvalScope.() -> TFlowImpl<A>,
- crossinline getPatches: suspend EvalScope.() -> TFlowImpl<TFlowImpl<A>>,
-): TFlowImpl<A> =
- mapImpl({
+ crossinline getStorage: EvalScope.() -> TFlowImpl<A>,
+ crossinline getPatches: EvalScope.() -> TFlowImpl<TFlowImpl<A>>,
+): TFlowImpl<A> {
+ val switchPromptImpl =
switchPromptImpl(
getStorage = { singleOf(getStorage()).asIterable() },
getPatches = {
- mapImpl(getPatches) { newFlow -> singleOf(just(newFlow)).asIterable() }
+ mapImpl(getPatches) { newFlow, _ -> singleOf(just(newFlow)).asIterable() }
},
storeFactory = SingletonMapK.Factory(),
)
- }) { map ->
- map.getValue(Unit).getPushEvent(this)
+ return mapImpl({ switchPromptImpl }) { map, logIndent ->
+ map.asSingle().getValue(Unit).getPushEvent(logIndent, this)
}
+}
internal fun <W, K, V> switchPromptImpl(
- getStorage: suspend EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>,
- getPatches: suspend EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>,
+ name: String? = null,
+ getStorage: EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>,
+ getPatches: EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>,
storeFactory: MutableMapK.Factory<W, K>,
-): TFlowImpl<MuxResult<W, K, V>> {
- val moving = MuxLifecycle(MuxPromptActivator(getStorage, storeFactory, getPatches))
- val eval = TFlowCheap { downstream ->
- moving.activate(evalScope = this, downstream)?.let { (connection, needsEval) ->
- val evalNode = MuxPromptEvalNode(connection.directUpstream, storeFactory)
- ActivationResult(
- connection = NodeConnection(evalNode, connection.schedulerUpstream),
- needsEval = needsEval,
- )
- }
- }
- return eval.cached()
-}
+): TFlowImpl<MuxResult<W, K, V>> =
+ MuxLifecycle(MuxPromptActivator(name, getStorage, storeFactory, getPatches))
private class MuxPromptActivator<W, K, V>(
- private val getStorage: suspend EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>,
+ private val name: String?,
+ private val getStorage: EvalScope.() -> Iterable<Map.Entry<K, TFlowImpl<V>>>,
private val storeFactory: MutableMapK.Factory<W, K>,
- private val getPatches:
- suspend EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>,
-) : MuxActivator<MuxPromptMovingResult<W, K, V>> {
- override suspend fun activate(
+ private val getPatches: EvalScope.() -> TFlowImpl<Iterable<Map.Entry<K, Maybe<TFlowImpl<V>>>>>,
+) : MuxActivator<W, K, V> {
+ override fun activate(
evalScope: EvalScope,
- lifecycle: MuxLifecycle<MuxPromptMovingResult<W, K, V>>,
- ): MuxNode<W, *, *, MuxPromptMovingResult<W, K, V>>? {
+ lifecycle: MuxLifecycle<W, K, V>,
+ ): Pair<MuxNode<W, K, V>, (() -> Unit)?>? {
// Initialize mux node and switched-in connections.
val movingNode =
- MuxPromptMovingNode(lifecycle, this, storeFactory).apply {
- coroutineScope {
- launch { initializeUpstream(evalScope, getStorage, storeFactory) }
- // Setup patches connection
- val patchNode = PatchNode()
- getPatches(evalScope)
- .activate(evalScope = evalScope, downstream = patchNode.schedulable)
- ?.let { (conn, needsEval) ->
- patchNode.upstream = conn
- patches = patchNode
- if (needsEval) {
- patchData = conn.getPushEvent(evalScope)
- }
+ MuxPromptNode(name, lifecycle, this, storeFactory).apply {
+ initializeUpstream(evalScope, getStorage, storeFactory)
+ // Setup patches connection
+ val patchNode = PatchNode()
+ getPatches(evalScope)
+ .activate(evalScope = evalScope, downstream = patchNode.schedulable)
+ ?.let { (conn, needsEval) ->
+ patchNode.upstream = conn
+ patches = patchNode
+ if (needsEval) {
+ patchData = conn.getPushEvent(0, evalScope)
}
- }
+ }
// Update depth based on all initial switched-in nodes.
initializeDepth()
// Update depth based on patches node.
@@ -441,6 +383,10 @@ private class MuxPromptActivator<W, K, V>(
movingNode.schedule(evalScope)
}
- return movingNode.takeUnless { it.patches == null && it.switchedIn.isEmpty() }
+ return if (movingNode.patches == null && movingNode.switchedIn.isEmpty()) {
+ null
+ } else {
+ movingNode to null
+ }
}
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt
index 79d4b7a843ac..b90c4c02fa7c 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt
@@ -18,15 +18,15 @@ package com.android.systemui.kairos.internal
import com.android.systemui.kairos.TState
import com.android.systemui.kairos.internal.util.HeteroMap
+import com.android.systemui.kairos.internal.util.logDuration
+import com.android.systemui.kairos.internal.util.logLn
import com.android.systemui.kairos.util.Just
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.just
import com.android.systemui.kairos.util.none
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.ConcurrentLinkedDeque
-import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.ContinuationInterceptor
+import kotlin.time.measureTime
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
@@ -52,25 +52,34 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope {
override val network
get() = this
- override val compactor = SchedulerImpl()
- override val scheduler = SchedulerImpl()
- override val transactionStore = HeteroMap()
+ override val compactor = SchedulerImpl {
+ if (it.markedForCompaction) false
+ else {
+ it.markedForCompaction = true
+ true
+ }
+ }
+ override val scheduler = SchedulerImpl {
+ if (it.markedForEvaluation) false
+ else {
+ it.markedForEvaluation = true
+ true
+ }
+ }
+ override val transactionStore = TransactionStore()
- private val stateWrites = ConcurrentLinkedQueue<TStateSource<*>>()
- private val outputsByDispatcher =
- ConcurrentHashMap<ContinuationInterceptor, ConcurrentLinkedQueue<Output<*>>>()
- private val muxMovers = ConcurrentLinkedQueue<MuxDeferredNode<*, *, *>>()
- private val deactivations = ConcurrentLinkedDeque<PushNode<*>>()
- private val outputDeactivations = ConcurrentLinkedQueue<Output<*>>()
+ private val stateWrites = ArrayDeque<TStateSource<*>>()
+ private val outputsByDispatcher = HashMap<ContinuationInterceptor, ArrayDeque<Output<*>>>()
+ private val muxMovers = ArrayDeque<MuxDeferredNode<*, *, *>>()
+ private val deactivations = ArrayDeque<PushNode<*>>()
+ private val outputDeactivations = ArrayDeque<Output<*>>()
private val transactionMutex = Mutex()
private val inputScheduleChan = Channel<ScheduledAction<*>>()
override fun scheduleOutput(output: Output<*>) {
val continuationInterceptor =
output.context[ContinuationInterceptor] ?: Dispatchers.Unconfined
- outputsByDispatcher
- .computeIfAbsent(continuationInterceptor) { ConcurrentLinkedQueue() }
- .add(output)
+ outputsByDispatcher.computeIfAbsent(continuationInterceptor) { ArrayDeque() }.add(output)
}
override fun scheduleMuxMover(muxMover: MuxDeferredNode<*, *, *>) {
@@ -101,28 +110,37 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope {
actions.add(func)
}
transactionMutex.withLock {
- try {
- // Run all actions
- evalScope {
- for (action in actions) {
- launch { action.started(evalScope = this@evalScope) }
+ val e = epoch
+ val duration = measureTime {
+ logLn(0, "===starting transaction $e===")
+ try {
+ logDuration(1, "init actions") {
+ // Run all actions
+ evalScope {
+ for (action in actions) {
+ action.started(evalScope = this@evalScope)
+ }
+ }
+ }
+ // Step through the network
+ doTransaction(1)
+ } catch (e: Exception) {
+ // Signal failure
+ while (actions.isNotEmpty()) {
+ actions.removeLast().fail(e)
+ }
+ // re-throw, cancelling this coroutine
+ throw e
+ } finally {
+ logDuration(1, "signal completions") {
+ // Signal completion
+ while (actions.isNotEmpty()) {
+ actions.removeLast().completed()
+ }
}
- }
- // Step through the network
- doTransaction()
- } catch (e: Exception) {
- // Signal failure
- while (actions.isNotEmpty()) {
- actions.removeLast().fail(e)
- }
- // re-throw, cancelling this coroutine
- throw e
- } finally {
- // Signal completion
- while (actions.isNotEmpty()) {
- actions.removeLast().completed()
}
}
+ logLn(0, "===transaction $e took $duration===")
}
}
}
@@ -139,35 +157,47 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope {
onResult.invokeOnCompletion { job.cancel() }
}
- suspend fun <R> evalScope(block: suspend EvalScope.() -> R): R = deferScope {
+ inline fun <R> evalScope(block: EvalScope.() -> R): R = deferScope {
block(EvalScopeImpl(this@Network, this))
}
/** Performs a transactional update of the FRP network. */
- private suspend fun doTransaction() {
+ private suspend fun doTransaction(logIndent: Int) {
// Traverse network, then run outputs
- do {
- scheduler.drainEval(this)
- } while (evalScope { evalOutputs(this) })
+ logDuration(logIndent, "traverse network") {
+ do {
+ val numNodes =
+ logDuration("drainEval") { scheduler.drainEval(currentLogIndent, this@Network) }
+ logLn("drained $numNodes nodes")
+ } while (logDuration("evalOutputs") { evalScope { evalOutputs(this) } })
+ }
// Update states
- evalScope { evalStateWriters(this) }
+ logDuration(logIndent, "update states") {
+ evalScope { evalStateWriters(currentLogIndent, this) }
+ }
// Invalidate caches
// Note: this needs to occur before deferred switches
- transactionStore.clear()
+ logDuration(logIndent, "clear store") { transactionStore.clear() }
epoch++
// Perform deferred switches
- evalScope { evalMuxMovers(this) }
+ logDuration(logIndent, "evalMuxMovers") {
+ evalScope { evalMuxMovers(currentLogIndent, this) }
+ }
// Compact depths
- scheduler.drainCompact()
- compactor.drainCompact()
+ logDuration(logIndent, "compact") {
+ scheduler.drainCompact(currentLogIndent)
+ compactor.drainCompact(currentLogIndent)
+ }
// Deactivate nodes with no downstream
- evalDeactivations()
+ logDuration(logIndent, "deactivations") { evalDeactivations() }
}
/** Invokes all [Output]s that have received data within this transaction. */
private suspend fun evalOutputs(evalScope: EvalScope): Boolean {
+ if (outputsByDispatcher.isEmpty()) {
+ return false
+ }
// Outputs can enqueue other outputs, so we need two loops
- if (outputsByDispatcher.isEmpty()) return false
while (outputsByDispatcher.isNotEmpty()) {
var launchedAny = false
coroutineScope {
@@ -176,57 +206,50 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope {
launchedAny = true
launch(key) {
while (outputs.isNotEmpty()) {
- val output = outputs.remove()
+ val output = outputs.removeFirst()
launch { output.visit(evalScope) }
}
}
}
}
}
- if (!launchedAny) outputsByDispatcher.clear()
+ if (!launchedAny) {
+ outputsByDispatcher.clear()
+ }
}
return true
}
- private suspend fun evalMuxMovers(evalScope: EvalScope) {
+ private fun evalMuxMovers(logIndent: Int, evalScope: EvalScope) {
while (muxMovers.isNotEmpty()) {
- coroutineScope {
- val toMove = muxMovers.remove()
- launch { toMove.performMove(evalScope) }
- }
+ val toMove = muxMovers.removeFirst()
+ toMove.performMove(logIndent, evalScope)
}
}
/** Updates all [TState]es that have changed within this transaction. */
- private suspend fun evalStateWriters(evalScope: EvalScope) {
- coroutineScope {
- while (stateWrites.isNotEmpty()) {
- val latch = stateWrites.remove()
- launch { latch.updateState(evalScope) }
- }
+ private fun evalStateWriters(logIndent: Int, evalScope: EvalScope) {
+ while (stateWrites.isNotEmpty()) {
+ val latch = stateWrites.removeFirst()
+ latch.updateState(logIndent, evalScope)
}
}
- private suspend fun evalDeactivations() {
- coroutineScope {
- launch {
- while (deactivations.isNotEmpty()) {
- // traverse in reverse order
- // - deactivations are added in depth-order during the node traversal phase
- // - perform deactivations in reverse order, in case later ones propagate to
- // earlier ones
- val toDeactivate = deactivations.removeLast()
- launch { toDeactivate.deactivateIfNeeded() }
- }
- }
- while (outputDeactivations.isNotEmpty()) {
- val toDeactivate = outputDeactivations.remove()
- launch {
- toDeactivate.upstream?.removeDownstreamAndDeactivateIfNeeded(
- downstream = toDeactivate.schedulable
- )
- }
- }
+ private fun evalDeactivations() {
+ while (deactivations.isNotEmpty()) {
+ // traverse in reverse order
+ // - deactivations are added in depth-order during the node traversal phase
+ // - perform deactivations in reverse order, in case later ones propagate to
+ // earlier ones
+ val toDeactivate = deactivations.removeLast()
+ toDeactivate.deactivateIfNeeded()
+ }
+
+ while (outputDeactivations.isNotEmpty()) {
+ val toDeactivate = outputDeactivations.removeFirst()
+ toDeactivate.upstream?.removeDownstreamAndDeactivateIfNeeded(
+ downstream = toDeactivate.schedulable
+ )
}
check(deactivations.isEmpty()) { "unexpected lingering deactivations" }
check(outputDeactivations.isEmpty()) { "unexpected lingering output deactivations" }
@@ -260,4 +283,39 @@ internal class ScheduledAction<T>(
}
}
-internal typealias TransactionStore = HeteroMap
+internal class TransactionStore private constructor(private val storage: HeteroMap) {
+ constructor(capacity: Int) : this(HeteroMap(capacity))
+
+ constructor() : this(HeteroMap())
+
+ operator fun <A> get(key: HeteroMap.Key<A>): A =
+ storage.getOrError(key) { "no value for $key in this transaction" }
+
+ operator fun <A> set(key: HeteroMap.Key<A>, value: A) {
+ storage[key] = value
+ }
+
+ fun clear() = storage.clear()
+}
+
+internal class TransactionCache<A> {
+ private val key = object : HeteroMap.Key<A> {}
+ @Volatile
+ var epoch: Long = Long.MIN_VALUE
+ private set
+
+ fun getOrPut(evalScope: EvalScope, block: () -> A): A =
+ if (epoch < evalScope.epoch) {
+ epoch = evalScope.epoch
+ block().also { evalScope.transactionStore[key] = it }
+ } else {
+ evalScope.transactionStore[key]
+ }
+
+ fun put(evalScope: EvalScope, value: A) {
+ epoch = evalScope.epoch
+ evalScope.transactionStore[key] = value
+ }
+
+ fun getCurrentValue(evalScope: EvalScope): A = evalScope.transactionStore[key]
+}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NoScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NoScope.kt
index fbd9689eb1d0..14e4e1cfc143 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NoScope.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NoScope.kt
@@ -17,31 +17,9 @@
package com.android.systemui.kairos.internal
import com.android.systemui.kairos.FrpScope
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.EmptyCoroutineContext
-import kotlin.coroutines.coroutineContext
-import kotlin.coroutines.startCoroutine
-import kotlinx.coroutines.CompletableDeferred
-import kotlinx.coroutines.completeWith
-import kotlinx.coroutines.job
internal object NoScope {
private object FrpScopeImpl : FrpScope
- suspend fun <R> runInFrpScope(block: suspend FrpScope.() -> R): R {
- val complete = CompletableDeferred<R>(coroutineContext.job)
- block.startCoroutine(
- FrpScopeImpl,
- object : Continuation<R> {
- override val context: CoroutineContext
- get() = EmptyCoroutineContext
-
- override fun resumeWith(result: Result<R>) {
- complete.completeWith(result)
- }
- },
- )
- return complete.await()
- }
+ fun <R> runInFrpScope(block: FrpScope.() -> R): R = FrpScopeImpl.block()
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt
index 7a015d8ca1f6..39b8bfe540d2 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt
@@ -22,18 +22,18 @@ Muxes + Branch
*/
internal sealed interface SchedulableNode {
/** schedule this node w/ given NodeEvalScope */
- suspend fun schedule(evalScope: EvalScope)
+ fun schedule(logIndent: Int, evalScope: EvalScope)
- suspend fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int)
+ fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int)
- suspend fun moveIndirectUpstreamToDirect(
+ fun moveIndirectUpstreamToDirect(
scheduler: Scheduler,
oldIndirectDepth: Int,
oldIndirectSet: Set<MuxDeferredNode<*, *, *>>,
newDirectDepth: Int,
)
- suspend fun adjustIndirectUpstream(
+ fun adjustIndirectUpstream(
scheduler: Scheduler,
oldDepth: Int,
newDepth: Int,
@@ -41,20 +41,20 @@ internal sealed interface SchedulableNode {
additions: Set<MuxDeferredNode<*, *, *>>,
)
- suspend fun moveDirectUpstreamToIndirect(
+ fun moveDirectUpstreamToIndirect(
scheduler: Scheduler,
oldDirectDepth: Int,
newIndirectDepth: Int,
newIndirectSet: Set<MuxDeferredNode<*, *, *>>,
)
- suspend fun removeIndirectUpstream(
+ fun removeIndirectUpstream(
scheduler: Scheduler,
depth: Int,
indirectSet: Set<MuxDeferredNode<*, *, *>>,
)
- suspend fun removeDirectUpstream(scheduler: Scheduler, depth: Int)
+ fun removeDirectUpstream(scheduler: Scheduler, depth: Int)
}
/*
@@ -66,7 +66,7 @@ internal sealed interface PullNode<out A> {
* will read from the cache, otherwise it will perform a full evaluation, even if invoked
* multiple times within a transaction.
*/
- suspend fun getPushEvent(evalScope: EvalScope): A
+ fun getPushEvent(logIndent: Int, evalScope: EvalScope): A
}
/*
@@ -74,19 +74,19 @@ Muxes + DmuxBranch
*/
internal sealed interface PushNode<A> : PullNode<A> {
- suspend fun hasCurrentValue(evalScope: EvalScope): Boolean
+ fun hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean
val depthTracker: DepthTracker
- suspend fun removeDownstream(downstream: Schedulable)
+ fun removeDownstream(downstream: Schedulable)
/** called during cleanup phase */
- suspend fun deactivateIfNeeded()
+ fun deactivateIfNeeded()
/** called from mux nodes after severs */
- suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope)
+ fun scheduleDeactivationIfNeeded(evalScope: EvalScope)
- suspend fun addDownstream(downstream: Schedulable)
+ fun addDownstream(downstream: Schedulable)
- suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable)
+ fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable)
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt
index 3373de05249c..38d8cf70b36e 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt
@@ -21,8 +21,8 @@ import kotlin.coroutines.EmptyCoroutineContext
internal class Output<A>(
val context: CoroutineContext = EmptyCoroutineContext,
- val onDeath: suspend () -> Unit = {},
- val onEmit: suspend EvalScope.(A) -> Unit,
+ val onDeath: () -> Unit = {},
+ val onEmit: EvalScope.(A) -> Unit,
) {
val schedulable = Schedulable.O(this)
@@ -33,23 +33,24 @@ internal class Output<A>(
private object NoResult
// invoked by network
- suspend fun visit(evalScope: EvalScope) {
+ fun visit(evalScope: EvalScope) {
val upstreamResult = result
check(upstreamResult !== NoResult) { "output visited with null upstream result" }
result = NoResult
@Suppress("UNCHECKED_CAST") evalScope.onEmit(upstreamResult as A)
}
- suspend fun kill() {
+ fun kill() {
onDeath()
}
- suspend fun schedule(evalScope: EvalScope) {
+ fun schedule(logIndent: Int, evalScope: EvalScope) {
result =
- checkNotNull(upstream) { "output scheduled with null upstream" }.getPushEvent(evalScope)
+ checkNotNull(upstream) { "output scheduled with null upstream" }
+ .getPushEvent(logIndent, evalScope)
evalScope.scheduleOutput(this)
}
}
-internal inline fun OneShot(crossinline onEmit: suspend EvalScope.() -> Unit): Output<Unit> =
+internal inline fun OneShot(crossinline onEmit: EvalScope.() -> Unit): Output<Unit> =
Output<Unit>(onEmit = { onEmit() }).apply { result = Unit }
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt
index 43b621fadc67..5ade401da1a5 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt
@@ -16,21 +16,23 @@
package com.android.systemui.kairos.internal
-import com.android.systemui.kairos.internal.util.Key
-import kotlinx.coroutines.CoroutineStart
-import kotlinx.coroutines.Deferred
+import com.android.systemui.kairos.internal.util.logDuration
internal val neverImpl: TFlowImpl<Nothing> = TFlowCheap { null }
-internal class MapNode<A, B>(val upstream: PullNode<A>, val transform: suspend EvalScope.(A) -> B) :
+internal class MapNode<A, B>(val upstream: PullNode<A>, val transform: EvalScope.(A, Int) -> B) :
PullNode<B> {
- override suspend fun getPushEvent(evalScope: EvalScope): B =
- evalScope.transform(upstream.getPushEvent(evalScope))
+ override fun getPushEvent(logIndent: Int, evalScope: EvalScope): B =
+ logDuration(logIndent, "MapNode.getPushEvent") {
+ val upstream =
+ logDuration("upstream event") { upstream.getPushEvent(currentLogIndent, evalScope) }
+ logDuration("transform") { evalScope.transform(upstream, currentLogIndent) }
+ }
}
internal inline fun <A, B> mapImpl(
- crossinline upstream: suspend EvalScope.() -> TFlowImpl<A>,
- noinline transform: suspend EvalScope.(A) -> B,
+ crossinline upstream: EvalScope.() -> TFlowImpl<A>,
+ noinline transform: EvalScope.(A, Int) -> B,
): TFlowImpl<B> = TFlowCheap { downstream ->
upstream().activate(evalScope = this, downstream)?.let { (connection, needsEval) ->
ActivationResult(
@@ -44,19 +46,29 @@ internal inline fun <A, B> mapImpl(
}
}
-internal class CachedNode<A>(val key: Key<Deferred<A>>, val upstream: PullNode<A>) : PullNode<A> {
- override suspend fun getPushEvent(evalScope: EvalScope): A {
- val deferred =
- evalScope.transactionStore.getOrPut(key) {
- evalScope.deferAsync(CoroutineStart.LAZY) { upstream.getPushEvent(evalScope) }
- }
- return deferred.await()
- }
+internal class CachedNode<A>(
+ private val transactionCache: TransactionCache<Lazy<A>>,
+ val upstream: PullNode<A>,
+) : PullNode<A> {
+ override fun getPushEvent(logIndent: Int, evalScope: EvalScope): A =
+ logDuration(logIndent, "CachedNode.getPushEvent") {
+ val deferred =
+ logDuration("CachedNode.getOrPut", false) {
+ transactionCache.getOrPut(evalScope) {
+ evalScope.deferAsync {
+ logDuration("CachedNode.getUpstreamEvent") {
+ upstream.getPushEvent(currentLogIndent, evalScope)
+ }
+ }
+ }
+ }
+ logDuration("await") { deferred.value }
+ }
}
internal fun <A> TFlowImpl<A>.cached(): TFlowImpl<A> {
- val key = object : Key<Deferred<A>> {}
- return TFlowCheap {
+ val key = TransactionCache<Lazy<A>>()
+ return TFlowCheap { it ->
activate(this, it)?.let { (connection, needsEval) ->
ActivationResult(
connection =
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt
index d046420517fe..0529bcb63c07 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt
@@ -14,66 +14,76 @@
* limitations under the License.
*/
-@file:OptIn(ExperimentalCoroutinesApi::class)
-
package com.android.systemui.kairos.internal
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.PriorityBlockingQueue
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.launch
+import com.android.systemui.kairos.internal.util.LogIndent
+import java.util.PriorityQueue
internal interface Scheduler {
- fun schedule(depth: Int, node: MuxNode<*, *, *, *>)
+ fun schedule(depth: Int, node: MuxNode<*, *, *>)
- fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *, *>)
+ fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>)
}
-internal class SchedulerImpl : Scheduler {
- val enqueued = ConcurrentHashMap<MuxNode<*, *, *, *>, Any>()
- val scheduledQ =
- PriorityBlockingQueue<Pair<Int, MuxNode<*, *, *, *>>>(16, compareBy { it.first })
+internal class SchedulerImpl(private val enqueue: (MuxNode<*, *, *>) -> Boolean) : Scheduler {
+ private val scheduledQ = PriorityQueue<Pair<Int, MuxNode<*, *, *>>>(compareBy { it.first })
- override fun schedule(depth: Int, node: MuxNode<*, *, *, *>) {
- if (enqueued.putIfAbsent(node, node) == null) {
+ override fun schedule(depth: Int, node: MuxNode<*, *, *>) {
+ if (enqueue(node)) {
scheduledQ.add(Pair(depth, node))
}
}
- override fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *, *>) {
+ override fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) {
schedule(Int.MIN_VALUE + indirectDepth, node)
}
- internal suspend fun drainEval(network: Network) {
- drain { runStep ->
- runStep { muxNode -> network.evalScope { muxNode.visit(this) } }
+ internal fun drainEval(logIndent: Int, network: Network): Int =
+ drain(logIndent) { runStep ->
+ runStep { muxNode ->
+ network.evalScope {
+ muxNode.markedForEvaluation = false
+ muxNode.visit(currentLogIndent, this)
+ }
+ }
// If any visited MuxPromptNodes had their depths increased, eagerly propagate those
// depth changes now before performing further network evaluation.
- network.compactor.drainCompact()
+ val numNodes = network.compactor.drainCompact(currentLogIndent)
+ logLn("promptly compacted $numNodes nodes")
}
- }
- internal suspend fun drainCompact() {
- drain { runStep -> runStep { muxNode -> muxNode.visitCompact(scheduler = this) } }
- }
+ internal fun drainCompact(logIndent: Int): Int =
+ drain(logIndent) { runStep ->
+ runStep { muxNode ->
+ muxNode.markedForCompaction = false
+ muxNode.visitCompact(scheduler = this@SchedulerImpl)
+ }
+ }
- private suspend inline fun drain(
+ private inline fun drain(
+ logIndent: Int,
crossinline onStep:
- suspend (
- runStep: suspend (visit: suspend (MuxNode<*, *, *, *>) -> Unit) -> Unit
- ) -> Unit
- ): Unit = coroutineScope {
+ LogIndent.(
+ runStep: LogIndent.(visit: LogIndent.(MuxNode<*, *, *>) -> Unit) -> Unit
+ ) -> Unit,
+ ): Int {
+ var total = 0
while (scheduledQ.isNotEmpty()) {
val maxDepth = scheduledQ.peek()?.first ?: error("Unexpected empty scheduler")
- onStep { visit -> runStep(maxDepth, visit) }
+ LogIndent(logIndent).onStep { visit ->
+ logDuration("step $maxDepth") {
+ val subtotal = runStep(maxDepth) { visit(it) }
+ logLn("visited $subtotal nodes")
+ total += subtotal
+ }
+ }
}
+ return total
}
- private suspend inline fun runStep(
- maxDepth: Int,
- crossinline visit: suspend (MuxNode<*, *, *, *>) -> Unit,
- ) = coroutineScope {
+ private inline fun runStep(maxDepth: Int, crossinline visit: (MuxNode<*, *, *>) -> Unit): Int {
+ var total = 0
+ val toVisit = mutableListOf<MuxNode<*, *, *>>()
while (scheduledQ.peek()?.first?.let { it <= maxDepth } == true) {
val (d, node) = scheduledQ.remove()
if (
@@ -82,11 +92,15 @@ internal class SchedulerImpl : Scheduler {
) {
scheduledQ.add(node.depthTracker.dirty_directDepth to node)
} else {
- launch {
- enqueued.remove(node)
- visit(node)
- }
+ total++
+ toVisit.add(node)
}
}
+
+ for (node in toVisit) {
+ visit(node)
+ }
+
+ return total
}
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt
index 94f94f510d48..48f69036df89 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt
@@ -30,25 +30,16 @@ import com.android.systemui.kairos.emptyTFlow
import com.android.systemui.kairos.groupByKey
import com.android.systemui.kairos.init
import com.android.systemui.kairos.internal.store.ConcurrentHashMapK
-import com.android.systemui.kairos.internal.util.mapValuesParallel
import com.android.systemui.kairos.mapCheap
import com.android.systemui.kairos.merge
import com.android.systemui.kairos.switch
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.map
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.EmptyCoroutineContext
-import kotlin.coroutines.startCoroutine
-import kotlinx.coroutines.CompletableDeferred
-import kotlinx.coroutines.Deferred
-import kotlinx.coroutines.completeWith
-import kotlinx.coroutines.job
internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: TFlow<Any>) :
StateScope, EvalScope by evalScope {
- private val endSignalOnce: TFlow<Any> = endSignal.nextOnlyInternal("StateScope.endSignal")
+ override val endSignalOnce: TFlow<Any> = endSignal.nextOnlyInternal("StateScope.endSignal")
private fun <A> TFlow<A>.truncateToScope(operatorName: String): TFlow<A> =
if (endSignalOnce === emptyTFlow) {
@@ -70,11 +61,11 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
}
private fun <A> TFlow<A>.toTStateInternal(operatorName: String, init: A): TState<A> =
- toTStateInternalDeferred(operatorName, CompletableDeferred(init))
+ toTStateInternalDeferred(operatorName, CompletableLazy(init))
private fun <A> TFlow<A>.toTStateInternalDeferred(
operatorName: String,
- init: Deferred<A>,
+ init: Lazy<A>,
): TState<A> {
val changes = this@toTStateInternalDeferred
val name = operatorName
@@ -89,7 +80,7 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
return TStateInit(constInit(name, impl))
}
- private fun <R> deferredInternal(block: suspend FrpStateScope.() -> R): FrpDeferredValue<R> =
+ private fun <R> deferredInternal(block: FrpStateScope.() -> R): FrpDeferredValue<R> =
FrpDeferredValue(deferAsync { runInStateScope(block) })
private fun <A> TFlow<A>.toTStateDeferredInternal(
@@ -102,30 +93,27 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
}
private fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyInternal(
- storage: TState<Map<K, TFlow<V>>>
+ name: String?,
+ storage: TState<Map<K, TFlow<V>>>,
): TFlow<Map<K, V>> {
- val name = "mergeIncrementally"
+ val patches =
+ mapImpl({ init.connect(this) }) { patch, _ ->
+ patch.mapValues { (_, m) -> m.map { flow -> flow.init.connect(this) } }.asIterable()
+ }
return TFlowInit(
constInit(
name,
switchDeferredImpl(
+ name = name,
getStorage = {
storage.init
.connect(this)
.getCurrentWithEpoch(this)
.first
- .mapValuesParallel { (_, flow) -> flow.init.connect(this) }
+ .mapValues { (_, flow) -> flow.init.connect(this) }
.asIterable()
},
- getPatches = {
- mapImpl({ init.connect(this) }) { patch ->
- patch
- .mapValuesParallel { (_, m) ->
- m.map { flow -> flow.init.connect(this) }
- }
- .asIterable()
- }
- },
+ getPatches = { patches },
storeFactory = ConcurrentHashMapK.Factory(),
)
.awaitValues(),
@@ -134,30 +122,27 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
}
private fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptInternal(
- storage: TState<Map<K, TFlow<V>>>
+ storage: TState<Map<K, TFlow<V>>>,
+ name: String?,
): TFlow<Map<K, V>> {
- val name = "mergeIncrementallyPrompt"
+ val patches =
+ mapImpl({ init.connect(this) }) { patch, _ ->
+ patch.mapValues { (_, m) -> m.map { flow -> flow.init.connect(this) } }.asIterable()
+ }
return TFlowInit(
constInit(
name,
switchPromptImpl(
+ name = name,
getStorage = {
storage.init
.connect(this)
.getCurrentWithEpoch(this)
.first
- .mapValuesParallel { (_, flow) -> flow.init.connect(this) }
+ .mapValues { (_, flow) -> flow.init.connect(this) }
.asIterable()
},
- getPatches = {
- mapImpl({ init.connect(this) }) { patch ->
- patch
- .mapValuesParallel { (_, m) ->
- m.map { flow -> flow.init.connect(this) }
- }
- .asIterable()
- }
- },
+ getPatches = { patches },
storeFactory = ConcurrentHashMapK.Factory(),
)
.awaitValues(),
@@ -170,9 +155,9 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
numKeys: Int?,
): Pair<TFlow<Map<K, Maybe<A>>>, FrpDeferredValue<Map<K, B>>> {
val eventsByKey: GroupedTFlow<K, Maybe<FrpStateful<A>>> = groupByKey(numKeys)
- val initOut: Deferred<Map<K, B>> = deferAsync {
- init.unwrapped.await().mapValuesParallel { (k, stateful) ->
- val newEnd = with(frpScope) { eventsByKey[k].skipNext() }
+ val initOut: Lazy<Map<K, B>> = deferAsync {
+ init.unwrapped.value.mapValues { (k, stateful) ->
+ val newEnd = with(frpScope) { eventsByKey[k] }
val newScope = childStateScope(newEnd)
newScope.runInStateScope(stateful)
}
@@ -180,8 +165,8 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
val changesNode: TFlowImpl<Map<K, Maybe<A>>> =
mapImpl(
upstream = { this@applyLatestStatefulForKeyInternal.init.connect(evalScope = this) }
- ) { upstreamMap ->
- upstreamMap.mapValuesParallel { (k: K, ma: Maybe<FrpStateful<A>>) ->
+ ) { upstreamMap, _ ->
+ upstreamMap.mapValues { (k: K, ma: Maybe<FrpStateful<A>>) ->
reenterStateScope(this@StateScopeImpl).run {
ma.map { stateful ->
val newEnd = with(frpScope) { eventsByKey[k].skipNext() }
@@ -205,7 +190,7 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
name,
mapImpl(
upstream = { this@observeStatefulsInternal.init.connect(evalScope = this) }
- ) { stateful ->
+ ) { stateful, _ ->
reenterStateScope(outerScope = this@StateScopeImpl)
.runInStateScope(stateful)
}
@@ -219,25 +204,26 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
private inner class FrpStateScopeImpl :
FrpStateScope, FrpTransactionScope by evalScope.frpScope {
- override fun <A> deferredStateScope(
- block: suspend FrpStateScope.() -> A
- ): FrpDeferredValue<A> = deferredInternal(block)
+ override fun <A> deferredStateScope(block: FrpStateScope.() -> A): FrpDeferredValue<A> =
+ deferredInternal(block)
override fun <A> TFlow<A>.holdDeferred(initialValue: FrpDeferredValue<A>): TState<A> =
toTStateDeferredInternal(initialValue)
override fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
- initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
+ name: String?,
+ initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>,
): TFlow<Map<K, V>> {
val storage: TState<Map<K, TFlow<V>>> = foldMapIncrementally(initialTFlows)
- return mergeIncrementallyInternal(storage)
+ return mergeIncrementallyInternal(name, storage)
}
override fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
- initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
+ initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>,
+ name: String?,
): TFlow<Map<K, V>> {
val storage: TState<Map<K, TFlow<V>>> = foldMapIncrementally(initialTFlows)
- return mergeIncrementallyPromptInternal(storage)
+ return mergeIncrementallyPromptInternal(storage, name)
}
override fun <K, A, B> TFlow<Map<K, Maybe<FrpStateful<A>>>>.applyLatestStatefulForKey(
@@ -250,21 +236,7 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
observeStatefulsInternal()
}
- override suspend fun <R> runInStateScope(block: suspend FrpStateScope.() -> R): R {
- val complete = CompletableDeferred<R>(parent = coroutineContext.job)
- block.startCoroutine(
- frpScope,
- object : Continuation<R> {
- override val context: CoroutineContext
- get() = EmptyCoroutineContext
-
- override fun resumeWith(result: Result<R>) {
- complete.completeWith(result)
- }
- },
- )
- return complete.await()
- }
+ override fun <R> runInStateScope(block: FrpStateScope.() -> R): R = frpScope.block()
override fun childStateScope(newEnd: TFlow<Any>) =
StateScopeImpl(evalScope, merge(newEnd, endSignal))
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt
index 784a2afe0992..47a585abac5f 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt
@@ -18,7 +18,7 @@ package com.android.systemui.kairos.internal
/* Initialized TFlow */
internal fun interface TFlowImpl<out A> {
- suspend fun activate(evalScope: EvalScope, downstream: Schedulable): ActivationResult<A>?
+ fun activate(evalScope: EvalScope, downstream: Schedulable): ActivationResult<A>?
}
internal data class ActivationResult<out A>(
@@ -32,28 +32,27 @@ internal inline fun <A> TFlowCheap(crossinline cheap: CheapNodeSubscribe<A>) =
}
internal typealias CheapNodeSubscribe<A> =
- suspend EvalScope.(downstream: Schedulable) -> ActivationResult<A>?
+ EvalScope.(downstream: Schedulable) -> ActivationResult<A>?
internal data class NodeConnection<out A>(
val directUpstream: PullNode<A>,
val schedulerUpstream: PushNode<*>,
)
-internal suspend fun <A> NodeConnection<A>.hasCurrentValue(evalScope: EvalScope): Boolean =
- schedulerUpstream.hasCurrentValue(evalScope)
+internal fun <A> NodeConnection<A>.hasCurrentValue(logIndent: Int, evalScope: EvalScope): Boolean =
+ schedulerUpstream.hasCurrentValue(logIndent, evalScope)
-internal suspend fun <A> NodeConnection<A>.removeDownstreamAndDeactivateIfNeeded(
- downstream: Schedulable
-) = schedulerUpstream.removeDownstreamAndDeactivateIfNeeded(downstream)
+internal fun <A> NodeConnection<A>.removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) =
+ schedulerUpstream.removeDownstreamAndDeactivateIfNeeded(downstream)
-internal suspend fun <A> NodeConnection<A>.scheduleDeactivationIfNeeded(evalScope: EvalScope) =
+internal fun <A> NodeConnection<A>.scheduleDeactivationIfNeeded(evalScope: EvalScope) =
schedulerUpstream.scheduleDeactivationIfNeeded(evalScope)
-internal suspend fun <A> NodeConnection<A>.removeDownstream(downstream: Schedulable) =
+internal fun <A> NodeConnection<A>.removeDownstream(downstream: Schedulable) =
schedulerUpstream.removeDownstream(downstream)
-internal suspend fun <A> NodeConnection<A>.getPushEvent(evalScope: EvalScope): A =
- directUpstream.getPushEvent(evalScope)
+internal fun <A> NodeConnection<A>.getPushEvent(logIndent: Int, evalScope: EvalScope): A =
+ directUpstream.getPushEvent(logIndent, evalScope)
internal val <A> NodeConnection<A>.depthTracker: DepthTracker
get() = schedulerUpstream.depthTracker
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt
index 916f22575b0c..9565a9c12d38 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt
@@ -20,29 +20,22 @@ import com.android.systemui.kairos.internal.store.ConcurrentHashMapK
import com.android.systemui.kairos.internal.store.MutableArrayMapK
import com.android.systemui.kairos.internal.store.MutableMapK
import com.android.systemui.kairos.internal.store.StoreEntry
-import com.android.systemui.kairos.internal.util.Key
import com.android.systemui.kairos.internal.util.hashString
-import com.android.systemui.kairos.internal.util.launchImmediate
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.just
import com.android.systemui.kairos.util.none
import java.util.concurrent.atomic.AtomicLong
-import kotlinx.coroutines.CompletableDeferred
-import kotlinx.coroutines.CoroutineStart
-import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.coroutineScope
internal sealed interface TStateImpl<out A> {
val name: String?
val operatorName: String
val changes: TFlowImpl<A>
- suspend fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long>
+ fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long>
}
-internal sealed class TStateDerived<A>(override val changes: TFlowImpl<A>) :
- TStateImpl<A>, Key<Deferred<Pair<A, Long>>> {
+internal sealed class TStateDerived<A>(override val changes: TFlowImpl<A>) : TStateImpl<A> {
@Volatile
var invalidatedEpoch = Long.MIN_VALUE
@@ -52,12 +45,12 @@ internal sealed class TStateDerived<A>(override val changes: TFlowImpl<A>) :
protected var cache: Any? = EmptyCache
private set
- override suspend fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> =
- evalScope.transactionStore
- .getOrPut(this) { evalScope.deferAsync(CoroutineStart.LAZY) { pull(evalScope) } }
- .await()
+ private val transactionCache = TransactionCache<Lazy<Pair<A, Long>>>()
- suspend fun pull(evalScope: EvalScope): Pair<A, Long> {
+ override fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> =
+ transactionCache.getOrPut(evalScope) { evalScope.deferAsync { pull(evalScope) } }.value
+
+ fun pull(evalScope: EvalScope): Pair<A, Long> {
@Suppress("UNCHECKED_CAST")
return recalc(evalScope)?.also { (a, epoch) -> setCache(a, epoch) }
?: ((cache as A) to invalidatedEpoch)
@@ -75,7 +68,7 @@ internal sealed class TStateDerived<A>(override val changes: TFlowImpl<A>) :
return if (cache == EmptyCache) none else just(cache as A)
}
- protected abstract suspend fun recalc(evalScope: EvalScope): Pair<A, Long>?
+ protected abstract fun recalc(evalScope: EvalScope): Pair<A, Long>?
private data object EmptyCache
}
@@ -83,7 +76,7 @@ internal sealed class TStateDerived<A>(override val changes: TFlowImpl<A>) :
internal class TStateSource<A>(
override val name: String?,
override val operatorName: String,
- init: Deferred<A>,
+ init: Lazy<A>,
override val changes: TFlowImpl<A>,
) : TStateImpl<A> {
constructor(
@@ -91,33 +84,34 @@ internal class TStateSource<A>(
operatorName: String,
init: A,
changes: TFlowImpl<A>,
- ) : this(name, operatorName, CompletableDeferred(init), changes)
+ ) : this(name, operatorName, CompletableLazy(init), changes)
lateinit var upstreamConnection: NodeConnection<A>
// Note: Don't need to synchronize; we will never interleave reads and writes, since all writes
// are performed at the end of a network step, after any reads would have taken place.
- @Volatile private var _current: Deferred<A> = init
+ @Volatile private var _current: Lazy<A> = init
+
@Volatile
var writeEpoch = 0L
private set
- override suspend fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> =
- _current.await() to writeEpoch
+ override fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> =
+ _current.value to writeEpoch
/** called by network after eval phase has completed */
- suspend fun updateState(evalScope: EvalScope) {
+ fun updateState(logIndent: Int, evalScope: EvalScope) {
// write the latch
- _current = CompletableDeferred(upstreamConnection.getPushEvent(evalScope))
+ // TODO: deferAsync?
+ _current = CompletableLazy(upstreamConnection.getPushEvent(logIndent, evalScope))
writeEpoch = evalScope.epoch
}
override fun toString(): String = "TStateImpl(changes=$changes, current=$_current)"
@OptIn(ExperimentalCoroutinesApi::class)
- fun getStorageUnsafe(): Maybe<A> =
- if (_current.isCompleted) just(_current.getCompleted()) else none
+ fun getStorageUnsafe(): Maybe<A> = if (_current.isInitialized()) just(_current.value) else none
}
internal fun <A> constS(name: String?, operatorName: String, init: A): TStateImpl<A> =
@@ -127,8 +121,8 @@ internal inline fun <A> activatedTStateSource(
name: String?,
operatorName: String,
evalScope: EvalScope,
- crossinline getChanges: suspend EvalScope.() -> TFlowImpl<A>,
- init: Deferred<A>,
+ crossinline getChanges: EvalScope.() -> TFlowImpl<A>,
+ init: Lazy<A>,
): TStateImpl<A> {
lateinit var state: TStateSource<A>
val calm: TFlowImpl<A> =
@@ -167,19 +161,25 @@ private inline fun <A> TFlowImpl<A>.calm(
internal fun <A, B> TStateImpl<A>.mapCheap(
name: String?,
operatorName: String,
- transform: suspend EvalScope.(A) -> B,
+ transform: EvalScope.(A) -> B,
): TStateImpl<B> =
- DerivedMapCheap(name, operatorName, this, mapImpl({ changes }) { transform(it) }, transform)
+ DerivedMapCheap(
+ name,
+ operatorName,
+ this,
+ mapImpl({ changes }) { it, _ -> transform(it) },
+ transform,
+ )
internal class DerivedMapCheap<A, B>(
override val name: String?,
override val operatorName: String,
val upstream: TStateImpl<A>,
override val changes: TFlowImpl<B>,
- private val transform: suspend EvalScope.(A) -> B,
+ private val transform: EvalScope.(A) -> B,
) : TStateImpl<B> {
- override suspend fun getCurrentWithEpoch(evalScope: EvalScope): Pair<B, Long> {
+ override fun getCurrentWithEpoch(evalScope: EvalScope): Pair<B, Long> {
val (a, epoch) = upstream.getCurrentWithEpoch(evalScope)
return evalScope.transform(a) to epoch
}
@@ -190,10 +190,10 @@ internal class DerivedMapCheap<A, B>(
internal fun <A, B> TStateImpl<A>.map(
name: String?,
operatorName: String,
- transform: suspend EvalScope.(A) -> B,
+ transform: EvalScope.(A) -> B,
): TStateImpl<B> {
lateinit var state: TStateDerived<B>
- val mappedChanges = mapImpl({ changes }) { transform(it) }.cached().calm { state }
+ val mappedChanges = mapImpl({ changes }) { it, _ -> transform(it) }.cached().calm { state }
state = DerivedMap(name, operatorName, transform, this, mappedChanges)
return state
}
@@ -201,13 +201,13 @@ internal fun <A, B> TStateImpl<A>.map(
internal class DerivedMap<A, B>(
override val name: String?,
override val operatorName: String,
- private val transform: suspend EvalScope.(A) -> B,
+ private val transform: EvalScope.(A) -> B,
val upstream: TStateImpl<A>,
changes: TFlowImpl<B>,
) : TStateDerived<B>(changes) {
override fun toString(): String = "${this::class.simpleName}@$hashString"
- override suspend fun recalc(evalScope: EvalScope): Pair<B, Long>? {
+ override fun recalc(evalScope: EvalScope): Pair<B, Long>? {
val (a, epoch) = upstream.getCurrentWithEpoch(evalScope)
return if (epoch > invalidatedEpoch) {
evalScope.transform(a) to epoch
@@ -219,12 +219,13 @@ internal class DerivedMap<A, B>(
internal fun <A> TStateImpl<TStateImpl<A>>.flatten(name: String?, operator: String): TStateImpl<A> {
// emits the current value of the new inner state, when that state is emitted
- val switchEvents = mapImpl({ changes }) { newInner -> newInner.getCurrentWithEpoch(this).first }
+ val switchEvents =
+ mapImpl({ changes }) { newInner, _ -> newInner.getCurrentWithEpoch(this).first }
// emits the new value of the new inner state when that state is emitted, or
// falls back to the current value if a new state is *not* being emitted this
// transaction
val innerChanges =
- mapImpl({ changes }) { newInner ->
+ mapImpl({ changes }) { newInner, _ ->
mergeNodes({ switchEvents }, { newInner.changes }) { _, new -> new }
}
val switchedChanges: TFlowImpl<A> =
@@ -243,7 +244,7 @@ internal class DerivedFlatten<A>(
val upstream: TStateImpl<TStateImpl<A>>,
changes: TFlowImpl<A>,
) : TStateDerived<A>(changes) {
- override suspend fun recalc(evalScope: EvalScope): Pair<A, Long> {
+ override fun recalc(evalScope: EvalScope): Pair<A, Long> {
val (inner, epoch0) = upstream.getCurrentWithEpoch(evalScope)
val (a, epoch1) = inner.getCurrentWithEpoch(evalScope)
return a to maxOf(epoch0, epoch1)
@@ -256,7 +257,7 @@ internal class DerivedFlatten<A>(
internal inline fun <A, B> TStateImpl<A>.flatMap(
name: String?,
operatorName: String,
- noinline transform: suspend EvalScope.(A) -> TStateImpl<B>,
+ noinline transform: EvalScope.(A) -> TStateImpl<B>,
): TStateImpl<B> = map(null, operatorName, transform).flatten(name, operatorName)
internal fun <A, B, Z> zipStates(
@@ -264,7 +265,7 @@ internal fun <A, B, Z> zipStates(
operatorName: String,
l1: TStateImpl<A>,
l2: TStateImpl<B>,
- transform: suspend EvalScope.(A, B) -> Z,
+ transform: EvalScope.(A, B) -> Z,
): TStateImpl<Z> =
zipStateList(null, operatorName, listOf(l1, l2)).map(name, operatorName) {
@Suppress("UNCHECKED_CAST") transform(it[0] as A, it[1] as B)
@@ -276,7 +277,7 @@ internal fun <A, B, C, Z> zipStates(
l1: TStateImpl<A>,
l2: TStateImpl<B>,
l3: TStateImpl<C>,
- transform: suspend EvalScope.(A, B, C) -> Z,
+ transform: EvalScope.(A, B, C) -> Z,
): TStateImpl<Z> =
zipStateList(null, operatorName, listOf(l1, l2, l3)).map(name, operatorName) {
@Suppress("UNCHECKED_CAST") transform(it[0] as A, it[1] as B, it[2] as C)
@@ -289,7 +290,7 @@ internal fun <A, B, C, D, Z> zipStates(
l2: TStateImpl<B>,
l3: TStateImpl<C>,
l4: TStateImpl<D>,
- transform: suspend EvalScope.(A, B, C, D) -> Z,
+ transform: EvalScope.(A, B, C, D) -> Z,
): TStateImpl<Z> =
zipStateList(null, operatorName, listOf(l1, l2, l3, l4)).map(name, operatorName) {
@Suppress("UNCHECKED_CAST") transform(it[0] as A, it[1] as B, it[2] as C, it[3] as D)
@@ -303,7 +304,7 @@ internal fun <A, B, C, D, E, Z> zipStates(
l3: TStateImpl<C>,
l4: TStateImpl<D>,
l5: TStateImpl<E>,
- transform: suspend EvalScope.(A, B, C, D, E) -> Z,
+ transform: EvalScope.(A, B, C, D, E) -> Z,
): TStateImpl<Z> =
zipStateList(null, operatorName, listOf(l1, l2, l3, l4, l5)).map(name, operatorName) {
@Suppress("UNCHECKED_CAST")
@@ -333,11 +334,7 @@ internal fun <V> zipStateList(
name = name,
operatorName = operatorName,
numStates = states.size,
- states =
- states
- .asSequence()
- .mapIndexed { index, tStateImpl -> StoreEntry(index, tStateImpl) }
- .asIterable(),
+ states = states.asIterableWithIndex(),
storeFactory = MutableArrayMapK.Factory(),
)
// Like mapCheap, but with caching (or like map, but without the calm changes, as they are not
@@ -347,7 +344,7 @@ internal fun <V> zipStateList(
operatorName = operatorName,
transform = { arrayStore -> arrayStore.values.toList() },
upstream = zipped,
- changes = mapImpl({ zipped.changes }) { arrayStore -> arrayStore.values.toList() },
+ changes = mapImpl({ zipped.changes }) { arrayStore, _ -> arrayStore.values.toList() },
)
}
@@ -364,26 +361,22 @@ internal fun <W, K, A> zipStates(
val stateChanges = states.asSequence().map { (k, v) -> StoreEntry(k, v.changes) }.asIterable()
lateinit var state: DerivedZipped<W, K, A>
// No need for calm; invariant ensures that changes will only emit when there's a difference
+ val switchDeferredImpl =
+ switchDeferredImpl(
+ getStorage = { stateChanges },
+ getPatches = { neverImpl },
+ storeFactory = storeFactory,
+ )
val changes =
- mapImpl({
- switchDeferredImpl(
- getStorage = { stateChanges },
- getPatches = { neverImpl },
- storeFactory = storeFactory,
- )
- }) { patch ->
+ mapImpl({ switchDeferredImpl }) { patch, logIndent ->
val store = storeFactory.create<A>(numStates)
- coroutineScope {
- states.forEach { (k, state) ->
- launchImmediate {
- store[k] =
- if (patch.contains(k)) {
- patch.getValue(k).getPushEvent(evalScope = this@mapImpl)
- } else {
- state.getCurrentWithEpoch(evalScope = this@mapImpl).first
- }
+ states.forEach { (k, state) ->
+ store[k] =
+ if (patch.contains(k)) {
+ patch.getValue(k).getPushEvent(logIndent, evalScope = this@mapImpl)
+ } else {
+ state.getCurrentWithEpoch(evalScope = this@mapImpl).first
}
- }
}
store.also { state.setCache(it, epoch) }
}
@@ -408,17 +401,13 @@ internal class DerivedZipped<W, K, A>(
changes: TFlowImpl<MutableMapK<W, K, A>>,
private val storeFactory: MutableMapK.Factory<W, K>,
) : TStateDerived<MutableMapK<W, K, A>>(changes) {
- override suspend fun recalc(evalScope: EvalScope): Pair<MutableMapK<W, K, A>, Long> {
+ override fun recalc(evalScope: EvalScope): Pair<MutableMapK<W, K, A>, Long> {
val newEpoch = AtomicLong()
val store = storeFactory.create<A>(upstreamSize)
- coroutineScope {
- for ((key, value) in upstream) {
- launchImmediate {
- val (a, epoch) = value.getCurrentWithEpoch(evalScope)
- newEpoch.accumulateAndGet(epoch, ::maxOf)
- store[key] = a
- }
- }
+ for ((key, value) in upstream) {
+ val (a, epoch) = value.getCurrentWithEpoch(evalScope)
+ newEpoch.accumulateAndGet(epoch, ::maxOf)
+ store[key] = a
}
return store to newEpoch.get()
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TransactionalImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TransactionalImpl.kt
index 8647bdd5b7b1..13bd3b005871 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TransactionalImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TransactionalImpl.kt
@@ -16,31 +16,25 @@
package com.android.systemui.kairos.internal
-import com.android.systemui.kairos.internal.util.Key
import com.android.systemui.kairos.internal.util.hashString
-import kotlinx.coroutines.CoroutineStart
-import kotlinx.coroutines.Deferred
internal sealed class TransactionalImpl<out A> {
- data class Const<out A>(val value: Deferred<A>) : TransactionalImpl<A>()
+ data class Const<out A>(val value: Lazy<A>) : TransactionalImpl<A>()
+
+ class Impl<A>(val block: EvalScope.() -> A) : TransactionalImpl<A>() {
+ val cache = TransactionCache<Lazy<A>>()
- class Impl<A>(val block: suspend EvalScope.() -> A) : TransactionalImpl<A>(), Key<Deferred<A>> {
override fun toString(): String = "${this::class.simpleName}@$hashString"
}
}
@Suppress("NOTHING_TO_INLINE")
-internal inline fun <A> transactionalImpl(
- noinline block: suspend EvalScope.() -> A
-): TransactionalImpl<A> = TransactionalImpl.Impl(block)
+internal inline fun <A> transactionalImpl(noinline block: EvalScope.() -> A): TransactionalImpl<A> =
+ TransactionalImpl.Impl(block)
-internal fun <A> TransactionalImpl<A>.sample(evalScope: EvalScope): Deferred<A> =
+internal fun <A> TransactionalImpl<A>.sample(evalScope: EvalScope): Lazy<A> =
when (this) {
is TransactionalImpl.Const -> value
is TransactionalImpl.Impl ->
- evalScope.transactionStore
- .getOrPut(this) {
- evalScope.deferAsync(start = CoroutineStart.LAZY) { evalScope.block() }
- }
- .also { it.start() }
+ cache.getOrPut(evalScope) { evalScope.deferAsync { evalScope.block() } }
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt
index 33709a97da8f..4d183481898b 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt
@@ -21,13 +21,14 @@ import com.android.systemui.kairos.util.None
import com.android.systemui.kairos.util.just
import java.util.concurrent.ConcurrentHashMap
-internal interface Key<A>
-
private object NULL
-internal class HeteroMap {
+internal class HeteroMap private constructor(private val store: ConcurrentHashMap<Key<*>, Any>) {
+ interface Key<A> {}
+
+ constructor() : this(ConcurrentHashMap())
- private val store = ConcurrentHashMap<Key<*>, Any>()
+ constructor(capacity: Int) : this(ConcurrentHashMap(capacity))
@Suppress("UNCHECKED_CAST")
operator fun <A> get(key: Key<A>): Maybe<A> =
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt
index ebf9a66be0ae..13f884666182 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt
@@ -16,8 +16,6 @@
package com.android.systemui.kairos.internal.util
-import kotlinx.coroutines.CoroutineStart
-import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.yield
@@ -32,7 +30,7 @@ internal suspend inline fun <K, A, B : Any, M : MutableMap<K, B>> Map<K, A>
destination.also {
coroutineScope {
mapValues {
- async {
+ asyncImmediate {
yield()
block(it)
}
@@ -53,7 +51,7 @@ internal inline fun <K, A, B : Any, M : MutableMap<K, B>> Map<K, A>.mapValuesNot
internal suspend fun <A, B> Iterable<A>.mapParallel(transform: suspend (A) -> B): List<B> =
coroutineScope {
- map { async(start = CoroutineStart.LAZY) { transform(it) } }.awaitAll()
+ map { asyncImmediate { transform(it) } }.awaitAll()
}
internal suspend fun <K, A, B, M : MutableMap<K, B>> Map<K, A>.mapValuesParallelTo(
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/Util.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/Util.kt
index 6bb7f9f593aa..466a9f83b91f 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/Util.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/Util.kt
@@ -18,8 +18,13 @@
package com.android.systemui.kairos.internal.util
+import kotlin.contracts.ExperimentalContracts
+import kotlin.contracts.InvocationKind
+import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
+import kotlin.time.DurationUnit
+import kotlin.time.measureTimedValue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
@@ -31,6 +36,62 @@ import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.launch
import kotlinx.coroutines.newCoroutineContext
+private const val LogEnabled = false
+
+@Suppress("NOTHING_TO_INLINE")
+internal inline fun logLn(indent: Int = 0, message: Any?) {
+ if (!LogEnabled) return
+ log(indent, message)
+ println()
+}
+
+@Suppress("NOTHING_TO_INLINE")
+internal inline fun log(indent: Int = 0, message: Any?) {
+ if (!LogEnabled) return
+ printIndent(indent)
+ print(message)
+}
+
+@JvmInline
+internal value class LogIndent(val currentLogIndent: Int) {
+ @OptIn(ExperimentalContracts::class)
+ inline fun <R> logDuration(prefix: String, start: Boolean = true, block: LogIndent.() -> R): R {
+ contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
+ return logDuration(currentLogIndent, prefix, start, block)
+ }
+
+ @Suppress("NOTHING_TO_INLINE")
+ inline fun logLn(message: Any?) = logLn(currentLogIndent, message)
+}
+
+@OptIn(ExperimentalContracts::class)
+internal inline fun <R> logDuration(
+ indent: Int,
+ prefix: String,
+ start: Boolean = true,
+ block: LogIndent.() -> R,
+): R {
+ contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
+ if (!LogEnabled) return LogIndent(0).block()
+ if (start) {
+ logLn(indent, prefix)
+ }
+ val (result, duration) = measureTimedValue { LogIndent(indent + 1).block() }
+
+ printIndent(indent)
+ print(prefix)
+ print(": ")
+ println(duration.toString(DurationUnit.MICROSECONDS))
+ return result
+}
+
+@Suppress("NOTHING_TO_INLINE")
+private inline fun printIndent(indent: Int) {
+ for (i in 0 until indent) {
+ print(" ")
+ }
+}
+
internal fun <A> CoroutineScope.asyncImmediate(
start: CoroutineStart = CoroutineStart.UNDISPATCHED,
context: CoroutineContext = EmptyCoroutineContext,