Keystore 2.0: Revise GC.

* Store GC relevant information with blobs.
* Marking a key unreferenced now deletes the main key entry and all
  associated artifacts from the database except the blobs and their
  associated metadata, which are left to the GC.
* The GC is now automatically notified by the DB module, and it is now
  modular enough to use a local test GC.

Test: keystore2_test
Change-Id: Iaf9e851d2895c8c457d160052fcab7f8d20e6555
diff --git a/keystore2/src/database.rs b/keystore2/src/database.rs
index 3789d28..dc6d7a0 100644
--- a/keystore2/src/database.rs
+++ b/keystore2/src/database.rs
@@ -41,12 +41,15 @@
 //! from the database module these functions take permission check
 //! callbacks.
 
-use crate::db_utils::{self, SqlField};
 use crate::error::{Error as KsError, ErrorCode, ResponseCode};
 use crate::impl_metadata; // This is in db_utils.rs
 use crate::key_parameter::{KeyParameter, Tag};
 use crate::permission::KeyPermSet;
 use crate::utils::get_current_time_in_seconds;
+use crate::{
+    db_utils::{self, SqlField},
+    gc::Gc,
+};
 use anyhow::{anyhow, Context, Result};
 use std::{convert::TryFrom, convert::TryInto, ops::Deref, time::SystemTimeError};
 
@@ -95,17 +98,7 @@
     /// A metadata entry for key entries.
     #[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
     pub enum KeyMetaEntry {
-        /// If present, indicates that the sensitive part of key
-        /// is encrypted with another key or a key derived from a password.
-        EncryptedBy(EncryptedBy) with accessor encrypted_by,
-        /// If the blob is password encrypted this field is set to the
-        /// salt used for the key derivation.
-        Salt(Vec<u8>) with accessor salt,
-        /// If the blob is encrypted, this field is set to the initialization vector.
-        Iv(Vec<u8>) with accessor iv,
-        /// If the blob is encrypted, this field holds the AEAD TAG.
-        AeadTag(Vec<u8>) with accessor aead_tag,
-        /// Creation date of a the key entry.
+        /// Date of the creation of the key entry.
         CreationDate(DateTime) with accessor creation_date,
         /// Expiration date for attestation keys.
         AttestationExpirationDate(DateTime) with accessor attestation_expiration_date,
@@ -166,6 +159,76 @@
     }
 }
 
