Keystore 2.0: Key garbage collection.

This patch introduces a key life cycle state in the keyentry table. We
use this to implement key garbage collection.
This patch:
 * Introduces the key lifecycle.
 * Database functionality for marking a key unreferenced, getting an
   unreferenced key, and purging keys from the database.
 * Implements the deleteKey API call of IKeyStoreService.
 * Implements async_task, a singke on-demand worker thread.
 * Implements a garbage collector that collects unreferenced
   keys and disposes off sensitive key material.
 * Remove security level from the blobentry table.

Bug: 159340471
Test: keystore2_test
Change-Id: I84ffd64eaae1b86c645b50f100b1b399b9e16e40
diff --git a/keystore2/src/async_task.rs b/keystore2/src/async_task.rs
new file mode 100644
index 0000000..6edd760
--- /dev/null
+++ b/keystore2/src/async_task.rs
@@ -0,0 +1,144 @@
+// Copyright 2020, The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! This module implements the handling of async tasks.
+//! The worker thread has a high priority and a low priority queue. Adding a job to either
+//! will cause one thread to be spawned if none exists. As a compromise between performance
+//! and resource consumption, the thread will linger for about 30 seconds after it has
+//! processed all tasks before it terminates.
+//! Note that low priority tasks are processed only when the high priority queue is empty.
+
+use std::time::Duration;
+use std::{
+    collections::VecDeque,
+    sync::Arc,
+    sync::{Condvar, Mutex, MutexGuard},
+    thread,
+};
+
+#[derive(Debug, PartialEq, Eq)]
+enum State {
+    Exiting,
+    Running,
+}
+
+struct AsyncTaskState {
+    state: State,
+    thread: Option<thread::JoinHandle<()>>,
+    hi_prio_req: VecDeque<Box<dyn FnOnce() + Send>>,
+    lo_prio_req: VecDeque<Box<dyn FnOnce() + Send>>,
+}
+
+/// AsyncTask spawns one worker thread on demand to process jobs inserted into
+/// a low and a high priority work queue.
+pub struct AsyncTask {
+    state: Arc<(Condvar, Mutex<AsyncTaskState>)>,
+}
+
+impl Default for AsyncTask {
+    fn default() -> Self {
+        Self {
+            state: Arc::new((
+                Condvar::new(),
+                Mutex::new(AsyncTaskState {
+                    state: State::Exiting,
+                    thread: None,
+                    hi_prio_req: VecDeque::new(),
+                    lo_prio_req: VecDeque::new(),
+                }),
+            )),
+        }
+    }
+}
+
+impl AsyncTask {
+    /// Adds a job to the high priority queue. High priority jobs are completed before
+    /// low priority jobs and can also overtake low priority jobs. But they cannot
+    /// preempt them.
+    pub fn queue_hi<F>(&self, f: F)
+    where
+        F: FnOnce() + Send + 'static,
+    {
+        self.queue(f, true)
+    }
+
+    /// Adds a job to the low priority queue. Low priority jobs are completed after
+    /// high priority. And they are not executed as long as high priority jobs are
+    /// present. Jobs always run to completion and are never preempted by high
+    /// priority jobs.
+    pub fn queue_lo<F>(&self, f: F)
+    where
+        F: FnOnce() + Send + 'static,
+    {
+        self.queue(f, false)
+    }
+
+    fn queue<F>(&self, f: F, hi_prio: bool)
+    where
+        F: FnOnce() + Send + 'static,
+    {
+        let (ref condvar, ref state) = *self.state;
+        let mut state = state.lock().unwrap();
+        if hi_prio {
+            state.hi_prio_req.push_back(Box::new(f));
+        } else {
+            state.lo_prio_req.push_back(Box::new(f));
+        }
+
+        if state.state != State::Running {
+            self.spawn_thread(&mut state);
+        }
+        drop(state);
+        condvar.notify_all();
+    }
+
+    fn spawn_thread(&self, state: &mut MutexGuard<AsyncTaskState>) {
+        if let Some(t) = state.thread.take() {
+            t.join().expect("AsyncTask panicked.");
+        }
+
+        let cloned_state = self.state.clone();
+
+        state.thread = Some(thread::spawn(move || {
+            let (ref condvar, ref state) = *cloned_state;
+            loop {
+                if let Some(f) = {
+                    let (mut state, timeout) = condvar
+                        .wait_timeout_while(
+                            state.lock().unwrap(),
+                            Duration::from_secs(30),
+                            |state| state.hi_prio_req.is_empty() && state.lo_prio_req.is_empty(),
+                        )
+                        .unwrap();
+                    match (
+                        state.hi_prio_req.pop_front(),
+                        state.lo_prio_req.is_empty(),
+                        timeout.timed_out(),
+                    ) {
+                        (Some(f), _, _) => Some(f),
+                        (None, false, _) => state.lo_prio_req.pop_front(),
+                        (None, true, true) => {
+                            state.state = State::Exiting;
+                            break;
+                        }
+                        (None, true, false) => None,
+                    }
+                } {
+                    f()
+                }
+            }
+        }));
+        state.state = State::Running;
+    }
+}
diff --git a/keystore2/src/database.rs b/keystore2/src/database.rs
index 0c49340..df1c24c 100644
--- a/keystore2/src/database.rs
+++ b/keystore2/src/database.rs
@@ -46,12 +46,15 @@
 
 #![allow(dead_code)]
 
-use crate::db_utils::{self, SqlField};
 use crate::error::{Error as KsError, ResponseCode};
 use crate::impl_metadata; // This is in db_utils.rs
 use crate::key_parameter::{KeyParameter, Tag};
 use crate::permission::KeyPermSet;
 use crate::utils::get_current_time_in_seconds;
+use crate::{
+    db_utils::{self, SqlField},
+    gc::Gc,
+};
 use anyhow::{anyhow, Context, Result};
 use std::{convert::TryFrom, convert::TryInto, time::SystemTimeError};
 
@@ -314,6 +317,39 @@
     }
 }
 
+#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
+enum KeyLifeCycle {
+    /// Existing keys have a key ID but are not fully populated yet.
+    /// This is a transient state. If Keystore finds any such keys when it starts up, it must move
+    /// them to Unreferenced for garbage collection.
+    Existing,
+    /// A live key is fully populated and usable by clients.
+    Live,
+    /// An unreferenced key is scheduled for garbage collection.
+    Unreferenced,
+}
+
+impl ToSql for KeyLifeCycle {
+    fn to_sql(&self) -> rusqlite::Result<ToSqlOutput> {
+        match self {
+            Self::Existing => Ok(ToSqlOutput::Owned(Value::Integer(0))),
+            Self::Live => Ok(ToSqlOutput::Owned(Value::Integer(1))),
+            Self::Unreferenced => Ok(ToSqlOutput::Owned(Value::Integer(2))),
+        }
+    }
+}
+
+impl FromSql for KeyLifeCycle {
+    fn column_result(value: ValueRef) -> FromSqlResult<Self> {
+        match i64::column_result(value)? {
+            0 => Ok(KeyLifeCycle::Existing),
+            1 => Ok(KeyLifeCycle::Live),
+            2 => Ok(KeyLifeCycle::Unreferenced),
+            v => Err(FromSqlError::OutOfRange(v)),
+        }
+    }
+}
+
 /// Keys have a KeyMint blob component and optional public certificate and
 /// certificate chain components.
 /// KeyEntryLoadBits is a bitmap that indicates to `KeystoreDB::load_key_entry`
@@ -593,25 +629,18 @@
                      key_type INTEGER,
                      domain INTEGER,
                      namespace INTEGER,
-                     alias BLOB);",
+                     alias BLOB,
+                     state INTEGER);",
             NO_PARAMS,
         )
         .context("Failed to initialize \"keyentry\" table.")?;
 
         conn.execute(
-            "CREATE VIEW IF NOT EXISTS persistent.orphaned AS
-                    SELECT id FROM persistent.keyentry WHERE domain IS NULL;",
-            NO_PARAMS,
-        )
-        .context("Failed to initialize \"orphaned\" view")?;
-
-        conn.execute(
             "CREATE TABLE IF NOT EXISTS persistent.blobentry (
                     id INTEGER PRIMARY KEY,
                     subcomponent_type INTEGER,
                     keyentryid INTEGER,
-                    blob BLOB,
-                    sec_level INTEGER);",
+                    blob BLOB);",
             NO_PARAMS,
         )
         .context("Failed to initialize \"blobentry\" table.")?;
