diff options
author | 2024-10-28 17:47:10 -0400 | |
---|---|---|
committer | 2025-01-03 14:28:51 -0500 | |
commit | c271f98dd435af1010b38c7cfde0c52fb02a5489 (patch) | |
tree | a8253b33471491d2a7687bfc5cf2f82fa88a3b8f | |
parent | de547b2377ba5f683d83ae370b3f4873af591866 (diff) |
[kairos] optimize pull nodes
- use epoch to determine if there is a result within the current
transaction
- have mux nodes produce lazier results
- eliminate Maybe indirection in transaction store for pull nodes
- separate out "mapping" from "filtering", allowing for more
fine-grained control over caching.
Flag: EXEMPT unused
Test: atest kairos-tests
Change-Id: I8ea329d40bca1e792cf38c96be444db202a91333
19 files changed, 305 insertions, 335 deletions
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt index a175e2e20e46..1d8fe116d57b 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt @@ -26,11 +26,11 @@ import com.android.systemui.kairos.internal.TFlowImpl import com.android.systemui.kairos.internal.activated import com.android.systemui.kairos.internal.cached import com.android.systemui.kairos.internal.constInit -import com.android.systemui.kairos.internal.filterNode +import com.android.systemui.kairos.internal.filterImpl +import com.android.systemui.kairos.internal.filterJustImpl import com.android.systemui.kairos.internal.init import com.android.systemui.kairos.internal.map import com.android.systemui.kairos.internal.mapImpl -import com.android.systemui.kairos.internal.mapMaybeNode import com.android.systemui.kairos.internal.mergeNodes import com.android.systemui.kairos.internal.mergeNodesLeft import com.android.systemui.kairos.internal.neverImpl @@ -121,11 +121,8 @@ val <A> TState<A>.stateChanges: TFlow<A> * @see mapNotNull */ @ExperimentalFrpApi -fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> { - val pulse = - mapMaybeNode({ init.connect(evalScope = this) }) { runInTransactionScope { transform(it) } } - return TFlowInit(constInit(name = null, pulse)) -} +fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> = + map(transform).filterJust() /** * Returns a [TFlow] that contains only the non-null results of applying [transform] to each value @@ -140,14 +137,17 @@ fun <A, B> TFlow<A>.mapNotNull(transform: suspend FrpTransactionScope.(A) -> B?) } /** Returns a [TFlow] containing only values of the original [TFlow] that are not null. */ -@ExperimentalFrpApi fun <A> TFlow<A?>.filterNotNull(): TFlow<A> = mapNotNull { it } +@ExperimentalFrpApi +fun <A> TFlow<A?>.filterNotNull(): TFlow<A> = mapCheap { it.toMaybe() }.filterJust() /** Shorthand for `mapNotNull { it as? A }`. */ @ExperimentalFrpApi -inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> = mapNotNull { it as? A } +inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> = mapCheap { it as? A }.filterNotNull() /** Shorthand for `mapMaybe { it }`. */ -@ExperimentalFrpApi fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> = mapMaybe { it } +@ExperimentalFrpApi +fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> = + TFlowInit(constInit(name = null, filterJustImpl { init.connect(evalScope = this) })) /** * Returns a [TFlow] containing the results of applying [transform] to each value of the original @@ -203,8 +203,8 @@ fun <A> TFlow<A>.onEach(action: suspend FrpTransactionScope.(A) -> Unit): TFlow< @ExperimentalFrpApi fun <A> TFlow<A>.filter(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> { val pulse = - filterNode({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } } - return TFlowInit(constInit(name = null, pulse.cached())) + filterImpl({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } } + return TFlowInit(constInit(name = null, pulse)) } /** @@ -455,7 +455,9 @@ fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> { mapImpl({ patches }) { newFlow -> mapOf(Unit to just(newFlow.init.connect(this))) } }, ) - return TFlowInit(constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit) })) + return TFlowInit( + constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit).getPushEvent(this) }) + ) } /** diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt index 80e74748a375..8ad5f55adca3 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt @@ -29,7 +29,7 @@ import com.android.systemui.kairos.internal.activated import com.android.systemui.kairos.internal.cached import com.android.systemui.kairos.internal.constInit import com.android.systemui.kairos.internal.constS -import com.android.systemui.kairos.internal.filterNode +import com.android.systemui.kairos.internal.filterImpl import com.android.systemui.kairos.internal.flatMap import com.android.systemui.kairos.internal.init import com.android.systemui.kairos.internal.map @@ -469,7 +469,7 @@ internal constructor(internal val network: Network, initialValue: Deferred<T>) : val operatorName = "MutableTState" lateinit var state: TStateSource<T> val calm: TFlowImpl<T> = - filterNode({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new -> + filterImpl({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new -> new != state.getCurrentWithEpoch(evalScope = this).first } .cached() diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt index e7b99528fdfc..dd46fe202413 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt @@ -14,25 +14,17 @@ * limitations under the License. */ -@file:Suppress("NOTHING_TO_INLINE") - package com.android.systemui.kairos.internal import com.android.systemui.kairos.internal.util.hashString -import com.android.systemui.kairos.util.Just -import com.android.systemui.kairos.util.Maybe -import com.android.systemui.kairos.util.flatMap -import com.android.systemui.kairos.util.getMaybe import java.util.concurrent.ConcurrentHashMap -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock internal class DemuxNode<K, A>( - private val branchNodeByKey: ConcurrentHashMap<K, DemuxBranchNode<K, A>>, + private val branchNodeByKey: ConcurrentHashMap<K, DemuxNode<K, A>.BranchNode>, val lifecycle: DemuxLifecycle<K, A>, private val spec: DemuxActivator<K, A>, ) : SchedulableNode { @@ -44,25 +36,23 @@ internal class DemuxNode<K, A>( lateinit var upstreamConnection: NodeConnection<Map<K, A>> - fun getAndMaybeAddDownstream(key: K): DemuxBranchNode<K, A> = - branchNodeByKey.getOrPut(key) { DemuxBranchNode(key, this) } + @Volatile private var epoch: Long = Long.MIN_VALUE + + suspend fun hasCurrentValueLocked(evalScope: EvalScope, key: K): Boolean = + evalScope.epoch == epoch && upstreamConnection.getPushEvent(evalScope).contains(key) + + suspend fun hasCurrentValue(evalScope: EvalScope, key: K): Boolean = + mutex.withLock { hasCurrentValueLocked(evalScope, key) } - override suspend fun schedule(evalScope: EvalScope) { + fun getAndMaybeAddDownstream(key: K): BranchNode = + branchNodeByKey.getOrPut(key) { BranchNode(key) } + + override suspend fun schedule(evalScope: EvalScope) = coroutineScope { val upstreamResult = upstreamConnection.getPushEvent(evalScope) - if (upstreamResult is Just) { - coroutineScope { - val outerScope = this - mutex.withLock { - coroutineScope { - for ((key, _) in upstreamResult.value) { - launch { - branchNodeByKey[key]?.let { branch -> - outerScope.launch { branch.schedule(evalScope) } - } - } - } - } - } + mutex.withLock { + updateEpoch(evalScope) + for ((key, _) in upstreamResult) { + branchNodeByKey[key]?.let { branch -> launch { branch.schedule(evalScope) } } } } } @@ -194,58 +184,63 @@ internal class DemuxNode<K, A>( upstreamConnection.removeDownstreamAndDeactivateIfNeeded(downstream = schedulable) } } -} -internal class DemuxBranchNode<K, A>(val key: K, private val demuxNode: DemuxNode<K, A>) : - PushNode<A> { + fun updateEpoch(evalScope: EvalScope) { + epoch = evalScope.epoch + } - private val mutex = Mutex() + suspend fun getPushEvent(evalScope: EvalScope, key: K): A = + upstreamConnection.getPushEvent(evalScope).getValue(key) - val downstreamSet = DownstreamSet() + inner class BranchNode(val key: K) : PushNode<A> { - override val depthTracker: DepthTracker - get() = demuxNode.upstreamConnection.depthTracker + private val mutex = Mutex() - override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean = - demuxNode.upstreamConnection.hasCurrentValue(transactionStore) + val downstreamSet = DownstreamSet() - override suspend fun getPushEvent(evalScope: EvalScope): Maybe<A> = - demuxNode.upstreamConnection.getPushEvent(evalScope).flatMap { it.getMaybe(key) } + override val depthTracker: DepthTracker + get() = upstreamConnection.depthTracker - override suspend fun addDownstream(downstream: Schedulable) { - mutex.withLock { downstreamSet.add(downstream) } - } + override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = + hasCurrentValue(evalScope, key) - override suspend fun removeDownstream(downstream: Schedulable) { - mutex.withLock { downstreamSet.remove(downstream) } - } + override suspend fun getPushEvent(evalScope: EvalScope): A = getPushEvent(evalScope, key) - override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) { - val canDeactivate = - mutex.withLock { - downstreamSet.remove(downstream) - downstreamSet.isEmpty() + override suspend fun addDownstream(downstream: Schedulable) { + mutex.withLock { downstreamSet.add(downstream) } + } + + override suspend fun removeDownstream(downstream: Schedulable) { + mutex.withLock { downstreamSet.remove(downstream) } + } + + override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) { + val canDeactivate = + mutex.withLock { + downstreamSet.remove(downstream) + downstreamSet.isEmpty() + } + if (canDeactivate) { + removeDownstreamAndDeactivateIfNeeded(key) } - if (canDeactivate) { - demuxNode.removeDownstreamAndDeactivateIfNeeded(key) } - } - override suspend fun deactivateIfNeeded() { - if (mutex.withLock { downstreamSet.isEmpty() }) { - demuxNode.removeDownstreamAndDeactivateIfNeeded(key) + override suspend fun deactivateIfNeeded() { + if (mutex.withLock { downstreamSet.isEmpty() }) { + removeDownstreamAndDeactivateIfNeeded(key) + } } - } - override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) { - if (mutex.withLock { downstreamSet.isEmpty() }) { - evalScope.scheduleDeactivation(this) + override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) { + if (mutex.withLock { downstreamSet.isEmpty() }) { + evalScope.scheduleDeactivation(this) + } } - } - suspend fun schedule(evalScope: EvalScope) { - if (!coroutineScope { mutex.withLock { scheduleAll(downstreamSet, evalScope) } }) { - evalScope.scheduleDeactivation(this) + suspend fun schedule(evalScope: EvalScope) { + if (!coroutineScope { mutex.withLock { scheduleAll(downstreamSet, evalScope) } }) { + evalScope.scheduleDeactivation(this) + } } } } @@ -254,30 +249,39 @@ internal fun <K, A> DemuxImpl( upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>, numKeys: Int?, ): DemuxImpl<K, A> = - DemuxImpl( - DemuxLifecycle( - object : DemuxActivator<K, A> { - override suspend fun activate( - evalScope: EvalScope, - lifecycle: DemuxLifecycle<K, A>, - ): Pair<DemuxNode<K, A>, Boolean>? { - val dmux = DemuxNode(ConcurrentHashMap(numKeys ?: 16), lifecycle, this) - return upstream - .invoke(evalScope) - .activate(evalScope, downstream = dmux.schedulable) - ?.let { (conn, needsEval) -> - dmux.apply { upstreamConnection = conn } to needsEval - } - } + DemuxImpl(DemuxLifecycle(DemuxLifecycleState.Inactive(DemuxActivator(numKeys, upstream)))) + +internal class DemuxActivator<K, A>( + private val numKeys: Int?, + private val upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>, +) { + suspend fun activate( + evalScope: EvalScope, + lifecycle: DemuxLifecycle<K, A>, + ): Pair<DemuxNode<K, A>, Set<K>>? { + val demux = DemuxNode(ConcurrentHashMap(numKeys ?: 16), lifecycle, this) + return upstream + .invoke(evalScope) + .activate(evalScope, downstream = demux.schedulable) + ?.let { (conn, needsEval) -> + Pair( + demux.apply { upstreamConnection = conn }, + if (needsEval) { + demux.updateEpoch(evalScope) + conn.getPushEvent(evalScope).keys + } else { + emptySet() + }, + ) } - ) - ) + } +} internal class DemuxImpl<in K, out A>(private val dmux: DemuxLifecycle<K, A>) { fun eventsForKey(key: K): TFlowImpl<A> = TFlowCheap { downstream -> dmux.activate(evalScope = this, key)?.let { (branchNode, needsEval) -> branchNode.addDownstream(downstream) - val branchNeedsEval = needsEval && branchNode.getPushEvent(evalScope = this) is Just + val branchNeedsEval = needsEval && branchNode.hasCurrentValue(evalScope = this) ActivationResult( connection = NodeConnection(branchNode, branchNode), needsEval = branchNeedsEval, @@ -291,38 +295,29 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle override fun toString(): String = "TFlowDmuxState[$hashString][$lifecycleState][$mutex]" - suspend fun activate(evalScope: EvalScope, key: K): Pair<DemuxBranchNode<K, A>, Boolean>? = - coroutineScope { - mutex - .withLock { - when (val state = lifecycleState) { - is DemuxLifecycleState.Dead -> null - is DemuxLifecycleState.Active -> - state.node.getAndMaybeAddDownstream(key) to - async { - state.node.upstreamConnection.hasCurrentValue( - evalScope.transactionStore - ) - } - is DemuxLifecycleState.Inactive -> { - state.spec - .activate(evalScope, this@DemuxLifecycle) - .also { result -> - lifecycleState = - if (result == null) { - DemuxLifecycleState.Dead - } else { - DemuxLifecycleState.Active(result.first) - } - } - ?.let { (node, needsEval) -> - node.getAndMaybeAddDownstream(key) to - CompletableDeferred(needsEval) + suspend fun activate(evalScope: EvalScope, key: K): Pair<DemuxNode<K, A>.BranchNode, Boolean>? = + mutex.withLock { + when (val state = lifecycleState) { + is DemuxLifecycleState.Dead -> null + is DemuxLifecycleState.Active -> + state.node.getAndMaybeAddDownstream(key) to + state.node.hasCurrentValueLocked(evalScope, key) + is DemuxLifecycleState.Inactive -> { + state.spec + .activate(evalScope, this@DemuxLifecycle) + .also { result -> + lifecycleState = + if (result == null) { + DemuxLifecycleState.Dead + } else { + DemuxLifecycleState.Active(result.first) } } - } + ?.let { (node, needsEval) -> + node.getAndMaybeAddDownstream(key) to (key in needsEval) + } } - ?.let { (branch, result) -> branch to result.await() } + } } } @@ -337,13 +332,3 @@ internal sealed interface DemuxLifecycleState<out K, out A> { data object Dead : DemuxLifecycleState<Nothing, Nothing> } - -internal interface DemuxActivator<K, A> { - suspend fun activate( - evalScope: EvalScope, - lifecycle: DemuxLifecycle<K, A>, - ): Pair<DemuxNode<K, A>, Boolean>? -} - -internal inline fun <K, A> DemuxLifecycle(onSubscribe: DemuxActivator<K, A>) = - DemuxLifecycle(DemuxLifecycleState.Inactive(onSubscribe)) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt index 815473fe900f..afbd7120653c 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt @@ -57,7 +57,7 @@ internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope) TStateInit( constInit( "now", - mkState( + activatedTStateSource( "now", "now", this, diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt index bc06a3679d5c..030119394ac0 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt @@ -21,14 +21,12 @@ import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.none -internal inline fun <A, B> mapMaybeNode( - crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>, - crossinline f: suspend EvalScope.(A) -> Maybe<B>, -): TFlowImpl<B> { - return DemuxImpl( +internal inline fun <A> filterJustImpl( + crossinline getPulse: suspend EvalScope.() -> TFlowImpl<Maybe<A>> +): TFlowImpl<A> = + DemuxImpl( { - mapImpl(getPulse) { - val maybeResult = f(it) + mapImpl(getPulse) { maybeResult -> if (maybeResult is Just) { mapOf(Unit to maybeResult.value) } else { @@ -39,9 +37,8 @@ internal inline fun <A, B> mapMaybeNode( numKeys = 1, ) .eventsForKey(Unit) -} -internal inline fun <A> filterNode( +internal inline fun <A> filterImpl( crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>, crossinline f: suspend EvalScope.(A) -> Boolean, -): TFlowImpl<A> = mapMaybeNode(getPulse) { if (f(it)) just(it) else none } +): TFlowImpl<A> = filterJustImpl { mapImpl(getPulse) { if (f(it)) just(it) else none }.cached() } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt index 8efaf79b18b2..1edc8c28b2ee 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt @@ -17,8 +17,6 @@ package com.android.systemui.kairos.internal import com.android.systemui.kairos.internal.util.Key -import com.android.systemui.kairos.util.Maybe -import com.android.systemui.kairos.util.just import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.sync.Mutex @@ -29,16 +27,18 @@ internal class InputNode<A>( private val deactivate: () -> Unit = {}, ) : PushNode<A>, Key<A> { - internal val downstreamSet = DownstreamSet() + private val downstreamSet = DownstreamSet() private val mutex = Mutex() private val activated = AtomicBoolean(false) + @Volatile private var epoch: Long = Long.MIN_VALUE + override val depthTracker: DepthTracker = DepthTracker() - override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean = - transactionStore.contains(this) + override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = epoch == evalScope.epoch suspend fun visit(evalScope: EvalScope, value: A) { + epoch = evalScope.epoch evalScope.setResult(this, value) coroutineScope { if (!mutex.withLock { scheduleAll(downstreamSet, evalScope) }) { @@ -90,8 +90,7 @@ internal class InputNode<A>( } } - override suspend fun getPushEvent(evalScope: EvalScope): Maybe<A> = - evalScope.getCurrentValue(this) + override suspend fun getPushEvent(evalScope: EvalScope): A = evalScope.getCurrentValue(this) } internal fun <A> InputNode<A>.activated() = TFlowCheap { downstream -> @@ -104,7 +103,7 @@ internal data object AlwaysNode : PushNode<Unit> { override val depthTracker = DepthTracker() - override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean = true + override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = true override suspend fun removeDownstream(downstream: Schedulable) {} @@ -116,5 +115,5 @@ internal data object AlwaysNode : PushNode<Unit> { override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {} - override suspend fun getPushEvent(evalScope: EvalScope): Maybe<Unit> = just(Unit) + override suspend fun getPushEvent(evalScope: EvalScope) = Unit } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt index 69994ba6e866..69ecafd26ba2 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt @@ -22,7 +22,6 @@ import com.android.systemui.kairos.FrpTransactionScope import com.android.systemui.kairos.TFlow import com.android.systemui.kairos.internal.util.HeteroMap import com.android.systemui.kairos.internal.util.Key -import com.android.systemui.kairos.util.Maybe internal interface InitScope { val networkId: Any @@ -75,6 +74,7 @@ internal fun <A> NetworkScope.setResult(node: Key<A>, result: A) { transactionStore[node] = result } -internal fun <A> NetworkScope.getCurrentValue(key: Key<A>): Maybe<A> = transactionStore[key] +internal fun <A> NetworkScope.getCurrentValue(key: Key<A>): A = + transactionStore.getOrError(key) { "No value for $key in transaction $epoch" } internal fun NetworkScope.hasCurrentValue(key: Key<*>): Boolean = transactionStore.contains(key) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt index af68a1e3d83c..1fc5470ef354 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt @@ -20,13 +20,14 @@ package com.android.systemui.kairos.internal import com.android.systemui.kairos.internal.util.ConcurrentNullableHashMap import com.android.systemui.kairos.internal.util.hashString -import com.android.systemui.kairos.util.Just import java.util.concurrent.ConcurrentHashMap import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock -/** Base class for muxing nodes, which have a potentially dynamic collection of upstream nodes. */ +internal typealias MuxResult<K, V> = Map<K, PullNode<V>> + +/** Base class for muxing nodes, which have a (potentially dynamic) collection of upstream nodes. */ internal sealed class MuxNode<K : Any, V, Output>(val lifecycle: MuxLifecycle<Output>) : PushNode<Output> { @@ -34,13 +35,22 @@ internal sealed class MuxNode<K : Any, V, Output>(val lifecycle: MuxLifecycle<Ou get() = lifecycle.mutex // TODO: preserve insertion order? - val upstreamData = ConcurrentNullableHashMap<K, V>() + val upstreamData = ConcurrentNullableHashMap<K, PullNode<V>>() val switchedIn = ConcurrentHashMap<K, MuxBranchNode<K, V>>() val downstreamSet: DownstreamSet = DownstreamSet() // TODO: inline DepthTracker? would need to be added to PushNode signature final override val depthTracker = DepthTracker() + @Volatile + var epoch: Long = Long.MIN_VALUE + protected set + + inline fun hasCurrentValueLocked(evalScope: EvalScope): Boolean = epoch == evalScope.epoch + + override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = + mutex.withLock { hasCurrentValueLocked(evalScope) } + final override suspend fun addDownstream(downstream: Schedulable) { mutex.withLock { addDownstreamLocked(downstream) } } @@ -187,8 +197,6 @@ internal sealed class MuxNode<K : Any, V, Output>(val lifecycle: MuxLifecycle<Ou } } - abstract fun hasCurrentValueLocked(transactionStore: TransactionStore): Boolean - fun schedule(evalScope: EvalScope) { // TODO: Potential optimization // Detect if this node is guaranteed to have a single upstream within this transaction, @@ -207,11 +215,8 @@ internal class MuxBranchNode<K : Any, V>(private val muxNode: MuxNode<K, V, *>, @Volatile lateinit var upstream: NodeConnection<V> override suspend fun schedule(evalScope: EvalScope) { - val upstreamResult = upstream.getPushEvent(evalScope) - if (upstreamResult is Just) { - muxNode.upstreamData[key] = upstreamResult.value - muxNode.schedule(evalScope) - } + muxNode.upstreamData[key] = upstream.directUpstream + muxNode.schedule(evalScope) } override suspend fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) { @@ -288,7 +293,7 @@ internal class MuxLifecycle<A>(@Volatile var lifecycleState: MuxLifecycleState<A state.node.addDownstreamLocked(downstream) ActivationResult( connection = NodeConnection(state.node, state.node), - needsEval = state.node.hasCurrentValueLocked(evalScope.transactionStore), + needsEval = state.node.hasCurrentValueLocked(evalScope), ) } is MuxLifecycleState.Inactive -> { @@ -332,3 +337,6 @@ internal interface MuxActivator<A> { internal inline fun <A> MuxLifecycle(onSubscribe: MuxActivator<A>): TFlowImpl<A> = MuxLifecycle(MuxLifecycleState.Inactive(onSubscribe)) + +internal fun <K, V> TFlowImpl<MuxResult<K, V>>.awaitValues(): TFlowImpl<Map<K, V>> = + mapImpl({ this@awaitValues }) { results -> results.mapValues { it.value.getPushEvent(this) } } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt index 3b9502a5d812..338ee0145530 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt @@ -33,7 +33,7 @@ import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.maybeThat import com.android.systemui.kairos.util.maybeThis import com.android.systemui.kairos.util.merge -import com.android.systemui.kairos.util.orElseGet +import com.android.systemui.kairos.util.orError import com.android.systemui.kairos.util.partitionEithers import com.android.systemui.kairos.util.these import java.util.TreeMap @@ -42,21 +42,15 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.sync.withLock internal class MuxDeferredNode<K : Any, V>( - lifecycle: MuxLifecycle<Map<K, V>>, - val spec: MuxActivator<Map<K, V>>, -) : MuxNode<K, V, Map<K, V>>(lifecycle), Key<Map<K, V>> { + lifecycle: MuxLifecycle<Map<K, PullNode<V>>>, + val spec: MuxActivator<Map<K, PullNode<V>>>, +) : MuxNode<K, V, Map<K, PullNode<V>>>(lifecycle), Key<Map<K, PullNode<V>>> { val schedulable = Schedulable.M(this) @Volatile var patches: NodeConnection<Map<K, Maybe<TFlowImpl<V>>>>? = null @Volatile var patchData: Map<K, Maybe<TFlowImpl<V>>>? = null - override fun hasCurrentValueLocked(transactionStore: TransactionStore): Boolean = - transactionStore.contains(this) - - override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean = - mutex.withLock { hasCurrentValueLocked(transactionStore) } - override suspend fun visit(evalScope: EvalScope) { val result = upstreamData.toMap() upstreamData.clear() @@ -74,6 +68,7 @@ internal class MuxDeferredNode<K : Any, V>( ) } if (scheduleDownstream) { + epoch = evalScope.epoch evalScope.setResult(this@MuxDeferredNode, result) if (!scheduleAll(downstreamSet, evalScope)) { evalScope.scheduleDeactivation(this@MuxDeferredNode) @@ -84,7 +79,7 @@ internal class MuxDeferredNode<K : Any, V>( } } - override suspend fun getPushEvent(evalScope: EvalScope): Maybe<Map<K, V>> = + override suspend fun getPushEvent(evalScope: EvalScope): Map<K, PullNode<V>> = evalScope.getCurrentValue(key = this) private suspend fun compactIfNeeded(evalScope: EvalScope) { @@ -282,7 +277,6 @@ internal class MuxDeferredNode<K : Any, V>( patchData = checkNotNull(patches) { "mux mover scheduled with unset patches upstream node" } .getPushEvent(evalScope) - .orElseGet { null } evalScope.scheduleMuxMover(this) } @@ -299,20 +293,20 @@ internal inline fun <A> switchDeferredImplSingle( getPatches = { mapImpl(getPatches) { newFlow -> mapOf(Unit to just(newFlow)) } }, ) }) { map -> - map.getValue(Unit) + map.getValue(Unit).getPushEvent(this) } -internal fun <K : Any, A> switchDeferredImpl( - getStorage: suspend EvalScope.() -> Map<K, TFlowImpl<A>>, - getPatches: suspend EvalScope.() -> TFlowImpl<Map<K, Maybe<TFlowImpl<A>>>>, -): TFlowImpl<Map<K, A>> = +internal fun <K : Any, V> switchDeferredImpl( + getStorage: suspend EvalScope.() -> Map<K, TFlowImpl<V>>, + getPatches: suspend EvalScope.() -> TFlowImpl<Map<K, Maybe<TFlowImpl<V>>>>, +): TFlowImpl<Map<K, PullNode<V>>> = MuxLifecycle( - object : MuxActivator<Map<K, A>> { + object : MuxActivator<Map<K, PullNode<V>>> { override suspend fun activate( evalScope: EvalScope, - lifecycle: MuxLifecycle<Map<K, A>>, - ): MuxNode<*, *, Map<K, A>>? { - val storage: Map<K, TFlowImpl<A>> = getStorage(evalScope) + lifecycle: MuxLifecycle<Map<K, PullNode<V>>>, + ): MuxNode<*, *, Map<K, PullNode<V>>>? { + val storage: Map<K, TFlowImpl<V>> = getStorage(evalScope) // Initialize mux node and switched-in connections. val muxNode = MuxDeferredNode(lifecycle, this).apply { @@ -324,10 +318,7 @@ internal fun <K : Any, A> switchDeferredImpl( .apply { upstream = conn } .also { if (needsEval) { - val result = conn.getPushEvent(evalScope) - if (result is Just) { - upstreamData[key] = result.value - } + upstreamData[key] = conn.directUpstream } } } @@ -398,11 +389,8 @@ internal fun <K : Any, A> switchDeferredImpl( // Schedule mover to process patch emission at the end of this transaction, if // needed. if (needsEval) { - val result = patchesConn.getPushEvent(evalScope) - if (result is Just) { - muxNode.patchData = result.value - evalScope.scheduleMuxMover(muxNode) - } + muxNode.patchData = patchesConn.getPushEvent(evalScope) + evalScope.scheduleMuxMover(muxNode) } } @@ -440,9 +428,9 @@ internal inline fun <A, B> mergeNodes( val switchNode = switchDeferredImpl(getStorage = { storage }, getPatches = { neverImpl }) val merged = mapImpl({ switchNode }) { mergeResults -> - val first = mergeResults.getMaybe(0).flatMap { it.maybeThis() } - val second = mergeResults.getMaybe(1).flatMap { it.maybeThat() } - these(first, second).orElseGet { error("unexpected missing merge result") } + val first = mergeResults.getMaybe(0).flatMap { it.getPushEvent(this).maybeThis() } + val second = mergeResults.getMaybe(1).flatMap { it.getPushEvent(this).maybeThat() } + these(first, second).orError { "unexpected missing merge result" } } return merged.cached() } @@ -455,7 +443,10 @@ internal inline fun <A> mergeNodes( getStorage = { getPulses().associateByIndexTo(TreeMap()) }, getPatches = { neverImpl }, ) - val merged = mapImpl({ switchNode }) { mergeResults -> mergeResults.values.toList() } + val merged = + mapImpl({ switchNode }) { mergeResults -> + mergeResults.values.map { it.getPushEvent(this) } + } return merged.cached() } @@ -468,6 +459,8 @@ internal inline fun <A> mergeNodesLeft( getPatches = { neverImpl }, ) val merged = - mapImpl({ switchNode }) { mergeResults: Map<Int, A> -> mergeResults.values.first() } + mapImpl({ switchNode }) { mergeResults: Map<Int, PullNode<A>> -> + mergeResults.values.first().getPushEvent(this) + } return merged.cached() } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt index b291c879b449..dd0357b0413d 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt @@ -25,36 +25,26 @@ import com.android.systemui.kairos.util.Left import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.None import com.android.systemui.kairos.util.Right -import com.android.systemui.kairos.util.filterJust -import com.android.systemui.kairos.util.map import com.android.systemui.kairos.util.partitionEithers import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.async -import kotlinx.coroutines.awaitAll import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlinx.coroutines.sync.withLock +private typealias MuxPromptMovingResult<K, V> = Pair<MuxResult<K, V>, MuxResult<K, V>?> + internal class MuxPromptMovingNode<K : Any, V>( - lifecycle: MuxLifecycle<Pair<Map<K, V>, Map<K, PullNode<V>>?>>, - private val spec: MuxActivator<Pair<Map<K, V>, Map<K, PullNode<V>>?>>, -) : - MuxNode<K, V, Pair<Map<K, V>, Map<K, PullNode<V>>?>>(lifecycle), - Key<Pair<Map<K, V>, Map<K, PullNode<V>>?>> { + lifecycle: MuxLifecycle<MuxPromptMovingResult<K, V>>, + private val spec: MuxActivator<MuxPromptMovingResult<K, V>>, +) : MuxNode<K, V, MuxPromptMovingResult<K, V>>(lifecycle), Key<MuxPromptMovingResult<K, V>> { @Volatile var patchData: Map<K, Maybe<TFlowImpl<V>>>? = null @Volatile var patches: MuxPromptPatchNode<K, V>? = null - @Volatile private var reEval: Pair<Map<K, V>, Map<K, PullNode<V>>?>? = null - - override fun hasCurrentValueLocked(transactionStore: TransactionStore): Boolean = - transactionStore.contains(this) - - override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean = - mutex.withLock { hasCurrentValueLocked(transactionStore) } + @Volatile private var reEval: MuxPromptMovingResult<K, V>? = null override suspend fun visit(evalScope: EvalScope) { - val preSwitchResults: Map<K, V> = upstreamData.toMap() + val preSwitchResults: MuxResult<K, V> = upstreamData.toMap() upstreamData.clear() val patch: Map<K, Maybe<TFlowImpl<V>>>? = patchData @@ -85,6 +75,7 @@ internal class MuxPromptMovingNode<K : Any, V>( adjustDownstreamDepths(evalScope, coroutineScope = this) } if (evalResult != null) { + epoch = evalScope.epoch evalScope.setResult(this@MuxPromptMovingNode, evalResult) if (!scheduleAll(downstreamSet, evalScope)) { evalScope.scheduleDeactivation(this@MuxPromptMovingNode) @@ -97,11 +88,11 @@ internal class MuxPromptMovingNode<K : Any, V>( } private suspend fun doEval( - preSwitchResults: Map<K, V>, + preSwitchResults: MuxResult<K, V>, patch: Map<K, Maybe<TFlowImpl<V>>>?, evalScope: EvalScope, - ): Pair<Boolean, Pair<Map<K, V>, Map<K, PullNode<V>>?>?> { - val newlySwitchedIn: Map<K, PullNode<V>>? = + ): Pair<Boolean, MuxPromptMovingResult<K, V>?> { + val newlySwitchedIn: MuxResult<K, V>? = patch?.let { // We have a patch, process additions/updates and removals val (adds, removes) = @@ -194,10 +185,7 @@ internal class MuxPromptMovingNode<K : Any, V>( } } - private suspend fun adjustDownstreamDepths( - evalScope: EvalScope, - coroutineScope: CoroutineScope, - ) { + private fun adjustDownstreamDepths(evalScope: EvalScope, coroutineScope: CoroutineScope) { if (depthTracker.dirty_depthIncreased()) { // schedule downstream nodes on the compaction scheduler; this scheduler is drained at // the end of this eval depth, so that all depth increases are applied before we advance @@ -215,9 +203,8 @@ internal class MuxPromptMovingNode<K : Any, V>( } } - override suspend fun getPushEvent( - evalScope: EvalScope - ): Maybe<Pair<Map<K, V>, Map<K, PullNode<V>>?>> = evalScope.getCurrentValue(key = this) + override suspend fun getPushEvent(evalScope: EvalScope): MuxPromptMovingResult<K, V> = + evalScope.getCurrentValue(key = this) override suspend fun doDeactivate() { // Update lifecycle @@ -264,18 +251,11 @@ internal class MuxPromptMovingNode<K : Any, V>( } internal class MuxPromptEvalNode<K, V>( - private val movingNode: PullNode<Pair<Map<K, V>, Map<K, PullNode<V>>?>> -) : PullNode<Map<K, V>> { - override suspend fun getPushEvent(evalScope: EvalScope): Maybe<Map<K, V>> = - movingNode.getPushEvent(evalScope).map { (preSwitchResults, newlySwitchedIn) -> - coroutineScope { - newlySwitchedIn - ?.map { (k, v) -> async { v.getPushEvent(evalScope).map { k to it } } } - ?.awaitAll() - ?.asSequence() - ?.filterJust() - ?.toMap(preSwitchResults.toMutableMap()) ?: preSwitchResults - } + private val movingNode: PullNode<MuxPromptMovingResult<K, V>> +) : PullNode<MuxResult<K, V>> { + override suspend fun getPushEvent(evalScope: EvalScope): MuxResult<K, V> = + movingNode.getPushEvent(evalScope).let { (preSwitchResults, newlySwitchedIn) -> + newlySwitchedIn?.toMap(preSwitchResults.toMutableMap()) ?: preSwitchResults } } @@ -288,11 +268,8 @@ internal class MuxPromptPatchNode<K : Any, V>(private val muxNode: MuxPromptMovi lateinit var upstream: NodeConnection<Map<K, Maybe<TFlowImpl<V>>>> override suspend fun schedule(evalScope: EvalScope) { - val upstreamResult = upstream.getPushEvent(evalScope) - if (upstreamResult is Just) { - muxNode.patchData = upstreamResult.value - muxNode.schedule(evalScope) - } + muxNode.patchData = upstream.getPushEvent(evalScope) + muxNode.schedule(evalScope) } override suspend fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) { @@ -350,18 +327,18 @@ internal class MuxPromptPatchNode<K : Any, V>(private val muxNode: MuxPromptMovi } } -internal fun <K : Any, A> switchPromptImpl( - getStorage: suspend EvalScope.() -> Map<K, TFlowImpl<A>>, - getPatches: suspend EvalScope.() -> TFlowImpl<Map<K, Maybe<TFlowImpl<A>>>>, -): TFlowImpl<Map<K, A>> { +internal fun <K : Any, V> switchPromptImpl( + getStorage: suspend EvalScope.() -> Map<K, TFlowImpl<V>>, + getPatches: suspend EvalScope.() -> TFlowImpl<Map<K, Maybe<TFlowImpl<V>>>>, +): TFlowImpl<MuxResult<K, V>> { val moving = MuxLifecycle( - object : MuxActivator<Pair<Map<K, A>, Map<K, PullNode<A>>?>> { + object : MuxActivator<MuxPromptMovingResult<K, V>> { override suspend fun activate( evalScope: EvalScope, - lifecycle: MuxLifecycle<Pair<Map<K, A>, Map<K, PullNode<A>>?>>, - ): MuxNode<*, *, Pair<Map<K, A>, Map<K, PullNode<A>>?>>? { - val storage: Map<K, TFlowImpl<A>> = getStorage(evalScope) + lifecycle: MuxLifecycle<MuxPromptMovingResult<K, V>>, + ): MuxNode<*, *, MuxPromptMovingResult<K, V>>? { + val storage: Map<K, TFlowImpl<V>> = getStorage(evalScope) // Initialize mux node and switched-in connections. val movingNode = MuxPromptMovingNode(lifecycle, this).apply { @@ -379,11 +356,7 @@ internal fun <K : Any, A> switchPromptImpl( .apply { upstream = conn } .also { if (needsEval) { - val result = - conn.getPushEvent(evalScope) - if (result is Just) { - upstreamData[key] = result.value - } + upstreamData[key] = conn.directUpstream } } } @@ -401,10 +374,7 @@ internal fun <K : Any, A> switchPromptImpl( patches = patchNode if (needsEval) { - val result = conn.getPushEvent(evalScope) - if (result is Just) { - patchData = result.value - } + patchData = conn.getPushEvent(evalScope) } } } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt index 599b18695034..cc36fda0c3ae 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt @@ -141,7 +141,10 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope { } while (evalScope { evalOutputs(this) }) // Update states evalScope { evalStateWriters(this) } + // Invalidate caches + // Note: this needs to occur before deferred switches transactionStore.clear() + epoch++ // Perform deferred switches evalScope { evalMuxMovers(this) } // Compact depths @@ -149,7 +152,6 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope { compactor.drainCompact() // Deactivate nodes with no downstream evalDeactivations() - epoch++ } /** Invokes all [Output]s that have received data within this transaction. */ diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt index 000240796a82..b9bef059d4b0 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt @@ -16,8 +16,6 @@ package com.android.systemui.kairos.internal -import com.android.systemui.kairos.util.Maybe - /* Dmux Muxes + Branch @@ -68,7 +66,7 @@ internal sealed interface PullNode<out A> { * will read from the cache, otherwise it will perform a full evaluation, even if invoked * multiple times within a transaction. */ - suspend fun getPushEvent(evalScope: EvalScope): Maybe<A> + suspend fun getPushEvent(evalScope: EvalScope): A } /* @@ -76,7 +74,7 @@ Muxes + DmuxBranch */ internal sealed interface PushNode<A> : PullNode<A> { - suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean + suspend fun hasCurrentValue(evalScope: EvalScope): Boolean val depthTracker: DepthTracker diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt index a3af2d304f7f..3373de05249c 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt @@ -16,7 +16,6 @@ package com.android.systemui.kairos.internal -import com.android.systemui.kairos.util.Just import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext @@ -37,7 +36,7 @@ internal class Output<A>( suspend fun visit(evalScope: EvalScope) { val upstreamResult = result check(upstreamResult !== NoResult) { "output visited with null upstream result" } - result = null + result = NoResult @Suppress("UNCHECKED_CAST") evalScope.onEmit(upstreamResult as A) } @@ -46,12 +45,9 @@ internal class Output<A>( } suspend fun schedule(evalScope: EvalScope) { - val upstreamResult = + result = checkNotNull(upstream) { "output scheduled with null upstream" }.getPushEvent(evalScope) - if (upstreamResult is Just) { - result = upstreamResult.value - evalScope.scheduleOutput(this) - } + evalScope.scheduleOutput(this) } } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt index dac98e0e807c..43b621fadc67 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt @@ -17,8 +17,6 @@ package com.android.systemui.kairos.internal import com.android.systemui.kairos.internal.util.Key -import com.android.systemui.kairos.util.Maybe -import com.android.systemui.kairos.util.map import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Deferred @@ -26,8 +24,8 @@ internal val neverImpl: TFlowImpl<Nothing> = TFlowCheap { null } internal class MapNode<A, B>(val upstream: PullNode<A>, val transform: suspend EvalScope.(A) -> B) : PullNode<B> { - override suspend fun getPushEvent(evalScope: EvalScope): Maybe<B> = - upstream.getPushEvent(evalScope).map { evalScope.transform(it) } + override suspend fun getPushEvent(evalScope: EvalScope): B = + evalScope.transform(upstream.getPushEvent(evalScope)) } internal inline fun <A, B> mapImpl( @@ -46,9 +44,8 @@ internal inline fun <A, B> mapImpl( } } -internal class CachedNode<A>(val key: Key<Deferred<Maybe<A>>>, val upstream: PullNode<A>) : - PullNode<A> { - override suspend fun getPushEvent(evalScope: EvalScope): Maybe<A> { +internal class CachedNode<A>(val key: Key<Deferred<A>>, val upstream: PullNode<A>) : PullNode<A> { + override suspend fun getPushEvent(evalScope: EvalScope): A { val deferred = evalScope.transactionStore.getOrPut(key) { evalScope.deferAsync(CoroutineStart.LAZY) { upstream.getPushEvent(evalScope) } @@ -58,7 +55,7 @@ internal class CachedNode<A>(val key: Key<Deferred<Maybe<A>>>, val upstream: Pul } internal fun <A> TFlowImpl<A>.cached(): TFlowImpl<A> { - val key = object : Key<Deferred<Maybe<A>>> {} + val key = object : Key<Deferred<A>> {} return TFlowCheap { activate(this, it)?.let { (connection, needsEval) -> ActivationResult( diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt index baf4101d52ef..06b5b1690391 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt @@ -78,7 +78,13 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: val changes = this@toTStateInternalDeferred val name = operatorName val impl = - mkState(name, operatorName, evalScope, { changes.init.connect(evalScope = this) }, init) + activatedTStateSource( + name, + operatorName, + evalScope, + { changes.init.connect(evalScope = this) }, + init, + ) return TStateInit(constInit(name, impl)) } @@ -102,21 +108,22 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: constInit( name, switchDeferredImpl( - getStorage = { - storage.init - .connect(this) - .getCurrentWithEpoch(this) - .first - .mapValuesParallel { (_, flow) -> flow.init.connect(this) } - }, - getPatches = { - mapImpl({ init.connect(this) }) { patch -> - patch.mapValuesParallel { (_, m) -> - m.map { flow -> flow.init.connect(this) } + getStorage = { + storage.init + .connect(this) + .getCurrentWithEpoch(this) + .first + .mapValuesParallel { (_, flow) -> flow.init.connect(this) } + }, + getPatches = { + mapImpl({ init.connect(this) }) { patch -> + patch.mapValuesParallel { (_, m) -> + m.map { flow -> flow.init.connect(this) } + } } - } - }, - ), + }, + ) + .awaitValues(), ) ) } @@ -129,21 +136,22 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: constInit( name, switchPromptImpl( - getStorage = { - storage.init - .connect(this) - .getCurrentWithEpoch(this) - .first - .mapValuesParallel { (_, flow) -> flow.init.connect(this) } - }, - getPatches = { - mapImpl({ init.connect(this) }) { patch -> - patch.mapValuesParallel { (_, m) -> - m.map { flow -> flow.init.connect(this) } + getStorage = { + storage.init + .connect(this) + .getCurrentWithEpoch(this) + .first + .mapValuesParallel { (_, flow) -> flow.init.connect(this) } + }, + getPatches = { + mapImpl({ init.connect(this) }) { patch -> + patch.mapValuesParallel { (_, m) -> + m.map { flow -> flow.init.connect(this) } + } } - } - }, - ), + }, + ) + .awaitValues(), ) ) } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt index b904b48f7f9c..784a2afe0992 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt @@ -16,8 +16,6 @@ package com.android.systemui.kairos.internal -import com.android.systemui.kairos.util.Maybe - /* Initialized TFlow */ internal fun interface TFlowImpl<out A> { suspend fun activate(evalScope: EvalScope, downstream: Schedulable): ActivationResult<A>? @@ -41,9 +39,8 @@ internal data class NodeConnection<out A>( val schedulerUpstream: PushNode<*>, ) -internal suspend fun <A> NodeConnection<A>.hasCurrentValue( - transactionStore: TransactionStore -): Boolean = schedulerUpstream.hasCurrentValue(transactionStore) +internal suspend fun <A> NodeConnection<A>.hasCurrentValue(evalScope: EvalScope): Boolean = + schedulerUpstream.hasCurrentValue(evalScope) internal suspend fun <A> NodeConnection<A>.removeDownstreamAndDeactivateIfNeeded( downstream: Schedulable @@ -55,7 +52,7 @@ internal suspend fun <A> NodeConnection<A>.scheduleDeactivationIfNeeded(evalScop internal suspend fun <A> NodeConnection<A>.removeDownstream(downstream: Schedulable) = schedulerUpstream.removeDownstream(downstream) -internal suspend fun <A> NodeConnection<A>.getPushEvent(evalScope: EvalScope): Maybe<A> = +internal suspend fun <A> NodeConnection<A>.getPushEvent(evalScope: EvalScope): A = directUpstream.getPushEvent(evalScope) internal val <A> NodeConnection<A>.depthTracker: DepthTracker diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt index c68b4c366776..c4a26a33e24d 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt @@ -20,7 +20,6 @@ import com.android.systemui.kairos.internal.util.Key import com.android.systemui.kairos.internal.util.associateByIndex import com.android.systemui.kairos.internal.util.hashString import com.android.systemui.kairos.internal.util.mapValuesParallel -import com.android.systemui.kairos.util.Just import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.none @@ -106,11 +105,8 @@ internal class TStateSource<A>( /** called by network after eval phase has completed */ suspend fun updateState(evalScope: EvalScope) { // write the latch - val eventResult = upstreamConnection.getPushEvent(evalScope) - if (eventResult is Just) { - _current = CompletableDeferred(eventResult.value) - writeEpoch = evalScope.epoch - } + _current = CompletableDeferred(upstreamConnection.getPushEvent(evalScope)) + writeEpoch = evalScope.epoch } override fun toString(): String = "TStateImpl(changes=$changes, current=$_current)" @@ -123,7 +119,7 @@ internal class TStateSource<A>( internal fun <A> constS(name: String?, operatorName: String, init: A): TStateImpl<A> = TStateSource(name, operatorName, init, neverImpl) -internal inline fun <A> mkState( +internal inline fun <A> activatedTStateSource( name: String?, operatorName: String, evalScope: EvalScope, @@ -132,8 +128,7 @@ internal inline fun <A> mkState( ): TStateImpl<A> { lateinit var state: TStateSource<A> val calm: TFlowImpl<A> = - filterNode(getChanges) { new -> new != state.getCurrentWithEpoch(evalScope = this).first } - .cached() + filterImpl(getChanges) { new -> new != state.getCurrentWithEpoch(evalScope = this).first } return TStateSource(name, operatorName, init, calm).also { state = it evalScope.scheduleOutput( @@ -153,7 +148,7 @@ internal inline fun <A> mkState( private inline fun <A> TFlowImpl<A>.calm( crossinline getState: () -> TStateDerived<A> ): TFlowImpl<A> = - filterNode({ this@calm }) { new -> + filterImpl({ this@calm }) { new -> val state = getState() val (current, _) = state.getCurrentWithEpoch(evalScope = this) if (new != current) { @@ -237,7 +232,7 @@ internal fun <A> TStateImpl<TStateImpl<A>>.flatten(name: String?, operator: Stri getPatches = { mapImpl({ innerChanges }) { new -> mapOf(Unit to just(new)) } }, ) }) { map -> - map.getValue(Unit) + map.getValue(Unit).getPushEvent(this) } lateinit var state: DerivedFlatten<A> state = DerivedFlatten(name, operator, this, switchedChanges.calm { state }) @@ -352,7 +347,7 @@ internal fun <K : Any, A> zipStates( states .mapValues { (k, v) -> if (k in patch) { - patch.getValue(k) + patch.getValue(k).getPushEvent(this) } else { v.getCurrentWithEpoch(evalScope = this).first } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt index 5cee2dd5880a..33709a97da8f 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt @@ -37,6 +37,17 @@ internal class HeteroMap { store[key] = value ?: NULL } + @Suppress("UNCHECKED_CAST") + fun <A : Any> getOrNull(key: Key<A>): A? = + store[key]?.let { (if (it === NULL) null else it) as A } + + @Suppress("UNCHECKED_CAST") + fun <A> getOrError(key: Key<A>, block: () -> String): A { + store[key]?.let { + return (if (it === NULL) null else it) as A + } ?: error(block()) + } + operator fun contains(key: Key<*>): Boolean = store.containsKey(key) fun clear() { diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/Maybe.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/Maybe.kt index c3cae3885bd3..eef6cbff545b 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/Maybe.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/Maybe.kt @@ -230,8 +230,14 @@ fun <A> Maybe<A>.mergeWith(other: Maybe<A>, transform: (A, A) -> A): Maybe<A> = * Returns a list containing only the present results of applying [transform] to each element in the * original iterable. */ -fun <A, B> Iterable<A>.mapMaybe(transform: (A) -> Maybe<B>): List<B> = - asSequence().mapMaybe(transform).toList() +inline fun <A, B> Iterable<A>.mapMaybe(transform: (A) -> Maybe<B>): List<B> = buildList { + for (a in this@mapMaybe) { + val result = transform(a) + if (result is Just) { + add(result.value) + } + } +} /** * Returns a sequence containing only the present results of applying [transform] to each element in @@ -244,9 +250,15 @@ fun <A, B> Sequence<A>.mapMaybe(transform: (A) -> Maybe<B>): Sequence<B> = * Returns a map with values of only the present results of applying [transform] to each entry in * the original map. */ -inline fun <K, A, B> Map<K, A>.mapMaybeValues( - crossinline p: (Map.Entry<K, A>) -> Maybe<B> -): Map<K, B> = asSequence().mapMaybe { entry -> p(entry).map { entry.key to it } }.toMap() +inline fun <K, A, B> Map<K, A>.mapMaybeValues(transform: (Map.Entry<K, A>) -> Maybe<B>): Map<K, B> = + buildMap { + for (entry in this@mapMaybeValues) { + val result = transform(entry) + if (result is Just) { + put(entry.key, result.value) + } + } + } /** Returns a map with all non-present values filtered out. */ fun <K, A> Map<K, Maybe<A>>.filterJustValues(): Map<K, A> = |