Merge "Throttle package session async write requests" into sc-dev
diff --git a/services/core/java/com/android/server/pm/PackageInstallerService.java b/services/core/java/com/android/server/pm/PackageInstallerService.java
index 656f347..b6f5e99 100644
--- a/services/core/java/com/android/server/pm/PackageInstallerService.java
+++ b/services/core/java/com/android/server/pm/PackageInstallerService.java
@@ -20,6 +20,7 @@
 import static org.xmlpull.v1.XmlPullParser.START_TAG;
 
 import android.Manifest;
+import android.annotation.NonNull;
 import android.app.ActivityManager;
 import android.app.AppGlobals;
 import android.app.AppOpsManager;
@@ -88,6 +89,7 @@
 import com.android.server.SystemService;
 import com.android.server.SystemServiceManager;
 import com.android.server.pm.parsing.PackageParser2;
+import com.android.server.pm.utils.RequestThrottle;
 
 import libcore.io.IoUtils;
 
@@ -220,6 +222,14 @@
         }
     }
 
+    @NonNull
+    private final RequestThrottle mSettingsWriteRequest = new RequestThrottle(IoThread.getHandler(),
+            () -> {
+                synchronized (mSessions) {
+                    return writeSessionsLocked();
+                }
+            });
+
     public PackageInstallerService(Context context, PackageManagerService pm,
             Supplier<PackageParser2> apexParserSupplier) {
         mContext = context;
@@ -275,7 +285,7 @@
 
             // Invalid sessions might have been marked while parsing. Re-write the database with
             // the updated information.
-            writeSessionsLocked();
+            mSettingsWriteRequest.runNow();
 
         }
     }
@@ -464,7 +474,7 @@
     }
 
     @GuardedBy("mSessions")
-    private void writeSessionsLocked() {
+    private boolean writeSessionsLocked() {
         if (LOGD) Slog.v(TAG, "writeSessionsLocked()");
 
         FileOutputStream fos = null;
@@ -483,28 +493,20 @@
             out.endDocument();
 
             mSessionsFile.finishWrite(fos);
+            return true;
         } catch (IOException e) {
             if (fos != null) {
                 mSessionsFile.failWrite(fos);
             }
         }
+
+        return false;
     }
 
     private File buildAppIconFile(int sessionId) {
         return new File(mSessionsDir, "app_icon." + sessionId + ".png");
     }
 
-    private void writeSessionsAsync() {
-        IoThread.getHandler().post(new Runnable() {
-            @Override
-            public void run() {
-                synchronized (mSessions) {
-                    writeSessionsLocked();
-                }
-            }
-        });
-    }
-
     @Override
     public int createSession(SessionParams params, String installerPackageName,
             String callingAttributionTag, int userId) {
@@ -764,7 +766,7 @@
 
         mCallbacks.notifySessionCreated(session.sessionId, session.userId);
 
-        writeSessionsAsync();
+        mSettingsWriteRequest.schedule();
         return sessionId;
     }
 
@@ -1374,7 +1376,7 @@
     class InternalCallback {
         public void onSessionBadgingChanged(PackageInstallerSession session) {
             mCallbacks.notifySessionBadgingChanged(session.sessionId, session.userId);
-            writeSessionsAsync();
+            mSettingsWriteRequest.schedule();
         }
 
         public void onSessionActiveChanged(PackageInstallerSession session, boolean active) {
@@ -1389,7 +1391,7 @@
 
         public void onStagedSessionChanged(PackageInstallerSession session) {
             session.markUpdated();
-            writeSessionsAsync();
+            mSettingsWriteRequest.schedule();
             if (mOkToSendBroadcasts && !session.isDestroyed()) {
                 // we don't scrub the data here as this is sent only to the installer several
                 // privileged system packages
@@ -1419,7 +1421,7 @@
                             appIconFile.delete();
                         }
 
-                        writeSessionsLocked();
+                        mSettingsWriteRequest.runNow();
                     }
                 }
             });