@@ -692,6 +721,96 @@
         Ok(conn)
     }
 
+    /// Get one unreferenced key. There is no particular order in which the keys are returned.
+    fn get_unreferenced_key_id(tx: &Transaction) -> Result<Option<i64>> {
+        tx.query_row(
+            "SELECT id FROM persistent.keyentry WHERE state = ?",
+            params![KeyLifeCycle::Unreferenced],
+            |row| row.get(0),
+        )
+        .optional()
+        .context("In get_unreferenced_key_id: Trying to get unreferenced key id.")
+    }
+
+    /// Returns a key id guard and key entry for one unreferenced key entry. Of the optional
+    /// fields of the key entry only the km_blob field will be populated. This is required
+    /// to subject the blob to its KeyMint instance for deletion.
+    pub fn get_unreferenced_key(&mut self) -> Result<Option<(KeyIdGuard, KeyEntry)>> {
+        self.with_transaction(TransactionBehavior::Deferred, |tx| {
+            let key_id = match Self::get_unreferenced_key_id(tx)
+                .context("Trying to get unreferenced key id")?
+            {
+                None => return Ok(None),
+                Some(id) => KEY_ID_LOCK.try_get(id).ok_or_else(KsError::sys).context(concat!(
+                    "A key id lock was held for an unreferenced key. ",
+                    "This should never happen."
+                ))?,
+            };
+            let key_entry = Self::load_key_components(tx, KeyEntryLoadBits::KM, key_id.id())
+                .context("Trying to get key components.")?;
+            Ok(Some((key_id, key_entry)))
+        })
+        .context("In get_unreferenced_key.")
+    }
+
+    /// This function purges all remnants of a key entry from the database.
+    /// Important: This does not check if the key was unreferenced, nor does it
+    /// subject the key to its KeyMint instance for permanent invalidation.
+    /// This function should only be called by the garbage collector.
+    /// To delete a key call `mark_unreferenced`, which transitions the key to the unreferenced
+    /// state, deletes all grants to the key, and notifies the garbage collector.
+    /// The garbage collector will:
+    ///  1. Call get_unreferenced_key.
+    ///  2. Determine the proper way to dispose of sensitive key material, e.g., call
+    ///     `KeyMintDevice::delete()`.
+    ///  3. Call `purge_key_entry`.
+    pub fn purge_key_entry(&mut self, key_id: KeyIdGuard) -> Result<()> {
+        self.with_transaction(TransactionBehavior::Immediate, |tx| {
+            tx.execute("DELETE FROM persistent.keyentry WHERE id = ?;", params![key_id.id()])
+                .context("Trying to delete keyentry.")?;
+            tx.execute(
+                "DELETE FROM persistent.blobentry WHERE keyentryid = ?;",
+                params![key_id.id()],
+            )
+            .context("Trying to delete blobentries.")?;
+            tx.execute(
+                "DELETE FROM persistent.keymetadata WHERE keyentryid = ?;",
+                params![key_id.id()],
+            )
+            .context("Trying to delete keymetadata.")?;
+            tx.execute(
+                "DELETE FROM persistent.keyparameter WHERE keyentryid = ?;",
+                params![key_id.id()],
+            )
+            .context("Trying to delete keyparameters.")?;
+            let grants_deleted = tx
+                .execute("DELETE FROM persistent.grant WHERE keyentryid = ?;", params![key_id.id()])
+                .context("Trying to delete grants.")?;
+            if grants_deleted != 0 {
+                log::error!("Purged key that still had grants. This should not happen.");
+            }
+            Ok(())
+        })
+        .context("In purge_key_entry.")
+    }
+
+    /// This maintenance function should be called only once before the database is used for the
+    /// first time. It restores the invariant that `KeyLifeCycle::Existing` is a transient state.
+    /// The function transitions all key entries from Existing to Unreferenced unconditionally and
+    /// returns the number of rows affected. If this returns a value greater than 0, it means that
+    /// Keystore crashed at some point during key generation. Callers may want to log such
+    /// occurrences.
+    /// Unlike with `mark_unreferenced`, we don't need to purge grants, because only keys that made
+    /// it to `KeyLifeCycle::Live` may have grants.
+    pub fn cleanup_leftovers(&mut self) -> Result<usize> {
+        self.conn
+            .execute(
+                "UPDATE persistent.keyentry SET state = ? WHERE state = ?;",
+                params![KeyLifeCycle::Unreferenced, KeyLifeCycle::Existing],
+            )
+            .context("In cleanup_leftovers.")
+    }
+
     /// Atomically loads a key entry and associated metadata or creates it using the
     /// callback create_new_key callback. The callback is called during a database
     /// transaction. This means that implementers should be mindful about using
@@ -719,11 +838,12 @@
                     key_type = ?
                     AND domain = ?
                     AND namespace = ?
-                    AND alias = ?;",
+                    AND alias = ?
+                    AND state = ?;",
                 )
                 .context("In get_or_create_key_with: Failed to select from keyentry table.")?;
             let mut rows = stmt
-                .query(params![KeyType::Super, domain.0, namespace, alias])
+                .query(params![KeyType::Super, domain.0, namespace, alias, KeyLifeCycle::Live])
                 .context("In get_or_create_key_with: Failed to query from keyentry table.")?;
 
             db_utils::with_rows_extract_one(&mut rows, |row| {
@@ -746,22 +866,16 @@
                 let id = Self::insert_with_retry(|id| {
                     tx.execute(
                         "INSERT into persistent.keyentry
-                        (id, key_type, domain, namespace, alias)
-                        VALUES(?, ?, ?, ?, ?);",
-                        params![id, KeyType::Super, domain.0, namespace, alias],
+                        (id, key_type, domain, namespace, alias, state)
+                        VALUES(?, ?, ?, ?, ?, ?);",
+                        params![id, KeyType::Super, domain.0, namespace, alias, KeyLifeCycle::Live],
                     )
                 })
                 .context("In get_or_create_key_with.")?;
 
                 let (blob, metadata) = create_new_key().context("In get_or_create_key_with.")?;
-                Self::insert_blob_internal(
-                    &tx,
-                    id,
-                    SubComponentType::KEY_BLOB,
-                    &blob,
-                    SecurityLevel::SOFTWARE,
-                )
-                .context("In get_of_create_key_with.")?;
+                Self::insert_blob_internal(&tx, id, SubComponentType::KEY_BLOB, &blob)
+                    .context("In get_of_create_key_with.")?;
                 metadata.store_in_db(id, &tx).context("In get_or_create_key_with.")?;
                 (id, KeyEntry { id, km_blob: Some(blob), metadata, ..Default::default() })
             }
@@ -770,13 +884,40 @@
         Ok((KEY_ID_LOCK.get(id), entry))
     }
 
+    /// Creates a transaction with the given behavior and executes f with the new transaction.
+    /// The transaction is committed only if f returns Ok.
+    fn with_transaction<T, F>(&mut self, behavior: TransactionBehavior, f: F) -> Result<T>
+    where
+        F: FnOnce(&Transaction) -> Result<T>,
+    {
+        let tx = self
+            .conn
+            .transaction_with_behavior(behavior)
+            .context("In with_transaction: Failed to initialize transaction.")?;
+        f(&tx).and_then(|result| {
+            tx.commit().context("In with_transaction: Failed to commit transaction.")?;
+            Ok(result)
+        })
+    }
+
     /// Creates a new key entry and allocates a new randomized id for the new key.
     /// The key id gets associated with a domain and namespace but not with an alias.
     /// To complete key generation `rebind_alias` should be called after all of the
     /// key artifacts, i.e., blobs and parameters have been associated with the new
     /// key id. Finalizing with `rebind_alias` makes the creation of a new key entry
     /// atomic even if key generation is not.
