diff options
| author | 2022-08-22 12:15:52 -0400 | |
|---|---|---|
| committer | 2022-08-23 12:45:19 -0400 | |
| commit | 15dd616209c773bcdd2f821e39a442967873b5f7 (patch) | |
| tree | a6a17519d111900ccdc30f81612f74c6b2bc51a5 | |
| parent | 4cce97fd2e8d132a3c4eaec7ad95e4bd39b1c98e (diff) | |
IpcSerializer utility class
This class provides a convenient replacement for the MessageQueue +
Handler pattern commonly used to serializer incoming binder IPCs
Bug: 241121499
Test: atest IpcSerializerTest
Change-Id: I4441ea058370f282d80f6523f837752095e6a2ea
| -rw-r--r-- | packages/SystemUI/src/com/android/systemui/util/kotlin/IpcSerializer.kt | 98 | ||||
| -rw-r--r-- | packages/SystemUI/tests/src/com/android/systemui/util/kotlin/IpcSerializerTest.kt | 71 |
2 files changed, 169 insertions, 0 deletions
diff --git a/packages/SystemUI/src/com/android/systemui/util/kotlin/IpcSerializer.kt b/packages/SystemUI/src/com/android/systemui/util/kotlin/IpcSerializer.kt new file mode 100644 index 000000000000..c0331e6000bf --- /dev/null +++ b/packages/SystemUI/src/com/android/systemui/util/kotlin/IpcSerializer.kt @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.android.systemui.util.kotlin + +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking + +/** + * A utility for handling incoming IPCs from a Binder interface in the order that they are received. + * + * This class serves as a replacement for the common [android.os.Handler] message-queue pattern, + * where IPCs can arrive on arbitrary threads and are all enqueued onto a queue and processed by the + * Handler in-order. + * + * class MyService : Service() { + * + * private val serializer = IpcSerializer() + * + * // Need to invoke process() in order to actually process IPCs sent over the serializer. + * override fun onStart(...) = lifecycleScope.launch { + * serializer.process() + * } + * + * // In your binder implementation, use runSerializedBlocking to enqueue a function onto + * // the serializer. + * override fun onBind(intent: Intent?) = object : IAidlService.Stub() { + * override fun ipcMethodFoo() = serializer.runSerializedBlocking { + * ... + * } + * + * override fun ipcMethodBar() = serializer.runSerializedBlocking { + * ... + * } + * } + * } + */ +class IpcSerializer { + + private val channel = Channel<Pair<CompletableDeferred<Unit>, Job>>() + + /** + * Runs functions enqueued via usage of [runSerialized] and [runSerializedBlocking] serially. + * This method will never complete normally, so it must be launched in its own coroutine; if + * this is not actively running, no enqueued functions will be evaluated. + */ + suspend fun process(): Nothing { + for ((start, finish) in channel) { + // Signal to the sender that serializer has reached this message + start.complete(Unit) + // Wait to hear from the sender that it has finished running it's work, before handling + // the next message + finish.join() + } + error("Unexpected end of serialization channel") + } + + /** + * Enqueues [block] for evaluation by the serializer, suspending the caller until it has + * completed. It is up to the caller to define what thread this is evaluated in, determined + * by the [kotlin.coroutines.CoroutineContext] used. + */ + suspend fun <R> runSerialized(block: suspend () -> R): R { + val start = CompletableDeferred(Unit) + val finish = CompletableDeferred(Unit) + // Enqueue our message on the channel. + channel.send(start to finish) + // Wait for the serializer to reach our message + start.await() + // Now evaluate the block + val result = block() + // Notify the serializer that we've completed evaluation + finish.complete(Unit) + return result + } + + /** + * Enqueues [block] for evaluation by the serializer, blocking the binder thread until it has + * completed. Evaluation occurs on the binder thread, so methods like + * [android.os.Binder.getCallingUid] that depend on the current thread will work as expected. + */ + fun <R> runSerializedBlocking(block: suspend () -> R): R = runBlocking { runSerialized(block) } +} diff --git a/packages/SystemUI/tests/src/com/android/systemui/util/kotlin/IpcSerializerTest.kt b/packages/SystemUI/tests/src/com/android/systemui/util/kotlin/IpcSerializerTest.kt new file mode 100644 index 000000000000..15ba67205034 --- /dev/null +++ b/packages/SystemUI/tests/src/com/android/systemui/util/kotlin/IpcSerializerTest.kt @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.android.systemui.util.kotlin + +import android.testing.AndroidTestingRunner +import androidx.test.filters.SmallTest +import com.android.systemui.SysuiTestCase +import java.util.concurrent.atomic.AtomicLong +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import org.junit.Assert.assertTrue +import org.junit.Test +import org.junit.runner.RunWith + +@SmallTest +@RunWith(AndroidTestingRunner::class) +class IpcSerializerTest : SysuiTestCase() { + + private val serializer = IpcSerializer() + + @Test + fun serializeManyIncomingIpcs(): Unit = runBlocking(Dispatchers.Main.immediate) { + val processor = launch(start = CoroutineStart.LAZY) { serializer.process() } + withContext(Dispatchers.IO) { + val lastEvaluatedTime = AtomicLong(System.currentTimeMillis()) + // First, launch many serialization requests in parallel + repeat(100_000) { + launch(Dispatchers.Unconfined) { + val enqueuedTime = System.currentTimeMillis() + serializer.runSerialized { + val last = lastEvaluatedTime.getAndSet(enqueuedTime) + assertTrue( + "expected $last less than or equal to $enqueuedTime ", + last <= enqueuedTime, + ) + } + } + } + // Then, process them all in the order they came in. + processor.start() + } + // All done, stop processing + processor.cancel() + } + + @Test(timeout = 5000) + fun serializeOnOneThread_doesNotDeadlock() = runBlocking { + val job = launch { serializer.process() } + repeat(100) { + serializer.runSerializedBlocking { } + } + job.cancel() + } +} |