+impl_metadata!(
+    /// A set of metadata for key blobs.
+    #[derive(Debug, Default, Eq, PartialEq)]
+    pub struct BlobMetaData;
+    /// A metadata entry for key blobs.
+    #[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
+    pub enum BlobMetaEntry {
+        /// If present, indicates that the blob is encrypted with another key or a key derived
+        /// from a password.
+        EncryptedBy(EncryptedBy) with accessor encrypted_by,
+        /// If the blob is password encrypted this field is set to the
+        /// salt used for the key derivation.
+        Salt(Vec<u8>) with accessor salt,
+        /// If the blob is encrypted, this field is set to the initialization vector.
+        Iv(Vec<u8>) with accessor iv,
+        /// If the blob is encrypted, this field holds the AEAD TAG.
+        AeadTag(Vec<u8>) with accessor aead_tag,
+        /// The uuid of the owning KeyMint instance.
+        KmUuid(Uuid) with accessor km_uuid,
+        //  --- ADD NEW META DATA FIELDS HERE ---
+        // For backwards compatibility add new entries only to
+        // end of this list and above this comment.
+    };
+);
+
+impl BlobMetaData {
+    fn load_from_db(blob_id: i64, tx: &Transaction) -> Result<Self> {
+        let mut stmt = tx
+            .prepare(
+                "SELECT tag, data from persistent.blobmetadata
+                    WHERE blobentryid = ?;",
+            )
+            .context("In BlobMetaData::load_from_db: prepare statement failed.")?;
+
+        let mut metadata: HashMap<i64, BlobMetaEntry> = Default::default();
+
+        let mut rows =
+            stmt.query(params![blob_id]).context("In BlobMetaData::load_from_db: query failed.")?;
+        db_utils::with_rows_extract_all(&mut rows, |row| {
+            let db_tag: i64 = row.get(0).context("Failed to read tag.")?;
+            metadata.insert(
+                db_tag,
+                BlobMetaEntry::new_from_sql(db_tag, &SqlField::new(1, &row))
+                    .context("Failed to read BlobMetaEntry.")?,
+            );
+            Ok(())
+        })
+        .context("In BlobMetaData::load_from_db.")?;
+
+        Ok(Self { data: metadata })
+    }
+
+    fn store_in_db(&self, blob_id: i64, tx: &Transaction) -> Result<()> {
+        let mut stmt = tx
+            .prepare(
+                "INSERT or REPLACE INTO persistent.blobmetadata (blobentryid, tag, data)
+                    VALUES (?, ?, ?);",
+            )
+            .context("In BlobMetaData::store_in_db: Failed to prepare statement.")?;
+
+        let iter = self.data.iter();
+        for (tag, entry) in iter {
+            stmt.insert(params![blob_id, tag, entry,]).with_context(|| {
+                format!("In BlobMetaData::store_in_db: Failed to insert {:?}", entry)
+            })?;
+        }
+        Ok(())
+    }
+}
+
 /// Indicates the type of the keyentry.
 #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
 pub enum KeyType {
@@ -531,7 +594,7 @@
 #[derive(Debug, Default, Eq, PartialEq)]
 pub struct KeyEntry {
     id: i64,
-    km_blob: Option<Vec<u8>>,
+    key_blob_info: Option<(Vec<u8>, BlobMetaData)>,
     cert: Option<Vec<u8>>,
     cert_chain: Option<Vec<u8>>,
     km_uuid: Uuid,
@@ -546,12 +609,12 @@
         self.id
     }
     /// Exposes the optional KeyMint blob.
-    pub fn km_blob(&self) -> &Option<Vec<u8>> {
-        &self.km_blob
+    pub fn key_blob_info(&self) -> &Option<(Vec<u8>, BlobMetaData)> {
+        &self.key_blob_info
     }
-    /// Extracts the Optional KeyMint blob.
-    pub fn take_km_blob(&mut self) -> Option<Vec<u8>> {
-        self.km_blob.take()
+    /// Extracts the Optional KeyMint blob including its metadata.
+    pub fn take_key_blob_info(&mut self) -> Option<(Vec<u8>, BlobMetaData)> {
+        self.key_blob_info.take()
     }
     /// Exposes the optional public certificate.
     pub fn cert(&self) -> &Option<Vec<u8>> {
@@ -616,10 +679,39 @@
     }
 }
 
+/// This trait is private to the database module. It is used to convey whether or not the garbage
+/// collector shall be invoked after a database access. All closures passed to
+/// `KeystoreDB::with_transaction` return a tuple (bool, T) where the bool indicates if the
+/// gc needs to be triggered. This convenience trait allows turning any anyhow::Result<T>
+/// into anyhow::Result<(bool, T)> by simply appending one of `.do_gc(bool)`, `.no_gc()`, or
+/// `.need_gc()`.
+trait DoGc<T> {
+    fn do_gc(self, need_gc: bool) -> Result<(bool, T)>;
+
+    fn no_gc(self) -> Result<(bool, T)>;
+
+    fn need_gc(self) -> Result<(bool, T)>;
+}
+
+impl<T> DoGc<T> for Result<T> {
+    fn do_gc(self, need_gc: bool) -> Result<(bool, T)> {
+        Ok((need_gc, self?))
+    }
+
+    fn no_gc(self) -> Result<(bool, T)> {
+        Self::do_gc(self, false)
+    }
+
+    fn need_gc(self) -> Result<(bool, T)> {
+        Self::do_gc(self, true)
+    }
+}
+
 /// KeystoreDB wraps a connection to an SQLite database and tracks its
 /// ownership. It also implements all of Keystore 2.0's database functionality.
 pub struct KeystoreDB {
     conn: Connection,
+    gc: Option<Gc>,
 }
 
 /// Database representation of the monotonic time retrieved from the system call clock_gettime with
@@ -715,7 +807,7 @@
     /// It also attempts to initialize all of the tables.
     /// KeystoreDB cannot be used by multiple threads.
     /// Each thread should open their own connection using `thread_local!`.
-    pub fn new(db_root: &Path) -> Result<Self> {
+    pub fn new(db_root: &Path, gc: Option<Gc>) -> Result<Self> {
         // Build the path to the sqlite file.
         let mut persistent_path = db_root.to_path_buf();
         persistent_path.push("persistent.sqlite");
@@ -729,9 +821,9 @@
         // On busy fail Immediately. It is unlikely to succeed given a bug in sqlite.
         conn.busy_handler(None).context("In KeystoreDB::new: Failed to set busy handler.")?;
 
-        let mut db = Self { conn };
+        let mut db = Self { conn, gc };
         db.with_transaction(TransactionBehavior::Immediate, |tx| {
-            Self::init_tables(tx).context("Trying to initialize tables.")
+            Self::init_tables(tx).context("Trying to initialize tables.").no_gc()
         })?;
         Ok(db)
     }
@@ -782,6 +874,24 @@
         .context("Failed to create index blobentry_keyentryid_index.")?;
 
         tx.execute(
+            "CREATE TABLE IF NOT EXISTS persistent.blobmetadata (
+                     id INTEGER PRIMARY KEY,
+                     blobentryid INTEGER,
+                     tag INTEGER,
+                     data ANY,
+                     UNIQUE (blobentryid, tag));",
+            NO_PARAMS,
+        )
+        .context("Failed to initialize \"blobmetadata\" table.")?;
+
+        tx.execute(
+            "CREATE INDEX IF NOT EXISTS persistent.blobmetadata_blobentryid_index
+            ON blobmetadata(blobentryid);",
+            NO_PARAMS,
+        )
+        .context("Failed to create index blobmetadata_blobentryid_index.")?;
+
+        tx.execute(
             "CREATE TABLE IF NOT EXISTS persistent.keyparameter (
                      keyentryid INTEGER,
                      tag INTEGER,
@@ -894,77 +1004,73 @@
         Ok(conn)
     }
 
-    /// Get one unreferenced key. There is no particular order in which the keys are returned.
-    fn get_unreferenced_key_id(tx: &Transaction) -> Result<Option<i64>> {
-        tx.query_row(
-            "SELECT id FROM persistent.keyentry WHERE state = ?",
-            params![KeyLifeCycle::Unreferenced],
-            |row| row.get(0),
-        )
-        .optional()
-        .context("In get_unreferenced_key_id: Trying to get unreferenced key id.")
-    }
-
-    /// Returns a key id guard and key entry for one unreferenced key entry. Of the optional
-    /// fields of the key entry only the km_blob field will be populated. This is required
-    /// to subject the blob to its KeyMint instance for deletion.
-    pub fn get_unreferenced_key(&mut self) -> Result<Option<(KeyIdGuard, KeyEntry)>> {
-        self.with_transaction(TransactionBehavior::Deferred, |tx| {
-            let key_id = match Self::get_unreferenced_key_id(tx)
-                .context("Trying to get unreferenced key id")?
-            {
-                None => return Ok(None),
-                Some(id) => KEY_ID_LOCK.try_get(id).ok_or_else(KsError::sys).context(concat!(
-                    "A key id lock was held for an unreferenced key. ",
-                    "This should never happen."
-                ))?,
-            };
-            let key_entry = Self::load_key_components(tx, KeyEntryLoadBits::KM, key_id.id())
-                .context("Trying to get key components.")?;
-            Ok(Some((key_id, key_entry)))
-        })
-        .context("In get_unreferenced_key.")
-    }
-
-    /// This function purges all remnants of a key entry from the database.
-    /// Important: This does not check if the key was unreferenced, nor does it
-    /// subject the key to its KeyMint instance for permanent invalidation.
-    /// This function should only be called by the garbage collector.
-    /// To delete a key call `mark_unreferenced`, which transitions the key to the unreferenced
-    /// state, deletes all grants to the key, and notifies the garbage collector.
-    /// The garbage collector will:
-    ///  1. Call get_unreferenced_key.
-    ///  2. Determine the proper way to dispose of sensitive key material, e.g., call
-    ///     `KeyMintDevice::delete()`.
-    ///  3. Call `purge_key_entry`.
-    pub fn purge_key_entry(&mut self, key_id: KeyIdGuard) -> Result<()> {
+    /// This function is intended to be used by the garbage collector.
+    /// It deletes the blob given by `blob_id_to_delete`. It then tries to find a superseded
+    /// key blob that might need special handling by the garbage collector.
+    /// If no further superseded blobs can be found it deletes all other superseded blobs that don't
+    /// need special handling and returns None.
+    pub fn handle_next_superseded_blob(
+        &mut self,
+        blob_id_to_delete: Option<i64>,
+    ) -> Result<Option<(i64, Vec<u8>, BlobMetaData)>> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            tx.execute("DELETE FROM persistent.keyentry WHERE id = ?;", params![key_id.id()])
-                .context("Trying to delete keyentry.")?;
-            tx.execute(
-                "DELETE FROM persistent.blobentry WHERE keyentryid = ?;",
-                params![key_id.id()],
-            )
-            .context("Trying to delete blobentries.")?;
-            tx.execute(
-                "DELETE FROM persistent.keymetadata WHERE keyentryid = ?;",
-                params![key_id.id()],
-            )
-            .context("Trying to delete keymetadata.")?;
-            tx.execute(
-                "DELETE FROM persistent.keyparameter WHERE keyentryid = ?;",
-                params![key_id.id()],
-            )
-            .context("Trying to delete keyparameters.")?;
-            let grants_deleted = tx
-                .execute("DELETE FROM persistent.grant WHERE keyentryid = ?;", params![key_id.id()])
-                .context("Trying to delete grants.")?;
-            if grants_deleted != 0 {
-                log::error!("Purged key that still had grants. This should not happen.");
+            // Delete the given blob together with its metadata if a blob id was given.
+            if let Some(blob_id_to_delete) = blob_id_to_delete {
+                tx.execute(
+                    "DELETE FROM persistent.blobmetadata WHERE blobentryid = ?;",
+                    params![blob_id_to_delete],
+                )
+                .context("Trying to delete blob metadata.")?;
+                tx.execute(
+                    "DELETE FROM persistent.blobentry WHERE id = ?;",
+                    params![blob_id_to_delete],
+                )
+                .context("Trying to delete blob.")?;
             }
-            Ok(())
+
+            // Find another superseded key blob, load its metadata, and return it.
+            if let Some((blob_id, blob)) = tx
+                .query_row(
+                    "SELECT id, blob FROM persistent.blobentry
+                     WHERE subcomponent_type = ?
+                     AND (
+                         id NOT IN (
+                             SELECT MAX(id) FROM persistent.blobentry
+                             WHERE subcomponent_type = ?
+                             GROUP BY keyentryid, subcomponent_type
+                         )
+                     OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
+                 );",
+                    params![SubComponentType::KEY_BLOB, SubComponentType::KEY_BLOB],
+                    |row| Ok((row.get(0)?, row.get(1)?)),
+                )
+                .optional()
+                .context("Trying to query superseded blob.")?
+            {
+                let blob_metadata = BlobMetaData::load_from_db(blob_id, tx)
+                    .context("Trying to load blob metadata.")?;
+                return Ok(Some((blob_id, blob, blob_metadata))).no_gc();
+            }
+
+            // We did not find any superseded key blob, so let's remove other superseded blobs in
+            // one transaction.
+            tx.execute(
+                "DELETE FROM persistent.blobentry
+                 WHERE NOT subcomponent_type = ?
+                 AND (
+                     id NOT IN (
+                        SELECT MAX(id) FROM persistent.blobentry
+                        WHERE NOT subcomponent_type = ?
+                        GROUP BY keyentryid, subcomponent_type
+                     ) OR keyentryid NOT IN (SELECT id FROM persistent.keyentry)
+                 );",
+                params![SubComponentType::KEY_BLOB, SubComponentType::KEY_BLOB],
+            )
+            .context("Trying to purge superseded blobs.")?;
+
+            Ok(None).no_gc()
         })
-        .context("In purge_key_entry.")
+        .context("In handle_next_superseded_blob.")
     }
 
     /// This maintenance function should be called only once before the database is used for the
@@ -982,6 +1088,7 @@
                 params![KeyLifeCycle::Unreferenced, KeyLifeCycle::Existing],
             )
             .context("Failed to execute query.")
