Keystore 2.0: Revisit GC.

* Limit the number of simultaneously scheduled garbage collection
  requests.
* Reduce the number of database transactions when many keys need to be
  collected.

Test: Ran CTS tests for regression testing and performance validation.
Change-Id: I7c1e2321e13f48cb99e83574df3c4178179da633
diff --git a/keystore2/src/database.rs b/keystore2/src/database.rs
index e62ec4e..830926d 100644
--- a/keystore2/src/database.rs
+++ b/keystore2/src/database.rs
@@ -93,7 +93,7 @@
 use std::{
     collections::{HashMap, HashSet},
     path::Path,
-    sync::{Condvar, Mutex},
+    sync::{Arc, Condvar, Mutex},
     time::{Duration, SystemTime},
 };
 
@@ -735,7 +735,7 @@
 /// ownership. It also implements all of Keystore 2.0's database functionality.
 pub struct KeystoreDB {
     conn: Connection,
-    gc: Option<Gc>,
+    gc: Option<Arc<Gc>>,
 }
 
 /// Database representation of the monotonic time retrieved from the system call clock_gettime with
@@ -850,7 +850,7 @@
     /// It also attempts to initialize all of the tables.
     /// KeystoreDB cannot be used by multiple threads.
     /// Each thread should open their own connection using `thread_local!`.
-    pub fn new(db_root: &Path, gc: Option<Gc>) -> Result<Self> {
+    pub fn new(db_root: &Path, gc: Option<Arc<Gc>>) -> Result<Self> {
         let _wp = wd::watch_millis("KeystoreDB::new", 500);
 
         // Build the path to the sqlite file.
@@ -1147,52 +1147,68 @@
     }
 
     /// This function is intended to be used by the garbage collector.
-    /// It deletes the blob given by `blob_id_to_delete`. It then tries to find a superseded
-    /// key blob that might need special handling by the garbage collector.
+    /// It deletes the blobs given by `blob_ids_to_delete`. It then tries to find up to `max_blobs`
+    /// superseded key blobs that might need special handling by the garbage collector.
     /// If no further superseded blobs can be found it deletes all other superseded blobs that don't
     /// need special handling and returns None.
-    pub fn handle_next_superseded_blob(
+    pub fn handle_next_superseded_blobs(
         &mut self,
-        blob_id_to_delete: Option<i64>,
-    ) -> Result<Option<(i64, Vec<u8>, BlobMetaData)>> {
+        blob_ids_to_delete: &[i64],
+        max_blobs: usize,
+    ) -> Result<Vec<(i64, Vec<u8>, BlobMetaData)>> {
         let _wp = wd::watch_millis("KeystoreDB::handle_next_superseded_blob", 500);
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            // Delete the given blob if one was given.
-            if let Some(blob_id_to_delete) = blob_id_to_delete {
+            // Delete the given blobs.
+            for blob_id in blob_ids_to_delete {
                 tx.execute(
                     "DELETE FROM persistent.blobmetadata WHERE blobentryid = ?;",
-                    params![blob_id_to_delete],
+                    params![blob_id],
                 )
                 .context("Trying to delete blob metadata.")?;
-                tx.execute(
-                    "DELETE FROM persistent.blobentry WHERE id = ?;",
-                    params![blob_id_to_delete],
-                )
-                .context("Trying to blob.")?;
+                tx.execute("DELETE FROM persistent.blobentry WHERE id = ?;", params![blob_id])
+                    .context("Trying to blob.")?;
             }
+            // Find up to max_blobs more superseded key blobs, load their metadata and return them.
+            let result: Vec<(i64, Vec<u8>)> = {
+                let mut stmt = tx
+                    .prepare(
+                        "SELECT id, blob FROM persistent.blobentry
+                        WHERE subcomponent_type = ?
+                        AND (
+                            id NOT IN (
+                                SELECT MAX(id) FROM persistent.blobentry
+                                WHERE subcomponent_type = ?
+                                GROUP BY keyentryid, subcomponent_type
+                            )
+                        OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
+                    ) LIMIT ?;",
+                    )
+                    .context("Trying to prepare query for superseded blobs.")?;
 
-            // Find another superseded keyblob load its metadata and return it.
-            if let Some((blob_id, blob)) = tx
-                .query_row(
-                    "SELECT id, blob FROM persistent.blobentry
-                     WHERE subcomponent_type = ?
-                     AND (
-                         id NOT IN (
-                             SELECT MAX(id) FROM persistent.blobentry
-                             WHERE subcomponent_type = ?
-                             GROUP BY keyentryid, subcomponent_type
-                         )
-                     OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
-                 );",
-                    params![SubComponentType::KEY_BLOB, SubComponentType::KEY_BLOB],
-                    |row| Ok((row.get(0)?, row.get(1)?)),
-                )
-                .optional()
-                .context("Trying to query superseded blob.")?
-            {
-                let blob_metadata = BlobMetaData::load_from_db(blob_id, tx)
-                    .context("Trying to load blob metadata.")?;
-                return Ok(Some((blob_id, blob, blob_metadata))).no_gc();
+                let rows = stmt
+                    .query_map(
+                        params![
+                            SubComponentType::KEY_BLOB,
+                            SubComponentType::KEY_BLOB,
+                            max_blobs as i64,
+                        ],
+                        |row| Ok((row.get(0)?, row.get(1)?)),
+                    )
+                    .context("Trying to query superseded blob.")?;
+
+                rows.collect::<Result<Vec<(i64, Vec<u8>)>, rusqlite::Error>>()
+                    .context("Trying to extract superseded blobs.")?
+            };
+
+            let result = result
+                .into_iter()
+                .map(|(blob_id, blob)| {
+                    Ok((blob_id, blob, BlobMetaData::load_from_db(blob_id, tx)?))
+                })
+                .collect::<Result<Vec<(i64, Vec<u8>, BlobMetaData)>>>()
+                .context("Trying to load blob metadata.")?;
+            if !result.is_empty() {
+                return Ok(result).no_gc();
             }
 
             // We did not find any superseded key blob, so let's remove other superseded blob in
@@ -1211,9 +1227,9 @@
             )
             .context("Trying to purge superseded blobs.")?;
 
-            Ok(None).no_gc()
+            Ok(vec![]).no_gc()
         })
-        .context("In handle_next_superseded_blob.")
+        .context("In handle_next_superseded_blobs.")
     }
 
     /// This maintenance function should be called only once before the database is used for the
@@ -3269,7 +3285,7 @@
         let gc_db = KeystoreDB::new(path, None).expect("Failed to open test gc db_connection.");
         let gc = Gc::new_init_with(Default::default(), move || (Box::new(cb), gc_db, super_key));
 
-        KeystoreDB::new(path, Some(gc))
+        KeystoreDB::new(path, Some(Arc::new(gc)))
     }
 
     fn rebind_alias(