Add packet capture and match network_stack adb command to to `apf_utils`

It wraps the following 3 commands for multi-devices tests
- cmd network_stack capture start <interface>
- cmd network_stack capture stop <interface>
- cmd network_stack capture matched-packet-counts <interface> <pkt-in-hex>

Add a helper function `is_packet_capture_supported` as APFv6 precondition check.
Test: atest NetworkStaticLibHostPythonTests

Change-Id: I526c678c6e753cee3969b04a56ab33e5e3d1600d
diff --git a/staticlibs/tests/unit/host/python/apf_utils_test.py b/staticlibs/tests/unit/host/python/apf_utils_test.py
index 2885460..419b338 100644
--- a/staticlibs/tests/unit/host/python/apf_utils_test.py
+++ b/staticlibs/tests/unit/host/python/apf_utils_test.py
@@ -29,6 +29,10 @@
     get_ipv6_addresses,
     get_hardware_address,
     is_send_raw_packet_downstream_supported,
+    is_packet_capture_supported,
+    start_capture_packets,
+    stop_capture_packets,
+    get_matched_packet_counts,
     send_raw_packet_downstream,
 )
 from net_tests_utils.host.python.assert_utils import UnexpectedBehaviorError
@@ -208,6 +212,144 @@
         "Send raw packet should not be supported.",
     )
 
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_start_capture_success(
+          self, mock_adb_shell: MagicMock
+  ) -> None:
+      mock_adb_shell.return_value = "success"  # Successful command output
+      start_capture_packets(
+          self.mock_ad, TEST_IFACE_NAME
+      )
+      mock_adb_shell.assert_called_once_with(
+          self.mock_ad,
+          "cmd network_stack capture start"
+          f" {TEST_IFACE_NAME}"
+      )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_start_capture_failure(
+          self, mock_adb_shell: MagicMock
+  ) -> None:
+      mock_adb_shell.return_value = (  # Unexpected command output
+          "Any Unexpected Output"
+      )
+      with asserts.assert_raises(UnexpectedBehaviorError):
+          start_capture_packets(
+              self.mock_ad, TEST_IFACE_NAME
+          )
+      asserts.assert_true(
+          is_packet_capture_supported(self.mock_ad),
+          "Start capturing packets should be supported.",
+      )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_start_capture_unsupported(
+          self, mock_adb_shell: MagicMock
+  ) -> None:
+      mock_adb_shell.side_effect = AdbError(
+          cmd="", stdout="Unknown command", stderr="", ret_code=3
+      )
+      with asserts.assert_raises(UnsupportedOperationException):
+          start_capture_packets(
+              self.mock_ad, TEST_IFACE_NAME
+          )
+      asserts.assert_false(
+          is_packet_capture_supported(self.mock_ad),
+          "Start capturing packets should not be supported.",
+      )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_stop_capture_success(
+          self, mock_adb_shell: MagicMock
+  ) -> None:
+      mock_adb_shell.return_value = "success"  # Successful command output
+      stop_capture_packets(
+          self.mock_ad, TEST_IFACE_NAME
+      )
+      mock_adb_shell.assert_called_once_with(
+          self.mock_ad,
+          "cmd network_stack capture stop"
+          f" {TEST_IFACE_NAME}"
+      )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_stop_capture_failure(
+          self, mock_adb_shell: MagicMock
+  ) -> None:
+      mock_adb_shell.return_value = (  # Unexpected command output
+          "Any Unexpected Output"
+      )
+      with asserts.assert_raises(UnexpectedBehaviorError):
+          stop_capture_packets(
+              self.mock_ad, TEST_IFACE_NAME
+          )
+      asserts.assert_true(
+          is_packet_capture_supported(self.mock_ad),
+          "Stop capturing packets should be supported.",
+      )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_stop_capture_unsupported(
+          self, mock_adb_shell: MagicMock
+  ) -> None:
+      mock_adb_shell.side_effect = AdbError(
+          cmd="", stdout="Unknown command", stderr="", ret_code=3
+      )
+      with asserts.assert_raises(UnsupportedOperationException):
+          stop_capture_packets(
+              self.mock_ad, TEST_IFACE_NAME
+          )
+      asserts.assert_false(
+          is_packet_capture_supported(self.mock_ad),
+          "Stop capturing packets should not be supported.",
+      )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_get_matched_packet_counts_success(
+          self, mock_adb_shell: MagicMock
+  ) -> None:
+      mock_adb_shell.return_value = "10"  # Successful command output
+      get_matched_packet_counts(
+          self.mock_ad, TEST_IFACE_NAME, TEST_PACKET_IN_HEX
+      )
+      mock_adb_shell.assert_called_once_with(
+          self.mock_ad,
+          "cmd network_stack capture matched-packet-counts"
+          f" {TEST_IFACE_NAME} {TEST_PACKET_IN_HEX}"
+      )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_get_matched_packet_counts_failure(
+          self, mock_adb_shell: MagicMock
+  ) -> None:
+      mock_adb_shell.return_value = (  # Unexpected command output
+          "Any Unexpected Output"
+      )
+      with asserts.assert_raises(UnexpectedBehaviorError):
+          get_matched_packet_counts(
+              self.mock_ad, TEST_IFACE_NAME, TEST_PACKET_IN_HEX
+          )
+      asserts.assert_true(
+          is_packet_capture_supported(self.mock_ad),
+          "Get matched packet counts should be supported.",
+      )
+
+  @patch("net_tests_utils.host.python.adb_utils.adb_shell")
+  def test_get_matched_packet_counts_unsupported(
+          self, mock_adb_shell: MagicMock
+  ) -> None:
+      mock_adb_shell.side_effect = AdbError(
+          cmd="", stdout="Unknown command", stderr="", ret_code=3
+      )
+      with asserts.assert_raises(UnsupportedOperationException):
+          get_matched_packet_counts(
+              self.mock_ad, TEST_IFACE_NAME, TEST_PACKET_IN_HEX
+          )
+      asserts.assert_false(
+          is_packet_capture_supported(self.mock_ad),
+          "Get matched packet counts should not be supported.",
+      )
+
   @parameterized.parameters(
       ("2,2048,1", ApfCapabilities(2, 2048, 1)),  # Valid input
       ("3,1024,0", ApfCapabilities(3, 1024, 0)),  # Valid input
diff --git a/staticlibs/testutils/host/python/apf_utils.py b/staticlibs/testutils/host/python/apf_utils.py
index e84ba3e..7fe60bd 100644
--- a/staticlibs/testutils/host/python/apf_utils.py
+++ b/staticlibs/testutils/host/python/apf_utils.py
@@ -178,6 +178,29 @@
         "Cannot get hardware address for " + iface_name
     )
 