-    pub fn create_key_entry(&self, domain: Domain, namespace: i64) -> Result<KeyIdGuard> {
+    pub fn create_key_entry(&mut self, domain: Domain, namespace: i64) -> Result<KeyIdGuard> {
+        self.with_transaction(TransactionBehavior::Immediate, |tx| {
+            Self::create_key_entry_internal(tx, domain, namespace)
+        })
+        .context("In create_key_entry.")
+    }
+
+    fn create_key_entry_internal(
+        tx: &Transaction,
+        domain: Domain,
+        namespace: i64,
+    ) -> Result<KeyIdGuard> {
         match domain {
             Domain::APP | Domain::SELINUX => {}
             _ => {
@@ -786,14 +927,20 @@
         }
         Ok(KEY_ID_LOCK.get(
             Self::insert_with_retry(|id| {
-                self.conn.execute(
+                tx.execute(
                     "INSERT into persistent.keyentry
-                     (id, key_type, domain, namespace, alias)
-                     VALUES(?, ?, ?, ?, NULL);",
-                    params![id, KeyType::Client, domain.0 as u32, namespace],
+                     (id, key_type, domain, namespace, alias, state)
+                     VALUES(?, ?, ?, ?, NULL, ?);",
+                    params![
+                        id,
+                        KeyType::Client,
+                        domain.0 as u32,
+                        namespace,
+                        KeyLifeCycle::Existing
+                    ],
                 )
             })
-            .context("In create_key_entry")?,
+            .context("In create_key_entry_internal")?,
         ))
     }
 
@@ -808,17 +955,11 @@
         key_id: &KeyIdGuard,
         sc_type: SubComponentType,
         blob: &[u8],
-        sec_level: SecurityLevel,
     ) -> Result<()> {
-        let tx = self
-            .conn
-            .transaction_with_behavior(TransactionBehavior::Immediate)
-            .context("In insert_blob: Failed to initialize transaction.")?;
-
-        Self::insert_blob_internal(&tx, key_id.0, sc_type, blob, sec_level)
-            .context("In insert_blob.")?;
-
-        tx.commit().context("In insert_blob: Failed to commit transaction.")
+        self.with_transaction(TransactionBehavior::Immediate, |tx| {
+            Self::insert_blob_internal(&tx, key_id.0, sc_type, blob)
+        })
+        .context("In insert_blob.")
     }
 
     fn insert_blob_internal(
@@ -826,12 +967,11 @@
         key_id: i64,
         sc_type: SubComponentType,
         blob: &[u8],
-        sec_level: SecurityLevel,
     ) -> Result<()> {
         tx.execute(
-            "INSERT into persistent.blobentry (subcomponent_type, keyentryid, blob, sec_level)
-                VALUES (?, ?, ?, ?);",
-            params![sc_type, key_id, blob, sec_level.0],
+            "INSERT into persistent.blobentry (subcomponent_type, keyentryid, blob)
+                VALUES (?, ?, ?);",
+            params![sc_type, key_id, blob],
         )
         .context("In insert_blob_internal: Failed to insert blob.")?;
         Ok(())
@@ -844,30 +984,36 @@
         key_id: &KeyIdGuard,
         params: impl IntoIterator<Item = &'a KeyParameter>,
     ) -> Result<()> {
-        let tx = self
-            .conn
-            .transaction_with_behavior(TransactionBehavior::Immediate)
-            .context("In insert_keyparameter: Failed to start transaction.")?;
-        {
-            let mut stmt = tx
-                .prepare(
-                    "INSERT into persistent.keyparameter (keyentryid, tag, data, security_level)
-                    VALUES (?, ?, ?, ?);",
-                )
-                .context("In insert_keyparameter: Failed to prepare statement.")?;
+        self.with_transaction(TransactionBehavior::Immediate, |tx| {
+            Self::insert_keyparameter_internal(tx, key_id, params)
+        })
+        .context("In insert_keyparameter.")
+    }
 
-            let iter = params.into_iter();
-            for p in iter {
-                stmt.insert(params![
-                    key_id.0,
-                    p.get_tag().0,
-                    p.key_parameter_value(),
-                    p.security_level().0
-                ])
-                .with_context(|| format!("In insert_keyparameter: Failed to insert {:?}", p))?;
-            }
+    fn insert_keyparameter_internal<'a>(
+        tx: &Transaction,
+        key_id: &KeyIdGuard,
+        params: impl IntoIterator<Item = &'a KeyParameter>,
+    ) -> Result<()> {
+        let mut stmt = tx
+            .prepare(
+                "INSERT into persistent.keyparameter (keyentryid, tag, data, security_level)
+                VALUES (?, ?, ?, ?);",
+            )
+            .context("In insert_keyparameter_internal: Failed to prepare statement.")?;
+
+        let iter = params.into_iter();
+        for p in iter {
+            stmt.insert(params![
+                key_id.0,
+                p.get_tag().0,
+                p.key_parameter_value(),
+                p.security_level().0
+            ])
+            .with_context(|| {
+                format!("In insert_keyparameter_internal: Failed to insert {:?}", p)
+            })?;
         }
-        tx.commit().context("In insert_keyparameter: Failed to commit transaction.")?;
         Ok(())
     }
 
@@ -877,19 +1023,30 @@
         key_id: &KeyIdGuard,
         metadata: &KeyMetaData,
     ) -> Result<()> {
-        let tx = self
-            .conn
-            .transaction_with_behavior(TransactionBehavior::Immediate)
-            .context("In insert_key_metadata: Failed to initialize transaction.")?;
-        metadata.store_in_db(key_id.0, &tx).context("In insert_key_metadata")?;
-        tx.commit().context("In insert_key_metadata: Failed to commit transaction")
+        self.with_transaction(TransactionBehavior::Immediate, |tx| {
+            metadata.store_in_db(key_id.0, &tx)
+        })
+        .context("In insert_key_metadata.")
+    }
+
+    fn rebind_alias(
+        &mut self,
+        newid: &KeyIdGuard,
+        alias: &str,
+        domain: Domain,
+        namespace: i64,
+    ) -> Result<()> {
+        self.with_transaction(TransactionBehavior::Immediate, |tx| {
+            Self::rebind_alias_internal(tx, newid, alias, domain, namespace)
+        })
+        .context("In rebind_alias.")
     }
 
     /// Updates the alias column of the given key id `newid` with the given alias,
     /// and atomically, removes the alias, domain, and namespace from another row
     /// with the same alias-domain-namespace tuple if such row exits.
-    pub fn rebind_alias(
-        &mut self,
+    fn rebind_alias_internal(
+        tx: &Transaction,
         newid: &KeyIdGuard,
         alias: &str,
         domain: Domain,
@@ -899,41 +1056,94 @@
             Domain::APP | Domain::SELINUX => {}
             _ => {
                 return Err(KsError::sys()).context(format!(
-                    "In rebind_alias: Domain {:?} must be either App or SELinux.",
+                    "In rebind_alias_internal: Domain {:?} must be either App or SELinux.",
                     domain
                 ));
             }
         }
-        let tx = self
-            .conn
-            .transaction_with_behavior(TransactionBehavior::Immediate)
-            .context("In rebind_alias: Failed to initialize transaction.")?;
-        tx.execute(
-            "UPDATE persistent.keyentry
-                 SET alias = NULL, domain = NULL, namespace = NULL
+        let updated = tx
+            .execute(
+                "UPDATE persistent.keyentry
+                 SET alias = NULL, domain = NULL, namespace = NULL, state = ?
                  WHERE alias = ? AND domain = ? AND namespace = ?;",
-            params![alias, domain.0 as u32, namespace],
-        )
-        .context("In rebind_alias: Failed to rebind existing entry.")?;
+                params![KeyLifeCycle::Unreferenced, alias, domain.0 as u32, namespace],
+            )
+            .context("In rebind_alias_internal: Failed to rebind existing entry.")?;
+        if updated != 0 {
+            Gc::notify_gc();
+        }
         let result = tx
             .execute(
                 "UPDATE persistent.keyentry
-                    SET alias = ?
-                    WHERE id = ? AND domain = ? AND namespace = ?;",
-                params![alias, newid.0, domain.0 as u32, namespace],
+                    SET alias = ?, state = ?
+                    WHERE id = ? AND domain = ? AND namespace = ? AND state = ?;",
+                params![
+                    alias,
+                    KeyLifeCycle::Live,
+                    newid.0,
+                    domain.0 as u32,
+                    namespace,
+                    KeyLifeCycle::Existing
+                ],
             )
-            .context("In rebind_alias: Failed to set alias.")?;
+            .context("In rebind_alias_internal: Failed to set alias.")?;
         if result != 1 {
-            // Note that this explicit rollback is not required, as
-            // the transaction should rollback if we do not commit it.
-            // We leave it here for readability.
-            tx.rollback().context("In rebind_alias: Failed to rollback a failed transaction.")?;
             return Err(KsError::sys()).context(format!(
-                "In rebind_alias: Expected to update a single entry but instead updated {}.",
+                "In rebind_alias_internal: Expected to update a single entry but instead updated {}.",
                 result
             ));
         }
-        tx.commit().context("In rebind_alias: Failed to commit transaction.")
+        Ok(())
+    }
+
+    /// Store a new key in a single transaction.
+    /// The function creates a new key entry, populates the blob, key parameter, and metadata
+    /// fields, and rebinds the given alias to the new key.
+    pub fn store_new_key<'a>(
+        &mut self,
+        key: KeyDescriptor,
+        params: impl IntoIterator<Item = &'a KeyParameter>,
+        blob: &[u8],
+        cert: Option<&[u8]>,
+        cert_chain: Option<&[u8]>,
+        metadata: &KeyMetaData,
+    ) -> Result<KeyIdGuard> {
+        let (alias, domain, namespace) = match key {
+            KeyDescriptor { alias: Some(alias), domain: Domain::APP, nspace, blob: None }
+            | KeyDescriptor { alias: Some(alias), domain: Domain::SELINUX, nspace, blob: None } => {
+                (alias, key.domain, nspace)
+            }
+            _ => {
+                return Err(KsError::Rc(ResponseCode::INVALID_ARGUMENT))
+                    .context("In store_new_key: Need alias and domain must be APP or SELINUX.")
+            }
+        };
+        self.with_transaction(TransactionBehavior::Immediate, |tx| {
+            let key_id = Self::create_key_entry_internal(tx, domain, namespace)
+                .context("Trying to create new key entry.")?;
+            Self::insert_blob_internal(tx, key_id.id(), SubComponentType::KEY_BLOB, blob)
+                .context("Trying to insert the key blob.")?;
+            if let Some(cert) = cert {
+                Self::insert_blob_internal(tx, key_id.id(), SubComponentType::CERT, cert)
+                    .context("Trying to insert the certificate.")?;
+            }
+            if let Some(cert_chain) = cert_chain {
+                Self::insert_blob_internal(
+                    tx,
+                    key_id.id(),
+                    SubComponentType::CERT_CHAIN,
+                    cert_chain,
+                )
+                .context("Trying to insert the certificate chain.")?;
+            }
+            Self::insert_keyparameter_internal(tx, &key_id, params)
+                .context("Trying to insert key parameters.")?;
+            metadata.store_in_db(key_id.id(), tx).context("Tryin to insert key metadata.")?;
+            Self::rebind_alias_internal(tx, &key_id, &alias, domain, namespace)
+                .context("Trying to rebind alias.")?;
+            Ok(key_id)
+        })
+        .context("In store_new_key.")
     }
 
     // Helper function loading the key_id given the key descriptor
