blob: ee4ecdb017caecb7ad9e45aa092c45c194ae7327 [file] [log] [blame]
Alice Wang4e082c32023-07-11 07:41:50 +00001// Copyright 2023, The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Supports for the communication between rialto and host.
16
Alice Wang748b0322023-07-24 12:51:18 +000017use crate::error::Result;
18use ciborium_io::{Read, Write};
19use core::hint::spin_loop;
Alice Wang953a6572023-08-24 13:40:10 +000020use core::mem;
Alice Wang748b0322023-07-24 12:51:18 +000021use core::result;
Alice Wang4e082c32023-07-11 07:41:50 +000022use log::info;
Alice Wang748b0322023-07-24 12:51:18 +000023use service_vm_comm::{Request, Response};
Alice Wang953a6572023-08-24 13:40:10 +000024use tinyvec::ArrayVec;
Alice Wang4e082c32023-07-11 07:41:50 +000025use virtio_drivers::{
26 self,
27 device::socket::{
Alice Wang748b0322023-07-24 12:51:18 +000028 SocketError, VirtIOSocket, VsockAddr, VsockConnectionManager, VsockEventType,
Alice Wang4e082c32023-07-11 07:41:50 +000029 },
30 transport::Transport,
31 Hal,
32};
33
Alice Wang953a6572023-08-24 13:40:10 +000034const WRITE_BUF_CAPACITY: usize = 512;
35
Alice Wang748b0322023-07-24 12:51:18 +000036pub struct VsockStream<H: Hal, T: Transport> {
37 connection_manager: VsockConnectionManager<H, T>,
38 /// Peer address. The same port is used on rialto and peer for convenience.
39 peer_addr: VsockAddr,
Alice Wang953a6572023-08-24 13:40:10 +000040 write_buf: ArrayVec<[u8; WRITE_BUF_CAPACITY]>,
Alice Wang4e082c32023-07-11 07:41:50 +000041}
42
Alice Wang748b0322023-07-24 12:51:18 +000043impl<H: Hal, T: Transport> VsockStream<H, T> {
44 pub fn new(
45 socket_device_driver: VirtIOSocket<H, T>,
46 peer_addr: VsockAddr,
47 ) -> virtio_drivers::Result<Self> {
48 let mut vsock_stream = Self {
49 connection_manager: VsockConnectionManager::new(socket_device_driver),
50 peer_addr,
Alice Wang953a6572023-08-24 13:40:10 +000051 write_buf: ArrayVec::default(),
Alice Wang748b0322023-07-24 12:51:18 +000052 };
53 vsock_stream.connect()?;
54 Ok(vsock_stream)
Alice Wang4e082c32023-07-11 07:41:50 +000055 }
Alice Wang4e082c32023-07-11 07:41:50 +000056
Alice Wang748b0322023-07-24 12:51:18 +000057 fn connect(&mut self) -> virtio_drivers::Result {
58 self.connection_manager.connect(self.peer_addr, self.peer_addr.port)?;
59 self.wait_for_connect()?;
60 info!("Connected to the peer {:?}", self.peer_addr);
Alice Wang4e082c32023-07-11 07:41:50 +000061 Ok(())
62 }
63
Alice Wang748b0322023-07-24 12:51:18 +000064 fn wait_for_connect(&mut self) -> virtio_drivers::Result {
Alice Wang4e082c32023-07-11 07:41:50 +000065 loop {
Alice Wang748b0322023-07-24 12:51:18 +000066 if let Some(event) = self.poll_event_from_peer()? {
67 match event {
68 VsockEventType::Connected => return Ok(()),
69 VsockEventType::Disconnected { .. } => {
70 return Err(SocketError::ConnectionFailed.into())
71 }
72 // We shouldn't receive the following event before the connection is
73 // established.
74 VsockEventType::ConnectionRequest | VsockEventType::Received { .. } => {
75 return Err(SocketError::InvalidOperation.into())
76 }
77 // We can receive credit requests and updates at any time.
78 // This can be ignored as the connection manager handles them in poll().
79 VsockEventType::CreditRequest | VsockEventType::CreditUpdate => {}
Alice Wang4e082c32023-07-11 07:41:50 +000080 }
Alice Wang748b0322023-07-24 12:51:18 +000081 } else {
82 spin_loop();
Alice Wang4e082c32023-07-11 07:41:50 +000083 }
84 }
85 }
86
Alice Wang748b0322023-07-24 12:51:18 +000087 pub fn read_request(&mut self) -> Result<Request> {
88 Ok(ciborium::from_reader(self)?)
89 }
90
91 pub fn write_response(&mut self, response: &Response) -> Result<()> {
92 Ok(ciborium::into_writer(response, self)?)
93 }
94
Alice Wang4e082c32023-07-11 07:41:50 +000095 /// Shuts down the data channel.
Alice Wang748b0322023-07-24 12:51:18 +000096 pub fn shutdown(&mut self) -> virtio_drivers::Result {
97 self.connection_manager.force_close(self.peer_addr, self.peer_addr.port)?;
Alice Wang4e082c32023-07-11 07:41:50 +000098 info!("Connection shutdown.");
99 Ok(())
100 }
Alice Wang748b0322023-07-24 12:51:18 +0000101
102 fn recv(&mut self, buffer: &mut [u8]) -> virtio_drivers::Result<usize> {
103 self.connection_manager.recv(self.peer_addr, self.peer_addr.port, buffer)
104 }
105
106 fn wait_for_send(&mut self, buffer: &[u8]) -> virtio_drivers::Result {
107 const INSUFFICIENT_BUFFER_SPACE_ERROR: virtio_drivers::Error =
108 virtio_drivers::Error::SocketDeviceError(SocketError::InsufficientBufferSpaceInPeer);
109 loop {
110 match self.connection_manager.send(self.peer_addr, self.peer_addr.port, buffer) {
111 Ok(_) => return Ok(()),
112 Err(INSUFFICIENT_BUFFER_SPACE_ERROR) => {
113 self.poll()?;
114 }
115 Err(e) => return Err(e),
116 }
117 }
118 }
119
120 fn wait_for_recv(&mut self) -> virtio_drivers::Result {
121 loop {
122 match self.poll()? {
123 Some(VsockEventType::Received { .. }) => return Ok(()),
124 _ => spin_loop(),
125 }
126 }
127 }
128
129 /// Polls the rx queue after the connection is established with the peer, this function
130 /// rejects some invalid events. The valid events are handled inside the connection
131 /// manager.
132 fn poll(&mut self) -> virtio_drivers::Result<Option<VsockEventType>> {
133 if let Some(event) = self.poll_event_from_peer()? {
134 match event {
135 VsockEventType::Disconnected { .. } => Err(SocketError::ConnectionFailed.into()),
136 VsockEventType::Connected | VsockEventType::ConnectionRequest => {
137 Err(SocketError::InvalidOperation.into())
138 }
139 // When there is a received event, the received data is buffered in the
140 // connection manager's internal receive buffer, so we don't need to do
141 // anything here.
142 // The credit request and updates also handled inside the connection
143 // manager.
144 VsockEventType::Received { .. }
145 | VsockEventType::CreditRequest
146 | VsockEventType::CreditUpdate => Ok(Some(event)),
147 }
148 } else {
149 Ok(None)
150 }
151 }
152
153 fn poll_event_from_peer(&mut self) -> virtio_drivers::Result<Option<VsockEventType>> {
154 Ok(self.connection_manager.poll()?.map(|event| {
155 assert_eq!(event.source, self.peer_addr);
156 assert_eq!(event.destination.port, self.peer_addr.port);
157 event.event_type
158 }))
159 }
160}
161
162impl<H: Hal, T: Transport> Read for VsockStream<H, T> {
163 type Error = virtio_drivers::Error;
164
165 fn read_exact(&mut self, data: &mut [u8]) -> result::Result<(), Self::Error> {
166 let mut start = 0;
167 while start < data.len() {
168 let len = self.recv(&mut data[start..])?;
169 let len = if len == 0 {
170 self.wait_for_recv()?;
171 self.recv(&mut data[start..])?
172 } else {
173 len
174 };
175 start += len;
176 }
177 Ok(())
178 }
179}
180
181impl<H: Hal, T: Transport> Write for VsockStream<H, T> {
182 type Error = virtio_drivers::Error;
183
184 fn write_all(&mut self, data: &[u8]) -> result::Result<(), Self::Error> {
Alice Wang953a6572023-08-24 13:40:10 +0000185 if data.len() >= self.write_buf.capacity() - self.write_buf.len() {
186 self.flush()?;
187 if data.len() >= self.write_buf.capacity() {
188 self.wait_for_send(data)?;
189 return Ok(());
190 }
191 }
192 self.write_buf.extend_from_slice(data);
193 Ok(())
Alice Wang748b0322023-07-24 12:51:18 +0000194 }
195
196 fn flush(&mut self) -> result::Result<(), Self::Error> {
Alice Wang953a6572023-08-24 13:40:10 +0000197 if !self.write_buf.is_empty() {
198 // We need to take the memory from self.write_buf to a temporary
199 // buffer to avoid borrowing `*self` as mutable and immutable on
200 // the same time in `self.wait_for_send(&self.write_buf)`.
201 let buffer = mem::take(&mut self.write_buf);
202 self.wait_for_send(&buffer)?;
203 }
Alice Wang748b0322023-07-24 12:51:18 +0000204 Ok(())
205 }
Alice Wang4e082c32023-07-11 07:41:50 +0000206}