Merge "Quit NetworkMonitor threads in integration tests" into main
diff --git a/staticlibs/testutils/hostdevice/com/android/testutils/ConcurrentUtils.kt b/staticlibs/testutils/hostdevice/com/android/testutils/ConcurrentUtils.kt
index af4f96d..c6e5f25 100644
--- a/staticlibs/testutils/hostdevice/com/android/testutils/ConcurrentUtils.kt
+++ b/staticlibs/testutils/hostdevice/com/android/testutils/ConcurrentUtils.kt
@@ -19,10 +19,77 @@
 package com.android.testutils
 
 import java.util.concurrent.CountDownLatch
+import java.util.concurrent.ExecutorService
 import java.util.concurrent.TimeUnit
+import java.util.function.Consumer
 import kotlin.system.measureTimeMillis
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
 
 // For Java usage
 fun durationOf(fn: Runnable) = measureTimeMillis { fn.run() }
 
 fun CountDownLatch.await(timeoutMs: Long): Boolean = await(timeoutMs, TimeUnit.MILLISECONDS)
+
+/**
+ * Quit resources provided as a list by a supplier.
+ *
+ * The supplier may return more resources as the process progresses, for example while interrupting
+ * threads and waiting for them to finish they may spawn more threads, so this implements a
+ * [maxRetryCount] which, in this case, would be the maximum length of the thread chain that can be
+ * terminated.
+ */
+fun <T> quitResources(
+    maxRetryCount: Int,
+    supplier: () -> List<T>,
+    terminator: Consumer<T>
+) {
+    // Run it multiple times since new threads might be generated in a thread
+    // that is about to be terminated
+    for (retryCount in 0 until maxRetryCount) {
+        val resourcesToBeCleared = supplier()
+        if (resourcesToBeCleared.isEmpty()) return
+        for (resource in resourcesToBeCleared) {
+            terminator.accept(resource)
+        }
+    }
+    assertEmpty(supplier())
+}
+
+/**
+ * Implementation of [quitResources] to interrupt and wait for [ExecutorService]s to finish.
+ */
+@JvmOverloads
+fun quitExecutorServices(
+    maxRetryCount: Int,
+    interrupt: Boolean = true,
+    timeoutMs: Long = 10_000L,
+    supplier: () -> List<ExecutorService>
+) {
+    quitResources(maxRetryCount, supplier) { ecs ->
+        if (interrupt) {
+            ecs.shutdownNow()
+        }
+        assertTrue(ecs.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS),
+            "ExecutorServices did not terminate within timeout")
+    }
+}
+
+/**
+ * Implementation of [quitResources] to interrupt and wait for [Thread]s to finish.
+ */
+@JvmOverloads
+fun quitThreads(
+    maxRetryCount: Int,
+    interrupt: Boolean = true,
+    timeoutMs: Long = 10_000L,
+    supplier: () -> List<Thread>
+) {
+    quitResources(maxRetryCount, supplier) { th ->
+        if (interrupt) {
+            th.interrupt()
+        }
+        th.join(timeoutMs)
+        assertFalse(th.isAlive, "Threads did not terminate within timeout.")
+    }
+}
diff --git a/tests/integration/src/com/android/server/net/integrationtests/NetworkStackInstrumentationService.kt b/tests/integration/src/com/android/server/net/integrationtests/NetworkStackInstrumentationService.kt
index 104d063..3d948ba 100644
--- a/tests/integration/src/com/android/server/net/integrationtests/NetworkStackInstrumentationService.kt
+++ b/tests/integration/src/com/android/server/net/integrationtests/NetworkStackInstrumentationService.kt
@@ -18,10 +18,14 @@
 
 import android.app.Service
 import android.content.Intent
+import androidx.annotation.GuardedBy
+import com.android.testutils.quitExecutorServices
+import com.android.testutils.quitThreads
 import java.net.URL
 import java.util.Collections
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.ExecutorService
 import kotlin.collections.ArrayList
 import kotlin.test.fail
 
