Make forwarder_host buildable

Add an Android.bp for forwarder_host and cut main.rs (copied from
ChromiumOS chunneld) loose from its ChromeOS-only dependencies: drop
the chunneld D-Bus service and the cicerone/permission_broker calls,
the syslog setup, and the libchromeos helpers; PollContext/EventFd now
come from vmm_sys_util and nix instead. Receiving the guest's listening
ports and launching chunnel in the guest are left as TODOs.

Bug: 340126051
Test: m forwarder_host
Change-Id: I8c6da4602147ce329f73270559cdfa999d9f3351
diff --git a/android/forwarder_host/Android.bp b/android/forwarder_host/Android.bp
new file mode 100644
index 0000000..35c478e
--- /dev/null
+++ b/android/forwarder_host/Android.bp
@@ -0,0 +1,21 @@
+package {
+    default_applicable_licenses: ["Android-Apache-2.0"],
+}
+
+rust_binary {
+    name: "forwarder_host",
+    edition: "2021",
+    srcs: ["src/main.rs"],
+    rustlibs: [
+        "libforwarder",
+        "liblog_rust",
+        "libnix",
+        "libvmm_sys_util",
+        "libvsock",
+    ],
+    proc_macros: [
+        "libpoll_token_derive",
+        "libremain",
+    ],
+    static_executable: true,
+}
diff --git a/android/forwarder_host/src/main.rs b/android/forwarder_host/src/main.rs
index 40edc84..b95b2cc 100644
--- a/android/forwarder_host/src/main.rs
+++ b/android/forwarder_host/src/main.rs
@@ -15,62 +15,30 @@
 // Copied from ChromiumOS with relicensing:
 // src/platform2/vm_tools/chunnel/src/bin/chunneld.rs
 
+//! Host-side stream socket forwarder
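+//!
+//! Listens on host loopback TCP ports and forwards each accepted connection
+//! over vsock to the guest.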
+
 use std::collections::btree_map::Entry as BTreeMapEntry;
 use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
 use std::fmt;
-use std::fs::File;
 use std::io;
 use std::net::{Ipv4Addr, Ipv6Addr, TcpListener};
-use std::os::fd::OwnedFd;
-use std::os::raw::c_int;
 use std::os::unix::io::AsRawFd;
 use std::result;
 use std::sync::{Arc, Mutex};
-use std::thread;
 use std::time::Duration;
 
-use chunnel::forwarder::ForwarderSession;
-use dbus::blocking::LocalConnection as DBusConnection;
-use dbus::{self, Error as DBusError};
-use libchromeos::deprecated::{EventFd, PollContext, PollToken};
-use libchromeos::panic_handler::install_memfd_handler;
-use libchromeos::pipe;
-use libchromeos::signal::block_signal;
-use libchromeos::syslog;
+use forwarder::forwarder::ForwarderSession;
 use log::{error, warn};
-use nix::sys::signal::Signal;
-use protobuf::{self, Message as ProtoMessage};
-use system_api::chunneld_service::*;
-use system_api::cicerone_service;
+use nix::sys::eventfd::EventFd;
+use poll_token_derive::PollToken;
+use vmm_sys_util::poll::{PollContext, PollToken};
 use vsock::VsockListener;
 use vsock::VMADDR_CID_ANY;
 
-// chunnel dbus-constants.h
-const CHUNNELD_INTERFACE: &str = "org.chromium.Chunneld";
-const CHUNNELD_SERVICE_PATH: &str = "/org/chromium/Chunneld";
-const CHUNNELD_SERVICE_NAME: &str = "org.chromium.Chunneld";
-
-// cicerone dbus-constants.h
-const VM_CICERONE_INTERFACE: &str = "org.chromium.VmCicerone";
-const VM_CICERONE_SERVICE_PATH: &str = "/org/chromium/VmCicerone";
-const VM_CICERONE_SERVICE_NAME: &str = "org.chromium.VmCicerone";
-const CONNECT_CHUNNEL_METHOD: &str = "ConnectChunnel";
-
-// permission_broker dbus-constants.h
-const PERMISSION_BROKER_INTERFACE: &str = "org.chromium.PermissionBroker";
-const PERMISSION_BROKER_SERVICE_PATH: &str = "/org/chromium/PermissionBroker";
-const PERMISSION_BROKER_SERVICE_NAME: &str = "org.chromium.PermissionBroker";
-const REQUEST_LOOPBACK_TCP_PORT_LOCKDOWN_METHOD: &str = "RequestLoopbackTcpPortLockdown";
-const RELEASE_LOOPBACK_TCP_PORT_METHOD: &str = "ReleaseLoopbackTcpPort";
-
-// chunneld dbus-constants.h
-const UPDATE_LISTENING_PORTS_METHOD: &str = "UpdateListeningPorts";
-
 const CHUNNEL_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
-const DBUS_TIMEOUT: Duration = Duration::from_secs(30);
-
-// Program name.
-const IDENT: &str = "chunneld";
 
 const VMADDR_PORT_ANY: u32 = u32::MAX;
 
@@ -78,32 +43,19 @@
 #[derive(Debug)]
 enum Error {
     BindVsock(io::Error),
-    BlockSigpipe(nix::Error),
-    ConnectChunnelFailure(String),
-    CreateProtobusService(dbus::Error),
-    DBusGetSystemBus(DBusError),
-    DBusMessageSend(DBusError),
-    DBusProcessMessage(DBusError),
-    EventFdClone(io::Error),
     EventFdNew(nix::Error),
     IncorrectCid(u32),
-    LifelinePipe(nix::Error),
     NoListenerForPort(u16),
     NoSessionForTag(SessionTag),
-    PollContextAdd(nix::Error),
-    PollContextDelete(nix::Error),
-    PollContextNew(nix::Error),
-    PollWait(nix::Error),
-    ProtobufDeserialize(protobuf::Error),
-    ProtobufSerialize(protobuf::Error),
+    PollContextAdd(vmm_sys_util::errno::Error),
+    PollContextDelete(vmm_sys_util::errno::Error),
+    PollContextNew(vmm_sys_util::errno::Error),
+    PollWait(vmm_sys_util::errno::Error),
     SetVsockNonblocking(io::Error),
-    Syslog(syslog::Error),
     TcpAccept(io::Error),
-    TcpListenerPort(io::Error),
     UpdateEventRead(nix::Error),
     VsockAccept(io::Error),
     VsockAcceptTimeout,
-    VsockListenerPort(io::Error),
 }
 
 type Result<T> = result::Result<T, Error>;
@@ -116,34 +68,19 @@
         #[remain::sorted]
         match self {
             BindVsock(e) => write!(f, "failed to bind vsock: {}", e),
-            BlockSigpipe(e) => write!(f, "failed to block SIGPIPE: {}", e),
-            ConnectChunnelFailure(e) => write!(f, "failed to connect chunnel: {}", e),
-            CreateProtobusService(e) => write!(f, "failed to create D-Bus service: {}", e),
-            DBusGetSystemBus(e) => write!(f, "failed to get D-Bus system bus: {}", e),
-            DBusMessageSend(e) => write!(f, "failed to send D-Bus message: {}", e),
-            DBusProcessMessage(e) => write!(f, "failed to process D-Bus message: {}", e),
-            EventFdClone(e) => write!(f, "failed to clone eventfd: {}", e),
             EventFdNew(e) => write!(f, "failed to create eventfd: {}", e),
             IncorrectCid(cid) => write!(f, "chunnel connection from unexpected cid {}", cid),
-            LifelinePipe(e) => write!(f, "failed to create firewall lifeline pipe {}", e),
             NoListenerForPort(port) => write!(f, "could not find listener for port: {}", port),
             NoSessionForTag(tag) => write!(f, "could not find session for tag: {:x}", tag),
             PollContextAdd(e) => write!(f, "failed to add fd to poll context: {}", e),
             PollContextDelete(e) => write!(f, "failed to delete fd from poll context: {}", e),
             PollContextNew(e) => write!(f, "failed to create poll context: {}", e),
             PollWait(e) => write!(f, "failed to wait for poll: {}", e),
-            ProtobufDeserialize(e) => write!(f, "failed to deserialize protobuf: {}", e),
-            ProtobufSerialize(e) => write!(f, "failed to serialize protobuf: {}", e),
             SetVsockNonblocking(e) => write!(f, "failed to set vsock to nonblocking: {}", e),
-            Syslog(e) => write!(f, "failed to initialize syslog: {}", e),
             TcpAccept(e) => write!(f, "failed to accept tcp: {}", e),
-            TcpListenerPort(e) => {
-                write!(f, "failed to read local sockaddr for tcp listener: {}", e)
-            }
             UpdateEventRead(e) => write!(f, "failed to read update eventfd: {}", e),
             VsockAccept(e) => write!(f, "failed to accept vsock: {}", e),
             VsockAcceptTimeout => write!(f, "timed out waiting for vsock connection"),
-            VsockListenerPort(e) => write!(f, "failed to get vsock listener port: {}", e),
         }
     }
 }
@@ -151,9 +88,6 @@
 /// A TCP forwarding target. Uniquely identifies a listening port in a given container.
 struct TcpForwardTarget {
     pub port: u16,
-    pub vm_name: String,
-    pub container_name: String,
-    pub owner_id: String,
     pub vsock_cid: u32,
 }
 
@@ -177,7 +111,6 @@
     tcp4_listener: TcpListener,
     tcp6_listener: TcpListener,
     forward_target: TcpForwardTarget,
-    _firewall_lifeline: File,
 }
 
 /// SocketFamily specifies whether a socket uses IPv4 or IPv6.
@@ -192,7 +125,6 @@
     tcp4_forwarders: HashMap<SessionTag, ForwarderSession>,
     update_evt: EventFd,
     update_queue: Arc<Mutex<VecDeque<TcpForwardTarget>>>,
-    dbus_conn: DBusConnection,
 }
 
 impl ForwarderSessions {
@@ -206,7 +138,6 @@
             tcp4_forwarders: HashMap::new(),
             update_evt,
             update_queue,
-            dbus_conn: DBusConnection::new_system().map_err(Error::DBusGetSystemBus)?,
         })
     }
 
@@ -224,26 +155,6 @@
                 continue;
             }
             if let BTreeMapEntry::Vacant(o) = self.listening_ports.entry(port) {
-                // Lock down the port to allow only Chrome to connect to it.
-                let (firewall_lifeline, dbus_fd) = pipe(true).map_err(Error::LifelinePipe)?;
-                let (allowed,): (bool,) = self
-                    .dbus_conn
-                    .with_proxy(
-                        PERMISSION_BROKER_SERVICE_NAME,
-                        PERMISSION_BROKER_SERVICE_PATH,
-                        DBUS_TIMEOUT,
-                    )
-                    .method_call(
-                        PERMISSION_BROKER_INTERFACE,
-                        REQUEST_LOOPBACK_TCP_PORT_LOCKDOWN_METHOD,
-                        (port, OwnedFd::from(dbus_fd)),
-                    )
-                    .map_err(Error::DBusMessageSend)?;
-                if !allowed {
-                    warn!("failed to lock down loopback TCP port {}", port);
-                    continue;
-                }
-
                 // Failing to bind a port is not fatal, but we should log it.
                 // Both IPv4 and IPv6 localhost must be bound since the host may resolve
                 // "localhost" to either.
@@ -262,17 +173,12 @@
                     }
                 };
                 poll_ctx
-                    .add_many(&[
-                        (&tcp4_listener, Token::Ipv4Listener(port)),
-                        (&tcp6_listener, Token::Ipv6Listener(port)),
-                    ])
+                    .add(&tcp4_listener, Token::Ipv4Listener(port))
                     .map_err(Error::PollContextAdd)?;
-                o.insert(PortListeners {
-                    tcp4_listener,
-                    tcp6_listener,
-                    forward_target: target,
-                    _firewall_lifeline: firewall_lifeline,
-                });
+                poll_ctx
+                    .add(&tcp6_listener, Token::Ipv6Listener(port))
+                    .map_err(Error::PollContextAdd)?;
+                o.insert(PortListeners { tcp4_listener, tcp6_listener, forward_target: target });
             }
             active_ports.insert(port);
         }
@@ -285,23 +191,6 @@
                 // Remove the PortListeners struct first - on error we want to drop it and the
                 // fds it contains.
                 let _listening_port = self.listening_ports.remove(port);
-                // Release the locked down port.
-                let (allowed,): (bool,) = self
-                    .dbus_conn
-                    .with_proxy(
-                        PERMISSION_BROKER_SERVICE_NAME,
-                        PERMISSION_BROKER_SERVICE_PATH,
-                        DBUS_TIMEOUT,
-                    )
-                    .method_call(
-                        PERMISSION_BROKER_INTERFACE,
-                        RELEASE_LOOPBACK_TCP_PORT_METHOD,
-                        (port,),
-                    )
-                    .map_err(Error::DBusMessageSend)?;
-                if !allowed {
-                    warn!("failed to release loopback TCP port {}", port);
-                }
             }
         }
 
@@ -317,10 +206,8 @@
         port: u16,
         sock_family: SocketFamily,
     ) -> Result<()> {
-        let port_listeners = self
-            .listening_ports
-            .get(&port)
-            .ok_or(Error::NoListenerForPort(port))?;
+        let port_listeners =
+            self.listening_ports.get(&port).ok_or(Error::NoListenerForPort(port))?;
 
         let listener = match sock_family {
             SocketFamily::Ipv4 => &port_listeners.tcp4_listener,
@@ -330,19 +217,15 @@
         // This session should be dropped if any of the PollContext setup fails. Since the only
         // extant fds for the underlying sockets will be closed, they will be unregistered from
         // epoll set automatically.
-        let session = create_forwarder_session(
-            &mut self.dbus_conn,
-            listener,
-            &port_listeners.forward_target,
-        )?;
+        let session = create_forwarder_session(listener, &port_listeners.forward_target)?;
 
         let tag = session.local_stream().as_raw_fd() as u32;
 
         poll_ctx
-            .add_many(&[
-                (session.local_stream(), Token::LocalSocket(tag)),
-                (session.remote_stream(), Token::RemoteSocket(tag)),
-            ])
+            .add(session.local_stream(), Token::LocalSocket(tag))
+            .map_err(Error::PollContextAdd)?;
+        poll_ctx
+            .add(session.remote_stream(), Token::RemoteSocket(tag))
             .map_err(Error::PollContextAdd)?;
 
         self.tcp4_forwarders.insert(tag, session);
@@ -351,15 +234,10 @@
     }
 
     fn forward_from_local(&mut self, poll_ctx: &PollContext<Token>, tag: SessionTag) -> Result<()> {
-        let session = self
-            .tcp4_forwarders
-            .get_mut(&tag)
-            .ok_or(Error::NoSessionForTag(tag))?;
+        let session = self.tcp4_forwarders.get_mut(&tag).ok_or(Error::NoSessionForTag(tag))?;
         let shutdown = session.forward_from_local().unwrap_or(true);
         if shutdown {
-            poll_ctx
-                .delete(session.local_stream())
-                .map_err(Error::PollContextDelete)?;
+            poll_ctx.delete(session.local_stream()).map_err(Error::PollContextDelete)?;
             if session.is_shut_down() {
                 self.tcp4_forwarders.remove(&tag);
             }
@@ -373,15 +251,10 @@
         poll_ctx: &PollContext<Token>,
         tag: SessionTag,
     ) -> Result<()> {
-        let session = self
-            .tcp4_forwarders
-            .get_mut(&tag)
-            .ok_or(Error::NoSessionForTag(tag))?;
+        let session = self.tcp4_forwarders.get_mut(&tag).ok_or(Error::NoSessionForTag(tag))?;
         let shutdown = session.forward_from_remote().unwrap_or(true);
         if shutdown {
-            poll_ctx
-                .delete(session.remote_stream())
-                .map_err(Error::PollContextDelete)?;
+            poll_ctx.delete(session.remote_stream()).map_err(Error::PollContextDelete)?;
             if session.is_shut_down() {
                 self.tcp4_forwarders.remove(&tag);
             }
@@ -391,9 +264,8 @@
     }
 
     fn run(&mut self) -> Result<()> {
-        let poll_ctx: PollContext<Token> =
-            PollContext::build_with(&[(&self.update_evt, Token::UpdatePorts)])
-                .map_err(Error::PollContextNew)?;
+        let poll_ctx: PollContext<Token> = PollContext::new().map_err(Error::PollContextNew)?;
+        poll_ctx.add(&self.update_evt, Token::UpdatePorts).map_err(Error::PollContextAdd)?;
 
         loop {
             let events = poll_ctx.wait().map_err(Error::PollWait)?;
@@ -433,44 +305,8 @@
     }
 }
 
-/// Sends a D-Bus request to launch chunnel in the target container.
-fn launch_chunnel(
-    dbus_conn: &mut DBusConnection,
-    vsock_port: u32,
-    tcp4_port: u16,
-    target: &TcpForwardTarget,
-) -> Result<()> {
-    let mut request = cicerone_service::ConnectChunnelRequest::new();
-    request.vm_name = target.vm_name.to_owned();
-    request.container_name = target.container_name.to_owned();
-    request.owner_id = target.owner_id.to_owned();
-    request.chunneld_port = vsock_port;
-    request.target_tcp4_port = u32::from(tcp4_port);
-
-    let (raw_buffer,): (Vec<u8>,) = dbus_conn
-        .with_proxy(
-            VM_CICERONE_SERVICE_NAME,
-            VM_CICERONE_SERVICE_PATH,
-            DBUS_TIMEOUT,
-        )
-        .method_call(
-            VM_CICERONE_INTERFACE,
-            CONNECT_CHUNNEL_METHOD,
-            (request.write_to_bytes().map_err(Error::ProtobufSerialize)?,),
-        )
-        .map_err(Error::DBusMessageSend)?;
-    let response: cicerone_service::ConnectChunnelResponse =
-        ProtoMessage::parse_from_bytes(&raw_buffer).map_err(Error::ProtobufDeserialize)?;
-
-    match response.status.enum_value() {
-        Ok(cicerone_service::connect_chunnel_response::Status::SUCCESS) => Ok(()),
-        _ => Err(Error::ConnectChunnelFailure(response.failure_reason)),
-    }
-}
-
 /// Creates a forwarder session from a `listener` that has a pending connection to accept.
 fn create_forwarder_session(
-    dbus_conn: &mut DBusConnection,
     listener: &TcpListener,
     target: &TcpForwardTarget,
 ) -> Result<ForwarderSession> {
@@ -478,38 +314,18 @@
     // Bind a vsock port, tell the guest to connect, and accept the connection.
     let vsock_listener = VsockListener::bind_with_cid_port(VMADDR_CID_ANY, VMADDR_PORT_ANY)
         .map_err(Error::BindVsock)?;
-    vsock_listener
-        .set_nonblocking(true)
-        .map_err(Error::SetVsockNonblocking)?;
-
-    let tcp4_port = listener
-        .local_addr()
-        .map_err(Error::TcpListenerPort)?
-        .port();
-
-    launch_chunnel(
-        dbus_conn,
-        vsock_listener
-            .local_addr()
-            .map_err(Error::VsockListenerPort)?
-            .port(),
-        tcp4_port,
-        target,
-    )?;
+    vsock_listener.set_nonblocking(true).map_err(Error::SetVsockNonblocking)?;
 
     #[derive(PollToken)]
     enum Token {
         VsockAccept,
     }
 
-    let poll_ctx: PollContext<Token> =
-        PollContext::build_with(&[(&vsock_listener, Token::VsockAccept)])
-            .map_err(Error::PollContextNew)?;
+    let poll_ctx: PollContext<Token> = PollContext::new().map_err(Error::PollContextNew)?;
+    poll_ctx.add(&vsock_listener, Token::VsockAccept).map_err(Error::PollContextAdd)?;
 
     // Wait a few seconds for the guest to connect.
-    let events = poll_ctx
-        .wait_timeout(CHUNNEL_CONNECT_TIMEOUT)
-        .map_err(Error::PollWait)?;
+    let events = poll_ctx.wait_timeout(CHUNNEL_CONNECT_TIMEOUT).map_err(Error::PollWait)?;
 
     match events.iter_readable().next() {
         Some(_) => {
@@ -518,114 +334,18 @@
             if sockaddr.cid() != target.vsock_cid {
                 Err(Error::IncorrectCid(sockaddr.cid()))
             } else {
-                Ok(ForwarderSession::new(
-                    tcp_stream.into(),
-                    vsock_stream.into(),
-                ))
+                Ok(ForwarderSession::new(tcp_stream.into(), vsock_stream.into()))
             }
         }
         None => Err(Error::VsockAcceptTimeout),
     }
 }
 
-/// Enqueues the new listening ports received over D-Bus for the main worker thread to process.
-fn update_listening_ports(
-    req: UpdateListeningPortsRequest,
-    update_queue: &Arc<Mutex<VecDeque<TcpForwardTarget>>>,
-    update_evt: &EventFd,
-) -> UpdateListeningPortsResponse {
-    let mut response = UpdateListeningPortsResponse::new();
-
-    // Unwrap of LockResult is customary.
-    let mut update_queue = update_queue.lock().unwrap();
-
-    for (forward_port, forward_target) in req.tcp4_forward_targets {
-        update_queue.push_back(TcpForwardTarget {
-            port: forward_port as u16,
-            vm_name: forward_target.vm_name,
-            owner_id: forward_target.owner_id,
-            container_name: forward_target.container_name,
-            vsock_cid: forward_target.vsock_cid,
-        });
-    }
-
-    match update_evt.write(1) {
-        Ok(_) => {
-            response.status = update_listening_ports_response::Status::SUCCESS.into();
-        }
-        Err(_) => {
-            response.status = update_listening_ports_response::Status::FAILED.into();
-        }
-    }
-
-    response
-}
-
-/// Sets up the D-Bus object paths and runs the D-Bus loop.
-fn dbus_thread(
-    update_queue: Arc<Mutex<VecDeque<TcpForwardTarget>>>,
-    update_evt: EventFd,
-) -> Result<()> {
-    let connection = DBusConnection::new_system().map_err(Error::CreateProtobusService)?;
-
-    connection
-        .request_name(CHUNNELD_SERVICE_NAME, false, false, false)
-        .map_err(Error::CreateProtobusService)?;
-
-    let f = dbus_tree::Factory::new_fnmut::<()>();
-    let dbus_interface = f.interface(CHUNNELD_INTERFACE, ());
-    let dbus_method = f
-        .method(UPDATE_LISTENING_PORTS_METHOD, (), move |m| {
-            let reply = m.msg.method_return();
-            let raw_buf: Vec<u8> = m.msg.read1().map_err(|_| dbus_tree::MethodErr::no_arg())?;
-            let proto: UpdateListeningPortsRequest = ProtoMessage::parse_from_bytes(&raw_buf)
-                .map_err(|e| dbus_tree::MethodErr::invalid_arg(&e))?;
-
-            let response = update_listening_ports(proto, &update_queue, &update_evt);
-            Ok(vec![reply.append1(
-                response
-                    .write_to_bytes()
-                    .map_err(|e| dbus_tree::MethodErr::failed(&e))?,
-            )])
-        })
-        .in_arg("ay")
-        .out_arg("ay");
-    let t = f.tree(()).add(
-        f.object_path(CHUNNELD_SERVICE_PATH, ())
-            .introspectable()
-            .add(dbus_interface.add_m(dbus_method)),
-    );
-
-    t.start_receive(&connection);
-
-    // We don't want chunneld waking frequently, so use a big value.
-    loop {
-        connection
-            .process(Duration::from_millis(c_int::MAX as u64))
-            .map_err(Error::DBusProcessMessage)?;
-    }
-}
-
+// TODO(b/340126051): Receive opened ports from the guest.
+// TODO(b/340126051): Request that chunnel be launched on the guest.
 fn main() -> Result<()> {
-    install_memfd_handler();
-    syslog::init(IDENT.to_string(), false /* log_to_stderr */).map_err(Error::Syslog)?;
-
-    // Block SIGPIPE so the process doesn't exit when writing to a socket that's been shutdown.
-    block_signal(Signal::SIGPIPE).map_err(Error::BlockSigpipe)?;
-
     let update_evt = EventFd::new().map_err(Error::EventFdNew)?;
     let update_queue = Arc::new(Mutex::new(VecDeque::new()));
-    let dbus_update_queue = update_queue.clone();
-
-    let worker_update_evt = update_evt.try_clone().map_err(Error::EventFdClone)?;
-    let _ = thread::Builder::new()
-        .name("chunnel_dbus".to_string())
-        .spawn(move || {
-            match dbus_thread(dbus_update_queue, worker_update_evt) {
-                Ok(()) => error!("D-Bus thread has exited unexpectedly"),
-                Err(e) => error!("D-Bus thread has exited with err {}", e),
-            };
-        });
 
     let mut sessions = ForwarderSessions::new(update_evt, update_queue)?;
     sessions.run()