Add support for sending raw packets

This commit includes several new functions to support sending custom
raw Ethernet packets. This includes methods for retrieving
MAC addresses, sending empty EtherCAT packets, and sending
arbitrary raw packets with specified types and destinations.
These functions are primarily intended for testing purposes
and rely on the `send-raw-packet-downstream` command provided
by NetworkStack.

Test: atest NetworkStaticLibHostPythonTests
Bug: 335368434
Change-Id: Iae7879bf92e1dda8fbd32b92ff166e82c87593c7
diff --git a/staticlibs/tests/unit/host/python/apf_utils_test.py b/staticlibs/tests/unit/host/python/apf_utils_test.py
index f7fb93b..619609a 100644
--- a/staticlibs/tests/unit/host/python/apf_utils_test.py
+++ b/staticlibs/tests/unit/host/python/apf_utils_test.py
@@ -14,11 +14,17 @@
 
 import unittest
 from unittest.mock import MagicMock, patch
+from mobly.controllers.android_device_lib.adb import AdbError
 from net_tests_utils.host.python.apf_utils import (
     PatternNotFoundException,
+    UnsupportedOperationException,
     get_apf_counter,
     get_apf_counters_from_dumpsys,
+    get_hardware_address,
+    send_broadcast_empty_ethercat_packet,
+    send_raw_packet_downstream,
 )
+from net_tests_utils.host.python.assert_utils import UnexpectedBehaviorError
 
 
 class TestApfUtils(unittest.TestCase):