@@ -1428,16 +1430,14 @@
         public void onSessionPrepared(PackageInstallerSession session) {
             // We prepared the destination to write into; we want to persist
             // this, but it's not critical enough to block for.
-            writeSessionsAsync();
+            mSettingsWriteRequest.schedule();
         }
 
         public void onSessionSealedBlocking(PackageInstallerSession session) {
             // It's very important that we block until we've recorded the
             // session as being sealed, since we never want to allow mutation
             // after sealing.
-            synchronized (mSessions) {
-                writeSessionsLocked();
-            }
+            mSettingsWriteRequest.runNow();
         }
     }
 }
diff --git a/services/core/java/com/android/server/pm/utils/RequestThrottle.java b/services/core/java/com/android/server/pm/utils/RequestThrottle.java
new file mode 100644
index 0000000..f1dd402
--- /dev/null
+++ b/services/core/java/com/android/server/pm/utils/RequestThrottle.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.pm.utils;
+
+import android.annotation.NonNull;
+import android.os.Handler;
+
+import com.android.server.IoThread;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+/**
+ * Loose throttle latest behavior for success/fail requests, with options to schedule or force a
+ * request through. Throttling is implicit and not configurable. This means requests are dispatched
+ * to the {@link Handler} immediately when received, and only batched while waiting on the next
+ * message execution or running request.
+ *
+ * This also means there is no explicit debouncing. Implicit debouncing is available through the
+ * same runtime delays in the {@link Handler} instance and the request execution, where multiple
+ * requests prior to the execution point are collapsed.
+ *
+ * Callers provide a {@link Handler} on which to schedule tasks. This may be a highly
+ * contentious thread like {@link IoThread#getHandler()}, but note that there are no guarantees
+ * that the request will be handled before the system server dies. Ideally callers should handle
+ * re-initialization from stale state with no consequences to the user.
+ *
+ * This class will retry requests if they don't succeed, as provided by a true/false response from
+ * the block provided to run the request. This uses an exponential backoff mechanism, assuming that
+ * state write should be attempted immediately, but not retried so heavily as to potentially block
+ * other system server callers. Exceptions are not considered and will not result in a retry if
+ * thrown from inside the block. Callers should wrap with try-catch and roll back any transaction
+ * state before returning false to signal a retry.
+ *
+ * The caller is strictly responsible for data synchronization, as this class will not synchronize
+ * the request block, potentially running it multiple times or on multiple threads simultaneously
+ * if requests come in asynchronously.
+ */
+public class RequestThrottle {
+
+    private static final int DEFAULT_RETRY_MAX_ATTEMPTS = 5;
+    private static final int DEFAULT_DELAY_MS = 1000;
+    private static final int DEFAULT_BACKOFF_BASE = 2;
+
+    private final AtomicInteger mLastRequest = new AtomicInteger(0);
+    private final AtomicInteger mLastCommitted = new AtomicInteger(-1);
+
+    private final int mMaxAttempts;
+    private final int mFirstDelay;
+    private final int mBackoffBase;
+
+    private final AtomicInteger mCurrentRetry = new AtomicInteger(0);
+
+    @NonNull
+    private final Handler mHandler;
+
+    @NonNull
+    private final Supplier<Boolean> mBlock;
+
+    @NonNull
+    private final Runnable mRunnable;
+
+    /**
+     * @see #RequestThrottle(Handler, int, int, int, Supplier)
+     */
+    public RequestThrottle(@NonNull Handler handler, @NonNull Supplier<Boolean> block) {
+        this(handler, DEFAULT_RETRY_MAX_ATTEMPTS, DEFAULT_DELAY_MS, DEFAULT_BACKOFF_BASE,
+                block);
+    }
+
+    /**
+     * Backoff timing is calculated as firstDelay * (backoffBase ^ retryAttempt).
+     *
+     * @param handler     Representing the thread to run the provided block.
+     * @param block       The action to run when scheduled, returning whether or not the request was
+     *                    successful. Note that any thrown exceptions will be ignored and not
+     *                    retried, since it's not easy to tell how destructive or retry-able an
+     *                    exception is.
+     * @param maxAttempts Number of times to re-attempt any single request.
+     * @param firstDelay  The first delay used after the initial attempt.
+     * @param backoffBase The base of the backoff calculation, where retry attempt count is the
+     *                    exponent.
+     */
+    public RequestThrottle(@NonNull Handler handler, int maxAttempts, int firstDelay,
+            int backoffBase, @NonNull Supplier<Boolean> block) {
+        mHandler = handler;
+        mBlock = block;
+        mMaxAttempts = maxAttempts;
+        mFirstDelay = firstDelay;
+        mBackoffBase = backoffBase;
+        mRunnable = this::runInternal;
+    }
+
+    /**
+     * Schedule the intended action on the provided {@link Handler}.
+     */
+    public void schedule() {
+        // To avoid locking the Handler twice by pre-checking hasCallbacks, instead just queue
+        // the Runnable again. It will no-op if the request has already been written to disk.
+        mLastRequest.incrementAndGet();
+        mHandler.post(mRunnable);
+    }
+
+    /**
+     * Run the intended action immediately on the calling thread. Note that synchronization and
+     * deadlock between threads is not handled. This will immediately call the request block, and
+     * also potentially schedule a retry. The caller must not block itself.
+     *
+     * @return true if the write succeeded or the last request was already written
+     */
+    public boolean runNow() {
+        mLastRequest.incrementAndGet();
+        return runInternal();
+    }
+
+    private boolean runInternal() {
+        int lastRequest = mLastRequest.get();
+        int lastCommitted = mLastCommitted.get();
+        if (lastRequest == lastCommitted) {
+            return true;
+        }
+
+        if (mBlock.get()) {
+            mCurrentRetry.set(0);
+            mLastCommitted.set(lastRequest); // Requests issued after the read above stay pending
+            return true;
+        } else {
+            int currentRetry = mCurrentRetry.getAndIncrement();
+            if (currentRetry < mMaxAttempts) {
+                long nextDelay =
+                        (long) (mFirstDelay * Math.pow(mBackoffBase, currentRetry));
+                mHandler.postDelayed(mRunnable, nextDelay);
+            } else {
+                mCurrentRetry.set(0); // Attempts exhausted; give up until the next request
+            }
+
+            return false;
+        }
+    }
+}
diff --git a/services/tests/PackageManagerServiceTests/unit/src/com/android/server/pm/test/install/RequestThrottleTest.kt b/services/tests/PackageManagerServiceTests/unit/src/com/android/server/pm/test/install/RequestThrottleTest.kt
new file mode 100644
index 0000000..2196ef7
--- /dev/null
+++ b/services/tests/PackageManagerServiceTests/unit/src/com/android/server/pm/test/install/RequestThrottleTest.kt
@@ -0,0 +1,219 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.pm.test.install
+
+import com.android.server.pm.utils.RequestThrottle
+import com.android.server.testutils.TestHandler
+import com.google.common.collect.Range
+import com.google.common.truth.LongSubject
+import com.google.common.truth.Truth.assertThat
+import org.junit.Before
+import org.junit.Test
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.CyclicBarrier
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.AtomicLong
+
+class RequestThrottleTest {
+
+    private val counter = AtomicInteger(0)
+
+    private val handler = TestHandler(null)
+
+    @Before
+    fun resetValues() {
+        handler.flush()
+        counter.set(0)
+        assertThat(counter.get()).isEqualTo(0)
+    }
+
+    @Test
+    fun simpleThrottle() {
+        val request = RequestThrottle(handler) {
+            counter.incrementAndGet()
+            true
+        }
+
+        fun sendRequests() {
+            request.schedule()
+            val thread = startThread { request.schedule() }
+            request.schedule()
+            thread.joinForTest()
+        }
+
+        sendRequests()
+        handler.flush()
+        assertThat(counter.get()).isEqualTo(1)
+
+        sendRequests()
+        handler.flush()
+        assertThat(counter.get()).isEqualTo(2)
+    }
+
+    @Test
+    fun exceptionInRequest() {
+        val shouldThrow = AtomicBoolean(true)
+        val request = RequestThrottle(handler) {
+            if (shouldThrow.get()) {
+                throw RuntimeException()
+            }
+            counter.incrementAndGet()
+            true
+        }
+
+        fun sendRequests() {
+            request.schedule()
+            val thread = startThread { request.schedule() }
+            request.schedule()
+            thread.joinForTest()
+        }
+
+        sendRequests()
+        try {
+            handler.flush()
+        } catch (ignored: Exception) {
+        }
+        assertThat(counter.get()).isEqualTo(0)
+
+        shouldThrow.set(false)
+
+        sendRequests()
+        handler.flush()
+        assertThat(counter.get()).isEqualTo(1)
+    }
+
+    @Test
+    fun scheduleWhileRunning() {
+        val latchForStartRequest = CountDownLatch(1)
+        val latchForEndRequest = CountDownLatch(1)
+        val request = RequestThrottle(handler) {
+            latchForStartRequest.countDown()
+            counter.incrementAndGet()
+            latchForEndRequest.awaitForTest()
+            true
+        }
+
+        // Schedule and block a request
+        request.schedule()
+        val handlerThread = startThread { handler.timeAdvance() }
+        latchForStartRequest.awaitForTest()
+
+        // Hit it with other requests
+        request.schedule()
+        (0..5).map { startThread { request.schedule() } }
+                .forEach { it.joinForTest() }
+
+        // Release everything
+        latchForEndRequest.countDown()
+        handlerThread.join()
+        handler.flush()
+
+        // Ensure another request was run after initial blocking request ends
+        assertThat(counter.get()).isEqualTo(2)
+    }
+
+    @Test
+    fun backoffRetry() {
+        val time = AtomicLong(0)
+        val handler = TestHandler(null) { time.get() }
+        val returnValue = AtomicBoolean(false)
+        val request = RequestThrottle(handler, 3, 1000, 2) {
+            counter.incrementAndGet()
+            returnValue.get()
+        }
+
+        request.schedule()
+
+        handler.timeAdvance()
+        handler.pendingMessages.apply {
+            assertThat(size).isEqualTo(1)
+            assertThat(single().sendTime).isAround(1000)
+        }
+
+        time.set(1000)
+        handler.timeAdvance()
+        handler.pendingMessages.apply {
+            assertThat(size).isEqualTo(1)
+            assertThat(single().sendTime).isAround(3000)
+        }
+
+        time.set(3000)
+        handler.timeAdvance()
+        handler.pendingMessages.apply {
+            assertThat(size).isEqualTo(1)
+            assertThat(single().sendTime).isAround(7000)
+        }
+
+        returnValue.set(true)
+        time.set(7000)
+        handler.timeAdvance()
+        assertThat(handler.pendingMessages).isEmpty()
+
+        // Ensure the block ran 4 times: the initial attempt plus 3 retries, the last succeeding
+        assertThat(counter.get()).isEqualTo(4)
+    }
+
+    @Test
+    fun forceWriteMultiple() {
+        val request = RequestThrottle(handler) {
+            counter.incrementAndGet()
+            true
+        }
+
+        request.runNow()
+        request.runNow()
+        request.runNow()
+
+        assertThat(counter.get()).isEqualTo(3)
+    }
+
+    @Test
+    fun forceWriteNowWithoutSync() {
+        // When forcing a write without synchronizing the request block, 2 instances will be run.
+        // There is no test for "with sync" because any logic to avoid multiple runs is left
+        // entirely up to the caller.
+
+        val barrierForEndRequest = CyclicBarrier(2)
+        val request = RequestThrottle(handler) {
+            counter.incrementAndGet()
+            barrierForEndRequest.awaitForTest()
+            true
+        }
+
+        // Schedule and block a request
+        request.schedule()
+        val thread = startThread { handler.timeAdvance() }
+
+        request.runNow()
+
+        thread.joinForTest()
+
+        assertThat(counter.get()).isEqualTo(2)
+    }
+
+    private fun CountDownLatch.awaitForTest() = assertThat(await(5, TimeUnit.SECONDS)).isTrue()
+    private fun CyclicBarrier.awaitForTest() = await(5, TimeUnit.SECONDS)
+    private fun Thread.joinForTest() = join(5000)
+
+    private fun startThread(block: () -> Unit) = Thread { block() }.apply { start() }
+
+    // Float math means time calculations are not exact, so use a loose range
+    private fun LongSubject.isAround(value: Long, threshold: Long = 10) =
+            isIn(Range.closed(value - threshold, value + threshold))
+}