+            .need_gc()
         })
         .context("In cleanup_leftovers.")
     }
@@ -999,7 +1106,7 @@
         create_new_key: F,
     ) -> Result<(KeyIdGuard, KeyEntry)>
     where
-        F: Fn() -> Result<(Vec<u8>, KeyMetaData)>,
+        F: Fn() -> Result<(Vec<u8>, BlobMetaData)>,
     {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
             let id = {
@@ -1055,22 +1162,26 @@
 
                     let (blob, metadata) =
                         create_new_key().context("In get_or_create_key_with.")?;
-                    Self::set_blob_internal(&tx, id, SubComponentType::KEY_BLOB, Some(&blob))
-                        .context("In get_of_create_key_with.")?;
-                    metadata.store_in_db(id, &tx).context("In get_or_create_key_with.")?;
+                    Self::set_blob_internal(
+                        &tx,
+                        id,
+                        SubComponentType::KEY_BLOB,
+                        Some(&blob),
+                        Some(&metadata),
+                    )
+                    .context("In get_or_create_key_with.")?;
                     (
                         id,
                         KeyEntry {
                             id,
-                            km_blob: Some(blob),
-                            metadata,
+                            key_blob_info: Some((blob, metadata)),
                             pure_cert: false,
                             ..Default::default()
                         },
                     )
                 }
             };
-            Ok((KEY_ID_LOCK.get(id), entry))
+            Ok((KEY_ID_LOCK.get(id), entry)).no_gc()
         })
         .context("In get_or_create_key_with.")
     }
@@ -1092,7 +1203,7 @@
     /// or DatabaseLocked is encountered.
     fn with_transaction<T, F>(&mut self, behavior: TransactionBehavior, f: F) -> Result<T>
     where
-        F: Fn(&Transaction) -> Result<T>,
+        F: Fn(&Transaction) -> Result<(bool, T)>,
     {
         loop {
             match self
@@ -1115,6 +1226,14 @@
                 }
             }
         }
+        .map(|(need_gc, result)| {
+            if need_gc {
+                if let Some(ref gc) = self.gc {
+                    gc.notify_gc();
+                }
+            }
+            result
+        })
     }
 
     fn is_locked_error(e: &anyhow::Error) -> bool {
@@ -1142,7 +1261,7 @@
         km_uuid: &Uuid,
     ) -> Result<KeyIdGuard> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            Self::create_key_entry_internal(tx, domain, namespace, km_uuid)
+            Self::create_key_entry_internal(tx, domain, namespace, km_uuid).no_gc()
         })
         .context("In create_key_entry.")
     }
@@ -1204,12 +1323,18 @@
                 })
                 .context("In create_key_entry")?,
             );
-            Self::set_blob_internal(&tx, key_id.0, SubComponentType::KEY_BLOB, Some(private_key))?;
+            Self::set_blob_internal(
+                &tx,
+                key_id.0,
+                SubComponentType::KEY_BLOB,
+                Some(private_key),
+                None,
+            )?;
             let mut metadata = KeyMetaData::new();
             metadata.add(KeyMetaEntry::AttestationMacedPublicKey(maced_public_key.to_vec()));
             metadata.add(KeyMetaEntry::AttestationRawPubKey(raw_public_key.to_vec()));
             metadata.store_in_db(key_id.0, &tx)?;
-            Ok(())
+            Ok(()).no_gc()
         })
         .context("In create_attestation_key_entry")
     }
@@ -1226,9 +1351,10 @@
         key_id: &KeyIdGuard,
         sc_type: SubComponentType,
         blob: Option<&[u8]>,
+        blob_metadata: Option<&BlobMetaData>,
     ) -> Result<()> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            Self::set_blob_internal(&tx, key_id.0, sc_type, blob)
+            Self::set_blob_internal(&tx, key_id.0, sc_type, blob, blob_metadata).need_gc()
         })
         .context("In set_blob.")
     }
@@ -1238,6 +1364,7 @@
         key_id: i64,
         sc_type: SubComponentType,
         blob: Option<&[u8]>,
