Merge "Add support for sending raw packets" into main am: ea809a8819

Original change: https://android-review.googlesource.com/c/platform/packages/modules/Connectivity/+/3133438

Change-Id: Id570abbeed06326828fbb83ca3f0b3847e1b6a33
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/staticlibs/tests/unit/host/python/apf_utils_test.py b/staticlibs/tests/unit/host/python/apf_utils_test.py
index f7fb93b..619609a 100644
--- a/staticlibs/tests/unit/host/python/apf_utils_test.py
+++ b/staticlibs/tests/unit/host/python/apf_utils_test.py
@@ -14,11 +14,17 @@
 
 import unittest
 from unittest.mock import MagicMock, patch
+from mobly.controllers.android_device_lib.adb import AdbError
 from net_tests_utils.host.python.apf_utils import (
     PatternNotFoundException,
+    UnsupportedOperationException,
     get_apf_counter,
     get_apf_counters_from_dumpsys,
+    get_hardware_address,
+    send_broadcast_empty_ethercat_packet,
+    send_raw_packet_downstream,
 )
+from net_tests_utils.host.python.assert_utils import UnexpectedBehaviorError
 
 
 class TestApfUtils(unittest.TestCase):
@@ -36,8 +42,7 @@
     COUNTER_NAME1: 123
     COUNTER_NAME2: 456
 """
-    iface_name = "wlan0"
-    counters = get_apf_counters_from_dumpsys(self.mock_ad, iface_name)
+    counters = get_apf_counters_from_dumpsys(self.mock_ad, "wlan0")
     self.assertEqual(counters, {"COUNTER_NAME1": 123, "COUNTER_NAME2": 456})
 
   @patch("net_tests_utils.host.python.adb_utils.get_dumpsys_for_service")
@@ -72,6 +77,79 @@
     # Not found
     self.assertEqual(get_apf_counter(self.mock_ad, iface, "COUNTER_NAME3"), 0)
 
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_get_hardware_address_success(
+      self, mock_adb_shell: MagicMock
+  ) -> None:
+    mock_adb_shell.return_value = """
+46: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq ...
+ link/ether 72:05:77:82:21:e0 brd ff:ff:ff:ff:ff:ff
+"""
+    mac_address = get_hardware_address(self.mock_ad, "wlan0")
+    self.assertEqual(mac_address, "72:05:77:82:21:E0")
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_get_hardware_address_not_found(
+      self, mock_adb_shell: MagicMock
+  ) -> None:
+    mock_adb_shell.return_value = "Some output without MAC address"
+    with self.assertRaises(PatternNotFoundException):
+      get_hardware_address(self.mock_ad, "wlan0")
+
+  @patch("net_tests_utils.host.python.apf_utils.get_hardware_address")
+  @patch("net_tests_utils.host.python.apf_utils.send_raw_packet_downstream")
+  def test_send_broadcast_empty_ethercat_packet(
+      self,
+      mock_send_raw_packet_downstream: MagicMock,
+      mock_get_hardware_address: MagicMock,
+  ) -> None:
+    mock_get_hardware_address.return_value = "12:34:56:78:90:AB"
+    send_broadcast_empty_ethercat_packet(self.mock_ad, "eth0")
+    # The packet contents are not checked here; only verify that send_raw_packet_downstream is called once.
+    mock_send_raw_packet_downstream.assert_called_once()
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_send_raw_packet_downstream_success(
+      self, mock_adb_shell: MagicMock
+  ) -> None:
+    mock_adb_shell.return_value = ""  # Successful command output
+    packet_type = "BEEF"
+    iface_name = "eth0"
+    dst_mac = "1234567890AB"
+    packet_in_hex = "AABBCCDDEEFF"
+    send_raw_packet_downstream(
+        self.mock_ad, packet_type, iface_name, dst_mac, packet_in_hex
+    )
+    mock_adb_shell.assert_called_once_with(
+        self.mock_ad,
+        "cmd network_stack send-raw-packet-downstream"
+        f" {packet_type} {iface_name} {dst_mac} {packet_in_hex}",
+    )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_send_raw_packet_downstream_failure(
+      self, mock_adb_shell: MagicMock
+  ) -> None:
+    mock_adb_shell.return_value = (  # Unexpected command output
+        "Any Unexpected Output"
+    )
+    with self.assertRaises(UnexpectedBehaviorError):
+      send_raw_packet_downstream(
+          self.mock_ad, "BEEF", "eth0", "1234567890AB", "AABBCCDDEEFF"
+      )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_send_raw_packet_downstream_unsupported(
+      self, mock_adb_shell: MagicMock
+  ) -> None:
+    mock_adb_shell.side_effect = AdbError(
+        cmd="", stdout="Unknown command", stderr="", ret_code=3
+    )
+    with self.assertRaises(UnsupportedOperationException):
+      send_raw_packet_downstream(
+          self.mock_ad, "BEEF", "eth0", "1234567890AB", "AABBCCDDEEFF"
+      )
+
 
 if __name__ == "__main__":
   unittest.main()
diff --git a/staticlibs/testutils/host/python/apf_utils.py b/staticlibs/testutils/host/python/apf_utils.py
index 331e970..eea10de 100644
--- a/staticlibs/testutils/host/python/apf_utils.py
+++ b/staticlibs/testutils/host/python/apf_utils.py
@@ -14,13 +14,23 @@
 
 import re
 from mobly.controllers import android_device
-from net_tests_utils.host.python import adb_utils
+from mobly.controllers.android_device_lib.adb import AdbError
+from net_tests_utils.host.python import adb_utils, assert_utils
+
+
+# Constants.
+ETHER_BROADCAST = "FFFFFFFFFFFF"
+ETH_P_ETHERCAT = "88A4"
 
 
 class PatternNotFoundException(Exception):
   """Raised when the given pattern cannot be found."""
 
 
+class UnsupportedOperationException(Exception):
+  pass
+
+
 def get_apf_counter(
     ad: android_device.AndroidDevice, iface: str, counter_name: str
 ) -> int:
@@ -75,3 +85,115 @@
 
   ad.log.debug("Getting apf counters: " + str(result))
   return result
+
+
+def get_hardware_address(
+    ad: android_device.AndroidDevice, iface_name: str
+) -> str:
+  """Retrieves the hardware (MAC) address for a given network interface.
+
+  Returns:
+      The hex representation of the MAC address in uppercase.
+      E.g. 12:34:56:78:90:AB
+
+  Raises:
+      PatternNotFoundException: If the MAC address is not found in the command
+      output.
+  """
+
+  # Run the "ip link" command and get its output.
+  ip_link_output = adb_utils.adb_shell(ad, f"ip link show {iface_name}")
+
+  # Regular expression to extract the MAC address.
+  # Parse hardware address from ip link output like below:
+  # 46: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq ...
+  #    link/ether 72:05:77:82:21:e0 brd ff:ff:ff:ff:ff:ff
+  pattern = r"link/ether (([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2})"
+  match = re.search(pattern, ip_link_output)
+
+  if match:
+    return match.group(1).upper()  # Extract the MAC address string.
+  else:
+    raise PatternNotFoundException(
+        "Cannot get hardware address for " + iface_name
+    )
+
+
+def send_broadcast_empty_ethercat_packet(
+    ad: android_device.AndroidDevice, iface_name: str
+) -> None:
+  """Transmits a broadcast empty EtherCat packet on the specified interface."""
+
+  # Get the interface's MAC address.
+  mac_address = get_hardware_address(ad, iface_name)
+
+  # TODO: Build packet by using scapy library.
+  # Ethernet header (14 bytes).
+  packet = ETHER_BROADCAST  # Destination MAC (broadcast)
+  packet += mac_address.replace(":", "")  # Source MAC
+  packet += ETH_P_ETHERCAT  # EtherType (EtherCAT)
+
+  # EtherCAT header (2 bytes) + 44 bytes of zero padding.
+  packet += "00" * 46
+
+  # Send the packet via the NetworkStack send-raw-packet-downstream command.
+  send_raw_packet_downstream(
+      ad, ETH_P_ETHERCAT, iface_name, ETHER_BROADCAST, packet
+  )
+
+
+def send_raw_packet_downstream(
+    ad: android_device.AndroidDevice,
+    packetType: str,
+    iface_name: str,
+    dst_mac: str,
+    packet_in_hex: str,
+) -> None:
+  """Sends a raw packet over the specified downstream interface.
+
+  This function sends a raw packet by running the
+  `send-raw-packet-downstream` shell command, which is provided by the
+  NetworkStack process. It does not build the packet itself and is primarily
+  intended for testing purposes.
+
+  Args:
+      ad: The AndroidDevice object representing the connected device.
+      packetType: The type of packet to send (e.g., "88A4" for EtherCAT).
+      iface_name: The name of the network interface to use (e.g., "wlan0",
+        "eth0").
+      dst_mac: The destination MAC address of the packet (e.g., "FFFFFFFFFFFF"
+        for broadcast).
+      packet_in_hex: The raw packet data starting from L2 header encoded in
+        hexadecimal string format.
+
+  Raises:
+      UnsupportedOperationException: If the NetworkStack doesn't support
+        the `send-raw-packet-downstream` command.
+      UnexpectedBehaviorError: If the command execution produces unexpected
+        output other than an empty response or "Unknown command".
+
+  Important Considerations:
+      Security: This method only works on tethering downstream interfaces due
+        to security restrictions.
+      Packet Format: The `packet_in_hex` must be a valid hexadecimal
+        representation of a packet starting from L2 header.
+  """
+
+  cmd = (
+      "cmd network_stack send-raw-packet-downstream"
+      f" {packetType} {iface_name} {dst_mac} {packet_in_hex}"
+  )
+
+  # Expect no output on success, or "Unknown command" if NetworkStack is too old; raise otherwise.
+  try:
+    output = adb_utils.adb_shell(ad, cmd)
+  except AdbError as e:
+    output = str(e.stdout)
+  if output:
+    if "Unknown command" in output:
+      raise UnsupportedOperationException(
+          "send-raw-packet-downstream command is not supported."
+      )
+    raise assert_utils.UnexpectedBehaviorError(
+        f"Got unexpected output: {output} for command: {cmd}."
+    )