@@ -952,11 +1162,12 @@
                     key_type =  ?
                     AND domain = ?
                     AND namespace = ?
-                    AND alias = ?;",
+                    AND alias = ?
+                    AND state = ?;",
             )
             .context("In load_key_entry_id: Failed to select from keyentry table.")?;
         let mut rows = stmt
-            .query(params![key_type, key.domain.0 as u32, key.nspace, alias])
+            .query(params![key_type, key.domain.0 as u32, key.nspace, alias, KeyLifeCycle::Live])
             .context("In load_key_entry_id: Failed to read from keyentry table.")?;
         db_utils::with_rows_extract_one(&mut rows, |row| {
             row.map_or_else(|| Err(KsError::Rc(ResponseCode::KEY_NOT_FOUND)), Ok)?
@@ -1035,11 +1246,13 @@
                     .prepare(
                         "SELECT domain, namespace FROM persistent.keyentry
                             WHERE
-                            id = ?;",
+                            id = ?
+                            AND state = ?;",
                     )
                     .context("Domain::KEY_ID: prepare statement failed")?;
-                let mut rows =
-                    stmt.query(params![key.nspace]).context("Domain::KEY_ID: query failed.")?;
+                let mut rows = stmt
+                    .query(params![key.nspace, KeyLifeCycle::Live])
+                    .context("Domain::KEY_ID: query failed.")?;
                 let (domain, namespace): (Domain, i64) =
                     db_utils::with_rows_extract_one(&mut rows, |row| {
                         let r =
@@ -1065,10 +1278,10 @@
         key_id: i64,
         load_bits: KeyEntryLoadBits,
         tx: &Transaction,
-    ) -> Result<(SecurityLevel, Option<Vec<u8>>, Option<Vec<u8>>, Option<Vec<u8>>)> {
+    ) -> Result<(Option<Vec<u8>>, Option<Vec<u8>>, Option<Vec<u8>>)> {
         let mut stmt = tx
             .prepare(
-                "SELECT MAX(id), sec_level, subcomponent_type, blob FROM persistent.blobentry
+                "SELECT MAX(id), subcomponent_type, blob FROM persistent.blobentry
                     WHERE keyentryid = ? GROUP BY subcomponent_type;",
             )
             .context("In load_blob_components: prepare statement failed.")?;
@@ -1076,37 +1289,34 @@
         let mut rows =
             stmt.query(params![key_id]).context("In load_blob_components: query failed.")?;
 
-        let mut sec_level: SecurityLevel = Default::default();
         let mut km_blob: Option<Vec<u8>> = None;
         let mut cert_blob: Option<Vec<u8>> = None;
         let mut cert_chain_blob: Option<Vec<u8>> = None;
         db_utils::with_rows_extract_all(&mut rows, |row| {
             let sub_type: SubComponentType =
-                row.get(2).context("Failed to extract subcomponent_type.")?;
-            match (sub_type, load_bits.load_public()) {
-                (SubComponentType::KEY_BLOB, _) => {
-                    sec_level =
-                        SecurityLevel(row.get(1).context("Failed to extract security level.")?);
-                    if load_bits.load_km() {
-                        km_blob = Some(row.get(3).context("Failed to extract KM blob.")?);
-                    }
+                row.get(1).context("Failed to extract subcomponent_type.")?;
+            match (sub_type, load_bits.load_public(), load_bits.load_km()) {
+                (SubComponentType::KEY_BLOB, _, true) => {
+                    km_blob = Some(row.get(2).context("Failed to extract KM blob.")?);
                 }
-                (SubComponentType::CERT, true) => {
+                (SubComponentType::CERT, true, _) => {
                     cert_blob =
-                        Some(row.get(3).context("Failed to extract public certificate blob.")?);
+                        Some(row.get(2).context("Failed to extract public certificate blob.")?);
                 }
-                (SubComponentType::CERT_CHAIN, true) => {
+                (SubComponentType::CERT_CHAIN, true, _) => {
                     cert_chain_blob =
-                        Some(row.get(3).context("Failed to extract certificate chain blob.")?);
+                        Some(row.get(2).context("Failed to extract certificate chain blob.")?);
                 }
-                (SubComponentType::CERT, _) | (SubComponentType::CERT_CHAIN, _) => {}
+                (SubComponentType::CERT, _, _)
+                | (SubComponentType::CERT_CHAIN, _, _)
+                | (SubComponentType::KEY_BLOB, _, _) => {}
                 _ => Err(KsError::sys()).context("Unknown subcomponent type.")?,
             }
             Ok(())
         })
         .context("In load_blob_components.")?;
 
-        Ok((sec_level, km_blob, cert_blob, cert_chain_blob))
+        Ok((km_blob, cert_blob, cert_chain_blob))
     }
 
     fn load_key_parameters(key_id: i64, tx: &Transaction) -> Result<Vec<KeyParameter>> {
@@ -1223,6 +1433,44 @@
         Ok((key_id_guard, key_entry))
     }
 
+    fn mark_unreferenced(tx: &Transaction, key_id: i64) -> Result<()> {
+        let updated = tx
+            .execute(
+                "UPDATE persistent.keyentry SET state = ? WHERE id = ?;",
+                params![KeyLifeCycle::Unreferenced, key_id],
+            )
+            .context("In mark_unreferenced: Failed to update state of key entry.")?;
+        if updated != 0 {
+            Gc::notify_gc();
+        }
+        tx.execute("DELETE from persistent.grant WHERE keyentryid = ?;", params![key_id])
+            .context("In mark_unreferenced: Failed to drop grants.")?;
+        Ok(())
+    }
+
+    /// Marks the given key as unreferenced and removes all of the grants to this key.
+    pub fn unbind_key(
+        &mut self,
+        key: KeyDescriptor,
+        key_type: KeyType,
+        caller_uid: u32,
+        check_permission: impl FnOnce(&KeyDescriptor, Option<KeyPermSet>) -> Result<()>,
+    ) -> Result<()> {
+        self.with_transaction(TransactionBehavior::Immediate, |tx| {
+            let (key_id, access_key_descriptor, access_vector) =
+                Self::load_access_tuple(tx, key, key_type, caller_uid)
+                    .context("Trying to get access tuple.")?;
+
+            // Perform access control. It is vital that we return here if the permission is denied.
+            // So do not touch that '?' at the end.
+            check_permission(&access_key_descriptor, access_vector)
+                .context("While checking permission.")?;
+
+            Self::mark_unreferenced(tx, key_id).context("Trying to mark the key unreferenced.")
+        })
+        .context("In unbind_key.")
+    }
+
     fn load_key_components(
         tx: &Transaction,
         load_bits: KeyEntryLoadBits,
@@ -1230,13 +1478,23 @@
     ) -> Result<KeyEntry> {
         let metadata = KeyMetaData::load_from_db(key_id, &tx).context("In load_key_components.")?;
 
-        let (sec_level, km_blob, cert_blob, cert_chain_blob) =
+        let (km_blob, cert_blob, cert_chain_blob) =
             Self::load_blob_components(key_id, load_bits, &tx)
                 .context("In load_key_components.")?;
 
         let parameters =
             Self::load_key_parameters(key_id, &tx).context("In load_key_components.")?;
 
+        // Extract the security level by checking the security level of the origin tag.
+        // Super keys don't have key parameters so we use security_level software by default.
+        let sec_level = parameters
+            .iter()
+            .find_map(|k| match k.get_tag() {
+                Tag::ORIGIN => Some(*k.security_level()),
+                _ => None,
+            })
+            .unwrap_or(SecurityLevel::SOFTWARE);
+
         Ok(KeyEntry {
             id: key_id,
             km_blob,
@@ -1256,12 +1514,13 @@
             .conn
             .prepare(
                 "SELECT alias FROM persistent.keyentry
-             WHERE domain = ? AND namespace = ? AND alias IS NOT NULL;",
+             WHERE domain = ? AND namespace = ? AND alias IS NOT NULL AND state = ?;",
             )
             .context("In list: Failed to prepare.")?;
 
-        let mut rows =
-            stmt.query(params![domain.0 as u32, namespace]).context("In list: Failed to query.")?;
+        let mut rows = stmt
+            .query(params![domain.0 as u32, namespace, KeyLifeCycle::Live])
+            .context("In list: Failed to query.")?;
 
         let mut descriptors: Vec<KeyDescriptor> = Vec::new();
         db_utils::with_rows_extract_all(&mut rows, |row| {
@@ -1602,7 +1861,7 @@
     #[test]
     fn test_persistence_for_files() -> Result<()> {
         let temp_dir = TempDir::new("persistent_db_test")?;
-        let db = KeystoreDB::new(temp_dir.path())?;
+        let mut db = KeystoreDB::new(temp_dir.path())?;
 
         db.create_key_entry(Domain::APP, 100)?;
         let entries = get_keyentry(&db)?;
@@ -1621,7 +1880,7 @@
             (ke.domain.unwrap(), ke.namespace.unwrap(), ke.alias.as_deref())
         }
 
-        let db = new_test_db()?;
+        let mut db = new_test_db()?;
 
         db.create_key_entry(Domain::APP, 100)?;
         db.create_key_entry(Domain::SELINUX, 101)?;
@@ -1712,8 +1971,8 @@
 
         let mut db = new_test_db()?;
         db.conn.execute(
-            "INSERT INTO persistent.keyentry (id, key_type, domain, namespace, alias)
-                VALUES (1, 0, 0, 15, 'key'), (2, 0, 2, 7, 'yek');",
+            "INSERT INTO persistent.keyentry (id, key_type, domain, namespace, alias, state)
+                VALUES (1, 0, 0, 15, 'key', 1), (2, 0, 2, 7, 'yek', 1);",
             NO_PARAMS,
         )?;
         let app_key = KeyDescriptor {
@@ -1730,8 +1989,8 @@
         reset_random();
         let next_random = 0i64;
 
-        let app_granted_key =
-            db.grant(app_key.clone(), CALLER_UID, GRANTEE_UID, PVEC1, |k, a| {
+        let app_granted_key = db
+            .grant(app_key.clone(), CALLER_UID, GRANTEE_UID, PVEC1, |k, a| {
                 assert_eq!(*a, PVEC1);
                 assert_eq!(
                     *k,
@@ -1744,7 +2003,8 @@
                     }
                 );
                 Ok(())
-            })?;
+            })
+            .unwrap();
 
         assert_eq!(
             app_granted_key,
@@ -1764,8 +2024,8 @@
             blob: None,
         };
 
-        let selinux_granted_key =
-            db.grant(selinux_key.clone(), CALLER_UID, 12, PVEC1, |k, a| {
+        let selinux_granted_key = db
+            .grant(selinux_key.clone(), CALLER_UID, 12, PVEC1, |k, a| {
                 assert_eq!(*a, PVEC1);
                 assert_eq!(
                     *k,
@@ -1779,7 +2039,8 @@
                     }
                 );
                 Ok(())
-            })?;
+            })
+            .unwrap();
 
         assert_eq!(
             selinux_granted_key,
@@ -1793,8 +2054,8 @@
         );
 
         // This should update the existing grant with PVEC2.
-        let selinux_granted_key =
-            db.grant(selinux_key.clone(), CALLER_UID, 12, PVEC2, |k, a| {
+        let selinux_granted_key = db
+            .grant(selinux_key.clone(), CALLER_UID, 12, PVEC2, |k, a| {
                 assert_eq!(*a, PVEC2);
                 assert_eq!(
                     *k,
@@ -1808,7 +2069,8 @@
                     }
                 );
                 Ok(())
-            })?;
+            })
+            .unwrap();
 
         assert_eq!(
             selinux_granted_key,
@@ -1859,40 +2121,27 @@
 
     #[test]
     fn test_insert_blob() -> Result<()> {
+        let key_id = KEY_ID_LOCK.get(3000);
         let mut db = new_test_db()?;
-        db.insert_blob(
-            &KEY_ID_LOCK.get(1),
-            SubComponentType::KEY_BLOB,
-            TEST_KEY_BLOB,
-            SecurityLevel::SOFTWARE,
-        )?;
-        db.insert_blob(
-            &KEY_ID_LOCK.get(1),
-            SubComponentType::CERT,
-            TEST_CERT_BLOB,
-            SecurityLevel::TRUSTED_ENVIRONMENT,
-        )?;
-        db.insert_blob(
-            &KEY_ID_LOCK.get(1),
-            SubComponentType::CERT_CHAIN,
-            TEST_CERT_CHAIN_BLOB,
-            SecurityLevel::STRONGBOX,
-        )?;
+        db.insert_blob(&key_id, SubComponentType::KEY_BLOB, TEST_KEY_BLOB)?;
+        db.insert_blob(&key_id, SubComponentType::CERT, TEST_CERT_BLOB)?;
+        db.insert_blob(&key_id, SubComponentType::CERT_CHAIN, TEST_CERT_CHAIN_BLOB)?;
+        drop(key_id);
 
         let mut stmt = db.conn.prepare(
-            "SELECT subcomponent_type, keyentryid, blob, sec_level FROM persistent.blobentry
-                ORDER BY sec_level ASC;",
+            "SELECT subcomponent_type, keyentryid, blob FROM persistent.blobentry
+                ORDER BY subcomponent_type ASC;",
         )?;
         let mut rows = stmt
-            .query_map::<(SubComponentType, i64, Vec<u8>, i64), _, _>(NO_PARAMS, |row| {
-                Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?))
+            .query_map::<(SubComponentType, i64, Vec<u8>), _, _>(NO_PARAMS, |row| {
+                Ok((row.get(0)?, row.get(1)?, row.get(2)?))
             })?;
         let r = rows.next().unwrap().unwrap();
-        assert_eq!(r, (SubComponentType::KEY_BLOB, 1, TEST_KEY_BLOB.to_vec(), 0));
+        assert_eq!(r, (SubComponentType::KEY_BLOB, 3000, TEST_KEY_BLOB.to_vec()));
         let r = rows.next().unwrap().unwrap();
-        assert_eq!(r, (SubComponentType::CERT, 1, TEST_CERT_BLOB.to_vec(), 1));
+        assert_eq!(r, (SubComponentType::CERT, 3000, TEST_CERT_BLOB.to_vec()));
         let r = rows.next().unwrap().unwrap();
-        assert_eq!(r, (SubComponentType::CERT_CHAIN, 1, TEST_CERT_CHAIN_BLOB.to_vec(), 2));
+        assert_eq!(r, (SubComponentType::CERT_CHAIN, 3000, TEST_CERT_CHAIN_BLOB.to_vec()));
 
         Ok(())
     }
@@ -1905,7 +2154,23 @@
         let key_id = make_test_key_entry(&mut db, Domain::APP, 1, TEST_ALIAS)
             .context("test_insert_and_load_full_keyentry_domain_app")?
             .0;
-        let (_key_guard, key_entry) = db.load_key_entry(
+        let (_key_guard, key_entry) = db
+            .load_key_entry(
+                KeyDescriptor {
+                    domain: Domain::APP,
+                    nspace: 0,
+                    alias: Some(TEST_ALIAS.to_string()),
+                    blob: None,
+                },
+                KeyType::Client,
+                KeyEntryLoadBits::BOTH,
+                1,
+                |_k, _av| Ok(()),
+            )
+            .unwrap();
+        assert_eq!(key_entry, make_test_key_entry_test_vector(key_id));
+
+        db.unbind_key(
             KeyDescriptor {
                 domain: Domain::APP,
                 nspace: 0,
@@ -1913,11 +2178,30 @@
                 blob: None,
             },
             KeyType::Client,
-            KeyEntryLoadBits::BOTH,
             1,
-            |_k, _av| Ok(()),
-        )?;
-        assert_eq!(key_entry, make_test_key_entry_test_vector(key_id));
+            |_, _| Ok(()),
+        )
+        .unwrap();
+
+        assert_eq!(
+            Some(&KsError::Rc(ResponseCode::KEY_NOT_FOUND)),
+            db.load_key_entry(
+                KeyDescriptor {
+                    domain: Domain::APP,
+                    nspace: 0,
+                    alias: Some(TEST_ALIAS.to_string()),
+                    blob: None,
+                },
+                KeyType::Client,
+                KeyEntryLoadBits::NONE,
+                1,
+                |_k, _av| Ok(()),
+            )
+            .unwrap_err()
+            .root_cause()
+            .downcast_ref::<KsError>()
+        );
+
         Ok(())
     }
 
@@ -1927,7 +2211,23 @@
         let key_id = make_test_key_entry(&mut db, Domain::SELINUX, 1, TEST_ALIAS)
             .context("test_insert_and_load_full_keyentry_domain_selinux")?
             .0;
-        let (_key_guard, key_entry) = db.load_key_entry(
+        let (_key_guard, key_entry) = db
+            .load_key_entry(
+                KeyDescriptor {
+                    domain: Domain::SELINUX,
+                    nspace: 1,
+                    alias: Some(TEST_ALIAS.to_string()),
+                    blob: None,
+                },
+                KeyType::Client,
+                KeyEntryLoadBits::BOTH,
+                1,
+                |_k, _av| Ok(()),
+            )
+            .unwrap();
+        assert_eq!(key_entry, make_test_key_entry_test_vector(key_id));
+
+        db.unbind_key(
             KeyDescriptor {
                 domain: Domain::SELINUX,
                 nspace: 1,
@@ -1935,11 +2235,30 @@
                 blob: None,
             },
             KeyType::Client,
-            KeyEntryLoadBits::BOTH,
             1,
-            |_k, _av| Ok(()),
-        )?;
-        assert_eq!(key_entry, make_test_key_entry_test_vector(key_id));
+            |_, _| Ok(()),
+        )
+        .unwrap();
+
+        assert_eq!(
+            Some(&KsError::Rc(ResponseCode::KEY_NOT_FOUND)),
+            db.load_key_entry(
+                KeyDescriptor {
+                    domain: Domain::SELINUX,
+                    nspace: 1,
+                    alias: Some(TEST_ALIAS.to_string()),
+                    blob: None,
+                },
+                KeyType::Client,
+                KeyEntryLoadBits::NONE,
+                1,
+                |_k, _av| Ok(()),
+            )
+            .unwrap_err()
+            .root_cause()
+            .downcast_ref::<KsError>()
+        );
+
         Ok(())
     }
 
@@ -1949,14 +2268,39 @@
         let key_id = make_test_key_entry(&mut db, Domain::SELINUX, 1, TEST_ALIAS)
             .context("test_insert_and_load_full_keyentry_domain_key_id")?
             .0;
-        let (_key_guard, key_entry) = db.load_key_entry(
+        let (_, key_entry) = db
+            .load_key_entry(
+                KeyDescriptor { domain: Domain::KEY_ID, nspace: key_id, alias: None, blob: None },
+                KeyType::Client,
+                KeyEntryLoadBits::BOTH,
+                1,
+                |_k, _av| Ok(()),
+            )
+            .unwrap();
+
+        assert_eq!(key_entry, make_test_key_entry_test_vector(key_id));
+
+        db.unbind_key(
             KeyDescriptor { domain: Domain::KEY_ID, nspace: key_id, alias: None, blob: None },
             KeyType::Client,
-            KeyEntryLoadBits::BOTH,
             1,
-            |_k, _av| Ok(()),
-        )?;
-        assert_eq!(key_entry, make_test_key_entry_test_vector(key_id));
+            |_, _| Ok(()),
+        )
+        .unwrap();
+
+        assert_eq!(
+            Some(&KsError::Rc(ResponseCode::KEY_NOT_FOUND)),
+            db.load_key_entry(
+                KeyDescriptor { domain: Domain::KEY_ID, nspace: key_id, alias: None, blob: None },
+                KeyType::Client,
+                KeyEntryLoadBits::NONE,
+                1,
+                |_k, _av| Ok(()),
+            )
+            .unwrap_err()
+            .root_cause()
+            .downcast_ref::<KsError>()
+        );
 
         Ok(())
     }
@@ -1968,29 +2312,55 @@
             .context("test_insert_and_load_full_keyentry_from_grant")?
             .0;
 
-        let granted_key = db.grant(
-            KeyDescriptor {
-                domain: Domain::APP,
-                nspace: 0,
-                alias: Some(TEST_ALIAS.to_string()),
-                blob: None,
-            },
-            1,
-            2,
-            key_perm_set![KeyPerm::use_()],
-            |_k, _av| Ok(()),
-        )?;
+        let granted_key = db
+            .grant(
+                KeyDescriptor {
+                    domain: Domain::APP,
+                    nspace: 0,
+                    alias: Some(TEST_ALIAS.to_string()),
+                    blob: None,
+                },
+                1,
+                2,
+                key_perm_set![KeyPerm::use_()],
+                |_k, _av| Ok(()),
+            )
+            .unwrap();
 
         debug_dump_grant_table(&mut db)?;
 
-        let (_key_guard, key_entry) =
-            db.load_key_entry(granted_key, KeyType::Client, KeyEntryLoadBits::BOTH, 2, |k, av| {
-                assert_eq!(Domain::GRANT, k.domain);
-                assert!(av.unwrap().includes(KeyPerm::use_()));
-                Ok(())
-            })?;
+        let (_key_guard, key_entry) = db
+            .load_key_entry(
+                granted_key.clone(),
+                KeyType::Client,
+                KeyEntryLoadBits::BOTH,
+                2,
+                |k, av| {
+                    assert_eq!(Domain::GRANT, k.domain);
+                    assert!(av.unwrap().includes(KeyPerm::use_()));
+                    Ok(())
+                },
+            )
+            .unwrap();
 
         assert_eq!(key_entry, make_test_key_entry_test_vector(key_id));
+
+        db.unbind_key(granted_key.clone(), KeyType::Client, 2, |_, _| Ok(())).unwrap();
+
+        assert_eq!(
+            Some(&KsError::Rc(ResponseCode::KEY_NOT_FOUND)),
+            db.load_key_entry(
+                granted_key,
+                KeyType::Client,
+                KeyEntryLoadBits::NONE,
+                2,
+                |_k, _av| Ok(()),
+            )
+            .unwrap_err()
+            .root_cause()
+            .downcast_ref::<KsError>()
+        );
+
         Ok(())
     }
 
@@ -2005,18 +2375,20 @@
             let key_id = make_test_key_entry(&mut db, Domain::APP, 33, KEY_LOCK_TEST_ALIAS)
                 .context("test_insert_and_load_full_keyentry_domain_app")?
                 .0;
-            let (_key_guard, key_entry) = db.load_key_entry(
-                KeyDescriptor {
-                    domain: Domain::APP,
-                    nspace: 0,
-                    alias: Some(KEY_LOCK_TEST_ALIAS.to_string()),
-                    blob: None,
-                },
-                KeyType::Client,
-                KeyEntryLoadBits::BOTH,
-                33,
-                |_k, _av| Ok(()),
-            )?;
+            let (_key_guard, key_entry) = db
+                .load_key_entry(
+                    KeyDescriptor {
+                        domain: Domain::APP,
+                        nspace: 0,
+                        alias: Some(KEY_LOCK_TEST_ALIAS.to_string()),
+                        blob: None,
+                    },
+                    KeyType::Client,
+                    KeyEntryLoadBits::BOTH,
+                    33,
+                    |_k, _av| Ok(()),
+                )
+                .unwrap();
             assert_eq!(key_entry, make_test_key_entry_test_vector(key_id));
             let state = Arc::new(AtomicU8::new(1));
             let state2 = state.clone();
@@ -2182,6 +2554,7 @@
         domain: Option<Domain>,
         namespace: Option<i64>,
         alias: Option<String>,
+        state: KeyLifeCycle,
     }
 
     fn get_keyentry(db: &KeystoreDB) -> Result<Vec<KeyEntryRow>> {
@@ -2197,6 +2570,7 @@
                     },
                     namespace: row.get(3)?,
                     alias: row.get(4)?,
+                    state: row.get(5)?,
                 })
             })?
             .map(|r| r.context("Could not read keyentry row."))
@@ -2432,24 +2806,9 @@
         alias: &str,
     ) -> Result<KeyIdGuard> {
         let key_id = db.create_key_entry(domain, namespace)?;
-        db.insert_blob(
-            &key_id,
-            SubComponentType::KEY_BLOB,
-            TEST_KEY_BLOB,
-            SecurityLevel::TRUSTED_ENVIRONMENT,
-        )?;
-        db.insert_blob(
-            &key_id,
-            SubComponentType::CERT,
-            TEST_CERT_BLOB,
-            SecurityLevel::TRUSTED_ENVIRONMENT,
-        )?;
-        db.insert_blob(
-            &key_id,
-            SubComponentType::CERT_CHAIN,
-            TEST_CERT_CHAIN_BLOB,
-            SecurityLevel::TRUSTED_ENVIRONMENT,
-        )?;
+        db.insert_blob(&key_id, SubComponentType::KEY_BLOB, TEST_KEY_BLOB)?;
+        db.insert_blob(&key_id, SubComponentType::CERT, TEST_CERT_BLOB)?;
+        db.insert_blob(&key_id, SubComponentType::CERT_CHAIN, TEST_CERT_CHAIN_BLOB)?;
         db.insert_keyparameter(&key_id, &make_test_params())?;
         let mut metadata = KeyMetaData::new();
         metadata.add(KeyMetaEntry::EncryptedBy(EncryptedBy::Password));
@@ -2480,19 +2839,22 @@
     }
 
     fn debug_dump_keyentry_table(db: &mut KeystoreDB) -> Result<()> {
-        let mut stmt = db
-            .conn
-            .prepare("SELECT id, key_type, domain, namespace, alias FROM persistent.keyentry;")?;
-        let rows = stmt.query_map::<(i64, KeyType, i32, i64, String), _, _>(NO_PARAMS, |row| {
-            Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?))
-        })?;
+        let mut stmt = db.conn.prepare(
+            "SELECT id, key_type, domain, namespace, alias, state FROM persistent.keyentry;",
+        )?;
+        let rows = stmt.query_map::<(i64, KeyType, i32, i64, String, KeyLifeCycle), _, _>(
+            NO_PARAMS,
+            |row| {
+                Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?, row.get(4)?, row.get(5)?))
+            },
+        )?;
 
         println!("Key entry table rows:");
         for r in rows {
-            let (id, key_type, domain, namespace, alias) = r.unwrap();
+            let (id, key_type, domain, namespace, alias, state) = r.unwrap();
             println!(
-                "    id: {} KeyType: {:?} Domain: {} Namespace: {} Alias: {}",
-                id, key_type, domain, namespace, alias
+                "    id: {} KeyType: {:?} Domain: {} Namespace: {} Alias: {} State: {:?}",
+                id, key_type, domain, namespace, alias, state
             );
         }
         Ok(())
