Merge "Throttle package session async write requests" into sc-dev
diff --git a/services/core/java/com/android/server/pm/PackageInstallerService.java b/services/core/java/com/android/server/pm/PackageInstallerService.java
index 656f347..b6f5e99 100644
--- a/services/core/java/com/android/server/pm/PackageInstallerService.java
+++ b/services/core/java/com/android/server/pm/PackageInstallerService.java
@@ -20,6 +20,7 @@
import static org.xmlpull.v1.XmlPullParser.START_TAG;
import android.Manifest;
+import android.annotation.NonNull;
import android.app.ActivityManager;
import android.app.AppGlobals;
import android.app.AppOpsManager;
@@ -88,6 +89,7 @@
import com.android.server.SystemService;
import com.android.server.SystemServiceManager;
import com.android.server.pm.parsing.PackageParser2;
+import com.android.server.pm.utils.RequestThrottle;
import libcore.io.IoUtils;
@@ -220,6 +222,14 @@
}
}
+ @NonNull
+ private final RequestThrottle mSettingsWriteRequest = new RequestThrottle(IoThread.getHandler(),
+ () -> {
+ synchronized (mSessions) {
+ return writeSessionsLocked();
+ }
+ });
+
public PackageInstallerService(Context context, PackageManagerService pm,
Supplier<PackageParser2> apexParserSupplier) {
mContext = context;
@@ -275,7 +285,7 @@
// Invalid sessions might have been marked while parsing. Re-write the database with
// the updated information.
- writeSessionsLocked();
+ mSettingsWriteRequest.runNow();
}
}
@@ -464,7 +474,7 @@
}
@GuardedBy("mSessions")
- private void writeSessionsLocked() {
+ private boolean writeSessionsLocked() {
if (LOGD) Slog.v(TAG, "writeSessionsLocked()");
FileOutputStream fos = null;
@@ -483,28 +493,20 @@
out.endDocument();
mSessionsFile.finishWrite(fos);
+ return true;
} catch (IOException e) {
if (fos != null) {
mSessionsFile.failWrite(fos);
}
}
+
+ return false;
}
private File buildAppIconFile(int sessionId) {
return new File(mSessionsDir, "app_icon." + sessionId + ".png");
}
- private void writeSessionsAsync() {
- IoThread.getHandler().post(new Runnable() {
- @Override
- public void run() {
- synchronized (mSessions) {
- writeSessionsLocked();
- }
- }
- });
- }
-
@Override
public int createSession(SessionParams params, String installerPackageName,
String callingAttributionTag, int userId) {
@@ -764,7 +766,7 @@
mCallbacks.notifySessionCreated(session.sessionId, session.userId);
- writeSessionsAsync();
+ mSettingsWriteRequest.schedule();
return sessionId;
}
@@ -1374,7 +1376,7 @@
class InternalCallback {
public void onSessionBadgingChanged(PackageInstallerSession session) {
mCallbacks.notifySessionBadgingChanged(session.sessionId, session.userId);
- writeSessionsAsync();
+ mSettingsWriteRequest.schedule();
}
public void onSessionActiveChanged(PackageInstallerSession session, boolean active) {
@@ -1389,7 +1391,7 @@
public void onStagedSessionChanged(PackageInstallerSession session) {
session.markUpdated();
- writeSessionsAsync();
+ mSettingsWriteRequest.schedule();
if (mOkToSendBroadcasts && !session.isDestroyed()) {
// we don't scrub the data here as this is sent only to the installer several
// privileged system packages
@@ -1419,7 +1421,7 @@
appIconFile.delete();
}
- writeSessionsLocked();
+ mSettingsWriteRequest.runNow();
}
}
});
@@ -1428,16 +1430,14 @@
public void onSessionPrepared(PackageInstallerSession session) {
// We prepared the destination to write into; we want to persist
// this, but it's not critical enough to block for.
- writeSessionsAsync();
+ mSettingsWriteRequest.schedule();
}
public void onSessionSealedBlocking(PackageInstallerSession session) {
// It's very important that we block until we've recorded the
// session as being sealed, since we never want to allow mutation
// after sealing.
- synchronized (mSessions) {
- writeSessionsLocked();
- }
+ mSettingsWriteRequest.runNow();
}
}
}
diff --git a/services/core/java/com/android/server/pm/utils/RequestThrottle.java b/services/core/java/com/android/server/pm/utils/RequestThrottle.java
new file mode 100644
index 0000000..f1dd402
--- /dev/null
+++ b/services/core/java/com/android/server/pm/utils/RequestThrottle.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.pm.utils;
+
+import android.annotation.NonNull;
+import android.os.Handler;
+
+import com.android.server.IoThread;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+/**
+ * Loose throttle latest behavior for success/fail requests, with options to schedule or force a
+ * request through. Throttling is implicit and not configurable. This means requests are dispatched
+ * to the {@link Handler} immediately when received, and only batched while waiting on the next
+ * message execution or running request.
+ *
+ * This also means there is no explicit debouncing. Implicit debouncing is available through the
+ * same runtime delays in the {@link Handler} instance and the request execution, where multiple
+ * requests prior to the execution point are collapsed.
+ *
+ * Callers provide a {@link Handler} with which to schedule tasks on. This may be a highly
+ * contentious thread like {@link IoThread#getHandler()}, but note that there are no guarantees
+ * that the request will be handled before the system server dies. Ideally callers should handle
+ * re-initialization from stale state with no consequences to the user.
+ *
+ * This class will retry requests if they don't succeed, as provided by a true/false response from
+ * the block provided to run the request. This uses an exponential backoff mechanism, assuming that
+ * state write should be attempted immediately, but not retried so heavily as to potentially block
+ * other system server callers. Exceptions are not considered and will not result in a retry if
+ * thrown from inside the block. Callers should wrap the block in try-catch and roll back any
+ * transaction state before returning false to signal a retry.
+ *
+ * The caller is strictly responsible for data synchronization, as this class will not synchronize
+ * the request block, potentially running it multiple times or on multiple threads simultaneously
+ * if requests come in asynchronously.
+ */
+public class RequestThrottle {
+
+    private static final int DEFAULT_RETRY_MAX_ATTEMPTS = 5;
+    private static final int DEFAULT_DELAY_MS = 1000;
+    private static final int DEFAULT_BACKOFF_BASE = 2;
+
+    // Sequence number of the newest request; bumped by schedule() and runNow().
+    private final AtomicInteger mLastRequest = new AtomicInteger(0);
+    // Value of mLastRequest that was read just before the block last reported success.
+    private final AtomicInteger mLastCommitted = new AtomicInteger(-1);
+    // Failed attempts since the last success or give-up; exponent of the next retry delay.
+    private final AtomicInteger mCurrentRetry = new AtomicInteger(0);
+
+    private final int mMaxAttempts;
+    private final int mFirstDelay;
+    private final int mBackoffBase;
+
+    @NonNull
+    private final Handler mHandler;
+
+    @NonNull
+    private final Supplier<Boolean> mBlock;
+
+    @NonNull
+    private final Runnable mRunnable;
+
+    /**
+     * Creates a throttle that uses the default retry count, first delay and backoff base.
+     *
+     * @see #RequestThrottle(Handler, int, int, int, Supplier)
+     */
+    public RequestThrottle(@NonNull Handler handler, @NonNull Supplier<Boolean> block) {
+        this(handler, DEFAULT_RETRY_MAX_ATTEMPTS, DEFAULT_DELAY_MS, DEFAULT_BACKOFF_BASE,
+                block);
+    }
+
+    /**
+     * Retry delays are computed as firstDelay * (backoffBase ^ retryAttempt).
+     *
+     * @param handler     Thread on which scheduled runs and retries execute.
+     * @param maxAttempts Number of retries allowed after a failed attempt before giving up.
+     * @param firstDelay  Delay, in milliseconds, before the first retry.
+     * @param backoffBase Base of the backoff calculation; the retry count is the exponent.
+     * @param block       Action to run; returns whether it succeeded. Exceptions thrown by it
+     *                    are neither caught nor retried.
+     */
+    public RequestThrottle(@NonNull Handler handler, int maxAttempts, int firstDelay,
+            int backoffBase, @NonNull Supplier<Boolean> block) {
+        mHandler = handler;
+        mMaxAttempts = maxAttempts;
+        mFirstDelay = firstDelay;
+        mBackoffBase = backoffBase;
+        mBlock = block;
+        mRunnable = this::runInternal;
+    }
+
+    /**
+     * Records a new request and posts a run of the action to the provided {@link Handler}.
+     */
+    public void schedule() {
+        // Always post rather than checking hasCallbacks first, which would take the Handler
+        // lock a second time. Surplus runs no-op once the latest request is committed.
+        mLastRequest.incrementAndGet();
+        mHandler.post(mRunnable);
+    }
+
+    /**
+     * Records a new request and runs the action synchronously on the calling thread. On
+     * failure a retry may be posted to the {@link Handler}. No synchronization or deadlock
+     * protection is provided, so the caller must not hold anything the action blocks on.
+     *
+     * @return true if the write succeeded or the last request was already written
+     */
+    public boolean runNow() {
+        mLastRequest.incrementAndGet();
+        return runInternal();
+    }
+
+    /** Runs the action unless the latest request is already committed; retries on failure. */
+    private boolean runInternal() {
+        final int requested = mLastRequest.get();
+        if (requested == mLastCommitted.get()) {
+            return true;
+        }
+
+        if (mBlock.get()) {
+            mCurrentRetry.set(0);
+            mLastCommitted.set(requested);
+            return true;
+        }
+
+        final int attempt = mCurrentRetry.getAndIncrement();
+        if (attempt >= mMaxAttempts) {
+            // Out of retries: give up on this request and start the next one fresh.
+            mCurrentRetry.set(0);
+            return false;
+        }
+        final double multiplier = Math.pow(mBackoffBase, attempt);
+        mHandler.postDelayed(mRunnable, (long) (mFirstDelay * multiplier));
+        return false;
+    }
+}
diff --git a/services/tests/PackageManagerServiceTests/unit/src/com/android/server/pm/test/install/RequestThrottleTest.kt b/services/tests/PackageManagerServiceTests/unit/src/com/android/server/pm/test/install/RequestThrottleTest.kt
new file mode 100644
index 0000000..2196ef7
--- /dev/null
+++ b/services/tests/PackageManagerServiceTests/unit/src/com/android/server/pm/test/install/RequestThrottleTest.kt
@@ -0,0 +1,219 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.pm.test.install
+
+import com.android.server.pm.utils.RequestThrottle
+import com.android.server.testutils.TestHandler
+import com.google.common.collect.Range
+import com.google.common.truth.LongSubject
+import com.google.common.truth.Truth.assertThat
+import org.junit.Before
+import org.junit.Test
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.CyclicBarrier
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.AtomicLong
+
+class RequestThrottleTest {
+
+ private val counter = AtomicInteger(0)
+
+ private val handler = TestHandler(null)
+
+ @Before
+ fun resetValues() {
+ handler.flush()
+ counter.set(0)
+ assertThat(counter.get()).isEqualTo(0)
+ }
+
+ @Test
+ fun simpleThrottle() {
+ val request = RequestThrottle(handler) {
+ counter.incrementAndGet()
+ true
+ }
+
+ fun sendRequests() {
+ request.schedule()
+ val thread = startThread { request.schedule() }
+ request.schedule()
+ thread.joinForTest()
+ }
+
+ sendRequests()
+ handler.flush()
+ assertThat(counter.get()).isEqualTo(1)
+
+ sendRequests()
+ handler.flush()
+ assertThat(counter.get()).isEqualTo(2)
+ }
+
+ @Test
+ fun exceptionInRequest() {
+ val shouldThrow = AtomicBoolean(true)
+ val request = RequestThrottle(handler) {
+ if (shouldThrow.get()) {
+ throw RuntimeException()
+ }
+ counter.incrementAndGet()
+ true
+ }
+
+ fun sendRequests() {
+ request.schedule()
+ val thread = startThread { request.schedule() }
+ request.schedule()
+ thread.joinForTest()
+ }
+
+ sendRequests()
+ try {
+ handler.flush()
+ } catch (ignored: Exception) {
+ }
+ assertThat(counter.get()).isEqualTo(0)
+
+ shouldThrow.set(false)
+
+ sendRequests()
+ handler.flush()
+ assertThat(counter.get()).isEqualTo(1)
+ }
+
+ @Test
+ fun scheduleWhileRunning() {
+ val latchForStartRequest = CountDownLatch(1)
+ val latchForEndRequest = CountDownLatch(1)
+ val request = RequestThrottle(handler) {
+ latchForStartRequest.countDown()
+ counter.incrementAndGet()
+ latchForEndRequest.awaitForTest()
+ true
+ }
+
+ // Schedule and block a request
+ request.schedule()
+ val handlerThread = startThread { handler.timeAdvance() }
+ latchForStartRequest.awaitForTest()
+
+ // Hit it with other requests
+ request.schedule()
+ (0..5).map { startThread { request.schedule() } }
+ .forEach { it.joinForTest() }
+
+ // Release everything
+ latchForEndRequest.countDown()
+ handlerThread.join()
+ handler.flush()
+
+ // Expect the blocked first run plus exactly one more run for all requests sent meanwhile
+ assertThat(counter.get()).isEqualTo(2)
+ }
+
+ @Test
+ fun backoffRetry() {
+ val time = AtomicLong(0)
+ val handler = TestHandler(null) { time.get() }
+ val returnValue = AtomicBoolean(false)
+ val request = RequestThrottle(handler, 3, 1000, 2) {
+ counter.incrementAndGet()
+ returnValue.get()
+ }
+
+ request.schedule()
+
+ handler.timeAdvance()
+ handler.pendingMessages.apply {
+ assertThat(size).isEqualTo(1)
+ assertThat(single().sendTime).isAround(1000)
+ }
+
+ time.set(1000)
+ handler.timeAdvance()
+ handler.pendingMessages.apply {
+ assertThat(size).isEqualTo(1)
+ assertThat(single().sendTime).isAround(3000)
+ }
+
+ time.set(3000)
+ handler.timeAdvance()
+ handler.pendingMessages.apply {
+ assertThat(size).isEqualTo(1)
+ assertThat(single().sendTime).isAround(7000)
+ }
+
+ returnValue.set(true)
+ time.set(7000)
+ handler.timeAdvance()
+ assertThat(handler.pendingMessages).isEmpty()
+
+ // Expect the initial attempt plus 3 retries; the last retry succeeded, so nothing is queued
+ assertThat(counter.get()).isEqualTo(4)
+ }
+
+ @Test
+ fun forceWriteMultiple() {
+ val request = RequestThrottle(handler) {
+ counter.incrementAndGet()
+ true
+ }
+
+ request.runNow()
+ request.runNow()
+ request.runNow()
+
+ assertThat(counter.get()).isEqualTo(3)
+ }
+
+ @Test
+ fun forceWriteNowWithoutSync() {
+ // When forcing a write without synchronizing the request block, 2 instances will be run.
+ // There is no test for "with sync" because any logic to avoid multiple runs is left
+ // entirely up to the caller.
+
+ val barrierForEndRequest = CyclicBarrier(2)
+ val request = RequestThrottle(handler) {
+ counter.incrementAndGet()
+ barrierForEndRequest.awaitForTest()
+ true
+ }
+
+ // Schedule and block a request
+ request.schedule()
+ val thread = startThread { handler.timeAdvance() }
+
+ request.runNow()
+
+ thread.joinForTest()
+
+ assertThat(counter.get()).isEqualTo(2)
+ }
+
+ private fun CountDownLatch.awaitForTest() = assertThat(await(5, TimeUnit.SECONDS)).isTrue()
+ private fun CyclicBarrier.awaitForTest() = await(5, TimeUnit.SECONDS)
+ private fun Thread.joinForTest() = join(5000)
+
+ private fun startThread(block: () -> Unit) = Thread { block() }.apply { start() }
+
+ // Float math means time calculations are not exact, so use a loose range
+ private fun LongSubject.isAround(value: Long, threshold: Long = 10) =
+ isIn(Range.closed(value - threshold, value + threshold))
+}