[UWB HAL] Use AsyncFd to read the buffer only when it's readable.

This CL fixes the issue that some bytes are missing after we toggle uwb
off and on.

Test: atest CtsUwbTestCases
Bug: 302005209

Change-Id: I8ad8072fe01c8d1f466d8debf0ccf5831b3eddf8
diff --git a/uwb/aidl/default/Android.bp b/uwb/aidl/default/Android.bp
index c6d1a52..2b7ef57 100644
--- a/uwb/aidl/default/Android.bp
+++ b/uwb/aidl/default/Android.bp
@@ -20,6 +20,7 @@
         "libbinder_rs",
         "libbinder_tokio_rs",
         "libtokio",
+        "libtokio_util",
         "libnix",
         "libanyhow",
     ],
diff --git a/uwb/aidl/default/src/uwb_chip.rs b/uwb/aidl/default/src/uwb_chip.rs
index 9587efb..b63aabe 100644
--- a/uwb/aidl/default/src/uwb_chip.rs
+++ b/uwb/aidl/default/src/uwb_chip.rs
@@ -4,32 +4,34 @@
 };
 use android_hardware_uwb::binder;
 use async_trait::async_trait;
-use binder::{Result, Strong};
+use binder::{DeathRecipient, IBinder, Result, Strong};
 
-use tokio::fs::{File, OpenOptions};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use log::info;
+use std::sync::Arc;
+use tokio::io::unix::AsyncFd;
+use tokio::select;
 use tokio::sync::Mutex;
+use tokio_util::sync::CancellationToken;
 
+use std::fs::{File, OpenOptions};
+use std::io::{self, Read, Write};
 use std::os::fd::AsRawFd;
 
-use std::io;
-
-use nix::sys::termios;
-
 enum State {
     Closed,
     Opened {
         callbacks: Strong<dyn IUwbClientCallback>,
-        #[allow(dead_code)]
-        tasks: tokio::task::JoinSet<()>,
+        _handle: tokio::task::JoinHandle<()>,
         serial: File,
+        death_recipient: DeathRecipient,
+        token: CancellationToken,
     },
 }
 
 pub struct UwbChip {
     name: String,
     path: String,
-    state: Mutex<State>,
+    state: Arc<Mutex<State>>,
 }
 
 impl UwbChip {
@@ -37,23 +39,59 @@
         Self {
             name,
             path,
-            state: Mutex::new(State::Closed),
+            state: Arc::new(Mutex::new(State::Closed)),
         }
     }
 }
 
+impl State {
+    /// Terminate the reader task.
+    #[allow(dead_code)]
+    fn close(&mut self) -> Result<()> {
+        if let State::Opened { ref mut token, ref callbacks, ref mut death_recipient, .. } = *self {
+            log::info!("waiting for task cancellation");
+            callbacks.as_binder().unlink_to_death(death_recipient)?;
+            token.cancel();
+            log::info!("task successfully cancelled");
+            callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
+            *self = State::Closed;
+        }
+        Ok(())
+    }
+}
+
 pub fn makeraw(file: File) -> io::Result<File> {
     let fd = file.as_raw_fd();
 
-    let mut attrs = termios::tcgetattr(fd)?;
+    // Configure the file descritpro as raw fd.
+    use nix::sys::termios::*;
+    let mut attrs = tcgetattr(fd)?;
+    cfmakeraw(&mut attrs);
+    tcsetattr(fd, SetArg::TCSANOW, &attrs)?;
 
-    termios::cfmakeraw(&mut attrs);
-
-    termios::tcsetattr(fd, termios::SetArg::TCSANOW, &attrs)?;
+    // Configure the file descriptor as non blocking.
+    use nix::fcntl::*;
+    let flags = OFlag::from_bits(fcntl(fd, FcntlArg::F_GETFL)?).unwrap();
+    fcntl(fd, FcntlArg::F_SETFL(flags | OFlag::O_NONBLOCK))?;
 
     Ok(file)
 }
 
+/// Wrapper around Read::read to handle EWOULDBLOCK.
+/// /!\ will actively wait for more data, make sure to call
+/// this method only when data is immediately expected.
+fn read_exact(file: &mut File, mut buf: &mut [u8]) -> io::Result<()> {
+    while buf.len() > 0 {
+        match file.read(buf) {
+            Ok(0) => panic!("unexpectedly reached end of file"),
+            Ok(read_len) => buf = &mut buf[read_len..],
+            Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue,
+            Err(err) => return Err(err),
+        }
+    }
+    Ok(())
+}
+
 impl binder::Interface for UwbChip {}
 
 #[async_trait]