@@ -36,8 +42,7 @@
     COUNTER_NAME1: 123
     COUNTER_NAME2: 456
 """
-    iface_name = "wlan0"
-    counters = get_apf_counters_from_dumpsys(self.mock_ad, iface_name)
+    counters = get_apf_counters_from_dumpsys(self.mock_ad, "wlan0")
     self.assertEqual(counters, {"COUNTER_NAME1": 123, "COUNTER_NAME2": 456})
 
   @patch("net_tests_utils.host.python.adb_utils.get_dumpsys_for_service")
@@ -72,6 +77,79 @@
     # Not found
     self.assertEqual(get_apf_counter(self.mock_ad, iface, "COUNTER_NAME3"), 0)
 
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_get_hardware_address_success(
+      self, mock_adb_shell: MagicMock
+  ) -> None:
+    mock_adb_shell.return_value = """
+46: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq ...
+ link/ether 72:05:77:82:21:e0 brd ff:ff:ff:ff:ff:ff
+"""
+    mac_address = get_hardware_address(self.mock_ad, "wlan0")
+    self.assertEqual(mac_address, "72:05:77:82:21:E0")
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_get_hardware_address_not_found(
+      self, mock_adb_shell: MagicMock
+  ) -> None:
+    mock_adb_shell.return_value = "Some output without MAC address"
+    with self.assertRaises(PatternNotFoundException):
+      get_hardware_address(self.mock_ad, "wlan0")
+
+  @patch("net_tests_utils.host.python.apf_utils.get_hardware_address")
+  @patch("net_tests_utils.host.python.apf_utils.send_raw_packet_downstream")
+  def test_send_broadcast_empty_ethercat_packet(
+      self,
+      mock_send_raw_packet_downstream: MagicMock,
+      mock_get_hardware_address: MagicMock,
+  ) -> None:
+    mock_get_hardware_address.return_value = "12:34:56:78:90:AB"
+    send_broadcast_empty_ethercat_packet(self.mock_ad, "eth0")
+    # Assuming you'll mock the packet construction part, verify calls to send_raw_packet_downstream.
+    mock_send_raw_packet_downstream.assert_called_once()
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_send_raw_packet_downstream_success(
+      self, mock_adb_shell: MagicMock
+  ) -> None:
+    mock_adb_shell.return_value = ""  # Successful command output
+    packet_type = "BEEF"
+    iface_name = "eth0"
+    dst_mac = "1234567890AB"
+    packet_in_hex = "AABBCCDDEEFF"
+    send_raw_packet_downstream(
+        self.mock_ad, packet_type, iface_name, dst_mac, packet_in_hex
+    )
+    mock_adb_shell.assert_called_once_with(
+        self.mock_ad,
+        "cmd network_stack send-raw-packet-downstream"
+        f" {packet_type} {iface_name} {dst_mac} {packet_in_hex}",
+    )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_send_raw_packet_downstream_failure(
+      self, mock_adb_shell: MagicMock
+  ) -> None:
+    mock_adb_shell.return_value = (  # Unexpected command output
+        "Any Unexpected Output"
+    )
+    with self.assertRaises(UnexpectedBehaviorError):
+      send_raw_packet_downstream(
+          self.mock_ad, "BEEF", "eth0", "1234567890AB", "AABBCCDDEEFF"
+      )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_send_raw_packet_downstream_unsupported(
+      self, mock_adb_shell: MagicMock
+  ) -> None:
+    mock_adb_shell.side_effect = AdbError(
+        cmd="", stdout="Unknown command", stderr="", ret_code=3
+    )
+    with self.assertRaises(UnsupportedOperationException):
+      send_raw_packet_downstream(
+          self.mock_ad, "BEEF", "eth0", "1234567890AB", "AABBCCDDEEFF"
+      )
+
 
 if __name__ == "__main__":
   unittest.main()
diff --git a/staticlibs/testutils/host/python/apf_utils.py b/staticlibs/testutils/host/python/apf_utils.py
index 331e970..eea10de 100644
--- a/staticlibs/testutils/host/python/apf_utils.py
+++ b/staticlibs/testutils/host/python/apf_utils.py
@@ -14,13 +14,23 @@
 
 import re
 from mobly.controllers import android_device
-from net_tests_utils.host.python import adb_utils
+from mobly.controllers.android_device_lib.adb import AdbError
+from net_tests_utils.host.python import adb_utils, assert_utils
+
+
+# Constants.
+ETHER_BROADCAST = "FFFFFFFFFFFF"
+ETH_P_ETHERCAT = "88A4"
 
 
 class PatternNotFoundException(Exception):
   """Raised when the given pattern cannot be found."""
 
 
+class UnsupportedOperationException(Exception):
+  pass
+
+
 def get_apf_counter(
     ad: android_device.AndroidDevice, iface: str, counter_name: str
 ) -> int:
@@ -75,3 +85,115 @@
 
   ad.log.debug("Getting apf counters: " + str(result))
   return result
+
+
+def get_hardware_address(
+    ad: android_device.AndroidDevice, iface_name: str
+) -> str:
+  """Retrieves the hardware (MAC) address for a given network interface.
+
+  Returns:
+      The hex representative of the MAC address in uppercase.
+      E.g. 12:34:56:78:90:AB
+
+  Raises:
+      PatternNotFoundException: If the MAC address is not found in the command
+      output.
+  """
+
+  # Run the "ip link" command and get its output.
+  ip_link_output = adb_utils.adb_shell(ad, f"ip link show {iface_name}")
+
+  # Regular expression to extract the MAC address.
+  # Parse hardware address from ip link output like below:
+  # 46: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq ...
+  #    link/ether 72:05:77:82:21:e0 brd ff:ff:ff:ff:ff:ff
+  pattern = r"link/ether (([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2})"
+  match = re.search(pattern, ip_link_output)
+
+  if match:
+    return match.group(1).upper()  # Extract the MAC address string.
+  else:
+    raise PatternNotFoundException(
+        "Cannot get hardware address for " + iface_name
+    )
+
+
+def send_broadcast_empty_ethercat_packet(
+    ad: android_device.AndroidDevice, iface_name: str
+) -> None:
+  """Transmits a broadcast empty EtherCat packet on the specified interface."""
+
+  # Get the interface's MAC address.
+  mac_address = get_hardware_address(ad, iface_name)
+
+  # TODO: Build packet by using scapy library.
+  # Ethernet header (14 bytes).
+  packet = ETHER_BROADCAST  # Destination MAC (broadcast)
+  packet += mac_address.replace(":", "")  # Source MAC
+  packet += ETH_P_ETHERCAT  # EtherType (EtherCAT)
+
+  # EtherCAT header (2 bytes) + 44 bytes of zero padding.
+  packet += "00" * 46
+
+  # Send the packet using a raw socket.
+  send_raw_packet_downstream(
+      ad, ETH_P_ETHERCAT, iface_name, ETHER_BROADCAST, packet
+  )
+
+
+def send_raw_packet_downstream(
+    ad: android_device.AndroidDevice,
+    packetType: str,
+    iface_name: str,
+    dst_mac: str,
+    packet_in_hex: str,
+) -> None:
+  """Sends a raw packet over the specified downstream interface.
+
+  This function constructs and sends a raw packet using the
+  `send-raw-packet-downstream`
+  command provided by NetworkStack process. It's primarily intended for testing
+  purposes.
+
+  Args:
+      ad: The AndroidDevice object representing the connected device.
+      packetType: The type of packet to send (e.g., "88A4" for EtherCAT).
+      iface_name: The name of the network interface to use (e.g., "wlan0",
+        "eth0").
+      dst_mac: The destination MAC address of the packet (e.g., "FFFFFFFFFFFF"
+        for broadcast).
+      packet_in_hex: The raw packet data starting from L2 header encoded in
+        hexadecimal string format.
+
+  Raises:
+      UnsupportedOperationException: If the NetworkStack doesn't support
+        the `send-raw-packet` command.
+      UnexpectedBehaviorException: If the command execution produces unexpected
+        output other than an empty response or "Unknown command".
+
+  Important Considerations:
+      Security: This method only works on tethering downstream interfaces due
+        to security restrictions.
+      Packet Format: The `packet_in_hex` must be a valid hexadecimal
+        representation of a packet starting from L2 header.
+  """
+
+  cmd = (
+      "cmd network_stack send-raw-packet-downstream"
+      f" {packetType} {iface_name} {dst_mac} {packet_in_hex}"
+  )
+
+  # Expect no output or Unknown command if NetworkStack is too old. Throw otherwise.
+  try:
+    output = adb_utils.adb_shell(ad, cmd)
+  except AdbError as e:
+    output = str(e.stdout)
+  if output:
+    if "Unknown command" in output:
+      raise UnsupportedOperationException(
+          "send-raw-packet-downstream command is not supported."
+      )
+    raise assert_utils.UnexpectedBehaviorError(
+        f"Got unexpected output: {output} for command: {cmd}."
+    )