Report active ports periodically from VM
Bug: 340126051
Test: Run terminal app
Change-Id: I9d7eb41a4b8d11475a8fa4b8611cfd32afda0019
diff --git a/build/debian/fai_config/package_config/AVF b/build/debian/fai_config/package_config/AVF
index 1be57fe..2e55e90 100644
--- a/build/debian/fai_config/package_config/AVF
+++ b/build/debian/fai_config/package_config/AVF
@@ -1,3 +1,5 @@
PACKAGES install
+bpfcc-tools
+linux-headers-generic
procps
diff --git a/guest/forwarder_guest_launcher/Cargo.toml b/guest/forwarder_guest_launcher/Cargo.toml
index 03fda56..c875484 100644
--- a/guest/forwarder_guest_launcher/Cargo.toml
+++ b/guest/forwarder_guest_launcher/Cargo.toml
@@ -7,9 +7,13 @@
[dependencies]
anyhow = "1.0.91"
clap = { version = "4.5.20", features = ["derive"] }
+csv-async = { version = "1.3.0", features = ["tokio"] }
env_logger = "0.11.5"
+futures = "0.3.31"
+listeners = "0.2.1"
log = "0.4.22"
prost = "0.13.3"
+serde = { version = "1.0.215", features = ["derive"] }
tokio = { version = "1.40.0", features = ["process", "rt-multi-thread"] }
tonic = "0.12.3"
vsock = "0.5.1"
diff --git a/guest/forwarder_guest_launcher/src/main.rs b/guest/forwarder_guest_launcher/src/main.rs
index abb39f6..0e06c66 100644
--- a/guest/forwarder_guest_launcher/src/main.rs
+++ b/guest/forwarder_guest_launcher/src/main.rs
@@ -14,19 +14,39 @@
//! Launcher of forwarder_guest
-use anyhow::Context;
+use anyhow::{anyhow, Context};
use clap::Parser;
+use csv_async::AsyncReader;
use debian_service::debian_service_client::DebianServiceClient;
-use debian_service::QueueOpeningRequest;
-use log::debug;
+use debian_service::{QueueOpeningRequest, ReportVmActivePortsRequest};
+use futures::stream::StreamExt;
+use log::{debug, error};
+use serde::Deserialize;
+use std::collections::HashSet;
+use std::process::Stdio;
+use tokio::io::BufReader;
use tokio::process::Command;
-use tonic::transport::Endpoint;
+use tokio::try_join;
+use tonic::transport::{Channel, Endpoint};
use tonic::Request;
mod debian_service {
tonic::include_proto!("com.android.virtualization.vmlauncher.proto");
}
+const NON_PREVILEGED_PORT_RANGE_START: i32 = 1024;
+const TCPSTATES_IP_4: i8 = 4;
+const TCPSTATES_STATE_LISTEN: &str = "LISTEN";
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "UPPERCASE")]
+struct TcpStateRow {
+ ip: i8,
+ lport: i32,
+ oldstate: String,
+ newstate: String,
+}
+
#[derive(Parser)]
/// Flags for running command
pub struct Args {
@@ -40,15 +60,9 @@
grpc_port: String,
}
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
- env_logger::init();
- debug!("Starting forwarder_guest_launcher");
- let args = Args::parse();
- let addr = format!("https://{}:{}", args.host_addr, args.grpc_port);
-
- let channel = Endpoint::from_shared(addr)?.connect().await?;
- let mut client = DebianServiceClient::new(channel);
+async fn process_forwarding_request_queue(
+ mut client: DebianServiceClient<Channel>,
+) -> Result<(), Box<dyn std::error::Error>> {
let cid = vsock::get_local_cid().context("Failed to get CID of VM")?;
let mut res_stream = client
.open_forwarding_request_queue(Request::new(QueueOpeningRequest { cid: cid as i32 }))
@@ -72,5 +86,78 @@
.arg(format!("vsock:2:{}", vsock_port))
.spawn();
}
+ Err(anyhow!("process_forwarding_request_queue is terminated").into())
+}
+
+async fn send_active_ports_report(
+ listening_ports: HashSet<i32>,
+ client: &mut DebianServiceClient<Channel>,
+) -> Result<(), Box<dyn std::error::Error>> {
+ let res = client
+ .report_vm_active_ports(Request::new(ReportVmActivePortsRequest {
+ ports: listening_ports.into_iter().collect(),
+ }))
+ .await?
+ .into_inner();
+ if res.success {
+ debug!("Successfully reported active ports to the host");
+ } else {
+ error!("Failure response received from the host for reporting active ports");
+ }
+ Ok(())
+}
+
+async fn report_active_ports(
+ mut client: DebianServiceClient<Channel>,
+) -> Result<(), Box<dyn std::error::Error>> {
+ let mut cmd =
+ Command::new("/usr/sbin/tcpstates-bpfcc").arg("-s").stdout(Stdio::piped()).spawn()?;
+ let stdout = cmd.stdout.take().context("Failed to get stdout of tcpstates")?;
+ let mut csv_reader = AsyncReader::from_reader(BufReader::new(stdout));
+ let header = csv_reader.headers().await?.clone();
+
+ // TODO(b/340126051): Consider using NETLINK_SOCK_DIAG for the optimization.
+ let listeners = listeners::get_all()?;
+ // TODO(b/340126051): Support distinguished port forwarding for ipv6 as well.
+ let mut listening_ports: HashSet<_> = listeners
+ .iter()
+ .map(|x| x.socket)
+ .filter(|x| x.is_ipv4())
+ .map(|x| x.port().into())
+ .filter(|x| *x >= NON_PREVILEGED_PORT_RANGE_START) // Ignore privileged ports
+ .collect();
+ send_active_ports_report(listening_ports.clone(), &mut client).await?;
+
+ let mut records = csv_reader.records();
+ while let Some(record) = records.next().await {
+ let row: TcpStateRow = record?.deserialize(Some(&header))?;
+ if row.ip != TCPSTATES_IP_4 {
+ continue;
+ }
+ match (row.oldstate.as_str(), row.newstate.as_str()) {
+ (_, TCPSTATES_STATE_LISTEN) => {
+ listening_ports.insert(row.lport);
+ }
+ (TCPSTATES_STATE_LISTEN, _) => {
+ listening_ports.remove(&row.lport);
+ }
+ (_, _) => continue,
+ }
+ send_active_ports_report(listening_ports.clone(), &mut client).await?;
+ }
+
+ Err(anyhow!("report_active_ports is terminated").into())
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+ env_logger::init();
+ debug!("Starting forwarder_guest_launcher");
+ let args = Args::parse();
+ let addr = format!("https://{}:{}", args.host_addr, args.grpc_port);
+ let channel = Endpoint::from_shared(addr)?.connect().await?;
+ let client = DebianServiceClient::new(channel);
+
+ try_join!(process_forwarding_request_queue(client.clone()), report_active_ports(client))?;
Ok(())
}