diff --git a/keystore2/src/gc.rs b/keystore2/src/gc.rs
new file mode 100644
index 0000000..b5bdd98
--- /dev/null
+++ b/keystore2/src/gc.rs
@@ -0,0 +1,94 @@
+// Copyright 2020, The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! This module implements the key garbage collector.
+//! The key garbage collector has one public function `notify_gc()`. This will create
+//! a thread on demand which will query the database for unreferenced key entries,
+//! optionally dispose of sensitive key material appropriately, and then delete
+//! the key entry from the database.
+
+use crate::globals::{get_keymint_device, DB};
+use crate::{error::map_km_error, globals::ASYNC_TASK};
+use android_hardware_security_keymint::aidl::android::hardware::security::keymint::IKeyMintDevice::IKeyMintDevice;
+use anyhow::Result;
+
+#[derive(Clone, Copy)]
+pub struct Gc {
+    remaining_tries: u32,
+}
+
+impl Gc {
+    const MAX_ERROR_RETRIES: u32 = 3u32;
+
+    /// Attempts to process one unreferenced key from the database.
+    /// Returns Ok(true) if a key was deleted and Ok(false) if there were no more keys to process.
+    /// We process one key at a time, because deleting a key is a time consuming process which
+    /// may involve calling into the KeyMint backend and we don't want to hog neither the backend
+    /// nor the database for extended periods of time.
+    fn process_one_key() -> Result<bool> {
+        DB.with(|db| {
+            let mut db = db.borrow_mut();
+            if let Some((key_id, mut key_entry)) = db.get_unreferenced_key()? {
+                if let Some(blob) = key_entry.take_km_blob() {
+                    let km_dev: Box<dyn IKeyMintDevice> =
+                        get_keymint_device(key_entry.sec_level())?.get_interface()?;
+                    if let Err(e) = map_km_error(km_dev.deleteKey(&blob)) {
+                        // Log but ignore error.
+                        log::error!("Error trying to delete key. {:?}", e);
+                    }
+                }
+                db.purge_key_entry(key_id)?;
+                return Ok(true);
+            }
+            Ok(false)
+        })
+    }
+
+    /// Processes one key and then schedules another attempt until it runs out of tries or keys
+    /// to delete.
+    fn process_all(mut self) {
+        match Self::process_one_key() {
+            // We successfully removed a key.
+            Ok(true) => self.remaining_tries = Self::MAX_ERROR_RETRIES,
+            // There were no more keys to remove. We may exit.
+            Ok(false) => self.remaining_tries = 0,
+            // An error occurred. We retry in case the error was transient, but
+            // we also count down the number of tries so that we don't spin
+            // indefinitely.
+            Err(e) => {
+                self.remaining_tries -= 1;
+                log::error!(
+                    concat!(
+                        "Failed to delete key. Retrying in case this error was transient. ",
+                        "(Tries remaining {}) {:?}"
+                    ),
+                    self.remaining_tries,
+                    e
+                )
+            }
+        }
+        if self.remaining_tries != 0 {
+            ASYNC_TASK.queue_lo(move || {
+                self.process_all();
+            })
+        }
+    }
+
+    /// Notifies the key garbage collector to iterate through unreferenced keys and attempt
+    /// their deletion. We only process one key at a time and then schedule another
+    /// attempt by queueing it in the async_task (low priority) queue.
+    pub fn notify_gc() {
+        ASYNC_TASK.queue_lo(|| Self { remaining_tries: Self::MAX_ERROR_RETRIES }.process_all())
+    }
+}
diff --git a/keystore2/src/globals.rs b/keystore2/src/globals.rs
index d6c2ba4..36f3e99 100644
--- a/keystore2/src/globals.rs
+++ b/keystore2/src/globals.rs
@@ -16,6 +16,8 @@
 //! database connections and connections to services that Keystore needs
 //! to talk to.
 