+        blob_metadata: Option<&BlobMetaData>,
     ) -> Result<()> {
         match (blob, sc_type) {
             (Some(blob), _) => {
@@ -1247,6 +1374,16 @@
                     params![sc_type, key_id, blob],
                 )
                 .context("In set_blob_internal: Failed to insert blob.")?;
+                if let Some(blob_metadata) = blob_metadata {
+                    let blob_id = tx
+                        .query_row("SELECT MAX(id) FROM persistent.blobentry;", NO_PARAMS, |row| {
+                            row.get(0)
+                        })
+                        .context("In set_blob_internal: Failed to get new blob id.")?;
+                    blob_metadata
+                        .store_in_db(blob_id, tx)
+                        .context("In set_blob_internal: Trying to store blob metadata.")?;
+                }
             }
             (None, SubComponentType::CERT) | (None, SubComponentType::CERT_CHAIN) => {
                 tx.execute(
@@ -1266,13 +1403,10 @@
 
     /// Inserts a collection of key parameters into the `persistent.keyparameter` table
     /// and associates them with the given `key_id`.
-    pub fn insert_keyparameter(
-        &mut self,
-        key_id: &KeyIdGuard,
-        params: &[KeyParameter],
-    ) -> Result<()> {
+    #[cfg(test)]
+    fn insert_keyparameter(&mut self, key_id: &KeyIdGuard, params: &[KeyParameter]) -> Result<()> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            Self::insert_keyparameter_internal(tx, key_id, params)
+            Self::insert_keyparameter_internal(tx, key_id, params).no_gc()
         })
         .context("In insert_keyparameter.")
     }
@@ -1304,13 +1438,10 @@
     }
 
     /// Insert a set of key entry specific metadata into the database.
-    pub fn insert_key_metadata(
-        &mut self,
-        key_id: &KeyIdGuard,
-        metadata: &KeyMetaData,
-    ) -> Result<()> {
+    #[cfg(test)]
+    fn insert_key_metadata(&mut self, key_id: &KeyIdGuard, metadata: &KeyMetaData) -> Result<()> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
-            metadata.store_in_db(key_id.0, &tx)
+            metadata.store_in_db(key_id.0, &tx).no_gc()
         })
         .context("In insert_key_metadata.")
     }
@@ -1370,9 +1501,15 @@
                 expiration_date,
             )));
             metadata.store_in_db(key_id, &tx).context("Failed to insert key metadata.")?;
-            Self::set_blob_internal(&tx, key_id, SubComponentType::CERT_CHAIN, Some(cert_chain))
-                .context("Failed to insert cert chain")?;
-            Ok(())
+            Self::set_blob_internal(
+                &tx,
+                key_id,
+                SubComponentType::CERT_CHAIN,
+                Some(cert_chain),
+                None,
+            )
+            .context("Failed to insert cert chain")?;
+            Ok(()).no_gc()
         })
         .context("In store_signed_attestation_certificate_chain: ")
     }
@@ -1434,7 +1571,7 @@
                     result
                 ));
             }
-            Ok(())
+            Ok(()).no_gc()
         })
         .context("In assign_attestation_key: ")
     }
@@ -1476,7 +1613,7 @@
                 )?
                 .collect::<rusqlite::Result<Vec<Vec<u8>>>>()
                 .context("Failed to execute statement")?;
-            Ok(rows)
+            Ok(rows).no_gc()
         })
         .context("In fetch_unsigned_attestation_keys")
     }
@@ -1511,7 +1648,7 @@
                     num_deleted += 1;
                 }
             }
-            Ok(num_deleted)
+            Ok(num_deleted).do_gc(num_deleted != 0)
         })
         .context("In delete_expired_attestation_keys: ")
     }
@@ -1576,7 +1713,7 @@
                     _ => {}
                 }
             }
-            Ok(AttestationPoolStatus { expiring, unassigned, attested, total })
+            Ok(AttestationPoolStatus { expiring, unassigned, attested, total }).no_gc()
         })
         .context("In get_attestation_pool_status: ")
     }
@@ -1597,8 +1734,9 @@
                     .context(format!("Domain {:?} must be either App or SELinux.", domain));
             }
         }
-        let mut stmt = self.conn.prepare(
-            "SELECT subcomponent_type, blob
+        self.with_transaction(TransactionBehavior::Deferred, |tx| {
+            let mut stmt = tx.prepare(
+                "SELECT subcomponent_type, blob
              FROM persistent.blobentry
              WHERE keyentryid IN
                 (SELECT id
@@ -1608,48 +1746,50 @@
                        AND namespace = ?
                        AND state = ?
                        AND km_uuid = ?);",
-        )?;
-        let rows = stmt
-            .query_map(
-                params![
-                    KeyType::Attestation,
-                    domain.0 as u32,
-                    namespace,
-                    KeyLifeCycle::Live,
-                    km_uuid
-                ],
-                |row| Ok((row.get(0)?, row.get(1)?)),
-            )?
-            .collect::<rusqlite::Result<Vec<(SubComponentType, Vec<u8>)>>>()
-            .context("In retrieve_attestation_key_and_cert_chain: query failed.")?;
-        if rows.is_empty() {
-            return Ok(None);
-        } else if rows.len() != 2 {
-            return Err(KsError::sys()).context(format!(
-                concat!(
+            )?;
+            let rows = stmt
+                .query_map(
+                    params![
+                        KeyType::Attestation,
+                        domain.0 as u32,
+                        namespace,
+                        KeyLifeCycle::Live,
+                        km_uuid
+                    ],
+                    |row| Ok((row.get(0)?, row.get(1)?)),
+                )?
+                .collect::<rusqlite::Result<Vec<(SubComponentType, Vec<u8>)>>>()
+                .context("In retrieve_attestation_key_and_cert_chain: query failed.")?;
+            if rows.is_empty() {
+                return Ok(None).no_gc();
+            } else if rows.len() != 2 {
+                return Err(KsError::sys()).context(format!(
+                    concat!(
                 "In retrieve_attestation_key_and_cert_chain: Expected to get a single attestation",
                 "key chain but instead got {}."),
-                rows.len()
-            ));
-        }
-        let mut km_blob: Vec<u8> = Vec::new();
-        let mut cert_chain_blob: Vec<u8> = Vec::new();
-        for row in rows {
-            let sub_type: SubComponentType = row.0;
-            match sub_type {
-                SubComponentType::KEY_BLOB => {
-                    km_blob = row.1;
-                }
-                SubComponentType::CERT_CHAIN => {
-                    cert_chain_blob = row.1;
-                }
-                _ => Err(KsError::sys()).context("Unknown or incorrect subcomponent type.")?,
+                    rows.len()
+                ));
             }
-        }
-        Ok(Some(CertificateChain {
-            private_key: ZVec::try_from(km_blob)?,
-            cert_chain: ZVec::try_from(cert_chain_blob)?,
-        }))
+            let mut km_blob: Vec<u8> = Vec::new();
+            let mut cert_chain_blob: Vec<u8> = Vec::new();
+            for row in rows {
+                let sub_type: SubComponentType = row.0;
+                match sub_type {
+                    SubComponentType::KEY_BLOB => {
+                        km_blob = row.1;
+                    }
+                    SubComponentType::CERT_CHAIN => {
+                        cert_chain_blob = row.1;
+                    }
+                    _ => Err(KsError::sys()).context("Unknown or incorrect subcomponent type.")?,
+                }
+            }
+            Ok(Some(CertificateChain {
+                private_key: ZVec::try_from(km_blob)?,
+                cert_chain: ZVec::try_from(cert_chain_blob)?,
+            }))
+            .no_gc()
+        })
     }
 
     /// Updates the alias column of the given key id `newid` with the given alias,
@@ -1714,11 +1854,11 @@
         &mut self,
         key: &KeyDescriptor,
         params: &[KeyParameter],
-        blob: &[u8],
+        blob_info: &(&[u8], &BlobMetaData),
         cert_info: &CertificateInfo,
         metadata: &KeyMetaData,
         km_uuid: &Uuid,
-    ) -> Result<(bool, KeyIdGuard)> {
+    ) -> Result<KeyIdGuard> {
         let (alias, domain, namespace) = match key {
             KeyDescriptor { alias: Some(alias), domain: Domain::APP, nspace, blob: None }
             | KeyDescriptor { alias: Some(alias), domain: Domain::SELINUX, nspace, blob: None } => {
@@ -1732,10 +1872,17 @@
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
             let key_id = Self::create_key_entry_internal(tx, &domain, namespace, km_uuid)
                 .context("Trying to create new key entry.")?;
-            Self::set_blob_internal(tx, key_id.id(), SubComponentType::KEY_BLOB, Some(blob))
-                .context("Trying to insert the key blob.")?;
+            let (blob, blob_metadata) = *blob_info;
+            Self::set_blob_internal(
+                tx,
+                key_id.id(),
+                SubComponentType::KEY_BLOB,
+                Some(blob),
+                Some(blob_metadata),
+            )
+            .context("Trying to insert the key blob.")?;
             if let Some(cert) = &cert_info.cert {
-                Self::set_blob_internal(tx, key_id.id(), SubComponentType::CERT, Some(&cert))
+                Self::set_blob_internal(tx, key_id.id(), SubComponentType::CERT, Some(&cert), None)
                     .context("Trying to insert the certificate.")?;
             }
             if let Some(cert_chain) = &cert_info.cert_chain {
@@ -1744,6 +1891,7 @@
                     key_id.id(),
                     SubComponentType::CERT_CHAIN,
                     Some(&cert_chain),
+                    None,
                 )
                 .context("Trying to insert the certificate chain.")?;
             }
@@ -1752,7 +1900,7 @@
             metadata.store_in_db(key_id.id(), tx).context("Trying to insert key metadata.")?;
             let need_gc = Self::rebind_alias(tx, &key_id, &alias, &domain, namespace)
                 .context("Trying to rebind alias.")?;
-            Ok((need_gc, key_id))
+            Ok(key_id).do_gc(need_gc)
         })
         .context("In store_new_key.")
     }
