blob: 858ccfb77a626079fa5b7d6abca8a13c2cf279d1 [file] [log] [blame]
Alice Wang4e082c32023-07-11 07:41:50 +00001// Copyright 2023, The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Supports for the communication between rialto and host.
16
Alice Wang748b0322023-07-24 12:51:18 +000017use crate::error::Result;
18use ciborium_io::{Read, Write};
19use core::hint::spin_loop;
20use core::result;
Alice Wang4e082c32023-07-11 07:41:50 +000021use log::info;
Alice Wang748b0322023-07-24 12:51:18 +000022use service_vm_comm::{Request, Response};
Alice Wang4e082c32023-07-11 07:41:50 +000023use virtio_drivers::{
24 self,
25 device::socket::{
Alice Wang748b0322023-07-24 12:51:18 +000026 SocketError, VirtIOSocket, VsockAddr, VsockConnectionManager, VsockEventType,
Alice Wang4e082c32023-07-11 07:41:50 +000027 },
28 transport::Transport,
29 Hal,
30};
31
Alice Wang748b0322023-07-24 12:51:18 +000032pub struct VsockStream<H: Hal, T: Transport> {
33 connection_manager: VsockConnectionManager<H, T>,
34 /// Peer address. The same port is used on rialto and peer for convenience.
35 peer_addr: VsockAddr,
Alice Wang4e082c32023-07-11 07:41:50 +000036}
37
Alice Wang748b0322023-07-24 12:51:18 +000038impl<H: Hal, T: Transport> VsockStream<H, T> {
39 pub fn new(
40 socket_device_driver: VirtIOSocket<H, T>,
41 peer_addr: VsockAddr,
42 ) -> virtio_drivers::Result<Self> {
43 let mut vsock_stream = Self {
44 connection_manager: VsockConnectionManager::new(socket_device_driver),
45 peer_addr,
46 };
47 vsock_stream.connect()?;
48 Ok(vsock_stream)
Alice Wang4e082c32023-07-11 07:41:50 +000049 }
Alice Wang4e082c32023-07-11 07:41:50 +000050
Alice Wang748b0322023-07-24 12:51:18 +000051 fn connect(&mut self) -> virtio_drivers::Result {
52 self.connection_manager.connect(self.peer_addr, self.peer_addr.port)?;
53 self.wait_for_connect()?;
54 info!("Connected to the peer {:?}", self.peer_addr);
Alice Wang4e082c32023-07-11 07:41:50 +000055 Ok(())
56 }
57
Alice Wang748b0322023-07-24 12:51:18 +000058 fn wait_for_connect(&mut self) -> virtio_drivers::Result {
Alice Wang4e082c32023-07-11 07:41:50 +000059 loop {
Alice Wang748b0322023-07-24 12:51:18 +000060 if let Some(event) = self.poll_event_from_peer()? {
61 match event {
62 VsockEventType::Connected => return Ok(()),
63 VsockEventType::Disconnected { .. } => {
64 return Err(SocketError::ConnectionFailed.into())
65 }
66 // We shouldn't receive the following event before the connection is
67 // established.
68 VsockEventType::ConnectionRequest | VsockEventType::Received { .. } => {
69 return Err(SocketError::InvalidOperation.into())
70 }
71 // We can receive credit requests and updates at any time.
72 // This can be ignored as the connection manager handles them in poll().
73 VsockEventType::CreditRequest | VsockEventType::CreditUpdate => {}
Alice Wang4e082c32023-07-11 07:41:50 +000074 }
Alice Wang748b0322023-07-24 12:51:18 +000075 } else {
76 spin_loop();
Alice Wang4e082c32023-07-11 07:41:50 +000077 }
78 }
79 }
80
Alice Wang748b0322023-07-24 12:51:18 +000081 pub fn read_request(&mut self) -> Result<Request> {
82 Ok(ciborium::from_reader(self)?)
83 }
84
85 pub fn write_response(&mut self, response: &Response) -> Result<()> {
86 Ok(ciborium::into_writer(response, self)?)
87 }
88
Alice Wang4e082c32023-07-11 07:41:50 +000089 /// Shuts down the data channel.
Alice Wang748b0322023-07-24 12:51:18 +000090 pub fn shutdown(&mut self) -> virtio_drivers::Result {
91 self.connection_manager.force_close(self.peer_addr, self.peer_addr.port)?;
Alice Wang4e082c32023-07-11 07:41:50 +000092 info!("Connection shutdown.");
93 Ok(())
94 }
Alice Wang748b0322023-07-24 12:51:18 +000095
96 fn recv(&mut self, buffer: &mut [u8]) -> virtio_drivers::Result<usize> {
97 self.connection_manager.recv(self.peer_addr, self.peer_addr.port, buffer)
98 }
99
100 fn wait_for_send(&mut self, buffer: &[u8]) -> virtio_drivers::Result {
101 const INSUFFICIENT_BUFFER_SPACE_ERROR: virtio_drivers::Error =
102 virtio_drivers::Error::SocketDeviceError(SocketError::InsufficientBufferSpaceInPeer);
103 loop {
104 match self.connection_manager.send(self.peer_addr, self.peer_addr.port, buffer) {
105 Ok(_) => return Ok(()),
106 Err(INSUFFICIENT_BUFFER_SPACE_ERROR) => {
107 self.poll()?;
108 }
109 Err(e) => return Err(e),
110 }
111 }
112 }
113
114 fn wait_for_recv(&mut self) -> virtio_drivers::Result {
115 loop {
116 match self.poll()? {
117 Some(VsockEventType::Received { .. }) => return Ok(()),
118 _ => spin_loop(),
119 }
120 }
121 }
122
123 /// Polls the rx queue after the connection is established with the peer, this function
124 /// rejects some invalid events. The valid events are handled inside the connection
125 /// manager.
126 fn poll(&mut self) -> virtio_drivers::Result<Option<VsockEventType>> {
127 if let Some(event) = self.poll_event_from_peer()? {
128 match event {
129 VsockEventType::Disconnected { .. } => Err(SocketError::ConnectionFailed.into()),
130 VsockEventType::Connected | VsockEventType::ConnectionRequest => {
131 Err(SocketError::InvalidOperation.into())
132 }
133 // When there is a received event, the received data is buffered in the
134 // connection manager's internal receive buffer, so we don't need to do
135 // anything here.
136 // The credit request and updates also handled inside the connection
137 // manager.
138 VsockEventType::Received { .. }
139 | VsockEventType::CreditRequest
140 | VsockEventType::CreditUpdate => Ok(Some(event)),
141 }
142 } else {
143 Ok(None)
144 }
145 }
146
147 fn poll_event_from_peer(&mut self) -> virtio_drivers::Result<Option<VsockEventType>> {
148 Ok(self.connection_manager.poll()?.map(|event| {
149 assert_eq!(event.source, self.peer_addr);
150 assert_eq!(event.destination.port, self.peer_addr.port);
151 event.event_type
152 }))
153 }
154}
155
156impl<H: Hal, T: Transport> Read for VsockStream<H, T> {
157 type Error = virtio_drivers::Error;
158
159 fn read_exact(&mut self, data: &mut [u8]) -> result::Result<(), Self::Error> {
160 let mut start = 0;
161 while start < data.len() {
162 let len = self.recv(&mut data[start..])?;
163 let len = if len == 0 {
164 self.wait_for_recv()?;
165 self.recv(&mut data[start..])?
166 } else {
167 len
168 };
169 start += len;
170 }
171 Ok(())
172 }
173}
174
175impl<H: Hal, T: Transport> Write for VsockStream<H, T> {
176 type Error = virtio_drivers::Error;
177
178 fn write_all(&mut self, data: &[u8]) -> result::Result<(), Self::Error> {
179 self.wait_for_send(data)
180 }
181
182 fn flush(&mut self) -> result::Result<(), Self::Error> {
183 // TODO(b/293411448): Optimize the data sending by saving the data to write
184 // in a local buffer and then flushing only when the buffer is full.
185 Ok(())
186 }
Alice Wang4e082c32023-07-11 07:41:50 +0000187}