Avoid linear scans of database tables

Some places in Keystore perform linear scans of all rows in a database
table.  This can cause slowdowns and ANRs if the device has an excessive
number of keys.

Problem areas:
- The GC process (in `handle_next_superseded_blobs`) does a linear scan
  of the `blobentry` table for rows whose corresponding key either has a
  newer `blobentry` (of the same type) or has been deleted (the shape of
  this query is sketched below the list).
- The GC process (in `cleanup_unreferenced`) does a linear scan of
  the `keyentry` table to find entries in the `Unreferenced` state,
  and repeats this for several dependent tables.
- Listing an app's keys allows for batching, but retrieving each batch
  queries the database for all of the app's keys (even though only a
  subset will be returned).
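
For context, the existing (v1) purge query has roughly the following
shape (it also appears as removed lines in the diff below); the grouped
MAX(id) subquery and the NOT IN check against `keyentry` are what force
a scan across the whole `blobentry` table:

  -- v1 GC: delete non-keyblob rows that are superseded or whose key
  -- has been deleted.  The subqueries touch every row of blobentry.
  DELETE FROM persistent.blobentry
   WHERE NOT subcomponent_type = ?      -- bound to KEY_BLOB
     AND (
       id NOT IN (
           SELECT MAX(id) FROM persistent.blobentry
            WHERE NOT subcomponent_type = ?
            GROUP BY keyentryid, subcomponent_type
       )
       OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
     );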

Update the database schema to avoid the need for these linear scans:
- Add a `state INTEGER` column to the `blobentry` table, used to
  mark whether a row is current, superseded, or orphaned.
- Add an index on the new `state` column in the `blobentry` table.
- Add an index on the (existing) `state` column in the `keyentry` table.
- Add a 1->2 upgrade step that adds the column and the indices, and
  sets the `state` value according to whether the blobentry
  is the most recent of its type.  This is essentially the same
  logic/query as was previously in `handle_next_superseded_blobs`,
  only happening once at upgrade time (see the SQL sketch after this
  list).
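
Concretely, the v2 schema changes and upgrade step amount to roughly
the following SQL (mirroring the statements added in the diff below;
0/1/2 are the integer encodings of Current/Superseded/Orphaned):

  -- New column plus indices (v2 schema).
  ALTER TABLE persistent.blobentry ADD COLUMN state INTEGER DEFAULT 0;
  CREATE INDEX IF NOT EXISTS persistent.blobentry_state_index
      ON blobentry(subcomponent_type, state);
  CREATE INDEX IF NOT EXISTS persistent.keyentry_state_index
      ON keyentry(state);

  -- One-off population at upgrade time: keyblobs that are not the most
  -- recent for their key become superseded...
  UPDATE persistent.blobentry SET state = 1       -- Superseded
   WHERE subcomponent_type = ?                    -- bound to KEY_BLOB
     AND id NOT IN (
         SELECT MAX(id) FROM persistent.blobentry
          WHERE subcomponent_type = ?
          GROUP BY keyentryid, subcomponent_type);

  -- ...and keyblobs whose keyentry no longer exists become orphaned.
  UPDATE persistent.blobentry SET state = 2       -- Orphaned
   WHERE subcomponent_type = ?
     AND NOT EXISTS (SELECT id FROM persistent.keyentry
                     WHERE id = keyentryid);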

Note that the schema changes are *not* flag-controlled, because that
would require additional code to downgrade the schema if the flag were
to be flipped back.

Update behaviour as follows:
- Add a limit of 10,000 rows when retrieving a batch of keys/aliases for
  an app.
- On deleting a `keyentry`, mark any corresponding `blobentry`
  rows as orphaned.
- On adding a new `blobentry` for a key, mark any already-existing
  `blobentry` rows for the same (key, subcomponent_type) as superseded.
- To perform garbage collection in `handle_next_superseded_blobs`
  (see the sketch following this list):
  - query for up to N keyblob rows that are not current, so that they
    can be deleted via KeyMint.
  - query for all non-keyblob rows that are not current, and delete them
    immediately.
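
With the flag enabled, the GC queries in `handle_next_superseded_blobs`
reduce to index-backed lookups of roughly this shape (matching the v2
paths added in the diff below):

  -- Step 1: pick up to max_blobs keyblobs that are no longer current;
  -- these are returned so the caller can delete them via KeyMint first.
  SELECT id, blob FROM persistent.blobentry
   WHERE subcomponent_type = ? AND state != ?    -- KEY_BLOB, Current
   LIMIT ?;                                      -- max_blobs

  -- Step 2 (once no such keyblobs remain): drop all non-current rows
  -- of the other subcomponent types directly.
  DELETE FROM persistent.blobentry
   WHERE subcomponent_type != ? AND state != ?;  -- KEY_BLOB, Current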

Note that the flag only affects the last of these (the GC process).

Bug: 319563050
Test: keystore2_test
Test: keystore2_client_tests
Test: modified CtsVerifier to create and use lots of keys
Test: force SPL upgrade and check keys usable
Flag: android.security.keystore2.use_blob_state_column
Change-Id: I7019bb530cce195d4a7a290c8c43f078d8fe8a76
diff --git a/keystore2/aconfig/flags.aconfig b/keystore2/aconfig/flags.aconfig
index 05dae46..bf4ed51 100644
--- a/keystore2/aconfig/flags.aconfig
+++ b/keystore2/aconfig/flags.aconfig
@@ -32,3 +32,11 @@
   bug: "283077822"
   is_fixed_read_only: true
 }
+
+flag {
+  name: "use_blob_state_column"
+  namespace: "hardware_backed_security"
+  description: "Use state database column to track superseded blobentry rows"
+  bug: "319563050"
+  is_fixed_read_only: true
+}
diff --git a/keystore2/src/database.rs b/keystore2/src/database.rs
index 8165c54..369e808 100644
--- a/keystore2/src/database.rs
+++ b/keystore2/src/database.rs
@@ -500,6 +500,40 @@
     }
 }
 
+/// Current state of a `blobentry` row.
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone, Default)]
+enum BlobState {
+    #[default]
+    /// Current blobentry (of its `subcomponent_type`) for the associated key.
+    Current,
+    /// Blobentry that is no longer the current blob (of its `subcomponent_type`) for the associated
+    /// key.
+    Superseded,
+    /// Blobentry for a key that no longer exists.
+    Orphaned,
+}
+
+impl ToSql for BlobState {
+    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> {
+        match self {
+            Self::Current => Ok(ToSqlOutput::Owned(Value::Integer(0))),
+            Self::Superseded => Ok(ToSqlOutput::Owned(Value::Integer(1))),
+            Self::Orphaned => Ok(ToSqlOutput::Owned(Value::Integer(2))),
+        }
+    }
+}
+
+impl FromSql for BlobState {
+    fn column_result(value: ValueRef) -> FromSqlResult<Self> {
+        match i64::column_result(value)? {
+            0 => Ok(Self::Current),
+            1 => Ok(Self::Superseded),
+            2 => Ok(Self::Orphaned),
+            v => Err(FromSqlError::OutOfRange(v)),
+        }
+    }
+}
+
 /// Keys have a KeyMint blob component and optional public certificate and
 /// certificate chain components.
 /// KeyEntryLoadBits is a bitmap that indicates to `KeystoreDB::load_key_entry`
@@ -870,8 +904,9 @@
 
 impl KeystoreDB {
     const UNASSIGNED_KEY_ID: i64 = -1i64;
-    const CURRENT_DB_VERSION: u32 = 1;
-    const UPGRADERS: &'static [fn(&Transaction) -> Result<u32>] = &[Self::from_0_to_1];
+    const CURRENT_DB_VERSION: u32 = 2;
+    const UPGRADERS: &'static [fn(&Transaction) -> Result<u32>] =
+        &[Self::from_0_to_1, Self::from_1_to_2];
 
     /// Name of the file that holds the cross-boot persistent database.
     pub const PERSISTENT_DB_FILENAME: &'static str = "persistent.sqlite";
@@ -913,9 +948,77 @@
             params![KeyLifeCycle::Unreferenced, Tag::MAX_BOOT_LEVEL.0, BlobMetaData::MaxBootLevel],
         )
         .context(ks_err!("Failed to delete logical boot level keys."))?;
+
+        // DB version is now 1.
         Ok(1)
     }
 
+    // This upgrade function adds an additional `state INTEGER` column to the blobentry
+    // table, and populates it based on whether each blob is the most recent of its type for
+    // the corresponding key.
+    fn from_1_to_2(tx: &Transaction) -> Result<u32> {
+        tx.execute(
+            "ALTER TABLE persistent.blobentry ADD COLUMN state INTEGER DEFAULT 0;",
+            params![],
+        )
+        .context(ks_err!("Failed to add state column"))?;
+
+        // Mark keyblobs that are not the most recent for their corresponding key.
+        // This may take a while if there are excessive numbers of keys in the database.
+        let _wp = wd::watch("KeystoreDB::from_1_to_2 mark all non-current keyblobs");
+        let sc_key_blob = SubComponentType::KEY_BLOB;
+        let mut stmt = tx
+            .prepare(
+                "UPDATE persistent.blobentry SET state=?
+                     WHERE subcomponent_type = ?
+                     AND id NOT IN (
+                             SELECT MAX(id) FROM persistent.blobentry
+                             WHERE subcomponent_type = ?
+                             GROUP BY keyentryid, subcomponent_type
+                         );",
+            )
+            .context("Trying to prepare query to mark superseded keyblobs")?;
+        stmt.execute(params![BlobState::Superseded, sc_key_blob, sc_key_blob])
+            .context(ks_err!("Failed to set state=superseded state for keyblobs"))?;
+        log::info!("marked non-current blobentry rows for keyblobs as superseded");
+
+        // Mark keyblobs that don't have a corresponding key.
+        // This may take a while if there are excessive numbers of keys in the database.
+        let _wp = wd::watch("KeystoreDB::from_1_to_2 mark all orphaned keyblobs");
+        let mut stmt = tx
+            .prepare(
+                "UPDATE persistent.blobentry SET state=?
+                     WHERE subcomponent_type = ?
+                     AND NOT EXISTS (SELECT id FROM persistent.keyentry
+                                     WHERE id = keyentryid);",
+            )
+            .context("Trying to prepare query to mark orphaned keyblobs")?;
+        stmt.execute(params![BlobState::Orphaned, sc_key_blob])
+            .context(ks_err!("Failed to set state=orphaned for keyblobs"))?;
+        log::info!("marked orphaned blobentry rows for keyblobs");
+
+        // Add an index to make it fast to find out of date blobentry rows.
+        let _wp = wd::watch("KeystoreDB::from_1_to_2 add blobentry index");
+        tx.execute(
+            "CREATE INDEX IF NOT EXISTS persistent.blobentry_state_index
+            ON blobentry(subcomponent_type, state);",
+            [],
+        )
+        .context("Failed to create index blobentry_state_index.")?;
+
+        // Add an index to make it fast to find unreferenced keyentry rows.
+        let _wp = wd::watch("KeystoreDB::from_1_to_2 add keyentry state index");
+        tx.execute(
+            "CREATE INDEX IF NOT EXISTS persistent.keyentry_state_index
+            ON keyentry(state);",
+            [],
+        )
+        .context("Failed to create index keyentry_state_index.")?;
+
+        // DB version is now 2.
+        Ok(2)
+    }
+
     fn init_tables(tx: &Transaction) -> Result<()> {
         tx.execute(
             "CREATE TABLE IF NOT EXISTS persistent.keyentry (
@@ -944,12 +1047,21 @@
         )
         .context("Failed to create index keyentry_domain_namespace_index.")?;
 
+        // Index added in v2 of database schema.
+        tx.execute(
+            "CREATE INDEX IF NOT EXISTS persistent.keyentry_state_index
+            ON keyentry(state);",
+            [],
+        )
+        .context("Failed to create index keyentry_state_index.")?;
+
         tx.execute(
             "CREATE TABLE IF NOT EXISTS persistent.blobentry (
                     id INTEGER PRIMARY KEY,
                     subcomponent_type INTEGER,
                     keyentryid INTEGER,
-                    blob BLOB);",
+                    blob BLOB,
+                    state INTEGER DEFAULT 0);", // `state` added in v2 of schema
             [],
         )
         .context("Failed to initialize \"blobentry\" table.")?;
@@ -961,6 +1073,14 @@
         )
         .context("Failed to create index blobentry_keyentryid_index.")?;
 
+        // Index added in v2 of database schema.
+        tx.execute(
+            "CREATE INDEX IF NOT EXISTS persistent.blobentry_state_index
+            ON blobentry(subcomponent_type, state);",
+            [],
+        )
+        .context("Failed to create index blobentry_state_index.")?;
+
         tx.execute(
             "CREATE TABLE IF NOT EXISTS persistent.blobmetadata (
                      id INTEGER PRIMARY KEY,
@@ -1195,9 +1315,28 @@
 
             Self::cleanup_unreferenced(tx).context("Trying to cleanup unreferenced.")?;
 
-            // Find up to `max_blobs` more superseded key blobs, load their metadata and return it.
-            let result: Vec<(i64, Vec<u8>)> = {
-                let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob find_next");
+            // Find up to `max_blobs` more out-of-date key blobs, load their metadata and return it.
+            let result: Vec<(i64, Vec<u8>)> = if keystore2_flags::use_blob_state_column() {
+                let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob find_next v2");
+                let mut stmt = tx
+                    .prepare(
+                        "SELECT id, blob FROM persistent.blobentry
+                        WHERE subcomponent_type = ? AND state != ?
+                        LIMIT ?;",
+                    )
+                    .context("Trying to prepare query for superseded blobs.")?;
+
+                let rows = stmt
+                    .query_map(
+                        params![SubComponentType::KEY_BLOB, BlobState::Current, max_blobs as i64],
+                        |row| Ok((row.get(0)?, row.get(1)?)),
+                    )
+                    .context("Trying to query superseded blob.")?;
+
+                rows.collect::<Result<Vec<(i64, Vec<u8>)>, rusqlite::Error>>()
+                    .context("Trying to extract superseded blobs.")?
+            } else {
+                let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob find_next v1");
                 let mut stmt = tx
                     .prepare(
                         "SELECT id, blob FROM persistent.blobentry
@@ -1244,22 +1383,32 @@
                 return Ok(result).no_gc();
             }
 
-            // We did not find any superseded key blob, so let's remove other superseded blob in
-            // one transaction.
-            let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob delete");
-            tx.execute(
-                "DELETE FROM persistent.blobentry
-                 WHERE NOT subcomponent_type = ?
-                 AND (
-                     id NOT IN (
-                        SELECT MAX(id) FROM persistent.blobentry
-                        WHERE NOT subcomponent_type = ?
-                        GROUP BY keyentryid, subcomponent_type
-                     ) OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
-                 );",
-                params![SubComponentType::KEY_BLOB, SubComponentType::KEY_BLOB],
-            )
-            .context("Trying to purge superseded blobs.")?;
+            // We did not find any out-of-date key blobs, so let's remove other types of superseded
+            // blob in one transaction.
+            if keystore2_flags::use_blob_state_column() {
+                let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob delete v2");
+                tx.execute(
+                    "DELETE FROM persistent.blobentry
+                    WHERE subcomponent_type != ? AND state != ?;",
+                    params![SubComponentType::KEY_BLOB, BlobState::Current],
+                )
+                .context("Trying to purge out-of-date blobs (other than keyblobs)")?;
+            } else {
+                let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob delete v1");
+                tx.execute(
+                    "DELETE FROM persistent.blobentry
+                    WHERE NOT subcomponent_type = ?
+                    AND (
+                        id NOT IN (
+                           SELECT MAX(id) FROM persistent.blobentry
+                           WHERE NOT subcomponent_type = ?
+                           GROUP BY keyentryid, subcomponent_type
+                        ) OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
+                    );",
+                    params![SubComponentType::KEY_BLOB, SubComponentType::KEY_BLOB],
+                )
+                .context("Trying to purge superseded blobs.")?;
+            }
 
             Ok(vec![]).no_gc()
         })
@@ -1530,18 +1679,33 @@
     ) -> Result<()> {
         match (blob, sc_type) {
             (Some(blob), _) => {
+                // Mark any previous blobentry(s) of the same type for the same key as superseded.
+                tx.execute(
+                    "UPDATE persistent.blobentry SET state = ?
+                    WHERE keyentryid = ? AND subcomponent_type = ?",
+                    params![BlobState::Superseded, key_id, sc_type],
+                )
+                .context(ks_err!(
+                    "Failed to mark prior {sc_type:?} blobentrys for {key_id} as superseded"
+                ))?;
+
+                // Now insert the new, un-superseded, blob.  (If this fails, the marking of
+                // old blobs as superseded will be rolled back, because we're inside a
+                // transaction.)
                 tx.execute(
                     "INSERT INTO persistent.blobentry
                      (subcomponent_type, keyentryid, blob) VALUES (?, ?, ?);",
                     params![sc_type, key_id, blob],
                 )
                 .context(ks_err!("Failed to insert blob."))?;
+
                 if let Some(blob_metadata) = blob_metadata {
                     let blob_id = tx
                         .query_row("SELECT MAX(id) FROM persistent.blobentry;", [], |row| {
                             row.get(0)
                         })
                         .context(ks_err!("Failed to get new blob id."))?;
+
                     blob_metadata
                         .store_in_db(blob_id, tx)
                         .context(ks_err!("Trying to store blob metadata."))?;
@@ -2263,6 +2427,15 @@
             .context("Trying to delete keyparameters.")?;
         tx.execute("DELETE FROM persistent.grant WHERE keyentryid = ?;", params![key_id])
             .context("Trying to delete grants.")?;
+        // The associated blobentry rows are not immediately deleted when the owning keyentry is
+        // removed, because a KeyMint `deleteKey()` invocation is needed (specifically for the
+        // `KEY_BLOB`).  Mark the affected rows with `state=Orphaned` so a subsequent garbage
+        // collection can do this.
+        tx.execute(
+            "UPDATE persistent.blobentry SET state = ? WHERE keyentryid = ?",
+            params![BlobState::Orphaned, key_id],
+        )
+        .context("Trying to mark blobentrys as superseded")?;
         Ok(updated != 0)
     }
 
@@ -2547,6 +2720,8 @@
     /// The key descriptors will have the domain, nspace, and alias field set.
     /// The returned list will be sorted by alias.
     /// Domain must be APP or SELINUX, the caller must make sure of that.
+    /// Number of returned values is limited to 10,000 (which is empirically roughly
+    /// what will fit in a Binder message).
     pub fn list_past_alias(
         &mut self,
         domain: Domain,
@@ -2564,7 +2739,8 @@
                      AND state = ?
                      AND key_type = ?
                      {}
-                     ORDER BY alias ASC;",
+                     ORDER BY alias ASC
+                     LIMIT 10000;",
             if start_past_alias.is_some() { " AND alias > ?" } else { "" }
         );
 
diff --git a/keystore2/src/database/tests.rs b/keystore2/src/database/tests.rs
index 5f882cd..381a45a 100644
--- a/keystore2/src/database/tests.rs
+++ b/keystore2/src/database/tests.rs
@@ -2429,6 +2429,20 @@
     .unwrap()
 }
 
+fn blob_count_in_state(db: &mut KeystoreDB, sc_type: SubComponentType, state: BlobState) -> usize {
+    db.with_transaction(TransactionBehavior::Deferred, |tx| {
+        tx.query_row(
+            "SELECT COUNT(*) FROM persistent.blobentry
+                     WHERE subcomponent_type = ? AND state = ?;",
+            params![sc_type, state],
+            |row| row.get(0),
+        )
+        .context(ks_err!("Failed to count number of {sc_type:?} blobs"))
+        .no_gc()
+    })
+    .unwrap()
+}
+
 #[test]
 fn test_blobentry_gc() -> Result<()> {
     let mut db = new_test_db()?;
@@ -2439,6 +2453,9 @@
     let key_id5 = make_test_key_entry(&mut db, Domain::APP, 5, "key5", None)?.0;
 
     assert_eq!(5, blob_count(&mut db, SubComponentType::KEY_BLOB));
+    assert_eq!(5, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Current));
+    assert_eq!(0, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Superseded));
+    assert_eq!(0, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Orphaned));
     assert_eq!(5, blob_count(&mut db, SubComponentType::CERT));
     assert_eq!(5, blob_count(&mut db, SubComponentType::CERT_CHAIN));
 
@@ -2447,6 +2464,9 @@
     db.set_blob(&key_guard3, SubComponentType::KEY_BLOB, Some(&[1, 2, 3]), None)?;
 
     assert_eq!(7, blob_count(&mut db, SubComponentType::KEY_BLOB));
+    assert_eq!(5, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Current));
+    assert_eq!(2, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Superseded));
+    assert_eq!(0, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Orphaned));
     assert_eq!(5, blob_count(&mut db, SubComponentType::CERT));
     assert_eq!(5, blob_count(&mut db, SubComponentType::CERT_CHAIN));
 
@@ -2459,6 +2479,9 @@
     .unwrap();
 
     assert_eq!(7, blob_count(&mut db, SubComponentType::KEY_BLOB));
+    assert_eq!(3, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Current));
+    assert_eq!(2, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Superseded));
+    assert_eq!(2, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Orphaned));
     assert_eq!(5, blob_count(&mut db, SubComponentType::CERT));
     assert_eq!(5, blob_count(&mut db, SubComponentType::CERT_CHAIN));
 
@@ -2468,6 +2491,9 @@
     let superseded_ids: Vec<i64> = superseded.iter().map(|v| v.blob_id).collect();
     assert_eq!(4, superseded.len());
     assert_eq!(7, blob_count(&mut db, SubComponentType::KEY_BLOB));
+    assert_eq!(3, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Current));
+    assert_eq!(2, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Superseded));
+    assert_eq!(2, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Orphaned));
     assert_eq!(5, blob_count(&mut db, SubComponentType::CERT));
     assert_eq!(5, blob_count(&mut db, SubComponentType::CERT_CHAIN));
 
@@ -2477,6 +2503,9 @@
     let superseded_ids: Vec<i64> = superseded.iter().map(|v| v.blob_id).collect();
     assert_eq!(0, superseded.len());
     assert_eq!(3, blob_count(&mut db, SubComponentType::KEY_BLOB));
+    assert_eq!(3, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Current));
+    assert_eq!(0, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Superseded));
+    assert_eq!(0, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Orphaned));
     assert_eq!(3, blob_count(&mut db, SubComponentType::CERT));
     assert_eq!(3, blob_count(&mut db, SubComponentType::CERT_CHAIN));
 
@@ -2484,6 +2513,9 @@
     let superseded = db.handle_next_superseded_blobs(&superseded_ids, 20).unwrap();
     assert_eq!(0, superseded.len());
     assert_eq!(3, blob_count(&mut db, SubComponentType::KEY_BLOB));
+    assert_eq!(3, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Current));
+    assert_eq!(0, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Superseded));
+    assert_eq!(0, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Orphaned));
     assert_eq!(3, blob_count(&mut db, SubComponentType::CERT));
     assert_eq!(3, blob_count(&mut db, SubComponentType::CERT_CHAIN));
 
@@ -2491,6 +2523,55 @@
 }
 
 #[test]
+fn test_upgrade_1_to_2() -> Result<()> {
+    let mut db = new_test_db()?;
+    let _key_id1 = make_test_key_entry(&mut db, Domain::APP, 1, "key1", None)?.0;
+    let key_guard2 = make_test_key_entry(&mut db, Domain::APP, 2, "key2", None)?;
+    let key_guard3 = make_test_key_entry(&mut db, Domain::APP, 3, "key3", None)?;
+    let key_id4 = make_test_key_entry(&mut db, Domain::APP, 4, "key4", None)?.0;
+    let key_id5 = make_test_key_entry(&mut db, Domain::APP, 5, "key5", None)?.0;
+
+    // Replace the keyblobs for keys 2 and 3.  The previous blobs will still exist.
+    db.set_blob(&key_guard2, SubComponentType::KEY_BLOB, Some(&[1, 2, 3]), None)?;
+    db.set_blob(&key_guard3, SubComponentType::KEY_BLOB, Some(&[1, 2, 3]), None)?;
+
+    // Delete keys 4 and 5.  The keyblobs aren't removed yet.
+    db.with_transaction(Immediate("TX_delete_test_keys"), |tx| {
+        KeystoreDB::mark_unreferenced(tx, key_id4)?;
+        KeystoreDB::mark_unreferenced(tx, key_id5)?;
+        Ok(()).no_gc()
+    })
+    .unwrap();
+    assert_eq!(7, blob_count(&mut db, SubComponentType::KEY_BLOB));
+    assert_eq!(5, blob_count(&mut db, SubComponentType::CERT));
+    assert_eq!(5, blob_count(&mut db, SubComponentType::CERT_CHAIN));
+
+    // Manually downgrade the database to the v1 schema.
+    db.with_transaction(Immediate("TX_downgrade_2_to_1"), |tx| {
+        tx.execute("DROP INDEX persistent.keyentry_state_index;", params!())?;
+        tx.execute("DROP INDEX persistent.blobentry_state_index;", params!())?;
+        tx.execute("ALTER TABLE persistent.blobentry DROP COLUMN state;", params!())?;
+        Ok(()).no_gc()
+    })?;
+
+    // Run the upgrade process.
+    let version = db.with_transaction(Immediate("TX_upgrade_1_to_2"), |tx| {
+        KeystoreDB::from_1_to_2(tx).no_gc()
+    })?;
+    assert_eq!(version, 2);
+
+    // Check blobs have acquired the right `state` values.
+    assert_eq!(7, blob_count(&mut db, SubComponentType::KEY_BLOB));
+    assert_eq!(3, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Current));
+    assert_eq!(2, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Superseded));
+    assert_eq!(2, blob_count_in_state(&mut db, SubComponentType::KEY_BLOB, BlobState::Orphaned));
+    assert_eq!(5, blob_count(&mut db, SubComponentType::CERT));
+    assert_eq!(5, blob_count(&mut db, SubComponentType::CERT_CHAIN));
+
+    Ok(())
+}
+
+#[test]
 fn test_load_key_descriptor() -> Result<()> {
     let mut db = new_test_db()?;
     let key_id = make_test_key_entry(&mut db, Domain::APP, 1, TEST_ALIAS, None)?.0;
@@ -2652,6 +2733,16 @@
 where
     F: Fn(&mut KeystoreDB) -> T,
 {
+    prep_and_run_with_many_keys(max_count, |_db| (), test_fn)
+}
+
+/// Run the provided `test_fn` against the database at various increasing stages of
+/// database population.
+fn prep_and_run_with_many_keys<F, T, P>(max_count: usize, prep_fn: P, test_fn: F) -> Result<()>
+where
+    F: Fn(&mut KeystoreDB) -> T,
+    P: Fn(&mut KeystoreDB),
+{
     android_logger::init_once(
         android_logger::Config::default()
             .with_tag("keystore2_test")
@@ -2670,6 +2761,10 @@
         db_populate_keys(&mut db, next_keyid, key_count);
         assert_eq!(db_key_count(&mut db), key_count);
 
+        // Perform any test-specific preparation
+        prep_fn(&mut db);
+
+        // Time execution of the test function.
         let start = std::time::Instant::now();
         let _result = test_fn(&mut db);
         println!("{key_count}, {}", start.elapsed().as_secs_f64());
@@ -2753,3 +2848,27 @@
         }
     })
 }
+
+#[test]
+fn test_upgrade_1_to_2_with_many_keys() -> Result<()> {
+    prep_and_run_with_many_keys(
+        1_000_000,
+        |db: &mut KeystoreDB| {
+            // Manually downgrade the database to the v1 schema.
+            db.with_transaction(Immediate("TX_downgrade_2_to_1"), |tx| {
+                tx.execute("DROP INDEX persistent.keyentry_state_index;", params!())?;
+                tx.execute("DROP INDEX persistent.blobentry_state_index;", params!())?;
+                tx.execute("ALTER TABLE persistent.blobentry DROP COLUMN state;", params!())?;
+                Ok(()).no_gc()
+            })
+            .unwrap();
+        },
+        |db: &mut KeystoreDB| -> Result<()> {
+            // Run the upgrade process.
+            db.with_transaction(Immediate("TX_upgrade_1_to_2"), |tx| {
+                KeystoreDB::from_1_to_2(tx).no_gc()
+            })?;
+            Ok(())
+        },
+    )
+}
diff --git a/keystore2/src/database/versioning.rs b/keystore2/src/database/versioning.rs
index bc68f15..a047cf3 100644
--- a/keystore2/src/database/versioning.rs
+++ b/keystore2/src/database/versioning.rs
@@ -61,7 +61,7 @@
     Ok(version)
 }
 
-fn update_version(tx: &Transaction, new_version: u32) -> Result<()> {
+pub(crate) fn update_version(tx: &Transaction, new_version: u32) -> Result<()> {
     let updated = tx
         .execute("UPDATE persistent.version SET version = ? WHERE id = 0;", params![new_version])
         .context("In update_version: Failed to update row.")?;
@@ -82,9 +82,11 @@
     let mut db_version = create_or_get_version(tx, current_version)
         .context("In upgrade_database: Failed to get database version.")?;
     while db_version < current_version {
+        log::info!("Current DB version={db_version}, perform upgrade");
         db_version = upgraders[db_version as usize](tx).with_context(|| {
             format!("In upgrade_database: Trying to upgrade from db version {}.", db_version)
         })?;
+        log::info!("DB upgrade successful, current DB version now={db_version}");
     }
     update_version(tx, db_version).context("In upgrade_database.")
 }