@@ -1781,8 +1929,14 @@
             let key_id = Self::create_key_entry_internal(tx, &domain, namespace, km_uuid)
                 .context("Trying to create new key entry.")?;
 
-            Self::set_blob_internal(tx, key_id.id(), SubComponentType::CERT_CHAIN, Some(cert))
-                .context("Trying to insert certificate.")?;
+            Self::set_blob_internal(
+                tx,
+                key_id.id(),
+                SubComponentType::CERT_CHAIN,
+                Some(cert),
+                None,
+            )
+            .context("Trying to insert certificate.")?;
 
             let mut metadata = KeyMetaData::new();
             metadata.add(KeyMetaEntry::CreationDate(
@@ -1791,9 +1945,9 @@
 
             metadata.store_in_db(key_id.id(), tx).context("Trying to insert key metadata.")?;
 
-            Self::rebind_alias(tx, &key_id, &alias, &domain, namespace)
+            let need_gc = Self::rebind_alias(tx, &key_id, &alias, &domain, namespace)
                 .context("Trying to rebind alias.")?;
-            Ok(key_id)
+            Ok(key_id).do_gc(need_gc)
         })
         .context("In store_new_certificate.")
     }
@@ -1954,7 +2108,7 @@
         key_id: i64,
         load_bits: KeyEntryLoadBits,
         tx: &Transaction,
