Add apf_utils for multi-device tests

Test: atest NetworkStaticLibHostPythonTests
Bug: 335368434
Change-Id: I79064abcb88fbcee00ec9ec998ba5a3e2d3478a6
diff --git a/staticlibs/tests/unit/host/python/apf_utils_test.py b/staticlibs/tests/unit/host/python/apf_utils_test.py
new file mode 100644
index 0000000..f7fb93b
--- /dev/null
+++ b/staticlibs/tests/unit/host/python/apf_utils_test.py
@@ -0,0 +1,77 @@
+#  Copyright (C) 2024 The Android Open Source Project
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import unittest
+from unittest.mock import MagicMock, patch
+from net_tests_utils.host.python.apf_utils import (
+    PatternNotFoundException,
+    get_apf_counter,
+    get_apf_counters_from_dumpsys,
+)
+
+
+class TestApfUtils(unittest.TestCase):
+
+  def setUp(self):
+    self.mock_ad = MagicMock()  # Mock Android device object
+
+  @patch("net_tests_utils.host.python.adb_utils.get_dumpsys_for_service")
+  def test_get_apf_counters_from_dumpsys_success(
+      self, mock_get_dumpsys: MagicMock
+  ) -> None:
+    mock_get_dumpsys.return_value = """
+IpClient.wlan0
+  APF packet counters:
+    COUNTER_NAME1: 123
+    COUNTER_NAME2: 456
+"""
+    iface_name = "wlan0"
+    counters = get_apf_counters_from_dumpsys(self.mock_ad, iface_name)
+    self.assertEqual(counters, {"COUNTER_NAME1": 123, "COUNTER_NAME2": 456})
+
+  @patch("net_tests_utils.host.python.adb_utils.get_dumpsys_for_service")
+  def test_get_apf_counters_from_dumpsys_exceptions(
+      self, mock_get_dumpsys: MagicMock
+  ) -> None:
+    test_cases = [
+        "",
+        "IpClient.wlan0\n",
+        "IpClient.wlan0\n APF packet counters:\n",
+        """
+IpClient.wlan1
+  APF packet counters:
+    COUNTER_NAME1: 123
+    COUNTER_NAME2: 456
+""",
+    ]
+
+    for dumpsys_output in test_cases:
+      mock_get_dumpsys.return_value = dumpsys_output
+      with self.assertRaises(PatternNotFoundException):
+        get_apf_counters_from_dumpsys(self.mock_ad, "wlan0")
+
+  @patch("net_tests_utils.host.python.apf_utils.get_apf_counters_from_dumpsys")
+  def test_get_apf_counter(self, mock_get_counters: MagicMock) -> None:
+    iface = "wlan0"
+    mock_get_counters.return_value = {
+        "COUNTER_NAME1": 123,
+        "COUNTER_NAME2": 456,
+    }
+    self.assertEqual(get_apf_counter(self.mock_ad, iface, "COUNTER_NAME1"), 123)
+    # Not found
+    self.assertEqual(get_apf_counter(self.mock_ad, iface, "COUNTER_NAME3"), 0)
+
+
+if __name__ == "__main__":
+  unittest.main()
diff --git a/staticlibs/tests/unit/host/python/run_tests.py b/staticlibs/tests/unit/host/python/run_tests.py
index 7d58c6e..c44f1a8 100644
--- a/staticlibs/tests/unit/host/python/run_tests.py
+++ b/staticlibs/tests/unit/host/python/run_tests.py
@@ -19,6 +19,7 @@
 # Import all unittest classes here, so it can be discovered by unittest module.
 # TODO: make the tests can be executed without manually import classes.
 from host.python.adb_utils_test import TestAdbUtils
+from host.python.apf_utils_test import TestApfUtils
 from host.python.assert_utils_test import TestAssertUtils
 
 
diff --git a/staticlibs/testutils/host/python/apf_utils.py b/staticlibs/testutils/host/python/apf_utils.py
new file mode 100644
index 0000000..331e970
--- /dev/null
+++ b/staticlibs/testutils/host/python/apf_utils.py
@@ -0,0 +1,77 @@
+#  Copyright (C) 2024 The Android Open Source Project
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+import re
+from mobly.controllers import android_device
+from net_tests_utils.host.python import adb_utils
+
+
+class PatternNotFoundException(Exception):
+  """Raised when the given pattern cannot be found."""
+
+
+def get_apf_counter(
+    ad: android_device.AndroidDevice, iface: str, counter_name: str
+) -> int:
+  counters = get_apf_counters_from_dumpsys(ad, iface)
+  return counters.get(counter_name, 0)
+
+
+def get_apf_counters_from_dumpsys(
+    ad: android_device.AndroidDevice, iface_name: str
+) -> dict:
+  dumpsys = adb_utils.get_dumpsys_for_service(ad, "network_stack")
+
+  # Extract IpClient section of the specified interface.
+  # This takes inputs like:
+  # IpClient.wlan0
+  #   ...
+  # IpClient.wlan1
+  #   ...
+  iface_pattern = re.compile(
+      r"^IpClient\." + iface_name + r"\n" + r"((^\s.*\n)+)", re.MULTILINE
+  )
+  iface_result = iface_pattern.search(dumpsys)
+  if iface_result is None:
+    raise PatternNotFoundException("Cannot find IpClient for " + iface_name)
+
+  # Extract APF counters section from IpClient section, which looks like:
+  #     APF packet counters:
+  #       COUNTER_NAME: VALUE
+  #       ....
+  apf_pattern = re.compile(
+      r"APF packet counters:.*\n.(\s+[A-Z_0-9]+: \d+\n)+", re.MULTILINE
+  )
+  apf_result = apf_pattern.search(iface_result.group(0))
+  if apf_result is None:
+    raise PatternNotFoundException(
+        "Cannot find APF counters in text: " + iface_result.group(0)
+    )
+
+  # Extract key-value pairs from APF counters section into a list of tuples,
+  # e.g. [('COUNTER1', '1'), ('COUNTER2', '2')].
+  counter_pattern = re.compile(r"(?P<name>[A-Z_0-9]+): (?P<value>\d+)")
+  counter_result = counter_pattern.findall(apf_result.group(0))
+  if counter_result is None:
+    raise PatternNotFoundException(
+        "Cannot extract APF counters in text: " + apf_result.group(0)
+    )
+
+  # Convert into a dict.
+  result = {}
+  for key, value_str in counter_result:
+    result[key] = int(value_str)
+
+  ad.log.debug("Getting apf counters: " + str(result))
+  return result