@@ -65,60 +103,109 @@
     async fn open(&self, callbacks: &Strong<dyn IUwbClientCallback>) -> Result<()> {
         log::debug!("open: {:?}", &self.path);
 
+        let mut state = self.state.lock().await;
+
+        if matches!(*state, State::Opened { .. }) {
+            log::error!("the state is already opened");
+            return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
+        }
+
         let serial = OpenOptions::new()
             .read(true)
             .write(true)
             .create(false)
             .open(&self.path)
-            .await
             .and_then(makeraw)
             .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
 
-        let mut state = self.state.lock().await;
+        let state_death_recipient = self.state.clone();
+        let mut death_recipient = DeathRecipient::new(move || {
+            let mut state = state_death_recipient.blocking_lock();
+            log::info!("Uwb service has died");
+            state.close().unwrap();
+        });
 
-        if let State::Closed = *state {
-            let client_callbacks = callbacks.clone();
+        callbacks.as_binder().link_to_death(&mut death_recipient)?;
 
-            let mut tasks = tokio::task::JoinSet::new();
-            let mut reader = serial
-                .try_clone()
-                .await
-                .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
+        let token = CancellationToken::new();
+        let cloned_token = token.clone();
 
-            tasks.spawn(async move {
-                loop {
-                    const UWB_HEADER_SIZE: usize = 4;
+        let client_callbacks = callbacks.clone();
 
-                    let mut buffer = vec![0; UWB_HEADER_SIZE];
-                    reader
-                        .read_exact(&mut buffer[0..UWB_HEADER_SIZE])
-                        .await
-                        .unwrap();
+        let reader = serial
+            .try_clone()
+            .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
 
-                    let length = buffer[3] as usize + UWB_HEADER_SIZE;
+        let join_handle = tokio::task::spawn(async move {
+            info!("UCI reader task started");
+            let mut reader = AsyncFd::new(reader).unwrap();
 
-                    buffer.resize(length, 0);
-                    reader
-                        .read_exact(&mut buffer[UWB_HEADER_SIZE..length])
-                        .await
-                        .unwrap();
+            loop {
+                const UWB_HEADER_SIZE: usize = 4;
+                let mut buffer = vec![0; UWB_HEADER_SIZE];
 
-                    client_callbacks.onUciMessage(&buffer[..]).unwrap();
-                }
-            });
+                // The only time where the task can be safely
+                // cancelled is when no packet bytes have been read.
+                //
+                // - read_exact() cannot be used here since it is not
+                //   cancellation safe.
+                // - read() cannot be used because it cannot be cancelled:
+                //   the syscall is executed blocking on the threadpool
+                //   and completes after termination of the task when
+                //   the pipe receives more data.
+                let read_len = loop {
+                    // On some platforms, the readiness detecting mechanism
+                    // relies on edge-triggered notifications. This means that
+                    // the OS will only notify Tokio when the file descriptor
+                    // transitions from not-ready to ready. For this to work
+                    // you should first try to read or write and only poll for
+                    // readiness if that fails with an error of 
+                    // std::io::ErrorKind::WouldBlock.
+                    match reader.get_mut().read(&mut buffer) {
+                        Ok(0) => {
+                            log::error!("file unexpectedly closed");
+                            return;
+                        }
+                        Ok(read_len) => break read_len,
+                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => (),
+                        Err(_) => panic!("unexpected read failure"),
+                    }
 
-            callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?;
+                    let mut guard = select! {
+                        _ = cloned_token.cancelled() => {
+                            info!("task is cancelled!");
+                            return;
+                        },
+                        result = reader.readable() => result.unwrap()
+                    };
 
-            *state = State::Opened {
-                callbacks: callbacks.clone(),
-                tasks,
-                serial,
-            };
+                    guard.clear_ready();
+                };
 
-            Ok(())
-        } else {
-            Err(binder::ExceptionCode::ILLEGAL_STATE.into())
-        }
+                // Read the remaining header bytes, if truncated.
+                read_exact(reader.get_mut(), &mut buffer[read_len..]).unwrap();
+
+                let length = buffer[3] as usize + UWB_HEADER_SIZE;
+                buffer.resize(length, 0);
+
+                // Read the payload bytes.
+                read_exact(reader.get_mut(), &mut buffer[UWB_HEADER_SIZE..]).unwrap();
+
+                client_callbacks.onUciMessage(&buffer).unwrap();
+            }
+        });
+
+        callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?;
+
+        *state = State::Opened {
+            callbacks: callbacks.clone(),
+            _handle: join_handle,
+            serial,
+            death_recipient,
+            token,
+        };
+
+        Ok(())
     }
 
     async fn close(&self) -> Result<()> {
@@ -126,10 +213,8 @@
 
         let mut state = self.state.lock().await;
 
-        if let State::Opened { ref callbacks, .. } = *state {
-            callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
-            *state = State::Closed;
-            Ok(())
+        if matches!(*state, State::Opened { .. }) {
+            state.close()
         } else {
             Err(binder::ExceptionCode::ILLEGAL_STATE.into())
         }
@@ -162,7 +247,6 @@
         if let State::Opened { ref mut serial, .. } = &mut *self.state.lock().await {
             serial
                 .write(data)
-                .await
                 .map(|written| written as i32)
                 .map_err(|_| binder::StatusCode::UNKNOWN_ERROR.into())
         } else {