Add LifecycleOperationStorage and tests

This is preparatory work to fix bug b/161089758 where
it's possible for an app's BackupAgent to crash and leave
the app in 'restricted mode' when subsequently relaunched.

The bug fix involves storing new associations between
package name and operation tokens.  Rather than further
extend UserBackupManagerService, this change introduces a
new LifecycleOperationStorage class, and interface
OperationStorage, which takes the responsibility for
storing Operations by token, the associated locking, and
adds the new mapping from package name to operation tokens.

Later changes will refactor UserBackupManagerService to
use LifecycleOperationStorage, and several related classes
to use OperationStorage.

BUG: 161089758
Change-Id: Ib3a3d4f6f88c49bbfa82068c625a2abae720dd92
Test: atest FrameworksServicesTests:OperationStoreTest
diff --git a/services/backup/Android.bp b/services/backup/Android.bp
index ead8aff..7b0d6c0 100644
--- a/services/backup/Android.bp
+++ b/services/backup/Android.bp
@@ -19,5 +19,9 @@
     defaults: ["platform_service_defaults"],
     srcs: [":services.backup-sources"],
     libs: ["services.core"],
-    static_libs: ["backuplib", "app-compat-annotations"],
+    static_libs: [
+        "backuplib",
+        "app-compat-annotations",
+        "guava",
+    ],
 }
diff --git a/services/backup/java/com/android/server/backup/OperationStorage.java b/services/backup/java/com/android/server/backup/OperationStorage.java
new file mode 100644
index 0000000..155a4b57
--- /dev/null
+++ b/services/backup/java/com/android/server/backup/OperationStorage.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.backup;
+
+import android.annotation.IntDef;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.util.Set;
+
+/**
+ * OperationStorage is an abstraction around a set of active operations.
+ *
+ * Operations are registered with a token that must first be obtained from
+ * {@link UserBackupManagerService#generateRandomIntegerToken()}.  When
+ * registering, the caller may also associate a set of package names with
+ * the operation.
+ *
+ * TODO(b/208442527): have the token be generated within and returned by
+ *                    registerOperation, as it should be an internal detail.
+ *
+ * Operations have a type and a state.  Although ints, the values that can
+ * be used are defined in {@link UserBackupManagerService}.  If the type of
+ * an operation is OP_BACKUP, then it represents a task running backups. The
+ * task is provided when registering the operation because it provides a
+ * handle to cancel the backup.
+ */
+public interface OperationStorage {
+
+    @Retention(RetentionPolicy.SOURCE)
+    @IntDef({
+        OpState.PENDING,
+        OpState.ACKNOWLEDGED,
+        OpState.TIMEOUT
+    })
+    public @interface OpState {
+        // The operation is in progress.
+        int PENDING = 0;
+        // The operation has been acknowledged.
+        int ACKNOWLEDGED = 1;
+        // The operation has timed out.
+        int TIMEOUT = -1;
+    }
+
+    @Retention(RetentionPolicy.SOURCE)
+    @IntDef({
+        OpType.BACKUP_WAIT,
+        OpType.RESTORE_WAIT,
+        OpType.BACKUP,
+    })
+    public @interface OpType {
+        // Waiting for backup agent to respond during backup operation.
+        int BACKUP_WAIT = 0;
+        // Waiting for backup agent to respond during restore operation.
+        int RESTORE_WAIT = 1;
+        // An entire backup operation spanning multiple packages.
+        int BACKUP = 2;
+    }
+
+    /**
+     * Record an ongoing operation of given type and in the given initial
+     * state. The associated task is used as a callback.
+     *
+     * @param token        an operation token issued by
+     *                     {@link UserBackupManagerService#generateRandomIntegerToken()}
+     * @param initialState the state that the operation starts in
+     * @param task         the {@link BackupRestoreTask} that is expected to
+     *                     remove the operation on completion, and which may
+     *                     be notified if the operation requires cancelling.
+     * @param type         the type of the operation.
+     */
+    void registerOperation(int token, @OpState int initialState,
+            BackupRestoreTask task, @OpType int type);
+
+    /**
+     * See {@link #registerOperation()}.  In addition this method accepts a set
+     * of package names which are associated with the operation.
+     *
+     * @param token        See {@link #registerOperation()}
+     * @param initialState See {@link #registerOperation()}
+     * @param packageNames the package names to associate with the operation.
+     * @param task         See {@link #registerOperation()}
+     * @param type         See {@link #registerOperation()}
+     */
+    void registerOperationForPackages(int token, @OpState int initialState,
+            Set<String> packageNames, BackupRestoreTask task, @OpType int type);
+
+    /**
+     * Remove the operation identified by token.  This is called when the
+     * operation is no longer in progress and should be dropped. Any association
+     * with package names provided in {@link #registerOperation()} is dropped as
+     * well.
+     *
+     * @param token the operation token specified when registering the operation.
+     */
+    void removeOperation(int token);
+
+    /**
+     * Obtain a set of operation tokens for all pending operations that were
+     * registered with an association to the specified package name.
+     *
+     * @param packageName the name of the package used at registration time
+     *
+     * @return a set of operation tokens associated to package name.
+     */
+    Set<Integer> operationTokensForPackage(String packageName);
+
+    /**
+     * Obtain a set of operation tokens for all pending operations that are
+     * of the specified operation type.
+     *
+     * @param type the type of the operation provided at registration time.
+     *
+     * @return a set of operation tokens for operations of that type.
+     */
+    Set<Integer> operationTokensForOpType(@OpType int type);
+
+    /**
+     * Obtain a set of operation tokens for all pending operations that are
+     * currently in the specified operation state.
+     *
+     * @param state the state of the operation.
+     *
+     * @return a set of operation tokens for operations in that state.
+     */
+    Set<Integer> operationTokensForOpState(@OpState int state);
+};
diff --git a/services/backup/java/com/android/server/backup/UserBackupManagerService.java b/services/backup/java/com/android/server/backup/UserBackupManagerService.java
index 452adb2..2a6c90f 100644
--- a/services/backup/java/com/android/server/backup/UserBackupManagerService.java
+++ b/services/backup/java/com/android/server/backup/UserBackupManagerService.java
@@ -290,8 +290,8 @@
     // Bookkeeping of in-flight operations. The operation token is the index of the entry in the
     // pending operations list.
     public static final int OP_PENDING = 0;
