Introduce host side forwarder agent
Brought code from ChromiumOS under confirmation of opensource licensing
team. Modification of these files to be built would be happened in the
following change.
Bug: 340126051
Test: N/A
Change-Id: I98d3a1558fbebf2206ec1156ae9fc1feb30818c6
diff --git a/android/forwarder_host/src/main.rs b/android/forwarder_host/src/main.rs
new file mode 100644
index 0000000..40edc84
--- /dev/null
+++ b/android/forwarder_host/src/main.rs
@@ -0,0 +1,632 @@
+// Copyright 2024 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Copied from ChromiumOS with relicensing:
+// src/platform2/vm_tools/chunnel/src/bin/chunneld.rs
+
+use std::collections::btree_map::Entry as BTreeMapEntry;
+use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
+use std::fmt;
+use std::fs::File;
+use std::io;
+use std::net::{Ipv4Addr, Ipv6Addr, TcpListener};
+use std::os::fd::OwnedFd;
+use std::os::raw::c_int;
+use std::os::unix::io::AsRawFd;
+use std::result;
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::Duration;
+
+use chunnel::forwarder::ForwarderSession;
+use dbus::blocking::LocalConnection as DBusConnection;
+use dbus::{self, Error as DBusError};
+use libchromeos::deprecated::{EventFd, PollContext, PollToken};
+use libchromeos::panic_handler::install_memfd_handler;
+use libchromeos::pipe;
+use libchromeos::signal::block_signal;
+use libchromeos::syslog;
+use log::{error, warn};
+use nix::sys::signal::Signal;
+use protobuf::{self, Message as ProtoMessage};
+use system_api::chunneld_service::*;
+use system_api::cicerone_service;
+use vsock::VsockListener;
+use vsock::VMADDR_CID_ANY;
+
+// chunnel dbus-constants.h
+const CHUNNELD_INTERFACE: &str = "org.chromium.Chunneld";
+const CHUNNELD_SERVICE_PATH: &str = "/org/chromium/Chunneld";
+const CHUNNELD_SERVICE_NAME: &str = "org.chromium.Chunneld";
+
+// cicerone dbus-constants.h
+const VM_CICERONE_INTERFACE: &str = "org.chromium.VmCicerone";
+const VM_CICERONE_SERVICE_PATH: &str = "/org/chromium/VmCicerone";
+const VM_CICERONE_SERVICE_NAME: &str = "org.chromium.VmCicerone";
+const CONNECT_CHUNNEL_METHOD: &str = "ConnectChunnel";
+
+// permission_broker dbus-constants.h
+const PERMISSION_BROKER_INTERFACE: &str = "org.chromium.PermissionBroker";
+const PERMISSION_BROKER_SERVICE_PATH: &str = "/org/chromium/PermissionBroker";
+const PERMISSION_BROKER_SERVICE_NAME: &str = "org.chromium.PermissionBroker";
+const REQUEST_LOOPBACK_TCP_PORT_LOCKDOWN_METHOD: &str = "RequestLoopbackTcpPortLockdown";
+const RELEASE_LOOPBACK_TCP_PORT_METHOD: &str = "ReleaseLoopbackTcpPort";
+
+// chunneld dbus-constants.h
+const UPDATE_LISTENING_PORTS_METHOD: &str = "UpdateListeningPorts";
+
+const CHUNNEL_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
+const DBUS_TIMEOUT: Duration = Duration::from_secs(30);
+
+// Program name.
+const IDENT: &str = "chunneld";
+
+const VMADDR_PORT_ANY: u32 = u32::MAX;
+
+#[remain::sorted]
+#[derive(Debug)]
+enum Error {
+ BindVsock(io::Error),
+ BlockSigpipe(nix::Error),
+ ConnectChunnelFailure(String),
+ CreateProtobusService(dbus::Error),
+ DBusGetSystemBus(DBusError),
+ DBusMessageSend(DBusError),
+ DBusProcessMessage(DBusError),
+ EventFdClone(io::Error),
+ EventFdNew(nix::Error),
+ IncorrectCid(u32),
+ LifelinePipe(nix::Error),
+ NoListenerForPort(u16),
+ NoSessionForTag(SessionTag),
+ PollContextAdd(nix::Error),
+ PollContextDelete(nix::Error),
+ PollContextNew(nix::Error),
+ PollWait(nix::Error),
+ ProtobufDeserialize(protobuf::Error),
+ ProtobufSerialize(protobuf::Error),
+ SetVsockNonblocking(io::Error),
+ Syslog(syslog::Error),
+ TcpAccept(io::Error),
+ TcpListenerPort(io::Error),
+ UpdateEventRead(nix::Error),
+ VsockAccept(io::Error),
+ VsockAcceptTimeout,
+ VsockListenerPort(io::Error),
+}
+
+type Result<T> = result::Result<T, Error>;
+
+impl fmt::Display for Error {
+ #[remain::check]
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ use self::Error::*;
+
+ #[remain::sorted]
+ match self {
+ BindVsock(e) => write!(f, "failed to bind vsock: {}", e),
+ BlockSigpipe(e) => write!(f, "failed to block SIGPIPE: {}", e),
+ ConnectChunnelFailure(e) => write!(f, "failed to connect chunnel: {}", e),
+ CreateProtobusService(e) => write!(f, "failed to create D-Bus service: {}", e),
+ DBusGetSystemBus(e) => write!(f, "failed to get D-Bus system bus: {}", e),
+ DBusMessageSend(e) => write!(f, "failed to send D-Bus message: {}", e),
+ DBusProcessMessage(e) => write!(f, "failed to process D-Bus message: {}", e),
+ EventFdClone(e) => write!(f, "failed to clone eventfd: {}", e),
+ EventFdNew(e) => write!(f, "failed to create eventfd: {}", e),
+ IncorrectCid(cid) => write!(f, "chunnel connection from unexpected cid {}", cid),
+ LifelinePipe(e) => write!(f, "failed to create firewall lifeline pipe {}", e),
+ NoListenerForPort(port) => write!(f, "could not find listener for port: {}", port),
+ NoSessionForTag(tag) => write!(f, "could not find session for tag: {:x}", tag),
+ PollContextAdd(e) => write!(f, "failed to add fd to poll context: {}", e),
+ PollContextDelete(e) => write!(f, "failed to delete fd from poll context: {}", e),
+ PollContextNew(e) => write!(f, "failed to create poll context: {}", e),
+ PollWait(e) => write!(f, "failed to wait for poll: {}", e),
+ ProtobufDeserialize(e) => write!(f, "failed to deserialize protobuf: {}", e),
+ ProtobufSerialize(e) => write!(f, "failed to serialize protobuf: {}", e),
+ SetVsockNonblocking(e) => write!(f, "failed to set vsock to nonblocking: {}", e),
+ Syslog(e) => write!(f, "failed to initialize syslog: {}", e),
+ TcpAccept(e) => write!(f, "failed to accept tcp: {}", e),
+ TcpListenerPort(e) => {
+ write!(f, "failed to read local sockaddr for tcp listener: {}", e)
+ }
+ UpdateEventRead(e) => write!(f, "failed to read update eventfd: {}", e),
+ VsockAccept(e) => write!(f, "failed to accept vsock: {}", e),
+ VsockAcceptTimeout => write!(f, "timed out waiting for vsock connection"),
+ VsockListenerPort(e) => write!(f, "failed to get vsock listener port: {}", e),
+ }
+ }
+}
+
+/// A TCP forwarding target. Uniquely identifies a listening port in a given container.
+struct TcpForwardTarget {
+ pub port: u16,
+ pub vm_name: String,
+ pub container_name: String,
+ pub owner_id: String,
+ pub vsock_cid: u32,
+}
+
+/// A tag that uniquely identifies a particular forwarding session. This has arbitrarily been
+/// chosen as the fd of the local (TCP) socket.
+type SessionTag = u32;
+
+/// Implements PollToken for chunneld's main poll loop.
+#[derive(Clone, Copy, PollToken)]
+enum Token {
+ UpdatePorts,
+ Ipv4Listener(u16),
+ Ipv6Listener(u16),
+ LocalSocket(SessionTag),
+ RemoteSocket(SessionTag),
+}
+
+/// PortListeners includes all listeners (IPv4 and IPv6) for a given port, and the target
+/// container.
+struct PortListeners {
+ tcp4_listener: TcpListener,
+ tcp6_listener: TcpListener,
+ forward_target: TcpForwardTarget,
+ _firewall_lifeline: File,
+}
+
+/// SocketFamily specifies whether a socket uses IPv4 or IPv6.
+enum SocketFamily {
+ Ipv4,
+ Ipv6,
+}
+
+/// ForwarderSessions encapsulates all forwarding state for chunneld.
+struct ForwarderSessions {
+ listening_ports: BTreeMap<u16, PortListeners>,
+ tcp4_forwarders: HashMap<SessionTag, ForwarderSession>,
+ update_evt: EventFd,
+ update_queue: Arc<Mutex<VecDeque<TcpForwardTarget>>>,
+ dbus_conn: DBusConnection,
+}
+
+impl ForwarderSessions {
+ /// Creates a new instance of ForwarderSessions.
+ fn new(
+ update_evt: EventFd,
+ update_queue: Arc<Mutex<VecDeque<TcpForwardTarget>>>,
+ ) -> Result<Self> {
+ Ok(ForwarderSessions {
+ listening_ports: BTreeMap::new(),
+ tcp4_forwarders: HashMap::new(),
+ update_evt,
+ update_queue,
+ dbus_conn: DBusConnection::new_system().map_err(Error::DBusGetSystemBus)?,
+ })
+ }
+
+ /// Adds or removes listeners based on the latest listening ports from the D-Bus thread.
+ fn process_update_queue(&mut self, poll_ctx: &PollContext<Token>) -> Result<()> {
+ // Unwrap of LockResult is customary.
+ let mut update_queue = self.update_queue.lock().unwrap();
+ let mut active_ports: BTreeSet<u16> = BTreeSet::new();
+
+ // Add any new listeners first.
+ while let Some(target) = update_queue.pop_front() {
+ let port = target.port;
+ // Ignore privileged ports.
+ if port < 1024 {
+ continue;
+ }
+ if let BTreeMapEntry::Vacant(o) = self.listening_ports.entry(port) {
+ // Lock down the port to allow only Chrome to connect to it.
+ let (firewall_lifeline, dbus_fd) = pipe(true).map_err(Error::LifelinePipe)?;
+ let (allowed,): (bool,) = self
+ .dbus_conn
+ .with_proxy(
+ PERMISSION_BROKER_SERVICE_NAME,
+ PERMISSION_BROKER_SERVICE_PATH,
+ DBUS_TIMEOUT,
+ )
+ .method_call(
+ PERMISSION_BROKER_INTERFACE,
+ REQUEST_LOOPBACK_TCP_PORT_LOCKDOWN_METHOD,
+ (port, OwnedFd::from(dbus_fd)),
+ )
+ .map_err(Error::DBusMessageSend)?;
+ if !allowed {
+ warn!("failed to lock down loopback TCP port {}", port);
+ continue;
+ }
+
+ // Failing to bind a port is not fatal, but we should log it.
+ // Both IPv4 and IPv6 localhost must be bound since the host may resolve
+ // "localhost" to either.
+ let tcp4_listener = match TcpListener::bind((Ipv4Addr::LOCALHOST, port)) {
+ Ok(listener) => listener,
+ Err(e) => {
+ warn!("failed to bind TCPv4 port: {}", e);
+ continue;
+ }
+ };
+ let tcp6_listener = match TcpListener::bind((Ipv6Addr::LOCALHOST, port)) {
+ Ok(listener) => listener,
+ Err(e) => {
+ warn!("failed to bind TCPv6 port: {}", e);
+ continue;
+ }
+ };
+ poll_ctx
+ .add_many(&[
+ (&tcp4_listener, Token::Ipv4Listener(port)),
+ (&tcp6_listener, Token::Ipv6Listener(port)),
+ ])
+ .map_err(Error::PollContextAdd)?;
+ o.insert(PortListeners {
+ tcp4_listener,
+ tcp6_listener,
+ forward_target: target,
+ _firewall_lifeline: firewall_lifeline,
+ });
+ }
+ active_ports.insert(port);
+ }
+
+ // Iterate over the existing listeners; if the port is no longer in the
+ // listener list, remove it.
+ let old_ports: Vec<u16> = self.listening_ports.keys().cloned().collect();
+ for port in old_ports.iter() {
+ if !active_ports.contains(port) {
+ // Remove the PortListeners struct first - on error we want to drop it and the
+ // fds it contains.
+ let _listening_port = self.listening_ports.remove(port);
+ // Release the locked down port.
+ let (allowed,): (bool,) = self
+ .dbus_conn
+ .with_proxy(
+ PERMISSION_BROKER_SERVICE_NAME,
+ PERMISSION_BROKER_SERVICE_PATH,
+ DBUS_TIMEOUT,
+ )
+ .method_call(
+ PERMISSION_BROKER_INTERFACE,
+ RELEASE_LOOPBACK_TCP_PORT_METHOD,
+ (port,),
+ )
+ .map_err(Error::DBusMessageSend)?;
+ if !allowed {
+ warn!("failed to release loopback TCP port {}", port);
+ }
+ }
+ }
+
+ // Consume the eventfd.
+ self.update_evt.read().map_err(Error::UpdateEventRead)?;
+
+ Ok(())
+ }
+
+ fn accept_connection(
+ &mut self,
+ poll_ctx: &PollContext<Token>,
+ port: u16,
+ sock_family: SocketFamily,
+ ) -> Result<()> {
+ let port_listeners = self
+ .listening_ports
+ .get(&port)
+ .ok_or(Error::NoListenerForPort(port))?;
+
+ let listener = match sock_family {
+ SocketFamily::Ipv4 => &port_listeners.tcp4_listener,
+ SocketFamily::Ipv6 => &port_listeners.tcp6_listener,
+ };
+
+ // This session should be dropped if any of the PollContext setup fails. Since the only
+ // extant fds for the underlying sockets will be closed, they will be unregistered from
+ // epoll set automatically.
+ let session = create_forwarder_session(
+ &mut self.dbus_conn,
+ listener,
+ &port_listeners.forward_target,
+ )?;
+
+ let tag = session.local_stream().as_raw_fd() as u32;
+
+ poll_ctx
+ .add_many(&[
+ (session.local_stream(), Token::LocalSocket(tag)),
+ (session.remote_stream(), Token::RemoteSocket(tag)),
+ ])
+ .map_err(Error::PollContextAdd)?;
+
+ self.tcp4_forwarders.insert(tag, session);
+
+ Ok(())
+ }
+
+ fn forward_from_local(&mut self, poll_ctx: &PollContext<Token>, tag: SessionTag) -> Result<()> {
+ let session = self
+ .tcp4_forwarders
+ .get_mut(&tag)
+ .ok_or(Error::NoSessionForTag(tag))?;
+ let shutdown = session.forward_from_local().unwrap_or(true);
+ if shutdown {
+ poll_ctx
+ .delete(session.local_stream())
+ .map_err(Error::PollContextDelete)?;
+ if session.is_shut_down() {
+ self.tcp4_forwarders.remove(&tag);
+ }
+ }
+
+ Ok(())
+ }
+
+ fn forward_from_remote(
+ &mut self,
+ poll_ctx: &PollContext<Token>,
+ tag: SessionTag,
+ ) -> Result<()> {
+ let session = self
+ .tcp4_forwarders
+ .get_mut(&tag)
+ .ok_or(Error::NoSessionForTag(tag))?;
+ let shutdown = session.forward_from_remote().unwrap_or(true);
+ if shutdown {
+ poll_ctx
+ .delete(session.remote_stream())
+ .map_err(Error::PollContextDelete)?;
+ if session.is_shut_down() {
+ self.tcp4_forwarders.remove(&tag);
+ }
+ }
+
+ Ok(())
+ }
+
+ fn run(&mut self) -> Result<()> {
+ let poll_ctx: PollContext<Token> =
+ PollContext::build_with(&[(&self.update_evt, Token::UpdatePorts)])
+ .map_err(Error::PollContextNew)?;
+
+ loop {
+ let events = poll_ctx.wait().map_err(Error::PollWait)?;
+
+ for event in events.iter_readable() {
+ match event.token() {
+ Token::UpdatePorts => {
+ if let Err(e) = self.process_update_queue(&poll_ctx) {
+ error!("error updating listening ports: {}", e);
+ }
+ }
+ Token::Ipv4Listener(port) => {
+ if let Err(e) = self.accept_connection(&poll_ctx, port, SocketFamily::Ipv4)
+ {
+ error!("error accepting connection: {}", e);
+ }
+ }
+ Token::Ipv6Listener(port) => {
+ if let Err(e) = self.accept_connection(&poll_ctx, port, SocketFamily::Ipv6)
+ {
+ error!("error accepting connection: {}", e);
+ }
+ }
+ Token::LocalSocket(tag) => {
+ if let Err(e) = self.forward_from_local(&poll_ctx, tag) {
+ error!("error forwarding local traffic: {}", e);
+ }
+ }
+ Token::RemoteSocket(tag) => {
+ if let Err(e) = self.forward_from_remote(&poll_ctx, tag) {
+ error!("error forwarding remote traffic: {}", e);
+ }
+ }
+ }
+ }
+ }
+ }
+}
+
+/// Sends a D-Bus request to launch chunnel in the target container.
+fn launch_chunnel(
+ dbus_conn: &mut DBusConnection,
+ vsock_port: u32,
+ tcp4_port: u16,
+ target: &TcpForwardTarget,
+) -> Result<()> {
+ let mut request = cicerone_service::ConnectChunnelRequest::new();
+ request.vm_name = target.vm_name.to_owned();
+ request.container_name = target.container_name.to_owned();
+ request.owner_id = target.owner_id.to_owned();
+ request.chunneld_port = vsock_port;
+ request.target_tcp4_port = u32::from(tcp4_port);
+
+ let (raw_buffer,): (Vec<u8>,) = dbus_conn
+ .with_proxy(
+ VM_CICERONE_SERVICE_NAME,
+ VM_CICERONE_SERVICE_PATH,
+ DBUS_TIMEOUT,
+ )
+ .method_call(
+ VM_CICERONE_INTERFACE,
+ CONNECT_CHUNNEL_METHOD,
+ (request.write_to_bytes().map_err(Error::ProtobufSerialize)?,),
+ )
+ .map_err(Error::DBusMessageSend)?;
+ let response: cicerone_service::ConnectChunnelResponse =
+ ProtoMessage::parse_from_bytes(&raw_buffer).map_err(Error::ProtobufDeserialize)?;
+
+ match response.status.enum_value() {
+ Ok(cicerone_service::connect_chunnel_response::Status::SUCCESS) => Ok(()),
+ _ => Err(Error::ConnectChunnelFailure(response.failure_reason)),
+ }
+}
+
+/// Creates a forwarder session from a `listener` that has a pending connection to accept.
+fn create_forwarder_session(
+ dbus_conn: &mut DBusConnection,
+ listener: &TcpListener,
+ target: &TcpForwardTarget,
+) -> Result<ForwarderSession> {
+ let (tcp_stream, _) = listener.accept().map_err(Error::TcpAccept)?;
+ // Bind a vsock port, tell the guest to connect, and accept the connection.
+ let vsock_listener = VsockListener::bind_with_cid_port(VMADDR_CID_ANY, VMADDR_PORT_ANY)
+ .map_err(Error::BindVsock)?;
+ vsock_listener
+ .set_nonblocking(true)
+ .map_err(Error::SetVsockNonblocking)?;
+
+ let tcp4_port = listener
+ .local_addr()
+ .map_err(Error::TcpListenerPort)?
+ .port();
+
+ launch_chunnel(
+ dbus_conn,
+ vsock_listener
+ .local_addr()
+ .map_err(Error::VsockListenerPort)?
+ .port(),
+ tcp4_port,
+ target,
+ )?;
+
+ #[derive(PollToken)]
+ enum Token {
+ VsockAccept,
+ }
+
+ let poll_ctx: PollContext<Token> =
+ PollContext::build_with(&[(&vsock_listener, Token::VsockAccept)])
+ .map_err(Error::PollContextNew)?;
+
+ // Wait a few seconds for the guest to connect.
+ let events = poll_ctx
+ .wait_timeout(CHUNNEL_CONNECT_TIMEOUT)
+ .map_err(Error::PollWait)?;
+
+ match events.iter_readable().next() {
+ Some(_) => {
+ let (vsock_stream, sockaddr) = vsock_listener.accept().map_err(Error::VsockAccept)?;
+
+ if sockaddr.cid() != target.vsock_cid {
+ Err(Error::IncorrectCid(sockaddr.cid()))
+ } else {
+ Ok(ForwarderSession::new(
+ tcp_stream.into(),
+ vsock_stream.into(),
+ ))
+ }
+ }
+ None => Err(Error::VsockAcceptTimeout),
+ }
+}
+
+/// Enqueues the new listening ports received over D-Bus for the main worker thread to process.
+fn update_listening_ports(
+ req: UpdateListeningPortsRequest,
+ update_queue: &Arc<Mutex<VecDeque<TcpForwardTarget>>>,
+ update_evt: &EventFd,
+) -> UpdateListeningPortsResponse {
+ let mut response = UpdateListeningPortsResponse::new();
+
+ // Unwrap of LockResult is customary.
+ let mut update_queue = update_queue.lock().unwrap();
+
+ for (forward_port, forward_target) in req.tcp4_forward_targets {
+ update_queue.push_back(TcpForwardTarget {
+ port: forward_port as u16,
+ vm_name: forward_target.vm_name,
+ owner_id: forward_target.owner_id,
+ container_name: forward_target.container_name,
+ vsock_cid: forward_target.vsock_cid,
+ });
+ }
+
+ match update_evt.write(1) {
+ Ok(_) => {
+ response.status = update_listening_ports_response::Status::SUCCESS.into();
+ }
+ Err(_) => {
+ response.status = update_listening_ports_response::Status::FAILED.into();
+ }
+ }
+
+ response
+}
+
+/// Sets up the D-Bus object paths and runs the D-Bus loop.
+fn dbus_thread(
+ update_queue: Arc<Mutex<VecDeque<TcpForwardTarget>>>,
+ update_evt: EventFd,
+) -> Result<()> {
+ let connection = DBusConnection::new_system().map_err(Error::CreateProtobusService)?;
+
+ connection
+ .request_name(CHUNNELD_SERVICE_NAME, false, false, false)
+ .map_err(Error::CreateProtobusService)?;
+
+ let f = dbus_tree::Factory::new_fnmut::<()>();
+ let dbus_interface = f.interface(CHUNNELD_INTERFACE, ());
+ let dbus_method = f
+ .method(UPDATE_LISTENING_PORTS_METHOD, (), move |m| {
+ let reply = m.msg.method_return();
+ let raw_buf: Vec<u8> = m.msg.read1().map_err(|_| dbus_tree::MethodErr::no_arg())?;
+ let proto: UpdateListeningPortsRequest = ProtoMessage::parse_from_bytes(&raw_buf)
+ .map_err(|e| dbus_tree::MethodErr::invalid_arg(&e))?;
+
+ let response = update_listening_ports(proto, &update_queue, &update_evt);
+ Ok(vec![reply.append1(
+ response
+ .write_to_bytes()
+ .map_err(|e| dbus_tree::MethodErr::failed(&e))?,
+ )])
+ })
+ .in_arg("ay")
+ .out_arg("ay");
+ let t = f.tree(()).add(
+ f.object_path(CHUNNELD_SERVICE_PATH, ())
+ .introspectable()
+ .add(dbus_interface.add_m(dbus_method)),
+ );
+
+ t.start_receive(&connection);
+
+ // We don't want chunneld waking frequently, so use a big value.
+ loop {
+ connection
+ .process(Duration::from_millis(c_int::MAX as u64))
+ .map_err(Error::DBusProcessMessage)?;
+ }
+}
+
+fn main() -> Result<()> {
+ install_memfd_handler();
+ syslog::init(IDENT.to_string(), false /* log_to_stderr */).map_err(Error::Syslog)?;
+
+ // Block SIGPIPE so the process doesn't exit when writing to a socket that's been shutdown.
+ block_signal(Signal::SIGPIPE).map_err(Error::BlockSigpipe)?;
+
+ let update_evt = EventFd::new().map_err(Error::EventFdNew)?;
+ let update_queue = Arc::new(Mutex::new(VecDeque::new()));
+ let dbus_update_queue = update_queue.clone();
+
+ let worker_update_evt = update_evt.try_clone().map_err(Error::EventFdClone)?;
+ let _ = thread::Builder::new()
+ .name("chunnel_dbus".to_string())
+ .spawn(move || {
+ match dbus_thread(dbus_update_queue, worker_update_evt) {
+ Ok(()) => error!("D-Bus thread has exited unexpectedly"),
+ Err(e) => error!("D-Bus thread has exited with err {}", e),
+ };
+ });
+
+ let mut sessions = ForwarderSessions::new(update_evt, update_queue)?;
+ sessions.run()
+}