-    ) -> Result<(bool, Option<Vec<u8>>, Option<Vec<u8>>, Option<Vec<u8>>)> {
+    ) -> Result<(bool, Option<(Vec<u8>, BlobMetaData)>, Option<Vec<u8>>, Option<Vec<u8>>)> {
         let mut stmt = tx
             .prepare(
                 "SELECT MAX(id), subcomponent_type, blob FROM persistent.blobentry
@@ -1965,7 +2119,7 @@
         let mut rows =
             stmt.query(params![key_id]).context("In load_blob_components: query failed.")?;
 
-        let mut km_blob: Option<Vec<u8>> = None;
+        let mut key_blob: Option<(i64, Vec<u8>)> = None;
         let mut cert_blob: Option<Vec<u8>> = None;
         let mut cert_chain_blob: Option<Vec<u8>> = None;
         let mut has_km_blob: bool = false;
@@ -1975,7 +2129,10 @@
             has_km_blob = has_km_blob || sub_type == SubComponentType::KEY_BLOB;
             match (sub_type, load_bits.load_public(), load_bits.load_km()) {
                 (SubComponentType::KEY_BLOB, _, true) => {
-                    km_blob = Some(row.get(2).context("Failed to extract KM blob.")?);
+                    key_blob = Some((
+                        row.get(0).context("Failed to extract key blob id.")?,
+                        row.get(2).context("Failed to extract key blob.")?,
+                    ));
                 }
                 (SubComponentType::CERT, true, _) => {
                     cert_blob =
@@ -1994,7 +2151,15 @@
         })
         .context("In load_blob_components.")?;
 
-        Ok((has_km_blob, km_blob, cert_blob, cert_chain_blob))
+        let blob_info = key_blob.map_or::<Result<_>, _>(Ok(None), |(blob_id, blob)| {
+            Ok(Some((
+                blob,
+                BlobMetaData::load_from_db(blob_id, tx)
+                    .context("In load_blob_components: Trying to load blob_metadata.")?,
+            )))
+        })?;
+
+        Ok((has_km_blob, blob_info, cert_blob, cert_chain_blob))
     }
 
     fn load_key_parameters(key_id: i64, tx: &Transaction) -> Result<Vec<KeyParameter>> {
@@ -2027,7 +2192,7 @@
     /// usage has been exhausted, if not, decreases the usage count. If the usage count reaches
     /// zero, the key also gets marked unreferenced and scheduled for deletion.
-    /// Returns Ok(true) if the key was marked unreferenced as a hint to the garbage collector.
+    /// Returns Ok(()) on success; the GC hint is now handed to with_transaction, not the caller.
-    pub fn check_and_update_key_usage_count(&mut self, key_id: i64) -> Result<bool> {
+    pub fn check_and_update_key_usage_count(&mut self, key_id: i64) -> Result<()> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
             let limit: Option<i32> = tx
                 .query_row(
@@ -2052,9 +2217,10 @@
 
             match limit {
                 1 => Self::mark_unreferenced(tx, key_id)
+                    .map(|need_gc| (need_gc, ()))
                     .context("Trying to mark limited use key for deletion."),
                 0 => Err(KsError::Km(ErrorCode::INVALID_KEY_BLOB)).context("Key is exhausted."),
-                _ => Ok(false),
+                _ => Ok(()).no_gc(),
             }
         })
         .context("In check_and_update_key_usage_count.")
@@ -2180,13 +2346,14 @@
 
     fn mark_unreferenced(tx: &Transaction, key_id: i64) -> Result<bool> {
         let updated = tx
-            .execute(
-                "UPDATE persistent.keyentry SET state = ? WHERE id = ?;",
-                params![KeyLifeCycle::Unreferenced, key_id],
-            )
-            .context("In mark_unreferenced: Failed to update state of key entry.")?;
-        tx.execute("DELETE from persistent.grant WHERE keyentryid = ?;", params![key_id])
-            .context("In mark_unreferenced: Failed to drop grants.")?;
+            .execute("DELETE FROM persistent.keyentry WHERE id = ?;", params![key_id])
+            .context("Trying to delete keyentry.")?;
+        tx.execute("DELETE FROM persistent.keymetadata WHERE keyentryid = ?;", params![key_id])
+            .context("Trying to delete keymetadata.")?;
+        tx.execute("DELETE FROM persistent.keyparameter WHERE keyentryid = ?;", params![key_id])
+            .context("Trying to delete keyparameters.")?;
+        tx.execute("DELETE FROM persistent.grant WHERE keyentryid = ?;", params![key_id])
+            .context("Trying to delete grants.")?;
         Ok(updated != 0)
     }
 
@@ -2198,7 +2365,7 @@
         key_type: KeyType,
         caller_uid: u32,
         check_permission: impl Fn(&KeyDescriptor, Option<KeyPermSet>) -> Result<()>,
-    ) -> Result<bool> {
+    ) -> Result<()> {
         self.with_transaction(TransactionBehavior::Immediate, |tx| {
             let (key_id, access_key_descriptor, access_vector) =
                 Self::load_access_tuple(tx, key, key_type, caller_uid)
@@ -2209,7 +2376,9 @@
             check_permission(&access_key_descriptor, access_vector)
                 .context("While checking permission.")?;
 
-            Self::mark_unreferenced(tx, key_id).context("Trying to mark the key unreferenced.")
+            Self::mark_unreferenced(tx, key_id)
+                .map(|need_gc| (need_gc, ()))
+                .context("Trying to mark the key unreferenced.")
         })
         .context("In unbind_key.")
     }
@@ -2230,7 +2399,7 @@
     ) -> Result<KeyEntry> {
         let metadata = KeyMetaData::load_from_db(key_id, &tx).context("In load_key_components.")?;
 
-        let (has_km_blob, km_blob, cert_blob, cert_chain_blob) =
+        let (has_km_blob, key_blob_info, cert_blob, cert_chain_blob) =
             Self::load_blob_components(key_id, load_bits, &tx)
                 .context("In load_key_components.")?;
 
@@ -2242,7 +2411,7 @@
 
         Ok(KeyEntry {
             id: key_id,
-            km_blob,
+            key_blob_info,
             cert: cert_blob,
             cert_chain: cert_chain_blob,
             km_uuid,
@@ -2279,7 +2448,7 @@
                 Ok(())
             })
             .context("In list: Failed to extract rows.")?;
-            Ok(descriptors)
+            Ok(descriptors).no_gc()
         })
     }
 
@@ -2349,6 +2518,7 @@
             };
 
             Ok(KeyDescriptor { domain: Domain::GRANT, nspace: grant_id, alias: None, blob: None })
+                .no_gc()
         })
     }
 
@@ -2380,7 +2550,7 @@
             )
             .context("Failed to delete grant.")?;
 
-            Ok(())
+            Ok(()).no_gc()
         })
     }
 
@@ -2424,7 +2594,7 @@
                 ],
             )
             .context("In insert_auth_token: failed to insert auth token into the database")?;
-            Ok(())
+            Ok(()).no_gc()
         })
     }
 
@@ -2460,10 +2630,11 @@
                         entry,
                         Self::get_last_off_body(tx)
                             .context("In find_auth_token_entry: Trying to get last off body")?,
-                    )));
+                    )))
+                    .no_gc();
                 }
             }
-            Ok(None)
+            Ok(None).no_gc()
         })
         .context("In find_auth_token_entry.")
     }
@@ -2476,7 +2647,7 @@
                 params!["last_off_body", last_off_body],
             )
             .context("In insert_last_off_body: failed to insert.")?;
-            Ok(())
+            Ok(()).no_gc()
         })
     }
 
@@ -2488,7 +2659,7 @@
                 params![last_off_body, "last_off_body"],
             )
             .context("In update_last_off_body: failed to update.")?;
-            Ok(())
+            Ok(()).no_gc()
         })
     }
 
@@ -2534,13 +2705,23 @@
     fn new_test_db() -> Result<KeystoreDB> {
         let conn = KeystoreDB::make_connection("file::memory:", "file::memory:")?;
 
-        let mut db = KeystoreDB { conn };
+        let mut db = KeystoreDB { conn, gc: None };
         db.with_transaction(TransactionBehavior::Immediate, |tx| {
-            KeystoreDB::init_tables(tx).context("Failed to initialize tables.")
+            KeystoreDB::init_tables(tx).context("Failed to initialize tables.").no_gc()
         })?;
         Ok(db)
     }
 
+    fn new_test_db_with_gc<F>(path: &Path, cb: F) -> Result<KeystoreDB>
+    where
+        F: Fn(&Uuid, &[u8]) -> Result<()> + Send + 'static,
+    {
+        let gc_db = KeystoreDB::new(path, None).expect("Failed to open test gc db_connection.");
+        let gc = Gc::new_init_with(Default::default(), move || (Box::new(cb), gc_db));
+
+        KeystoreDB::new(path, Some(gc))
+    }
+
     fn rebind_alias(
         db: &mut KeystoreDB,
         newid: &KeyIdGuard,
@@ -2549,7 +2730,7 @@
         namespace: i64,
     ) -> Result<bool> {
         db.with_transaction(TransactionBehavior::Immediate, |tx| {
-            KeystoreDB::rebind_alias(tx, newid, alias, &domain, &namespace)
+            KeystoreDB::rebind_alias(tx, newid, alias, &domain, &namespace).no_gc()
         })
         .context("In rebind_alias.")
     }
@@ -2601,12 +2782,13 @@
             .prepare("SELECT name from persistent.sqlite_master WHERE type='table' ORDER BY name;")?
             .query_map(params![], |row| row.get(0))?
             .collect::<rusqlite::Result<Vec<String>>>()?;
