Avoid linear scans of database tables

Some places in Keystore perform linear scans of all rows in a database
table.  This can cause slowdown and ANRs if the device has an excessive
number of keys.

Problem areas:
- The GC process (in `handle_next_superseded_blobs`) does a linear scan
  of the `blobentry` table for rows where the corresponding key either
  has a newer `blobentry` (of the same type), or has been deleted.
- The GC process (in `cleanup_unreferenced`) does a linear scan of
  the `keyentry` table to find entries in the `Unreferenced` state,
  and repeats this for several dependent tables.
- Listing an app's keys allows for batching, but the retrieval for
  each batch queries the database for every key (even though only a
  subset will be returned).

Update the database schema to avoid the need for these linear scans:
- Add a `state INTEGER` column to the `blobentry` table, used to
  mark whether a row is current, superseded, or orphaned.
- Add an index on the new `state` column in the `blobentry` table.
- Add an index on the (existing) `state` column in the `keyentry` table.
- Add a 1->2 upgrade step that adds the column and the indices, and
  sets the `state` value of each existing keyblob row according to
  whether it is the most recent keyblob for its key (if not, it is
  superseded) and whether its key still exists (if not, it is
  orphaned).  This is essentially the same logic/query as was
  previously in `handle_next_superseded_blobs`, only happening once at
  upgrade time.

Note that the schema changes are *not* flag-controlled, because that
would require additional code to downgrade the schema if the flag were
to be flipped back.

Update behaviour as follows:
- Add a limit of 10,000 rows when retrieving a batch of keys/aliases for
  an app.
- On deleting a `keyentry`, mark any corresponding `blobentry`
  rows as orphaned.
- On adding a new `blobentry` for a key, mark any already-existing
  `blobentry` rows for the same (key, subcomponent_type) as superseded.
- To perform garbage collection in `handle_next_superseded_blobs`, just
  - query for N keyblob rows that are not current, ready to delete them
    via KeyMint.
  - query for all non-keyblob rows that are not current, and delete them
    there and then.

Note that the `use_blob_state_column` flag only affects the last of
these (the GC process).

Bug: 319563050
Test: keystore2_test
Test: keystore2_client_tests
Test: modified CtsVerifier to create and use lots of keys
Test: force SPL upgrade and check keys usable
Flag: android.security.keystore2.use_blob_state_column
Change-Id: I7019bb530cce195d4a7a290c8c43f078d8fe8a76
diff --git a/keystore2/src/database.rs b/keystore2/src/database.rs
index 8165c54..369e808 100644
--- a/keystore2/src/database.rs
+++ b/keystore2/src/database.rs
@@ -500,6 +500,40 @@
     }
 }
 
+/// Lifecycle state of a `blobentry` row, stored in its `state` column (0, 1 and 2 respectively).
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone, Default)]
+enum BlobState {
+    #[default]
+    /// Most recent blobentry (of its `subcomponent_type`) for the associated key; the default.
+    Current,
+    /// Blobentry that has been replaced by a newer blob of the same `subcomponent_type` for the
+    /// same key.
+    Superseded,
+    /// Blobentry whose owning key no longer exists.
+    Orphaned,
+}
+
+impl ToSql for BlobState {
+    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> {
+        Ok(ToSqlOutput::Owned(Value::Integer(match self {
+            Self::Current => 0,
+            Self::Superseded => 1,
+            Self::Orphaned => 2,
+        })))
+    }
+}
+
+impl FromSql for BlobState {
+    fn column_result(value: ValueRef) -> FromSqlResult<Self> {
+        i64::column_result(value).and_then(|raw| match raw {
+            0 => Ok(Self::Current),
+            1 => Ok(Self::Superseded),
+            2 => Ok(Self::Orphaned),
+            _ => Err(FromSqlError::OutOfRange(raw)),
+        })
+    }
+}
+
 /// Keys have a KeyMint blob component and optional public certificate and
 /// certificate chain components.
 /// KeyEntryLoadBits is a bitmap that indicates to `KeystoreDB::load_key_entry`
