Merge changes from topic "b/269460851"

* changes:
  keystore2: Fix timeout handling
  keystore2_test: Join all test threads
diff --git a/keystore2/src/rkpd_client.rs b/keystore2/src/rkpd_client.rs
index f1e8e11..0ea2d39 100644
--- a/keystore2/src/rkpd_client.rs
+++ b/keystore2/src/rkpd_client.rs
@@ -57,8 +57,11 @@
 
     fn send(&self, value: T) {
         if let Some(inner) = self.inner.lock().unwrap().take() {
-            // assert instead of unwrap, because on failure send returns Err(value)
-            assert!(inner.send(value).is_ok(), "thread state is terminally broken");
+            // It's possible for the corresponding receiver to time out and be dropped. In this
+            // case send() will fail. This error is not actionable though, so only log the error.
+            if inner.send(value).is_err() {
+                log::error!("SafeSender::send() failed");
+            }
         }
     }
 }
@@ -202,8 +205,14 @@
         .context(ks_err!("Trying to get key."))?;
 
     match timeout(RKPD_TIMEOUT, rx).await {
-        Err(e) => Err(Error::Rc(ResponseCode::OUT_OF_KEYS_TRANSIENT_ERROR))
-            .context(ks_err!("Waiting for RKPD key timed out: {:?}", e)),
+        Err(e) => {
+            // Make a best effort attempt to cancel the timed out request.
+            if let Err(e) = registration.cancelGetKey(&cb) {
+                log::error!("IRegistration::cancelGetKey failed: {:?}", e);
+            }
+            Err(Error::Rc(ResponseCode::OUT_OF_KEYS_TRANSIENT_ERROR))
+                .context(ks_err!("Waiting for RKPD key timed out: {:?}", e))
+        }
         Ok(v) => v.unwrap(),
     }
 }
@@ -314,52 +323,68 @@
     use keystore2_crypto::parse_subject_from_certificate;
     use std::collections::HashMap;
     use std::sync::atomic::{AtomicU32, Ordering};
+    use std::sync::{Arc, Mutex};
 
-    #[derive(Default)]
-    struct MockRegistration {
+    struct MockRegistrationValues {
         key: RemotelyProvisionedKey,
         latency: Option<Duration>,
+        thread_join_handles: Vec<Option<std::thread::JoinHandle<()>>>,
     }
 
+    struct MockRegistration(Arc<Mutex<MockRegistrationValues>>);
+
     impl MockRegistration {
         pub fn new_native_binder(
             key: &RemotelyProvisionedKey,
             latency: Option<Duration>,
         ) -> Strong<dyn IRegistration> {
-            let result = Self {
+            let result = Self(Arc::new(Mutex::new(MockRegistrationValues {
                 key: RemotelyProvisionedKey {
                     keyBlob: key.keyBlob.clone(),
                     encodedCertChain: key.encodedCertChain.clone(),
                 },
                 latency,
-            };
+                thread_join_handles: Vec::new(),
+            })));
             BnRegistration::new_binder(result, BinderFeatures::default())
         }
     }
 
+    impl Drop for MockRegistration {
+        fn drop(&mut self) {
+            let mut values = self.0.lock().unwrap();
+            for handle in values.thread_join_handles.iter_mut() {
+                // These are test threads. So, no need to worry too much about error handling.
+                handle.take().unwrap().join().unwrap();
+            }
+        }
+    }
+
     impl Interface for MockRegistration {}
 
     impl IRegistration for MockRegistration {
         fn getKey(&self, _: i32, cb: &Strong<dyn IGetKeyCallback>) -> binder::Result<()> {
+            let mut values = self.0.lock().unwrap();
             let key = RemotelyProvisionedKey {
-                keyBlob: self.key.keyBlob.clone(),
-                encodedCertChain: self.key.encodedCertChain.clone(),
+                keyBlob: values.key.keyBlob.clone(),
+                encodedCertChain: values.key.encodedCertChain.clone(),
             };
-            let latency = self.latency;
+            let latency = values.latency;
             let get_key_cb = cb.clone();
 
             // Need a separate thread to trigger timeout in the caller.
-            std::thread::spawn(move || {
+            let join_handle = std::thread::spawn(move || {
                 if let Some(duration) = latency {
                     std::thread::sleep(duration);
                 }
                 get_key_cb.onSuccess(&key).unwrap();
             });
+            values.thread_join_handles.push(Some(join_handle));
             Ok(())
         }
 
         fn cancelGetKey(&self, _: &Strong<dyn IGetKeyCallback>) -> binder::Result<()> {
-            todo!()
+            Ok(())
         }
 
         fn storeUpgradedKeyAsync(
@@ -370,8 +395,9 @@
         ) -> binder::Result<()> {
             // We are primarily concerned with timing out correctly. Storing the key in this mock
             // registration isn't particularly interesting, so skip that part.
+            let values = self.0.lock().unwrap();
             let store_cb = cb.clone();
-            let latency = self.latency;
+            let latency = values.latency;
 
             std::thread::spawn(move || {
                 if let Some(duration) = latency {
@@ -528,7 +554,7 @@
     fn test_get_mock_key_timeout() {
         let mock_key =
             RemotelyProvisionedKey { keyBlob: vec![1, 2, 3], encodedCertChain: vec![4, 5, 6] };
-        let latency = RKPD_TIMEOUT + Duration::from_secs(10);
+        let latency = RKPD_TIMEOUT + Duration::from_secs(1);
         let registration = get_mock_registration(&mock_key, Some(latency)).unwrap();
 
         let result =
@@ -553,7 +579,7 @@
     fn test_store_mock_key_timeout() {
         let mock_key =
             RemotelyProvisionedKey { keyBlob: vec![1, 2, 3], encodedCertChain: vec![4, 5, 6] };
-        let latency = RKPD_TIMEOUT + Duration::from_secs(10);
+        let latency = RKPD_TIMEOUT + Duration::from_secs(1);
         let registration = get_mock_registration(&mock_key, Some(latency)).unwrap();
 
         let result = tokio_rt().block_on(store_rkpd_attestation_key_with_registration_async(