+use crate::async_task::AsyncTask;
+use crate::gc::Gc;
 use crate::super_key::SuperKeyManager;
 use crate::utils::Asp;
 use crate::{
@@ -27,9 +29,40 @@
 };
 use anyhow::{Context, Result};
 use lazy_static::lazy_static;
-use std::cell::RefCell;
 use std::collections::HashMap;
 use std::sync::Mutex;
+use std::{cell::RefCell, sync::Once};
+
+static DB_INIT: Once = Once::new();
+
+/// Open a connection to the Keystore 2.0 database. This is called during the initialization of
+/// the thread local DB field. It should never be called directly. The first time this is called
+/// we also call KeystoreDB::cleanup_leftovers to restore the key lifecycle invariant. See the
+/// documentation of cleanup_leftovers for more details.
+fn create_thread_local_db() -> KeystoreDB {
+    let mut db = KeystoreDB::new(
+        // Keystore changes to the database directory on startup
+        // (see keystore2_main.rs).
+        &std::env::current_dir().expect("Could not get the current working directory."),
+    )
+    .expect("Failed to open database.");
+    DB_INIT.call_once(|| {
+        log::info!("Touching Keystore 2.0 database for this first time since boot.");
+        log::info!("Calling cleanup leftovers.");
+        let n = db.cleanup_leftovers().expect("Failed to cleanup database on startup.");
+        if n != 0 {
+            log::info!(
+                concat!(
+                    "Cleaned up {} failed entries. ",
+                    "This indicates keystore crashed during key generation."
+                ),
+                n
+            );
+        }
+        Gc::notify_gc();
+    });
+    db
+}
 
 thread_local! {
     /// Database connections are not thread safe, but connecting to the
@@ -37,14 +70,7 @@
     /// used by only one thread. So we store one database connection per
     /// thread in this thread local key.
     pub static DB: RefCell<KeystoreDB> =
-            RefCell::new(
-                KeystoreDB::new(
-                    // Keystore changes to the database directory on startup
-                    // (see keystor2_main.rs).
-                    &std::env::current_dir()
-                    .expect("Could not get the current working directory.")
-                )
-                .expect("Failed to open database."));
+            RefCell::new(create_thread_local_db());
 }
 
 lazy_static! {
@@ -52,6 +78,9 @@
     pub static ref SUPER_KEY: SuperKeyManager = Default::default();
     /// Map of KeyMint devices.
     static ref KEY_MINT_DEVICES: Mutex<HashMap<SecurityLevel, Asp>> = Default::default();
+    /// A single on-demand worker thread that handles deferred tasks with two different
+    /// priorities.
+    pub static ref ASYNC_TASK: AsyncTask = Default::default();
 }
 
 static KEYMINT_SERVICE_NAME: &str = "android.hardware.security.keymint.IKeyMintDevice";
