[UWB HAL] Use AsyncFd to read the buffer only when it's readable.
This CL fixes the issue that some bytes are missing after we toggle uwb
off and on.
Test: atest CtsUwbTestCases
Bug: 302005209
Change-Id: I8ad8072fe01c8d1f466d8debf0ccf5831b3eddf8
diff --git a/uwb/aidl/default/Android.bp b/uwb/aidl/default/Android.bp
index c6d1a52..2b7ef57 100644
--- a/uwb/aidl/default/Android.bp
+++ b/uwb/aidl/default/Android.bp
@@ -20,6 +20,7 @@
"libbinder_rs",
"libbinder_tokio_rs",
"libtokio",
+ "libtokio_util",
"libnix",
"libanyhow",
],
diff --git a/uwb/aidl/default/src/uwb_chip.rs b/uwb/aidl/default/src/uwb_chip.rs
index 9587efb..b63aabe 100644
--- a/uwb/aidl/default/src/uwb_chip.rs
+++ b/uwb/aidl/default/src/uwb_chip.rs
@@ -4,32 +4,34 @@
};
use android_hardware_uwb::binder;
use async_trait::async_trait;
-use binder::{Result, Strong};
+use binder::{DeathRecipient, IBinder, Result, Strong};
-use tokio::fs::{File, OpenOptions};
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use log::info;
+use std::sync::Arc;
+use tokio::io::unix::AsyncFd;
+use tokio::select;
use tokio::sync::Mutex;
+use tokio_util::sync::CancellationToken;
+use std::fs::{File, OpenOptions};
+use std::io::{self, Read, Write};
use std::os::fd::AsRawFd;
-use std::io;
-
-use nix::sys::termios;
-
enum State {
Closed,
Opened {
callbacks: Strong<dyn IUwbClientCallback>,
- #[allow(dead_code)]
- tasks: tokio::task::JoinSet<()>,
+ _handle: tokio::task::JoinHandle<()>,
serial: File,
+ death_recipient: DeathRecipient,
+ token: CancellationToken,
},
}
pub struct UwbChip {
name: String,
path: String,
- state: Mutex<State>,
+ state: Arc<Mutex<State>>,
}
impl UwbChip {
@@ -37,23 +39,59 @@
Self {
name,
path,
- state: Mutex::new(State::Closed),
+ state: Arc::new(Mutex::new(State::Closed)),
}
}
}
+impl State {
+ /// Terminate the reader task.
+ #[allow(dead_code)]
+ fn close(&mut self) -> Result<()> {
+ if let State::Opened { ref mut token, ref callbacks, ref mut death_recipient, .. } = *self {
+ log::info!("waiting for task cancellation");
+ callbacks.as_binder().unlink_to_death(death_recipient)?;
+ token.cancel();
+ log::info!("task successfully cancelled");
+ callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
+ *self = State::Closed;
+ }
+ Ok(())
+ }
+}
+
pub fn makeraw(file: File) -> io::Result<File> {
let fd = file.as_raw_fd();
- let mut attrs = termios::tcgetattr(fd)?;
+ // Configure the file descritpro as raw fd.
+ use nix::sys::termios::*;
+ let mut attrs = tcgetattr(fd)?;
+ cfmakeraw(&mut attrs);
+ tcsetattr(fd, SetArg::TCSANOW, &attrs)?;
- termios::cfmakeraw(&mut attrs);
-
- termios::tcsetattr(fd, termios::SetArg::TCSANOW, &attrs)?;
+ // Configure the file descriptor as non blocking.
+ use nix::fcntl::*;
+ let flags = OFlag::from_bits(fcntl(fd, FcntlArg::F_GETFL)?).unwrap();
+ fcntl(fd, FcntlArg::F_SETFL(flags | OFlag::O_NONBLOCK))?;
Ok(file)
}
+/// Wrapper around Read::read to handle EWOULDBLOCK.
+/// /!\ will actively wait for more data, make sure to call
+/// this method only when data is immediately expected.
+fn read_exact(file: &mut File, mut buf: &mut [u8]) -> io::Result<()> {
+ while buf.len() > 0 {
+ match file.read(buf) {
+ Ok(0) => panic!("unexpectedly reached end of file"),
+ Ok(read_len) => buf = &mut buf[read_len..],
+ Err(err) if err.kind() == io::ErrorKind::WouldBlock => continue,
+ Err(err) => return Err(err),
+ }
+ }
+ Ok(())
+}
+
impl binder::Interface for UwbChip {}
#[async_trait]
@@ -65,60 +103,109 @@
async fn open(&self, callbacks: &Strong<dyn IUwbClientCallback>) -> Result<()> {
log::debug!("open: {:?}", &self.path);
+ let mut state = self.state.lock().await;
+
+ if matches!(*state, State::Opened { .. }) {
+ log::error!("the state is already opened");
+ return Err(binder::ExceptionCode::ILLEGAL_STATE.into());
+ }
+
let serial = OpenOptions::new()
.read(true)
.write(true)
.create(false)
.open(&self.path)
- .await
.and_then(makeraw)
.map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
- let mut state = self.state.lock().await;
+ let state_death_recipient = self.state.clone();
+ let mut death_recipient = DeathRecipient::new(move || {
+ let mut state = state_death_recipient.blocking_lock();
+ log::info!("Uwb service has died");
+ state.close().unwrap();
+ });
- if let State::Closed = *state {
- let client_callbacks = callbacks.clone();
+ callbacks.as_binder().link_to_death(&mut death_recipient)?;
- let mut tasks = tokio::task::JoinSet::new();
- let mut reader = serial
- .try_clone()
- .await
- .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
+ let token = CancellationToken::new();
+ let cloned_token = token.clone();
- tasks.spawn(async move {
- loop {
- const UWB_HEADER_SIZE: usize = 4;
+ let client_callbacks = callbacks.clone();
- let mut buffer = vec![0; UWB_HEADER_SIZE];
- reader
- .read_exact(&mut buffer[0..UWB_HEADER_SIZE])
- .await
- .unwrap();
+ let reader = serial
+ .try_clone()
+ .map_err(|_| binder::StatusCode::UNKNOWN_ERROR)?;
- let length = buffer[3] as usize + UWB_HEADER_SIZE;
+ let join_handle = tokio::task::spawn(async move {
+ info!("UCI reader task started");
+ let mut reader = AsyncFd::new(reader).unwrap();
- buffer.resize(length, 0);
- reader
- .read_exact(&mut buffer[UWB_HEADER_SIZE..length])
- .await
- .unwrap();
+ loop {
+ const UWB_HEADER_SIZE: usize = 4;
+ let mut buffer = vec![0; UWB_HEADER_SIZE];
- client_callbacks.onUciMessage(&buffer[..]).unwrap();
- }
- });
+ // The only time where the task can be safely
+ // cancelled is when no packet bytes have been read.
+ //
+ // - read_exact() cannot be used here since it is not
+ // cancellation safe.
+ // - read() cannot be used because it cannot be cancelled:
+ // the syscall is executed blocking on the threadpool
+ // and completes after termination of the task when
+ // the pipe receives more data.
+ let read_len = loop {
+ // On some platforms, the readiness detecting mechanism
+ // relies on edge-triggered notifications. This means that
+ // the OS will only notify Tokio when the file descriptor
+ // transitions from not-ready to ready. For this to work
+ // you should first try to read or write and only poll for
+ // readiness if that fails with an error of
+ // std::io::ErrorKind::WouldBlock.
+ match reader.get_mut().read(&mut buffer) {
+ Ok(0) => {
+ log::error!("file unexpectedly closed");
+ return;
+ }
+ Ok(read_len) => break read_len,
+ Err(err) if err.kind() == io::ErrorKind::WouldBlock => (),
+ Err(_) => panic!("unexpected read failure"),
+ }
- callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?;
+ let mut guard = select! {
+ _ = cloned_token.cancelled() => {
+ info!("task is cancelled!");
+ return;
+ },
+ result = reader.readable() => result.unwrap()
+ };
- *state = State::Opened {
- callbacks: callbacks.clone(),
- tasks,
- serial,
- };
+ guard.clear_ready();
+ };
- Ok(())
- } else {
- Err(binder::ExceptionCode::ILLEGAL_STATE.into())
- }
+ // Read the remaining header bytes, if truncated.
+ read_exact(reader.get_mut(), &mut buffer[read_len..]).unwrap();
+
+ let length = buffer[3] as usize + UWB_HEADER_SIZE;
+ buffer.resize(length, 0);
+
+ // Read the payload bytes.
+ read_exact(reader.get_mut(), &mut buffer[UWB_HEADER_SIZE..]).unwrap();
+
+ client_callbacks.onUciMessage(&buffer).unwrap();
+ }
+ });
+
+ callbacks.onHalEvent(UwbEvent::OPEN_CPLT, UwbStatus::OK)?;
+
+ *state = State::Opened {
+ callbacks: callbacks.clone(),
+ _handle: join_handle,
+ serial,
+ death_recipient,
+ token,
+ };
+
+ Ok(())
}
async fn close(&self) -> Result<()> {
@@ -126,10 +213,8 @@
let mut state = self.state.lock().await;
- if let State::Opened { ref callbacks, .. } = *state {
- callbacks.onHalEvent(UwbEvent::CLOSE_CPLT, UwbStatus::OK)?;
- *state = State::Closed;
- Ok(())
+ if matches!(*state, State::Opened { .. }) {
+ state.close()
} else {
Err(binder::ExceptionCode::ILLEGAL_STATE.into())
}
@@ -162,7 +247,6 @@
if let State::Opened { ref mut serial, .. } = &mut *self.state.lock().await {
serial
.write(data)
- .await
.map(|written| written as i32)
.map_err(|_| binder::StatusCode::UNKNOWN_ERROR.into())
} else {