-        assert_eq!(tables.len(), 5);
+        assert_eq!(tables.len(), 6);
         assert_eq!(tables[0], "blobentry");
-        assert_eq!(tables[1], "grant");
-        assert_eq!(tables[2], "keyentry");
-        assert_eq!(tables[3], "keymetadata");
-        assert_eq!(tables[4], "keyparameter");
+        assert_eq!(tables[1], "blobmetadata");
+        assert_eq!(tables[2], "grant");
+        assert_eq!(tables[3], "keyentry");
+        assert_eq!(tables[4], "keymetadata");
+        assert_eq!(tables[5], "keyparameter");
         let tables = db
             .conn
             .prepare("SELECT name from perboot.sqlite_master WHERE type='table' ORDER BY name;")?
@@ -2696,13 +2878,13 @@
     #[test]
     fn test_persistence_for_files() -> Result<()> {
         let temp_dir = TempDir::new("persistent_db_test")?;
-        let mut db = KeystoreDB::new(temp_dir.path())?;
+        let mut db = KeystoreDB::new(temp_dir.path(), None)?;
 
         db.create_key_entry(&Domain::APP, &100, &KEYSTORE_UUID)?;
         let entries = get_keyentry(&db)?;
         assert_eq!(entries.len(), 1);
 
-        let db = KeystoreDB::new(temp_dir.path())?;
+        let db = KeystoreDB::new(temp_dir.path(), None)?;
 
         let entries_new = get_keyentry(&db)?;
         assert_eq!(entries, entries_new);
@@ -2833,7 +3015,9 @@
 
     #[test]
     fn test_remove_expired_certs() -> Result<()> {
-        let mut db = new_test_db()?;
+        let temp_dir =
+            TempDir::new("test_remove_expired_certs_").expect("Failed to create temp dir.");
+        let mut db = new_test_db_with_gc(temp_dir.path(), |_, _| Ok(()))?;
         let expiration_date: i64 =
             SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)?.as_millis() as i64 + 10000;
         let namespace: i64 = 30;
@@ -2847,11 +3031,20 @@
         )?;
         load_attestation_key_pool(&mut db, 45, namespace_del1, 0x02)?;
         load_attestation_key_pool(&mut db, 60, namespace_del2, 0x03)?;
+
+        let blob_entry_row_count: u32 = db
+            .conn
+            .query_row("SELECT COUNT(id) FROM persistent.blobentry;", NO_PARAMS, |row| row.get(0))
+            .expect("Failed to get blob entry row count.");
+        // We expect 6 rows here because there are two blobs per attestation key, i.e.,
+        // one key and one certificate.
+        assert_eq!(blob_entry_row_count, 6);
+
         assert_eq!(db.delete_expired_attestation_keys()?, 2);
 
         let mut cert_chain =
             db.retrieve_attestation_key_and_cert_chain(Domain::APP, namespace, &KEYSTORE_UUID)?;
-        assert_eq!(true, cert_chain.is_some());
+        assert!(cert_chain.is_some());
         let value = cert_chain.unwrap();
         assert_eq!(entry_values[1], value.cert_chain.to_vec());
         assert_eq!(entry_values[2], value.private_key.to_vec());
@@ -2861,26 +3054,25 @@
             namespace_del1,
             &KEYSTORE_UUID,
         )?;
-        assert_eq!(false, cert_chain.is_some());
+        assert!(!cert_chain.is_some());
         cert_chain = db.retrieve_attestation_key_and_cert_chain(
             Domain::APP,
             namespace_del2,
             &KEYSTORE_UUID,
         )?;
-        assert_eq!(false, cert_chain.is_some());
+        assert!(!cert_chain.is_some());
 
-        let mut option_entry = db.get_unreferenced_key()?;
-        assert_eq!(true, option_entry.is_some());
-        let (key_guard, _) = option_entry.unwrap();
-        db.purge_key_entry(key_guard)?;
+        // Give the garbage collector half a second to catch up.
+        std::thread::sleep(Duration::from_millis(500));
 
-        option_entry = db.get_unreferenced_key()?;
-        assert_eq!(true, option_entry.is_some());
-        let (key_guard, _) = option_entry.unwrap();
-        db.purge_key_entry(key_guard)?;
+        let blob_entry_row_count: u32 = db
+            .conn
+            .query_row("SELECT COUNT(id) FROM persistent.blobentry;", NO_PARAMS, |row| row.get(0))
+            .expect("Failed to get blob entry row count.");
+        // There should be 2 blob entries left, because we deleted two of the attestation
+        // key entries with two blobs each.
+        assert_eq!(blob_entry_row_count, 2);
 
-        option_entry = db.get_unreferenced_key()?;
-        assert_eq!(false, option_entry.is_some());
         Ok(())
     }
 
