Merge "uwb: Refactor default AIDL HAL implementation" into main am: 23a520e008 am: 45df9d0ee5

Original change: https://android-review.googlesource.com/c/platform/hardware/interfaces/+/3267754

Change-Id: I49d4c4b340e62dce66f8e99ded50c8271d793e54
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/uwb/aidl/default/Android.bp b/uwb/aidl/default/Android.bp
index eba18cf..037eac3 100644
--- a/uwb/aidl/default/Android.bp
+++ b/uwb/aidl/default/Android.bp
@@ -16,17 +16,13 @@
     prefer_rlib: true,
     rustlibs: [
         "android.hardware.uwb-V1-rust",
-        "liblibc",
         "liblogger",
         "liblog_rust",
         "libbinder_rs",
         "libbinder_tokio_rs",
         "libtokio",
-        "libtokio_util",
         "libnix",
         "libanyhow",
-        "libpdl_runtime",
-        "libuwb_uci_packets",
     ],
     proc_macros: [
         "libasync_trait",
diff --git a/uwb/aidl/default/src/service.rs b/uwb/aidl/default/src/service.rs
index e97b291..80fa8af 100644
--- a/uwb/aidl/default/src/service.rs
+++ b/uwb/aidl/default/src/service.rs
@@ -1,8 +1,6 @@
 use android_hardware_uwb::aidl::android::hardware::uwb::IUwb::{self, IUwb as _};
 use android_hardware_uwb::binder;
 
-use tokio::runtime::Runtime;
-
 use std::env;
 use std::panic;
 
@@ -25,13 +23,12 @@
 
     log::info!("UWB HAL starting up");
 
-    // Create the tokio runtime
-    let rt = Runtime::new()?;
+    let rt = tokio::runtime::Runtime::new()?;
 
     let chips = env::args()
         .skip(1) // Skip binary name
         .enumerate()
-        .map(|(i, arg)| uwb_chip::UwbChip::new(i.to_string(), arg));
+        .map(|(i, arg)| rt.block_on(uwb_chip::UwbChip::new(i.to_string(), arg)));
 
     binder::add_service(
         &format!("{}/default", IUwb::BpUwb::get_descriptor()),
diff --git a/uwb/aidl/default/src/uwb_chip.rs b/uwb/aidl/default/src/uwb_chip.rs
index 956cf6c..0ed05d8 100644
--- a/uwb/aidl/default/src/uwb_chip.rs
+++ b/uwb/aidl/default/src/uwb_chip.rs
@@ -7,126 +7,113 @@
 use binder::{DeathRecipient, IBinder, Result, Strong};
 
 use std::sync::Arc;
-use tokio::io::unix::AsyncFd;
-use tokio::select;
+use tokio::fs;
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::sync::Mutex;
-use tokio_util::sync::CancellationToken;
 
-use std::fs::{File, OpenOptions};
-use std::io::{self, Read, Write};
-use std::os::unix::fs::OpenOptionsExt;
-
-use pdl_runtime::Packet;
-use uwb_uci_packets::{DeviceResetCmdBuilder, ResetConfig, UciControlPacket, UciControlPacketHal};
-
-enum State {
+enum ClientState {
     Closed,
     Opened {
         callbacks: Strong<dyn IUwbClientCallback>,
-        handle: tokio::task::JoinHandle<()>,
-        serial: File,
-        death_recipient: DeathRecipient,
-        token: CancellationToken,
+        _death_recipient: DeathRecipient,
     },
 }
 
+struct ServiceState {
+    client_state: ClientState,
+    writer: fs::File,
+}
+
 pub struct UwbChip {
     name: String,
-    path: String,
-    state: Arc<Mutex<State>>,
+    _handle: tokio::task::JoinHandle<()>,
+    service_state: Arc<Mutex<ServiceState>>,
 }
 
-impl UwbChip {
-    pub fn new(name: String, path: String) -> Self {
-        Self {
-            name,
-            path,
-            state: Arc::new(Mutex::new(State::Closed)),
-        }
-    }
-}
-
-impl State {
-    /// Terminate the reader task.
-    async fn close(&mut self) -> Result<()> {
-        if let State::Opened {
-            ref mut token,
-            ref callbacks,
-            ref mut death_recipient,
-            ref mut handle,
-            ref mut serial,
-        } = *self
-        {
-            log::info!("waiting for task cancellation");
-            callbacks.as_binder().unlink_to_death(death_recipient)?;
-            token.cancel();
-            handle.await.unwrap();
-            let packet: UciControlPacket = DeviceResetCmdBuilder {
-                reset_config: ResetConfig::UwbsReset,
-            }
-            .build()
-            .into();
-            // DeviceResetCmd need to be send to reset the device to stop all running
-            // activities on UWBS.
-            let packet_vec: Vec<UciControlPacketHal> = packet.into();
-            for hal_packet in packet_vec.into_iter() {
-                serial
-                    .write(&hal_packet.encode_to_vec().unwrap())
-                    .map(|written| written as i32)
-                    .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
-            }
-            consume_device_reset_rsp_and_ntf(
-                &mut serial
-                    .try_clone()
-                    .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?,
-            );
-            log::info!("task successfully cancelled");
-            callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
-            *self = State::Closed;
-        }
-        Ok(())
-    }
-}
-
-fn consume_device_reset_rsp_and_ntf(reader: &mut File) {
-    // Poll the DeviceResetRsp and DeviceStatusNtf before hal is closed to prevent
-    // the host from getting response and notifications from a 'powered down' UWBS.
-    // Do nothing when these packets are received.
-    const DEVICE_RESET_RSP: [u8; 5] = [64, 0, 0, 1, 0];
-    const DEVICE_STATUS_NTF: [u8; 5] = [96, 1, 0, 1, 1];
-    let mut buffer = vec![0; DEVICE_RESET_RSP.len() + DEVICE_STATUS_NTF.len()];
-    read_exact(reader, &mut buffer).unwrap();
-
-    // Make sure received packets are the expected ones.
-    assert_eq!(&buffer[0..DEVICE_RESET_RSP.len()], &DEVICE_RESET_RSP);
-    assert_eq!(&buffer[DEVICE_RESET_RSP.len()..], &DEVICE_STATUS_NTF);
-}
-
-pub fn makeraw(file: File) -> io::Result<File> {
-    // Configure the file descriptor as raw fd.
+/// Configure a file descriptor as raw fd.
+pub fn makeraw(file: fs::File) -> std::io::Result<fs::File> {
     use nix::sys::termios::*;
     let mut attrs = tcgetattr(&file)?;
     cfmakeraw(&mut attrs);
     tcsetattr(&file, SetArg::TCSANOW, &attrs)?;
-
     Ok(file)
 }
 
-/// Wrapper around Read::read to handle EWOULDBLOCK.
-/// /!\ will actively wait for more data, make sure to call
-/// this method only when data is immediately expected.
-fn read_exact(file: &mut File, mut buf: &mut [u8]) -> io::Result<()> {
-    while buf.len() > 0 {
-        match file.read(buf) {
-            Ok(0) => panic!("unexpectedly reached end of file"),
-            Ok(read_len) => buf = &mut buf[read_len..],
-            Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue,
-            Err(err) => return Err(err),
+impl UwbChip {
+    pub async fn new(name: String, path: String) -> Self {
+        // Open the serial device for reading and writing, and put it
+        // in raw mode.
+        let mut reader = fs::OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(false)
+            .open(&path)
+            .await
+            .and_then(makeraw)
+            .expect("failed to open the serial device");
+        let writer = reader
+            .try_clone()
+            .await
+            .expect("failed to clone serial for writing");
+
+        // Create the state shared with the reader task.
+        let service_state = Arc::new(Mutex::new(ServiceState {
+            writer,
+            client_state: ClientState::Closed,
+        }));
+
+        // Spawn the task that will run the polling loop.
+        let handle = {
+            let service_state = service_state.clone();
+
+            tokio::task::spawn(async move {
+                log::info!("UCI reader task started");
+
+                const MESSAGE_TYPE_MASK: u8 = 0b11100000;
+                const DATA_MESSAGE_TYPE: u8 = 0b000;
+                const UCI_HEADER_SIZE: usize = 4;
+                const UCI_BUFFER_SIZE: usize = 1024;
+
+                let mut buffer = [0; UCI_BUFFER_SIZE];
+
+                loop {
+                    reader
+                        .read_exact(&mut buffer[0..UCI_HEADER_SIZE])
+                        .await
+                        .expect("failed to read uci header bytes");
+                    let common_header = buffer[0];
+                    let mt = (common_header & MESSAGE_TYPE_MASK) >> 5;
+                    let payload_length = if mt == DATA_MESSAGE_TYPE {
+                        u16::from_le_bytes([buffer[2], buffer[3]]) as usize
+                    } else {
+                        buffer[3] as usize
+                    };
+
+                    let total_packet_length = payload_length + UCI_HEADER_SIZE;
+                    reader
+                        .read_exact(&mut buffer[UCI_HEADER_SIZE..total_packet_length])
+                        .await
+                        .expect("failed to read uci payload bytes");
+
+                    log::debug!(" <-- {:?}", &buffer[0..total_packet_length]);
+
+                    let service_state = service_state.lock().await;
+                    if let ClientState::Opened { ref callbacks, .. } = service_state.client_state {
+                        callbacks
+                            .onUciMessage(&buffer[0..total_packet_length])
+                            .unwrap();
+                    }
+                }
+            })
+        };
+
+        Self {
+            name,
+            _handle: handle,
+            service_state,
         }
     }
-    Ok(())
 }
-
 impl binder::Interface for UwbChip {}
 
 #[async_trait]
@@ -136,124 +123,30 @@
     }
 
     async fn open(&self, callbacks: &Strong<dyn IUwbClientCallback>) -> Result<()> {
-        log::debug!("open: {:?}", &self.path);
+        log::debug!("open");
 
-        let mut state = self.state.lock().await;
+        let mut service_state = self.service_state.lock().await;
 
-        if matches!(*state, State::Opened { .. }) {
+        if matches!(service_state.client_state, ClientState::Opened { .. }) {
             log::error!("the state is already opened");
             return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
         }
 
-        let serial = OpenOptions::new()
-            .read(true)
-            .write(true)
-            .create(false)
-            .custom_flags(libc::O_NONBLOCK)
-            .open(&self.path)
-            .and_then(makeraw)
-            .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
-
-        let state_death_recipient = self.state.clone();
-        let mut death_recipient = DeathRecipient::new(move || {
-            let mut state = state_death_recipient.blocking_lock();
-            log::info!("Uwb service has died");
-            if let State::Opened { ref mut token, .. } = *state {
-                token.cancel();
-                *state = State::Closed;
-            }
-        });
+        let mut death_recipient = {
+            let service_state = self.service_state.clone();
+            DeathRecipient::new(move || {
+                log::info!("Uwb service has died");
+                let mut service_state = service_state.blocking_lock();
+                service_state.client_state = ClientState::Closed;
+            })
+        };
 
         callbacks.as_binder().link_to_death(&mut death_recipient)?;
-
-        let token = CancellationToken::new();
-        let cloned_token = token.clone();
-
-        let client_callbacks = callbacks.clone();
-
-        let reader = serial
-            .try_clone()
-            .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
-
-        let join_handle = tokio::task::spawn(async move {
-            log::info!("UCI reader task started");
-            let mut reader = AsyncFd::new(reader).unwrap();
-
-            loop {
-                const MESSAGE_TYPE_MASK: u8 = 0b11100000;
-                const DATA_MESSAGE_TYPE: u8 = 0b000;
-                const UWB_HEADER_SIZE: usize = 4;
-                let mut buffer = vec![0; UWB_HEADER_SIZE];
-
-                // The only time where the task can be safely
-                // cancelled is when no packet bytes have been read.
-                //
-                // - read_exact() cannot be used here since it is not
-                //   cancellation safe.
-                // - read() cannot be used because it cannot be cancelled:
-                //   the syscall is executed blocking on the threadpool
-                //   and completes after termination of the task when
-                //   the pipe receives more data.
-                let read_len = loop {
-                    // On some platforms, the readiness detecting mechanism
-                    // relies on edge-triggered notifications. This means that
-                    // the OS will only notify Tokio when the file descriptor
-                    // transitions from not-ready to ready. For this to work
-                    // you should first try to read or write and only poll for
-                    // readiness if that fails with an error of
-                    // std::io::ErrorKind::WouldBlock.
-                    match reader.get_mut().read(&mut buffer) {
-                        Ok(0) => {
-                            log::error!("file unexpectedly closed");
-                            return;
-                        }
-                        Ok(read_len) => break read_len,
-                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => (),
-                        Err(_) => panic!("unexpected read failure"),
-                    }
-
-                    let mut guard = select! {
-                        _ = cloned_token.cancelled() => {
-                            log::info!("task is cancelled!");
-                            return;
-                        },
-                        result = reader.readable() => result.unwrap()
-                    };
-
-                    guard.clear_ready();
-                };
-
-                // Read the remaining header bytes, if truncated.
-                read_exact(reader.get_mut(), &mut buffer[read_len..]).unwrap();
-
-                let common_header = buffer[0];
-                let mt = (common_header & MESSAGE_TYPE_MASK) >> 5;
-                let payload_length = if mt == DATA_MESSAGE_TYPE {
-                    let payload_length_fields: [u8; 2] = buffer[2..=3].try_into().unwrap();
-                    u16::from_le_bytes(payload_length_fields) as usize
-                } else {
-                    buffer[3] as usize
-                };
-
-                let length = payload_length + UWB_HEADER_SIZE;
-                buffer.resize(length, 0);
-
-                // Read the payload bytes.
-                read_exact(reader.get_mut(), &mut buffer[UWB_HEADER_SIZE..]).unwrap();
-
-                log::debug!(" <-- {:?}", buffer);
-                client_callbacks.onUciMessage(&buffer).unwrap();
-            }
-        });
-
         callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?;
 
-        *state = State::Opened {
+        service_state.client_state = ClientState::Opened {
             callbacks: callbacks.clone(),
-            handle: join_handle,
-            serial,
-            death_recipient,
-            token,
+            _death_recipient: death_recipient,
         };
 
         Ok(())
@@ -262,19 +155,42 @@
     async fn close(&self) -> Result<()> {
         log::debug!("close");
 
-        let mut state = self.state.lock().await;
+        let mut service_state = self.service_state.lock().await;
 
-        if let State::Opened { .. } = *state {
-            state.close().await
-        } else {
-            Err(binder::ExceptionCode::ILLEGAL_STATE.into())
+        if matches!(service_state.client_state, ClientState::Closed) {
+            log::error!("the state is already closed");
+            return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
         }
+
+        // Send the command Device Reset to stop all running activities
+        // on the UWBS emulator. This is necessary because the emulator
+        // is otherwise not notified of the power down (the serial stays
+        // open).
+        //
+        // The response to the command will be dropped by the polling loop,
+        // as the callbacks will have been removed by then.
+        let uci_core_device_reset_cmd = [0x20, 0x00, 0x00, 0x01, 0x00];
+
+        service_state
+            .writer
+            .write_all(&uci_core_device_reset_cmd)
+            .await
+            .expect("failed to write UCI Device Reset command");
+
+        if let ClientState::Opened { ref callbacks, .. } = service_state.client_state {
+            callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
+        }
+
+        service_state.client_state = ClientState::Closed;
+        Ok(())
     }
 
     async fn coreInit(&self) -> Result<()> {
         log::debug!("coreInit");
 
-        if let State::Opened { ref callbacks, .. } = *self.state.lock().await {
+        let service_state = self.service_state.lock().await;
+
+        if let ClientState::Opened { ref callbacks, .. } = service_state.client_state {
             callbacks.onHalEvent(UwbEvent::POST_INIT_CPLT, UwbStatus::OK)?;
             Ok(())
         } else {
@@ -289,22 +205,27 @@
     }
 
     async fn getSupportedAndroidUciVersion(&self) -> Result<i32> {
+        log::debug!("getSupportedAndroidUciVersion");
+
         Ok(1)
     }
 
     async fn sendUciMessage(&self, data: &[u8]) -> Result<i32> {
         log::debug!("sendUciMessage");
 
-        if let State::Opened { ref mut serial, .. } = &mut *self.state.lock().await {
-            log::debug!(" --> {:?}", data);
-            let result = serial
-                .write_all(data)
-                .map(|_| data.len() as i32)
-                .map_err(|_| binder::StatusCode::UNKNOWN_ERROR.into());
-            log::debug!(" status: {:?}", result);
-            result
-        } else {
-            Err(binder::ExceptionCode::ILLEGAL_STATE.into())
+        let mut service_state = self.service_state.lock().await;
+
+        if matches!(service_state.client_state, ClientState::Closed) {
+            log::error!("the state is not opened");
+            return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
         }
+
+        log::debug!(" --> {:?}", data);
+        service_state
+            .writer
+            .write_all(data)
+            .await
+            .map(|_| data.len() as i32)
+            .map_err(|_| binder::StatusCode::UNKNOWN_ERROR.into())
     }
 }