Keystore 2.0: Revisit GC.

* Limit the number of simultaneously scheduled garbage collection
  requests.
* Reduce the number of database transactions when many keys need to be
  collected.
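
The first bullet is implemented with an atomic flag that debounces GC
notifications: instead of queueing one low-priority task per notification, at
most one GC step is pending at any time. A minimal, self-contained sketch of
the pattern (the Debounced type and enqueue callback are illustrative, not
part of this change):

    use std::sync::atomic::{AtomicU8, Ordering};

    struct Debounced {
        notified: AtomicU8,
    }

    impl Debounced {
        /// Queues a step only if none is pending. compare_exchange succeeds
        /// only while the flag is still 0, so concurrent notifiers collapse
        /// into a single queued job.
        fn notify(&self, enqueue: impl FnOnce()) {
            if self.notified.compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed).is_ok() {
                enqueue();
            }
        }
    }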

Test: Ran CTS tests for regression testing and performance validation.
Change-Id: I7c1e2321e13f48cb99e83574df3c4178179da633
diff --git a/keystore2/src/database.rs b/keystore2/src/database.rs
index e62ec4e..830926d 100644
--- a/keystore2/src/database.rs
+++ b/keystore2/src/database.rs
@@ -93,7 +93,7 @@
 use std::{
     collections::{HashMap, HashSet},
     path::Path,
-    sync::{Condvar, Mutex},
+    sync::{Arc, Condvar, Mutex},
     time::{Duration, SystemTime},
 };
 
@@ -735,7 +735,7 @@
 /// ownership. It also implements all of Keystore 2.0's database functionality.
 pub struct KeystoreDB {
     conn: Connection,
-    gc: Option<Gc>,
+    gc: Option<Arc<Gc>>,
 }
 
 /// Database representation of the monotonic time retrieved from the system call clock_gettime with
@@ -850,7 +850,7 @@
     /// It also attempts to initialize all of the tables.
     /// KeystoreDB cannot be used by multiple threads.
     /// Each thread should open their own connection using `thread_local!`.
-    pub fn new(db_root: &Path, gc: Option<Gc>) -> Result<Self> {
+    pub fn new(db_root: &Path, gc: Option<Arc<Gc>>) -> Result<Self> {
         let _wp = wd::watch_millis("KeystoreDB::new", 500);
 
         // Build the path to the sqlite file.
@@ -1147,52 +1147,68 @@
     }
 
     /// This function is intended to be used by the garbage collector.
-    /// It deletes the blob given by `blob_id_to_delete`. It then tries to find a superseded
-    /// key blob that might need special handling by the garbage collector.
+    /// It deletes the blobs given by `blob_ids_to_delete`. It then tries to find up to `max_blobs`
+    /// superseded key blobs that might need special handling by the garbage collector.
-    /// If no further superseded blobs can be found it deletes all other superseded blobs that don't
-    /// need special handling and returns None.
+    /// If no further superseded blobs can be found, it deletes all other superseded blobs that
+    /// don't need special handling and returns an empty vector.
-    pub fn handle_next_superseded_blob(
+    pub fn handle_next_superseded_blobs(
         &mut self,
-        blob_id_to_delete: Option<i64>,
-    ) -> Result<Option<(i64, Vec<u8>, BlobMetaData)>> {
+        blob_ids_to_delete: &[i64],
+        max_blobs: usize,
+    ) -> Result<Vec<(i64, Vec<u8>, BlobMetaData)>> {
-        let _wp = wd::watch_millis("KeystoreDB::handle_next_superseded_blob", 500);
+        let _wp = wd::watch_millis("KeystoreDB::handle_next_superseded_blobs", 500);
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            // Delete the given blob if one was given.
-            if let Some(blob_id_to_delete) = blob_id_to_delete {
+            // Delete the given blobs.
+            for blob_id in blob_ids_to_delete {
                 tx.execute(
                     "DELETE FROM persistent.blobmetadata WHERE blobentryid = ?;",
-                    params![blob_id_to_delete],
+                    params![blob_id],
                 )
                 .context("Trying to delete blob metadata.")?;
-                tx.execute(
-                    "DELETE FROM persistent.blobentry WHERE id = ?;",
-                    params![blob_id_to_delete],
-                )
-                .context("Trying to blob.")?;
+                tx.execute("DELETE FROM persistent.blobentry WHERE id = ?;", params![blob_id])
+                    .context("Trying to delete blob.")?;
             }
+            // Find up to max_blobs more superseded key blobs, load their metadata, and return them.
+            let result: Vec<(i64, Vec<u8>)> = {
+                let mut stmt = tx
+                    .prepare(
+                        "SELECT id, blob FROM persistent.blobentry
+                        WHERE subcomponent_type = ?
+                        AND (
+                            id NOT IN (
+                                SELECT MAX(id) FROM persistent.blobentry
+                                WHERE subcomponent_type = ?
+                                GROUP BY keyentryid, subcomponent_type
+                            )
+                        OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
+                    ) LIMIT ?;",
+                    )
+                    .context("Trying to prepare query for superseded blobs.")?;
 
-            // Find another superseded keyblob load its metadata and return it.
-            if let Some((blob_id, blob)) = tx
-                .query_row(
-                    "SELECT id, blob FROM persistent.blobentry
-                     WHERE subcomponent_type = ?
-                     AND (
-                         id NOT IN (
-                             SELECT MAX(id) FROM persistent.blobentry
-                             WHERE subcomponent_type = ?
-                             GROUP BY keyentryid, subcomponent_type
-                         )
-                     OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
-                 );",
-                    params![SubComponentType::KEY_BLOB, SubComponentType::KEY_BLOB],
-                    |row| Ok((row.get(0)?, row.get(1)?)),
-                )
-                .optional()
-                .context("Trying to query superseded blob.")?
-            {
-                let blob_metadata = BlobMetaData::load_from_db(blob_id, tx)
-                    .context("Trying to load blob metadata.")?;
-                return Ok(Some((blob_id, blob, blob_metadata))).no_gc();
+                let rows = stmt
+                    .query_map(
+                        params![
+                            SubComponentType::KEY_BLOB,
+                            SubComponentType::KEY_BLOB,
+                            max_blobs as i64,
+                        ],
+                        |row| Ok((row.get(0)?, row.get(1)?)),
+                    )
+                    .context("Trying to query superseded blobs.")?;
+
+                rows.collect::<Result<Vec<(i64, Vec<u8>)>, rusqlite::Error>>()
+                    .context("Trying to extract superseded blobs.")?
+            };
+
+            let result = result
+                .into_iter()
+                .map(|(blob_id, blob)| {
+                    Ok((blob_id, blob, BlobMetaData::load_from_db(blob_id, tx)?))
+                })
+                .collect::<Result<Vec<(i64, Vec<u8>, BlobMetaData)>>>()
+                .context("Trying to load blob metadata.")?;
+            if !result.is_empty() {
+                return Ok(result).no_gc();
             }
 
             // We did not find any superseded key blob, so let's remove other superseded blob in
@@ -1211,9 +1227,9 @@
             )
             .context("Trying to purge superseded blobs.")?;
 
-            Ok(None).no_gc()
+            Ok(vec![]).no_gc()
         })
-        .context("In handle_next_superseded_blob.")
+        .context("In handle_next_superseded_blobs.")
     }
 
     /// This maintenance function should be called only once before the database is used for the
@@ -3269,7 +3285,7 @@
         let gc_db = KeystoreDB::new(path, None).expect("Failed to open test gc db_connection.");
         let gc = Gc::new_init_with(Default::default(), move || (Box::new(cb), gc_db, super_key));
 
-        KeystoreDB::new(path, Some(gc))
+        KeystoreDB::new(path, Some(Arc::new(gc)))
     }
 
     fn rebind_alias(
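
Seen from the caller's side, the reworked handle_next_superseded_blobs API
amortizes database work: a single transaction both purges the previously
processed batch and fetches up to max_blobs new superseded blobs. A sketch of
a driver loop under that contract (drain_superseded and the invalidate
callback are illustrative; the real garbage collector processes one key per
queued step rather than looping like this):

    /// Repeatedly deletes processed blobs and fetches the next batch until
    /// the database reports that no superseded blobs remain.
    fn drain_superseded(
        db: &mut KeystoreDB,
        invalidate: impl Fn(&[u8]),
    ) -> anyhow::Result<()> {
        let mut deleted: Vec<i64> = vec![];
        loop {
            // One transaction: purge `deleted`, fetch up to 20 more blobs.
            let batch = db.handle_next_superseded_blobs(&deleted, 20)?;
            if batch.is_empty() {
                return Ok(()); // Nothing superseded remains.
            }
            deleted = batch
                .into_iter()
                .map(|(blob_id, blob, _metadata)| {
                    // Invalidate first; the id is purged on the next call
                    // regardless of whether invalidation succeeded.
                    invalidate(&blob);
                    blob_id
                })
                .collect();
        }
    }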
diff --git a/keystore2/src/gc.rs b/keystore2/src/gc.rs
index 6cc0f27..2010c79 100644
--- a/keystore2/src/gc.rs
+++ b/keystore2/src/gc.rs
@@ -20,22 +20,28 @@
 
 use crate::{
     async_task,
-    database::{KeystoreDB, Uuid},
+    database::{BlobMetaData, KeystoreDB, Uuid},
     super_key::SuperKeyManager,
 };
 use anyhow::{Context, Result};
 use async_task::AsyncTask;
-use std::sync::Arc;
+use std::sync::{
+    atomic::{AtomicU8, Ordering},
+    Arc,
+};
 
 pub struct Gc {
     async_task: Arc<AsyncTask>,
+    notified: Arc<AtomicU8>,
 }
 
 impl Gc {
     /// Creates a garbage collector using the given async_task.
-    /// The garbage collector needs a function to invalidate key blobs and a database connection.
-    /// Both are obtained from the init function. The function is only called if this is first
-    /// time a garbage collector was initialized with the given AsyncTask instance.
+    /// The garbage collector needs a function to invalidate key blobs, a database connection,
+    /// and a reference to the `SuperKeyManager`. They are obtained from the init function.
+    /// The function is only called if this is the first time a garbage collector was initialized
+    /// with the given AsyncTask instance.
+    /// Note: It is a logical error to initialize different Gc instances with the same `AsyncTask`.
     pub fn new_init_with<F>(async_task: Arc<AsyncTask>, init: F) -> Self
     where
         F: FnOnce() -> (
@@ -46,34 +52,43 @@
             + 'static,
     {
         let weak_at = Arc::downgrade(&async_task);
+        let notified = Arc::new(AtomicU8::new(0));
+        let notified_clone = notified.clone();
         // Initialize the task's shelf.
         async_task.queue_hi(move |shelf| {
             let (invalidate_key, db, super_key) = init();
+            let notified = notified_clone;
             shelf.get_or_put_with(|| GcInternal {
-                blob_id_to_delete: None,
+                deleted_blob_ids: vec![],
+                superseded_blobs: vec![],
                 invalidate_key,
                 db,
                 async_task: weak_at,
                 super_key,
+                notified,
             });
         });
-        Self { async_task }
+        Self { async_task, notified }
     }
 
     /// Notifies the key garbage collector to iterate through orphaned and superseded blobs and
     /// attempts their deletion. We only process one key at a time and then schedule another
     /// attempt by queueing it in the async_task (low priority) queue.
     pub fn notify_gc(&self) {
-        self.async_task.queue_lo(|shelf| shelf.get_downcast_mut::<GcInternal>().unwrap().step())
+        if let Ok(0) = self.notified.compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed) {
+            self.async_task.queue_lo(|shelf| shelf.get_downcast_mut::<GcInternal>().unwrap().step())
+        }
     }
 }
 
 struct GcInternal {
-    blob_id_to_delete: Option<i64>,
+    deleted_blob_ids: Vec<i64>,
+    superseded_blobs: Vec<(i64, Vec<u8>, BlobMetaData)>,
     invalidate_key: Box<dyn Fn(&Uuid, &[u8]) -> Result<()> + Send + 'static>,
     db: KeystoreDB,
     async_task: std::sync::Weak<AsyncTask>,
     super_key: Arc<SuperKeyManager>,
+    notified: Arc<AtomicU8>,
 }
 
 impl GcInternal {
@@ -81,16 +96,23 @@
     /// We process one key at a time, because deleting a key is a time consuming process which
-    /// may involve calling into the KeyMint backend and we don't want to hog neither the backend
-    /// nor the database for extended periods of time.
+    /// may involve calling into the KeyMint backend, and we don't want to hog either the
+    /// backend or the database for extended periods of time.
+    /// To limit the number of database transactions, which are also expensive and competing
+    /// with threads on the critical path, deleted blobs are loaded in batches.
     fn process_one_key(&mut self) -> Result<()> {
-        if let Some((blob_id, blob, blob_metadata)) = self
-            .db
-            .handle_next_superseded_blob(self.blob_id_to_delete.take())
-            .context("In process_one_key: Trying to handle superseded blob.")?
-        {
-            // Set the blob_id as the next to be deleted blob. So it will be
+        if self.superseded_blobs.is_empty() {
+            let blobs = self
+                .db
+                .handle_next_superseded_blobs(&self.deleted_blob_ids, 20)
+                .context("In process_one_key: Trying to handle superseded blob.")?;
+            self.deleted_blob_ids = vec![];
+            self.superseded_blobs = blobs;
+        }
+
+        if let Some((blob_id, blob, blob_metadata)) = self.superseded_blobs.pop() {
+            // Add the next blob_id to the deleted blob ids list. So it will be
             // removed from the database regardless of whether the following
             // succeeds or not.
-            self.blob_id_to_delete = Some(blob_id);
+            self.deleted_blob_ids.push(blob_id);
 
             // If the key has a km_uuid we try to get the corresponding device
             // and delete the key, unwrapping if necessary and possible.
@@ -110,13 +132,20 @@
 
     /// Processes one key and then schedules another attempt until it runs out of blobs to delete.
     fn step(&mut self) {
+        self.notified.store(0, Ordering::Relaxed);
         if let Err(e) = self.process_one_key() {
             log::error!("Error trying to delete blob entry. {:?}", e);
         }
         // Schedule the next step. This gives high priority requests a chance to interleave.
-        if self.blob_id_to_delete.is_some() {
+        if !self.deleted_blob_ids.is_empty() {
             if let Some(at) = self.async_task.upgrade() {
-                at.queue_lo(move |shelf| shelf.get_downcast_mut::<GcInternal>().unwrap().step());
+                if let Ok(0) =
+                    self.notified.compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
+                {
+                    at.queue_lo(move |shelf| {
+                        shelf.get_downcast_mut::<GcInternal>().unwrap().step()
+                    });
+                }
             }
         }
     }
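
The worker side of the debounce clears the flag before doing any work and
re-arms it before re-queueing, so a notify_gc racing with a running step
schedules a fresh step instead of being swallowed. A self-contained toy of
that handshake (pending and requeue stand in for deleted_blob_ids and
AsyncTask::queue_lo):

    use std::sync::atomic::{AtomicU8, Ordering};

    fn step(notified: &AtomicU8, pending: &mut Vec<i64>, requeue: impl FnOnce()) {
        // Clear first: a concurrent notifier will queue a new step.
        notified.store(0, Ordering::Relaxed);

        // ... process one key (elided) ...
        pending.pop();

        // Re-queue only if work remains and no step is queued already.
        if !pending.is_empty()
            && notified.compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed).is_ok()
        {
            requeue();
        }
    }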
diff --git a/keystore2/src/globals.rs b/keystore2/src/globals.rs
index 54fceab..c492120 100644
--- a/keystore2/src/globals.rs
+++ b/keystore2/src/globals.rs
@@ -54,24 +54,11 @@
 /// is run only once, as long as the ASYNC_TASK instance is the same. So only one additional
 /// database connection is created for the garbage collector worker.
 pub fn create_thread_local_db() -> KeystoreDB {
-    let gc = Gc::new_init_with(ASYNC_TASK.clone(), || {
-        (
-            Box::new(|uuid, blob| {
-                let km_dev: Strong<dyn IKeyMintDevice> =
-                    get_keymint_dev_by_uuid(uuid).map(|(dev, _)| dev)?.get_interface()?;
-                let _wp = wd::watch_millis("In create_thread_local_db: calling deleteKey", 500);
-                map_km_error(km_dev.deleteKey(&*blob))
-                    .context("In invalidate key closure: Trying to invalidate key blob.")
-            }),
-            KeystoreDB::new(&DB_PATH.lock().expect("Could not get the database directory."), None)
-                .expect("Failed to open database."),
-            SUPER_KEY.clone(),
-        )
-    });
-
-    let mut db =
-        KeystoreDB::new(&DB_PATH.lock().expect("Could not get the database directory."), Some(gc))
-            .expect("Failed to open database.");
+    let mut db = KeystoreDB::new(
+        &DB_PATH.lock().expect("Could not get the database directory."),
+        Some(GC.clone()),
+    )
+    .expect("Failed to open database.");
     DB_INIT.call_once(|| {
         log::info!("Touching Keystore 2.0 database for this first time since boot.");
         db.insert_last_off_body(MonotonicRawTime::now())
@@ -177,6 +164,21 @@
         Arc::new(LegacyMigrator::new(Arc::new(Default::default())));
     /// Background thread which handles logging via statsd and logd
     pub static ref LOGS_HANDLER: Arc<AsyncTask> = Default::default();
+
+    static ref GC: Arc<Gc> = Arc::new(Gc::new_init_with(ASYNC_TASK.clone(), || {
+        (
+            Box::new(|uuid, blob| {
+                let km_dev: Strong<dyn IKeyMintDevice> =
+                    get_keymint_dev_by_uuid(uuid).map(|(dev, _)| dev)?.get_interface()?;
+                let _wp = wd::watch_millis("In invalidate key closure: calling deleteKey", 500);
+                map_km_error(km_dev.deleteKey(&*blob))
+                    .context("In invalidate key closure: Trying to invalidate key blob.")
+            }),
+            KeystoreDB::new(&DB_PATH.lock().expect("Could not get the database directory."), None)
+                .expect("Failed to open database."),
+            SUPER_KEY.clone(),
+        )
+    }));
 }
 
 static KEYMINT_SERVICE_NAME: &str = "android.hardware.security.keymint.IKeyMintDevice";
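
With the garbage collector hoisted into a process-wide lazy_static, every
thread-local database connection shares the same Arc<Gc>, so the GC worker and
its dedicated connection are created once per process instead of once per
thread. An illustrative sketch of the resulting shape (the db_path helper is
hypothetical; GC and KeystoreDB::new are the names from this change):

    use std::cell::RefCell;

    thread_local! {
        static DB: RefCell<KeystoreDB> = RefCell::new(
            KeystoreDB::new(&db_path(), Some(GC.clone()))
                .expect("Failed to open database."),
        );
    }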