+def is_packet_capture_supported(
+        ad: android_device.AndroidDevice,
+) -> bool:
+
+  # Invoke the shell command with empty argument and see how NetworkStack respond.
+  # If supported, an IllegalArgumentException with help page will be printed.
+  functions_with_args = (
+    # list all functions and args with (func, *args) tuple
+    (start_capture_packets, (ad, "")),
+    (stop_capture_packets, (ad, "")),
+    (get_matched_packet_counts, (ad, "", ""))
+  )
+
+  for func, args in functions_with_args:
+    try:
+      func(*args)
+    except UnsupportedOperationException:
+      return False
+    except Exception:
+      continue
+
+  # If no UnsupportOperationException is thrown, regard it as supported
+  return True
 
 def is_send_raw_packet_downstream_supported(
     ad: android_device.AndroidDevice,
@@ -224,25 +247,92 @@
         representation of a packet starting from L2 header.
   """
 
-  cmd = (
-      "cmd network_stack send-raw-packet-downstream"
-      f" {iface_name} {packet_in_hex}"
-  )
+  cmd = f"cmd network_stack send-raw-packet-downstream {iface_name} {packet_in_hex}"
 
   # Expect no output or Unknown command if NetworkStack is too old. Throw otherwise.
-  try:
-    output = adb_utils.adb_shell(ad, cmd)
-  except AdbError as e:
-    output = str(e.stdout)
-  if output:
-    if "Unknown command" in output:
-      raise UnsupportedOperationException(
-          "send-raw-packet-downstream command is not supported."
-      )
+  adb_output = AdbOutputHandler(ad, cmd).get_output()
+  if adb_output:
     raise assert_utils.UnexpectedBehaviorError(
-        f"Got unexpected output: {output} for command: {cmd}."
+      f"Got unexpected output: {adb_output} for command: {cmd}."
     )
 
+def start_capture_packets(
+        ad: android_device.AndroidDevice,
+        iface_name: str
+) -> None:
+  """Starts packet capturing on a specified network interface.
+
+  This function initiates packet capture on the given network interface of an
+  Android device using an ADB shell command. It handles potential errors
+  related to unsupported commands or unexpected output.
+  This command only supports downstream tethering interface.
+
+  Args:
+    ad: The Android device object.
+    iface_name: The name of the network interface (e.g., "wlan0").
+  """
+  cmd = f"cmd network_stack capture start {iface_name}"
+
+  # Expect no output or Unknown command if NetworkStack is too old. Throw otherwise.
+  adb_output = AdbOutputHandler(ad, cmd).get_output()
+  if adb_output != "success":
+    raise assert_utils.UnexpectedBehaviorError(
+      f"Got unexpected output: {adb_output} for command: {cmd}."
+    )
+
+def stop_capture_packets(
+        ad: android_device.AndroidDevice,
+        iface_name: str
+) -> None:
+  """Stops packet capturing on a specified network interface.
+
+  This function terminates packet capture on the given network interface of an
+  Android device using an ADB shell command. It handles potential errors
+  related to unsupported commands or unexpected output.
+
+  Args:
+    ad: The Android device object.
+    iface_name: The name of the network interface (e.g., "wlan0").
+  """
+  cmd = f"cmd network_stack capture stop {iface_name}"
+
+  # Expect no output or Unknown command if NetworkStack is too old. Throw otherwise.
+  adb_output = AdbOutputHandler(ad, cmd).get_output()
+  if adb_output != "success":
+    raise assert_utils.UnexpectedBehaviorError(
+      f"Got unexpected output: {adb_output} for command: {cmd}."
+    )
+
+def get_matched_packet_counts(
+        ad: android_device.AndroidDevice,
+        iface_name: str,
+        packet_in_hex: str
+) -> int:
+  """Gets the number of captured packets matching a specific hexadecimal pattern.
+
+  This function retrieves the count of captured packets on the specified
+  network interface that match a given hexadecimal pattern. It uses an ADB
+  shell command and handles potential errors related to unsupported commands,
+  unexpected output, or invalid output format.
+
+  Args:
+    ad: The Android device object.
+    iface_name: The name of the network interface (e.g., "wlan0").
+    packet_in_hex: The hexadecimal string representing the packet pattern.
+
+  Returns:
+    The number of matched packets as an integer.
+  """
+  cmd = f"cmd network_stack capture matched-packet-counts {iface_name} {packet_in_hex}"
+
+  # Expect no output or Unknown command if NetworkStack is too old. Throw otherwise.
+  adb_output = AdbOutputHandler(ad, cmd).get_output()
+  try:
+    return int(adb_output)
+  except ValueError as e:
+    raise assert_utils.UnexpectedBehaviorError(
+      f"Got unexpected exception: {e} for command: {cmd}."
+    )
 
 @dataclass
 class ApfCapabilities:
@@ -304,3 +394,19 @@
       f"Supported apf version {caps.apf_version_supported} < expected version"
       f" {expected_version}",
   )
+
+class AdbOutputHandler:
+  def __init__(self, ad, cmd):
+    self._ad = ad
+    self._cmd = cmd
+
+  def get_output(self) -> str:
+    try:
+      return adb_utils.adb_shell(self._ad, self._cmd)
+    except AdbError as e:
+      output = str(e.stdout)
+      if "Unknown command" in output:
+        raise UnsupportedOperationException(
+          f"{self._cmd} is not supported."
+        )
+      return output
\ No newline at end of file