@@ -870,8 +904,9 @@
 
 impl KeystoreDB {
     const UNASSIGNED_KEY_ID: i64 = -1i64;
-    const CURRENT_DB_VERSION: u32 = 1;
-    const UPGRADERS: &'static [fn(&Transaction) -> Result<u32>] = &[Self::from_0_to_1];
+    const CURRENT_DB_VERSION: u32 = 2;
+    const UPGRADERS: &'static [fn(&Transaction) -> Result<u32>] =
+        &[Self::from_0_to_1, Self::from_1_to_2];
 
     /// Name of the file that holds the cross-boot persistent database.
     pub const PERSISTENT_DB_FILENAME: &'static str = "persistent.sqlite";
@@ -913,9 +948,77 @@
             params![KeyLifeCycle::Unreferenced, Tag::MAX_BOOT_LEVEL.0, BlobMetaData::MaxBootLevel],
         )
         .context(ks_err!("Failed to delete logical boot level keys."))?;
+
+        // DB version is now 1.
         Ok(1)
     }
 
+    // This upgrade function adds a `state INTEGER` column to the blobentry table, marks existing
+    // keyblob rows as superseded or orphaned (all other rows keep the default, current), and
+    // adds indices on blobentry(subcomponent_type, state) and keyentry(state).
+    fn from_1_to_2(tx: &Transaction) -> Result<u32> {
+        tx.execute(
+            "ALTER TABLE persistent.blobentry ADD COLUMN state INTEGER DEFAULT 0;",
+            params![],
+        )
+        .context(ks_err!("Failed to add state column"))?;
+
+        // Mark keyblobs that are not the most recent for their corresponding key.
+        // This may take a while if there are excessive numbers of keys in the database.
+        let _wp = wd::watch("KeystoreDB::from_1_to_2 mark all non-current keyblobs");
+        let sc_key_blob = SubComponentType::KEY_BLOB;
+        let mut stmt = tx
+            .prepare(
+                "UPDATE persistent.blobentry SET state=?
+                     WHERE subcomponent_type = ?
+                     AND id NOT IN (
+                             SELECT MAX(id) FROM persistent.blobentry
+                             WHERE subcomponent_type = ?
+                             GROUP BY keyentryid, subcomponent_type
+                         );",
+            )
+            .context("Trying to prepare query to mark superseded keyblobs")?;
+        stmt.execute(params![BlobState::Superseded, sc_key_blob, sc_key_blob])
+            .context(ks_err!("Failed to set state=superseded for keyblobs"))?;
+        log::info!("marked non-current blobentry rows for keyblobs as superseded");
+
+        // Mark keyblobs that don't have a corresponding key.
+        // This may take a while if there are excessive numbers of keys in the database.
+        let _wp = wd::watch("KeystoreDB::from_1_to_2 mark all orphaned keyblobs");
+        let mut stmt = tx
+            .prepare(
+                "UPDATE persistent.blobentry SET state=?
+                     WHERE subcomponent_type = ?
+                     AND NOT EXISTS (SELECT id FROM persistent.keyentry
+                                     WHERE id = keyentryid);",
+            )
+            .context("Trying to prepare query to mark orphaned keyblobs")?;
+        stmt.execute(params![BlobState::Orphaned, sc_key_blob])
+            .context(ks_err!("Failed to set state=orphaned for keyblobs"))?;
+        log::info!("marked orphaned blobentry rows for keyblobs");
+
+        // Add an index to make it fast to find out of date blobentry rows.
+        let _wp = wd::watch("KeystoreDB::from_1_to_2 add blobentry index");
+        tx.execute(
+            "CREATE INDEX IF NOT EXISTS persistent.blobentry_state_index
+            ON blobentry(subcomponent_type, state);",
+            [],
+        )
+        .context("Failed to create index blobentry_state_index.")?;
+
+        // Add an index to make it fast to find unreferenced keyentry rows.
+        let _wp = wd::watch("KeystoreDB::from_1_to_2 add keyentry state index");
+        tx.execute(
+            "CREATE INDEX IF NOT EXISTS persistent.keyentry_state_index
+            ON keyentry(state);",
+            [],
+        )
+        .context("Failed to create index keyentry_state_index.")?;
+
+        // DB version is now 2.
+        Ok(2)
+    }
+
     fn init_tables(tx: &Transaction) -> Result<()> {
         tx.execute(
             "CREATE TABLE IF NOT EXISTS persistent.keyentry (
@@ -944,12 +1047,21 @@
         )
         .context("Failed to create index keyentry_domain_namespace_index.")?;
 
+        // Index added in v2 of database schema.
+        tx.execute(
+            "CREATE INDEX IF NOT EXISTS persistent.keyentry_state_index
+            ON keyentry(state);",
+            [],
+        )
+        .context("Failed to create index keyentry_state_index.")?;
+
         tx.execute(
             "CREATE TABLE IF NOT EXISTS persistent.blobentry (
                     id INTEGER PRIMARY KEY,
                     subcomponent_type INTEGER,
                     keyentryid INTEGER,
-                    blob BLOB);",
+                    blob BLOB,
+                    state INTEGER DEFAULT 0);", // `state` added in v2 of schema
             [],
         )
         .context("Failed to initialize \"blobentry\" table.")?;
