Make forwarder_host buildable
Bug: 340126051
Test: m forwarder_host
Change-Id: I8c6da4602147ce329f73270559cdfa999d9f3351
diff --git a/android/forwarder_host/Android.bp b/android/forwarder_host/Android.bp
new file mode 100644
index 0000000..35c478e
--- /dev/null
+++ b/android/forwarder_host/Android.bp
@@ -0,0 +1,21 @@
+package {
+ default_applicable_licenses: ["Android-Apache-2.0"],
+}
+
+rust_binary {
+ name: "forwarder_host",
+ edition: "2021",
+ srcs: ["src/main.rs"],
+ rustlibs: [
+ "libforwarder",
+ "liblog_rust",
+ "libnix",
+ "libvmm_sys_util",
+ "libvsock",
+ ],
+ proc_macros: [
+ "libpoll_token_derive",
+ "libremain",
+ ],
+ static_executable: true,
+}
diff --git a/android/forwarder_host/src/main.rs b/android/forwarder_host/src/main.rs
index 40edc84..b95b2cc 100644
--- a/android/forwarder_host/src/main.rs
+++ b/android/forwarder_host/src/main.rs
@@ -15,62 +15,27 @@
// Copied from ChromiumOS with relicensing:
// src/platform2/vm_tools/chunnel/src/bin/chunneld.rs
+//! Host-side stream socket forwarder
+
use std::collections::btree_map::Entry as BTreeMapEntry;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use std::fmt;
-use std::fs::File;
use std::io;
use std::net::{Ipv4Addr, Ipv6Addr, TcpListener};
-use std::os::fd::OwnedFd;
-use std::os::raw::c_int;
use std::os::unix::io::AsRawFd;
use std::result;
use std::sync::{Arc, Mutex};
-use std::thread;
use std::time::Duration;
-use chunnel::forwarder::ForwarderSession;
-use dbus::blocking::LocalConnection as DBusConnection;
-use dbus::{self, Error as DBusError};
-use libchromeos::deprecated::{EventFd, PollContext, PollToken};
-use libchromeos::panic_handler::install_memfd_handler;
-use libchromeos::pipe;
-use libchromeos::signal::block_signal;
-use libchromeos::syslog;
+use forwarder::forwarder::ForwarderSession;
use log::{error, warn};
-use nix::sys::signal::Signal;
-use protobuf::{self, Message as ProtoMessage};
-use system_api::chunneld_service::*;
-use system_api::cicerone_service;
+use nix::sys::eventfd::EventFd;
+use poll_token_derive::PollToken;
+use vmm_sys_util::poll::{PollContext, PollToken};
use vsock::VsockListener;
use vsock::VMADDR_CID_ANY;
-// chunnel dbus-constants.h
-const CHUNNELD_INTERFACE: &str = "org.chromium.Chunneld";
-const CHUNNELD_SERVICE_PATH: &str = "/org/chromium/Chunneld";
-const CHUNNELD_SERVICE_NAME: &str = "org.chromium.Chunneld";
-
-// cicerone dbus-constants.h
-const VM_CICERONE_INTERFACE: &str = "org.chromium.VmCicerone";
-const VM_CICERONE_SERVICE_PATH: &str = "/org/chromium/VmCicerone";
-const VM_CICERONE_SERVICE_NAME: &str = "org.chromium.VmCicerone";
-const CONNECT_CHUNNEL_METHOD: &str = "ConnectChunnel";
-
-// permission_broker dbus-constants.h
-const PERMISSION_BROKER_INTERFACE: &str = "org.chromium.PermissionBroker";
-const PERMISSION_BROKER_SERVICE_PATH: &str = "/org/chromium/PermissionBroker";
-const PERMISSION_BROKER_SERVICE_NAME: &str = "org.chromium.PermissionBroker";
-const REQUEST_LOOPBACK_TCP_PORT_LOCKDOWN_METHOD: &str = "RequestLoopbackTcpPortLockdown";
-const RELEASE_LOOPBACK_TCP_PORT_METHOD: &str = "ReleaseLoopbackTcpPort";
-
-// chunneld dbus-constants.h
-const UPDATE_LISTENING_PORTS_METHOD: &str = "UpdateListeningPorts";
-
const CHUNNEL_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
-const DBUS_TIMEOUT: Duration = Duration::from_secs(30);
-
-// Program name.
-const IDENT: &str = "chunneld";
const VMADDR_PORT_ANY: u32 = u32::MAX;
@@ -78,32 +43,19 @@
#[derive(Debug)]
enum Error {
BindVsock(io::Error),
- BlockSigpipe(nix::Error),
- ConnectChunnelFailure(String),
- CreateProtobusService(dbus::Error),
- DBusGetSystemBus(DBusError),
- DBusMessageSend(DBusError),
- DBusProcessMessage(DBusError),
- EventFdClone(io::Error),
EventFdNew(nix::Error),
IncorrectCid(u32),
- LifelinePipe(nix::Error),
NoListenerForPort(u16),
NoSessionForTag(SessionTag),
- PollContextAdd(nix::Error),
- PollContextDelete(nix::Error),
- PollContextNew(nix::Error),
- PollWait(nix::Error),
- ProtobufDeserialize(protobuf::Error),
- ProtobufSerialize(protobuf::Error),
+ PollContextAdd(vmm_sys_util::errno::Error),
+ PollContextDelete(vmm_sys_util::errno::Error),
+ PollContextNew(vmm_sys_util::errno::Error),
+ PollWait(vmm_sys_util::errno::Error),
SetVsockNonblocking(io::Error),
- Syslog(syslog::Error),
TcpAccept(io::Error),
- TcpListenerPort(io::Error),
UpdateEventRead(nix::Error),
VsockAccept(io::Error),
VsockAcceptTimeout,
- VsockListenerPort(io::Error),
}
type Result<T> = result::Result<T, Error>;
@@ -116,34 +68,19 @@
#[remain::sorted]
match self {
BindVsock(e) => write!(f, "failed to bind vsock: {}", e),
- BlockSigpipe(e) => write!(f, "failed to block SIGPIPE: {}", e),
- ConnectChunnelFailure(e) => write!(f, "failed to connect chunnel: {}", e),
- CreateProtobusService(e) => write!(f, "failed to create D-Bus service: {}", e),
- DBusGetSystemBus(e) => write!(f, "failed to get D-Bus system bus: {}", e),
- DBusMessageSend(e) => write!(f, "failed to send D-Bus message: {}", e),
- DBusProcessMessage(e) => write!(f, "failed to process D-Bus message: {}", e),
- EventFdClone(e) => write!(f, "failed to clone eventfd: {}", e),
EventFdNew(e) => write!(f, "failed to create eventfd: {}", e),
IncorrectCid(cid) => write!(f, "chunnel connection from unexpected cid {}", cid),
- LifelinePipe(e) => write!(f, "failed to create firewall lifeline pipe {}", e),
NoListenerForPort(port) => write!(f, "could not find listener for port: {}", port),
NoSessionForTag(tag) => write!(f, "could not find session for tag: {:x}", tag),
PollContextAdd(e) => write!(f, "failed to add fd to poll context: {}", e),
PollContextDelete(e) => write!(f, "failed to delete fd from poll context: {}", e),
PollContextNew(e) => write!(f, "failed to create poll context: {}", e),
PollWait(e) => write!(f, "failed to wait for poll: {}", e),
- ProtobufDeserialize(e) => write!(f, "failed to deserialize protobuf: {}", e),
- ProtobufSerialize(e) => write!(f, "failed to serialize protobuf: {}", e),
SetVsockNonblocking(e) => write!(f, "failed to set vsock to nonblocking: {}", e),
- Syslog(e) => write!(f, "failed to initialize syslog: {}", e),
TcpAccept(e) => write!(f, "failed to accept tcp: {}", e),
- TcpListenerPort(e) => {
- write!(f, "failed to read local sockaddr for tcp listener: {}", e)
- }
UpdateEventRead(e) => write!(f, "failed to read update eventfd: {}", e),
VsockAccept(e) => write!(f, "failed to accept vsock: {}", e),
VsockAcceptTimeout => write!(f, "timed out waiting for vsock connection"),
- VsockListenerPort(e) => write!(f, "failed to get vsock listener port: {}", e),
}
}
}
@@ -151,9 +88,6 @@
/// A TCP forwarding target. Uniquely identifies a listening port in a given container.
struct TcpForwardTarget {
pub port: u16,
- pub vm_name: String,
- pub container_name: String,
- pub owner_id: String,
pub vsock_cid: u32,
}
@@ -177,7 +111,6 @@
tcp4_listener: TcpListener,
tcp6_listener: TcpListener,
forward_target: TcpForwardTarget,
- _firewall_lifeline: File,
}
/// SocketFamily specifies whether a socket uses IPv4 or IPv6.
@@ -192,7 +125,6 @@
tcp4_forwarders: HashMap<SessionTag, ForwarderSession>,
update_evt: EventFd,
update_queue: Arc<Mutex<VecDeque<TcpForwardTarget>>>,
- dbus_conn: DBusConnection,
}
impl ForwarderSessions {
@@ -206,7 +138,6 @@
tcp4_forwarders: HashMap::new(),
update_evt,
update_queue,
- dbus_conn: DBusConnection::new_system().map_err(Error::DBusGetSystemBus)?,
})
}
@@ -224,26 +155,6 @@
continue;
}
if let BTreeMapEntry::Vacant(o) = self.listening_ports.entry(port) {
- // Lock down the port to allow only Chrome to connect to it.
- let (firewall_lifeline, dbus_fd) = pipe(true).map_err(Error::LifelinePipe)?;
- let (allowed,): (bool,) = self
- .dbus_conn
- .with_proxy(
- PERMISSION_BROKER_SERVICE_NAME,
- PERMISSION_BROKER_SERVICE_PATH,
- DBUS_TIMEOUT,
- )
- .method_call(
- PERMISSION_BROKER_INTERFACE,
- REQUEST_LOOPBACK_TCP_PORT_LOCKDOWN_METHOD,
- (port, OwnedFd::from(dbus_fd)),
- )
- .map_err(Error::DBusMessageSend)?;
- if !allowed {
- warn!("failed to lock down loopback TCP port {}", port);
- continue;
- }
-
// Failing to bind a port is not fatal, but we should log it.
// Both IPv4 and IPv6 localhost must be bound since the host may resolve
// "localhost" to either.
@@ -262,17 +173,12 @@
}
};
poll_ctx
- .add_many(&[
- (&tcp4_listener, Token::Ipv4Listener(port)),
- (&tcp6_listener, Token::Ipv6Listener(port)),
- ])
+ .add(&tcp4_listener, Token::Ipv4Listener(port))
.map_err(Error::PollContextAdd)?;
- o.insert(PortListeners {
- tcp4_listener,
- tcp6_listener,
- forward_target: target,
- _firewall_lifeline: firewall_lifeline,
- });
+ poll_ctx
+ .add(&tcp6_listener, Token::Ipv6Listener(port))
+ .map_err(Error::PollContextAdd)?;
+ o.insert(PortListeners { tcp4_listener, tcp6_listener, forward_target: target });
}
active_ports.insert(port);
}
@@ -285,23 +191,6 @@
// Remove the PortListeners struct first - on error we want to drop it and the
// fds it contains.
let _listening_port = self.listening_ports.remove(port);
- // Release the locked down port.
- let (allowed,): (bool,) = self
- .dbus_conn
- .with_proxy(
- PERMISSION_BROKER_SERVICE_NAME,
- PERMISSION_BROKER_SERVICE_PATH,
- DBUS_TIMEOUT,
- )
- .method_call(
- PERMISSION_BROKER_INTERFACE,
- RELEASE_LOOPBACK_TCP_PORT_METHOD,
- (port,),
- )
- .map_err(Error::DBusMessageSend)?;
- if !allowed {
- warn!("failed to release loopback TCP port {}", port);
- }
}
}
@@ -317,10 +206,8 @@
port: u16,
sock_family: SocketFamily,
) -> Result<()> {
- let port_listeners = self
- .listening_ports
- .get(&port)
- .ok_or(Error::NoListenerForPort(port))?;
+ let port_listeners =
+ self.listening_ports.get(&port).ok_or(Error::NoListenerForPort(port))?;
let listener = match sock_family {
SocketFamily::Ipv4 => &port_listeners.tcp4_listener,
@@ -330,19 +217,15 @@
// This session should be dropped if any of the PollContext setup fails. Since the only
// extant fds for the underlying sockets will be closed, they will be unregistered from
// epoll set automatically.
- let session = create_forwarder_session(
- &mut self.dbus_conn,
- listener,
- &port_listeners.forward_target,
- )?;
+ let session = create_forwarder_session(listener, &port_listeners.forward_target)?;
let tag = session.local_stream().as_raw_fd() as u32;
poll_ctx
- .add_many(&[
- (session.local_stream(), Token::LocalSocket(tag)),
- (session.remote_stream(), Token::RemoteSocket(tag)),
- ])
+ .add(session.local_stream(), Token::LocalSocket(tag))
+ .map_err(Error::PollContextAdd)?;
+ poll_ctx
+ .add(session.remote_stream(), Token::RemoteSocket(tag))
.map_err(Error::PollContextAdd)?;
self.tcp4_forwarders.insert(tag, session);
@@ -351,15 +234,10 @@
}
fn forward_from_local(&mut self, poll_ctx: &PollContext<Token>, tag: SessionTag) -> Result<()> {
- let session = self
- .tcp4_forwarders
- .get_mut(&tag)
- .ok_or(Error::NoSessionForTag(tag))?;
+ let session = self.tcp4_forwarders.get_mut(&tag).ok_or(Error::NoSessionForTag(tag))?;
let shutdown = session.forward_from_local().unwrap_or(true);
if shutdown {
- poll_ctx
- .delete(session.local_stream())
- .map_err(Error::PollContextDelete)?;
+ poll_ctx.delete(session.local_stream()).map_err(Error::PollContextDelete)?;
if session.is_shut_down() {
self.tcp4_forwarders.remove(&tag);
}
@@ -373,15 +251,10 @@
poll_ctx: &PollContext<Token>,
tag: SessionTag,
) -> Result<()> {
- let session = self
- .tcp4_forwarders
- .get_mut(&tag)
- .ok_or(Error::NoSessionForTag(tag))?;
+ let session = self.tcp4_forwarders.get_mut(&tag).ok_or(Error::NoSessionForTag(tag))?;
let shutdown = session.forward_from_remote().unwrap_or(true);
if shutdown {
- poll_ctx
- .delete(session.remote_stream())
- .map_err(Error::PollContextDelete)?;
+ poll_ctx.delete(session.remote_stream()).map_err(Error::PollContextDelete)?;
if session.is_shut_down() {
self.tcp4_forwarders.remove(&tag);
}
@@ -391,9 +264,8 @@
}
fn run(&mut self) -> Result<()> {
- let poll_ctx: PollContext<Token> =
- PollContext::build_with(&[(&self.update_evt, Token::UpdatePorts)])
- .map_err(Error::PollContextNew)?;
+ let poll_ctx: PollContext<Token> = PollContext::new().map_err(Error::PollContextNew)?;
+ poll_ctx.add(&self.update_evt, Token::UpdatePorts).map_err(Error::PollContextAdd)?;
loop {
let events = poll_ctx.wait().map_err(Error::PollWait)?;
@@ -433,44 +305,8 @@
}
}
-/// Sends a D-Bus request to launch chunnel in the target container.
-fn launch_chunnel(
- dbus_conn: &mut DBusConnection,
- vsock_port: u32,
- tcp4_port: u16,
- target: &TcpForwardTarget,
-) -> Result<()> {
- let mut request = cicerone_service::ConnectChunnelRequest::new();
- request.vm_name = target.vm_name.to_owned();
- request.container_name = target.container_name.to_owned();
- request.owner_id = target.owner_id.to_owned();
- request.chunneld_port = vsock_port;
- request.target_tcp4_port = u32::from(tcp4_port);
-
- let (raw_buffer,): (Vec<u8>,) = dbus_conn
- .with_proxy(
- VM_CICERONE_SERVICE_NAME,
- VM_CICERONE_SERVICE_PATH,
- DBUS_TIMEOUT,
- )
- .method_call(
- VM_CICERONE_INTERFACE,
- CONNECT_CHUNNEL_METHOD,
- (request.write_to_bytes().map_err(Error::ProtobufSerialize)?,),
- )
- .map_err(Error::DBusMessageSend)?;
- let response: cicerone_service::ConnectChunnelResponse =
- ProtoMessage::parse_from_bytes(&raw_buffer).map_err(Error::ProtobufDeserialize)?;
-
- match response.status.enum_value() {
- Ok(cicerone_service::connect_chunnel_response::Status::SUCCESS) => Ok(()),
- _ => Err(Error::ConnectChunnelFailure(response.failure_reason)),
- }
-}
-
/// Creates a forwarder session from a `listener` that has a pending connection to accept.
fn create_forwarder_session(
- dbus_conn: &mut DBusConnection,
listener: &TcpListener,
target: &TcpForwardTarget,
) -> Result<ForwarderSession> {
@@ -478,38 +314,18 @@
// Bind a vsock port, tell the guest to connect, and accept the connection.
let vsock_listener = VsockListener::bind_with_cid_port(VMADDR_CID_ANY, VMADDR_PORT_ANY)
.map_err(Error::BindVsock)?;
- vsock_listener
- .set_nonblocking(true)
- .map_err(Error::SetVsockNonblocking)?;
-
- let tcp4_port = listener
- .local_addr()
- .map_err(Error::TcpListenerPort)?
- .port();
-
- launch_chunnel(
- dbus_conn,
- vsock_listener
- .local_addr()
- .map_err(Error::VsockListenerPort)?
- .port(),
- tcp4_port,
- target,
- )?;
+ vsock_listener.set_nonblocking(true).map_err(Error::SetVsockNonblocking)?;
#[derive(PollToken)]
enum Token {
VsockAccept,
}
- let poll_ctx: PollContext<Token> =
- PollContext::build_with(&[(&vsock_listener, Token::VsockAccept)])
- .map_err(Error::PollContextNew)?;
+ let poll_ctx: PollContext<Token> = PollContext::new().map_err(Error::PollContextNew)?;
+ poll_ctx.add(&vsock_listener, Token::VsockAccept).map_err(Error::PollContextAdd)?;
// Wait a few seconds for the guest to connect.
- let events = poll_ctx
- .wait_timeout(CHUNNEL_CONNECT_TIMEOUT)
- .map_err(Error::PollWait)?;
+ let events = poll_ctx.wait_timeout(CHUNNEL_CONNECT_TIMEOUT).map_err(Error::PollWait)?;
match events.iter_readable().next() {
Some(_) => {
@@ -518,114 +334,18 @@
if sockaddr.cid() != target.vsock_cid {
Err(Error::IncorrectCid(sockaddr.cid()))
} else {
- Ok(ForwarderSession::new(
- tcp_stream.into(),
- vsock_stream.into(),
- ))
+ Ok(ForwarderSession::new(tcp_stream.into(), vsock_stream.into()))
}
}
None => Err(Error::VsockAcceptTimeout),
}
}
-/// Enqueues the new listening ports received over D-Bus for the main worker thread to process.
-fn update_listening_ports(
- req: UpdateListeningPortsRequest,
- update_queue: &Arc<Mutex<VecDeque<TcpForwardTarget>>>,
- update_evt: &EventFd,
-) -> UpdateListeningPortsResponse {
- let mut response = UpdateListeningPortsResponse::new();
-
- // Unwrap of LockResult is customary.
- let mut update_queue = update_queue.lock().unwrap();
-
- for (forward_port, forward_target) in req.tcp4_forward_targets {
- update_queue.push_back(TcpForwardTarget {
- port: forward_port as u16,
- vm_name: forward_target.vm_name,
- owner_id: forward_target.owner_id,
- container_name: forward_target.container_name,
- vsock_cid: forward_target.vsock_cid,
- });
- }
-
- match update_evt.write(1) {
- Ok(_) => {
- response.status = update_listening_ports_response::Status::SUCCESS.into();
- }
- Err(_) => {
- response.status = update_listening_ports_response::Status::FAILED.into();
- }
- }
-
- response
-}
-
-/// Sets up the D-Bus object paths and runs the D-Bus loop.
-fn dbus_thread(
- update_queue: Arc<Mutex<VecDeque<TcpForwardTarget>>>,
- update_evt: EventFd,
-) -> Result<()> {
- let connection = DBusConnection::new_system().map_err(Error::CreateProtobusService)?;
-
- connection
- .request_name(CHUNNELD_SERVICE_NAME, false, false, false)
- .map_err(Error::CreateProtobusService)?;
-
- let f = dbus_tree::Factory::new_fnmut::<()>();
- let dbus_interface = f.interface(CHUNNELD_INTERFACE, ());
- let dbus_method = f
- .method(UPDATE_LISTENING_PORTS_METHOD, (), move |m| {
- let reply = m.msg.method_return();
- let raw_buf: Vec<u8> = m.msg.read1().map_err(|_| dbus_tree::MethodErr::no_arg())?;
- let proto: UpdateListeningPortsRequest = ProtoMessage::parse_from_bytes(&raw_buf)
- .map_err(|e| dbus_tree::MethodErr::invalid_arg(&e))?;
-
- let response = update_listening_ports(proto, &update_queue, &update_evt);
- Ok(vec![reply.append1(
- response
- .write_to_bytes()
- .map_err(|e| dbus_tree::MethodErr::failed(&e))?,
- )])
- })
- .in_arg("ay")
- .out_arg("ay");
- let t = f.tree(()).add(
- f.object_path(CHUNNELD_SERVICE_PATH, ())
- .introspectable()
- .add(dbus_interface.add_m(dbus_method)),
- );
-
- t.start_receive(&connection);
-
- // We don't want chunneld waking frequently, so use a big value.
- loop {
- connection
- .process(Duration::from_millis(c_int::MAX as u64))
- .map_err(Error::DBusProcessMessage)?;
- }
-}
-
+// TODO(b/340126051): Host can receive opened ports from the guest.
+// TODO(b/340126051): Host can order executing chunnel on the guest.
fn main() -> Result<()> {
- install_memfd_handler();
- syslog::init(IDENT.to_string(), false /* log_to_stderr */).map_err(Error::Syslog)?;
-
- // Block SIGPIPE so the process doesn't exit when writing to a socket that's been shutdown.
- block_signal(Signal::SIGPIPE).map_err(Error::BlockSigpipe)?;
-
let update_evt = EventFd::new().map_err(Error::EventFdNew)?;
let update_queue = Arc::new(Mutex::new(VecDeque::new()));
- let dbus_update_queue = update_queue.clone();
-
- let worker_update_evt = update_evt.try_clone().map_err(Error::EventFdClone)?;
- let _ = thread::Builder::new()
- .name("chunnel_dbus".to_string())
- .spawn(move || {
- match dbus_thread(dbus_update_queue, worker_update_evt) {
- Ok(()) => error!("D-Bus thread has exited unexpectedly"),
- Err(e) => error!("D-Bus thread has exited with err {}", e),
- };
- });
let mut sessions = ForwarderSessions::new(update_evt, update_queue)?;
sessions.run()