@@ -3120,26 +3312,43 @@
     fn test_set_blob() -> Result<()> {
         let key_id = KEY_ID_LOCK.get(3000);
         let mut db = new_test_db()?;
-        db.set_blob(&key_id, SubComponentType::KEY_BLOB, Some(TEST_KEY_BLOB))?;
-        db.set_blob(&key_id, SubComponentType::CERT, Some(TEST_CERT_BLOB))?;
-        db.set_blob(&key_id, SubComponentType::CERT_CHAIN, Some(TEST_CERT_CHAIN_BLOB))?;
+        let mut blob_metadata = BlobMetaData::new();
+        blob_metadata.add(BlobMetaEntry::KmUuid(KEYSTORE_UUID));
+        db.set_blob(
+            &key_id,
+            SubComponentType::KEY_BLOB,
+            Some(TEST_KEY_BLOB),
+            Some(&blob_metadata),
+        )?;
+        db.set_blob(&key_id, SubComponentType::CERT, Some(TEST_CERT_BLOB), None)?;
+        db.set_blob(&key_id, SubComponentType::CERT_CHAIN, Some(TEST_CERT_CHAIN_BLOB), None)?;
         drop(key_id);
 
         let mut stmt = db.conn.prepare(
-            "SELECT subcomponent_type, keyentryid, blob FROM persistent.blobentry
+            "SELECT subcomponent_type, keyentryid, blob, id FROM persistent.blobentry
                 ORDER BY subcomponent_type ASC;",
         )?;
         let mut rows = stmt
-            .query_map::<(SubComponentType, i64, Vec<u8>), _, _>(NO_PARAMS, |row| {
-                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
+            .query_map::<((SubComponentType, i64, Vec<u8>), i64), _, _>(NO_PARAMS, |row| {
+                Ok(((row.get(0)?, row.get(1)?, row.get(2)?), row.get(3)?))
             })?;
-        let r = rows.next().unwrap().unwrap();
+        let (r, id) = rows.next().unwrap().unwrap();
         assert_eq!(r, (SubComponentType::KEY_BLOB, 3000, TEST_KEY_BLOB.to_vec()));
-        let r = rows.next().unwrap().unwrap();
+        let (r, _) = rows.next().unwrap().unwrap();
         assert_eq!(r, (SubComponentType::CERT, 3000, TEST_CERT_BLOB.to_vec()));
-        let r = rows.next().unwrap().unwrap();
+        let (r, _) = rows.next().unwrap().unwrap();
         assert_eq!(r, (SubComponentType::CERT_CHAIN, 3000, TEST_CERT_CHAIN_BLOB.to_vec()));
 
+        drop(rows);
+        drop(stmt);
+
+        assert_eq!(
+            db.with_transaction(TransactionBehavior::Immediate, |tx| {
+                BlobMetaData::load_from_db(id, tx).no_gc()
+            })
+            .expect("Should find blob metadata."),
+            blob_metadata
+        );
         Ok(())
     }
 
@@ -3564,7 +3773,7 @@
         let handle = {
             let temp_dir = Arc::new(TempDir::new("id_lock_test")?);
             let temp_dir_clone = temp_dir.clone();
-            let mut db = KeystoreDB::new(temp_dir.path())?;
+            let mut db = KeystoreDB::new(temp_dir.path(), None)?;
             let key_id = make_test_key_entry(&mut db, Domain::APP, 33, KEY_LOCK_TEST_ALIAS, None)
                 .context("test_insert_and_load_full_keyentry_domain_app")?
                 .0;
@@ -3595,7 +3804,7 @@
             // the primary thread.
             let handle = thread::spawn(move || {
                 let temp_dir = temp_dir_clone;
-                let mut db = KeystoreDB::new(temp_dir.path()).unwrap();
+                let mut db = KeystoreDB::new(temp_dir.path(), None).unwrap();
                 assert!(db
                     .load_key_entry(
                         &KeyDescriptor {
@@ -3638,8 +3847,8 @@
         let temp_dir =
             TempDir::new("test_database_busy_error_code_").expect("Failed to create temp dir.");
 
-        let mut db1 = KeystoreDB::new(temp_dir.path()).expect("Failed to open database1.");
-        let mut db2 = KeystoreDB::new(temp_dir.path()).expect("Failed to open database2.");
+        let mut db1 = KeystoreDB::new(temp_dir.path(), None).expect("Failed to open database1.");
+        let mut db2 = KeystoreDB::new(temp_dir.path(), None).expect("Failed to open database2.");
 
         let _tx1 = db1
             .conn
@@ -3799,7 +4008,7 @@
     #[test]
     fn list() -> Result<()> {
         let temp_dir = TempDir::new("list_test")?;
-        let mut db = KeystoreDB::new(temp_dir.path())?;
+        let mut db = KeystoreDB::new(temp_dir.path(), None)?;
         static LIST_O_ENTRIES: &[(Domain, i64, &str)] = &[
             (Domain::APP, 1, "test1"),
             (Domain::APP, 1, "test2"),
@@ -4198,18 +4407,27 @@
         max_usage_count: Option<i32>,
     ) -> Result<KeyIdGuard> {
         let key_id = db.create_key_entry(&domain, &namespace, &KEYSTORE_UUID)?;
-        db.set_blob(&key_id, SubComponentType::KEY_BLOB, Some(TEST_KEY_BLOB))?;
-        db.set_blob(&key_id, SubComponentType::CERT, Some(TEST_CERT_BLOB))?;
-        db.set_blob(&key_id, SubComponentType::CERT_CHAIN, Some(TEST_CERT_CHAIN_BLOB))?;
+        let mut blob_metadata = BlobMetaData::new();
+        blob_metadata.add(BlobMetaEntry::EncryptedBy(EncryptedBy::Password));
+        blob_metadata.add(BlobMetaEntry::Salt(vec![1, 2, 3]));
+        blob_metadata.add(BlobMetaEntry::Iv(vec![2, 3, 1]));
+        blob_metadata.add(BlobMetaEntry::AeadTag(vec![3, 1, 2]));
+        blob_metadata.add(BlobMetaEntry::KmUuid(KEYSTORE_UUID));
+
+        db.set_blob(
+            &key_id,
+            SubComponentType::KEY_BLOB,
+            Some(TEST_KEY_BLOB),
+            Some(&blob_metadata),
+        )?;
+        db.set_blob(&key_id, SubComponentType::CERT, Some(TEST_CERT_BLOB), None)?;
+        db.set_blob(&key_id, SubComponentType::CERT_CHAIN, Some(TEST_CERT_CHAIN_BLOB), None)?;
 
         let params = make_test_params(max_usage_count);
         db.insert_keyparameter(&key_id, &params)?;
 
         let mut metadata = KeyMetaData::new();
-        metadata.add(KeyMetaEntry::EncryptedBy(EncryptedBy::Password));
-        metadata.add(KeyMetaEntry::Salt(vec![1, 2, 3]));
-        metadata.add(KeyMetaEntry::Iv(vec![2, 3, 1]));
-        metadata.add(KeyMetaEntry::AeadTag(vec![3, 1, 2]));
+        metadata.add(KeyMetaEntry::CreationDate(DateTime::from_millis_epoch(123456789)));
         db.insert_key_metadata(&key_id, &metadata)?;
         rebind_alias(db, &key_id, alias, domain, namespace)?;
         Ok(key_id)
@@ -4218,15 +4436,19 @@
     fn make_test_key_entry_test_vector(key_id: i64, max_usage_count: Option<i32>) -> KeyEntry {
         let params = make_test_params(max_usage_count);
 
+        let mut blob_metadata = BlobMetaData::new();
+        blob_metadata.add(BlobMetaEntry::EncryptedBy(EncryptedBy::Password));
+        blob_metadata.add(BlobMetaEntry::Salt(vec![1, 2, 3]));
+        blob_metadata.add(BlobMetaEntry::Iv(vec![2, 3, 1]));
+        blob_metadata.add(BlobMetaEntry::AeadTag(vec![3, 1, 2]));
+        blob_metadata.add(BlobMetaEntry::KmUuid(KEYSTORE_UUID));
+
         let mut metadata = KeyMetaData::new();
-        metadata.add(KeyMetaEntry::EncryptedBy(EncryptedBy::Password));
-        metadata.add(KeyMetaEntry::Salt(vec![1, 2, 3]));
-        metadata.add(KeyMetaEntry::Iv(vec![2, 3, 1]));
-        metadata.add(KeyMetaEntry::AeadTag(vec![3, 1, 2]));
+        metadata.add(KeyMetaEntry::CreationDate(DateTime::from_millis_epoch(123456789)));
 
         KeyEntry {
             id: key_id,
-            km_blob: Some(TEST_KEY_BLOB.to_vec()),
+            key_blob_info: Some((TEST_KEY_BLOB.to_vec(), blob_metadata)),
             cert: Some(TEST_CERT_BLOB.to_vec()),
             cert_chain: Some(TEST_CERT_CHAIN_BLOB.to_vec()),
             km_uuid: KEYSTORE_UUID,