Quit NetworkMonitor threads in integration tests
Similarly to NetworkMonitorTest, terminate NetworkMonitor threads and
ExecutorServices after each test case in integration tests.
This should fix flakes where the HTTP probe from testValidation could
finish after mock probe responses are cleared in tearDown.
The utils used in NetworkMonitorTest are factored out to ConcurrentUtils
to achieve that.
Fixes: 292481238
Test: atest --rerun-until-failure 50
Change-Id: I5cdeee8d4f06ea40e338dd2cb1fdcc2fe0907a7d
diff --git a/staticlibs/testutils/hostdevice/com/android/testutils/ConcurrentUtils.kt b/staticlibs/testutils/hostdevice/com/android/testutils/ConcurrentUtils.kt
index af4f96d..c6e5f25 100644
--- a/staticlibs/testutils/hostdevice/com/android/testutils/ConcurrentUtils.kt
+++ b/staticlibs/testutils/hostdevice/com/android/testutils/ConcurrentUtils.kt
@@ -19,10 +19,77 @@
package com.android.testutils
import java.util.concurrent.CountDownLatch
+import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
+import java.util.function.Consumer
import kotlin.system.measureTimeMillis
+import kotlin.test.assertFalse
+import kotlin.test.assertTrue
// For Java usage
fun durationOf(fn: Runnable) = measureTimeMillis { fn.run() }
fun CountDownLatch.await(timeoutMs: Long): Boolean = await(timeoutMs, TimeUnit.MILLISECONDS)
+
+/**
+ * Quit resources provided as a list by a supplier.
+ *
+ * The supplier may return more resources as the process progresses, for example while interrupting
+ * threads and waiting for them to finish they may spawn more threads, so this implements a
+ * [maxRetryCount] which, in this case, would be the maximum length of the thread chain that can be
+ * terminated.
+ */
+fun <T> quitResources(
+ maxRetryCount: Int,
+ supplier: () -> List<T>,
+ terminator: Consumer<T>
+) {
+ // Run it multiple times since new threads might be generated in a thread
+ // that is about to be terminated
+ for (retryCount in 0 until maxRetryCount) {
+ val resourcesToBeCleared = supplier()
+ if (resourcesToBeCleared.isEmpty()) return
+ for (resource in resourcesToBeCleared) {
+ terminator.accept(resource)
+ }
+ }
+ assertEmpty(supplier())
+}
+
+/**
+ * Implementation of [quitResources] to interrupt and wait for [ExecutorService]s to finish.
+ */
+@JvmOverloads
+fun quitExecutorServices(
+ maxRetryCount: Int,
+ interrupt: Boolean = true,
+ timeoutMs: Long = 10_000L,
+ supplier: () -> List<ExecutorService>
+) {
+ quitResources(maxRetryCount, supplier) { ecs ->
+ if (interrupt) {
+ ecs.shutdownNow()
+ }
+ assertTrue(ecs.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS),
+ "ExecutorServices did not terminate within timeout")
+ }
+}
+
+/**
+ * Implementation of [quitResources] to interrupt and wait for [Thread]s to finish.
+ */
+@JvmOverloads
+fun quitThreads(
+ maxRetryCount: Int,
+ interrupt: Boolean = true,
+ timeoutMs: Long = 10_000L,
+ supplier: () -> List<Thread>
+) {
+ quitResources(maxRetryCount, supplier) { th ->
+ if (interrupt) {
+ th.interrupt()
+ }
+ th.join(timeoutMs)
+ assertFalse(th.isAlive, "Threads did not terminate within timeout.")
+ }
+}
diff --git a/tests/integration/src/com/android/server/net/integrationtests/NetworkStackInstrumentationService.kt b/tests/integration/src/com/android/server/net/integrationtests/NetworkStackInstrumentationService.kt
index 104d063..3d948ba 100644
--- a/tests/integration/src/com/android/server/net/integrationtests/NetworkStackInstrumentationService.kt
+++ b/tests/integration/src/com/android/server/net/integrationtests/NetworkStackInstrumentationService.kt
@@ -18,10 +18,14 @@
import android.app.Service
import android.content.Intent
+import androidx.annotation.GuardedBy
+import com.android.testutils.quitExecutorServices
+import com.android.testutils.quitThreads
import java.net.URL
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.ExecutorService
import kotlin.collections.ArrayList
import kotlin.test.fail
@@ -37,7 +41,12 @@
.run {
withDefault { key -> getOrPut(key) { ConcurrentLinkedQueue() } }
}
- private val httpRequestUrls = Collections.synchronizedList(ArrayList<String>())
+ private val httpRequestUrls = Collections.synchronizedList(mutableListOf<String>())
+
+ @GuardedBy("networkMonitorThreads")
+ private val networkMonitorThreads = mutableListOf<Thread>()
+ @GuardedBy("networkMonitorExecutorServices")
+ private val networkMonitorExecutorServices = mutableListOf<ExecutorService>()
/**
* Called when an HTTP request is being processed by NetworkMonitor. Returns the response
@@ -52,10 +61,47 @@
}
/**
+ * Called when NetworkMonitor creates a new Thread.
+ */
+ fun onNetworkMonitorThreadCreated(thread: Thread) {
+ synchronized(networkMonitorThreads) {
+ networkMonitorThreads.add(thread)
+ }
+ }
+
+ /**
+ * Called when NetworkMonitor creates a new ExecutorService.
+ */
+ fun onNetworkMonitorExecutorServiceCreated(executorService: ExecutorService) {
+ synchronized(networkMonitorExecutorServices) {
+ networkMonitorExecutorServices.add(executorService)
+ }
+ }
+
+ /**
* Clear all state of this connector. This is intended for use between two tests, so all
* state should be reset as if the connector was just created.
*/
override fun clearAllState() {
+ quitThreads(
+ maxRetryCount = 3,
+ interrupt = true) {
+ synchronized(networkMonitorThreads) {
+ networkMonitorThreads.toList().also { networkMonitorThreads.clear() }
+ }
+ }
+ quitExecutorServices(
+ maxRetryCount = 3,
+ // NetworkMonitor is expected to have interrupted its executors when probing
+ // finishes, otherwise it's a thread pool leak that should be caught, so they should
+ // not need to be interrupted (the test only needs to wait for them to finish).
+ interrupt = false) {
+ synchronized(networkMonitorExecutorServices) {
+ networkMonitorExecutorServices.toList().also {
+ networkMonitorExecutorServices.clear()
+ }
+ }
+ }
httpResponses.clear()
httpRequestUrls.clear()
}
diff --git a/tests/integration/src/com/android/server/net/integrationtests/TestNetworkStackService.kt b/tests/integration/src/com/android/server/net/integrationtests/TestNetworkStackService.kt
index 7e227c4..e43ce29 100644
--- a/tests/integration/src/com/android/server/net/integrationtests/TestNetworkStackService.kt
+++ b/tests/integration/src/com/android/server/net/integrationtests/TestNetworkStackService.kt
@@ -30,13 +30,14 @@
import com.android.server.NetworkStackService.NetworkStackConnector
import com.android.server.connectivity.NetworkMonitor
import com.android.server.net.integrationtests.NetworkStackInstrumentationService.InstrumentationConnector
-import org.mockito.Mockito.doReturn
-import org.mockito.Mockito.mock
-import org.mockito.Mockito.spy
import java.io.ByteArrayInputStream
import java.net.HttpURLConnection
import java.net.URL
import java.nio.charset.StandardCharsets
+import java.util.concurrent.ExecutorService
+import org.mockito.Mockito.doReturn
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.spy
private const val TEST_NETID = 42
@@ -60,6 +61,10 @@
private class NetworkMonitorDeps(private val privateDnsBypassNetwork: Network) :
NetworkMonitor.Dependencies() {
override fun getPrivateDnsBypassNetwork(network: Network?) = privateDnsBypassNetwork
+ override fun onThreadCreated(thread: Thread) =
+ InstrumentationConnector.onNetworkMonitorThreadCreated(thread)
+ override fun onExecutorServiceCreated(ecs: ExecutorService) =
+ InstrumentationConnector.onNetworkMonitorExecutorServiceCreated(ecs)
}
/**