diff options
author | 2024-12-04 15:08:42 -0500 | |
---|---|---|
committer | 2025-01-03 15:58:37 -0500 | |
commit | eb057d3eb942520b2e6b0fb5c5d910cd9765a37b (patch) | |
tree | 084eea666efccdf65d043578698b49356ae01d5c | |
parent | cdff97f1f5b958a8d0a3977b3d62dcd12a89b49e (diff) |
[kairos] introduce Incremental type
This type is a specialization of State<Map<K, V>> that models
"incremental" updates to the Map, via a new `MapPatch<K, V>` alias.
There are situations where it is useful to react to only what has
changed between two states; normally this is accomplished by diffing the
two states, but Incremental allows you to avoid performing the diff and
instead operating on the already-known "patch".
Flag: EXEMPT unused
Test: atest kairos-tests
Change-Id: I2829ffc149fb2e1888a11003908068e0ccc8fb35
20 files changed, 898 insertions, 554 deletions
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt index 3cba163dcb5f..2482c262e1d1 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt @@ -444,6 +444,13 @@ interface BuildScope : StateScope { ): Pair<Events<Map<K, Maybe<A>>>, DeferredValue<Map<K, B>>> = applyLatestSpecForKey(deferredOf(initialSpecs), numKeys) + fun <K, V> Incremental<K, BuildSpec<V>>.applyLatestSpecForKey( + numKeys: Int? = null + ): Incremental<K, V> { + val (events, initial) = updates.applyLatestSpecForKey(sampleDeferred(), numKeys) + return events.foldStateMapIncrementally(initial) + } + /** * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the * original [Events]. @@ -455,10 +462,10 @@ interface BuildScope : StateScope { * If the [Maybe] contained within the value for an associated key is [none], then the * previously-active [BuildSpec] will be undone with no replacement. */ - fun <K, A> Events<Map<K, Maybe<BuildSpec<A>>>>.applyLatestSpecForKey( + fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.applyLatestSpecForKey( numKeys: Int? = null - ): Events<Map<K, Maybe<A>>> = - applyLatestSpecForKey<K, A, Nothing>(deferredOf(emptyMap()), numKeys).first + ): Events<Map<K, Maybe<V>>> = + applyLatestSpecForKey<K, V, Nothing>(deferredOf(emptyMap()), numKeys).first /** * Returns a [State] containing the latest results of applying each [BuildSpec] emitted from the @@ -471,10 +478,10 @@ interface BuildScope : StateScope { * If the [Maybe] contained within the value for an associated key is [none], then the * previously-active [BuildSpec] will be undone with no replacement. */ - fun <K, A> Events<Map<K, Maybe<BuildSpec<A>>>>.holdLatestSpecForKey( - initialSpecs: DeferredValue<Map<K, BuildSpec<A>>>, + fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.holdLatestSpecForKey( + initialSpecs: DeferredValue<Map<K, BuildSpec<V>>>, numKeys: Int? = null, - ): State<Map<K, A>> { + ): Incremental<K, V> { val (changes, initialValues) = applyLatestSpecForKey(initialSpecs, numKeys) return changes.foldStateMapIncrementally(initialValues) } @@ -490,10 +497,10 @@ interface BuildScope : StateScope { * If the [Maybe] contained within the value for an associated key is [none], then the * previously-active [BuildSpec] will be undone with no replacement. */ - fun <K, A> Events<Map<K, Maybe<BuildSpec<A>>>>.holdLatestSpecForKey( - initialSpecs: Map<K, BuildSpec<A>> = emptyMap(), + fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.holdLatestSpecForKey( + initialSpecs: Map<K, BuildSpec<V>> = emptyMap(), numKeys: Int? = null, - ): State<Map<K, A>> = holdLatestSpecForKey(deferredOf(initialSpecs), numKeys) + ): Incremental<K, V> = holdLatestSpecForKey(deferredOf(initialSpecs), numKeys) /** * Returns an [Events] containing the results of applying [transform] to each value of the diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt index a26d5f8f122e..c20864648f00 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Combinators.kt @@ -59,7 +59,7 @@ fun <A, B, C> Events<A>.samplePromptly( state: State<B>, transform: TransactionScope.(A, B) -> C, ): Events<C> = - sample(state) { a, b -> These.thiz<Pair<A, B>, B>(a to b) } + sample(state) { a, b -> These.thiz(a to b) } .mergeWith(state.changes.map { These.that(it) }) { thiz, that -> These.both((thiz as These.This).thiz, (that as These.That).that) } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Events.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Events.kt index bd9b45d3be4c..88d0e2d0830d 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Events.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Events.kt @@ -31,7 +31,6 @@ import com.android.systemui.kairos.internal.demuxMap import com.android.systemui.kairos.internal.filterImpl import com.android.systemui.kairos.internal.filterJustImpl import com.android.systemui.kairos.internal.init -import com.android.systemui.kairos.internal.map import com.android.systemui.kairos.internal.mapImpl import com.android.systemui.kairos.internal.mergeNodes import com.android.systemui.kairos.internal.mergeNodesLeft @@ -44,7 +43,6 @@ import com.android.systemui.kairos.util.Left import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.Right import com.android.systemui.kairos.util.just -import com.android.systemui.kairos.util.map import com.android.systemui.kairos.util.toMaybe import java.util.concurrent.atomic.AtomicReference import kotlin.reflect.KProperty @@ -58,11 +56,11 @@ import kotlinx.coroutines.coroutineScope sealed class Events<out A> { companion object { /** An [Events] with no values. */ - val empty: Events<Nothing> = EmptyFlow + val empty: Events<Nothing> = EmptyEvents } } -/** AN [Events] with no values. */ +/** An [Events] with no values. */ @ExperimentalKairosApi val emptyEvents: Events<Nothing> = Events.empty /** @@ -82,7 +80,7 @@ class EventsLoop<A> : Events<A>() { var loopback: Events<A>? = null set(value) { value?.let { - check(!deferred.isInitialized()) { "TFlowLoop.loopback has already been set." } + check(!deferred.isInitialized()) { "EventsLoop.loopback has already been set." } deferred.setValue(value) field = value } @@ -441,7 +439,7 @@ class GroupedEvents<in K, out A> internal constructor(internal val impl: DemuxIm @ExperimentalKairosApi fun <A> State<Events<A>>.switchEvents(name: String? = null): Events<A> { val patches = - mapImpl({ init.connect(this).changes }) { newFlow, _ -> newFlow.init.connect(this) } + mapImpl({ init.connect(this).changes }) { newEvents, _ -> newEvents.init.connect(this) } return EventsInit( constInit( name = null, @@ -467,7 +465,7 @@ fun <A> State<Events<A>>.switchEvents(name: String? = null): Events<A> { @ExperimentalKairosApi fun <A> State<Events<A>>.switchEventsPromptly(): Events<A> { val patches = - mapImpl({ init.connect(this).changes }) { newFlow, _ -> newFlow.init.connect(this) } + mapImpl({ init.connect(this).changes }) { newEvents, _ -> newEvents.init.connect(this) } return EventsInit( constInit( name = null, @@ -557,7 +555,7 @@ internal constructor(internal val network: Network, internal val impl: InputNode } } -private data object EmptyFlow : Events<Nothing>() +private data object EmptyEvents : Events<Nothing>() internal class EventsInit<out A>(val init: Init<EventsImpl<A>>) : Events<A>() { override fun toString(): String = "${this::class.simpleName}@$hashString" @@ -566,7 +564,7 @@ internal class EventsInit<out A>(val init: Init<EventsImpl<A>>) : Events<A>() { internal val <A> Events<A>.init: Init<EventsImpl<A>> get() = when (this) { - is EmptyFlow -> constInit("EmptyFlow", neverImpl) + is EmptyEvents -> constInit("EmptyEvents", neverImpl) is EventsInit -> init is EventsLoop -> init is CoalescingMutableEvents<*, A> -> constInit(name, impl.activated()) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Incremental.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Incremental.kt new file mode 100644 index 000000000000..c95b9e83594f --- /dev/null +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Incremental.kt @@ -0,0 +1,297 @@ +/* + * Copyright (C) 2024 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.android.systemui.kairos + +import com.android.systemui.kairos.internal.CompletableLazy +import com.android.systemui.kairos.internal.IncrementalImpl +import com.android.systemui.kairos.internal.Init +import com.android.systemui.kairos.internal.InitScope +import com.android.systemui.kairos.internal.NoScope +import com.android.systemui.kairos.internal.awaitValues +import com.android.systemui.kairos.internal.constIncremental +import com.android.systemui.kairos.internal.constInit +import com.android.systemui.kairos.internal.init +import com.android.systemui.kairos.internal.mapImpl +import com.android.systemui.kairos.internal.mapValuesImpl +import com.android.systemui.kairos.internal.store.ConcurrentHashMapK +import com.android.systemui.kairos.internal.switchDeferredImpl +import com.android.systemui.kairos.internal.switchPromptImpl +import com.android.systemui.kairos.internal.util.hashString +import com.android.systemui.kairos.util.MapPatch +import com.android.systemui.kairos.util.map +import com.android.systemui.kairos.util.mapPatchFromFullDiff +import kotlin.reflect.KProperty + +/** A [State] tracking a [Map] that receives incremental updates. */ +sealed class Incremental<K, out V> : State<Map<K, V>>() { + abstract override val init: Init<IncrementalImpl<K, V>> +} + +/** An [Incremental] that never changes. */ +@ExperimentalKairosApi +fun <K, V> incrementalOf(value: Map<K, V>): Incremental<K, V> { + val operatorName = "stateOf" + val name = "$operatorName($value)" + return IncrementalInit(constInit(name, constIncremental(name, operatorName, value))) +} + +/** + * Returns an [Incremental] that acts as a deferred-reference to the [Incremental] produced by this + * [Lazy]. + * + * When the returned [Incremental] is accessed by the Kairos network, the [Lazy]'s + * [value][Lazy.value] will be queried and used. + * + * Useful for recursive definitions. + */ +@ExperimentalKairosApi +fun <K, V> Lazy<Incremental<K, V>>.defer(): Incremental<K, V> = deferInline { value } + +/** + * Returns an [Incremental] that acts as a deferred-reference to the [Incremental] produced by this + * [DeferredValue]. + * + * When the returned [Incremental] is accessed by the Kairos network, the [DeferredValue] will be + * queried and used. + * + * Useful for recursive definitions. + */ +@ExperimentalKairosApi +fun <K, V> DeferredValue<Incremental<K, V>>.defer(): Incremental<K, V> = deferInline { + unwrapped.value +} + +/** + * Returns an [Incremental] that acts as a deferred-reference to the [Incremental] produced by + * [block]. + * + * When the returned [Incremental] is accessed by the Kairos network, [block] will be invoked and + * the returned [Incremental] will be used. + * + * Useful for recursive definitions. + */ +@ExperimentalKairosApi +fun <K, V> deferredIncremental(block: KairosScope.() -> Incremental<K, V>): Incremental<K, V> = + deferInline { + NoScope.block() + } + +/** + * An [Events] that emits every time this [Incremental] changes, containing the subset of the map + * that has changed. + * + * @see MapPatch + */ +val <K, V> Incremental<K, V>.updates: Events<MapPatch<K, V>> + get() = EventsInit(init("patches") { init.connect(this).patches }) + +internal class IncrementalInit<K, V>(override val init: Init<IncrementalImpl<K, V>>) : + Incremental<K, V>() + +/** + * Returns an [Incremental] that tracks the entries of the original incremental, but values replaced + * with those obtained by applying [transform] to each original entry. + */ +fun <K, V, U> Incremental<K, V>.mapValues( + transform: KairosScope.(Map.Entry<K, V>) -> U +): Incremental<K, U> { + val operatorName = "mapValues" + val name = operatorName + return IncrementalInit( + init(name) { + mapValuesImpl({ init.connect(this) }, name, operatorName) { NoScope.transform(it) } + } + ) +} + +/** + * Returns an [Events] that emits from a merged, incrementally-accumulated collection of [Events] + * emitted from this, following the same "patch" rules as outlined in + * [StateScope.foldStateMapIncrementally]. + * + * Conceptually this is equivalent to: + * ```kotlin + * fun <K, V> State<Map<K, V>>.mergeEventsIncrementally(): Events<Map<K, V>> = + * map { it.merge() }.switchEvents() + * ``` + * + * While the behavior is equivalent to the conceptual definition above, the implementation is + * significantly more efficient. + * + * @see merge + */ +fun <K, V> Incremental<K, Events<V>>.mergeEventsIncrementally(): Events<Map<K, V>> { + val operatorName = "mergeEventsIncrementally" + val name = operatorName + val patches = + mapImpl({ init.connect(this).patches }) { patch, _ -> + patch.mapValues { (_, m) -> m.map { events -> events.init.connect(this) } }.asIterable() + } + return EventsInit( + constInit( + name, + switchDeferredImpl( + name = name, + getStorage = { + init + .connect(this) + .getCurrentWithEpoch(this) + .first + .mapValues { (_, events) -> events.init.connect(this) } + .asIterable() + }, + getPatches = { patches }, + storeFactory = ConcurrentHashMapK.Factory(), + ) + .awaitValues(), + ) + ) +} + +/** + * Returns an [Events] that emits from a merged, incrementally-accumulated collection of [Events] + * emitted from this, following the same "patch" rules as outlined in + * [StateScope.foldStateMapIncrementally]. + * + * Conceptually this is equivalent to: + * ```kotlin + * fun <K, V> State<Map<K, V>>.mergeEventsIncrementallyPromptly(): Events<Map<K, V>> = + * map { it.merge() }.switchEventsPromptly() + * ``` + * + * While the behavior is equivalent to the conceptual definition above, the implementation is + * significantly more efficient. + * + * @see merge + */ +fun <K, V> Incremental<K, Events<V>>.mergeEventsIncrementallyPromptly(): Events<Map<K, V>> { + val operatorName = "mergeEventsIncrementally" + val name = operatorName + val patches = + mapImpl({ init.connect(this).patches }) { patch, _ -> + patch.mapValues { (_, m) -> m.map { events -> events.init.connect(this) } }.asIterable() + } + return EventsInit( + constInit( + name, + switchPromptImpl( + name = name, + getStorage = { + init + .connect(this) + .getCurrentWithEpoch(this) + .first + .mapValues { (_, events) -> events.init.connect(this) } + .asIterable() + }, + getPatches = { patches }, + storeFactory = ConcurrentHashMapK.Factory(), + ) + .awaitValues(), + ) + ) +} + +/** A forward-reference to an [Incremental], allowing for recursive definitions. */ +@ExperimentalKairosApi +class IncrementalLoop<K, V>(private val name: String? = null) : Incremental<K, V>() { + + private val deferred = CompletableLazy<Incremental<K, V>>(name = name) + + override val init: Init<IncrementalImpl<K, V>> = + init(name) { deferred.value.init.connect(evalScope = this) } + + /** The [Incremental] this [IncrementalLoop] will forward to. */ + var loopback: Incremental<K, V>? = null + set(value) { + value?.let { + check(!deferred.isInitialized()) { + "IncrementalLoop($name).loopback has already been set." + } + deferred.setValue(value) + field = value + } + } + + operator fun getValue(thisRef: Any?, property: KProperty<*>): Incremental<K, V> = this + + operator fun setValue(thisRef: Any?, property: KProperty<*>, value: Incremental<K, V>) { + loopback = value + } + + override fun toString(): String = "${this::class.simpleName}($name)@$hashString" +} + +/** + * Returns an [Incremental] whose [updates] are calculated by diffing the given [State]'s + * [transitions]. + */ +fun <K, V> State<Map<K, V>>.asIncremental(): Incremental<K, V> { + if (this is Incremental<K, V>) return this + + val hashState = map { if (it is HashMap) it else HashMap(it) } + + val patches = + transitions.mapNotNull { (old, new) -> + mapPatchFromFullDiff(old, new).takeIf { it.isNotEmpty() } + } + + return IncrementalInit( + init("asIncremental") { + val upstream = hashState.init.connect(this) + IncrementalImpl( + upstream.name, + upstream.operatorName, + upstream.changes, + patches.init.connect(this), + upstream.store, + ) + } + ) +} + +/** Returns an [Incremental] that acts like the current value of the given [State]. */ +fun <K, V> State<Incremental<K, V>>.switchIncremental(): Incremental<K, V> { + val stateChangePatches = + transitions.mapNotNull { (old, new) -> + mapPatchFromFullDiff(old.sample(), new.sample()).takeIf { it.isNotEmpty() } + } + val innerChanges = + map { inner -> + merge(stateChangePatches, inner.updates) { switchPatch, upcomingPatch -> + switchPatch + upcomingPatch + } + } + .switchEventsPromptly() + val flattened = flatten() + return IncrementalInit( + init("switchIncremental") { + val upstream = flattened.init.connect(this) + IncrementalImpl( + "switchIncremental", + "switchIncremental", + upstream.changes, + innerChanges.init.connect(this), + upstream.store, + ) + } + ) +} + +private inline fun <K, V> deferInline( + crossinline block: InitScope.() -> Incremental<K, V> +): Incremental<K, V> = IncrementalInit(init(name = null) { block().init.connect(evalScope = this) }) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/State.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/State.kt index 08b27c86c9b9..1f0a19d5752b 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/State.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/State.kt @@ -31,11 +31,11 @@ import com.android.systemui.kairos.internal.cached import com.android.systemui.kairos.internal.constInit import com.android.systemui.kairos.internal.constState import com.android.systemui.kairos.internal.filterImpl -import com.android.systemui.kairos.internal.flatMap +import com.android.systemui.kairos.internal.flatMapStateImpl import com.android.systemui.kairos.internal.init -import com.android.systemui.kairos.internal.map -import com.android.systemui.kairos.internal.mapCheap import com.android.systemui.kairos.internal.mapImpl +import com.android.systemui.kairos.internal.mapStateImpl +import com.android.systemui.kairos.internal.mapStateImplCheap import com.android.systemui.kairos.internal.util.hashString import com.android.systemui.kairos.internal.zipStateMap import com.android.systemui.kairos.internal.zipStates @@ -45,7 +45,10 @@ import kotlin.reflect.KProperty * A time-varying value with discrete changes. Essentially, a combination of a [Transactional] that * holds a value, and an [Events] that emits when the value changes. */ -@ExperimentalKairosApi sealed class State<out A> +@ExperimentalKairosApi +sealed class State<out A> { + internal abstract val init: Init<StateImpl<A>> +} /** A [State] that never changes. */ @ExperimentalKairosApi @@ -98,7 +101,7 @@ fun <A, B> State<A>.map(transform: KairosScope.(A) -> B): State<B> { val name = operatorName return StateInit( init(name) { - init.connect(evalScope = this).map(name, operatorName) { NoScope.transform(it) } + mapStateImpl({ init.connect(this) }, name, operatorName) { NoScope.transform(it) } } ) } @@ -117,9 +120,7 @@ fun <A, B> State<A>.mapCheapUnsafe(transform: KairosScope.(A) -> B): State<B> { val operatorName = "map" val name = operatorName return StateInit( - init(name) { - init.connect(evalScope = this).mapCheap(name, operatorName) { NoScope.transform(it) } - } + init(name) { mapStateImplCheap(init, name, operatorName) { NoScope.transform(it) } } ) } @@ -160,7 +161,13 @@ fun <A> Iterable<State<A>>.combine(): State<List<A>> { val name = operatorName return StateInit( init(name) { - zipStates(name, operatorName, states = map { it.init.connect(evalScope = this) }) + val states = map { it.init } + zipStates( + name, + operatorName, + states.size, + states = init(null) { states.map { it.connect(this) } }, + ) } ) } @@ -179,7 +186,8 @@ fun <K, A> Map<K, State<A>>.combine(): State<Map<K, A>> { zipStateMap( name, operatorName, - states = mapValues { it.value.init.connect(evalScope = this) }, + size, + states = init(null) { mapValues { it.value.init.connect(evalScope = this) } }, ) } ) @@ -229,9 +237,9 @@ fun <A, B, Z> combine( val name = operatorName return StateInit( init(name) { - val dl1 = stateA.init.connect(evalScope = this@init) - val dl2 = stateB.init.connect(evalScope = this@init) - zipStates(name, operatorName, dl1, dl2) { a, b -> NoScope.transform(a, b) } + zipStates(name, operatorName, stateA.init, stateB.init) { a, b -> + NoScope.transform(a, b) + } } ) } @@ -253,10 +261,9 @@ fun <A, B, C, Z> combine( val name = operatorName return StateInit( init(name) { - val dl1 = stateA.init.connect(evalScope = this@init) - val dl2 = stateB.init.connect(evalScope = this@init) - val dl3 = stateC.init.connect(evalScope = this@init) - zipStates(name, operatorName, dl1, dl2, dl3) { a, b, c -> NoScope.transform(a, b, c) } + zipStates(name, operatorName, stateA.init, stateB.init, stateC.init) { a, b, c -> + NoScope.transform(a, b, c) + } } ) } @@ -279,11 +286,11 @@ fun <A, B, C, D, Z> combine( val name = operatorName return StateInit( init(name) { - val dl1 = stateA.init.connect(evalScope = this@init) - val dl2 = stateB.init.connect(evalScope = this@init) - val dl3 = stateC.init.connect(evalScope = this@init) - val dl4 = stateD.init.connect(evalScope = this@init) - zipStates(name, operatorName, dl1, dl2, dl3, dl4) { a, b, c, d -> + zipStates(name, operatorName, stateA.init, stateB.init, stateC.init, stateD.init) { + a, + b, + c, + d -> NoScope.transform(a, b, c, d) } } @@ -309,12 +316,15 @@ fun <A, B, C, D, E, Z> combine( val name = operatorName return StateInit( init(name) { - val dl1 = stateA.init.connect(evalScope = this@init) - val dl2 = stateB.init.connect(evalScope = this@init) - val dl3 = stateC.init.connect(evalScope = this@init) - val dl4 = stateD.init.connect(evalScope = this@init) - val dl5 = stateE.init.connect(evalScope = this@init) - zipStates(name, operatorName, dl1, dl2, dl3, dl4, dl5) { a, b, c, d, e -> + zipStates( + name, + operatorName, + stateA.init, + stateB.init, + stateC.init, + stateD.init, + stateE.init, + ) { a, b, c, d, e -> NoScope.transform(a, b, c, d, e) } } @@ -328,7 +338,7 @@ fun <A, B> State<A>.flatMap(transform: KairosScope.(A) -> State<B>): State<B> { val name = operatorName return StateInit( init(name) { - init.connect(this).flatMap(name, operatorName) { a -> + flatMapStateImpl({ init.connect(this) }, name, operatorName) { a -> NoScope.transform(a).init.connect(this) } } @@ -392,14 +402,12 @@ internal constructor( val name = "$operatorName[$value]" return StateInit( init(name) { - DerivedMapCheap( + StateImpl( name, operatorName, - upstream = upstream.init.connect(evalScope = this), - changes = groupedChanges.impl.eventsForKey(value), - ) { - it == value - } + groupedChanges.impl.eventsForKey(value), + DerivedMapCheap(upstream.init) { it == value }, + ) } ) } @@ -430,18 +438,20 @@ class MutableState<T> internal constructor(internal val network: Network, initia getInitialValue = { null }, ) + override val init: Init<StateImpl<T>> + get() = state.init + internal val state = run { val changes = input.impl val name = null val operatorName = "MutableState" - lateinit var state: StateSource<T> + val state: StateSource<T> = StateSource(initialValue) val mapImpl = mapImpl(upstream = { changes.activated() }) { it, _ -> it!!.value } val calm: EventsImpl<T> = filterImpl({ mapImpl }) { new -> new != state.getCurrentWithEpoch(evalScope = this).first } .cached() - state = StateSource(name, operatorName, initialValue, calm) @Suppress("DeferredResultUnused") network.transaction("MutableState.init") { calm.activate(evalScope = this, downstream = Schedulable.S(state))?.let { @@ -452,7 +462,7 @@ class MutableState<T> internal constructor(internal val network: Network, initia } } } - StateInit(constInit(name, state)) + StateInit(constInit(name, StateImpl(name, operatorName, calm, state))) } /** @@ -489,7 +499,7 @@ class StateLoop<A> : State<A>() { private val deferred = CompletableLazy<State<A>>() - internal val init: Init<StateImpl<A>> = + override val init: Init<StateImpl<A>> = init(name) { deferred.value.init.connect(evalScope = this) } /** The [State] this [StateLoop] will forward to. */ @@ -511,18 +521,10 @@ class StateLoop<A> : State<A>() { override fun toString(): String = "${this::class.simpleName}@$hashString" } -internal class StateInit<A> internal constructor(internal val init: Init<StateImpl<A>>) : +internal class StateInit<A> internal constructor(override val init: Init<StateImpl<A>>) : State<A>() { override fun toString(): String = "${this::class.simpleName}@$hashString" } -internal val <A> State<A>.init: Init<StateImpl<A>> - get() = - when (this) { - is StateInit -> init - is StateLoop -> init - is MutableState -> state.init - } - private inline fun <A> deferInline(crossinline block: InitScope.() -> State<A>): State<A> = StateInit(init(name = null) { block().init.connect(evalScope = this) }) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/StateScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/StateScope.kt index b1f48bb1ce56..f6a8a0ac2a09 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/StateScope.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/StateScope.kt @@ -17,14 +17,12 @@ package com.android.systemui.kairos import com.android.systemui.kairos.util.Just -import com.android.systemui.kairos.util.Left import com.android.systemui.kairos.util.Maybe -import com.android.systemui.kairos.util.Right import com.android.systemui.kairos.util.WithPrev import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.map +import com.android.systemui.kairos.util.mapMaybeValues import com.android.systemui.kairos.util.none -import com.android.systemui.kairos.util.partitionEithers import com.android.systemui.kairos.util.zipWith // TODO: caching story? should each Scope have a cache of applied Stateful instances? @@ -70,6 +68,19 @@ interface StateScope : TransactionScope { fun <A> Events<A>.holdStateDeferred(initialValue: DeferredValue<A>): State<A> /** + * Returns a [State] holding a [Map] that is updated incrementally whenever this emits a value. + * + * The value emitted is used as a "patch" for the tracked [Map]; for each key [K] in the emitted + * map, an associated value of [Just] will insert or replace the value in the tracked [Map], and + * an associated value of [none] will remove the key from the tracked [Map]. + */ + fun <K, V> Events<Map<K, Maybe<V>>>.foldStateMapIncrementally( + initialValues: DeferredValue<Map<K, V>> + ): Incremental<K, V> + + // TODO: everything below this comment can be made into extensions once we have context params + + /** * Returns an [Events] that emits from a merged, incrementally-accumulated collection of * [Events] emitted from this, following the same "patch" rules as outlined in * [foldStateMapIncrementally]. @@ -90,7 +101,7 @@ interface StateScope : TransactionScope { fun <K, V> Events<Map<K, Maybe<Events<V>>>>.mergeIncrementally( name: String? = null, initialEvents: DeferredValue<Map<K, Events<V>>>, - ): Events<Map<K, V>> + ): Events<Map<K, V>> = foldStateMapIncrementally(initialEvents).mergeEventsIncrementally() /** * Returns an [Events] that emits from a merged, incrementally-accumulated collection of @@ -113,9 +124,8 @@ interface StateScope : TransactionScope { fun <K, V> Events<Map<K, Maybe<Events<V>>>>.mergeIncrementallyPromptly( initialEvents: DeferredValue<Map<K, Events<V>>>, name: String? = null, - ): Events<Map<K, V>> - - // TODO: everything below this comment can be made into extensions once we have context params + ): Events<Map<K, V>> = + foldStateMapIncrementally(initialEvents).mergeEventsIncrementallyPromptly() /** * Returns an [Events] that emits from a merged, incrementally-accumulated collection of @@ -321,6 +331,13 @@ interface StateScope : TransactionScope { ): Pair<Events<Map<K, Maybe<A>>>, DeferredValue<Map<K, B>>> = applyLatestStatefulForKey(deferredOf(init), numKeys) + fun <K, V> Incremental<K, Stateful<V>>.applyLatestStatefulForKey( + numKeys: Int? = null + ): Incremental<K, V> { + val (events, init) = updates.applyLatestStatefulForKey(sampleDeferred()) + return events.foldStateMapIncrementally(init) + } + /** * Returns a [State] containing the latest results of applying each [Stateful] emitted from the * original [Events]. @@ -334,7 +351,7 @@ interface StateScope : TransactionScope { fun <K, A> Events<Map<K, Maybe<Stateful<A>>>>.holdLatestStatefulForKey( init: DeferredValue<Map<K, Stateful<A>>>, numKeys: Int? = null, - ): State<Map<K, A>> { + ): Incremental<K, A> { val (changes, initialValues) = applyLatestStatefulForKey(init, numKeys) return changes.foldStateMapIncrementally(initialValues) } @@ -352,11 +369,11 @@ interface StateScope : TransactionScope { fun <K, A> Events<Map<K, Maybe<Stateful<A>>>>.holdLatestStatefulForKey( init: Map<K, Stateful<A>> = emptyMap(), numKeys: Int? = null, - ): State<Map<K, A>> = holdLatestStatefulForKey(deferredOf(init), numKeys) + ): Incremental<K, A> = holdLatestStatefulForKey(deferredOf(init), numKeys) /** * Returns an [Events] containing the results of applying each [Stateful] emitted from the - * original [Events], and a [DeferredValue] containing the result of applying [init] + * original [Events], and a [DeferredValue] containing the result of applying [stateInit] * immediately. * * When each [Stateful] is applied, state accumulation from the previously-active [Stateful] @@ -537,7 +554,7 @@ interface StateScope : TransactionScope { * Shorthand for: * ```kotlin * val (changes, initApplied) = applyLatestStateful(init) - * return changes.toStateDeferred(initApplied) + * return changes.holdStateDeferred(initApplied) * ``` */ fun <A> Events<Stateful<A>>.holdLatestStateful(init: Stateful<A>): State<A> { @@ -586,29 +603,8 @@ interface StateScope : TransactionScope { * an associated value of [none] will remove the key from the tracked [Map]. */ fun <K, V> Events<Map<K, Maybe<V>>>.foldStateMapIncrementally( - initialValues: DeferredValue<Map<K, V>> - ): State<Map<K, V>> = - foldStateDeferred(initialValues) { patch, map -> - val (adds: List<Pair<K, V>>, removes: List<K>) = - patch - .asSequence() - .map { (k, v) -> if (v is Just) Left(k to v.value) else Right(k) } - .partitionEithers() - val removed: Map<K, V> = map - removes.toSet() - val updated: Map<K, V> = removed + adds - updated - } - - /** - * Returns a [State] holding a [Map] that is updated incrementally whenever this emits a value. - * - * The value emitted is used as a "patch" for the tracked [Map]; for each key [K] in the emitted - * map, an associated value of [Just] will insert or replace the value in the tracked [Map], and - * an associated value of [none] will remove the key from the tracked [Map]. - */ - fun <K, V> Events<Map<K, Maybe<V>>>.foldStateMapIncrementally( initialValues: Map<K, V> = emptyMap() - ): State<Map<K, V>> = foldStateMapIncrementally(deferredOf(initialValues)) + ): Incremental<K, V> = foldStateMapIncrementally(deferredOf(initialValues)) /** * Returns an [Events] that wraps each emission of the original [Events] into an [IndexedValue], @@ -757,4 +753,54 @@ interface StateScope : TransactionScope { other: State<B>, transform: TransactionScope.(A, B) -> C, ): State<C> = combineTransactionally(this, other, transform) + + /** + * Returns an [Incremental] that reflects the state of the original [Incremental], but also adds + * / removes entries based on the state of the original's values. + */ + fun <K, V> Incremental<K, State<Maybe<V>>>.applyStateIncrementally(): Incremental<K, V> = + mapValues { (_, v) -> v.changes } + .mergeEventsIncrementallyPromptly() + .foldStateMapIncrementally( + deferredStateScope { sample().mapMaybeValues { (_, s) -> s.sample() } } + ) + + /** + * Returns an [Incremental] that reflects the state of the original [Incremental], but also adds + * / removes entries based on the [State] returned from applying [transform] to the original's + * entries. + */ + fun <K, V, U> Incremental<K, V>.mapIncrementalState( + transform: KairosScope.(Map.Entry<K, V>) -> State<Maybe<U>> + ): Incremental<K, U> = mapValues { transform(it) }.applyStateIncrementally() + + /** + * Returns an [Incremental] that reflects the state of the original [Incremental], but also adds + * / removes entries based on the [State] returned from applying [transform] to the original's + * entries, such that entries are added when that state is `true`, and removed when `false`. + */ + fun <K, V> Incremental<K, V>.filterIncrementally( + transform: KairosScope.(Map.Entry<K, V>) -> State<Boolean> + ): Incremental<K, V> = mapIncrementalState { entry -> + transform(entry).map { if (it) just(entry.value) else none } + } + + /** + * Returns an [Incremental] that samples the [Transactionals][Transactional] held by the + * original within the same transaction that the incremental [updates]. + */ + fun <K, V> Incremental<K, Transactional<V>>.sampleTransactionals(): Incremental<K, V> = + updates + .map { patch -> patch.mapValues { (k, mv) -> mv.map { it.sample() } } } + .foldStateMapIncrementally( + deferredStateScope { sample().mapValues { (k, v) -> v.sample() } } + ) + + /** + * Returns an [Incremental] that tracks the entries of the original incremental, but values + * replaced with those obtained by applying [transform] to each original entry. + */ + fun <K, V, U> Incremental<K, V>.mapValuesTransactionally( + transform: TransactionScope.(Map.Entry<K, V>) -> U + ): Incremental<K, U> = mapValues { transactionally { transform(it) } }.sampleTransactionals() } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/debug/Debug.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/debug/Debug.kt deleted file mode 100644 index d43a0bbf433e..000000000000 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/debug/Debug.kt +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Copyright (C) 2024 The Android Open Source Project - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.android.systemui.kairos.debug - -import com.android.systemui.kairos.MutableState -import com.android.systemui.kairos.State -import com.android.systemui.kairos.StateInit -import com.android.systemui.kairos.StateLoop -import com.android.systemui.kairos.internal.DerivedFlatten -import com.android.systemui.kairos.internal.DerivedMap -import com.android.systemui.kairos.internal.DerivedMapCheap -import com.android.systemui.kairos.internal.DerivedZipped -import com.android.systemui.kairos.internal.Init -import com.android.systemui.kairos.internal.StateDerived -import com.android.systemui.kairos.internal.StateImpl -import com.android.systemui.kairos.internal.StateSource -import com.android.systemui.kairos.util.Just -import com.android.systemui.kairos.util.Maybe -import com.android.systemui.kairos.util.None -import com.android.systemui.kairos.util.flatMap -import com.android.systemui.kairos.util.map -import com.android.systemui.kairos.util.none -import com.android.systemui.kairos.util.orElseGet - -// object IdGen { -// private val counter = AtomicLong() -// fun getId() = counter.getAndIncrement() -// } - -typealias StateGraph = Graph<ActivationInfo> - -sealed class StateInfo( - val name: String, - val value: Maybe<Any?>, - val operator: String, - val epoch: Long?, -) - -class Source(name: String, value: Maybe<Any?>, operator: String, epoch: Long) : - StateInfo(name, value, operator, epoch) - -class Derived( - name: String, - val type: DerivedStateType, - value: Maybe<Any?>, - operator: String, - epoch: Long?, -) : StateInfo(name, value, operator, epoch) - -sealed interface DerivedStateType - -data object Flatten : DerivedStateType - -data class Mapped(val cheap: Boolean) : DerivedStateType - -data object Combine : DerivedStateType - -sealed class InitInfo(val name: String) - -class Uninitialized(name: String) : InitInfo(name) - -class Initialized(val state: StateInfo) : InitInfo(state.name) - -sealed interface ActivationInfo - -class Inactive(val name: String) : ActivationInfo - -class Active(val nodeInfo: StateInfo) : ActivationInfo - -class Dead(val name: String) : ActivationInfo - -data class Edge(val upstream: Any, val downstream: Any, val tag: Any? = null) - -data class Graph<T>(val nodes: Map<Any, T>, val edges: List<Edge>) - -internal fun State<*>.dump(infoMap: MutableMap<Any, InitInfo>, edges: MutableList<Edge>) { - val init: Init<StateImpl<Any?>> = - when (this) { - is StateInit -> init - is StateLoop -> init - is MutableState -> state.init - } - when (val stateMaybe = init.getUnsafe()) { - None -> { - infoMap[this] = Uninitialized(init.name ?: init.toString()) - } - is Just -> { - stateMaybe.value.dump(infoMap, edges) - } - } -} - -internal fun StateImpl<*>.dump(infoById: MutableMap<Any, InitInfo>, edges: MutableList<Edge>) { - val state = this - if (state in infoById) return - val stateInfo = - when (state) { - is StateDerived -> { - val type = - when (state) { - is DerivedFlatten -> { - state.upstream.dump(infoById, edges) - edges.add( - Edge(upstream = state.upstream, downstream = state, tag = "outer") - ) - state.upstream - .getUnsafe() - .orElseGet { null } - ?.let { - edges.add( - Edge(upstream = it, downstream = state, tag = "inner") - ) - it.dump(infoById, edges) - } - Flatten - } - is DerivedMap<*, *> -> { - state.upstream.dump(infoById, edges) - edges.add(Edge(upstream = state.upstream, downstream = state)) - Mapped(cheap = false) - } - is DerivedZipped<*, *, *> -> { - state.upstream.forEach { (key, upstream) -> - edges.add( - Edge(upstream = upstream, downstream = state, tag = "key=$key") - ) - upstream.dump(infoById, edges) - } - Combine - } - } - Derived( - state.name ?: state.operatorName, - type, - state.getCachedUnsafe(), - state.operatorName, - state.invalidatedEpoch, - ) - } - is StateSource -> - Source( - state.name ?: state.operatorName, - state.getStorageUnsafe(), - state.operatorName, - state.writeEpoch, - ) - is DerivedMapCheap<*, *> -> { - state.upstream.dump(infoById, edges) - edges.add(Edge(upstream = state.upstream, downstream = state)) - val type = Mapped(cheap = true) - Derived( - state.name ?: state.operatorName, - type, - state.getUnsafe(), - state.operatorName, - null, - ) - } - } - infoById[state] = Initialized(stateInfo) -} - -private fun <A> StateImpl<A>.getUnsafe(): Maybe<A> = - when (this) { - is StateDerived -> getCachedUnsafe() - is StateSource -> getStorageUnsafe() - is DerivedMapCheap<*, *> -> none - } - -private fun <A> StateImpl<A>.getUnsafeWithEpoch(): Maybe<Pair<A, Long>> = - when (this) { - is StateDerived -> getCachedUnsafe().map { it to invalidatedEpoch } - is StateSource -> getStorageUnsafe().map { it to writeEpoch } - is DerivedMapCheap<*, *> -> none - } - -/** - * Returns the current value held in this [State], or [none] if the [State] has not been - * initialized. - * - * The returned [Long] is the *epoch* at which the internal cache was last updated. This can be used - * to identify values which are out-of-date. - */ -fun <A> State<A>.sampleUnsafe(): Maybe<Pair<A, Long>> = - when (this) { - is MutableState -> state.init.getUnsafe().flatMap { it.getUnsafeWithEpoch() } - is StateInit -> init.getUnsafe().flatMap { it.getUnsafeWithEpoch() } - is StateLoop -> this.init.getUnsafe().flatMap { it.getUnsafeWithEpoch() } - } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt index 8a66f9a0d40d..d2c154f05b37 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/DeferScope.kt @@ -45,17 +45,10 @@ internal inline fun <A> deferScope(block: DeferScope.() -> A): A { internal object NoValue -internal class CompletableLazy<T> : Lazy<T> { - - private var _value: Any? - - constructor() { - _value = NoValue - } - - constructor(init: T) { - _value = init - } +internal class CompletableLazy<T>( + private var _value: Any? = NoValue, + private val name: String? = null, +) : Lazy<T> { fun setValue(value: T) { check(_value === NoValue) { "CompletableLazy value already set" } @@ -64,7 +57,7 @@ internal class CompletableLazy<T> : Lazy<T> { override val value: T get() { - check(_value !== NoValue) { "CompletableLazy accessed before initialized" } + check(_value !== NoValue) { "CompletableLazy($name) accessed before initialized" } @Suppress("UNCHECKED_CAST") return _value as T } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt index d19a47eb873e..4cf24580fa32 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt @@ -261,7 +261,7 @@ internal class DemuxImpl<in K, out A>(private val dmux: DemuxLifecycle<K, A>) { internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycleState<K, A>) { val mutex = Mutex() - override fun toString(): String = "TFlowDmuxState[$hashString][$lifecycleState][$mutex]" + override fun toString(): String = "EventsDmuxState[$hashString][$lifecycleState][$mutex]" fun activate(evalScope: EvalScope, key: K): Pair<DemuxNode<*, K, A>.BranchNode, Boolean>? = when (val state = lifecycleState) { diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EventsImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EventsImpl.kt index e7978b8bc5ea..59c5e7244aa2 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EventsImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EventsImpl.kt @@ -16,7 +16,7 @@ package com.android.systemui.kairos.internal -/* Initialized TFlow */ +/* Initialized Events */ internal fun interface EventsImpl<out A> { fun activate(evalScope: EvalScope, downstream: Schedulable): ActivationResult<A>? } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/IncrementalImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/IncrementalImpl.kt new file mode 100644 index 000000000000..8a3e01af6565 --- /dev/null +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/IncrementalImpl.kt @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2024 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.android.systemui.kairos.internal + +import com.android.systemui.kairos.internal.store.StoreEntry +import com.android.systemui.kairos.util.Maybe +import com.android.systemui.kairos.util.applyPatch +import com.android.systemui.kairos.util.just +import com.android.systemui.kairos.util.map +import com.android.systemui.kairos.util.none + +internal class IncrementalImpl<K, out V>( + name: String?, + operatorName: String, + changes: EventsImpl<Map<K, V>>, + val patches: EventsImpl<Map<K, Maybe<V>>>, + store: StateStore<Map<K, V>>, +) : StateImpl<Map<K, V>>(name, operatorName, changes, store) + +internal fun <K, V> constIncremental( + name: String?, + operatorName: String, + init: Map<K, V>, +): IncrementalImpl<K, V> = + IncrementalImpl(name, operatorName, neverImpl, neverImpl, StateSource(init)) + +internal inline fun <K, V> activatedIncremental( + name: String?, + operatorName: String, + evalScope: EvalScope, + crossinline getPatches: EvalScope.() -> EventsImpl<Map<K, Maybe<V>>>, + init: Lazy<Map<K, V>>, +): IncrementalImpl<K, V> { + val store = StateSource(init) + val maybeChanges = + mapImpl(getPatches) { patch, _ -> + val (old, _) = store.getCurrentWithEpoch(evalScope = this) + val new = old.applyPatch(patch) + if (new != old) just(patch to new) else none + } + .cached() + val calm = filterJustImpl { maybeChanges } + val changes = mapImpl({ calm }) { (_, change), _ -> change } + val patches = mapImpl({ calm }) { (patch, _), _ -> patch } + evalScope.scheduleOutput( + OneShot { + changes.activate(evalScope = this, downstream = Schedulable.S(store))?.let { + (connection, needsEval) -> + store.upstreamConnection = connection + if (needsEval) { + schedule(store) + } + } + } + ) + return IncrementalImpl(name, operatorName, changes, patches, store) +} + +internal inline fun <K, V> EventsImpl<Map<K, Maybe<V>>>.calmUpdates( + state: StateDerived<Map<K, V>> +): Pair<EventsImpl<Map<K, Maybe<V>>>, EventsImpl<Map<K, V>>> { + val maybeUpdate = + mapImpl({ this@calmUpdates }) { patch, _ -> + val (current, _) = state.getCurrentWithEpoch(evalScope = this) + val new = current.applyPatch(patch) + if (new != current) { + state.setCacheFromPush(new, epoch) + just(patch to new) + } else { + none + } + } + .cached() + val calm = filterJustImpl { maybeUpdate } + val patches = mapImpl({ calm }) { (p, _), _ -> p } + val changes = mapImpl({ calm }) { (_, s), _ -> s } + return patches to changes +} + +internal fun <K, A, B> mapValuesImpl( + incrementalImpl: InitScope.() -> IncrementalImpl<K, A>, + name: String?, + operatorName: String, + transform: EvalScope.(Map.Entry<K, A>) -> B, +): IncrementalImpl<K, B> { + val store = DerivedMap(incrementalImpl) { map -> map.mapValues { transform(it) } } + val mappedPatches = + mapImpl({ incrementalImpl().patches }) { patch, _ -> + patch.mapValues { (k, mv) -> mv.map { v -> transform(StoreEntry(k, v)) } } + } + .cached() + val (calmPatches, calmChanges) = mappedPatches.calmUpdates(store) + return IncrementalImpl(name, operatorName, calmChanges, calmPatches, store) +} diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt index 10a46775beb9..640c561a21eb 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Init.kt @@ -50,6 +50,9 @@ internal class Init<out A>(val name: String?, private val block: InitScope.() -> } } -internal fun <A> init(name: String?, block: InitScope.() -> A) = Init(name, block) +@Suppress("NOTHING_TO_INLINE") +internal inline fun <A> init(name: String?, noinline block: InitScope.() -> A): Init<A> = + Init(name, block) -internal fun <A> constInit(name: String?, value: A) = init(name) { value } +@Suppress("NOTHING_TO_INLINE") +internal inline fun <A> constInit(name: String?, value: A): Init<A> = init(name) { value } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt index 53a6ecabda6a..067075bba3e4 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt @@ -282,7 +282,7 @@ internal inline fun <A> switchDeferredImplSingle( crossinline getStorage: EvalScope.() -> EventsImpl<A>, crossinline getPatches: EvalScope.() -> EventsImpl<EventsImpl<A>>, ): EventsImpl<A> { - val patches = mapImpl(getPatches) { newFlow, _ -> singleOf(just(newFlow)).asIterable() } + val patches = mapImpl(getPatches) { newEvents, _ -> singleOf(just(newEvents)).asIterable() } val switchDeferredImpl = switchDeferredImpl( name = name, @@ -402,7 +402,7 @@ internal inline fun <A, B> mergeNodes( ): EventsImpl<These<A, B>> { val storage = listOf( - mapImpl(getPulse) { it, _ -> These.thiz<A, B>(it) }, + mapImpl(getPulse) { it, _ -> These.thiz(it) }, mapImpl(getOther) { it, _ -> These.that(it) }, ) .asIterableWithIndex() diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt index 785a98b105c5..26e2d4db0c52 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt @@ -311,7 +311,7 @@ internal inline fun <A> switchPromptImplSingle( switchPromptImpl( getStorage = { singleOf(getStorage()).asIterable() }, getPatches = { - mapImpl(getPatches) { newFlow, _ -> singleOf(just(newFlow)).asIterable() } + mapImpl(getPatches) { newEvents, _ -> singleOf(just(newEvents)).asIterable() } }, storeFactory = SingletonMapK.Factory(), ) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateImpl.kt index 5ba645246b0f..46127cb2276b 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateImpl.kt @@ -24,24 +24,28 @@ import com.android.systemui.kairos.internal.util.hashString import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.none -import java.util.concurrent.atomic.AtomicLong -import kotlinx.coroutines.ExperimentalCoroutinesApi -internal sealed interface StateImpl<out A> { - val name: String? - val operatorName: String - val changes: EventsImpl<A> - - fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> +internal open class StateImpl<out A>( + val name: String?, + val operatorName: String, + val changes: EventsImpl<A>, + val store: StateStore<A>, +) { + fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> = + store.getCurrentWithEpoch(evalScope) } -internal sealed class StateDerived<A>(override val changes: EventsImpl<A>) : StateImpl<A> { +internal sealed class StateDerived<A> : StateStore<A>() { @Volatile var invalidatedEpoch = Long.MIN_VALUE private set @Volatile + protected var validatedEpoch = Long.MIN_VALUE + private set + + @Volatile protected var cache: Any? = EmptyCache private set @@ -52,15 +56,19 @@ internal sealed class StateDerived<A>(override val changes: EventsImpl<A>) : Sta fun pull(evalScope: EvalScope): Pair<A, Long> { @Suppress("UNCHECKED_CAST") - return recalc(evalScope)?.also { (a, epoch) -> setCache(a, epoch) } - ?: ((cache as A) to invalidatedEpoch) - } - - fun setCache(value: A, epoch: Long) { - if (epoch > invalidatedEpoch) { - cache = value - invalidatedEpoch = epoch - } + val result = + recalc(evalScope)?.let { (newValue, epoch) -> + newValue.also { + if (epoch > validatedEpoch) { + validatedEpoch = epoch + if (cache != newValue) { + cache = newValue + invalidatedEpoch = epoch + } + } + } + } ?: (cache as A) + return result to invalidatedEpoch } fun getCachedUnsafe(): Maybe<A> { @@ -70,52 +78,50 @@ internal sealed class StateDerived<A>(override val changes: EventsImpl<A>) : Sta protected abstract fun recalc(evalScope: EvalScope): Pair<A, Long>? + fun setCacheFromPush(value: A, epoch: Long) { + cache = value + validatedEpoch = epoch + 1 + invalidatedEpoch = epoch + 1 + } + private data object EmptyCache } -internal class StateSource<A>( - override val name: String?, - override val operatorName: String, - init: Lazy<A>, - override val changes: EventsImpl<A>, -) : StateImpl<A> { - constructor( - name: String?, - operatorName: String, - init: A, - changes: EventsImpl<A>, - ) : this(name, operatorName, CompletableLazy(init), changes) +internal sealed class StateStore<out S> { + abstract fun getCurrentWithEpoch(evalScope: EvalScope): Pair<S, Long> +} + +internal class StateSource<S>(init: Lazy<S>) : StateStore<S>() { + constructor(init: S) : this(CompletableLazy(init)) - lateinit var upstreamConnection: NodeConnection<A> + lateinit var upstreamConnection: NodeConnection<S> // Note: Don't need to synchronize; we will never interleave reads and writes, since all writes // are performed at the end of a network step, after any reads would have taken place. - @Volatile private var _current: Lazy<A> = init + @Volatile private var _current: Lazy<S> = init @Volatile var writeEpoch = 0L private set - override fun getCurrentWithEpoch(evalScope: EvalScope): Pair<A, Long> = + override fun getCurrentWithEpoch(evalScope: EvalScope): Pair<S, Long> = _current.value to writeEpoch /** called by network after eval phase has completed */ fun updateState(logIndent: Int, evalScope: EvalScope) { // write the latch - // TODO: deferAsync? _current = CompletableLazy(upstreamConnection.getPushEvent(logIndent, evalScope)) - writeEpoch = evalScope.epoch + writeEpoch = evalScope.epoch + 1 } - override fun toString(): String = "StateImpl(changes=$changes, current=$_current)" + override fun toString(): String = "StateImpl(current=$_current, writeEpoch=$writeEpoch)" - @OptIn(ExperimentalCoroutinesApi::class) - fun getStorageUnsafe(): Maybe<A> = if (_current.isInitialized()) just(_current.value) else none + fun getStorageUnsafe(): Maybe<S> = if (_current.isInitialized()) just(_current.value) else none } internal fun <A> constState(name: String?, operatorName: String, init: A): StateImpl<A> = - StateSource(name, operatorName, init, neverImpl) + StateImpl(name, operatorName, neverImpl, StateSource(init)) internal inline fun <A> activatedStateSource( name: String?, @@ -124,33 +130,28 @@ internal inline fun <A> activatedStateSource( crossinline getChanges: EvalScope.() -> EventsImpl<A>, init: Lazy<A>, ): StateImpl<A> { - lateinit var state: StateSource<A> + val store = StateSource(init) val calm: EventsImpl<A> = - filterImpl(getChanges) { new -> new != state.getCurrentWithEpoch(evalScope = this).first } - return StateSource(name, operatorName, init, calm).also { - state = it - evalScope.scheduleOutput( - OneShot { - calm.activate(evalScope = this, downstream = Schedulable.S(state))?.let { - (connection, needsEval) -> - state.upstreamConnection = connection - if (needsEval) { - schedule(state) - } + filterImpl(getChanges) { new -> new != store.getCurrentWithEpoch(evalScope = this).first } + evalScope.scheduleOutput( + OneShot { + calm.activate(evalScope = this, downstream = Schedulable.S(store))?.let { + (connection, needsEval) -> + store.upstreamConnection = connection + if (needsEval) { + schedule(store) } } - ) - } + } + ) + return StateImpl(name, operatorName, calm, store) } -private inline fun <A> EventsImpl<A>.calm( - crossinline getState: () -> StateDerived<A> -): EventsImpl<A> = +private inline fun <A> EventsImpl<A>.calm(state: StateDerived<A>): EventsImpl<A> = filterImpl({ this@calm }) { new -> - val state = getState() val (current, _) = state.getCurrentWithEpoch(evalScope = this) if (new != current) { - state.setCache(new, epoch) + state.setCacheFromPush(new, epoch) true } else { false @@ -158,58 +159,53 @@ private inline fun <A> EventsImpl<A>.calm( } .cached() -internal fun <A, B> StateImpl<A>.mapCheap( +internal fun <A, B> mapStateImplCheap( + stateImpl: Init<StateImpl<A>>, name: String?, operatorName: String, transform: EvalScope.(A) -> B, ): StateImpl<B> = - DerivedMapCheap( - name, - operatorName, - this, - mapImpl({ changes }) { it, _ -> transform(it) }, - transform, + StateImpl( + name = name, + operatorName = operatorName, + changes = mapImpl({ stateImpl.connect(this).changes }) { it, _ -> transform(it) }, + store = DerivedMapCheap(stateImpl, transform), ) internal class DerivedMapCheap<A, B>( - override val name: String?, - override val operatorName: String, - val upstream: StateImpl<A>, - override val changes: EventsImpl<B>, + val upstream: Init<StateImpl<A>>, private val transform: EvalScope.(A) -> B, -) : StateImpl<B> { +) : StateStore<B>() { override fun getCurrentWithEpoch(evalScope: EvalScope): Pair<B, Long> { - val (a, epoch) = upstream.getCurrentWithEpoch(evalScope) + val (a, epoch) = upstream.connect(evalScope).getCurrentWithEpoch(evalScope) return evalScope.transform(a) to epoch } override fun toString(): String = "${this::class.simpleName}@$hashString" } -internal fun <A, B> StateImpl<A>.map( +internal fun <A, B> mapStateImpl( + stateImpl: InitScope.() -> StateImpl<A>, name: String?, operatorName: String, transform: EvalScope.(A) -> B, ): StateImpl<B> { - lateinit var state: StateDerived<B> - val mappedChanges = mapImpl({ changes }) { it, _ -> transform(it) }.cached().calm { state } - state = DerivedMap(name, operatorName, transform, this, mappedChanges) - return state + val store = DerivedMap(stateImpl, transform) + val mappedChanges = + mapImpl({ stateImpl().changes }) { it, _ -> transform(it) }.cached().calm(store) + return StateImpl(name, operatorName, mappedChanges, store) } internal class DerivedMap<A, B>( - override val name: String?, - override val operatorName: String, + val upstream: InitScope.() -> StateImpl<A>, private val transform: EvalScope.(A) -> B, - val upstream: StateImpl<A>, - changes: EventsImpl<B>, -) : StateDerived<B>(changes) { +) : StateDerived<B>() { override fun toString(): String = "${this::class.simpleName}@$hashString" override fun recalc(evalScope: EvalScope): Pair<B, Long>? { - val (a, epoch) = upstream.getCurrentWithEpoch(evalScope) - return if (epoch > invalidatedEpoch) { + val (a, epoch) = evalScope.upstream().getCurrentWithEpoch(evalScope) + return if (epoch > validatedEpoch) { evalScope.transform(a) to epoch } else { null @@ -217,35 +213,34 @@ internal class DerivedMap<A, B>( } } -internal fun <A> StateImpl<StateImpl<A>>.flatten(name: String?, operator: String): StateImpl<A> { +internal fun <A> flattenStateImpl( + stateImpl: InitScope.() -> StateImpl<StateImpl<A>>, + name: String?, + operator: String, +): StateImpl<A> { // emits the current value of the new inner state, when that state is emitted val switchEvents = - mapImpl({ changes }) { newInner, _ -> newInner.getCurrentWithEpoch(this).first } + mapImpl({ stateImpl().changes }) { newInner, _ -> newInner.getCurrentWithEpoch(this).first } // emits the new value of the new inner state when that state is emitted, or // falls back to the current value if a new state is *not* being emitted this // transaction val innerChanges = - mapImpl({ changes }) { newInner, _ -> + mapImpl({ stateImpl().changes }) { newInner, _ -> mergeNodes({ switchEvents }, { newInner.changes }) { _, new -> new } } val switchedChanges: EventsImpl<A> = switchPromptImplSingle( - getStorage = { this@flatten.getCurrentWithEpoch(evalScope = this).first.changes }, + getStorage = { stateImpl().getCurrentWithEpoch(evalScope = this).first.changes }, getPatches = { innerChanges }, ) - lateinit var state: DerivedFlatten<A> - state = DerivedFlatten(name, operator, this, switchedChanges.calm { state }) - return state + val store: DerivedFlatten<A> = DerivedFlatten(stateImpl) + return StateImpl(name, operator, switchedChanges.calm(store), store) } -internal class DerivedFlatten<A>( - override val name: String?, - override val operatorName: String, - val upstream: StateImpl<StateImpl<A>>, - changes: EventsImpl<A>, -) : StateDerived<A>(changes) { +internal class DerivedFlatten<A>(val upstream: InitScope.() -> StateImpl<StateImpl<A>>) : + StateDerived<A>() { override fun recalc(evalScope: EvalScope): Pair<A, Long> { - val (inner, epoch0) = upstream.getCurrentWithEpoch(evalScope) + val (inner, epoch0) = evalScope.upstream().getCurrentWithEpoch(evalScope) val (a, epoch1) = inner.getCurrentWithEpoch(evalScope) return a to maxOf(epoch0, epoch1) } @@ -254,97 +249,144 @@ internal class DerivedFlatten<A>( } @Suppress("NOTHING_TO_INLINE") -internal inline fun <A, B> StateImpl<A>.flatMap( +internal inline fun <A, B> flatMapStateImpl( + noinline stateImpl: InitScope.() -> StateImpl<A>, name: String?, operatorName: String, noinline transform: EvalScope.(A) -> StateImpl<B>, -): StateImpl<B> = map(null, operatorName, transform).flatten(name, operatorName) +): StateImpl<B> { + val mapped = mapStateImpl(stateImpl, null, operatorName, transform) + return flattenStateImpl({ mapped }, name, operatorName) +} internal fun <A, B, Z> zipStates( name: String?, operatorName: String, - l1: StateImpl<A>, - l2: StateImpl<B>, + l1: Init<StateImpl<A>>, + l2: Init<StateImpl<B>>, transform: EvalScope.(A, B) -> Z, -): StateImpl<Z> = - zipStateList(null, operatorName, listOf(l1, l2)).map(name, operatorName) { +): StateImpl<Z> { + val zipped = + zipStateList( + null, + operatorName, + 2, + init(null) { listOf(l1.connect(this), l2.connect(this)) }, + ) + return mapStateImpl({ zipped }, name, operatorName) { @Suppress("UNCHECKED_CAST") transform(it[0] as A, it[1] as B) } +} internal fun <A, B, C, Z> zipStates( name: String?, operatorName: String, - l1: StateImpl<A>, - l2: StateImpl<B>, - l3: StateImpl<C>, + l1: Init<StateImpl<A>>, + l2: Init<StateImpl<B>>, + l3: Init<StateImpl<C>>, transform: EvalScope.(A, B, C) -> Z, -): StateImpl<Z> = - zipStateList(null, operatorName, listOf(l1, l2, l3)).map(name, operatorName) { +): StateImpl<Z> { + val zipped = + zipStateList( + null, + operatorName, + 3, + init(null) { listOf(l1.connect(this), l2.connect(this), l3.connect(this)) }, + ) + return mapStateImpl({ zipped }, name, operatorName) { @Suppress("UNCHECKED_CAST") transform(it[0] as A, it[1] as B, it[2] as C) } +} internal fun <A, B, C, D, Z> zipStates( name: String?, operatorName: String, - l1: StateImpl<A>, - l2: StateImpl<B>, - l3: StateImpl<C>, - l4: StateImpl<D>, + l1: Init<StateImpl<A>>, + l2: Init<StateImpl<B>>, + l3: Init<StateImpl<C>>, + l4: Init<StateImpl<D>>, transform: EvalScope.(A, B, C, D) -> Z, -): StateImpl<Z> = - zipStateList(null, operatorName, listOf(l1, l2, l3, l4)).map(name, operatorName) { +): StateImpl<Z> { + val zipped = + zipStateList( + null, + operatorName, + 4, + init(null) { + listOf(l1.connect(this), l2.connect(this), l3.connect(this), l4.connect(this)) + }, + ) + return mapStateImpl({ zipped }, name, operatorName) { @Suppress("UNCHECKED_CAST") transform(it[0] as A, it[1] as B, it[2] as C, it[3] as D) } +} internal fun <A, B, C, D, E, Z> zipStates( name: String?, operatorName: String, - l1: StateImpl<A>, - l2: StateImpl<B>, - l3: StateImpl<C>, - l4: StateImpl<D>, - l5: StateImpl<E>, + l1: Init<StateImpl<A>>, + l2: Init<StateImpl<B>>, + l3: Init<StateImpl<C>>, + l4: Init<StateImpl<D>>, + l5: Init<StateImpl<E>>, transform: EvalScope.(A, B, C, D, E) -> Z, -): StateImpl<Z> = - zipStateList(null, operatorName, listOf(l1, l2, l3, l4, l5)).map(name, operatorName) { +): StateImpl<Z> { + val zipped = + zipStateList( + null, + operatorName, + 5, + init(null) { + listOf( + l1.connect(this), + l2.connect(this), + l3.connect(this), + l4.connect(this), + l5.connect(this), + ) + }, + ) + return mapStateImpl({ zipped }, name, operatorName) { @Suppress("UNCHECKED_CAST") transform(it[0] as A, it[1] as B, it[2] as C, it[3] as D, it[4] as E) } +} internal fun <K, V> zipStateMap( name: String?, operatorName: String, - states: Map<K, StateImpl<V>>, + numStates: Int, + states: Init<Map<K, StateImpl<V>>>, ): StateImpl<Map<K, V>> = zipStates( name = name, operatorName = operatorName, - numStates = states.size, - states = states.asIterable(), + numStates = numStates, + states = init(null) { states.connect(this).asIterable() }, storeFactory = ConcurrentHashMapK.Factory(), ) internal fun <V> zipStateList( name: String?, operatorName: String, - states: List<StateImpl<V>>, + numStates: Int, + states: Init<List<StateImpl<V>>>, ): StateImpl<List<V>> { val zipped = zipStates( name = name, operatorName = operatorName, - numStates = states.size, - states = states.asIterableWithIndex(), + numStates = numStates, + states = init(name) { states.connect(this).asIterableWithIndex() }, storeFactory = MutableArrayMapK.Factory(), ) // Like mapCheap, but with caching (or like map, but without the calm changes, as they are not // necessary). - return DerivedMap( + return StateImpl( name = name, operatorName = operatorName, - transform = { arrayStore -> arrayStore.values.toList() }, - upstream = zipped, changes = mapImpl({ zipped.changes }) { arrayStore, _ -> arrayStore.values.toList() }, + DerivedMap(upstream = { zipped }, transform = { arrayStore -> arrayStore.values.toList() }), ) } @@ -352,64 +394,60 @@ internal fun <W, K, A> zipStates( name: String?, operatorName: String, numStates: Int, - states: Iterable<Map.Entry<K, StateImpl<A>>>, + states: Init<Iterable<Map.Entry<K, StateImpl<A>>>>, storeFactory: MutableMapK.Factory<W, K>, ): StateImpl<MutableMapK<W, K, A>> { if (numStates == 0) { return constState(name, operatorName, storeFactory.create(0)) } - val stateChanges = states.asSequence().map { (k, v) -> StoreEntry(k, v.changes) }.asIterable() - lateinit var state: DerivedZipped<W, K, A> + val stateStore = DerivedZipped(numStates, states, storeFactory) // No need for calm; invariant ensures that changes will only emit when there's a difference val switchDeferredImpl = switchDeferredImpl( - getStorage = { stateChanges }, + getStorage = { + states + .connect(this) + .asSequence() + .map { (k, v) -> StoreEntry(k, v.changes) } + .asIterable() + }, getPatches = { neverImpl }, storeFactory = storeFactory, ) val changes = mapImpl({ switchDeferredImpl }) { patch, logIndent -> - val store = storeFactory.create<A>(numStates) - states.forEach { (k, state) -> - store[k] = + val muxStore = storeFactory.create<A>(numStates) + states.connect(this).forEach { (k, state) -> + muxStore[k] = if (patch.contains(k)) { patch.getValue(k).getPushEvent(logIndent, evalScope = this@mapImpl) } else { state.getCurrentWithEpoch(evalScope = this@mapImpl).first } } - store.also { state.setCache(it, epoch) } + // Read the current value so that it is cached in this transaction and won't be + // clobbered by the cache write + stateStore.getCurrentWithEpoch(evalScope = this) + muxStore.also { stateStore.setCacheFromPush(it, epoch) } } .cached() - state = - DerivedZipped( - name = name, - operatorName = operatorName, - upstreamSize = numStates, - upstream = states, - changes = changes, - storeFactory = storeFactory, - ) - return state + return StateImpl(name, operatorName, changes, stateStore) } internal class DerivedZipped<W, K, A>( - override val name: String?, - override val operatorName: String, private val upstreamSize: Int, - val upstream: Iterable<Map.Entry<K, StateImpl<A>>>, - changes: EventsImpl<MutableMapK<W, K, A>>, + val upstream: Init<Iterable<Map.Entry<K, StateImpl<A>>>>, private val storeFactory: MutableMapK.Factory<W, K>, -) : StateDerived<MutableMapK<W, K, A>>(changes) { +) : StateDerived<MutableMapK<W, K, A>>() { override fun recalc(evalScope: EvalScope): Pair<MutableMapK<W, K, A>, Long> { - val newEpoch = AtomicLong() + var newEpoch = 0L val store = storeFactory.create<A>(upstreamSize) - for ((key, value) in upstream) { + for ((key, value) in upstream.connect(evalScope)) { val (a, epoch) = value.getCurrentWithEpoch(evalScope) - newEpoch.accumulateAndGet(epoch, ::maxOf) + newEpoch = maxOf(newEpoch, epoch) store[key] = a } - return store to newEpoch.get() + return store to newEpoch } override fun toString(): String = "${this::class.simpleName}@$hashString" @@ -419,10 +457,11 @@ internal class DerivedZipped<W, K, A>( internal inline fun <A> zipStates( name: String?, operatorName: String, - states: List<StateImpl<A>>, + numStates: Int, + states: Init<List<StateImpl<A>>>, ): StateImpl<List<A>> = - if (states.isEmpty()) { + if (numStates <= 0) { constState(name, operatorName, emptyList()) } else { - zipStateList(null, operatorName, states) + zipStateList(null, operatorName, numStates, states) } diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt index b5eec85f1f5f..bd1f94fca22f 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt @@ -21,6 +21,8 @@ import com.android.systemui.kairos.Events import com.android.systemui.kairos.EventsInit import com.android.systemui.kairos.EventsLoop import com.android.systemui.kairos.GroupedEvents +import com.android.systemui.kairos.Incremental +import com.android.systemui.kairos.IncrementalInit import com.android.systemui.kairos.State import com.android.systemui.kairos.StateInit import com.android.systemui.kairos.StateScope @@ -28,7 +30,6 @@ import com.android.systemui.kairos.Stateful import com.android.systemui.kairos.emptyEvents import com.android.systemui.kairos.groupByKey import com.android.systemui.kairos.init -import com.android.systemui.kairos.internal.store.ConcurrentHashMapK import com.android.systemui.kairos.mapCheap import com.android.systemui.kairos.merge import com.android.systemui.kairos.switchEvents @@ -47,69 +48,24 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: val operatorName = "holdStateDeferred" // Ensure state is only collected until the end of this scope return truncateToScope(operatorName) - .toStateInternalDeferred(operatorName, initialValue.unwrapped) + .holdStateInternalDeferred(operatorName, initialValue.unwrapped) } - override fun <K, V> Events<Map<K, Maybe<Events<V>>>>.mergeIncrementally( - name: String?, - initialEvents: DeferredValue<Map<K, Events<V>>>, - ): Events<Map<K, V>> { - val storage: State<Map<K, Events<V>>> = foldStateMapIncrementally(initialEvents) - val patches = - mapImpl({ init.connect(this) }) { patch, _ -> - patch - .mapValues { (_, m) -> m.map { events -> events.init.connect(this) } } - .asIterable() - } - return EventsInit( - constInit( - name, - switchDeferredImpl( - name = name, - getStorage = { - storage.init - .connect(this) - .getCurrentWithEpoch(this) - .first - .mapValues { (_, events) -> events.init.connect(this) } - .asIterable() - }, - getPatches = { patches }, - storeFactory = ConcurrentHashMapK.Factory(), - ) - .awaitValues(), - ) - ) - } - - override fun <K, V> Events<Map<K, Maybe<Events<V>>>>.mergeIncrementallyPromptly( - initialEvents: DeferredValue<Map<K, Events<V>>>, - name: String?, - ): Events<Map<K, V>> { - val storage: State<Map<K, Events<V>>> = foldStateMapIncrementally(initialEvents) - val patches = - mapImpl({ init.connect(this) }) { patch, _ -> - patch - .mapValues { (_, m) -> m.map { events -> events.init.connect(this) } } - .asIterable() - } - return EventsInit( + override fun <K, V> Events<Map<K, Maybe<V>>>.foldStateMapIncrementally( + initialValues: DeferredValue<Map<K, V>> + ): Incremental<K, V> { + val operatorName = "foldStateMapIncrementally" + val name = operatorName + return IncrementalInit( constInit( - name, - switchPromptImpl( - name = name, - getStorage = { - storage.init - .connect(this) - .getCurrentWithEpoch(this) - .first - .mapValues { (_, events) -> events.init.connect(this) } - .asIterable() - }, - getPatches = { patches }, - storeFactory = ConcurrentHashMapK.Factory(), - ) - .awaitValues(), + operatorName, + activatedIncremental( + name, + operatorName, + evalScope, + { init.connect(this) }, + initialValues.unwrapped, + ), ) ) } @@ -171,7 +127,7 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: } else { endSignalOnce .mapCheap { emptyEvents } - .toStateInternal(operatorName, this) + .holdStateInternal(operatorName, this) .switchEvents() } @@ -182,19 +138,19 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal: EventsLoop<A>().apply { loopback = mapCheap { emptyEvents } - .toStateInternal(operatorName, this@nextOnlyInternal) + .holdStateInternal(operatorName, this@nextOnlyInternal) .switchEvents() } } - private fun <A> Events<A>.toStateInternal(operatorName: String, init: A): State<A> = - toStateInternalDeferred(operatorName, CompletableLazy(init)) + private fun <A> Events<A>.holdStateInternal(operatorName: String, init: A): State<A> = + holdStateInternalDeferred(operatorName, CompletableLazy(init)) - private fun <A> Events<A>.toStateInternalDeferred( + private fun <A> Events<A>.holdStateInternalDeferred( operatorName: String, init: Lazy<A>, ): State<A> { - val changes = this@toStateInternalDeferred + val changes = this@holdStateInternalDeferred val name = operatorName val impl = activatedStateSource( diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt index 13f884666182..67862593b80a 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/MapUtils.kt @@ -39,7 +39,7 @@ internal suspend inline fun <K, A, B : Any, M : MutableMap<K, B>> Map<K, A> .mapValuesNotNullTo(it) { (_, deferred) -> deferred.await() } } -internal inline fun <K, A, B : Any, M : MutableMap<K, B>> Map<K, A>.mapValuesNotNullTo( +internal inline fun <K, A, B, M : MutableMap<K, B>> Map<K, A>.mapValuesNotNullTo( destination: M, block: (Map.Entry<K, A>) -> B?, ): M = @@ -49,6 +49,10 @@ internal inline fun <K, A, B : Any, M : MutableMap<K, B>> Map<K, A>.mapValuesNot } } +internal inline fun <K, A, B> Map<K, A>.mapValuesNotNull( + block: (Map.Entry<K, A>) -> B? +): Map<K, B> = mapValuesNotNullTo(mutableMapOf(), block) + internal suspend fun <A, B> Iterable<A>.mapParallel(transform: suspend (A) -> B): List<B> = coroutineScope { map { asyncImmediate { transform(it) } }.awaitAll() diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/MapPatch.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/MapPatch.kt new file mode 100644 index 000000000000..f12b17816b61 --- /dev/null +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/MapPatch.kt @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2024 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.android.systemui.kairos.util + +/** A "patch" that can be used to batch-update a [Map], via [applyPatch]. */ +typealias MapPatch<K, V> = Map<K, Maybe<V>> + +/** + * Returns a new [Map] that has [patch] applied to the original map. + * + * For each entry in [patch]: + * * a [Just] value will be included in the new map, replacing the entry in the original map with + * the same key, if present. + * * a [None] value will be omitted from the new map, excluding the entry in the original map with + * the same key, if present. + */ +fun <K, V> Map<K, V>.applyPatch(patch: MapPatch<K, V>): Map<K, V> { + val (adds: List<Pair<K, V>>, removes: List<K>) = + patch + .asSequence() + .map { (k, v) -> if (v is Just) Left(k to v.value) else Right(k) } + .partitionEithers() + val removed: Map<K, V> = this - removes.toSet() + val updated: Map<K, V> = removed + adds + return updated +} + +/** + * Returns a [MapPatch] that, when applied, includes all of the values from the original [Map]. + * + * Shorthand for: + * ```kotlin + * mapValues { just(it.value) } + * ``` + */ +fun <K, V> Map<K, V>.toMapPatch(): MapPatch<K, V> = mapValues { just(it.value) } + +/** + * Returns a [MapPatch] that, when applied, includes all of the entries from [new] whose keys are + * not present in [old], and excludes all entries with keys present in [old] that are not also + * present in [new]. + * + * Note that, unlike [mapPatchFromFullDiff], only keys are taken into account. If the same key is + * present in both [old] and [new], but the associated values are not equal, then the returned + * [MapPatch] will *not* include any update to that key. + */ +fun <K, V> mapPatchFromKeyDiff(old: Map<K, V>, new: Map<K, V>): MapPatch<K, V> { + val removes = old.keys - new.keys + val adds = new - old.keys + return buildMap { + for (removed in removes) { + put(removed, none) + } + for ((newKey, newValue) in adds) { + put(newKey, just(newValue)) + } + } +} + +/** + * Returns a [MapPatch] that, when applied, includes all of the entries from [new] that are not + * present in [old], and excludes all entries with keys present in [old] that are not also present + * in [new]. + * + * Note that, unlike [mapPatchFromKeyDiff], both keys and values are taken into account. If the same + * key is present in both [old] and [new], but the associated values are not equal, then the + * returned [MapPatch] will include the entry from [new]. + */ +fun <K, V> mapPatchFromFullDiff(old: Map<K, V>, new: Map<K, V>): MapPatch<K, V> { + val removes = old.keys - new.keys + val adds = new.mapMaybeValues { (k, v) -> if (k in old && v == old[k]) none else just(v) } + return hashMapOf<K, Maybe<V>>().apply { + for (removed in removes) { + put(removed, none) + } + for ((newKey, newValue) in adds) { + put(newKey, just(newValue)) + } + } +} diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/These.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/These.kt index aa95e0d2dc1b..408d4d1e7803 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/These.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/These.kt @@ -17,7 +17,7 @@ package com.android.systemui.kairos.util /** Contains at least one of two potential values. */ -sealed class These<A, B> { +sealed class These<out A, out B> { /** Contains a single potential value. */ class This<A, B> internal constructor(val thiz: A) : These<A, B>() @@ -29,10 +29,10 @@ sealed class These<A, B> { companion object { /** Constructs a [These] containing only [thiz]. */ - fun <A, B> thiz(thiz: A): These<A, B> = This(thiz) + fun <A> thiz(thiz: A): These<A, Nothing> = This(thiz) /** Constructs a [These] containing only [that]. */ - fun <A, B> that(that: B): These<A, B> = That(that) + fun <B> that(that: B): These<Nothing, B> = That(that) /** Constructs a [These] containing both [thiz] and [that]. */ fun <A, B> both(thiz: A, that: B): These<A, B> = Both(thiz, that) diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/WithPrev.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/WithPrev.kt index 5cfaa3ea2801..1bb97acc165d 100644 --- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/WithPrev.kt +++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/WithPrev.kt @@ -16,5 +16,5 @@ package com.android.systemui.kairos.util -/** Holds a [newValue] emitted from a `TFlow`, along with the [previousValue] emitted value. */ +/** Holds a [newValue] emitted from a `Events`, along with the [previousValue] emitted value. */ data class WithPrev<out S, out T : S>(val previousValue: S, val newValue: T) |