diff options
3 files changed, 394 insertions, 21 deletions
diff --git a/services/core/java/com/android/server/pm/PackageInstallerService.java b/services/core/java/com/android/server/pm/PackageInstallerService.java index 656f3474e797..b6f5e999be27 100644 --- a/services/core/java/com/android/server/pm/PackageInstallerService.java +++ b/services/core/java/com/android/server/pm/PackageInstallerService.java @@ -20,6 +20,7 @@ import static org.xmlpull.v1.XmlPullParser.END_DOCUMENT; import static org.xmlpull.v1.XmlPullParser.START_TAG; import android.Manifest; +import android.annotation.NonNull; import android.app.ActivityManager; import android.app.AppGlobals; import android.app.AppOpsManager; @@ -88,6 +89,7 @@ import com.android.server.SystemConfig; import com.android.server.SystemService; import com.android.server.SystemServiceManager; import com.android.server.pm.parsing.PackageParser2; +import com.android.server.pm.utils.RequestThrottle; import libcore.io.IoUtils; @@ -220,6 +222,14 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements } } + @NonNull + private final RequestThrottle mSettingsWriteRequest = new RequestThrottle(IoThread.getHandler(), + () -> { + synchronized (mSessions) { + return writeSessionsLocked(); + } + }); + public PackageInstallerService(Context context, PackageManagerService pm, Supplier<PackageParser2> apexParserSupplier) { mContext = context; @@ -275,7 +285,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements // Invalid sessions might have been marked while parsing. Re-write the database with // the updated information. - writeSessionsLocked(); + mSettingsWriteRequest.runNow(); } } @@ -464,7 +474,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements } @GuardedBy("mSessions") - private void writeSessionsLocked() { + private boolean writeSessionsLocked() { if (LOGD) Slog.v(TAG, "writeSessionsLocked()"); FileOutputStream fos = null; @@ -483,28 +493,20 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements out.endDocument(); mSessionsFile.finishWrite(fos); + return true; } catch (IOException e) { if (fos != null) { mSessionsFile.failWrite(fos); } } + + return false; } private File buildAppIconFile(int sessionId) { return new File(mSessionsDir, "app_icon." + sessionId + ".png"); } - private void writeSessionsAsync() { - IoThread.getHandler().post(new Runnable() { - @Override - public void run() { - synchronized (mSessions) { - writeSessionsLocked(); - } - } - }); - } - @Override public int createSession(SessionParams params, String installerPackageName, String callingAttributionTag, int userId) { @@ -764,7 +766,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements mCallbacks.notifySessionCreated(session.sessionId, session.userId); - writeSessionsAsync(); + mSettingsWriteRequest.schedule(); return sessionId; } @@ -1374,7 +1376,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements class InternalCallback { public void onSessionBadgingChanged(PackageInstallerSession session) { mCallbacks.notifySessionBadgingChanged(session.sessionId, session.userId); - writeSessionsAsync(); + mSettingsWriteRequest.schedule(); } public void onSessionActiveChanged(PackageInstallerSession session, boolean active) { @@ -1389,7 +1391,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements public void onStagedSessionChanged(PackageInstallerSession session) { session.markUpdated(); - writeSessionsAsync(); + mSettingsWriteRequest.schedule(); if (mOkToSendBroadcasts && !session.isDestroyed()) { // we don't scrub the data here as this is sent only to the installer several // privileged system packages @@ -1419,7 +1421,7 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements appIconFile.delete(); } - writeSessionsLocked(); + mSettingsWriteRequest.runNow(); } } }); @@ -1428,16 +1430,14 @@ public class PackageInstallerService extends IPackageInstaller.Stub implements public void onSessionPrepared(PackageInstallerSession session) { // We prepared the destination to write into; we want to persist // this, but it's not critical enough to block for. - writeSessionsAsync(); + mSettingsWriteRequest.schedule(); } public void onSessionSealedBlocking(PackageInstallerSession session) { // It's very important that we block until we've recorded the // session as being sealed, since we never want to allow mutation // after sealing. - synchronized (mSessions) { - writeSessionsLocked(); - } + mSettingsWriteRequest.runNow(); } } } diff --git a/services/core/java/com/android/server/pm/utils/RequestThrottle.java b/services/core/java/com/android/server/pm/utils/RequestThrottle.java new file mode 100644 index 000000000000..f1dd402d48f5 --- /dev/null +++ b/services/core/java/com/android/server/pm/utils/RequestThrottle.java @@ -0,0 +1,154 @@ +/* + * Copyright (C) 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.android.server.pm.utils; + +import android.annotation.NonNull; +import android.os.Handler; + +import com.android.server.IoThread; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +/** + * Loose throttle latest behavior for success/fail requests, with options to schedule or force a + * request through. Throttling is implicit and not configurable. This means requests are dispatched + * to the {@link Handler} immediately when received, and only batched while waiting on the next + * message execution or running request. + * + * This also means there is no explicit debouncing. Implicit debouncing is available through the + * same runtime delays in the {@link Handler} instance and the request execution, where multiple + * requests prior to the execution point are collapsed. + * + * Callers provide a {@link Handler} with which to schedule tasks on. This may be a highly + * contentious thread like {@link IoThread#getHandler()}, but note that there are no guarantees + * that the request will be handled before the system server dies. Ideally callers should handle + * re-initialization from stale state with no consequences to the user. + * + * This class will retry requests if they don't succeed, as provided by a true/false response from + * the block provided to run the request. This uses an exponential backoff mechanism, assuming that + * state write should be attempted immediately, but not retried so heavily as to potentially block + * other system server callers. Exceptions are not considered and will not result in a retry if + * thrown from inside the block. Caller should wrap with try-catch and rollback and transaction + * state before returning false to signal a retry. + * + * The caller is strictly responsible for data synchronization, as this class will not synchronize + * the request block, potentially running it multiple times or on multiple threads simultaneously + * if requests come in asynchronously. + */ +public class RequestThrottle { + + private static final int DEFAULT_RETRY_MAX_ATTEMPTS = 5; + private static final int DEFAULT_DELAY_MS = 1000; + private static final int DEFAULT_BACKOFF_BASE = 2; + + private final AtomicInteger mLastRequest = new AtomicInteger(0); + private final AtomicInteger mLastCommitted = new AtomicInteger(-1); + + private final int mMaxAttempts; + private final int mFirstDelay; + private final int mBackoffBase; + + private final AtomicInteger mCurrentRetry = new AtomicInteger(0); + + @NonNull + private final Handler mHandler; + + @NonNull + private final Supplier<Boolean> mBlock; + + @NonNull + private final Runnable mRunnable; + + /** + * @see #RequestThrottle(Handler, int, int, int, Supplier) + */ + public RequestThrottle(@NonNull Handler handler, @NonNull Supplier<Boolean> block) { + this(handler, DEFAULT_RETRY_MAX_ATTEMPTS, DEFAULT_DELAY_MS, DEFAULT_BACKOFF_BASE, + block); + } + + /** + * Backoff timing is calculated as firstDelay * (backoffBase ^ retryAttempt). + * + * @param handler Representing the thread to run the provided block. + * @param block The action to run when scheduled, returning whether or not the request was + * successful. Note that any thrown exceptions will be ignored and not + * retried, since it's not easy to tell how destructive or retry-able an + * exception is. + * @param maxAttempts Number of times to re-attempt any single request. + * @param firstDelay The first delay used after the initial attempt. + * @param backoffBase The base of the backoff calculation, where retry attempt count is the + * exponent. + */ + public RequestThrottle(@NonNull Handler handler, int maxAttempts, int firstDelay, + int backoffBase, @NonNull Supplier<Boolean> block) { + mHandler = handler; + mBlock = block; + mMaxAttempts = maxAttempts; + mFirstDelay = firstDelay; + mBackoffBase = backoffBase; + mRunnable = this::runInternal; + } + + /** + * Schedule the intended action on the provided {@link Handler}. + */ + public void schedule() { + // To avoid locking the Handler twice by pre-checking hasCallbacks, instead just queue + // the Runnable again. It will no-op if the request has already been written to disk. + mLastRequest.incrementAndGet(); + mHandler.post(mRunnable); + } + + /** + * Run the intended action immediately on the calling thread. Note that synchronization and + * deadlock between threads is not handled. This will immediately call the request block, and + * also potentially schedule a retry. The caller must not block itself. + * + * @return true if the write succeeded or the last request was already written + */ + public boolean runNow() { + mLastRequest.incrementAndGet(); + return runInternal(); + } + + private boolean runInternal() { + int lastRequest = mLastRequest.get(); + int lastCommitted = mLastCommitted.get(); + if (lastRequest == lastCommitted) { + return true; + } + + if (mBlock.get()) { + mCurrentRetry.set(0); + mLastCommitted.set(lastRequest); + return true; + } else { + int currentRetry = mCurrentRetry.getAndIncrement(); + if (currentRetry < mMaxAttempts) { + long nextDelay = + (long) (mFirstDelay * Math.pow(mBackoffBase, currentRetry)); + mHandler.postDelayed(mRunnable, nextDelay); + } else { + mCurrentRetry.set(0); + } + + return false; + } + } +} diff --git a/services/tests/PackageManagerServiceTests/unit/src/com/android/server/pm/test/install/RequestThrottleTest.kt b/services/tests/PackageManagerServiceTests/unit/src/com/android/server/pm/test/install/RequestThrottleTest.kt new file mode 100644 index 000000000000..2196ef74d6a4 --- /dev/null +++ b/services/tests/PackageManagerServiceTests/unit/src/com/android/server/pm/test/install/RequestThrottleTest.kt @@ -0,0 +1,219 @@ +/* + * Copyright (C) 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.android.server.pm.test.install + +import com.android.server.pm.utils.RequestThrottle +import com.android.server.testutils.TestHandler +import com.google.common.collect.Range +import com.google.common.truth.LongSubject +import com.google.common.truth.Truth.assertThat +import org.junit.Before +import org.junit.Test +import java.util.concurrent.CountDownLatch +import java.util.concurrent.CyclicBarrier +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong + +class RequestThrottleTest { + + private val counter = AtomicInteger(0) + + private val handler = TestHandler(null) + + @Before + fun resetValues() { + handler.flush() + counter.set(0) + assertThat(counter.get()).isEqualTo(0) + } + + @Test + fun simpleThrottle() { + val request = RequestThrottle(handler) { + counter.incrementAndGet() + true + } + + fun sendRequests() { + request.schedule() + val thread = startThread { request.schedule() } + request.schedule() + thread.joinForTest() + } + + sendRequests() + handler.flush() + assertThat(counter.get()).isEqualTo(1) + + sendRequests() + handler.flush() + assertThat(counter.get()).isEqualTo(2) + } + + @Test + fun exceptionInRequest() { + val shouldThrow = AtomicBoolean(true) + val request = RequestThrottle(handler) { + if (shouldThrow.get()) { + throw RuntimeException() + } + counter.incrementAndGet() + true + } + + fun sendRequests() { + request.schedule() + val thread = startThread { request.schedule() } + request.schedule() + thread.joinForTest() + } + + sendRequests() + try { + handler.flush() + } catch (ignored: Exception) { + } + assertThat(counter.get()).isEqualTo(0) + + shouldThrow.set(false) + + sendRequests() + handler.flush() + assertThat(counter.get()).isEqualTo(1) + } + + @Test + fun scheduleWhileRunning() { + val latchForStartRequest = CountDownLatch(1) + val latchForEndRequest = CountDownLatch(1) + val request = RequestThrottle(handler) { + latchForStartRequest.countDown() + counter.incrementAndGet() + latchForEndRequest.awaitForTest() + true + } + + // Schedule and block a request + request.schedule() + val handlerThread = startThread { handler.timeAdvance() } + latchForStartRequest.awaitForTest() + + // Hit it with other requests + request.schedule() + (0..5).map { startThread { request.schedule() } } + .forEach { it.joinForTest() } + + // Release everything + latchForEndRequest.countDown() + handlerThread.join() + handler.flush() + + // Ensure another request was run after initial blocking request ends + assertThat(counter.get()).isEqualTo(2) + } + + @Test + fun backoffRetry() { + val time = AtomicLong(0) + val handler = TestHandler(null) { time.get() } + val returnValue = AtomicBoolean(false) + val request = RequestThrottle(handler, 3, 1000, 2) { + counter.incrementAndGet() + returnValue.get() + } + + request.schedule() + + handler.timeAdvance() + handler.pendingMessages.apply { + assertThat(size).isEqualTo(1) + assertThat(single().sendTime).isAround(1000) + } + + time.set(1000) + handler.timeAdvance() + handler.pendingMessages.apply { + assertThat(size).isEqualTo(1) + assertThat(single().sendTime).isAround(3000) + } + + time.set(3000) + handler.timeAdvance() + handler.pendingMessages.apply { + assertThat(size).isEqualTo(1) + assertThat(single().sendTime).isAround(7000) + } + + returnValue.set(true) + time.set(7000) + handler.timeAdvance() + assertThat(handler.pendingMessages).isEmpty() + + // Ensure another request was run after initial blocking request ends + assertThat(counter.get()).isEqualTo(4) + } + + @Test + fun forceWriteMultiple() { + val request = RequestThrottle(handler) { + counter.incrementAndGet() + true + } + + request.runNow() + request.runNow() + request.runNow() + + assertThat(counter.get()).isEqualTo(3) + } + + @Test + fun forceWriteNowWithoutSync() { + // When forcing a write without synchronizing the request block, 2 instances will be run. + // There is no test for "with sync" because any logic to avoid multiple runs is left + // entirely up to the caller. + + val barrierForEndRequest = CyclicBarrier(2) + val request = RequestThrottle(handler) { + counter.incrementAndGet() + barrierForEndRequest.awaitForTest() + true + } + + // Schedule and block a request + request.schedule() + val thread = startThread { handler.timeAdvance() } + + request.runNow() + + thread.joinForTest() + + assertThat(counter.get()).isEqualTo(2) + } + + private fun CountDownLatch.awaitForTest() = assertThat(await(5, TimeUnit.SECONDS)).isTrue() + private fun CyclicBarrier.awaitForTest() = await(5, TimeUnit.SECONDS) + private fun Thread.joinForTest() = join(5000) + + private fun startThread(block: () -> Unit) = Thread { block() }.apply { start() } + + // Float math means time calculations are not exact, so use a loose range + private fun LongSubject.isAround(value: Long, threshold: Long = 10) = + isIn(Range.closed(value - threshold, value + threshold)) +} |