diff --git a/keystore2/src/lib.rs b/keystore2/src/lib.rs
index 3fb938c..6b1dea3 100644
--- a/keystore2/src/lib.rs
+++ b/keystore2/src/lib.rs
@@ -29,7 +29,9 @@
 pub mod service;
 pub mod utils;
 
+mod async_task;
 mod db_utils;
+mod gc;
 mod super_key;
 
 #[cfg(test)]
diff --git a/keystore2/src/security_level.rs b/keystore2/src/security_level.rs
index 29bb9b2..af59f79 100644
--- a/keystore2/src/security_level.rs
+++ b/keystore2/src/security_level.rs
@@ -114,47 +114,21 @@
                 KeyDescriptor { domain: Domain::BLOB, blob: Some(blob.data), ..Default::default() }
             }
             _ => DB
-                .with(|db| {
-                    let mut db = db.borrow_mut();
-                    let key_id = db
-                        .create_key_entry(key.domain, key.nspace)
-                        .context("Trying to create a key entry.")?;
-                    db.insert_blob(
-                        &key_id,
-                        SubComponentType::KEY_BLOB,
-                        &blob.data,
-                        self.security_level,
-                    )
-                    .context("Trying to insert km blob.")?;
-                    if let Some(c) = &cert {
-                        db.insert_blob(&key_id, SubComponentType::CERT, c, self.security_level)
-                            .context("Trying to insert cert blob.")?;
-                    }
-                    if let Some(c) = &cert_chain {
-                        db.insert_blob(
-                            &key_id,
-                            SubComponentType::CERT_CHAIN,
-                            c,
-                            self.security_level,
-                        )
-                        .context("Trying to insert cert chain blob.")?;
-                    }
-                    db.insert_keyparameter(&key_id, &key_parameters)
-                        .context("Trying to insert key parameters.")?;
+                .with::<_, Result<KeyDescriptor>>(|db| {
                     let mut metadata = KeyMetaData::new();
                     metadata.add(KeyMetaEntry::CreationDate(creation_date));
-                    db.insert_key_metadata(&key_id, &metadata)
-                        .context("Trying to insert key metadata.")?;
-                    match &key.alias {
-                        Some(alias) => db
-                            .rebind_alias(&key_id, alias, key.domain, key.nspace)
-                            .context("Failed to rebind alias.")?,
-                        None => {
-                            return Err(error::Error::sys()).context(
-                                "Alias must be specified. (This should have been checked earlier.)",
-                            )
-                        }
-                    }
+
+                    let mut db = db.borrow_mut();
+                    let key_id = db
+                        .store_new_key(
+                            key,
+                            &key_parameters,
+                            &blob.data,
+                            cert.as_deref(),
+                            cert_chain.as_deref(),
+                            &metadata,
+                        )
+                        .context("In store_new_key.")?;
                     Ok(KeyDescriptor {
                         domain: Domain::KEY_ID,
                         nspace: key_id.id(),
@@ -527,7 +501,6 @@
                             &key_id_guard,
                             SubComponentType::KEY_BLOB,
                             &upgraded_blob,
-                            self.security_level,
                         )
                     })
                     .context(concat!(
diff --git a/keystore2/src/service.rs b/keystore2/src/service.rs
index d185025..9c5a697 100644
--- a/keystore2/src/service.rs
+++ b/keystore2/src/service.rs
@@ -165,18 +165,13 @@
                 .context("Failed to load key_entry.")?;
 
             if let Some(cert) = public_cert {
-                db.insert_blob(&key_id_guard, SubComponentType::CERT, cert, key_entry.sec_level())
+                db.insert_blob(&key_id_guard, SubComponentType::CERT, cert)
                     .context("Failed to update cert subcomponent.")?;
             }
 
             if let Some(cert_chain) = certificate_chain {
-                db.insert_blob(
-                    &key_id_guard,
-                    SubComponentType::CERT_CHAIN,
-                    cert_chain,
-                    key_entry.sec_level(),
-                )
-                .context("Failed to update cert chain subcomponent.")?;
+                db.insert_blob(&key_id_guard, SubComponentType::CERT_CHAIN, cert_chain)
+                    .context("Failed to update cert chain subcomponent.")?;
             }
             Ok(())
         })
@@ -225,7 +220,13 @@
     }
 
     fn delete_key(&self, key: &KeyDescriptor) -> Result<()> {
-        // TODO implement.
+        let caller_uid = ThreadState::get_calling_uid();
+        DB.with(|db| {
+            db.borrow_mut().unbind_key(key.clone(), KeyType::Client, caller_uid, |k, av| {
+                check_key_permission(KeyPerm::delete(), k, &av).context("During delete_key.")
+            })
+        })
+        .context("In delete_key: Trying to unbind the key.")?;
         Ok(())
     }