summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt28
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt4
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt223
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt2
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt17
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt17
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt4
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt30
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt63
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt92
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt4
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt6
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt10
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt13
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt66
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt9
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt19
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt11
-rw-r--r--packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/Maybe.kt22
19 files changed, 305 insertions, 335 deletions
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt
index a175e2e20e46..1d8fe116d57b 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt
@@ -26,11 +26,11 @@ import com.android.systemui.kairos.internal.TFlowImpl
import com.android.systemui.kairos.internal.activated
import com.android.systemui.kairos.internal.cached
import com.android.systemui.kairos.internal.constInit
-import com.android.systemui.kairos.internal.filterNode
+import com.android.systemui.kairos.internal.filterImpl
+import com.android.systemui.kairos.internal.filterJustImpl
import com.android.systemui.kairos.internal.init
import com.android.systemui.kairos.internal.map
import com.android.systemui.kairos.internal.mapImpl
-import com.android.systemui.kairos.internal.mapMaybeNode
import com.android.systemui.kairos.internal.mergeNodes
import com.android.systemui.kairos.internal.mergeNodesLeft
import com.android.systemui.kairos.internal.neverImpl
@@ -121,11 +121,8 @@ val <A> TState<A>.stateChanges: TFlow<A>
* @see mapNotNull
*/
@ExperimentalFrpApi
-fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> {
- val pulse =
- mapMaybeNode({ init.connect(evalScope = this) }) { runInTransactionScope { transform(it) } }
- return TFlowInit(constInit(name = null, pulse))
-}
+fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> =
+ map(transform).filterJust()
/**
* Returns a [TFlow] that contains only the non-null results of applying [transform] to each value
@@ -140,14 +137,17 @@ fun <A, B> TFlow<A>.mapNotNull(transform: suspend FrpTransactionScope.(A) -> B?)
}
/** Returns a [TFlow] containing only values of the original [TFlow] that are not null. */
-@ExperimentalFrpApi fun <A> TFlow<A?>.filterNotNull(): TFlow<A> = mapNotNull { it }
+@ExperimentalFrpApi
+fun <A> TFlow<A?>.filterNotNull(): TFlow<A> = mapCheap { it.toMaybe() }.filterJust()
/** Shorthand for `mapNotNull { it as? A }`. */
@ExperimentalFrpApi
-inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> = mapNotNull { it as? A }
+inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> = mapCheap { it as? A }.filterNotNull()
/** Shorthand for `mapMaybe { it }`. */
-@ExperimentalFrpApi fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> = mapMaybe { it }
+@ExperimentalFrpApi
+fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> =
+ TFlowInit(constInit(name = null, filterJustImpl { init.connect(evalScope = this) }))
/**
* Returns a [TFlow] containing the results of applying [transform] to each value of the original
@@ -203,8 +203,8 @@ fun <A> TFlow<A>.onEach(action: suspend FrpTransactionScope.(A) -> Unit): TFlow<
@ExperimentalFrpApi
fun <A> TFlow<A>.filter(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> {
val pulse =
- filterNode({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } }
- return TFlowInit(constInit(name = null, pulse.cached()))
+ filterImpl({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } }
+ return TFlowInit(constInit(name = null, pulse))
}
/**
@@ -455,7 +455,9 @@ fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> {
mapImpl({ patches }) { newFlow -> mapOf(Unit to just(newFlow.init.connect(this))) }
},
)
- return TFlowInit(constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit) }))
+ return TFlowInit(
+ constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit).getPushEvent(this) })
+ )
}
/**
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt
index 80e74748a375..8ad5f55adca3 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt
@@ -29,7 +29,7 @@ import com.android.systemui.kairos.internal.activated
import com.android.systemui.kairos.internal.cached
import com.android.systemui.kairos.internal.constInit
import com.android.systemui.kairos.internal.constS
-import com.android.systemui.kairos.internal.filterNode
+import com.android.systemui.kairos.internal.filterImpl
import com.android.systemui.kairos.internal.flatMap
import com.android.systemui.kairos.internal.init
import com.android.systemui.kairos.internal.map
@@ -469,7 +469,7 @@ internal constructor(internal val network: Network, initialValue: Deferred<T>) :
val operatorName = "MutableTState"
lateinit var state: TStateSource<T>
val calm: TFlowImpl<T> =
- filterNode({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new ->
+ filterImpl({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new ->
new != state.getCurrentWithEpoch(evalScope = this).first
}
.cached()
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt
index e7b99528fdfc..dd46fe202413 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt
@@ -14,25 +14,17 @@
* limitations under the License.
*/
-@file:Suppress("NOTHING_TO_INLINE")
-
package com.android.systemui.kairos.internal
import com.android.systemui.kairos.internal.util.hashString
-import com.android.systemui.kairos.util.Just
-import com.android.systemui.kairos.util.Maybe
-import com.android.systemui.kairos.util.flatMap
-import com.android.systemui.kairos.util.getMaybe
import java.util.concurrent.ConcurrentHashMap
-import kotlinx.coroutines.CompletableDeferred
-import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
internal class DemuxNode<K, A>(
- private val branchNodeByKey: ConcurrentHashMap<K, DemuxBranchNode<K, A>>,
+ private val branchNodeByKey: ConcurrentHashMap<K, DemuxNode<K, A>.BranchNode>,
val lifecycle: DemuxLifecycle<K, A>,
private val spec: DemuxActivator<K, A>,
) : SchedulableNode {
@@ -44,25 +36,23 @@ internal class DemuxNode<K, A>(
lateinit var upstreamConnection: NodeConnection<Map<K, A>>
- fun getAndMaybeAddDownstream(key: K): DemuxBranchNode<K, A> =
- branchNodeByKey.getOrPut(key) { DemuxBranchNode(key, this) }
+ @Volatile private var epoch: Long = Long.MIN_VALUE
+
+ suspend fun hasCurrentValueLocked(evalScope: EvalScope, key: K): Boolean =
+ evalScope.epoch == epoch && upstreamConnection.getPushEvent(evalScope).contains(key)
+
+ suspend fun hasCurrentValue(evalScope: EvalScope, key: K): Boolean =
+ mutex.withLock { hasCurrentValueLocked(evalScope, key) }
- override suspend fun schedule(evalScope: EvalScope) {
+ fun getAndMaybeAddDownstream(key: K): BranchNode =
+ branchNodeByKey.getOrPut(key) { BranchNode(key) }
+
+ override suspend fun schedule(evalScope: EvalScope) = coroutineScope {
val upstreamResult = upstreamConnection.getPushEvent(evalScope)
- if (upstreamResult is Just) {
- coroutineScope {
- val outerScope = this
- mutex.withLock {
- coroutineScope {
- for ((key, _) in upstreamResult.value) {
- launch {
- branchNodeByKey[key]?.let { branch ->
- outerScope.launch { branch.schedule(evalScope) }
- }
- }
- }
- }
- }
+ mutex.withLock {
+ updateEpoch(evalScope)
+ for ((key, _) in upstreamResult) {
+ branchNodeByKey[key]?.let { branch -> launch { branch.schedule(evalScope) } }
}
}
}
@@ -194,58 +184,63 @@ internal class DemuxNode<K, A>(
upstreamConnection.removeDownstreamAndDeactivateIfNeeded(downstream = schedulable)
}
}
-}
-internal class DemuxBranchNode<K, A>(val key: K, private val demuxNode: DemuxNode<K, A>) :
- PushNode<A> {
+ fun updateEpoch(evalScope: EvalScope) {
+ epoch = evalScope.epoch
+ }
- private val mutex = Mutex()
+ suspend fun getPushEvent(evalScope: EvalScope, key: K): A =
+ upstreamConnection.getPushEvent(evalScope).getValue(key)
- val downstreamSet = DownstreamSet()
+ inner class BranchNode(val key: K) : PushNode<A> {
- override val depthTracker: DepthTracker
- get() = demuxNode.upstreamConnection.depthTracker
+ private val mutex = Mutex()
- override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean =
- demuxNode.upstreamConnection.hasCurrentValue(transactionStore)
+ val downstreamSet = DownstreamSet()
- override suspend fun getPushEvent(evalScope: EvalScope): Maybe<A> =
- demuxNode.upstreamConnection.getPushEvent(evalScope).flatMap { it.getMaybe(key) }
+ override val depthTracker: DepthTracker
+ get() = upstreamConnection.depthTracker
- override suspend fun addDownstream(downstream: Schedulable) {
- mutex.withLock { downstreamSet.add(downstream) }
- }
+ override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean =
+ hasCurrentValue(evalScope, key)
- override suspend fun removeDownstream(downstream: Schedulable) {
- mutex.withLock { downstreamSet.remove(downstream) }
- }
+ override suspend fun getPushEvent(evalScope: EvalScope): A = getPushEvent(evalScope, key)
- override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {
- val canDeactivate =
- mutex.withLock {
- downstreamSet.remove(downstream)
- downstreamSet.isEmpty()
+ override suspend fun addDownstream(downstream: Schedulable) {
+ mutex.withLock { downstreamSet.add(downstream) }
+ }
+
+ override suspend fun removeDownstream(downstream: Schedulable) {
+ mutex.withLock { downstreamSet.remove(downstream) }
+ }
+
+ override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {
+ val canDeactivate =
+ mutex.withLock {
+ downstreamSet.remove(downstream)
+ downstreamSet.isEmpty()
+ }
+ if (canDeactivate) {
+ removeDownstreamAndDeactivateIfNeeded(key)
}
- if (canDeactivate) {
- demuxNode.removeDownstreamAndDeactivateIfNeeded(key)
}
- }
- override suspend fun deactivateIfNeeded() {
- if (mutex.withLock { downstreamSet.isEmpty() }) {
- demuxNode.removeDownstreamAndDeactivateIfNeeded(key)
+ override suspend fun deactivateIfNeeded() {
+ if (mutex.withLock { downstreamSet.isEmpty() }) {
+ removeDownstreamAndDeactivateIfNeeded(key)
+ }
}
- }
- override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {
- if (mutex.withLock { downstreamSet.isEmpty() }) {
- evalScope.scheduleDeactivation(this)
+ override suspend fun scheduleDeactivationIfNeeded(evalScope: EvalScope) {
+ if (mutex.withLock { downstreamSet.isEmpty() }) {
+ evalScope.scheduleDeactivation(this)
+ }
}
- }
- suspend fun schedule(evalScope: EvalScope) {
- if (!coroutineScope { mutex.withLock { scheduleAll(downstreamSet, evalScope) } }) {
- evalScope.scheduleDeactivation(this)
+ suspend fun schedule(evalScope: EvalScope) {
+ if (!coroutineScope { mutex.withLock { scheduleAll(downstreamSet, evalScope) } }) {
+ evalScope.scheduleDeactivation(this)
+ }
}
}
}
@@ -254,30 +249,39 @@ internal fun <K, A> DemuxImpl(
upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>,
numKeys: Int?,
): DemuxImpl<K, A> =
- DemuxImpl(
- DemuxLifecycle(
- object : DemuxActivator<K, A> {
- override suspend fun activate(
- evalScope: EvalScope,
- lifecycle: DemuxLifecycle<K, A>,
- ): Pair<DemuxNode<K, A>, Boolean>? {
- val dmux = DemuxNode(ConcurrentHashMap(numKeys ?: 16), lifecycle, this)
- return upstream
- .invoke(evalScope)
- .activate(evalScope, downstream = dmux.schedulable)
- ?.let { (conn, needsEval) ->
- dmux.apply { upstreamConnection = conn } to needsEval
- }
- }
+ DemuxImpl(DemuxLifecycle(DemuxLifecycleState.Inactive(DemuxActivator(numKeys, upstream))))
+
+internal class DemuxActivator<K, A>(
+ private val numKeys: Int?,
+ private val upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>,
+) {
+ suspend fun activate(
+ evalScope: EvalScope,
+ lifecycle: DemuxLifecycle<K, A>,
+ ): Pair<DemuxNode<K, A>, Set<K>>? {
+ val demux = DemuxNode(ConcurrentHashMap(numKeys ?: 16), lifecycle, this)
+ return upstream
+ .invoke(evalScope)
+ .activate(evalScope, downstream = demux.schedulable)
+ ?.let { (conn, needsEval) ->
+ Pair(
+ demux.apply { upstreamConnection = conn },
+ if (needsEval) {
+ demux.updateEpoch(evalScope)
+ conn.getPushEvent(evalScope).keys
+ } else {
+ emptySet()
+ },
+ )
}
- )
- )
+ }
+}
internal class DemuxImpl<in K, out A>(private val dmux: DemuxLifecycle<K, A>) {
fun eventsForKey(key: K): TFlowImpl<A> = TFlowCheap { downstream ->
dmux.activate(evalScope = this, key)?.let { (branchNode, needsEval) ->
branchNode.addDownstream(downstream)
- val branchNeedsEval = needsEval && branchNode.getPushEvent(evalScope = this) is Just
+ val branchNeedsEval = needsEval && branchNode.hasCurrentValue(evalScope = this)
ActivationResult(
connection = NodeConnection(branchNode, branchNode),
needsEval = branchNeedsEval,
@@ -291,38 +295,29 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle
override fun toString(): String = "TFlowDmuxState[$hashString][$lifecycleState][$mutex]"
- suspend fun activate(evalScope: EvalScope, key: K): Pair<DemuxBranchNode<K, A>, Boolean>? =
- coroutineScope {
- mutex
- .withLock {
- when (val state = lifecycleState) {
- is DemuxLifecycleState.Dead -> null
- is DemuxLifecycleState.Active ->
- state.node.getAndMaybeAddDownstream(key) to
- async {
- state.node.upstreamConnection.hasCurrentValue(
- evalScope.transactionStore
- )
- }
- is DemuxLifecycleState.Inactive -> {
- state.spec
- .activate(evalScope, this@DemuxLifecycle)
- .also { result ->
- lifecycleState =
- if (result == null) {
- DemuxLifecycleState.Dead
- } else {
- DemuxLifecycleState.Active(result.first)
- }
- }
- ?.let { (node, needsEval) ->
- node.getAndMaybeAddDownstream(key) to
- CompletableDeferred(needsEval)
+ suspend fun activate(evalScope: EvalScope, key: K): Pair<DemuxNode<K, A>.BranchNode, Boolean>? =
+ mutex.withLock {
+ when (val state = lifecycleState) {
+ is DemuxLifecycleState.Dead -> null
+ is DemuxLifecycleState.Active ->
+ state.node.getAndMaybeAddDownstream(key) to
+ state.node.hasCurrentValueLocked(evalScope, key)
+ is DemuxLifecycleState.Inactive -> {
+ state.spec
+ .activate(evalScope, this@DemuxLifecycle)
+ .also { result ->
+ lifecycleState =
+ if (result == null) {
+ DemuxLifecycleState.Dead
+ } else {
+ DemuxLifecycleState.Active(result.first)
}
}
- }
+ ?.let { (node, needsEval) ->
+ node.getAndMaybeAddDownstream(key) to (key in needsEval)
+ }
}
- ?.let { (branch, result) -> branch to result.await() }
+ }
}
}
@@ -337,13 +332,3 @@ internal sealed interface DemuxLifecycleState<out K, out A> {
data object Dead : DemuxLifecycleState<Nothing, Nothing>
}
-
-internal interface DemuxActivator<K, A> {
- suspend fun activate(
- evalScope: EvalScope,
- lifecycle: DemuxLifecycle<K, A>,
- ): Pair<DemuxNode<K, A>, Boolean>?
-}
-
-internal inline fun <K, A> DemuxLifecycle(onSubscribe: DemuxActivator<K, A>) =
- DemuxLifecycle(DemuxLifecycleState.Inactive(onSubscribe))
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt
index 815473fe900f..afbd7120653c 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt
@@ -57,7 +57,7 @@ internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope)
TStateInit(
constInit(
"now",
- mkState(
+ activatedTStateSource(
"now",
"now",
this,
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt
index bc06a3679d5c..030119394ac0 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt
@@ -21,14 +21,12 @@ import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.just
import com.android.systemui.kairos.util.none
-internal inline fun <A, B> mapMaybeNode(
- crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>,
- crossinline f: suspend EvalScope.(A) -> Maybe<B>,
-): TFlowImpl<B> {
- return DemuxImpl(
+internal inline fun <A> filterJustImpl(
+ crossinline getPulse: suspend EvalScope.() -> TFlowImpl<Maybe<A>>
+): TFlowImpl<A> =
+ DemuxImpl(
{
- mapImpl(getPulse) {
- val maybeResult = f(it)
+ mapImpl(getPulse) { maybeResult ->
if (maybeResult is Just) {
mapOf(Unit to maybeResult.value)
} else {
@@ -39,9 +37,8 @@ internal inline fun <A, B> mapMaybeNode(
numKeys = 1,
)
.eventsForKey(Unit)
-}
-internal inline fun <A> filterNode(
+internal inline fun <A> filterImpl(
crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>,
crossinline f: suspend EvalScope.(A) -> Boolean,
-): TFlowImpl<A> = mapMaybeNode(getPulse) { if (f(it)) just(it) else none }
+): TFlowImpl<A> = filterJustImpl { mapImpl(getPulse) { if (f(it)) just(it) else none }.cached() }
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt
index 8efaf79b18b2..1edc8c28b2ee 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Inputs.kt
@@ -17,8 +17,6 @@
package com.android.systemui.kairos.internal
import com.android.systemui.kairos.internal.util.Key
-import com.android.systemui.kairos.util.Maybe
-import com.android.systemui.kairos.util.just
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Mutex
@@ -29,16 +27,18 @@ internal class InputNode<A>(
private val deactivate: () -> Unit = {},
) : PushNode<A>, Key<A> {
- internal val downstreamSet = DownstreamSet()
+ private val downstreamSet = DownstreamSet()
private val mutex = Mutex()
private val activated = AtomicBoolean(false)
+ @Volatile private var epoch: Long = Long.MIN_VALUE
+
override val depthTracker: DepthTracker = DepthTracker()
- override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean =
- transactionStore.contains(this)
+ override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = epoch == evalScope.epoch
suspend fun visit(evalScope: EvalScope, value: A) {
+ epoch = evalScope.epoch
evalScope.setResult(this, value)
coroutineScope {
if (!mutex.withLock { scheduleAll(downstreamSet, evalScope) }) {
@@ -90,8 +90,7 @@ internal class InputNode<A>(
}
}
- override suspend fun getPushEvent(evalScope: EvalScope): Maybe<A> =
- evalScope.getCurrentValue(this)
+ override suspend fun getPushEvent(evalScope: EvalScope): A = evalScope.getCurrentValue(this)
}
internal fun <A> InputNode<A>.activated() = TFlowCheap { downstream ->
@@ -104,7 +103,7 @@ internal data object AlwaysNode : PushNode<Unit> {
override val depthTracker = DepthTracker()
- override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean = true
+ override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = true
override suspend fun removeDownstream(downstream: Schedulable) {}
@@ -116,5 +115,5 @@ internal data object AlwaysNode : PushNode<Unit> {
override suspend fun removeDownstreamAndDeactivateIfNeeded(downstream: Schedulable) {}
- override suspend fun getPushEvent(evalScope: EvalScope): Maybe<Unit> = just(Unit)
+ override suspend fun getPushEvent(evalScope: EvalScope) = Unit
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt
index 69994ba6e866..69ecafd26ba2 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/InternalScopes.kt
@@ -22,7 +22,6 @@ import com.android.systemui.kairos.FrpTransactionScope
import com.android.systemui.kairos.TFlow
import com.android.systemui.kairos.internal.util.HeteroMap
import com.android.systemui.kairos.internal.util.Key
-import com.android.systemui.kairos.util.Maybe
internal interface InitScope {
val networkId: Any
@@ -75,6 +74,7 @@ internal fun <A> NetworkScope.setResult(node: Key<A>, result: A) {
transactionStore[node] = result
}
-internal fun <A> NetworkScope.getCurrentValue(key: Key<A>): Maybe<A> = transactionStore[key]
+internal fun <A> NetworkScope.getCurrentValue(key: Key<A>): A =
+ transactionStore.getOrError(key) { "No value for $key in transaction $epoch" }
internal fun NetworkScope.hasCurrentValue(key: Key<*>): Boolean = transactionStore.contains(key)
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt
index af68a1e3d83c..1fc5470ef354 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Mux.kt
@@ -20,13 +20,14 @@ package com.android.systemui.kairos.internal
import com.android.systemui.kairos.internal.util.ConcurrentNullableHashMap
import com.android.systemui.kairos.internal.util.hashString
-import com.android.systemui.kairos.util.Just
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
-/** Base class for muxing nodes, which have a potentially dynamic collection of upstream nodes. */
+internal typealias MuxResult<K, V> = Map<K, PullNode<V>>
+
+/** Base class for muxing nodes, which have a (potentially dynamic) collection of upstream nodes. */
internal sealed class MuxNode<K : Any, V, Output>(val lifecycle: MuxLifecycle<Output>) :
PushNode<Output> {
@@ -34,13 +35,22 @@ internal sealed class MuxNode<K : Any, V, Output>(val lifecycle: MuxLifecycle<Ou
get() = lifecycle.mutex
// TODO: preserve insertion order?
- val upstreamData = ConcurrentNullableHashMap<K, V>()
+ val upstreamData = ConcurrentNullableHashMap<K, PullNode<V>>()
val switchedIn = ConcurrentHashMap<K, MuxBranchNode<K, V>>()
val downstreamSet: DownstreamSet = DownstreamSet()
// TODO: inline DepthTracker? would need to be added to PushNode signature
final override val depthTracker = DepthTracker()
+ @Volatile
+ var epoch: Long = Long.MIN_VALUE
+ protected set
+
+ inline fun hasCurrentValueLocked(evalScope: EvalScope): Boolean = epoch == evalScope.epoch
+
+ override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean =
+ mutex.withLock { hasCurrentValueLocked(evalScope) }
+
final override suspend fun addDownstream(downstream: Schedulable) {
mutex.withLock { addDownstreamLocked(downstream) }
}
@@ -187,8 +197,6 @@ internal sealed class MuxNode<K : Any, V, Output>(val lifecycle: MuxLifecycle<Ou
}
}
- abstract fun hasCurrentValueLocked(transactionStore: TransactionStore): Boolean
-
fun schedule(evalScope: EvalScope) {
// TODO: Potential optimization
// Detect if this node is guaranteed to have a single upstream within this transaction,
@@ -207,11 +215,8 @@ internal class MuxBranchNode<K : Any, V>(private val muxNode: MuxNode<K, V, *>,
@Volatile lateinit var upstream: NodeConnection<V>
override suspend fun schedule(evalScope: EvalScope) {
- val upstreamResult = upstream.getPushEvent(evalScope)
- if (upstreamResult is Just) {
- muxNode.upstreamData[key] = upstreamResult.value
- muxNode.schedule(evalScope)
- }
+ muxNode.upstreamData[key] = upstream.directUpstream
+ muxNode.schedule(evalScope)
}
override suspend fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) {
@@ -288,7 +293,7 @@ internal class MuxLifecycle<A>(@Volatile var lifecycleState: MuxLifecycleState<A
state.node.addDownstreamLocked(downstream)
ActivationResult(
connection = NodeConnection(state.node, state.node),
- needsEval = state.node.hasCurrentValueLocked(evalScope.transactionStore),
+ needsEval = state.node.hasCurrentValueLocked(evalScope),
)
}
is MuxLifecycleState.Inactive -> {
@@ -332,3 +337,6 @@ internal interface MuxActivator<A> {
internal inline fun <A> MuxLifecycle(onSubscribe: MuxActivator<A>): TFlowImpl<A> =
MuxLifecycle(MuxLifecycleState.Inactive(onSubscribe))
+
+internal fun <K, V> TFlowImpl<MuxResult<K, V>>.awaitValues(): TFlowImpl<Map<K, V>> =
+ mapImpl({ this@awaitValues }) { results -> results.mapValues { it.value.getPushEvent(this) } }
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt
index 3b9502a5d812..338ee0145530 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxDeferred.kt
@@ -33,7 +33,7 @@ import com.android.systemui.kairos.util.just
import com.android.systemui.kairos.util.maybeThat
import com.android.systemui.kairos.util.maybeThis
import com.android.systemui.kairos.util.merge
-import com.android.systemui.kairos.util.orElseGet
+import com.android.systemui.kairos.util.orError
import com.android.systemui.kairos.util.partitionEithers
import com.android.systemui.kairos.util.these
import java.util.TreeMap
@@ -42,21 +42,15 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.withLock
internal class MuxDeferredNode<K : Any, V>(
- lifecycle: MuxLifecycle<Map<K, V>>,
- val spec: MuxActivator<Map<K, V>>,
-) : MuxNode<K, V, Map<K, V>>(lifecycle), Key<Map<K, V>> {
+ lifecycle: MuxLifecycle<Map<K, PullNode<V>>>,
+ val spec: MuxActivator<Map<K, PullNode<V>>>,
+) : MuxNode<K, V, Map<K, PullNode<V>>>(lifecycle), Key<Map<K, PullNode<V>>> {
val schedulable = Schedulable.M(this)
@Volatile var patches: NodeConnection<Map<K, Maybe<TFlowImpl<V>>>>? = null
@Volatile var patchData: Map<K, Maybe<TFlowImpl<V>>>? = null
- override fun hasCurrentValueLocked(transactionStore: TransactionStore): Boolean =
- transactionStore.contains(this)
-
- override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean =
- mutex.withLock { hasCurrentValueLocked(transactionStore) }
-
override suspend fun visit(evalScope: EvalScope) {
val result = upstreamData.toMap()
upstreamData.clear()
@@ -74,6 +68,7 @@ internal class MuxDeferredNode<K : Any, V>(
)
}
if (scheduleDownstream) {
+ epoch = evalScope.epoch
evalScope.setResult(this@MuxDeferredNode, result)
if (!scheduleAll(downstreamSet, evalScope)) {
evalScope.scheduleDeactivation(this@MuxDeferredNode)
@@ -84,7 +79,7 @@ internal class MuxDeferredNode<K : Any, V>(
}
}
- override suspend fun getPushEvent(evalScope: EvalScope): Maybe<Map<K, V>> =
+ override suspend fun getPushEvent(evalScope: EvalScope): Map<K, PullNode<V>> =
evalScope.getCurrentValue(key = this)
private suspend fun compactIfNeeded(evalScope: EvalScope) {
@@ -282,7 +277,6 @@ internal class MuxDeferredNode<K : Any, V>(
patchData =
checkNotNull(patches) { "mux mover scheduled with unset patches upstream node" }
.getPushEvent(evalScope)
- .orElseGet { null }
evalScope.scheduleMuxMover(this)
}
@@ -299,20 +293,20 @@ internal inline fun <A> switchDeferredImplSingle(
getPatches = { mapImpl(getPatches) { newFlow -> mapOf(Unit to just(newFlow)) } },
)
}) { map ->
- map.getValue(Unit)
+ map.getValue(Unit).getPushEvent(this)
}
-internal fun <K : Any, A> switchDeferredImpl(
- getStorage: suspend EvalScope.() -> Map<K, TFlowImpl<A>>,
- getPatches: suspend EvalScope.() -> TFlowImpl<Map<K, Maybe<TFlowImpl<A>>>>,
-): TFlowImpl<Map<K, A>> =
+internal fun <K : Any, V> switchDeferredImpl(
+ getStorage: suspend EvalScope.() -> Map<K, TFlowImpl<V>>,
+ getPatches: suspend EvalScope.() -> TFlowImpl<Map<K, Maybe<TFlowImpl<V>>>>,
+): TFlowImpl<Map<K, PullNode<V>>> =
MuxLifecycle(
- object : MuxActivator<Map<K, A>> {
+ object : MuxActivator<Map<K, PullNode<V>>> {
override suspend fun activate(
evalScope: EvalScope,
- lifecycle: MuxLifecycle<Map<K, A>>,
- ): MuxNode<*, *, Map<K, A>>? {
- val storage: Map<K, TFlowImpl<A>> = getStorage(evalScope)
+ lifecycle: MuxLifecycle<Map<K, PullNode<V>>>,
+ ): MuxNode<*, *, Map<K, PullNode<V>>>? {
+ val storage: Map<K, TFlowImpl<V>> = getStorage(evalScope)
// Initialize mux node and switched-in connections.
val muxNode =
MuxDeferredNode(lifecycle, this).apply {
@@ -324,10 +318,7 @@ internal fun <K : Any, A> switchDeferredImpl(
.apply { upstream = conn }
.also {
if (needsEval) {
- val result = conn.getPushEvent(evalScope)
- if (result is Just) {
- upstreamData[key] = result.value
- }
+ upstreamData[key] = conn.directUpstream
}
}
}
@@ -398,11 +389,8 @@ internal fun <K : Any, A> switchDeferredImpl(
// Schedule mover to process patch emission at the end of this transaction, if
// needed.
if (needsEval) {
- val result = patchesConn.getPushEvent(evalScope)
- if (result is Just) {
- muxNode.patchData = result.value
- evalScope.scheduleMuxMover(muxNode)
- }
+ muxNode.patchData = patchesConn.getPushEvent(evalScope)
+ evalScope.scheduleMuxMover(muxNode)
}
}
@@ -440,9 +428,9 @@ internal inline fun <A, B> mergeNodes(
val switchNode = switchDeferredImpl(getStorage = { storage }, getPatches = { neverImpl })
val merged =
mapImpl({ switchNode }) { mergeResults ->
- val first = mergeResults.getMaybe(0).flatMap { it.maybeThis() }
- val second = mergeResults.getMaybe(1).flatMap { it.maybeThat() }
- these(first, second).orElseGet { error("unexpected missing merge result") }
+ val first = mergeResults.getMaybe(0).flatMap { it.getPushEvent(this).maybeThis() }
+ val second = mergeResults.getMaybe(1).flatMap { it.getPushEvent(this).maybeThat() }
+ these(first, second).orError { "unexpected missing merge result" }
}
return merged.cached()
}
@@ -455,7 +443,10 @@ internal inline fun <A> mergeNodes(
getStorage = { getPulses().associateByIndexTo(TreeMap()) },
getPatches = { neverImpl },
)
- val merged = mapImpl({ switchNode }) { mergeResults -> mergeResults.values.toList() }
+ val merged =
+ mapImpl({ switchNode }) { mergeResults ->
+ mergeResults.values.map { it.getPushEvent(this) }
+ }
return merged.cached()
}
@@ -468,6 +459,8 @@ internal inline fun <A> mergeNodesLeft(
getPatches = { neverImpl },
)
val merged =
- mapImpl({ switchNode }) { mergeResults: Map<Int, A> -> mergeResults.values.first() }
+ mapImpl({ switchNode }) { mergeResults: Map<Int, PullNode<A>> ->
+ mergeResults.values.first().getPushEvent(this)
+ }
return merged.cached()
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt
index b291c879b449..dd0357b0413d 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/MuxPrompt.kt
@@ -25,36 +25,26 @@ import com.android.systemui.kairos.util.Left
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.None
import com.android.systemui.kairos.util.Right
-import com.android.systemui.kairos.util.filterJust
-import com.android.systemui.kairos.util.map
import com.android.systemui.kairos.util.partitionEithers
import kotlinx.coroutines.CoroutineScope
-import kotlinx.coroutines.async
-import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.withLock
+private typealias MuxPromptMovingResult<K, V> = Pair<MuxResult<K, V>, MuxResult<K, V>?>
+
internal class MuxPromptMovingNode<K : Any, V>(
- lifecycle: MuxLifecycle<Pair<Map<K, V>, Map<K, PullNode<V>>?>>,
- private val spec: MuxActivator<Pair<Map<K, V>, Map<K, PullNode<V>>?>>,
-) :
- MuxNode<K, V, Pair<Map<K, V>, Map<K, PullNode<V>>?>>(lifecycle),
- Key<Pair<Map<K, V>, Map<K, PullNode<V>>?>> {
+ lifecycle: MuxLifecycle<MuxPromptMovingResult<K, V>>,
+ private val spec: MuxActivator<MuxPromptMovingResult<K, V>>,
+) : MuxNode<K, V, MuxPromptMovingResult<K, V>>(lifecycle), Key<MuxPromptMovingResult<K, V>> {
@Volatile var patchData: Map<K, Maybe<TFlowImpl<V>>>? = null
@Volatile var patches: MuxPromptPatchNode<K, V>? = null
- @Volatile private var reEval: Pair<Map<K, V>, Map<K, PullNode<V>>?>? = null
-
- override fun hasCurrentValueLocked(transactionStore: TransactionStore): Boolean =
- transactionStore.contains(this)
-
- override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean =
- mutex.withLock { hasCurrentValueLocked(transactionStore) }
+ @Volatile private var reEval: MuxPromptMovingResult<K, V>? = null
override suspend fun visit(evalScope: EvalScope) {
- val preSwitchResults: Map<K, V> = upstreamData.toMap()
+ val preSwitchResults: MuxResult<K, V> = upstreamData.toMap()
upstreamData.clear()
val patch: Map<K, Maybe<TFlowImpl<V>>>? = patchData
@@ -85,6 +75,7 @@ internal class MuxPromptMovingNode<K : Any, V>(
adjustDownstreamDepths(evalScope, coroutineScope = this)
}
if (evalResult != null) {
+ epoch = evalScope.epoch
evalScope.setResult(this@MuxPromptMovingNode, evalResult)
if (!scheduleAll(downstreamSet, evalScope)) {
evalScope.scheduleDeactivation(this@MuxPromptMovingNode)
@@ -97,11 +88,11 @@ internal class MuxPromptMovingNode<K : Any, V>(
}
private suspend fun doEval(
- preSwitchResults: Map<K, V>,
+ preSwitchResults: MuxResult<K, V>,
patch: Map<K, Maybe<TFlowImpl<V>>>?,
evalScope: EvalScope,
- ): Pair<Boolean, Pair<Map<K, V>, Map<K, PullNode<V>>?>?> {
- val newlySwitchedIn: Map<K, PullNode<V>>? =
+ ): Pair<Boolean, MuxPromptMovingResult<K, V>?> {
+ val newlySwitchedIn: MuxResult<K, V>? =
patch?.let {
// We have a patch, process additions/updates and removals
val (adds, removes) =
@@ -194,10 +185,7 @@ internal class MuxPromptMovingNode<K : Any, V>(
}
}
- private suspend fun adjustDownstreamDepths(
- evalScope: EvalScope,
- coroutineScope: CoroutineScope,
- ) {
+ private fun adjustDownstreamDepths(evalScope: EvalScope, coroutineScope: CoroutineScope) {
if (depthTracker.dirty_depthIncreased()) {
// schedule downstream nodes on the compaction scheduler; this scheduler is drained at
// the end of this eval depth, so that all depth increases are applied before we advance
@@ -215,9 +203,8 @@ internal class MuxPromptMovingNode<K : Any, V>(
}
}
- override suspend fun getPushEvent(
- evalScope: EvalScope
- ): Maybe<Pair<Map<K, V>, Map<K, PullNode<V>>?>> = evalScope.getCurrentValue(key = this)
+ override suspend fun getPushEvent(evalScope: EvalScope): MuxPromptMovingResult<K, V> =
+ evalScope.getCurrentValue(key = this)
override suspend fun doDeactivate() {
// Update lifecycle
@@ -264,18 +251,11 @@ internal class MuxPromptMovingNode<K : Any, V>(
}
internal class MuxPromptEvalNode<K, V>(
- private val movingNode: PullNode<Pair<Map<K, V>, Map<K, PullNode<V>>?>>
-) : PullNode<Map<K, V>> {
- override suspend fun getPushEvent(evalScope: EvalScope): Maybe<Map<K, V>> =
- movingNode.getPushEvent(evalScope).map { (preSwitchResults, newlySwitchedIn) ->
- coroutineScope {
- newlySwitchedIn
- ?.map { (k, v) -> async { v.getPushEvent(evalScope).map { k to it } } }
- ?.awaitAll()
- ?.asSequence()
- ?.filterJust()
- ?.toMap(preSwitchResults.toMutableMap()) ?: preSwitchResults
- }
+ private val movingNode: PullNode<MuxPromptMovingResult<K, V>>
+) : PullNode<MuxResult<K, V>> {
+ override suspend fun getPushEvent(evalScope: EvalScope): MuxResult<K, V> =
+ movingNode.getPushEvent(evalScope).let { (preSwitchResults, newlySwitchedIn) ->
+ newlySwitchedIn?.toMap(preSwitchResults.toMutableMap()) ?: preSwitchResults
}
}
@@ -288,11 +268,8 @@ internal class MuxPromptPatchNode<K : Any, V>(private val muxNode: MuxPromptMovi
lateinit var upstream: NodeConnection<Map<K, Maybe<TFlowImpl<V>>>>
override suspend fun schedule(evalScope: EvalScope) {
- val upstreamResult = upstream.getPushEvent(evalScope)
- if (upstreamResult is Just) {
- muxNode.patchData = upstreamResult.value
- muxNode.schedule(evalScope)
- }
+ muxNode.patchData = upstream.getPushEvent(evalScope)
+ muxNode.schedule(evalScope)
}
override suspend fun adjustDirectUpstream(scheduler: Scheduler, oldDepth: Int, newDepth: Int) {
@@ -350,18 +327,18 @@ internal class MuxPromptPatchNode<K : Any, V>(private val muxNode: MuxPromptMovi
}
}
-internal fun <K : Any, A> switchPromptImpl(
- getStorage: suspend EvalScope.() -> Map<K, TFlowImpl<A>>,
- getPatches: suspend EvalScope.() -> TFlowImpl<Map<K, Maybe<TFlowImpl<A>>>>,
-): TFlowImpl<Map<K, A>> {
+internal fun <K : Any, V> switchPromptImpl(
+ getStorage: suspend EvalScope.() -> Map<K, TFlowImpl<V>>,
+ getPatches: suspend EvalScope.() -> TFlowImpl<Map<K, Maybe<TFlowImpl<V>>>>,
+): TFlowImpl<MuxResult<K, V>> {
val moving =
MuxLifecycle(
- object : MuxActivator<Pair<Map<K, A>, Map<K, PullNode<A>>?>> {
+ object : MuxActivator<MuxPromptMovingResult<K, V>> {
override suspend fun activate(
evalScope: EvalScope,
- lifecycle: MuxLifecycle<Pair<Map<K, A>, Map<K, PullNode<A>>?>>,
- ): MuxNode<*, *, Pair<Map<K, A>, Map<K, PullNode<A>>?>>? {
- val storage: Map<K, TFlowImpl<A>> = getStorage(evalScope)
+ lifecycle: MuxLifecycle<MuxPromptMovingResult<K, V>>,
+ ): MuxNode<*, *, MuxPromptMovingResult<K, V>>? {
+ val storage: Map<K, TFlowImpl<V>> = getStorage(evalScope)
// Initialize mux node and switched-in connections.
val movingNode =
MuxPromptMovingNode(lifecycle, this).apply {
@@ -379,11 +356,7 @@ internal fun <K : Any, A> switchPromptImpl(
.apply { upstream = conn }
.also {
if (needsEval) {
- val result =
- conn.getPushEvent(evalScope)
- if (result is Just) {
- upstreamData[key] = result.value
- }
+ upstreamData[key] = conn.directUpstream
}
}
}
@@ -401,10 +374,7 @@ internal fun <K : Any, A> switchPromptImpl(
patches = patchNode
if (needsEval) {
- val result = conn.getPushEvent(evalScope)
- if (result is Just) {
- patchData = result.value
- }
+ patchData = conn.getPushEvent(evalScope)
}
}
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt
index 599b18695034..cc36fda0c3ae 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt
@@ -141,7 +141,10 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope {
} while (evalScope { evalOutputs(this) })
// Update states
evalScope { evalStateWriters(this) }
+ // Invalidate caches
+ // Note: this needs to occur before deferred switches
transactionStore.clear()
+ epoch++
// Perform deferred switches
evalScope { evalMuxMovers(this) }
// Compact depths
@@ -149,7 +152,6 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope {
compactor.drainCompact()
// Deactivate nodes with no downstream
evalDeactivations()
- epoch++
}
/** Invokes all [Output]s that have received data within this transaction. */
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt
index 000240796a82..b9bef059d4b0 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/NodeTypes.kt
@@ -16,8 +16,6 @@
package com.android.systemui.kairos.internal
-import com.android.systemui.kairos.util.Maybe
-
/*
Dmux
Muxes + Branch
@@ -68,7 +66,7 @@ internal sealed interface PullNode<out A> {
* will read from the cache, otherwise it will perform a full evaluation, even if invoked
* multiple times within a transaction.
*/
- suspend fun getPushEvent(evalScope: EvalScope): Maybe<A>
+ suspend fun getPushEvent(evalScope: EvalScope): A
}
/*
@@ -76,7 +74,7 @@ Muxes + DmuxBranch
*/
internal sealed interface PushNode<A> : PullNode<A> {
- suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean
+ suspend fun hasCurrentValue(evalScope: EvalScope): Boolean
val depthTracker: DepthTracker
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt
index a3af2d304f7f..3373de05249c 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Output.kt
@@ -16,7 +16,6 @@
package com.android.systemui.kairos.internal
-import com.android.systemui.kairos.util.Just
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@@ -37,7 +36,7 @@ internal class Output<A>(
suspend fun visit(evalScope: EvalScope) {
val upstreamResult = result
check(upstreamResult !== NoResult) { "output visited with null upstream result" }
- result = null
+ result = NoResult
@Suppress("UNCHECKED_CAST") evalScope.onEmit(upstreamResult as A)
}
@@ -46,12 +45,9 @@ internal class Output<A>(
}
suspend fun schedule(evalScope: EvalScope) {
- val upstreamResult =
+ result =
checkNotNull(upstream) { "output scheduled with null upstream" }.getPushEvent(evalScope)
- if (upstreamResult is Just) {
- result = upstreamResult.value
- evalScope.scheduleOutput(this)
- }
+ evalScope.scheduleOutput(this)
}
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt
index dac98e0e807c..43b621fadc67 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/PullNodes.kt
@@ -17,8 +17,6 @@
package com.android.systemui.kairos.internal
import com.android.systemui.kairos.internal.util.Key
-import com.android.systemui.kairos.util.Maybe
-import com.android.systemui.kairos.util.map
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Deferred
@@ -26,8 +24,8 @@ internal val neverImpl: TFlowImpl<Nothing> = TFlowCheap { null }
internal class MapNode<A, B>(val upstream: PullNode<A>, val transform: suspend EvalScope.(A) -> B) :
PullNode<B> {
- override suspend fun getPushEvent(evalScope: EvalScope): Maybe<B> =
- upstream.getPushEvent(evalScope).map { evalScope.transform(it) }
+ override suspend fun getPushEvent(evalScope: EvalScope): B =
+ evalScope.transform(upstream.getPushEvent(evalScope))
}
internal inline fun <A, B> mapImpl(
@@ -46,9 +44,8 @@ internal inline fun <A, B> mapImpl(
}
}
-internal class CachedNode<A>(val key: Key<Deferred<Maybe<A>>>, val upstream: PullNode<A>) :
- PullNode<A> {
- override suspend fun getPushEvent(evalScope: EvalScope): Maybe<A> {
+internal class CachedNode<A>(val key: Key<Deferred<A>>, val upstream: PullNode<A>) : PullNode<A> {
+ override suspend fun getPushEvent(evalScope: EvalScope): A {
val deferred =
evalScope.transactionStore.getOrPut(key) {
evalScope.deferAsync(CoroutineStart.LAZY) { upstream.getPushEvent(evalScope) }
@@ -58,7 +55,7 @@ internal class CachedNode<A>(val key: Key<Deferred<Maybe<A>>>, val upstream: Pul
}
internal fun <A> TFlowImpl<A>.cached(): TFlowImpl<A> {
- val key = object : Key<Deferred<Maybe<A>>> {}
+ val key = object : Key<Deferred<A>> {}
return TFlowCheap {
activate(this, it)?.let { (connection, needsEval) ->
ActivationResult(
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt
index baf4101d52ef..06b5b1690391 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/StateScopeImpl.kt
@@ -78,7 +78,13 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
val changes = this@toTStateInternalDeferred
val name = operatorName
val impl =
- mkState(name, operatorName, evalScope, { changes.init.connect(evalScope = this) }, init)
+ activatedTStateSource(
+ name,
+ operatorName,
+ evalScope,
+ { changes.init.connect(evalScope = this) },
+ init,
+ )
return TStateInit(constInit(name, impl))
}
@@ -102,21 +108,22 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
constInit(
name,
switchDeferredImpl(
- getStorage = {
- storage.init
- .connect(this)
- .getCurrentWithEpoch(this)
- .first
- .mapValuesParallel { (_, flow) -> flow.init.connect(this) }
- },
- getPatches = {
- mapImpl({ init.connect(this) }) { patch ->
- patch.mapValuesParallel { (_, m) ->
- m.map { flow -> flow.init.connect(this) }
+ getStorage = {
+ storage.init
+ .connect(this)
+ .getCurrentWithEpoch(this)
+ .first
+ .mapValuesParallel { (_, flow) -> flow.init.connect(this) }
+ },
+ getPatches = {
+ mapImpl({ init.connect(this) }) { patch ->
+ patch.mapValuesParallel { (_, m) ->
+ m.map { flow -> flow.init.connect(this) }
+ }
}
- }
- },
- ),
+ },
+ )
+ .awaitValues(),
)
)
}
@@ -129,21 +136,22 @@ internal class StateScopeImpl(val evalScope: EvalScope, override val endSignal:
constInit(
name,
switchPromptImpl(
- getStorage = {
- storage.init
- .connect(this)
- .getCurrentWithEpoch(this)
- .first
- .mapValuesParallel { (_, flow) -> flow.init.connect(this) }
- },
- getPatches = {
- mapImpl({ init.connect(this) }) { patch ->
- patch.mapValuesParallel { (_, m) ->
- m.map { flow -> flow.init.connect(this) }
+ getStorage = {
+ storage.init
+ .connect(this)
+ .getCurrentWithEpoch(this)
+ .first
+ .mapValuesParallel { (_, flow) -> flow.init.connect(this) }
+ },
+ getPatches = {
+ mapImpl({ init.connect(this) }) { patch ->
+ patch.mapValuesParallel { (_, m) ->
+ m.map { flow -> flow.init.connect(this) }
+ }
}
- }
- },
- ),
+ },
+ )
+ .awaitValues(),
)
)
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt
index b904b48f7f9c..784a2afe0992 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TFlowImpl.kt
@@ -16,8 +16,6 @@
package com.android.systemui.kairos.internal
-import com.android.systemui.kairos.util.Maybe
-
/* Initialized TFlow */
internal fun interface TFlowImpl<out A> {
suspend fun activate(evalScope: EvalScope, downstream: Schedulable): ActivationResult<A>?
@@ -41,9 +39,8 @@ internal data class NodeConnection<out A>(
val schedulerUpstream: PushNode<*>,
)
-internal suspend fun <A> NodeConnection<A>.hasCurrentValue(
- transactionStore: TransactionStore
-): Boolean = schedulerUpstream.hasCurrentValue(transactionStore)
+internal suspend fun <A> NodeConnection<A>.hasCurrentValue(evalScope: EvalScope): Boolean =
+ schedulerUpstream.hasCurrentValue(evalScope)
internal suspend fun <A> NodeConnection<A>.removeDownstreamAndDeactivateIfNeeded(
downstream: Schedulable
@@ -55,7 +52,7 @@ internal suspend fun <A> NodeConnection<A>.scheduleDeactivationIfNeeded(evalScop
internal suspend fun <A> NodeConnection<A>.removeDownstream(downstream: Schedulable) =
schedulerUpstream.removeDownstream(downstream)
-internal suspend fun <A> NodeConnection<A>.getPushEvent(evalScope: EvalScope): Maybe<A> =
+internal suspend fun <A> NodeConnection<A>.getPushEvent(evalScope: EvalScope): A =
directUpstream.getPushEvent(evalScope)
internal val <A> NodeConnection<A>.depthTracker: DepthTracker
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt
index c68b4c366776..c4a26a33e24d 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/TStateImpl.kt
@@ -20,7 +20,6 @@ import com.android.systemui.kairos.internal.util.Key
import com.android.systemui.kairos.internal.util.associateByIndex
import com.android.systemui.kairos.internal.util.hashString
import com.android.systemui.kairos.internal.util.mapValuesParallel
-import com.android.systemui.kairos.util.Just
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.just
import com.android.systemui.kairos.util.none
@@ -106,11 +105,8 @@ internal class TStateSource<A>(
/** called by network after eval phase has completed */
suspend fun updateState(evalScope: EvalScope) {
// write the latch
- val eventResult = upstreamConnection.getPushEvent(evalScope)
- if (eventResult is Just) {
- _current = CompletableDeferred(eventResult.value)
- writeEpoch = evalScope.epoch
- }
+ _current = CompletableDeferred(upstreamConnection.getPushEvent(evalScope))
+ writeEpoch = evalScope.epoch
}
override fun toString(): String = "TStateImpl(changes=$changes, current=$_current)"
@@ -123,7 +119,7 @@ internal class TStateSource<A>(
internal fun <A> constS(name: String?, operatorName: String, init: A): TStateImpl<A> =
TStateSource(name, operatorName, init, neverImpl)
-internal inline fun <A> mkState(
+internal inline fun <A> activatedTStateSource(
name: String?,
operatorName: String,
evalScope: EvalScope,
@@ -132,8 +128,7 @@ internal inline fun <A> mkState(
): TStateImpl<A> {
lateinit var state: TStateSource<A>
val calm: TFlowImpl<A> =
- filterNode(getChanges) { new -> new != state.getCurrentWithEpoch(evalScope = this).first }
- .cached()
+ filterImpl(getChanges) { new -> new != state.getCurrentWithEpoch(evalScope = this).first }
return TStateSource(name, operatorName, init, calm).also {
state = it
evalScope.scheduleOutput(
@@ -153,7 +148,7 @@ internal inline fun <A> mkState(
private inline fun <A> TFlowImpl<A>.calm(
crossinline getState: () -> TStateDerived<A>
): TFlowImpl<A> =
- filterNode({ this@calm }) { new ->
+ filterImpl({ this@calm }) { new ->
val state = getState()
val (current, _) = state.getCurrentWithEpoch(evalScope = this)
if (new != current) {
@@ -237,7 +232,7 @@ internal fun <A> TStateImpl<TStateImpl<A>>.flatten(name: String?, operator: Stri
getPatches = { mapImpl({ innerChanges }) { new -> mapOf(Unit to just(new)) } },
)
}) { map ->
- map.getValue(Unit)
+ map.getValue(Unit).getPushEvent(this)
}
lateinit var state: DerivedFlatten<A>
state = DerivedFlatten(name, operator, this, switchedChanges.calm { state })
@@ -352,7 +347,7 @@ internal fun <K : Any, A> zipStates(
states
.mapValues { (k, v) ->
if (k in patch) {
- patch.getValue(k)
+ patch.getValue(k).getPushEvent(this)
} else {
v.getCurrentWithEpoch(evalScope = this).first
}
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt
index 5cee2dd5880a..33709a97da8f 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/util/HeteroMap.kt
@@ -37,6 +37,17 @@ internal class HeteroMap {
store[key] = value ?: NULL
}
+ @Suppress("UNCHECKED_CAST")
+ fun <A : Any> getOrNull(key: Key<A>): A? =
+ store[key]?.let { (if (it === NULL) null else it) as A }
+
+ @Suppress("UNCHECKED_CAST")
+ fun <A> getOrError(key: Key<A>, block: () -> String): A {
+ store[key]?.let {
+ return (if (it === NULL) null else it) as A
+ } ?: error(block())
+ }
+
operator fun contains(key: Key<*>): Boolean = store.containsKey(key)
fun clear() {
diff --git a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/Maybe.kt b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/Maybe.kt
index c3cae3885bd3..eef6cbff545b 100644
--- a/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/Maybe.kt
+++ b/packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/util/Maybe.kt
@@ -230,8 +230,14 @@ fun <A> Maybe<A>.mergeWith(other: Maybe<A>, transform: (A, A) -> A): Maybe<A> =
* Returns a list containing only the present results of applying [transform] to each element in the
* original iterable.
*/
-fun <A, B> Iterable<A>.mapMaybe(transform: (A) -> Maybe<B>): List<B> =
- asSequence().mapMaybe(transform).toList()
+inline fun <A, B> Iterable<A>.mapMaybe(transform: (A) -> Maybe<B>): List<B> = buildList {
+ for (a in this@mapMaybe) {
+ val result = transform(a)
+ if (result is Just) {
+ add(result.value)
+ }
+ }
+}
/**
* Returns a sequence containing only the present results of applying [transform] to each element in
@@ -244,9 +250,15 @@ fun <A, B> Sequence<A>.mapMaybe(transform: (A) -> Maybe<B>): Sequence<B> =
* Returns a map with values of only the present results of applying [transform] to each entry in
* the original map.
*/
-inline fun <K, A, B> Map<K, A>.mapMaybeValues(
- crossinline p: (Map.Entry<K, A>) -> Maybe<B>
-): Map<K, B> = asSequence().mapMaybe { entry -> p(entry).map { entry.key to it } }.toMap()
+inline fun <K, A, B> Map<K, A>.mapMaybeValues(transform: (Map.Entry<K, A>) -> Maybe<B>): Map<K, B> =
+ buildMap {
+ for (entry in this@mapMaybeValues) {
+ val result = transform(entry)
+ if (result is Just) {
+ put(entry.key, result.value)
+ }
+ }
+ }
/** Returns a map with all non-present values filtered out. */
fun <K, A> Map<K, Maybe<A>>.filterJustValues(): Map<K, A> =