@@ -37,7 +41,12 @@
                 .run {
                     withDefault { key -> getOrPut(key) { ConcurrentLinkedQueue() } }
                 }
-        private val httpRequestUrls = Collections.synchronizedList(ArrayList<String>())
+        private val httpRequestUrls = Collections.synchronizedList(mutableListOf<String>())
+
+        @GuardedBy("networkMonitorThreads")
+        private val networkMonitorThreads = mutableListOf<Thread>()
+        @GuardedBy("networkMonitorExecutorServices")
+        private val networkMonitorExecutorServices = mutableListOf<ExecutorService>()
 
         /**
          * Called when an HTTP request is being processed by NetworkMonitor. Returns the response
@@ -52,10 +61,47 @@
         }
 
         /**
+         * Called when NetworkMonitor creates a new Thread.
+         */
+        fun onNetworkMonitorThreadCreated(thread: Thread) {
+            synchronized(networkMonitorThreads) {
+                networkMonitorThreads.add(thread)
+            }
+        }
+
+        /**
+         * Called when NetworkMonitor creates a new ExecutorService.
+         */
+        fun onNetworkMonitorExecutorServiceCreated(executorService: ExecutorService) {
+            synchronized(networkMonitorExecutorServices) {
+                networkMonitorExecutorServices.add(executorService)
+            }
+        }
+
+        /**
          * Clear all state of this connector. This is intended for use between two tests, so all
          * state should be reset as if the connector was just created.
          */
         override fun clearAllState() {
+            quitThreads(
+                maxRetryCount = 3,
+                interrupt = true) {
+                synchronized(networkMonitorThreads) {
+                    networkMonitorThreads.toList().also { networkMonitorThreads.clear() }
+                }
+            }
+            quitExecutorServices(
+                maxRetryCount = 3,
+                // NetworkMonitor is expected to have interrupted its executors when probing
+                // finishes, otherwise it's a thread pool leak that should be caught, so they should
+                // not need to be interrupted (the test only needs to wait for them to finish).
+                interrupt = false) {
+                synchronized(networkMonitorExecutorServices) {
+                    networkMonitorExecutorServices.toList().also {
+                        networkMonitorExecutorServices.clear()
+                    }
+                }
+            }
             httpResponses.clear()
             httpRequestUrls.clear()
         }
diff --git a/tests/integration/src/com/android/server/net/integrationtests/TestNetworkStackService.kt b/tests/integration/src/com/android/server/net/integrationtests/TestNetworkStackService.kt
index 7e227c4..e43ce29 100644
--- a/tests/integration/src/com/android/server/net/integrationtests/TestNetworkStackService.kt
+++ b/tests/integration/src/com/android/server/net/integrationtests/TestNetworkStackService.kt
@@ -30,13 +30,14 @@
 import com.android.server.NetworkStackService.NetworkStackConnector
 import com.android.server.connectivity.NetworkMonitor
 import com.android.server.net.integrationtests.NetworkStackInstrumentationService.InstrumentationConnector
-import org.mockito.Mockito.doReturn
-import org.mockito.Mockito.mock
-import org.mockito.Mockito.spy
 import java.io.ByteArrayInputStream
 import java.net.HttpURLConnection
 import java.net.URL
 import java.nio.charset.StandardCharsets
+import java.util.concurrent.ExecutorService
+import org.mockito.Mockito.doReturn
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.spy
 
 private const val TEST_NETID = 42
 
@@ -60,6 +61,10 @@
     private class NetworkMonitorDeps(private val privateDnsBypassNetwork: Network) :
             NetworkMonitor.Dependencies() {
         override fun getPrivateDnsBypassNetwork(network: Network?) = privateDnsBypassNetwork
+        override fun onThreadCreated(thread: Thread) =
+            InstrumentationConnector.onNetworkMonitorThreadCreated(thread)
+        override fun onExecutorServiceCreated(ecs: ExecutorService) =
+            InstrumentationConnector.onNetworkMonitorExecutorServiceCreated(ecs)
     }
 
     /**