@@ -961,6 +1073,14 @@
         )
         .context("Failed to create index blobentry_keyentryid_index.")?;
 
+        // Index added in v2 of database schema.
+        tx.execute(
+            "CREATE INDEX IF NOT EXISTS persistent.blobentry_state_index
+            ON blobentry(subcomponent_type, state);",
+            [],
+        )
+        .context("Failed to create index blobentry_state_index.")?;
+
         tx.execute(
             "CREATE TABLE IF NOT EXISTS persistent.blobmetadata (
                      id INTEGER PRIMARY KEY,
@@ -1195,9 +1315,28 @@
 
             Self::cleanup_unreferenced(tx).context("Trying to cleanup unreferenced.")?;
 
-            // Find up to `max_blobs` more superseded key blobs, load their metadata and return it.
-            let result: Vec<(i64, Vec<u8>)> = {
-                let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob find_next");
+            // Find up to `max_blobs` more out-of-date key blobs, load their metadata and return it.
+            let result: Vec<(i64, Vec<u8>)> = if keystore2_flags::use_blob_state_column() {
+                let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob find_next v2");
+                let mut stmt = tx
+                    .prepare(
+                        "SELECT id, blob FROM persistent.blobentry
+                        WHERE subcomponent_type = ? AND state != ?
+                        LIMIT ?;",
+                    )
+                    .context("Trying to prepare query for superseded blobs.")?;
+
+                let rows = stmt
+                    .query_map(
+                        params![SubComponentType::KEY_BLOB, BlobState::Current, max_blobs as i64],
+                        |row| Ok((row.get(0)?, row.get(1)?)),
+                    )
+                    .context("Trying to query superseded blob.")?;
+
+                rows.collect::<Result<Vec<(i64, Vec<u8>)>, rusqlite::Error>>()
+                    .context("Trying to extract superseded blobs.")?
+            } else {
+                let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob find_next v1");
                 let mut stmt = tx
                     .prepare(
                         "SELECT id, blob FROM persistent.blobentry
@@ -1244,22 +1383,32 @@
                 return Ok(result).no_gc();
             }
 
-            // We did not find any superseded key blob, so let's remove other superseded blob in
-            // one transaction.
-            let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob delete");
-            tx.execute(
-                "DELETE FROM persistent.blobentry
-                 WHERE NOT subcomponent_type = ?
-                 AND (
-                     id NOT IN (
-                        SELECT MAX(id) FROM persistent.blobentry
-                        WHERE NOT subcomponent_type = ?
-                        GROUP BY keyentryid, subcomponent_type
-                     ) OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
-                 );",
-                params![SubComponentType::KEY_BLOB, SubComponentType::KEY_BLOB],
-            )
-            .context("Trying to purge superseded blobs.")?;
+            // We did not find any out-of-date key blobs, so let's remove other types of superseded
+            // blob in one transaction.
+            if keystore2_flags::use_blob_state_column() {
+                let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob delete v2");
+                tx.execute(
+                    "DELETE FROM persistent.blobentry
+                    WHERE subcomponent_type != ? AND state != ?;",
+                    params![SubComponentType::KEY_BLOB, BlobState::Current],
+                )
+                .context("Trying to purge out-of-date blobs (other than keyblobs)")?;
+            } else {
+                let _wp = wd::watch("KeystoreDB::handle_next_superseded_blob delete v1");
+                tx.execute(
+                    "DELETE FROM persistent.blobentry
+                    WHERE NOT subcomponent_type = ?
+                    AND (
+                        id NOT IN (
+                           SELECT MAX(id) FROM persistent.blobentry
+                           WHERE NOT subcomponent_type = ?
+                           GROUP BY keyentryid, subcomponent_type
+                        ) OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
+                    );",
+                    params![SubComponentType::KEY_BLOB, SubComponentType::KEY_BLOB],
+                )
+                .context("Trying to purge superseded blobs.")?;
+            }
 
             Ok(vec![]).no_gc()
         })
@@ -1530,18 +1679,33 @@
     ) -> Result<()> {
         match (blob, sc_type) {
             (Some(blob), _) => {
+                // Mark any previous blobentry(s) of the same type for the same key as superseded.
+                tx.execute(
+                    "UPDATE persistent.blobentry SET state = ?
+                    WHERE keyentryid = ? AND subcomponent_type = ?",
+                    params![BlobState::Superseded, key_id, sc_type],
+                )
+                .context(ks_err!(
+                    "Failed to mark prior {sc_type:?} blobentrys for {key_id} as superseded"
+                ))?;
+
+                // Now insert the new, un-superseded, blob.  (If this fails, the marking of
+                // old blobs as superseded will be rolled back, because we're inside a
+                // transaction.)
                 tx.execute(
                     "INSERT INTO persistent.blobentry
                      (subcomponent_type, keyentryid, blob) VALUES (?, ?, ?);",
                     params![sc_type, key_id, blob],
                 )
                 .context(ks_err!("Failed to insert blob."))?;
+
                 if let Some(blob_metadata) = blob_metadata {
                     let blob_id = tx
                         .query_row("SELECT MAX(id) FROM persistent.blobentry;", [], |row| {
                             row.get(0)
                         })
                         .context(ks_err!("Failed to get new blob id."))?;
+
                     blob_metadata
                         .store_in_db(blob_id, tx)
                         .context(ks_err!("Trying to store blob metadata."))?;
@@ -2263,6 +2427,15 @@
             .context("Trying to delete keyparameters.")?;
         tx.execute("DELETE FROM persistent.grant WHERE keyentryid = ?;", params![key_id])
             .context("Trying to delete grants.")?;
+        // The associated blobentry rows are not immediately deleted when the owning keyentry is
+        // removed, because a KeyMint `deleteKey()` invocation is needed (specifically for the
+        // `KEY_BLOB`).  Mark the affected rows with `state=Orphaned` so a subsequent garbage
+        // collection can do this.
+        tx.execute(
+            "UPDATE persistent.blobentry SET state = ? WHERE keyentryid = ?",
+            params![BlobState::Orphaned, key_id],
+        )
+        .context("Trying to mark blobentrys as superseded")?;
         Ok(updated != 0)
     }
 
@@ -2547,6 +2720,8 @@
     /// The key descriptors will have the domain, nspace, and alias field set.
     /// The returned list will be sorted by alias.
     /// Domain must be APP or SELINUX, the caller must make sure of that.
+    /// Number of returned values is limited to 10,000 (which is empirically roughly
+    /// what will fit in a Binder message).
     pub fn list_past_alias(
         &mut self,
         domain: Domain,
@@ -2564,7 +2739,8 @@
                      AND state = ?
                      AND key_type = ?
                      {}
-                     ORDER BY alias ASC;",
+                     ORDER BY alias ASC
+                     LIMIT 10000;",
             if start_past_alias.is_some() { " AND alias > ?" } else { "" }
         );