[rialto][vsock] Buffer the write from rialto to the host

This cl implements a write buffer for transmitting data from
rialto to the host to improve the communication efficiency.

Test: atest rialto_test
Bug: 293411448
Change-Id: I3e81dcb6c336f4279d25b168d0841ffe2f3265ec
diff --git a/rialto/Android.bp b/rialto/Android.bp
index 33c24eb..55423ea 100644
--- a/rialto/Android.bp
+++ b/rialto/Android.bp
@@ -16,6 +16,7 @@
         "liblibfdt",
         "liblog_rust_nostd",
         "libservice_vm_comm_nostd",
+        "libtinyvec_nostd",
         "libvirtio_drivers",
         "libvmbase",
     ],
diff --git a/rialto/src/communication.rs b/rialto/src/communication.rs
index 858ccfb..ee4ecdb 100644
--- a/rialto/src/communication.rs
+++ b/rialto/src/communication.rs
@@ -17,9 +17,11 @@
 use crate::error::Result;
 use ciborium_io::{Read, Write};
 use core::hint::spin_loop;
+use core::mem;
 use core::result;
 use log::info;
 use service_vm_comm::{Request, Response};
+use tinyvec::ArrayVec;
 use virtio_drivers::{
     self,
     device::socket::{
@@ -29,10 +31,13 @@
     Hal,
 };
 
+const WRITE_BUF_CAPACITY: usize = 512;
+
 pub struct VsockStream<H: Hal, T: Transport> {
     connection_manager: VsockConnectionManager<H, T>,
     /// Peer address. The same port is used on rialto and peer for convenience.
     peer_addr: VsockAddr,
+    write_buf: ArrayVec<[u8; WRITE_BUF_CAPACITY]>,
 }
 
 impl<H: Hal, T: Transport> VsockStream<H, T> {
@@ -43,6 +48,7 @@
         let mut vsock_stream = Self {
             connection_manager: VsockConnectionManager::new(socket_device_driver),
             peer_addr,
+            write_buf: ArrayVec::default(),
         };
         vsock_stream.connect()?;
         Ok(vsock_stream)
@@ -176,12 +182,25 @@
     type Error = virtio_drivers::Error;
 
     fn write_all(&mut self, data: &[u8]) -> result::Result<(), Self::Error> {
-        self.wait_for_send(data)
+        if data.len() >= self.write_buf.capacity() - self.write_buf.len() {
+            self.flush()?;
+            if data.len() >= self.write_buf.capacity() {
+                self.wait_for_send(data)?;
+                return Ok(());
+            }
+        }
+        self.write_buf.extend_from_slice(data);
+        Ok(())
     }
 
     fn flush(&mut self) -> result::Result<(), Self::Error> {
-        // TODO(b/293411448): Optimize the data sending by saving the data to write
-        // in a local buffer and then flushing only when the buffer is full.
+        if !self.write_buf.is_empty() {
+            // We need to take the memory from self.write_buf to a temporary
+            // buffer to avoid borrowing `*self` as mutable and immutable on
+            // the same time in `self.wait_for_send(&self.write_buf)`.
+            let buffer = mem::take(&mut self.write_buf);
+            self.wait_for_send(&buffer)?;
+        }
         Ok(())
     }
 }
diff --git a/rialto/src/main.rs b/rialto/src/main.rs
index a8338ca..42d39c4 100644
--- a/rialto/src/main.rs
+++ b/rialto/src/main.rs
@@ -26,6 +26,7 @@
 
 use crate::communication::VsockStream;
 use crate::error::{Error, Result};
+use ciborium_io::Write;
 use core::num::NonZeroUsize;
 use core::slice;
 use fdtpci::PciInfo;
@@ -141,6 +142,7 @@
     let mut vsock_stream = VsockStream::new(socket_device, host_addr())?;
     let response = requests::process_request(vsock_stream.read_request()?);
     vsock_stream.write_response(&response)?;
+    vsock_stream.flush()?;
     vsock_stream.shutdown()?;
 
     Ok(())