Add the onPayloadStarted callback API

The API is called back to the client when the payload starts in the VM.
The standard output from the payload is accessible via the
ParcelFileDescriptor argument as well.

Bug: 192904048
Test: run MicrodroidDemoApp and check that the payload output is shown.

Change-Id: Ie2afbb455496eec21617b94940ed4386a4865876
diff --git a/virtualizationservice/src/aidl.rs b/virtualizationservice/src/aidl.rs
index 8bdfa9d..661abdc 100644
--- a/virtualizationservice/src/aidl.rs
+++ b/virtualizationservice/src/aidl.rs
@@ -36,16 +36,17 @@
 };
 use anyhow::{bail, Result};
 use disk::QcowFile;
-use log::{debug, error, warn};
+use log::{debug, error, warn, info};
 use microdroid_payload_config::{ApexConfig, VmPayloadConfig};
 use std::convert::TryInto;
 use std::ffi::CString;
 use std::fs::{File, create_dir};
 use std::num::NonZeroU32;
-use std::os::unix::io::AsRawFd;
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
 use std::path::{Path, PathBuf};
 use std::sync::{Arc, Mutex, Weak};
 use vmconfig::{VmConfig, Partition};
+use vsock::{VsockListener, SockAddr, VsockStream};
 use zip::ZipArchive;
 
 pub const BINDER_SERVICE_IDENTIFIER: &str = "android.system.virtualizationservice";
@@ -62,10 +63,17 @@
 const MICRODROID_REQUIRED_APEXES: [&str; 3] =
     ["com.android.adbd", "com.android.i18n", "com.android.os.statsd"];
 
+/// The CID representing the host VM
+const VMADDR_CID_HOST: u32 = 2;
+
+/// Port number that virtualizationservice listens on connections from the guest VMs for the
+/// payload output
+const PORT_VIRT_SERVICE: u32 = 3000;
+
 /// Implementation of `IVirtualizationService`, the entry point of the AIDL service.
 #[derive(Debug, Default)]
 pub struct VirtualizationService {
-    state: Mutex<State>,
+    state: Arc<Mutex<State>>,
 }
 
 impl Interface for VirtualizationService {}
@@ -235,6 +243,45 @@
     }
 }
 
+impl VirtualizationService {
+    pub fn init() -> VirtualizationService {
+        let service = VirtualizationService::default();
+        let state = service.state.clone(); // reference to state (not the state itself) is copied
+        std::thread::spawn(move || {
+            handle_connection_from_vm(state).unwrap();
+        });
+        service
+    }
+}
+
+/// Waits for incoming connections from VM. If a new connection is made, notify the event to the
+/// client via the callback (if registered).
+fn handle_connection_from_vm(state: Arc<Mutex<State>>) -> Result<()> {
+    let listener = VsockListener::bind_with_cid_port(VMADDR_CID_HOST, PORT_VIRT_SERVICE)?;
+    for stream in listener.incoming() {
+        let stream = match stream {
+            Err(e) => {
+                warn!("invalid incoming connection: {}", e);
+                continue;
+            }
+            Ok(s) => s,
+        };
+        if let Ok(SockAddr::Vsock(addr)) = stream.peer_addr() {
+            let cid = addr.cid();
+            let port = addr.port();
+            info!("connected from cid={}, port={}", cid, port);
+            if cid < FIRST_GUEST_CID {
+                warn!("connection is not from a guest VM");
+                continue;
+            }
+            if let Some(vm) = state.lock().unwrap().get_vm(cid) {
+                vm.callbacks.notify_payload_started(cid, stream);
+            }
+        }
+    }
+    Ok(())
+}
+
 /// Given the configuration for a disk image, assembles the `DiskFile` to pass to crosvm.
 ///
 /// This may involve assembling a composite disk from a set of partition images.
@@ -442,12 +489,25 @@
 pub struct VirtualMachineCallbacks(Mutex<Vec<Strong<dyn IVirtualMachineCallback>>>);
 
 impl VirtualMachineCallbacks {
+    /// Call all registered callbacks to notify that the payload has started.
+    pub fn notify_payload_started(&self, cid: Cid, stream: VsockStream) {
+        let callbacks = &*self.0.lock().unwrap();
+        // SAFETY: ownership is transferred from stream to f
+        let f = unsafe { File::from_raw_fd(stream.into_raw_fd()) };
+        let pfd = ParcelFileDescriptor::new(f);
+        for callback in callbacks {
+            if let Err(e) = callback.onPayloadStarted(cid as i32, &pfd) {
+                error!("Error notifying payload start event from VM CID {}: {}", cid, e);
+            }
+        }
+    }
+
     /// Call all registered callbacks to say that the VM has died.
     pub fn callback_on_died(&self, cid: Cid) {
         let callbacks = &*self.0.lock().unwrap();
         for callback in callbacks {
             if let Err(e) = callback.onDied(cid as i32) {
-                error!("Error calling callback: {}", e);
+                error!("Error notifying exit of VM CID {}: {}", cid, e);
             }
         }
     }
@@ -492,6 +552,11 @@
         self.vms.push(vm);
     }
 
+    /// Get a VM that corresponds to the given cid
+    fn get_vm(&self, cid: Cid) -> Option<Arc<VmInstance>> {
+        self.vms().into_iter().find(|vm| vm.cid == cid)
+    }
+
     /// Store a strong VM reference.
     fn debug_hold_vm(&mut self, vm: Strong<dyn IVirtualMachine>) {
         self.debug_held_vms.push(vm);