-    private static final int OP_ACKNOWLEDGED = 1;
-    private static final int OP_TIMEOUT = -1;
+    public static final int OP_ACKNOWLEDGED = 1;
+    public static final int OP_TIMEOUT = -1;
 
     // Waiting for backup agent to respond during backup operation.
     public static final int OP_TYPE_BACKUP_WAIT = 0;
diff --git a/services/backup/java/com/android/server/backup/internal/LifecycleOperationStorage.java b/services/backup/java/com/android/server/backup/internal/LifecycleOperationStorage.java
new file mode 100644
index 0000000..b58c973
--- /dev/null
+++ b/services/backup/java/com/android/server/backup/internal/LifecycleOperationStorage.java
@@ -0,0 +1,337 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.backup.internal;
+
+import static com.android.server.backup.BackupManagerService.DEBUG;
+import static com.android.server.backup.BackupManagerService.MORE_DEBUG;
+
+import android.annotation.UserIdInt;
+import android.util.Slog;
+import android.util.SparseArray;
+
+import com.android.internal.annotations.GuardedBy;
+import com.android.server.backup.BackupRestoreTask;
+import com.android.server.backup.OperationStorage;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.IntConsumer;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+/**
+ * LifecycleOperationStorage is responsible for maintaining a set of currently
+ * active operations.  Each operation has a type and state, and a callback that
+ * can receive events upon operation completion or cancellation.  It may also
+ * be associated with one or more package names.
+ *
+ * An operation wraps a {@link BackupRestoreTask} within it.
+ * It's the responsibility of this task to remove the operation from this array.
+ *
+ * If type of operation is {@code OP_TYPE_WAIT}, it is waiting for an ACK or
+ * timeout.
+ *
+ * A BackupRestore task gets notified of AVK/timeout for the operation via
+ * {@link BackupRestoreTask#handleCancel()},
+ * {@link BackupRestoreTask#operationComplete()} and {@code notifyAll} called
+ * on the {@code mCurrentOpLock}.
+ *
+ * {@link LifecycleOperationStorage#waitUntilOperationComplete(int)} is used in
+ * various places to 'wait' for notifyAll and detect change of pending state of
+ * an operation. So typically, an operation will be removed from this array by:
+ * - {@link BackupRestoreTask#handleCancel()} and
+ * - {@link BackupRestoreTask#operationComplete()} OR
+ *   {@link BackupRestoreTask#waitUntilOperationComplete()}.
+ * Do not remove at both these places because {@code waitUntilOperationComplete}
+ * relies on the operation being present to determine its completion status.
+ *
+ * If type of operation is {@code OP_BACKUP}, it is a task running backups. It
+ * provides a handle to cancel backup tasks.
+ */
+@ThreadSafe
+public class LifecycleOperationStorage implements OperationStorage {
+    private static final String TAG = "LifecycleOperationStorage";
+
+    private final int mUserId;
+
+    private final Object mOperationsLock = new Object();
+
+    // Bookkeeping of in-flight operations. The operation token is the index of
+    // the entry in the pending operations list.
+    @GuardedBy("mOperationsLock")
+    private final SparseArray<Operation> mOperations = new SparseArray<>();
+
+    // Association from package name to one or more operations relating to that
+    // package.
+    @GuardedBy("mOperationsLock")
+    private final Map<String, Set<Integer>> mOpTokensByPackage = new HashMap<>();
+
+    public LifecycleOperationStorage(@UserIdInt int userId) {
+        this.mUserId = userId;
+    }
+
+    /** See {@link OperationStorage#registerOperation()} */
+    @Override
+    public void registerOperation(int token, @OpState int initialState,
+            BackupRestoreTask task, @OpType int type) {
+        registerOperationForPackages(token, initialState, ImmutableSet.of(), task, type);
+    }
+
+    /** See {@link OperationStorage#registerOperationForPackages()} */
+    @Override
+    public void registerOperationForPackages(int token, @OpState int initialState,
+            Set<String> packageNames, BackupRestoreTask task, @OpType int type) {
+        synchronized (mOperationsLock) {
+            mOperations.put(token, new Operation(initialState, task, type));
+            for (String packageName : packageNames) {
+                Set<Integer> tokens = mOpTokensByPackage.get(packageName);
+                if (tokens == null) {
+                    tokens = new HashSet<Integer>();
+                }
+                tokens.add(token);
+                mOpTokensByPackage.put(packageName, tokens);
+            }
+        }
+    }
+
+    /** See {@link OperationStorage#removeOperation()} */
+    @Override
+    public void removeOperation(int token) {
+        synchronized (mOperationsLock) {
+            mOperations.remove(token);
+            ImmutableSet<String> packagesWithTokens =
+                    ImmutableSet.copyOf(mOpTokensByPackage.keySet());
+            for (String packageName : packagesWithTokens) {
+                Set<Integer> tokens = mOpTokensByPackage.get(packageName);
+                if (tokens == null) {
+                    continue;
+                }
+                tokens.remove(token);
+                mOpTokensByPackage.put(packageName, tokens);
+            }
+        }
+    }
+
+    /** See {@link OperationStorage#operationTokensForPackage()} */
+    @Override
+    public Set<Integer> operationTokensForPackage(String packageName) {
+        synchronized (mOperationsLock) {
+            Set<Integer> tokens = mOpTokensByPackage.get(packageName);
+            if (tokens == null) {
+                return ImmutableSet.of();
+            }
+            return ImmutableSet.copyOf(tokens);
+        }
+    }
+
+    /** See {@link OperationStorage#operationTokensForOpType()} */
+    @Override
+    public Set<Integer> operationTokensForOpType(@OpType int type) {
+        ImmutableSet.Builder<Integer> tokens = ImmutableSet.builder();
+        synchronized (mOperationsLock) {
+            for (int i = 0; i < mOperations.size(); i++) {
+                final Operation op = mOperations.valueAt(i);
+                final int token = mOperations.keyAt(i);
+                if (op.type == type) {
+                    tokens.add(token);
+                }
+            }
+            return tokens.build();
+        }
+    }
+
+    /** See {@link OperationStorage#operationTokensForOpState()} */
+    @Override
+    public Set<Integer> operationTokensForOpState(@OpState int state) {
+        ImmutableSet.Builder<Integer> tokens = ImmutableSet.builder();
+        synchronized (mOperationsLock) {
+            for (int i = 0; i < mOperations.size(); i++) {
+                final Operation op = mOperations.valueAt(i);
+                final int token = mOperations.keyAt(i);
+                if (op.state == state) {
+                    tokens.add(token);
+                }
+            }
+            return tokens.build();
+        }
+    }
+
+    /**
+     * A blocking function that blocks the caller until the operation identified
+     * by {@code token} is complete - either via a message from the backup,
+     * agent or through cancellation.
+     *
+     * @param token the operation token specified when registering the operation
+     * @param callback a lambda which is invoked once only when the operation
+     *                 completes - ie. if this method is called twice for the
+     *                 same token, the lambda is not invoked the second time.
+     * @return true if the operation was ACKed prior to or during this call.
+     */
+    public boolean waitUntilOperationComplete(int token, IntConsumer callback) {
+        if (MORE_DEBUG) {
+            Slog.i(TAG, "[UserID:" + mUserId + "] Blocking until operation complete for "
+                    + Integer.toHexString(token));
+        }
+        @OpState int finalState = OpState.PENDING;
+        Operation op = null;
+        synchronized (mOperationsLock) {
+            while (true) {
+                op = mOperations.get(token);
+                if (op == null) {
+                    // mysterious disappearance: treat as success with no callback
+                    break;
+                } else {
+                    if (op.state == OpState.PENDING) {
+                        try {
+                            mOperationsLock.wait();
+                        } catch (InterruptedException e) {
+                            Slog.w(TAG, "Waiting on mOperationsLock: ", e);
+                        }
+                        // When the wait is notified we loop around and recheck the current state
+                    } else {
+                        if (MORE_DEBUG) {
+                            Slog.d(TAG, "[UserID:" + mUserId
+                                    + "] Unblocked waiting for operation token="
+                                    + Integer.toHexString(token));
+                        }
+                        // No longer pending; we're done
+                        finalState = op.state;
+                        break;
+                    }
+                }
+            }
+        }
+
+        removeOperation(token);
+        if (op != null) {
+            callback.accept(op.type);
+        }
+        if (MORE_DEBUG) {
+            Slog.v(TAG, "[UserID:" + mUserId + "] operation " + Integer.toHexString(token)
+                    + " complete: finalState=" + finalState);
+        }
+        return finalState == OpState.ACKNOWLEDGED;
+    }
+
+    /**
+     * Signals that an ongoing operation is complete: after a currently-active
+     * backup agent has notified us that it has completed the outstanding
+     * asynchronous backup/restore operation identified by the supplied
+     * {@code} token.
+     *
+     * @param token the operation token specified when registering the operation
+     * @param result a result code or error code for the completed operation
+     * @param callback a lambda that is invoked if the completion moves the
+     *                 operation from PENDING to ACKNOWLEDGED state.
+     */
+    public void onOperationComplete(int token, long result, Consumer<BackupRestoreTask> callback) {
+        if (MORE_DEBUG) {
+            Slog.v(TAG, "[UserID:" + mUserId + "] onOperationComplete: "
+                    + Integer.toHexString(token) + " result=" + result);
+        }
+        Operation op = null;
+        synchronized (mOperationsLock) {
+            op = mOperations.get(token);
+            if (op != null) {
+                if (op.state == OpState.TIMEOUT) {
+                    // The operation already timed out, and this is a late response.  Tidy up
+                    // and ignore it; we've already dealt with the timeout.
+                    op = null;
+                    mOperations.remove(token);
+                } else if (op.state == OpState.ACKNOWLEDGED) {
+                    if (DEBUG) {
+                        Slog.w(TAG, "[UserID:" + mUserId + "] Received duplicate ack for token="
+                                + Integer.toHexString(token));
+                    }
+                    op = null;
+                    mOperations.remove(token);
+                } else if (op.state == OpState.PENDING) {
+                    // Can't delete op from mOperations. waitUntilOperationComplete can be
+                    // called after we we receive this call.
+                    op.state = OpState.ACKNOWLEDGED;
+                }
+            }
+            mOperationsLock.notifyAll();
+        }
+
+        // Invoke the operation's completion callback, if there is one.
+        if (op != null && op.callback != null) {
+            callback.accept(op.callback);
+        }
+    }
+
+    /**
+     * Cancel the operation associated with {@code token}.  Cancellation may be
+     * propagated to the operation's callback (a {@link BackupRestoreTask}) if
+     * the operation has one, and the cancellation is due to the operation
+     * timing out.
+     *
+     * @param token the operation token specified when registering the operation
+     * @param cancelAll this is passed on when propagating the cancellation
+     * @param operationTimedOutCallback a lambda that is invoked with the
+     *                                  operation type where the operation is
+     *                                  cancelled due to timeout, allowing the
+     *                                  caller to do type-specific clean-ups.
+     */
+    public void cancelOperation(
+            int token, boolean cancelAll, IntConsumer operationTimedOutCallback) {
+        // Notify any synchronous waiters
+        Operation op = null;
+        synchronized (mOperationsLock) {
+            op = mOperations.get(token);
+            if (MORE_DEBUG) {
+                if (op == null) {
+                    Slog.w(TAG, "[UserID:" + mUserId + "] Cancel of token "
+                            + Integer.toHexString(token) + " but no op found");
+                }
+            }
+            int state = (op != null) ? op.state : OpState.TIMEOUT;
+            if (state == OpState.ACKNOWLEDGED) {
+                // The operation finished cleanly, so we have nothing more to do.
+                if (DEBUG) {
+                    Slog.w(TAG, "[UserID:" + mUserId + "] Operation already got an ack."
+                            + "Should have been removed from mCurrentOperations.");
+                }
+                op = null;
+                mOperations.delete(token);
+            } else if (state == OpState.PENDING) {
+                if (DEBUG) {
+                    Slog.v(TAG, "[UserID:" + mUserId + "] Cancel: token="
+                            + Integer.toHexString(token));
+                }
+                op.state = OpState.TIMEOUT;
+                // Can't delete op from mOperations here. waitUntilOperationComplete may be
+                // called after we receive cancel here. We need this op's state there.
+                operationTimedOutCallback.accept(op.type);
+            }
+            mOperationsLock.notifyAll();
+        }
+
+        // If there's a TimeoutHandler for this event, call it
+        if (op != null && op.callback != null) {
+            if (MORE_DEBUG) {
+                Slog.v(TAG, "[UserID:" + mUserId + "   Invoking cancel on " + op.callback);
+            }
+            op.callback.handleCancel(cancelAll);
+        }
+    }
+};
diff --git a/services/tests/servicestests/src/com/android/server/backup/internal/LifecycleOperationStorageTest.java b/services/tests/servicestests/src/com/android/server/backup/internal/LifecycleOperationStorageTest.java
new file mode 100644
index 0000000..c079c2d
--- /dev/null
+++ b/services/tests/servicestests/src/com/android/server/backup/internal/LifecycleOperationStorageTest.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2021 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.android.server.backup.internal;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import static org.mockito.Mockito.verify;
+
+import android.platform.test.annotations.Presubmit;
+
+import androidx.test.runner.AndroidJUnit4;
+
+import com.android.server.backup.BackupRestoreTask;
+import com.android.server.backup.OperationStorage.OpState;
+import com.android.server.backup.OperationStorage.OpType;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Set;
+
+@Presubmit
+@RunWith(AndroidJUnit4.class)
+public class LifecycleOperationStorageTest {
+    private static final int USER_ID = 0;
+    private static final int TOKEN_1 = 1;
+    private static final int TOKEN_2 = 2;
+    private static final long RESULT = 123L;
+
+    private static final String PKG_FOO = "com.android.foo";
+    private static final String PKG_BAR = "com.android.bar";
+    private static final String PKG_BAZ = "com.android.baz";
+    private static final ImmutableSet<String> MULTIPLE_PKG    = ImmutableSet.of(PKG_FOO);
+    private static final ImmutableSet<String> MULTIPLE_PKGS_1 = ImmutableSet.of(PKG_FOO, PKG_BAR);
+    private static final ImmutableSet<String> MULTIPLE_PKGS_2 = ImmutableSet.of(PKG_BAR, PKG_BAZ);
+
+    @Mock private BackupRestoreTask mCallback;
+    private LifecycleOperationStorage mOpStorage;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(/* testClass */ this);
+        mOpStorage = new LifecycleOperationStorage(USER_ID);
+    }
+
+    @After
+    public void tearDown() {}
+
+    @Test
+    public void testRegisterOperation_singleOperation() throws Exception {
+        mOpStorage.registerOperation(TOKEN_1, OpState.PENDING, mCallback, OpType.BACKUP_WAIT);
+
+        Set<Integer> tokens = mOpStorage.operationTokensForOpType(OpType.BACKUP_WAIT);
+
+        assertThat(tokens).isEqualTo(only(TOKEN_1));
+    }
+
+    @Test
+    public void testRegisterOperation_multipleOperations() throws Exception {
+        mOpStorage.registerOperation(TOKEN_1, OpState.PENDING, mCallback, OpType.BACKUP_WAIT);
+        mOpStorage.registerOperation(TOKEN_2, OpState.ACKNOWLEDGED, mCallback, OpType.BACKUP_WAIT);
+
+        Set<Integer> typeWaitTokens = mOpStorage.operationTokensForOpType(OpType.BACKUP_WAIT);
+        Set<Integer> statePendingTokens = mOpStorage.operationTokensForOpState(OpState.PENDING);
+        Set<Integer> stateAcknowledgedTokens =
+                mOpStorage.operationTokensForOpState(OpState.ACKNOWLEDGED);
+
+        assertThat(typeWaitTokens).isEqualTo(ImmutableSet.of(TOKEN_1, TOKEN_2));
+        assertThat(statePendingTokens).isEqualTo(only(TOKEN_1));
+        assertThat(stateAcknowledgedTokens).isEqualTo(only(TOKEN_2));
+    }
+
+    @Test
+    public void testRegisterOperationForPackages_singlePackage() throws Exception {
+        mOpStorage.registerOperationForPackages(TOKEN_1, OpState.PENDING,
+                MULTIPLE_PKG, mCallback, OpType.BACKUP_WAIT);
+
+        Set<Integer> tokens = mOpStorage.operationTokensForPackage(PKG_FOO);
+
+        assertThat(tokens).isEqualTo(only(TOKEN_1));
+    }
+
+    @Test
+    public void testRegisterOperationForPackages_multiplePackage() throws Exception {
+        mOpStorage.registerOperationForPackages(TOKEN_1, OpState.PENDING,
+                MULTIPLE_PKGS_1, mCallback, OpType.BACKUP);
+        mOpStorage.registerOperationForPackages(TOKEN_2, OpState.PENDING,
+                MULTIPLE_PKGS_2, mCallback, OpType.BACKUP);
+
+        Set<Integer> tokensFoo = mOpStorage.operationTokensForPackage(PKG_FOO);
+        Set<Integer> tokensBar = mOpStorage.operationTokensForPackage(PKG_BAR);
+        Set<Integer> tokensBaz = mOpStorage.operationTokensForPackage(PKG_BAZ);
+
+        assertThat(tokensFoo).isEqualTo(only(TOKEN_1));
+        assertThat(tokensBar).isEqualTo(ImmutableSet.of(TOKEN_1, TOKEN_2));
+        assertThat(tokensBaz).isEqualTo(only(TOKEN_2));
+    }
+
+    @Test
+    public void testRemoveOperation() throws Exception {
+        mOpStorage.registerOperation(TOKEN_2, OpState.PENDING, mCallback, OpType.BACKUP_WAIT);
+
+        Set<Integer> typeWaitTokens = mOpStorage.operationTokensForOpType(OpType.BACKUP_WAIT);
+        Set<Integer> statePendingTokens = mOpStorage.operationTokensForOpState(OpState.PENDING);
+
+        assertThat(typeWaitTokens).isEqualTo(only(TOKEN_2));
+        assertThat(statePendingTokens).isEqualTo(only(TOKEN_2));
+
+        mOpStorage.removeOperation(TOKEN_2);
+
+        typeWaitTokens = mOpStorage.operationTokensForOpType(OpType.BACKUP_WAIT);
+        statePendingTokens = mOpStorage.operationTokensForOpState(OpState.PENDING);
+
+        assertThat(typeWaitTokens).isEmpty();
+        assertThat(statePendingTokens).isEmpty();
+    }
+
+    @Test
+    public void testRemoveOperation_removesPackageMappings() throws Exception {
+        mOpStorage.registerOperationForPackages(TOKEN_1, OpState.PENDING, MULTIPLE_PKGS_1,
+                mCallback, OpType.BACKUP);
+        mOpStorage.registerOperationForPackages(TOKEN_2, OpState.PENDING, MULTIPLE_PKGS_2,
+                mCallback, OpType.BACKUP);
+
+        mOpStorage.removeOperation(TOKEN_2);
+
+        Set<Integer> tokensFoo = mOpStorage.operationTokensForPackage(PKG_FOO);
+        Set<Integer> tokensBar = mOpStorage.operationTokensForPackage(PKG_BAR);
+        Set<Integer> tokensBaz = mOpStorage.operationTokensForPackage(PKG_BAZ);
+
+        assertThat(tokensFoo).isEqualTo(only(TOKEN_1));
+        assertThat(tokensBar).isEqualTo(only(TOKEN_1));
+        assertThat(tokensBaz).isEmpty();
+    }
+
+    @Test
+    public void testOnOperationComplete_pendingAdvancesState_invokesCallback() throws Exception {
+        mOpStorage.registerOperation(TOKEN_1, OpState.PENDING, mCallback, OpType.BACKUP_WAIT);
+
+        mOpStorage.onOperationComplete(TOKEN_1, RESULT, callback -> {
+            mCallback.operationComplete(RESULT);
+        });
+
+        assertThat(mOpStorage.operationTokensForOpType(OpType.BACKUP_WAIT))
+                .isEqualTo(only(TOKEN_1));
+        assertThat(mOpStorage.operationTokensForOpState(OpState.PENDING)).isEmpty();
+        assertThat(mOpStorage.operationTokensForOpState(OpState.ACKNOWLEDGED)).isNotEmpty();
+        verify(mCallback).operationComplete(RESULT);
+    }
+
+    private Set<Integer> only(Integer val) {
+        return ImmutableSet.of(val);
+    }
+}