Merge "Add support for sending raw packets" into main am: ea809a8819 am: c83492ba67
Original change: https://android-review.googlesource.com/c/platform/packages/modules/Connectivity/+/3133438
Change-Id: I66da1e4c112c39a220a4562c41251f35604c7bbb
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
diff --git a/staticlibs/tests/unit/host/python/apf_utils_test.py b/staticlibs/tests/unit/host/python/apf_utils_test.py
index f7fb93b..619609a 100644
--- a/staticlibs/tests/unit/host/python/apf_utils_test.py
+++ b/staticlibs/tests/unit/host/python/apf_utils_test.py
@@ -14,11 +14,17 @@
import unittest
from unittest.mock import MagicMock, patch
+from mobly.controllers.android_device_lib.adb import AdbError
from net_tests_utils.host.python.apf_utils import (
PatternNotFoundException,
+ UnsupportedOperationException,
get_apf_counter,
get_apf_counters_from_dumpsys,
+ get_hardware_address,
+ send_broadcast_empty_ethercat_packet,
+ send_raw_packet_downstream,
)
+from net_tests_utils.host.python.assert_utils import UnexpectedBehaviorError
class TestApfUtils(unittest.TestCase):
@@ -36,8 +42,7 @@
COUNTER_NAME1: 123
COUNTER_NAME2: 456
"""
- iface_name = "wlan0"
- counters = get_apf_counters_from_dumpsys(self.mock_ad, iface_name)
+ counters = get_apf_counters_from_dumpsys(self.mock_ad, "wlan0")
self.assertEqual(counters, {"COUNTER_NAME1": 123, "COUNTER_NAME2": 456})
@patch("net_tests_utils.host.python.adb_utils.get_dumpsys_for_service")
@@ -72,6 +77,79 @@
# Not found
self.assertEqual(get_apf_counter(self.mock_ad, iface, "COUNTER_NAME3"), 0)
+ @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+ def test_get_hardware_address_success(
+ self, mock_adb_shell: MagicMock
+ ) -> None:
+ mock_adb_shell.return_value = """
+46: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq ...
+ link/ether 72:05:77:82:21:e0 brd ff:ff:ff:ff:ff:ff
+"""
+ mac_address = get_hardware_address(self.mock_ad, "wlan0")
+ self.assertEqual(mac_address, "72:05:77:82:21:E0")
+
+ @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+ def test_get_hardware_address_not_found(
+ self, mock_adb_shell: MagicMock
+ ) -> None:
+ mock_adb_shell.return_value = "Some output without MAC address"
+ with self.assertRaises(PatternNotFoundException):
+ get_hardware_address(self.mock_ad, "wlan0")
+
+ @patch("net_tests_utils.host.python.apf_utils.get_hardware_address")
+ @patch("net_tests_utils.host.python.apf_utils.send_raw_packet_downstream")
+ def test_send_broadcast_empty_ethercat_packet(
+ self,
+ mock_send_raw_packet_downstream: MagicMock,
+ mock_get_hardware_address: MagicMock,
+ ) -> None:
+ mock_get_hardware_address.return_value = "12:34:56:78:90:AB"
+ send_broadcast_empty_ethercat_packet(self.mock_ad, "eth0")
+ # Packet contents are not inspected here; only verify that send_raw_packet_downstream is called exactly once.
+ mock_send_raw_packet_downstream.assert_called_once()
+
+ @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+ def test_send_raw_packet_downstream_success(
+ self, mock_adb_shell: MagicMock
+ ) -> None:
+ mock_adb_shell.return_value = "" # Successful command output
+ packet_type = "BEEF"
+ iface_name = "eth0"
+ dst_mac = "1234567890AB"
+ packet_in_hex = "AABBCCDDEEFF"
+ send_raw_packet_downstream(
+ self.mock_ad, packet_type, iface_name, dst_mac, packet_in_hex
+ )
+ mock_adb_shell.assert_called_once_with(
+ self.mock_ad,
+ "cmd network_stack send-raw-packet-downstream"
+ f" {packet_type} {iface_name} {dst_mac} {packet_in_hex}",
+ )
+
+ @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+ def test_send_raw_packet_downstream_failure(
+ self, mock_adb_shell: MagicMock
+ ) -> None:
+ mock_adb_shell.return_value = ( # Unexpected command output
+ "Any Unexpected Output"
+ )
+ with self.assertRaises(UnexpectedBehaviorError):
+ send_raw_packet_downstream(
+ self.mock_ad, "BEEF", "eth0", "1234567890AB", "AABBCCDDEEFF"
+ )
+
+ @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+ def test_send_raw_packet_downstream_unsupported(
+ self, mock_adb_shell: MagicMock
+ ) -> None:
+ mock_adb_shell.side_effect = AdbError(
+ cmd="", stdout="Unknown command", stderr="", ret_code=3
+ )
+ with self.assertRaises(UnsupportedOperationException):
+ send_raw_packet_downstream(
+ self.mock_ad, "BEEF", "eth0", "1234567890AB", "AABBCCDDEEFF"
+ )
+
if __name__ == "__main__":
unittest.main()
diff --git a/staticlibs/testutils/host/python/apf_utils.py b/staticlibs/testutils/host/python/apf_utils.py
index 331e970..eea10de 100644
--- a/staticlibs/testutils/host/python/apf_utils.py
+++ b/staticlibs/testutils/host/python/apf_utils.py
@@ -14,13 +14,23 @@
import re
from mobly.controllers import android_device
-from net_tests_utils.host.python import adb_utils
+from mobly.controllers.android_device_lib.adb import AdbError
+from net_tests_utils.host.python import adb_utils, assert_utils
+
+
+# Constants.
+ETHER_BROADCAST = "FFFFFFFFFFFF"
+ETH_P_ETHERCAT = "88A4"
class PatternNotFoundException(Exception):
"""Raised when the given pattern cannot be found."""
+class UnsupportedOperationException(Exception):
+ pass
+
+
def get_apf_counter(
ad: android_device.AndroidDevice, iface: str, counter_name: str
) -> int:
@@ -75,3 +85,115 @@
ad.log.debug("Getting apf counters: " + str(result))
return result
+
+
+def get_hardware_address(
+ ad: android_device.AndroidDevice, iface_name: str
+) -> str:
+ """Retrieves the hardware (MAC) address for a given network interface.
+
+ Returns:
+ The hex representation of the MAC address in uppercase.
+ E.g. 12:34:56:78:90:AB
+
+ Raises:
+ PatternNotFoundException: If the MAC address is not found in the command
+ output.
+ """
+
+ # Run the "ip link" command and get its output.
+ ip_link_output = adb_utils.adb_shell(ad, f"ip link show {iface_name}")
+
+ # Regular expression to extract the MAC address.
+ # Parse hardware address from ip link output like below:
+ # 46: wlan0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc mq ...
+ # link/ether 72:05:77:82:21:e0 brd ff:ff:ff:ff:ff:ff
+ pattern = r"link/ether (([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2})"
+ match = re.search(pattern, ip_link_output)
+
+ if match:
+ return match.group(1).upper() # Extract the MAC address string.
+ else:
+ raise PatternNotFoundException(
+ "Cannot get hardware address for " + iface_name
+ )
+
+
+def send_broadcast_empty_ethercat_packet(
+ ad: android_device.AndroidDevice, iface_name: str
+) -> None:
+ """Transmits a broadcast empty EtherCat packet on the specified interface."""
+
+ # Get the interface's MAC address.
+ mac_address = get_hardware_address(ad, iface_name)
+
+ # TODO: Build packet by using scapy library.
+ # Ethernet header (14 bytes).
+ packet = ETHER_BROADCAST # Destination MAC (broadcast)
+ packet += mac_address.replace(":", "") # Source MAC
+ packet += ETH_P_ETHERCAT # EtherType (EtherCAT)
+
+ # EtherCAT header (2 bytes) + 44 bytes of zero padding.
+ packet += "00" * 46
+
+ # Send the packet via the NetworkStack send-raw-packet-downstream command.
+ send_raw_packet_downstream(
+ ad, ETH_P_ETHERCAT, iface_name, ETHER_BROADCAST, packet
+ )
+
+
+def send_raw_packet_downstream(
+ ad: android_device.AndroidDevice,
+ packetType: str,
+ iface_name: str,
+ dst_mac: str,
+ packet_in_hex: str,
+) -> None:
+ """Sends a raw packet over the specified downstream interface.
+
+ This function sends the given raw packet using the
+ `send-raw-packet-downstream`
+ command provided by the NetworkStack process. It's primarily intended for
+ testing purposes.
+
+ Args:
+ ad: The AndroidDevice object representing the connected device.
+ packetType: The type of packet to send (e.g., "88A4" for EtherCAT).
+ iface_name: The name of the network interface to use (e.g., "wlan0",
+ "eth0").
+ dst_mac: The destination MAC address of the packet (e.g., "FFFFFFFFFFFF"
+ for broadcast).
+ packet_in_hex: The raw packet data starting from L2 header encoded in
+ hexadecimal string format.
+
+ Raises:
+ UnsupportedOperationException: If the NetworkStack doesn't support
+ the `send-raw-packet-downstream` command.
+ UnexpectedBehaviorError: If the command execution produces unexpected
+ output other than an empty response or "Unknown command".
+
+ Important Considerations:
+ Security: This method only works on tethering downstream interfaces due
+ to security restrictions.
+ Packet Format: The `packet_in_hex` must be a valid hexadecimal
+ representation of a packet starting from L2 header.
+ """
+
+ cmd = (
+ "cmd network_stack send-raw-packet-downstream"
+ f" {packetType} {iface_name} {dst_mac} {packet_in_hex}"
+ )
+
+ # Expect no output on success, or "Unknown command" if NetworkStack is too old. Raise otherwise.
+ try:
+ output = adb_utils.adb_shell(ad, cmd)
+ except AdbError as e:
+ output = str(e.stdout)
+ if output:
+ if "Unknown command" in output:
+ raise UnsupportedOperationException(
+ "send-raw-packet-downstream command is not supported."
+ )
+ raise assert_utils.UnexpectedBehaviorError(
+ f"Got unexpected output: {output} for command: {cmd}."
+ )